diff --git a/nx.json b/nx.json
index 1a4241962..432ec555b 100644
--- a/nx.json
+++ b/nx.json
@@ -103,10 +103,6 @@
       "local": true,
       "cache": true
     },
-    "dump-realtime-schema": {
-      "local": true,
-      "cache": true
-    },
     "gen-types": {
       "local": true,
       "cache": true
diff --git a/pkgs/core/.gitignore b/pkgs/core/.gitignore
index 191dd15de..96b1c1fe7 100644
--- a/pkgs/core/.gitignore
+++ b/pkgs/core/.gitignore
@@ -1,2 +1 @@
 atlas/schema.sql
-atlas/.supabase-baseline-schema.sql
diff --git a/pkgs/core/ATLAS.md b/pkgs/core/ATLAS.md
deleted file mode 100644
index ecd6690bd..000000000
--- a/pkgs/core/ATLAS.md
+++ /dev/null
@@ -1,32 +0,0 @@
-# Atlas setup
-
-We use [Atlas](https://atlasgo.io/docs) to generate migrations from the declarative schemas stored in `./schemas/` folder.
-
-## Configuration
-
-The setup is configured in `atlas.hcl`.
-
-It is set to compare `schemas/` to what is in `supabase/migrations/`.
-
-### Docker dev image
-
-Atlas requires a dev database to be available for computing diffs.
-The database must be empty, but contain everything needed for the schemas to apply.
-
-We need a configured [PGMQ](https://github.com/tembo-io/pgmq) extension, which Atlas does not support
-in their dev images.
-
-That's why this setup relies on a custom built image `jumski/postgres-17-pgmq:latest`.
-
-Inspect `Dockerfile.atlas` to see how it is built.
-
-See also `./scripts/build-atlas-postgres-image` and `./scripts/push-atlas-postgres-image` scripts for building and pushing the image.
-
-## Workflow
-
-1. Make sure you start with a clean database (`pnpm supabase db reset`).
-1. Modify the schemas in `schemas/` to a desired state.
-1. Run `./scripts/atlas-migrate-diff ` to create a new migration based on the diff.
-1. Run `pnpm supabase migration up` to apply the migration.
-1. In case of any errors, remove the generated migration file, make changes in `schemas/` and repeat the process.
-1. After the migration is applied, verify it does not break tests with `nx test:pgtap`
diff --git a/pkgs/core/README.md b/pkgs/core/README.md
index f568809aa..9c894cc35 100644
--- a/pkgs/core/README.md
+++ b/pkgs/core/README.md
@@ -5,10 +5,6 @@ PostgreSQL-native workflow engine for defining, managing, and tracking DAG-based
 > [!NOTE]
 > This project and all its components are licensed under [Apache 2.0](./LICENSE) license.
 
-> [!WARNING]
-> This project uses [Atlas](https://atlasgo.io/docs) to manage the schemas and migrations.
-> See [ATLAS.md](ATLAS.md) for more details.
-
 ## Table of Contents
 
 - [Overview](#overview)
@@ -49,8 +45,8 @@ The actual execution of workflow tasks is handled by the [Edge Worker](../edge-w
 
 ## Requirements
 
-> [!IMPORTANT]
-> **pgmq Version Requirement** (since v0.8.0)
+> [!IMPORTANT]
+> **pgmq Version Requirement** (since v0.8.0)
 >
 > pgflow v0.8.0 and later requires **pgmq 1.5.0 or higher**. This version of pgflow will NOT work with pgmq 1.4.x or earlier.
 
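Note: the pgmq floor called out in the README hunk above is easy to check before upgrading pgflow. A minimal sketch, assuming the Supabase CLI's default local connection string (host port 54322 is the CLI default, not something this PR configures):

```bash
# Print the installed pgmq version, or 'not installed' (assumed local dev URL)
psql "postgres://postgres:postgres@127.0.0.1:54322/postgres" -t -A -c \
  "SELECT coalesce((SELECT extversion FROM pg_extension WHERE extname = 'pgmq'), 'not installed');"
```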
diff --git a/pkgs/core/atlas/.gitignore b/pkgs/core/atlas/.gitignore
new file mode 100644
index 000000000..e8155c6a8
--- /dev/null
+++ b/pkgs/core/atlas/.gitignore
@@ -0,0 +1,2 @@
+# Temp Supabase project for baseline dumping
+supabase/
diff --git a/pkgs/core/atlas/Dockerfile b/pkgs/core/atlas/Dockerfile
index 7ab45c41c..db0b0a975 100644
--- a/pkgs/core/atlas/Dockerfile
+++ b/pkgs/core/atlas/Dockerfile
@@ -1,4 +1,8 @@
-# Use official PostgreSQL 17
+# Atlas Postgres image with extensions for pgflow
+#
+# Extension versions aligned with Supabase Postgres 17.6.1.054
+# Reference: https://github.com/supabase/postgres/blob/17.6.1.054/Dockerfile-17
+
 FROM postgres:17
 
 # Set environment variables for postgres
@@ -10,22 +14,65 @@ RUN apt-get update && \
     apt-get install -y \
     build-essential \
     git \
-    postgresql-server-dev-17
+    postgresql-server-dev-17 \
+    libcurl4-openssl-dev \
+    libsodium-dev \
+    libicu-dev \
+    pkg-config
 
-# Clone and install pgmq
+# Clone and install pgmq v1.5.1 (not in Supabase image, pgflow dependency)
 RUN mkdir -p /usr/share/postgresql/17/extension && \
     git clone https://github.com/tembo-io/pgmq.git /tmp/pgmq && \
     cd /tmp/pgmq/pgmq-extension && \
     git checkout v1.5.1 && \
-    # Copy extension files manually to PostgreSQL 17 extensions directory
     make && \
     cp pgmq.control /usr/share/postgresql/17/extension/ && \
-    cp sql/pgmq--*.sql /usr/share/postgresql/17/extension/
+    cp sql/pgmq--*.sql /usr/share/postgresql/17/extension/ && \
+    rm -rf /tmp/pgmq
+
+# Install pg_cron v1.6.4 (v1.6.3+ required for PG17 compatibility)
+RUN git clone https://github.com/citusdata/pg_cron.git /tmp/pg_cron && \
+    cd /tmp/pg_cron && \
+    git checkout v1.6.4 && \
+    make && \
+    make install && \
+    rm -rf /tmp/pg_cron
+
+# Install pg_net v0.20.2 (PG17 compatible, requires libcurl)
+RUN git clone https://github.com/supabase/pg_net.git /tmp/pg_net && \
+    cd /tmp/pg_net && \
+    git checkout v0.20.2 && \
+    make && \
+    make install && \
+    rm -rf /tmp/pg_net
 
-# Clean up
-RUN apt-get remove -y build-essential git postgresql-server-dev-17 && \
+# Install pgsodium v3.1.6 (requires libsodium)
+RUN git clone https://github.com/michelp/pgsodium.git /tmp/pgsodium && \
+    cd /tmp/pgsodium && \
+    git checkout v3.1.6 && \
+    make && \
+    make install && \
+    rm -rf /tmp/pgsodium
+
+# Install supabase_vault v0.2.8 (depends on pgsodium)
+RUN git clone https://github.com/supabase/vault.git /tmp/vault && \
+    cd /tmp/vault && \
+    git checkout v0.2.8 && \
+    make && \
+    make install && \
+    rm -rf /tmp/vault
+
+# Clean up build dependencies (keep runtime libs: libcurl4t64, libsodium23, libicu)
+RUN apt-get remove -y build-essential git postgresql-server-dev-17 \
+    libcurl4-openssl-dev libsodium-dev libicu-dev pkg-config && \
     apt-get autoremove -y && \
-    rm -rf /var/lib/apt/lists/* /tmp/pgmq
+    apt-get update && \
+    apt-get install -y --no-install-recommends libcurl4t64 libsodium23 && \
+    rm -rf /var/lib/apt/lists/*
+
+# Configure shared_preload_libraries for extensions that require it
+# pg_cron and pg_net need to be preloaded (pgsodium NOT needed for Atlas dev)
+RUN echo "shared_preload_libraries = 'pg_cron,pg_net'" >> /usr/share/postgresql/postgresql.conf.sample
 
 # We need to ensure we don't create any init scripts that will run on startup
 # We must also remove any existing init scripts to ensure a clean database for Atlas
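Note: the image built from the Dockerfile above can be smoke-tested locally. A sketch, assuming the tag produced by `build-atlas-postgres-image`; the container name is illustrative, and `POSTGRES_PASSWORD` is assumed to be accepted by the image's standard postgres entrypoint:

```bash
# Hypothetical smoke test for the freshly built image
docker run -d --name atlas-pg-smoke -e POSTGRES_PASSWORD=postgres \
  jumski/atlas-postgres-pgflow:17.6.1.054

# Wait until Postgres accepts connections (pg_isready ships in the postgres image)
until docker exec atlas-pg-smoke pg_isready -U postgres >/dev/null 2>&1; do sleep 1; done

# All five extensions should be listed, and pg_cron/pg_net preloaded
docker exec atlas-pg-smoke psql -U postgres -t -A -c \
  "SELECT name FROM pg_available_extensions WHERE name IN ('pgmq','pg_cron','pg_net','pgsodium','supabase_vault') ORDER BY name;"
docker exec atlas-pg-smoke psql -U postgres -t -A -c "SHOW shared_preload_libraries;"

docker rm -f atlas-pg-smoke
```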
diff --git a/pkgs/core/atlas/README.md b/pkgs/core/atlas/README.md
new file mode 100644
index 000000000..267a840f9
--- /dev/null
+++ b/pkgs/core/atlas/README.md
@@ -0,0 +1,82 @@
+# Atlas Migration Management
+
+This directory contains configuration for generating pgflow database migrations using [Atlas](https://atlasgo.io/).
+
+## Files
+
+- `atlas.hcl` - Atlas configuration
+- `Dockerfile` - Custom Postgres image with required extensions
+- `supabase-baseline-schema.sql` - Schema baseline representing a fresh hosted Supabase project
+- `fresh-extensions.txt` - Record of extensions in a fresh Supabase project (for reference)
+- `dump-fresh-baseline.sh` - Script to regenerate the baseline
+
+## Docker Image
+
+The `Dockerfile` builds a custom Postgres 17 image with extensions matching hosted Supabase.
+
+**Reference:** [Supabase Postgres 17.6.1.054 Dockerfile](https://github.com/supabase/postgres/blob/17.6.1.054/Dockerfile-17)
+
+### Installed Extensions
+
+| Extension | Git Tag | Ext Version | Notes |
+|-----------|---------|-------------|-------|
+| pgmq | v1.5.1 | 1.5.1 | pgflow dependency (not in Supabase image) |
+| pg_cron | v1.6.4 | 1.6 | Scheduled jobs (v1.6.3+ for PG17) |
+| pg_net | v0.20.2 | 0.20.2 | HTTP requests from SQL |
+| pgsodium | v3.1.6 | 3.1.9 | Cryptography primitives |
+| supabase_vault | v0.2.8 | 0.3.1 | Secrets management (depends on pgsodium) |
+
+### Building the Image
+
+```bash
+cd pkgs/core
+./scripts/build-atlas-postgres-image
+./scripts/push-atlas-postgres-image
+```
+
+## Baseline Schema
+
+The baseline represents what a **fresh hosted Supabase project** looks like before pgflow migrations.
+
+### Pre-installed Extensions (hosted Supabase)
+
+| Extension | Status |
+|-----------|--------|
+| `supabase_vault` | Pre-installed |
+| `pgmq` | Available, not installed |
+| `pg_cron` | Available, not installed |
+| `pg_net` | Available, not installed |
+
+Our migrations must `CREATE EXTENSION` for pgmq, pg_cron, and pg_net.
+
+### Local CLI Difference
+
+The local Supabase CLI pre-installs `pg_net`, but hosted Supabase does not. The `dump-fresh-baseline.sh` script accounts for this by dropping `pg_net` before dumping, to match hosted behavior.
+
+## Regenerating the Baseline
+
+Run this when:
+- the Supabase CLI version changes significantly
+- adding new Supabase-provided schemas to the baseline
+
+```bash
+cd pkgs/core/atlas
+./dump-fresh-baseline.sh
+```
+
+The script will:
+1. Create a fresh Supabase project in `supabase/` (temporary, gitignored)
+2. Drop `pg_net` to match hosted Supabase defaults
+3. Verify extension assumptions match hosted Supabase
+4. Dump the schema and extension list
+5. Clean up the temp project
+
+## Generating Migrations
+
+After updating the declarative SQL in `schemas/`, run from `pkgs/core/`:
+
+```bash
+pnpm nx atlas:migrate:diff core --name="description_of_change"
+```
+
+This compares the current schema against the baseline plus existing migrations.
diff --git a/pkgs/core/atlas/atlas.hcl b/pkgs/core/atlas/atlas.hcl
index 5b0e0e33a..564c51fc5 100644
--- a/pkgs/core/atlas/atlas.hcl
+++ b/pkgs/core/atlas/atlas.hcl
@@ -15,8 +15,10 @@ docker "postgres" "pgflow" {
   # image = "postgres:17"
   # custom image is built and pushed to speed up schema verification,
   # otherwise it takes around 30s
-  image = "jumski/postgres-17-pgmq:latest"
-  baseline = file(".supabase-baseline-schema.sql")
+  # contains: pgmq, pg_cron, pg_net, pgsodium, supabase_vault
+  # tag matches Supabase Postgres version we align extensions with
+  image = "jumski/atlas-postgres-pgflow:17.6.1.054"
+  baseline = file("supabase-baseline-schema.sql")
   build {
     dockerfile = "atlas/Dockerfile"
     context = "."
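Note: the README above states that migrations must `CREATE EXTENSION` for pgmq, pg_cron, and pg_net, since the baseline deliberately leaves them uninstalled. A hedged sanity check after generating a migration, run from `pkgs/core/`:

```bash
# Sketch: confirm some committed migration creates each extension
# the baseline leaves uninstalled
for ext in pgmq pg_cron pg_net; do
  grep -ril "create extension.*$ext" supabase/migrations/ >/dev/null \
    && echo "$ext: created by a migration" \
    || echo "$ext: WARNING - no migration creates it"
done
```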
diff --git a/pkgs/core/atlas/dump-fresh-baseline.sh b/pkgs/core/atlas/dump-fresh-baseline.sh
new file mode 100755
index 000000000..8b3f4d840
--- /dev/null
+++ b/pkgs/core/atlas/dump-fresh-baseline.sh
@@ -0,0 +1,118 @@
+#!/bin/bash
+# Dump baseline schema from a fresh Supabase instance
+#
+# This script:
+# 1. Creates a fresh Supabase project (no pgflow)
+# 2. Verifies extension availability matches our assumptions
+# 3. Dumps the schema as our baseline
+# 4. Cleans up
+#
+# Run this when:
+# - Supabase CLI version changes
+# - Adding new Supabase-provided schemas to baseline
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "$SCRIPT_DIR"
+
+# Use npx to run supabase directly in this directory (not via pnpm which runs from pkgs/core)
+SUPA="npx supabase"
+
+echo "=== Cleaning up any previous temp project ==="
+rm -rf supabase/ || true
+
+echo "=== Initializing fresh Supabase project ==="
+$SUPA init --force
+
+echo "=== Stopping any running Supabase instances ==="
+$SUPA stop --all || true
+
+echo "=== Starting fresh Supabase ==="
+$SUPA start
+
+echo "=== Getting database URL ==="
+DB_URL=$($SUPA status --output json | jq -r '.DB_URL')
+echo "DB_URL: $DB_URL"
+
+echo "=== Matching hosted Supabase defaults ==="
+# Local CLI pre-installs pg_net, but hosted Supabase doesn't
+# Drop it to match hosted behavior (our migrations will create it)
+echo "Dropping pg_net to match hosted Supabase defaults..."
+psql "$DB_URL" -c "DROP EXTENSION IF EXISTS pg_net CASCADE;" 2>/dev/null || true
+
+echo "=== Verifying extension assumptions ==="
+# Query extension status (using --no-psqlrc to avoid extra output)
+EXTENSION_STATUS=$(PAGER='' psql "$DB_URL" --no-psqlrc -t -A -F'|' -c "
+SELECT
+  name,
+  CASE WHEN installed_version IS NOT NULL THEN 'installed' ELSE 'available' END as status
+FROM pg_available_extensions
+WHERE name IN ('pgmq', 'pg_cron', 'pg_net', 'supabase_vault')
+ORDER BY name;
+")
+
+echo "Extension status:"
+echo "$EXTENSION_STATUS"
+
+# Expected state matching HOSTED Supabase (not local CLI)
+# - supabase_vault: pre-installed
+# - pgmq, pg_cron, pg_net: available but NOT installed
+EXPECTED="pg_cron|available
+pg_net|available
+pgmq|available
+supabase_vault|installed"
+
+if [ "$EXTENSION_STATUS" != "$EXPECTED" ]; then
+  echo ""
+  echo "ERROR: Extension status does not match hosted Supabase assumptions!"
+  echo ""
+  echo "Expected (matching hosted Supabase):"
+  echo "$EXPECTED"
+  echo ""
+  echo "Got:"
+  echo "$EXTENSION_STATUS"
+  echo ""
+  echo "If hosted Supabase changed defaults, update this script's expectations."
+  $SUPA stop
+  rm -rf supabase/
+  exit 1
+fi
+
+echo ""
+echo "Extension assumptions verified (matches hosted Supabase)!"
+
+echo "=== Dumping full \\dx output ==="
+psql "$DB_URL" -c '\dx' > fresh-extensions.txt
+echo "Saved to fresh-extensions.txt"
+
+echo "=== Dumping baseline schema ==="
+# Only dump schemas we need:
+# - realtime: we reference realtime.send()
+# - vault: supabase_vault is pre-installed here
+# Skip extensions schema - it contains Supabase internal event triggers
+# that reference roles not available in Atlas dev container
+atlas schema inspect \
+  --schema realtime,vault \
+  -u "$DB_URL?sslmode=disable" \
+  --format "{{ sql . }}" > supabase-baseline-schema.sql
+
+# Strip VERSION strings (they change between Supabase versions)
+sed -i 's/ VERSION "[^"]*"//g' supabase-baseline-schema.sql
+
+# Strip date-specific partitions (they change daily, we don't reference them)
+# e.g., messages_2025_12_04, messages_2025_12_05, etc.
+sed -i '/messages_20[0-9]\{2\}_[0-9]\{2\}_[0-9]\{2\}/d' supabase-baseline-schema.sql
+
+echo "Saved to supabase-baseline-schema.sql"
+
+echo "=== Stopping Supabase ==="
+$SUPA stop
+
+echo "=== Cleaning up temp project ==="
+rm -rf supabase/
+
+echo ""
+echo "=== Done! ==="
+echo "Baseline regenerated from fresh Supabase."
+echo "Commit the updated supabase-baseline-schema.sql and fresh-extensions.txt"
diff --git a/pkgs/core/atlas/fresh-extensions.txt b/pkgs/core/atlas/fresh-extensions.txt
new file mode 100644
index 000000000..ceec43691
--- /dev/null
+++ b/pkgs/core/atlas/fresh-extensions.txt
@@ -0,0 +1,13 @@
+Null display is "[NULL]".
+Expanded display is used automatically.
+                                         List of installed extensions
+        Name        | Version |   Schema   |                               Description
+--------------------+---------+------------+------------------------------------------------------------------------
+ pg_graphql         | 1.5.11  | graphql    | pg_graphql: GraphQL support
+ pg_stat_statements | 1.11    | extensions | track planning and execution statistics of all SQL statements executed
+ pgcrypto           | 1.3     | extensions | cryptographic functions
+ plpgsql            | 1.0     | pg_catalog | PL/pgSQL procedural language
+ supabase_vault     | 0.3.1   | vault      | Supabase Vault Extension
+ uuid-ossp          | 1.1     | extensions | generate universally unique identifiers (UUIDs)
+(6 rows)
+
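Note: the verification step in `dump-fresh-baseline.sh` can also be reproduced by hand when debugging a mismatch. This is the same query the script runs, assuming a started local Supabase project and `jq` on PATH:

```bash
# Manually re-run the script's extension-status check
DB_URL=$(npx supabase status --output json | jq -r '.DB_URL')
psql "$DB_URL" -t -A -F'|' -c "
  SELECT name,
         CASE WHEN installed_version IS NOT NULL THEN 'installed' ELSE 'available' END
  FROM pg_available_extensions
  WHERE name IN ('pgmq', 'pg_cron', 'pg_net', 'supabase_vault')
  ORDER BY name;"
```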
}}" > supabase-baseline-schema.sql + +# Strip VERSION strings (they change between Supabase versions) +sed -i 's/ VERSION "[^"]*"//g' supabase-baseline-schema.sql + +# Strip date-specific partitions (they change daily, we don't reference them) +# e.g., messages_2025_12_04, messages_2025_12_05, etc. +sed -i '/messages_20[0-9]\{2\}_[0-9]\{2\}_[0-9]\{2\}/d' supabase-baseline-schema.sql + +echo "Saved to supabase-baseline-schema.sql" + +echo "=== Stopping Supabase ===" +$SUPA stop + +echo "=== Cleaning up temp project ===" +rm -rf supabase/ + +echo "" +echo "=== Done! ===" +echo "Baseline regenerated from fresh Supabase." +echo "Commit the updated supabase-baseline-schema.sql and fresh-extensions.txt" diff --git a/pkgs/core/atlas/fresh-extensions.txt b/pkgs/core/atlas/fresh-extensions.txt new file mode 100644 index 000000000..ceec43691 --- /dev/null +++ b/pkgs/core/atlas/fresh-extensions.txt @@ -0,0 +1,13 @@ +Null display is "[NULL]". +Expanded display is used automatically. + List of installed extensions + Name | Version | Schema | Description +--------------------+---------+------------+------------------------------------------------------------------------ + pg_graphql | 1.5.11 | graphql | pg_graphql: GraphQL support + pg_stat_statements | 1.11 | extensions | track planning and execution statistics of all SQL statements executed + pgcrypto | 1.3 | extensions | cryptographic functions + plpgsql | 1.0 | pg_catalog | PL/pgSQL procedural language + supabase_vault | 0.3.1 | vault | Supabase Vault Extension + uuid-ossp | 1.1 | extensions | generate universally unique identifiers (UUIDs) +(6 rows) + diff --git a/pkgs/core/atlas/supabase-baseline-schema.sql b/pkgs/core/atlas/supabase-baseline-schema.sql new file mode 100644 index 000000000..c90196cbe --- /dev/null +++ b/pkgs/core/atlas/supabase-baseline-schema.sql @@ -0,0 +1,664 @@ +-- Add new schema named "pgsodium" +CREATE SCHEMA "pgsodium"; +-- Add new schema named "realtime" +CREATE SCHEMA "realtime"; +-- Add new schema named "vault" +CREATE SCHEMA "vault"; +-- Create extension "pgsodium" (required by supabase_vault) +CREATE EXTENSION "pgsodium" WITH SCHEMA "pgsodium"; +-- Create extension "supabase_vault" +CREATE EXTENSION "supabase_vault" WITH SCHEMA "vault"; +-- Create enum type "equality_op" +CREATE TYPE "realtime"."equality_op" AS ENUM ('eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'); +-- Create enum type "action" +CREATE TYPE "realtime"."action" AS ENUM ('INSERT', 'UPDATE', 'DELETE', 'TRUNCATE', 'ERROR'); +-- Create composite type "user_defined_filter" +CREATE TYPE "realtime"."user_defined_filter" AS ("column_name" text, "op" "realtime"."equality_op", "value" text); +-- Create composite type "wal_rls" +CREATE TYPE "realtime"."wal_rls" AS ("wal" jsonb, "is_rls_enabled" boolean, "subscription_ids" uuid[], "errors" text[]); +-- Create composite type "wal_column" +CREATE TYPE "realtime"."wal_column" AS ("name" text, "type_name" text, "type_oid" oid, "value" jsonb, "is_pkey" boolean, "is_selectable" boolean); +-- Create "messages" table +CREATE TABLE "realtime"."messages" ( + "topic" text NOT NULL, + "extension" text NOT NULL, + "payload" jsonb NULL, + "event" text NULL, + "private" boolean NULL DEFAULT false, + "updated_at" timestamp NOT NULL DEFAULT now(), + "inserted_at" timestamp NOT NULL DEFAULT now(), + "id" uuid NOT NULL DEFAULT gen_random_uuid(), + PRIMARY KEY ("id", "inserted_at") +) PARTITION BY RANGE ("inserted_at"); +-- Create index "messages_inserted_at_topic_index" to table: "messages" +CREATE INDEX 
"messages_inserted_at_topic_index" ON "realtime"."messages" ("inserted_at" DESC, "topic") WHERE ((extension = 'broadcast'::text) AND (private IS TRUE)); +-- Enable row-level security for "messages" table +ALTER TABLE "realtime"."messages" ENABLE ROW LEVEL SECURITY; +-- Create "schema_migrations" table +CREATE TABLE "realtime"."schema_migrations" ( + "version" bigint NOT NULL, + "inserted_at" timestamp(0) NULL, + PRIMARY KEY ("version") +); +-- Create "to_regrole" function +CREATE FUNCTION "realtime"."to_regrole" ("role_name" text) RETURNS regrole LANGUAGE sql IMMUTABLE AS $$ select role_name::regrole $$; +-- Create "subscription" table +CREATE TABLE "realtime"."subscription" ( + "id" bigint NOT NULL GENERATED ALWAYS AS IDENTITY, + "subscription_id" uuid NOT NULL, + "entity" regclass NOT NULL, + "filters" "realtime"."user_defined_filter"[] NOT NULL DEFAULT '{}', + "claims" jsonb NOT NULL, + "claims_role" regrole NOT NULL GENERATED ALWAYS AS (realtime.to_regrole((claims ->> 'role'::text))) STORED, + "created_at" timestamp NOT NULL DEFAULT timezone('utc'::text, now()), + CONSTRAINT "pk_subscription" PRIMARY KEY ("id") +); +-- Create index "ix_realtime_subscription_entity" to table: "subscription" +CREATE INDEX "ix_realtime_subscription_entity" ON "realtime"."subscription" ("entity"); +-- Create index "subscription_subscription_id_entity_filters_key" to table: "subscription" +CREATE UNIQUE INDEX "subscription_subscription_id_entity_filters_key" ON "realtime"."subscription" ("subscription_id", "entity", "filters"); +-- Create "cast" function +CREATE FUNCTION "realtime"."cast" ("val" text, "type_" regtype) RETURNS jsonb LANGUAGE plpgsql IMMUTABLE AS $$ +declare + res jsonb; + begin + execute format('select to_jsonb(%L::'|| type_::text || ')', val) into res; + return res; + end +$$; +-- Create "subscription_check_filters" function +CREATE FUNCTION "realtime"."subscription_check_filters" () RETURNS trigger LANGUAGE plpgsql AS $$ +/* + Validates that the user defined filters for a subscription: + - refer to valid columns that the claimed role may access + - values are coercable to the correct column type + */ + declare + col_names text[] = coalesce( + array_agg(c.column_name order by c.ordinal_position), + '{}'::text[] + ) + from + information_schema.columns c + where + format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity + and pg_catalog.has_column_privilege( + (new.claims ->> 'role'), + format('%I.%I', c.table_schema, c.table_name)::regclass, + c.column_name, + 'SELECT' + ); + filter realtime.user_defined_filter; + col_type regtype; + + in_val jsonb; + begin + for filter in select * from unnest(new.filters) loop + -- Filtered column is valid + if not filter.column_name = any(col_names) then + raise exception 'invalid column for filter %', filter.column_name; + end if; + + -- Type is sanitized and safe for string interpolation + col_type = ( + select atttypid::regtype + from pg_catalog.pg_attribute + where attrelid = new.entity + and attname = filter.column_name + ); + if col_type is null then + raise exception 'failed to lookup type for column %', filter.column_name; + end if; + + -- Set maximum number of entries for in filter + if filter.op = 'in'::realtime.equality_op then + in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype); + if coalesce(jsonb_array_length(in_val), 0) > 100 then + raise exception 'too many values for `in` filter. 
Maximum 100'; + end if; + else + -- raises an exception if value is not coercable to type + perform realtime.cast(filter.value, col_type); + end if; + + end loop; + + -- Apply consistent order to filters so the unique constraint on + -- (subscription_id, entity, filters) can't be tricked by a different filter order + new.filters = coalesce( + array_agg(f order by f.column_name, f.op, f.value), + '{}' + ) from unnest(new.filters) f; + + return new; + end; +$$; +-- Create trigger "tr_check_filters" +CREATE TRIGGER "tr_check_filters" BEFORE INSERT OR UPDATE ON "realtime"."subscription" FOR EACH ROW EXECUTE FUNCTION "realtime"."subscription_check_filters"(); +-- Create "build_prepared_statement_sql" function +CREATE FUNCTION "realtime"."build_prepared_statement_sql" ("prepared_statement_name" text, "entity" regclass, "columns" "realtime"."wal_column"[]) RETURNS text LANGUAGE sql AS $$ +/* + Builds a sql string that, if executed, creates a prepared statement to + tests retrive a row from *entity* by its primary key columns. + Example + select realtime.build_prepared_statement_sql('public.notes', '{"id"}'::text[], '{"bigint"}'::text[]) + */ + select + 'prepare ' || prepared_statement_name || ' as + select + exists( + select + 1 + from + ' || entity || ' + where + ' || string_agg(quote_ident(pkc.name) || '=' || quote_nullable(pkc.value #>> '{}') , ' and ') || ' + )' + from + unnest(columns) pkc + where + pkc.is_pkey + group by + entity +$$; +-- Create "check_equality_op" function +CREATE FUNCTION "realtime"."check_equality_op" ("op" "realtime"."equality_op", "type_" regtype, "val_1" text, "val_2" text) RETURNS boolean LANGUAGE plpgsql IMMUTABLE AS $$ +/* + Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness + */ + declare + op_symbol text = ( + case + when op = 'eq' then '=' + when op = 'neq' then '!=' + when op = 'lt' then '<' + when op = 'lte' then '<=' + when op = 'gt' then '>' + when op = 'gte' then '>=' + when op = 'in' then '= any' + else 'UNKNOWN OP' + end + ); + res boolean; + begin + execute format( + 'select %L::'|| type_::text || ' ' || op_symbol + || ' ( %L::' + || ( + case + when op = 'in' then type_::text || '[]' + else type_::text end + ) + || ')', val_1, val_2) into res; + return res; + end; +$$; +-- Create "is_visible_through_filters" function +CREATE FUNCTION "realtime"."is_visible_through_filters" ("columns" "realtime"."wal_column"[], "filters" "realtime"."user_defined_filter"[]) RETURNS boolean LANGUAGE sql IMMUTABLE AS $$ +/* + Should the record be visible (true) or filtered out (false) after *filters* are applied + */ + select + -- Default to allowed when no filters present + $2 is null -- no filters. this should not happen because subscriptions has a default + or array_length($2, 1) is null -- array length of an empty array is null + or bool_and( + coalesce( + realtime.check_equality_op( + op:=f.op, + type_:=coalesce( + col.type_oid::regtype, -- null when wal2json version <= 2.4 + col.type_name::regtype + ), + -- cast jsonb to text + val_1:=col.value #>> '{}', + val_2:=f.value + ), + false -- if null, filter does not match + ) + ) + from + unnest(filters) f + join unnest(columns) col + on f.column_name = col.name; +$$; +-- Create "apply_rls" function +CREATE FUNCTION "realtime"."apply_rls" ("wal" jsonb, "max_record_bytes" integer DEFAULT (1024 * 1024)) RETURNS SETOF "realtime"."wal_rls" LANGUAGE plpgsql AS $$ +declare +-- Regclass of the table e.g. public.notes +entity_ regclass = (quote_ident(wal ->> 'schema') || '.' 
|| quote_ident(wal ->> 'table'))::regclass; + +-- I, U, D, T: insert, update ... +action realtime.action = ( + case wal ->> 'action' + when 'I' then 'INSERT' + when 'U' then 'UPDATE' + when 'D' then 'DELETE' + else 'ERROR' + end +); + +-- Is row level security enabled for the table +is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_; + +subscriptions realtime.subscription[] = array_agg(subs) + from + realtime.subscription subs + where + subs.entity = entity_; + +-- Subscription vars +roles regrole[] = array_agg(distinct us.claims_role::text) + from + unnest(subscriptions) us; + +working_role regrole; +claimed_role regrole; +claims jsonb; + +subscription_id uuid; +subscription_has_access bool; +visible_to_subscription_ids uuid[] = '{}'; + +-- structured info for wal's columns +columns realtime.wal_column[]; +-- previous identity values for update/delete +old_columns realtime.wal_column[]; + +error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes; + +-- Primary jsonb output for record +output jsonb; + +begin +perform set_config('role', null, true); + +columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'columns') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + +old_columns = + array_agg( + ( + x->>'name', + x->>'type', + x->>'typeoid', + realtime.cast( + (x->'value') #>> '{}', + coalesce( + (x->>'typeoid')::regtype, -- null when wal2json version <= 2.4 + (x->>'type')::regtype + ) + ), + (pks ->> 'name') is not null, + true + )::realtime.wal_column + ) + from + jsonb_array_elements(wal -> 'identity') x + left join jsonb_array_elements(wal -> 'pk') pks + on (x ->> 'name') = (pks ->> 'name'); + +for working_role in select * from unnest(roles) loop + + -- Update `is_selectable` for columns and old_columns + columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(columns) c; + + old_columns = + array_agg( + ( + c.name, + c.type_name, + c.type_oid, + c.value, + c.is_pkey, + pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT') + )::realtime.wal_column + ) + from + unnest(old_columns) c; + + if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + -- subscriptions is already filtered by entity + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 400: Bad Request, no primary key'] + )::realtime.wal_rls; + + -- The claims role does not have SELECT permission to the primary key of entity + elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then + return next ( + jsonb_build_object( + 'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action + ), + is_rls_enabled, + (select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role), + array['Error 401: Unauthorized'] + )::realtime.wal_rls; + + else + output = jsonb_build_object( + 
'schema', wal ->> 'schema', + 'table', wal ->> 'table', + 'type', action, + 'commit_timestamp', to_char( + ((wal ->> 'timestamp')::timestamptz at time zone 'utc'), + 'YYYY-MM-DD"T"HH24:MI:SS.MS"Z"' + ), + 'columns', ( + select + jsonb_agg( + jsonb_build_object( + 'name', pa.attname, + 'type', pt.typname + ) + order by pa.attnum asc + ) + from + pg_attribute pa + join pg_type pt + on pa.atttypid = pt.oid + where + attrelid = entity_ + and attnum > 0 + and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT') + ) + ) + -- Add "record" key for insert and update + || case + when action in ('INSERT', 'UPDATE') then + jsonb_build_object( + 'record', + ( + select + jsonb_object_agg( + -- if unchanged toast, get column name and value from old record + coalesce((c).name, (oc).name), + case + when (c).name is null then (oc).value + else (c).value + end + ) + from + unnest(columns) c + full outer join unnest(old_columns) oc + on (c).name = (oc).name + where + coalesce((c).is_selectable, (oc).is_selectable) + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + else '{}'::jsonb + end + -- Add "old_record" key for update and delete + || case + when action = 'UPDATE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + ) + ) + when action = 'DELETE' then + jsonb_build_object( + 'old_record', + ( + select jsonb_object_agg((c).name, (c).value) + from unnest(old_columns) c + where + (c).is_selectable + and ( not error_record_exceeds_max_size or (octet_length((c).value::text) <= 64)) + and ( not is_rls_enabled or (c).is_pkey ) -- if RLS enabled, we can't secure deletes so filter to pkey + ) + ) + else '{}'::jsonb + end; + + -- Create the prepared statement + if is_rls_enabled and action <> 'DELETE' then + if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then + deallocate walrus_rls_stmt; + end if; + execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns); + end if; + + visible_to_subscription_ids = '{}'; + + for subscription_id, claims in ( + select + subs.subscription_id, + subs.claims + from + unnest(subscriptions) subs + where + subs.entity = entity_ + and subs.claims_role = working_role + and ( + realtime.is_visible_through_filters(columns, subs.filters) + or ( + action = 'DELETE' + and realtime.is_visible_through_filters(old_columns, subs.filters) + ) + ) + ) loop + + if not is_rls_enabled or action = 'DELETE' then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + else + -- Check if RLS allows the role to see the record + perform + -- Trim leading and trailing quotes from working_role because set_config + -- doesn't recognize the role as valid if they are included + set_config('role', trim(both '"' from working_role::text), true), + set_config('request.jwt.claims', claims::text, true); + + execute 'execute walrus_rls_stmt' into subscription_has_access; + + if subscription_has_access then + visible_to_subscription_ids = visible_to_subscription_ids || subscription_id; + end if; + end if; + end loop; + + perform set_config('role', null, true); + + return next ( + output, + is_rls_enabled, + visible_to_subscription_ids, + case + when error_record_exceeds_max_size then array['Error 413: Payload Too Large'] + else '{}' + end + )::realtime.wal_rls; + + end if; +end loop; + 
+perform set_config('role', null, true); +end; +$$; +-- Create "send" function +CREATE FUNCTION "realtime"."send" ("payload" jsonb, "event" text, "topic" text, "private" boolean DEFAULT true) RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + generated_id uuid; + final_payload jsonb; +BEGIN + BEGIN + -- Generate a new UUID for the id + generated_id := gen_random_uuid(); + + -- Check if payload has an 'id' key, if not, add the generated UUID + IF payload ? 'id' THEN + final_payload := payload; + ELSE + final_payload := jsonb_set(payload, '{id}', to_jsonb(generated_id)); + END IF; + + -- Set the topic configuration + EXECUTE format('SET LOCAL realtime.topic TO %L', topic); + + -- Attempt to insert the message + INSERT INTO realtime.messages (id, payload, event, topic, private, extension) + VALUES (generated_id, final_payload, event, topic, private, 'broadcast'); + EXCEPTION + WHEN OTHERS THEN + -- Capture and notify the error + RAISE WARNING 'ErrorSendingBroadcastMessage: %', SQLERRM; + END; +END; +$$; +-- Create "broadcast_changes" function +CREATE FUNCTION "realtime"."broadcast_changes" ("topic_name" text, "event_name" text, "operation" text, "table_name" text, "table_schema" text, "new" record, "old" record, "level" text DEFAULT 'ROW') RETURNS void LANGUAGE plpgsql AS $$ +DECLARE + -- Declare a variable to hold the JSONB representation of the row + row_data jsonb := '{}'::jsonb; +BEGIN + IF level = 'STATEMENT' THEN + RAISE EXCEPTION 'function can only be triggered for each row, not for each statement'; + END IF; + -- Check the operation type and handle accordingly + IF operation = 'INSERT' OR operation = 'UPDATE' OR operation = 'DELETE' THEN + row_data := jsonb_build_object('old_record', OLD, 'record', NEW, 'operation', operation, 'table', table_name, 'schema', table_schema); + PERFORM realtime.send (row_data, event_name, topic_name); + ELSE + RAISE EXCEPTION 'Unexpected operation type: %', operation; + END IF; +EXCEPTION + WHEN OTHERS THEN + RAISE EXCEPTION 'Failed to process the row: %', SQLERRM; +END; +$$; +-- Create "quote_wal2json" function +CREATE FUNCTION "realtime"."quote_wal2json" ("entity" regclass) RETURNS text LANGUAGE sql STRICT IMMUTABLE AS $$ +select + ( + select string_agg('' || ch,'') + from unnest(string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx) + where + not (x.idx = 1 and x.ch = '"') + and not ( + x.idx = array_length(string_to_array(nsp.nspname::text, null), 1) + and x.ch = '"' + ) + ) + || '.' 
+-- Create "quote_wal2json" function
+CREATE FUNCTION "realtime"."quote_wal2json" ("entity" regclass) RETURNS text LANGUAGE sql STRICT IMMUTABLE AS $$
+select
+  (
+    select string_agg('' || ch,'')
+    from unnest(string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx)
+    where
+      not (x.idx = 1 and x.ch = '"')
+      and not (
+        x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)
+        and x.ch = '"'
+      )
+  )
+  || '.'
+  || (
+    select string_agg('' || ch,'')
+    from unnest(string_to_array(pc.relname::text, null)) with ordinality x(ch, idx)
+    where
+      not (x.idx = 1 and x.ch = '"')
+      and not (
+        x.idx = array_length(string_to_array(nsp.nspname::text, null), 1)
+        and x.ch = '"'
+      )
+  )
+from
+  pg_class pc
+  join pg_namespace nsp
+    on pc.relnamespace = nsp.oid
+where
+  pc.oid = entity
+$$;
+-- Create "list_changes" function
+CREATE FUNCTION "realtime"."list_changes" ("publication" name, "slot_name" name, "max_changes" integer, "max_record_bytes" integer) RETURNS SETOF "realtime"."wal_rls" LANGUAGE sql SET "log_min_messages" = 'fatal' AS $$
+with pub as (
+  select
+    concat_ws(
+      ',',
+      case when bool_or(pubinsert) then 'insert' else null end,
+      case when bool_or(pubupdate) then 'update' else null end,
+      case when bool_or(pubdelete) then 'delete' else null end
+    ) as w2j_actions,
+    coalesce(
+      string_agg(
+        realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
+        ','
+      ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
+      ''
+    ) w2j_add_tables
+  from
+    pg_publication pp
+    left join pg_publication_tables ppt
+      on pp.pubname = ppt.pubname
+  where
+    pp.pubname = publication
+  group by
+    pp.pubname
+  limit 1
+),
+w2j as (
+  select
+    x.*, pub.w2j_add_tables
+  from
+    pub,
+    pg_logical_slot_get_changes(
+      slot_name, null, max_changes,
+      'include-pk', 'true',
+      'include-transaction', 'false',
+      'include-timestamp', 'true',
+      'include-type-oids', 'true',
+      'format-version', '2',
+      'actions', pub.w2j_actions,
+      'add-tables', pub.w2j_add_tables
+    ) x
+)
+select
+  xyz.wal,
+  xyz.is_rls_enabled,
+  xyz.subscription_ids,
+  xyz.errors
+from
+  w2j,
+  realtime.apply_rls(
+    wal := w2j.data::jsonb,
+    max_record_bytes := max_record_bytes
+  ) xyz(wal, is_rls_enabled, subscription_ids, errors)
+where
+  w2j.w2j_add_tables <> ''
+  and xyz.subscription_ids[1] is not null
+$$;
+-- Create "topic" function
+CREATE FUNCTION "realtime"."topic" () RETURNS text LANGUAGE sql STABLE AS $$ select nullif(current_setting('realtime.topic', true), '')::text; $$;
diff --git a/pkgs/core/project.json b/pkgs/core/project.json
index 22b16e709..33182ca5d 100644
--- a/pkgs/core/project.json
+++ b/pkgs/core/project.json
@@ -22,28 +22,8 @@
     "supabase:ci-marker": {
       "executor": "nx:noop"
     },
-    "dump-realtime-schema": {
-      "executor": "nx:run-commands",
-      "local": true,
-      "inputs": [
-        "{projectRoot}/package.json",
-        "{projectRoot}/supabase/config.toml",
-        "{projectRoot}/scripts/atlas-dump-realtime-schema"
-      ],
-      "outputs": ["{projectRoot}/atlas/.supabase-baseline-schema.sql"],
-      "options": {
-        "cwd": "{projectRoot}",
-        "commands": [
-          "../../scripts/supabase-start-locked.sh .",
-          "scripts/atlas-dump-realtime-schema"
-        ],
-        "parallel": false
-      },
-      "cache": true
-    },
     "verify-schemas-synced": {
       "executor": "nx:run-commands",
-      "dependsOn": ["dump-realtime-schema"],
       "inputs": [
         "schemas",
         "migrations",
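Note: with the `dump-realtime-schema` target and its `dependsOn` removed above, schema verification runs directly against the committed baseline. A sketch of the invocation, using the target name from `project.json`:

```bash
# Run schema verification for the core project (no baseline regeneration step)
pnpm nx verify-schemas-synced core
```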
}}" >> atlas/.supabase-baseline-schema.sql || { - echo "Failed to inspect realtime schema" - exit 1 -} - -echo "Inspecting pgmq schema..." -atlas schema inspect --schema pgmq -u "postgres://postgres:postgres@localhost:50422/postgres?sslmode=disable" --format "{{ sql . }}" >> atlas/.supabase-baseline-schema.sql || { - echo "Failed to inspect pgmq schema" - exit 1 -} diff --git a/pkgs/core/scripts/atlas-migrate-diff b/pkgs/core/scripts/atlas-migrate-diff index 64b91120b..f260ebbbc 100755 --- a/pkgs/core/scripts/atlas-migrate-diff +++ b/pkgs/core/scripts/atlas-migrate-diff @@ -1,7 +1,8 @@ #!/bin/bash set -e -./scripts/atlas-dump-realtime-schema +# Baseline is pre-generated from fresh Supabase using atlas/dump-fresh-baseline.sh +# Regenerate only when Supabase CLI version changes significantly # Check if migration name is provided if [ $# -eq 0 ]; then diff --git a/pkgs/core/scripts/atlas-verify-schemas-synced b/pkgs/core/scripts/atlas-verify-schemas-synced index 737e4ad94..fdfda7067 100755 --- a/pkgs/core/scripts/atlas-verify-schemas-synced +++ b/pkgs/core/scripts/atlas-verify-schemas-synced @@ -31,9 +31,8 @@ docker rm -f atlas-dev-postgres-pgflow 2>/dev/null || true echo "Creating temporary migrations directory..." cp -r supabase/migrations "$TEMP_MIGRATIONS_DIR" -# First, ensure the baseline schema file is updated -echo "Running atlas-dump-realtime-schema..." -./scripts/atlas-dump-realtime-schema +# Baseline is pre-generated and committed (see atlas/dump-fresh-baseline.sh) +# No need to regenerate it here # Use a simple fixed temporary name temp_migration="temp_verify_schemas" diff --git a/pkgs/core/scripts/build-atlas-postgres-image b/pkgs/core/scripts/build-atlas-postgres-image index 0fdee36a4..b7c326a9f 100755 --- a/pkgs/core/scripts/build-atlas-postgres-image +++ b/pkgs/core/scripts/build-atlas-postgres-image @@ -1,2 +1,9 @@ #!/bin/bash -docker build -t jumski/postgres-17-pgmq:latest --file atlas/Dockerfile atlas/ +# Tag matches Supabase Postgres version we align extensions with +# See: https://github.com/supabase/postgres/releases +SUPABASE_VERSION="17.6.1.054" + +docker build \ + -t jumski/atlas-postgres-pgflow:${SUPABASE_VERSION} \ + -t jumski/atlas-postgres-pgflow:latest \ + --file atlas/Dockerfile atlas/ diff --git a/pkgs/core/scripts/push-atlas-postgres-image b/pkgs/core/scripts/push-atlas-postgres-image index 56576da6b..8b8647a0c 100755 --- a/pkgs/core/scripts/push-atlas-postgres-image +++ b/pkgs/core/scripts/push-atlas-postgres-image @@ -1,2 +1,6 @@ #!/bin/bash -docker push jumski/postgres-17-pgmq:latest +# Tag matches Supabase Postgres version we align extensions with +SUPABASE_VERSION="17.6.1.054" + +docker push jumski/atlas-postgres-pgflow:${SUPABASE_VERSION} +docker push jumski/atlas-postgres-pgflow:latest diff --git a/pkgs/edge-worker/scripts/concatenate-migrations.sh b/pkgs/edge-worker/scripts/concatenate-migrations.sh index 5de4dbe48..b78520034 100755 --- a/pkgs/edge-worker/scripts/concatenate-migrations.sh +++ b/pkgs/edge-worker/scripts/concatenate-migrations.sh @@ -9,7 +9,7 @@ echo "-- Generated on $(date)" >> "$target_file" echo "" >> "$target_file" # First add realtime schema if it exists -realtime_schema_file="../core/atlas/.supabase-baseline-schema.sql" +realtime_schema_file="../core/atlas/supabase-baseline-schema.sql" if [ -f "$realtime_schema_file" ]; then echo "-- Including realtime schema from atlas dump" >> "$target_file" cat "$realtime_schema_file" >> "$target_file" @@ -17,7 +17,7 @@ if [ -f "$realtime_schema_file" ]; then echo "" >> "$target_file" else echo 
-e "\e[31mERROR: Realtime schema file not found at $realtime_schema_file\e[0m" - echo -e "\e[31mRun 'nx dump-realtime-schema core' to generate it\e[0m" + echo -e "\e[31mRun 'cd pkgs/core/atlas && ./dump-fresh-baseline.sh' to generate it\e[0m" exit 1 fi diff --git a/pkgs/edge-worker/tests/db/compose.yaml b/pkgs/edge-worker/tests/db/compose.yaml index dd1647c4e..69450ca03 100644 --- a/pkgs/edge-worker/tests/db/compose.yaml +++ b/pkgs/edge-worker/tests/db/compose.yaml @@ -1,8 +1,7 @@ services: db: - image: jumski/postgres-17-pgmq:latest - # image: postgres:17-alpine - # image: supabase/postgres:17.x + # Must match image in pkgs/core/atlas/atlas.hcl for baseline compatibility + image: jumski/atlas-postgres-pgflow:17.6.1.054 ports: - '5432:5432' volumes: