Add replication plugin for external-to-internal data pull #143

Open
brone1323 wants to merge 1 commit into outerbase:main from brone1323:bounty-72-replication-plugin

@brone1323

/claim #72

Closes #72.

What this PR does

Adds a ReplicationPlugin (plugins/replication/) that pulls rows from an external Postgres or MySQL source into the DO SQLite on a configurable per-table interval. Append-only polling, watermark-per-table, audit log per tick. Schema is reflected from the source on the first sync so users don't have to maintain a parallel DDL.

Driven by a single env var, REPLICATION_CONFIG_JSON, validated at plugin construction. Runs on the Worker's scheduled() handler — no new scheduling primitive, no external service.
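A minimal sketch of what construction-time validation could look like, mirroring the rejection cases listed under Tests. The config shape is an assumption: only `intervalSeconds` is named in this PR, and `source`, `tables`, `table`, and `watermarkColumn` are hypothetical field names used for illustration.

```typescript
// Hypothetical config shape; only `intervalSeconds` is named in this PR.
interface TableConfig {
  table: string;
  watermarkColumn: string;
  intervalSeconds: number;
}

interface SourceConfig {
  source: string; // label written to the watermark and audit tables
  tables: TableConfig[];
}

// Returns null when the variable is unset so the plugin can act as a no-op.
function parseReplicationConfig(raw: string | undefined): SourceConfig[] | null {
  if (raw === undefined || raw.trim() === "") return null;

  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    throw new Error("REPLICATION_CONFIG_JSON is not valid JSON");
  }
  if (!Array.isArray(parsed)) {
    throw new Error("REPLICATION_CONFIG_JSON must be a JSON array");
  }

  for (const src of parsed as SourceConfig[]) {
    if (!src || !Array.isArray(src.tables)) {
      throw new Error("each source needs a `tables` array");
    }
    for (const t of src.tables) {
      if (!t || !Number.isFinite(t.intervalSeconds) || t.intervalSeconds <= 0) {
        throw new Error("intervalSeconds must be a positive number");
      }
    }
  }
  return parsed as SourceConfig[];
}
```

Failing fast here means a bad config surfaces at deploy time rather than on the first cron tick.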

Architecture

Adapter pattern — ReplicationAdapter (types.ts) is the contract. Two built-in adapters:

  • PostgresAdapter (postgres.js — already in dependencies) — schema reflection via information_schema.columns + pg_index for PK detection. Paged pull SELECT * FROM ... WHERE wm > $1 ORDER BY wm ASC LIMIT pageSize.
  • MysqlAdapter (mysql2/promise — already in dependencies) — same shape, MySQL information_schema for the metadata.

Custom adapters can be plugged in via adapterFactory on the plugin constructor (so users can replicate from sources this PR doesn't ship — SQL Server, ClickHouse, REST — without forking the plugin).
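To illustrate the adapter contract, here is a sketch of a custom adapter over an in-memory array. It is not the code in types.ts: `pull()` and `close()` are named in this PR, but `reflectSchema()`, every parameter list, and the factory signature below are assumptions made for the example.

```typescript
type Row = Record<string, unknown>;

interface ColumnInfo {
  name: string;
  type: string;
  primaryKey: boolean;
}

// Assumed shape of the contract. pull() and close() are named in this PR;
// reflectSchema() and all parameter lists are hypothetical.
interface ReplicationAdapter {
  reflectSchema(table: string): Promise<ColumnInfo[]>;
  pull(table: string, watermarkColumn: string, after: number | null, pageSize: number): Promise<Row[]>;
  close(): Promise<void>;
}

// Toy adapter standing in for a REST or ClickHouse source. It pages the way
// the SQL adapters are described: rows strictly above the watermark,
// ascending by the watermark column, capped at pageSize.
class InMemoryAdapter implements ReplicationAdapter {
  private rows: Row[];
  closed = false;

  constructor(rows: Row[]) {
    this.rows = rows;
  }

  async reflectSchema(_table: string): Promise<ColumnInfo[]> {
    return [
      { name: "id", type: "INTEGER", primaryKey: true },
      { name: "body", type: "TEXT", primaryKey: false },
    ];
  }

  async pull(_table: string, watermarkColumn: string, after: number | null, pageSize: number): Promise<Row[]> {
    const wm = (r: Row) => r[watermarkColumn] as number;
    return this.rows
      .filter((r) => after === null || wm(r) > after)
      .sort((a, b) => wm(a) - wm(b))
      .slice(0, pageSize);
  }

  async close(): Promise<void> {
    this.closed = true;
  }
}

// Hypothetical factory signature: map a source kind to an adapter instance.
const adapterFactory = (kind: string): ReplicationAdapter => {
  if (kind === "memory") {
    return new InMemoryAdapter([{ id: 2, body: "b" }, { id: 1, body: "a" }]);
  }
  throw new Error(`unknown source kind: ${kind}`);
};
```

Keeping the contract to three methods is what lets a new source be added without touching the plugin itself.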

Watermarks live in _starbase_replication_watermarks(source, table, watermark_column, last_value, last_run_ts) and are upserted per page, not per run, so a mid-run failure on page N+1 doesn't redo pages 1..N on the next tick.
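The per-page upsert can be sketched as the loop below. This is an illustration under assumptions, not the plugin's code: `PageDeps`, `syncTable`, and the three callbacks are hypothetical names, with `saveWatermark` standing in for the upsert into _starbase_replication_watermarks.

```typescript
type Row = Record<string, unknown>;

// Hypothetical dependencies, injected so the loop can be read in isolation.
interface PageDeps {
  pull(after: unknown, pageSize: number): Promise<Row[]>;
  insertRows(rows: Row[]): Promise<void>;
  saveWatermark(value: unknown): Promise<void>;
}

// Pulls pages until the source is drained, persisting the watermark after
// every page. If pull() throws on a later page, the watermark saved for the
// previous page is where the next tick resumes.
async function syncTable(
  deps: PageDeps,
  watermarkColumn: string,
  start: unknown,
  pageSize: number,
): Promise<number> {
  let watermark = start;
  let total = 0;
  for (;;) {
    const page = await deps.pull(watermark, pageSize);
    if (page.length === 0) break;
    await deps.insertRows(page);
    // Pages arrive ordered by the watermark column, so the last row is the max.
    watermark = page[page.length - 1][watermarkColumn];
    await deps.saveWatermark(watermark);
    total += page.length;
    if (page.length < pageSize) break; // short page: nothing left to pull
  }
  return total;
}
```

Saving after the insert, not before, means a crash between the two steps replays a page instead of skipping it.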

Audit log in _starbase_replication_log(ts, source, table, rows_pulled, ok, error) records every tick — successful and failed — for observability via the normal /query endpoint.

Failure isolation — if pulling table A fails, the watermark for A is not advanced and the failure is logged, but tables B and C in the same tick still run. Next tick re-attempts A from its prior watermark.
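A sketch of the isolation described above, assuming a hypothetical `runTick` driver and a `syncOne` callback that throws on failure and leaves the watermark untouched when it does. The `LogEntry` fields loosely follow the audit table columns.

```typescript
interface LogEntry {
  table: string;
  rowsPulled: number;
  ok: boolean;
  error: string | null;
}

// Runs every table in the tick. A failure in one table is caught and logged
// so the remaining tables still run.
async function runTick(
  tables: string[],
  syncOne: (table: string) => Promise<number>,
): Promise<LogEntry[]> {
  const log: LogEntry[] = [];
  for (const table of tables) {
    try {
      const rowsPulled = await syncOne(table);
      log.push({ table, rowsPulled, ok: true, error: null });
    } catch (err) {
      const message = err instanceof Error ? err.message : String(err);
      log.push({ table, rowsPulled: 0, ok: false, error: message });
    }
  }
  return log;
}
```

Each entry in the returned array corresponds to one row that would be written to _starbase_replication_log.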

HTTP surface is intentionally minimal — two admin-only endpoints:

  • POST /replication/run — manually fire all due tables (admin token required)
  • GET /replication/status — read current watermarks (admin token required)

That's it. No CRUD admin API, no web UI, no mutable runtime config — config is the env var, observability is the audit table, and any further introspection is a SELECT against the two replication tables.

Differentiation from PR #138

PR #138 ships a sprawling REST API + CronPlugin coupling + 33 tests but reinvents scheduling and configuration plumbing the platform already provides. This PR is intentionally smaller: it leans on the Worker's existing scheduled() handler, validates config at construction, and keeps the surface to the watermark + audit table + adapter contract that the issue actually asks for. It also exposes an adapterFactory so non-Postgres/MySQL sources are a one-line plug-in, not a new in-tree adapter.

Tests

plugins/replication/index.test.ts — 17 tests, all passing:

  • buildCreateTable renders correct CREATE TABLE with PK clause, omits PK when none, throws on zero columns
  • buildInsert uses INSERT OR REPLACE when PK is configured, INSERT OR IGNORE for append-only
  • Config parser rejects malformed JSON, non-array configs, and non-positive intervalSeconds
  • Missing config => no-op plugin (doesn't break the worker)
  • Watermark advances after a successful pull
  • Subsequent ticks within the interval are skipped; pull is called again with the persisted watermark when the interval elapses
  • CREATE TABLE is issued only on the first sync per table, not every tick
  • Multiple sources/tables in one config run independently — one table's pull() failure does not block others, and the failure is recorded in the audit log
  • Watermark is not advanced when the adapter throws
  • Multi-page pulls persist the watermark per page
  • close() releases every instantiated adapter

$ npx vitest run plugins/replication
Test Files  1 passed (1)
     Tests  17 passed (17)

The 4 failing tests in src/rls/index.test.ts already fail on main and are unrelated to this PR.
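For reviewers who want the builder behaviour from the first two test bullets in concrete form, here is a rough sketch. The function names come from this PR, but the signatures, the `Column` type, and the identifier quoting are assumptions and may differ from sql.ts.

```typescript
interface Column {
  name: string;
  type: string;
  primaryKey?: boolean;
}

// Double-quote an identifier, escaping embedded quotes (SQLite style).
const quote = (id: string): string => `"${id.replace(/"/g, '""')}"`;

function buildCreateTable(table: string, columns: Column[]): string {
  if (columns.length === 0) {
    throw new Error(`no columns reflected for ${table}`);
  }
  const defs = columns.map((c) => `${quote(c.name)} ${c.type}`);
  const pk = columns.filter((c) => c.primaryKey).map((c) => quote(c.name));
  // The PK clause is emitted only when the source reported a primary key.
  if (pk.length > 0) defs.push(`PRIMARY KEY (${pk.join(", ")})`);
  return `CREATE TABLE ${quote(table)} (${defs.join(", ")})`;
}

function buildInsert(table: string, columnNames: string[], hasPrimaryKey: boolean): string {
  // With a PK, a re-pulled row overwrites its earlier copy; without one,
  // the statement stays append-only.
  const verb = hasPrimaryKey ? "INSERT OR REPLACE" : "INSERT OR IGNORE";
  const cols = columnNames.map((n) => quote(n)).join(", ");
  const params = columnNames.map(() => "?").join(", ");
  return `${verb} INTO ${quote(table)} (${cols}) VALUES (${params})`;
}
```

For example, `buildInsert("users", ["id", "email"], true)` produces `INSERT OR REPLACE INTO "users" ("id", "email") VALUES (?, ?)`.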

Files

  • plugins/replication/index.ts — plugin + scheduling
  • plugins/replication/sql.ts — SQL builders, watermark/log DDL
  • plugins/replication/types.ts — ReplicationAdapter, ReplicationConfig
  • plugins/replication/adapters/postgres.ts
  • plugins/replication/adapters/mysql.ts
  • plugins/replication/index.test.ts
  • plugins/replication/README.md
  • plugins/replication/meta.json
  • src/index.ts — registers the plugin and adds the scheduled() entry point
  • wrangler.toml — example config + cron trigger comment

Wiring (for the maintainer reviewing)

  1. Uncomment [triggers] crons = ["* * * * *"] in wrangler.toml (the plugin handles per-table cadence on top — you only need the smallest interval).
  2. Set REPLICATION_CONFIG_JSON in [vars] or as a secret.
  3. Deploy. Plugin auto-registers; tables appear in DO SQLite on the first tick.

If REPLICATION_CONFIG_JSON is unset the scheduled() handler is a no-op and the plugin is invisible.
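The per-table cadence layered on the one-minute cron can be sketched as a simple due check. `TableState`, `isDue`, and `dueTables` are hypothetical names, and treating `lastRunTs` as epoch milliseconds is an assumption about how `last_run_ts` is stored.

```typescript
// Hypothetical per-table state; lastRunTs is assumed to be epoch milliseconds
// and null when the table has never been synced.
interface TableState {
  table: string;
  intervalSeconds: number;
  lastRunTs: number | null;
}

// A table is due on its first tick, or once its interval has elapsed.
function isDue(t: TableState, nowMs: number): boolean {
  if (t.lastRunTs === null) return true;
  return nowMs - t.lastRunTs >= t.intervalSeconds * 1000;
}

// Called from the scheduled() handler on every cron tick; tables that are
// not yet due are skipped until a later tick.
function dueTables(tables: TableState[], nowMs: number): TableState[] {
  return tables.filter((t) => isDue(t, nowMs));
}
```

With a cron of `* * * * *`, a table configured with `intervalSeconds: 300` would be picked up on roughly every fifth tick.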

Pulls rows from an external Postgres or MySQL source into the DO SQLite
on a configurable per-table interval, advancing a per-table watermark on
every successful run. Driven by REPLICATION_CONFIG_JSON; runs on a
Cloudflare Cron Trigger via the Worker scheduled() handler.

Resolves outerbase#72
@brone1323
Author

Demo video

▶ Watch demo (1.2MB MP4)

Generated programmatically (HTML deck → Playwright recordVideo → ffmpeg). Hosted as a release asset on the fork to comply with the Algora claim requirement.

Sammy / @brone1323

Development

Successfully merging this pull request may close these issues.

Replicate data from external source to internal source with a Plugin
