diff --git a/Cargo.toml b/Cargo.toml index d67b0b27..02eb88f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,10 @@ exclude = [ "wasm-wrappers/fdw/cfd1_fdw", "wasm-wrappers/fdw/clerk_fdw", "wasm-wrappers/fdw/helloworld_fdw", - "wasm-wrappers/fdw/snowflake_fdw", - "wasm-wrappers/fdw/paddle_fdw", "wasm-wrappers/fdw/notion_fdw", + "wasm-wrappers/fdw/orb_fdw", + "wasm-wrappers/fdw/paddle_fdw", + "wasm-wrappers/fdw/snowflake_fdw", ] resolver = "2" diff --git a/README.md b/README.md index d96f2580..9c361cad 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,7 @@ | [Cal.com](./wasm-wrappers/fdw/cal_fdw) | A Wasm FDW for [Cal.com](https://www.cal.com/) | ✅ | ❌ | | [Clerk](./wasm-wrappers/fdw/clerk_fdw) | A Wasm FDW for [Clerk](https://www.clerk.com/) | ✅ | ❌ | | [Cloudflare D1](./wasm-wrappers/fdw/cfd1_fdw) | A Wasm FDW for [Cloudflare D1](https://developers.cloudflare.com/d1/) | ✅ | ✅ | +| [Orb](./wasm-wrappers/fdw/orb_fdw) | A Wasm FDW for [Orb](https://www.withorb.com/) | ✅ | ❌ | ### Warning diff --git a/docs/catalog/index.md b/docs/catalog/index.md index f0df79a7..7196937f 100644 --- a/docs/catalog/index.md +++ b/docs/catalog/index.md @@ -23,6 +23,7 @@ Each FDW documentation includes a detailed "Limitations" section that describes | Firebase | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | | Logflare | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | | Notion | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | +| Orb | ✅ | ✅ | ✅ | ✅ | ❌ | ✅ | | Paddle | ✅ | ✅ | ✅ | ❌ | ❌ | ✅ | | Redis | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | | S3 | ✅ | ❌ | ❌ | ❌ | ❌ | ❌ | @@ -40,8 +41,9 @@ See [Developing a Wasm Wrapper](../guides/create-wasm-wrapper.md) for instructio | :-----------: | :------------------------------: | :------------------: | :------------------------------------------------------------------------------------: | | Cal.com | [Supabase](https://supabase.com) | [Link](cal.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/cal_fdw) | | Calendly | [Supabase](https://supabase.com) | [Link](calendly.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/calendly_fdw) | -| Clerk | [Supabase](https://supabase.com) | [Link](clerk.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/clerk_fdw) | +| Clerk | [Supabase](https://supabase.com) | [Link](clerk.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/clerk_fdw) | | Cloudflare D1 | [Supabase](https://supabase.com) | [Link](cfd1.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/cfd1_fdw) | | Notion | [Supabase](https://supabase.com) | [Link](notion.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/notion_fdw) | +| Orb | [Supabase](https://supabase.com) | [Link](orb.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/orb_fdw) | | Paddle | [Supabase](https://supabase.com) | [Link](paddle.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/paddle_fdw) | | Snowflake | [Supabase](https://supabase.com) | [Link](snowflake.md) | [Link](https://github.com/supabase/wrappers/tree/main/wasm-wrappers/fdw/snowflake_fdw) | diff --git a/docs/catalog/orb.md b/docs/catalog/orb.md new file mode 100644 index 00000000..770b3897 --- /dev/null +++ b/docs/catalog/orb.md @@ -0,0 +1,831 @@ +--- +source: +documentation: +author: supabase +tags: + - wasm + - official +--- + +# Orb + +[Orb](https://withorb.com/) is a metering and pricing platform built to support usage-based billing models. + +The Orb Wrapper is a WebAssembly(Wasm) foreign data wrapper which allows you to read data from Orb for use within your Postgres database. + +## Available Versions + +| Version | Wasm Package URL | Checksum | +| ------- | --------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | +| 0.1.0 | `https://github.com/supabase/wrappers/releases/download/wasm_orb_fdw_v0.1.0/orb_fdw.wasm` | `tbd` | + +## Preparation + +Before you can query Orb, you need to enable the Wrappers extension and store your credentials in Postgres. + +### Enable Wrappers + +Make sure the `wrappers` extension is installed on your database: + +```sql +create extension if not exists wrappers with schema extensions; +``` + +### Enable the Orb Wrapper + +Enable the Wasm foreign data wrapper: + +```sql +create foreign data wrapper wasm_wrapper + handler wasm_fdw_handler + validator wasm_fdw_validator; +``` + +### Store your credentials (optional) + +By default, Postgres stores FDW credentials inside `pg_catalog.pg_foreign_server` in plain text. Anyone with access to this table will be able to view these credentials. Wrappers is designed to work with [Vault](https://supabase.com/docs/guides/database/vault), which provides an additional level of security for storing credentials. We recommend using Vault to store your credentials. + +```sql +-- Save your Orb API key in Vault and retrieve the `key_id` +insert into vault.secrets (name, secret) +values ( + 'orb', + '' -- Orb API key +) +returning key_id; +``` + +### Connecting to Orb + +We need to provide Postgres with the credentials to access Orb and any additional options. We can do this using the `create server` command: + +=== "With Vault" + + ```sql + create server orb_server + foreign data wrapper wasm_wrapper + options ( + fdw_package_url 'https://github.com/supabase/wrappers/releases/download/wasm_orb_fdw_v0.1.0/orb_fdw.wasm', + fdw_package_name 'supabase:orb-fdw', + fdw_package_version '0.1.0', + fdw_package_checksum 'tbd', + api_url 'https://api.withorb.com/v1', -- optional + api_key_id '' -- The Key ID from above. + ); + ``` + +=== "Without Vault" + + ```sql + create server orb_server + foreign data wrapper wasm_wrapper + options ( + fdw_package_url 'https://github.com/supabase/wrappers/releases/download/wasm_orb_fdw_v0.1.0/orb_fdw.wasm', + fdw_package_name 'supabase:orb-fdw', + fdw_package_version '0.1.0', + fdw_package_checksum 'tbd', + api_url 'https://api.withorb.com/v1', -- optional + api_key '3e2e912...' -- Orb API key + ); + ``` + +Note the `fdw_package_*` options are required, which specify the Wasm package metadata. You can get the available package version list from [above](#available-versions). + +### Create a schema + +We recommend creating a schema to hold all the foreign tables: + +```sql +create schema if not exists orb; +``` + +## Options + +The full list of foreign table options are below: + +- `object` - Object name in Orb, required. + +Supported objects are listed below: + +| Object name | +| ------------------------ | +| alerts | +| coupons | +| credit_notes | +| customers | +| credits | +| credits_ledger | +| dimensional_price_groups | +| events_backfills | +| events_volume | +| invoices | +| items | +| metrics | +| plans | +| prices | +| subscriptions | + +- `rowid_column` - Primary key column name, optional for data scan, required for data modify + +## Entities + +### Alert + +This is a list of all alerts within Orb. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/alert/list-alerts) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| --------------------- | :----: | :----: | :----: | :----: | :------: | +| alerts | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.alerts ( + id text, + type text, + enabled boolean, + customer_id text, + external_customer_id text, + subscription_id text, + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'alerts' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format +- The query must specify one of `customer_id`, `external_customer_id`, or `subscription_id` + +### Coupon + +This is a list of all coupons for an account. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/coupon/list-coupons) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| --------------------- | :----: | :----: | :----: | :----: | :------: | +| coupons | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.coupons ( + id text, + redemption_code text, + times_redeemed bigint, + duration_in_months bigint, + archived_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'coupons' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Credit Note + +This is a list of all CreditNotes. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/credit-note/list-credit-notes) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------ | :----: | :----: | :----: | :----: | :------: | +| credit_notes | ✅ | ✅ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.credit_notes ( + id text, + type text, + total numeric(18,2), + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'credit_notes', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Customer + +This is a list of all customers for an account. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/customer/list-customers) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ----------- | :----: | :----: | :----: | :----: | :------: | +| customers | ✅ | ✅ | ✅ | ✅ | ❌ | + +#### Usage + +```sql +create foreign table orb.customers ( + id text, + name text, + email text, + created_at timestamp, + auto_collection boolean, + attrs jsonb +) + server orb_server + options ( + object 'customers', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Credit + +This is a list of unexpired, non-zero credit blocks for a customer. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/credit/fetch-customer-credit-balance-by-external-customer-id) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------- | :----: | :----: | :----: | :----: | :------: | +| credits | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.credits ( + id text, + customer_id text, + external_customer_id text, + balance numeric(18,2), + status text, + effective_date timestamp, + expiry_date timestamp, + attrs jsonb +) + server orb_server + options ( + object 'credits' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format +- The query must specify one of `customer_id` or `external_customer_id` + +### Credits ledger + +This is a list of actions that have taken place to modify a customer’s credit balance. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/credit/fetch-customer-credits-ledger-by-external-id) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------------ | :----: | :----: | :----: | :----: | :------: | +| credits/ledger | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.credits_ledger ( + id text, + customer_id text, + external_customer_id text, + amount numeric(18,2), + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'credits/ledger' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format +- The query must specify one of `customer_id` or `external_customer_id` + +### Dimensional Price Group + +This is a list of dimensional price groups. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/dimensional-price-group/list-dimensional-price-groups) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------------------ | :----: | :----: | :----: | :----: | :------: | +| dimensional_price_groups | ✅ | ✅ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.dimensional_price_groups ( + id text, + name text, + attrs jsonb +) + server orb_server + options ( + object 'dimensional_price_groups', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Event Backfill + +This is a list of all event backfills. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/event/list-backfills) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ---------------- | :----: | :----: | :----: | :----: | :------: | +| events/backfills | ✅ | ✅ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.events_backfills ( + id text, + status text, + events_ingested bigint, + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'events/backfills', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Event Volume + +This returns the event volume for an account. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/event/get-event-volume) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------- | :----: | :----: | :----: | :----: | :------: | +| events/volume | ✅ | ❌ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.events_volume ( + count bigint, + timeframe_start timestamp, + timeframe_end timestamp, + attrs jsonb +) + server orb_server + options ( + object 'events/volume' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Invoice + +This is a list of invoices for an account. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/invoice/list-invoices) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| -------- | :----: | :----: | :----: | :----: | :------: | +| invoices | ✅ | ✅ | ❌ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.invoices ( + id text, + invoice_number text, + customer_id text, + external_customer_id text, + subscription_id text, + status text, + amount_due numeric(18,2), + currency text, + due_date timestamp, + issued_at timestamp, + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'invoices', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Item + +This is a list of all items. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/item/list-items) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------ | :----: | :----: | :----: | :----: | :------: | +| items | ✅ | ✅ | ✅ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.items ( + id text, + name text, + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'items', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Metric + +This is a list of metric details. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/metric/list-metrics) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------- | :----: | :----: | :----: | :----: | :------: | +| metrics | ✅ | ✅ | ✅ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.metrics ( + id text, + name text, + description text, + status text, + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'metrics', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Plan + +This is a list of all plans for an account. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/plan/list-plans) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------ | :----: | :----: | :----: | :----: | :------: | +| plans | ✅ | ✅ | ✅ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.plans ( + id text, + name text, + description text, + status text, + maximum_amount numeric(18,2), + minimum_amount numeric(18,2), + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'plans', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Price + +This is a list of all add-on prices. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/price/list-prices) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------ | :----: | :----: | :----: | :----: | :------: | +| prices | ✅ | ✅ | ✅ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.prices ( + id text, + name text, + external_price_id text, + price_type text, + maximum_amount numeric(18,2), + minimum_amount numeric(18,2), + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'prices', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +### Subscription + +This is a list of all subscriptions for an account. + +Ref: [Orb API docs](https://docs.withorb.com/api-reference/subscription/list-subscriptions) + +#### Operations + +| Object | Select | Insert | Update | Delete | Truncate | +| ------------- | :----: | :----: | :----: | :----: | :------: | +| subscriptions | ✅ | ✅ | ✅ | ❌ | ❌ | + +#### Usage + +```sql +create foreign table orb.subscriptions ( + id text, + customer_id text, + external_customer_id text, + billing_cycle_day bigint, + status text, + start_date timestamp, + end_date timestamp, + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'subscriptions', + rowid_column 'id' + ); +``` + +#### Notes + +- The `attrs` column contains additional attributes in JSON format + +## Query Pushdown Support + +### `where` clause pushdown + +This FDW supports `where id = 'xxx'` clause pushdown for below objects: + +- Coupon +- Credit Note +- Customer +- Dimensional Price Group +- Event Backfill +- Invoice +- Item +- Metric +- Plan +- Price +- Subscription + +Some other supported `where` clauses pushdown are listed below: + +#### Alert + +For example, `where customer_id = 'WmUkxWmvLvvXHaNV'`. + +- customer_id, (operations: `=`) +- external_customer_id, (operations: `=`) +- subscription_id, (operations: `=`) + +#### Customer + +For example, `where created_at >= '2025-02-15T10:25:36'`. + +- created_at, (operations: `<`, `<=`, `>`, `>=`) + +#### Event Volume + +For example, `where timeframe_start = '2025-02-15'`. + +- timeframe_start, (operations: `=`) + +#### Invoice + +For example, `where status = 'paid'`. + +- customer_id, (operations: `=`) +- external_customer_id, (operations: `=`) +- subscription_id, (operations: `=`) +- status, (operations: `=`) +- due_date, (operations: `=`, `<`, `>`) +- created_at, (operations: `<`, `<=`, `>`, `>=`) + +#### Subscription + +For example, `where status = 'active'`. + +- customer_id, (operations: `=`) +- external_customer_id, (operations: `=`) +- status, (operations: `=`) +- created_at, (operations: `<`, `<=`, `>`, `>=`) + +### `limit` clause pushdown + +This FDW supports `limit` clause pushdown for all the objects. For example, + +```sql +select * from orb.customers limit 200; +``` + +## Supported Data Types + +| Postgres Data Type | Orb Data Type | +| ------------------ | ------------------ | +| boolean | Boolean | +| bigint | Number | +| numeric | Number | +| text | String | +| timestamp | Time | +| jsonb | Json | + +The Orb API uses JSON formatted data, please refer to [Orb API docs](https://docs.withorb.com/api-reference) for more details. + +## Limitations + +This section describes important limitations and considerations when using this FDW: + +- Large result sets may experience slower performance due to full data transfer requirement +- Materialized views using these foreign tables may fail during logical backups + +## Examples + +Below are some examples on how to use Orb foreign tables. + +### Basic example + +This example will create a "foreign table" inside your Postgres database and query its data. + +```sql +create foreign table orb.customers ( + id text, + name text, + email text, + created_at timestamp, + auto_collection boolean, + attrs jsonb +) + server orb_server + options ( + object 'customers', + rowid_column 'id' + ); + +-- query all customers +select * from orb.customers; + +-- you can use `limit` clause to reduce query time if customer number is large +select * from orb.customers limit 200; +``` + +`attrs` is a special column which stores all the object attributes in JSON format, you can extract any attributes needed from it. See more examples below. + +### Query JSON attributes + +```sql +create foreign table orb.invoices ( + id text, + invoice_number text, + customer_id text, + external_customer_id text, + subscription_id text, + status text, + amount_due numeric(18,2), + currency text, + due_date timestamp, + issued_at timestamp, + created_at timestamp, + attrs jsonb +) + server orb_server + options ( + object 'invoices', + rowid_column 'id' + ); + +-- extract all line items from an invoice +select + i.id, + li->>'name' as line_item_name, + li->>'quantity' as line_item_quantity, + li->>'subtotal' as line_item_subtotal +from orb.invoices i + cross join json_array_elements((attrs->'line_items')::json) li +where + i.id = 'PsEhbLd88auyhZ8F'; +``` + +### Data Modify Example + +This example will modify data in a "foreign table" inside your Postgres database, note that `rowid_column` foreign table option is mandatory for data modify. Data modify is done through the `attrs` jsonb column, which will be posted as request body to Orb API endpoint. Please refer to [Orb API reference docs](https://docs.withorb.com/api-reference) for the JSON request details. + +```sql +-- create a new customer +insert into orb.customers(attrs) +values ( + '{ + "name": "John Doe", + "email": "test@test.com" + }'::jsonb +); + +-- update the existing customer +update orb.customers +set attrs = '{ + "name": "Jane Smith", + "billing_address": { + "city": "New York", + "country": "US" + } +}'::jsonb +where id = 'n6DaYELQYubChJWf'; + +-- delete a customer +delete from orb.customers +where id = 'n6DaYELQYubChJWf'; +``` diff --git a/docs/catalog/wasm/index.md b/docs/catalog/wasm/index.md index 9f0b5a4c..f4077fd6 100644 --- a/docs/catalog/wasm/index.md +++ b/docs/catalog/wasm/index.md @@ -73,6 +73,18 @@ Foreign data wrappers built with Wasm which can be used on Supabase platform. :octicons-code-24: [source](https://github.com/supabase/wrappers/tree/wasm_notion_fdw_v0.1.1/wasm-wrappers/fdw/notion_fdw)   :material-file-document: [docs](../notion.md) +- :simple-webassembly:   **[Orb](../orb.md)** + + ---- + + Foreign data wrapper for [Orb](https://www.withorb.com/). + + Supported by [Supabase](https://www.supabase.com) + + :octicons-tag-24: [v0.1.0](https://github.com/supabase/wrappers/releases/tag/orb_fdw_v0.1.0)   + :octicons-code-24: [source](https://github.com/supabase/wrappers/tree/wasm_orb_fdw_v0.1.0/wasm-wrappers/fdw/orb_fdw)   + :material-file-document: [docs](../orb.md) + - :simple-webassembly:   **[Paddle](../paddle.md)** ---- diff --git a/mkdocs.yaml b/mkdocs.yaml index 63479840..c8859228 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -33,6 +33,7 @@ nav: - Clerk: 'catalog/clerk.md' - Cloudflare D1: 'catalog/cfd1.md' - Notion: 'catalog/notion.md' + - Orb: 'catalog/orb.md' - Paddle: 'catalog/paddle.md' - Snowflake: 'catalog/snowflake.md' - Guides: diff --git a/wasm-wrappers/fdw/orb_fdw/Cargo.lock b/wasm-wrappers/fdw/orb_fdw/Cargo.lock new file mode 100644 index 00000000..b38abf8e --- /dev/null +++ b/wasm-wrappers/fdw/orb_fdw/Cargo.lock @@ -0,0 +1,102 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "itoa" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "orb_fdw" +version = "0.1.0" +dependencies = [ + "serde_json", + "wit-bindgen-rt", +] + +[[package]] +name = "proc-macro2" +version = "1.0.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60946a68e5f9d28b0dc1c21bb8a97ee7d018a8b322fa57838ba31cc878e22d99" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea1a2d0a644769cc99faa24c3ad26b379b786fe7c36fd3c546254801650e6dd" + +[[package]] +name = "serde" +version = "1.0.217" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.217" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.138" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a210d160f08b701c8721ba1c726c11662f877ea6b7094007e1ca9a1041945034" + +[[package]] +name = "wit-bindgen-rt" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29c7526379ace8709ee9ab9f2bb50f112d95581063a59ef3097d9c10153886c9" diff --git a/wasm-wrappers/fdw/orb_fdw/Cargo.toml b/wasm-wrappers/fdw/orb_fdw/Cargo.toml new file mode 100644 index 00000000..29656b6c --- /dev/null +++ b/wasm-wrappers/fdw/orb_fdw/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "orb_fdw" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wit-bindgen-rt = "0.26.0" +serde_json = "1.0" + +[package.metadata.component] +package = "supabase:orb-fdw" + +[package.metadata.component.dependencies] + +[package.metadata.component.target] +path = "wit" + +[package.metadata.component.target.dependencies] +"supabase:wrappers" = { path = "../../wit" } diff --git a/wasm-wrappers/fdw/orb_fdw/src/lib.rs b/wasm-wrappers/fdw/orb_fdw/src/lib.rs new file mode 100644 index 00000000..7b9a5ab8 --- /dev/null +++ b/wasm-wrappers/fdw/orb_fdw/src/lib.rs @@ -0,0 +1,558 @@ +#[allow(warnings)] +mod bindings; +use serde_json::Value as JsonValue; + +use bindings::{ + exports::supabase::wrappers::routines::Guest, + supabase::wrappers::{ + http, stats, time, + types::{Cell, Column, Context, FdwError, FdwResult, OptionsType, Row, TypeOid, Value}, + utils, + }, +}; + +#[derive(Debug, Default)] +struct OrbFdw { + base_url: String, + headers: Vec<(String, String)>, + object: String, + sub_obj: String, + sub_obj_value: String, + src_rows: Vec, + src_idx: usize, + src_cursor: Option, + src_limit: Option, + consumed_row_cnt: i64, +} + +static mut INSTANCE: *mut OrbFdw = std::ptr::null_mut::(); +static FDW_NAME: &str = "OrbFdw"; + +// max number of rows returned per request +static BATCH_SIZE: usize = 100; + +impl OrbFdw { + fn init() { + let instance = Self::default(); + unsafe { + INSTANCE = Box::leak(Box::new(instance)); + } + } + + fn this_mut() -> &'static mut Self { + unsafe { &mut (*INSTANCE) } + } + + // convert Orb response data field to a cell + fn src_to_cell(&self, src_row: &JsonValue, tgt_col: &Column) -> Result, FdwError> { + let tgt_col_name = tgt_col.name(); + + // put all properties into 'attrs' JSON column + if &tgt_col_name == "attrs" { + return Ok(Some(Cell::Json(src_row.to_string()))); + } + + match self.object.as_ref() { + "credits" | "credits/ledger" => { + // for credits endpoint, the customer_id and external_customer_id + // columns are in nested properties, we need to extract them + if tgt_col_name == "customer_id" || tgt_col_name == "external_customer_id" { + if self.sub_obj == tgt_col_name { + return Ok(Some(Cell::String(self.sub_obj_value.to_owned()))); + } else { + return Ok(None); + } + } + } + _ => {} + } + + let prop_path = match self.object.as_ref() { + "alerts" | "invoices" | "subscriptions" => { + // the customer_id, external_customer_id, and subscription_id + // columns are in nested properties, we need to extract them in different path + if tgt_col_name == "customer_id" { + "customer/id" + } else if tgt_col_name == "external_customer_id" { + "customer/external_customer_id" + } else if tgt_col_name == "subscription_id" { + "subscription/id" + } else { + &tgt_col_name + } + } + _ => &tgt_col_name, + }; + + let src = src_row + .pointer(&format!("/{}", prop_path)) + .ok_or(format!("source column '{}' not found", prop_path))?; + + // column type mapping + let cell = match tgt_col.type_oid() { + TypeOid::Bool => src.as_bool().map(Cell::Bool), + TypeOid::F64 => src.as_f64().map(Cell::F64), + TypeOid::I64 => src.as_i64().map(Cell::I64), + TypeOid::Numeric => src + .as_f64() + .or_else(|| { + src.as_str() + .map(|v| v.parse::()) + .transpose() + .unwrap_or_default() + }) + .map(Cell::Numeric), + TypeOid::String => src.as_str().map(|v| Cell::String(v.to_owned())), + TypeOid::Timestamp => { + if let Some(s) = src.as_str() { + let ts = time::parse_from_rfc3339(s)?; + Some(Cell::Timestamp(ts)) + } else { + None + } + } + TypeOid::Timestamptz => { + if let Some(s) = src.as_str() { + let ts = time::parse_from_rfc3339(s)?; + Some(Cell::Timestamptz(ts)) + } else { + None + } + } + TypeOid::Json => src.as_object().map(|_| Cell::Json(src.to_string())), + _ => { + return Err(format!( + "target column '{}' type is not supported", + tgt_col_name + )); + } + }; + + Ok(cell) + } + + // convert date comparison qual to pushdown query parameter + // e.g. "created_at > '2025-01-01" -> "created_at[gt]=2025-01-01T00:00:00" + fn translate_date_pushdown( + &self, + qs: &mut Vec, + field: &str, + oper: &str, + value: &Value, + ) -> Result<(), FdwError> { + if let Value::Cell(Cell::Timestamp(t)) = value { + let ts = time::epoch_ms_to_rfc3339(*t)?; + let oper = match oper { + "=" => "", + "<" => "[lt]", + "<=" => "[lte]", + ">" => "[gt]", + ">=" => "[gte]", + _ => return Ok(()), + }; + qs.push(format!("{}{}={}", field, oper, &ts[..19])); // remove the ending timezone part + } + Ok(()) + } + + // convert filter comparison qual to pushdown query parameter + // e.g. + // "customer_id = 'abc'" -> "customer_id=abc" + // "customer_id in ('abc', 'def')" -> "customer_id[]=abc&customer_id[]=def" + fn translate_filter_pushdown(&self, qs: &mut Vec, field: &str, value: &Value) { + match value { + Value::Cell(ref c) => { + if let Cell::String(s) = c { + qs.push(format!("{}={}", field, s)); + } + } + Value::Array(ref arr) => { + arr.iter() + .filter_map(|c| match c { + Cell::String(s) => Some(s.as_ref()), + _ => None, + }) + .for_each(|c: &str| { + qs.push(format!("{}[]={}", field, c)); + }); + } + } + } + + // add pushdown to query string + fn add_pushdown(&mut self, qs: &mut Vec, ctx: &Context) -> Result<(), FdwError> { + // push down quals + match self.object.as_ref() { + "alerts" => { + if let Some(q) = ctx.get_quals().iter().find(|q| { + // alerts endpoint needs one of customer_id, external_customer_id, or subscription_id + let field = q.field(); + (field == "customer_id" + || field == "external_customer_id" + || field == "subscription_id") + && (q.operator() == "=") + }) { + self.translate_filter_pushdown(qs, &q.field(), &q.value()); + } + } + "customers" => { + if let Some(q) = ctx.get_quals().iter().find(|q| q.field() == "created_at") { + self.translate_date_pushdown(qs, "created_at", &q.operator(), &q.value())?; + } + } + "events/volume" => { + if let Some(q) = ctx.get_quals().iter().find(|q| { + // events/volume endpoint needs timeframe_start + (q.field() == "timeframe_start") && (q.operator() == "=") + }) { + self.translate_date_pushdown(qs, &q.field(), &q.operator(), &q.value())?; + } + } + "invoices" => { + for qual in ctx.get_quals() { + let field = qual.field(); + let oper = qual.operator(); + let value = qual.value(); + + if (field == "customer_id" + || field == "external_customer_id" + || field == "subscription_id" + || field == "status") + && (oper == "=") + { + self.translate_filter_pushdown(qs, &field, &value); + } + + if field == "due_date" { + self.translate_date_pushdown(qs, &field, &oper, &value)?; + } + + if field == "created_at" { + self.translate_date_pushdown(qs, "invoice_date", &oper, &value)?; + } + } + } + "subscriptions" => { + for qual in ctx.get_quals() { + let field = qual.field(); + let oper = qual.operator(); + let value = qual.value(); + + if (field == "customer_id" + || field == "external_customer_id" + || field == "status") + && (oper == "=") + { + self.translate_filter_pushdown(qs, &field, &value); + } + + if field == "created_at" { + self.translate_date_pushdown(qs, &field, &oper, &value)?; + } + } + } + _ => {} + } + + // push down limits + // Note: Postgres will take limit and offset locally after reading rows + // from remote, so we calculate the real limit and only use it without + // pushing down offset. + self.src_limit = ctx.get_limit().map(|v| v.offset() + v.count()); + + Ok(()) + } + + // create a request instance + fn create_request(&mut self, ctx: &Context) -> Result { + let mut qs = vec![format!("limit={}", BATCH_SIZE)]; + let quals = ctx.get_quals(); + + // set request url, it is in `/` form if `id = ` qual is specified + let mut url = if let Some(q) = quals.iter().find(|q| { + if (q.field() == "id") && (q.operator() == "=") { + if let Value::Cell(Cell::String(_)) = q.value() { + return true; + } + } + false + }) { + match q.value() { + Value::Cell(Cell::String(id)) => { + format!("{}/{}/{}", self.base_url, self.object, id) + } + _ => unreachable!(), + } + } else { + if let Some(ref sc) = self.src_cursor { + qs.push(format!("cursor={}", sc)); + } + self.add_pushdown(&mut qs, ctx)?; + format!("{}/{}?{}", self.base_url, self.object, qs.join("&")) + }; + + match self.object.as_ref() { + // deal with url special case for "credits" and "credits/ledger" endpoints + // ref: https://docs.withorb.com/api-reference/credit/fetch-customer-credit-balance-by-external-customer-id + "credits" | "credits/ledger" => { + if let Some(q) = quals.iter().find(|q| { + let field = q.field(); + (field == "customer_id" || field == "external_customer_id") + && (q.operator() == "=") + }) { + let field = q.field(); + let seg = if field == "customer_id" { + "customers" + } else { + "customers/external_customer_id" + }; + if let Value::Cell(Cell::String(s)) = q.value() { + self.sub_obj = field.clone(); + self.sub_obj_value = s.clone(); + url = format!( + "{}/{}/{}/{}?currency=USD&{}", + self.base_url, + seg, + s, + self.object, + qs.join("&") + ); + } + } + } + _ => {} + } + + Ok(http::Request { + method: http::Method::Get, + url, + headers: self.headers.clone(), + body: String::default(), + }) + } + + // make API request to remote endpoint + fn make_request(&mut self, req: &http::Request) -> Result { + let resp = match req.method { + http::Method::Get => http::get(req)?, + _ => unreachable!("invalid request method"), + }; + + // if encounter the 404 error, we should take it as empty result rather than an error + if resp.status_code == 404 { + return Ok(serde_json::json!([])); + } + + // check for errors + http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?; + + // transform response to json + let resp_json: JsonValue = serde_json::from_str(&resp.body).map_err(|e| e.to_string())?; + + stats::inc_stats(FDW_NAME, stats::Metric::BytesIn, resp.body.len() as i64); + + Ok(resp_json) + } + + // fetch source data rows from Orb API + fn fetch_source_data(&mut self, ctx: &Context) -> FdwResult { + self.src_rows.clear(); + self.src_idx = 0; + + // create a request and send it + let req = self.create_request(ctx)?; + let resp_json = self.make_request(&req)?; + + // unify response object to array and save source rows in local batch + let resp_data = resp_json + .pointer("/data") + .and_then(|v| v.as_array().cloned()) + .or_else(|| { + if resp_json.is_object() { + Some(vec![resp_json.clone()]) + } else { + None + } + }) + .ok_or("cannot parse query result data")?; + self.src_rows.extend(resp_data); + + // save pagination next cursor + if resp_json + .pointer("/pagination_metadata/has_more") + .and_then(|v| v.as_bool()) + == Some(true) + { + self.src_cursor = resp_json + .pointer("/pagination_metadata/next_cursor") + .and_then(|v| v.as_str()) + .map(|v| v.to_owned()); + } else { + self.src_cursor = None; + } + + Ok(()) + } +} + +impl Guest for OrbFdw { + fn host_version_requirement() -> String { + // semver ref: https://docs.rs/semver/latest/semver/enum.Op.html + "^0.1.0".to_string() + } + + fn init(ctx: &Context) -> FdwResult { + Self::init(); + let this = Self::this_mut(); + + // get foreign server options + let opts = ctx.get_options(OptionsType::Server); + this.base_url = opts.require_or("api_url", "https://api.withorb.com/v1"); + let api_key = match opts.get("api_key") { + Some(key) => key, + None => { + let key_id = opts.require("api_key_id")?; + utils::get_vault_secret(&key_id).unwrap_or_default() + } + }; + + // Orb API authentication + // ref: https://docs.withorb.com/api-reference + this.headers + .push(("user-agent".to_owned(), "Wrappers Orb FDW".to_string())); + this.headers + .push(("content-type".to_owned(), "application/json".to_string())); + this.headers + .push(("authorization".to_owned(), format!("Bearer {}", api_key))); + + stats::inc_stats(FDW_NAME, stats::Metric::CreateTimes, 1); + + Ok(()) + } + + fn begin_scan(ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + let opts = ctx.get_options(OptionsType::Table); + this.object = opts.require("object")?; + this.fetch_source_data(ctx) + } + + fn iter_scan(ctx: &Context, row: &Row) -> Result, FdwError> { + let this = Self::this_mut(); + + // if all rows in local batch buffer are consumed + while this.src_idx >= this.src_rows.len() { + stats::inc_stats(FDW_NAME, stats::Metric::RowsIn, this.src_rows.len() as i64); + stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, this.src_rows.len() as i64); + + // no more source records on remote or consumed records exceeds limit, stop the iteration scan + if this.src_cursor.is_none() || (Some(this.consumed_row_cnt) >= this.src_limit) { + return Ok(None); + } + + // otherwise, make a new request for the next batch + this.fetch_source_data(ctx)?; + } + + // convert Orb row to Postgres row + let src_row = &this.src_rows[this.src_idx]; + for tgt_col in ctx.get_columns() { + let cell = this.src_to_cell(src_row, &tgt_col)?; + row.push(cell.as_ref()); + } + this.src_idx += 1; + this.consumed_row_cnt += 1; + + Ok(Some(0)) + } + + fn re_scan(ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + this.src_cursor = None; + this.consumed_row_cnt = 0; + this.fetch_source_data(ctx) + } + + fn end_scan(_ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + this.src_rows.clear(); + Ok(()) + } + + fn begin_modify(ctx: &Context) -> FdwResult { + let this = Self::this_mut(); + let opts = ctx.get_options(OptionsType::Table); + this.object = opts.require("object")?; + Ok(()) + } + + fn insert(_ctx: &Context, row: &Row) -> FdwResult { + let this = Self::this_mut(); + // we assume 'attrs' is defined as the last column + if let Some(Some(Cell::Json(body))) = row.cells().last() { + let url = format!("{}/{}", this.base_url, this.object); + let headers = this.headers.clone(); + let req = http::Request { + method: http::Method::Post, + url, + headers, + body: body.to_owned(), + }; + let resp = http::post(&req)?; + http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?; + stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, 1); + return Ok(()); + } + Err("cannot find 'attrs' JSONB column to insert".to_owned()) + } + + fn update(_ctx: &Context, rowid: Cell, row: &Row) -> FdwResult { + let this = Self::this_mut(); + if let Cell::String(rowid) = rowid { + // we assume 'attrs' is defined as the last column + if let Some(Some(Cell::Json(body))) = row.cells().last() { + let url = format!("{}/{}/{}", this.base_url, this.object, rowid); + let headers = this.headers.clone(); + let req = http::Request { + method: http::Method::Put, + url, + headers, + body: body.to_owned(), + }; + let resp = http::put(&req)?; + http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?; + stats::inc_stats(FDW_NAME, stats::Metric::RowsOut, 1); + Ok(()) + } else { + Err("cannot find 'attrs' JSONB column to update".to_owned()) + } + } else { + Err("no rowid column specified for update".to_owned()) + } + } + + fn delete(_ctx: &Context, rowid: Cell) -> FdwResult { + let this = Self::this_mut(); + if let Cell::String(rowid) = rowid { + let url = format!("{}/{}/{}", this.base_url, this.object, rowid); + let headers = this.headers.clone(); + let req = http::Request { + method: http::Method::Delete, + url, + headers, + body: String::default(), + }; + let resp = http::delete(&req)?; + http::error_for_status(&resp).map_err(|err| format!("{}: {}", err, resp.body))?; + Ok(()) + } else { + Err("no rowid column specified for delete".to_owned()) + } + } + + fn end_modify(_ctx: &Context) -> FdwResult { + Ok(()) + } +} + +bindings::export!(OrbFdw with_types_in bindings); diff --git a/wasm-wrappers/fdw/orb_fdw/wit/world.wit b/wasm-wrappers/fdw/orb_fdw/wit/world.wit new file mode 100644 index 00000000..3f99d261 --- /dev/null +++ b/wasm-wrappers/fdw/orb_fdw/wit/world.wit @@ -0,0 +1,10 @@ +package supabase:orb-fdw@0.1.0; + +world orb { + import supabase:wrappers/http@0.1.0; + import supabase:wrappers/jwt@0.1.0; + import supabase:wrappers/stats@0.1.0; + import supabase:wrappers/time@0.1.0; + import supabase:wrappers/utils@0.1.0; + export supabase:wrappers/routines@0.1.0; +} diff --git a/wrappers/dockerfiles/wasm/server.py b/wrappers/dockerfiles/wasm/server.py index 9018f2b6..6d5399a8 100644 --- a/wrappers/dockerfiles/wasm/server.py +++ b/wrappers/dockerfiles/wasm/server.py @@ -253,6 +253,50 @@ def do_GET(self): } ] ''' + elif fdw == "orb": + body = ''' +{ + "data": [ + { + "accounting_sync_configuration": { + "accounting_providers": [], + "excluded": false + }, + "additional_emails": [], + "auto_collection": true, + "balance": "0.00", + "billing_address": null, + "created_at": "2025-02-15T13:04:43+00:00", + "currency": "USD", + "email": "test@test.com", + "email_delivery": true, + "exempt_from_automated_tax": false, + "external_customer_id": "aaabbbcccddd", + "hierarchy": { + "children": [], + "parent": null + }, + "id": "XimGiw3pnsgusvc3", + "metadata": { + "is_local_entity": "true", + "mydata.0": "aaabbbcccddd" + }, + "name": "test@test.com customer", + "payment_provider": "stripe_charge", + "payment_provider_id": "cus_xxxx", + "portal_url": "https://portal.withorb.com/view?token=aaaa.bbb.ccc", + "reporting_configuration": null, + "shipping_address": null, + "tax_id": null, + "timezone": "Etc/UTC" + } + ], + "pagination_metadata": { + "has_more": false, + "next_cursor": null + } +} + ''' else: self.send_response(404) return diff --git a/wrappers/src/fdw/wasm_fdw/tests.rs b/wrappers/src/fdw/wasm_fdw/tests.rs index 59f0dcbc..18da19a5 100644 --- a/wrappers/src/fdw/wasm_fdw/tests.rs +++ b/wrappers/src/fdw/wasm_fdw/tests.rs @@ -325,6 +325,48 @@ mod tests { .filter_map(|r| r.get_by_name::<&str, _>("id").unwrap()) .collect::>(); assert_eq!(results, vec!["user_2rvWkk90azWI2o3PH4LDuCMDPPh"]); + + // Orb FDW test + c.update( + r#"CREATE SERVER orb_server + FOREIGN DATA WRAPPER wasm_wrapper + OPTIONS ( + fdw_package_url 'file://../../../wasm-wrappers/fdw/orb_fdw/target/wasm32-unknown-unknown/release/orb_fdw.wasm', + fdw_package_name 'supabase:orb-fdw', + fdw_package_version '>=0.1.0', + api_url 'http://localhost:8096/orb', + api_key 'ccc' + )"#, + None, + None, + ) + .unwrap(); + c.update( + r#" + CREATE FOREIGN TABLE orb_table ( + id text, + name text, + email text, + created_at timestamp, + auto_collection boolean, + attrs jsonb + ) + SERVER orb_server + OPTIONS ( + object 'customers' + ) + "#, + None, + None, + ) + .unwrap(); + + let results = c + .select("SELECT * FROM orb_table", None, None) + .unwrap() + .filter_map(|r| r.get_by_name::<&str, _>("id").unwrap()) + .collect::>(); + assert_eq!(results, vec!["XimGiw3pnsgusvc3"]); }); } }