From 8e64d79910c8132d7243560134fbf3bd69ca71c3 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 20 Apr 2026 10:35:18 +0000 Subject: [PATCH 01/10] sharding docs --- docs/RESHARDING.md | 292 +++++++++++++++++++++++++++++++++++++++++++++ docs/SHARDING.md | 230 +++++++++++++++++++++++++++++++++++ 2 files changed, 522 insertions(+) create mode 100644 docs/RESHARDING.md create mode 100644 docs/SHARDING.md diff --git a/docs/RESHARDING.md b/docs/RESHARDING.md new file mode 100644 index 000000000..69da10bed --- /dev/null +++ b/docs/RESHARDING.md @@ -0,0 +1,292 @@ +# Resharding — Implementation + +This document describes how the resharding pipeline works at the code level. For the user-facing +prerequisites, step-by-step guide, and cutover configuration see +[Resharding Postgres](https://docs.pgdog.dev/features/sharding/resharding/) and the companion +blog post [Shard Postgres with one command](https://pgdog.dev/blog/shard-postgres-with-one-command). +For sharding routing internals see [SHARDING.md](./SHARDING.md). + +--- + +## Entry point — `RESHARD` command + +```sql +RESHARD ; +``` + +Issued against the admin database. Parsed in [`pgdog/src/admin/reshard.rs`](../pgdog/src/admin/reshard.rs), which calls +`Orchestrator::new(source, destination, publication, slot_name)` and then +`orchestrator.replicate_and_cutover().await`. + +> **Multi-node deployments:** Traffic cutover via `RESHARD` is supported on single-node PgDog only. +> The [Enterprise Edition control plane](https://docs.pgdog.dev/enterprise_edition/control_plane/) +> is required for coordinated cutover across multiple PgDog containers. 
+ +--- + +## Orchestrator + +`Orchestrator` in [`pgdog/src/backend/replication/logical/orchestrator.rs`](../pgdog/src/backend/replication/logical/orchestrator.rs) owns: +- `source: Cluster` / `destination: Cluster` — connection handles to the two database clusters +- `publisher: Arc>` — manages replication slots, table list, and lag tracking +- `replication_slot: String` — auto-generated as `__pgdog_repl_` unless overridden + +`replicate_and_cutover()` is the top-level method and calls the five steps below in sequence: + +```mermaid +flowchart LR + A["1. load_schema
pg_dump on source"] + B["2. schema_sync_pre
pre-data to dest
reload schema cache"] + C["3. data_sync
ParallelSyncManager
binary COPY"] + D["4. schema_sync_post
secondary indexes"] + E["5. replicate().cutover()
WAL drain
traffic swap"] + + A --> B --> C --> D --> E +``` + +--- + +## Step 1 — Schema dump + +`Orchestrator::load_schema()` creates a `PgDump` ([`pgdog/src/backend/schema/sync/pg_dump.rs`](../pgdog/src/backend/schema/sync/pg_dump.rs)) +with the source cluster and publication name, calls `pg_dump.dump().await`, and stores the +`PgDumpOutput` on the orchestrator. This output carries pre-data (tables, types, extensions, +primary key constraints), secondary index DDL, post-cutover operations, and sequences — split +into `SyncState` phases so they can be applied in the right order later. + +--- + +## Step 2 — Pre-data schema sync + +`schema_sync_pre()` restores `SyncState::PreData` from the dump to the destination cluster, then: +1. Calls `reload_from_existing()` to refresh PgDog's in-memory schema cache so subsequent routing + decisions reflect the new destination schema. +2. Re-fetches `source` and `destination` clusters from `databases()` (addresses may have changed + after the reload). +3. If the destination has `RewriteMode::RewriteOmni`, installs the sharded sequence schema via + `Schema::install()`. + +> **Prerequisite:** all tables in the publication must have a primary key. `Table::valid()` in +> [`pgdog/src/backend/replication/logical/publisher/table.rs`](../pgdog/src/backend/replication/logical/publisher/table.rs) checks this and returns +> `Error::NoPrimaryKey(table)` before any data moves. Without a PK, the upsert conflict target +> is undefined and the replication stream cannot be made idempotent. + +--- + +## Step 3 — Data sync (parallel COPY) + +`Orchestrator::data_sync()` delegates to `Publisher::data_sync()`, which builds a +`ParallelSyncManager` and calls `manager.run().await`. + +### ParallelSyncManager ([`publisher/parallel_sync.rs`](../pgdog/src/backend/replication/logical/publisher/parallel_sync.rs)) + +`ParallelSyncManager::new()` takes the table list, a set of source replica connection pools, and +the destination cluster. 
It sizes a `Semaphore` to +`replicas.len() × dest.resharding_parallel_copies()`. Each table is spawned as a `tokio::spawn` +task via `ParallelSync::run()`. All tasks share an `UnboundedSender`; the manager collects +completions via `rx.recv()`. Replicas are round-robined across tasks. + +> **Replica isolation:** replicas tagged `resharding_only = true` in `pgdog.toml` are included +> here and excluded from normal application traffic. The `Semaphore` ensures the source replicas +> and destination shards are not overwhelmed. + +> **WAL disk space:** each per-table `ReplicationSlot` created during the copy prevents PostgreSQL +> from recycling WAL on the source until the slot is drained. Estimate WAL write rate × copy +> duration and provision that headroom before starting. An orphaned slot from a failed reshard +> accumulates WAL indefinitely — drop it before retrying (see "When things go wrong" below). + +### Per-table copy flow ([`Table::data_sync()`](../pgdog/src/backend/replication/logical/publisher/table.rs)) + +Each task performs this sequence against its assigned source replica: + +1. Creates a `CopySubscriber` — opens connections to all destination shards. +2. Creates a `ReplicationSlot::data_sync()` — opens a streaming replication connection to the + source replica. +3. `slot.create_slot()` — creates a **temporary** logical replication slot, returning the current + LSN. This pins the WAL position atomically inside the same transaction as the copy. +4. `copy.start()` — issues `COPY table TO STDOUT (FORMAT BINARY)` on the source. +5. Streams each row through `copy_sub.copy_data(row)` — the `CopySubscriber` runs the same + `ContextBuilder` → `Context::apply()` sharding pipeline used for live queries, and forwards + each row to the correct destination shard(s). +6. `copy_sub.copy_done()` — sends `CopyDone` to each destination shard, flushes, disconnects. +7. 
`slot.start_replication()` + drain loop — replays any WAL accumulated since slot creation,
   then sends a status update confirming the slot position. The slot is `TEMPORARY` and is
   automatically dropped when the replication connection closes.
8. `COMMIT` closes the transaction on the source replica.

The recorded LSN becomes the replay watermark for that table's WAL stream in Step 5.

---

## Step 4 — Post-data schema sync

`schema_sync_post()` restores `SyncState::PostData` — secondary indexes, non-PK constraints,
and any other DDL that was deferred. Deferring index creation until after the bulk copy avoids
index maintenance overhead during the high-throughput copy phase.

---

## Step 5 — Replication and cutover

`replicate()` creates a `ReplicationWaiter` that wraps a `Waiter` from `Publisher::replicate()`.
`ReplicationWaiter::cutover()` then runs two serial wait phases followed by the atomic swap.

### Publisher and StreamSubscriber

`Publisher` in [`publisher/publisher_impl.rs`](../pgdog/src/backend/replication/logical/publisher/publisher_impl.rs) owns the replication slot, table list, and lag map.
It opens a logical replication connection to the source and streams `XLogPayload` messages to
`StreamSubscriber` in [`subscriber/stream.rs`](../pgdog/src/backend/replication/logical/subscriber/stream.rs).

`StreamSubscriber` maintains:
- `relations: HashMap<Oid, Relation>` — table OID metadata sent once per connection
- `statements: HashMap<Oid, Statements>` — one set of prepared statements per table OID,
  generated once from `Table::insert()` / `update()` / `delete()`:
  - `insert`: `INSERT INTO "schema"."table" VALUES ($1,$2,...) ON CONFLICT (pk_cols) DO UPDATE SET non_pk=$N`
  - `update`: `UPDATE "schema"."table" SET non_pk=$N WHERE pk=$M`
  - `delete`: `DELETE FROM "schema"."table" WHERE pk=$N`
- `table_lsns: HashMap<Oid, Lsn>` — per-table replay watermark (set from Step 3 LSNs)
- `connections: Vec<Connection>` — one open connection per destination shard

**Per-message handling:**

| WAL message | Action |
|---|---|
| `Insert` | Check `lsn_applied(oid)` → if above watermark, run upsert prepared statement on correct shard |
| `Update` | PK change → decomposed into `delete(old) + insert(new)`; no PK change → `update` statement |
| `Delete` | `delete` prepared statement on correct shard |
| `Commit` | Send `Sync` to all open shard connections to close the transaction |

**LSN guard** (`lsn_applied()`): if `current_lsn ≤ table_lsns[oid]`, the row was already
bulk-copied in Step 3 and is skipped. Watermarks advance per-table on `COMMIT`.

**Omnisharded tables** (`statements.omni = true`): upsert is sent to all shards simultaneously.

**Routing**: `StreamContext::shard()` runs `ContextBuilder` + `Context::apply()` on the sharding
key column extracted from the WAL tuple — identical to the live query routing path.

### Cutover phases

**Phase 1 — `wait_for_replication()`**: polls lag every 1 second. When
`lag ≤ cutover_traffic_stop_threshold`:
1. Calls `maintenance_mode::start()` — new queries queue behind a barrier.
2. Calls `cancel_all(source_db)` — cancels any queries already in flight.

**Phase 2 — `wait_for_cutover()`**: polls at 50 ms intervals. 
Three independent triggers can fire +cutover (whichever comes first): + +| Trigger | Config key | Action | +|---|---|---| +| `lag ≤ threshold` | `cutover_replication_lag_threshold` | `CutoverReason::Lag` → proceed | +| elapsed ≥ timeout | `cutover_timeout` | `CutoverReason::Timeout` → proceed or abort (see `cutover_timeout_action`) | +| no transactions for N ms | `cutover_last_transaction_delay` | `CutoverReason::LastTransaction` → proceed | + +**Point of no return** — the `ok_or_abort!` macro wraps every subsequent call. Any failure resumes +traffic immediately via `maintenance_mode::stop()` and returns an error. Steps in order: + +1. `publisher.request_stop()` + `waiter.wait()` — stops the replication stream; drains remaining WAL. +2. `schema_sync_cutover()` — applies `SyncState::Cutover` operations (e.g. drops sequences that + won't be used in the sharded cluster). +3. `cutover(source_db, dest_db)` in [`pgdog/src/backend/databases.rs`](../pgdog/src/backend/databases.rs) — atomically swaps source and + destination in the in-memory routing table. +4. `orchestrator.refresh()` — re-fetches both clusters from `databases()` so the orchestrator now + treats the new cluster as source for reverse replication. +5. `schema_sync_post_cutover()` — applies `SyncState::PostCutover` (removes blockers that would + prevent reverse replication, such as unique constraints on sequence columns). +6. `orchestrator.replicate()` — starts reverse replication (new cluster → old cluster) as a + background `AsyncTasks` task. This enables rollback without data loss. +7. `maintenance_mode::stop()` — releases the barrier; queued and new queries flow to the new cluster. + +--- + +## Error handling and fault tolerance + +### Pre-cutover failures — plain propagation + +Steps 1–4 (`load_schema`, `schema_sync_pre`, `data_sync`, `schema_sync_post`) propagate errors +with `?` directly from `replicate_and_cutover()`. Maintenance mode is never entered during these +steps. 
A failure here leaves traffic unaffected and the source untouched, making a full restart safe. + +### Schema DDL — intentional error tolerance + +`schema_sync_pre`, `schema_sync_post`, `schema_sync_cutover`, and `schema_sync_post_cutover` are +all called with `ignore_errors = true`. The `PgDumpOutput::restore()` method logs errors and +continues when this flag is set. The intent is to tolerate pre-existing objects on the destination +— a common condition when a previous reshard attempt failed mid-schema-sync and left partial DDL +behind. Re-running `RESHARD` after such a failure will not abort on `table already exists` or +similar conflicts. + +### Data sync — abort propagation and cooperative cancellation + +[`Table::data_sync()`](../pgdog/src/backend/replication/logical/publisher/table.rs) runs the COPY row loop under a `tokio::select!` that races two futures: +the next row from the source, and `AbortSignal::aborted()`. `AbortSignal` wraps the closed-state +of the `UnboundedSender` shared with `ParallelSyncManager` — it resolves when the channel is +dropped. If the channel closes mid-copy (e.g. because another table's task failed and the manager +is torn down), the loop returns `Error::CopyAborted`. The task does not need to be explicitly +cancelled. + +[`ParallelSync::run()`](../pgdog/src/backend/replication/logical/publisher/parallel_sync.rs) checks `tx.is_closed()` before acquiring the semaphore permit. A task that +wakes after the channel is already closed returns `Error::DataSyncAborted` immediately without +starting a copy. + +Error propagation from the manager: `run()` drives completion via `rx.recv()`. The first `Err` +returned by any task surfaces via `table?` and aborts the manager's loop. Remaining tasks run to +completion or abort via their own `AbortSignal`, but their results are ignored once the channel +is dropped. 
+ +### Temporary vs permanent replication slots + +Per-table slots created in [`Table::data_sync()`](../pgdog/src/backend/replication/logical/publisher/table.rs) are `TEMPORARY` — PostgreSQL drops them +automatically when the replication connection closes, including on error or panic. A failed copy +task leaves no orphaned per-table slot. + +The `Publisher`'s named replication slot (the one used for the WAL streaming phase) is permanent. +[`Publisher::cleanup()`](../pgdog/src/backend/replication/logical/publisher/publisher_impl.rs) drops it by calling `slot.drop_slot()`, which issues +`DROP_REPLICATION_SLOT "name" WAIT` over the replication protocol connection. `cleanup()` is an +explicit method on `Orchestrator` — it is not called automatically inside `replicate_and_cutover()`. +If the orchestrator is dropped after Step 5 begins but before `cleanup()` is called (e.g. a +process crash), the permanent slot survives and continues accumulating WAL on the source. + +### The `ok_or_abort!` macro — guaranteed traffic resumption after cutover starts + +```rust +macro_rules! ok_or_abort { + ($expr:expr) => { + match $expr { + Ok(res) => res, + Err(err) => { + maintenance_mode::stop(); + cutover_state(CutoverState::Abort { error: err.to_string() }); + return Err(err.into()); + } + } + }; +} +``` + +Once `maintenance_mode::start()` is called in `wait_for_replication()`, traffic is paused. +`ok_or_abort!` is the only place that calls `maintenance_mode::stop()` for the remaining steps. +Every call after the point of no return — `waiter.wait()`, `schema_sync_cutover()`, `cutover()`, +`orchestrator.refresh()`, `schema_sync_post_cutover()`, `orchestrator.replicate()` — is wrapped +in it. This guarantees traffic always resumes, regardless of which step fails. + +The macro also transitions the global `CutoverState` to `Abort`, which is visible via +`SHOW REPLICATION_SLOTS` in the admin database. 
+ +### AbortTimeout — the one pre-point-of-no-return stop + +When `cutover_timeout_action = "abort"` and the timeout fires in `wait_for_cutover()`, the code +explicitly calls `maintenance_mode::stop()` before returning `Err(Error::AbortTimeout)`. This is +the only code path that stops maintenance mode without being inside `ok_or_abort!` — it is the +case where the cutover was never attempted, so no data was moved and no swap occurred. + +### Idempotency guarantees + +Several mechanisms make it safe to replay data across a restart: + +| Mechanism | Where | Effect | +|---|---|---| +| Temporary replication slots | `Table::data_sync()` | Auto-dropped on connection close; no orphaned per-table slots | +| `ignore_errors = true` | All schema sync steps | Pre-existing DDL on destination does not abort the run | +| LSN watermark guard | `StreamSubscriber::lsn_applied()` | Rows bulk-copied in Step 3 are skipped during WAL replay in Step 5 | +| Upsert on INSERT messages | `Table::insert(upsert=true)` | `ON CONFLICT (pk) DO UPDATE SET` prevents duplicates on WAL re-delivery | +| PK validation | `Table::valid()` | Fails before any data moves; restart is clean | \ No newline at end of file diff --git a/docs/SHARDING.md b/docs/SHARDING.md new file mode 100644 index 000000000..a3a951f51 --- /dev/null +++ b/docs/SHARDING.md @@ -0,0 +1,230 @@ +# Sharding — Implementation + +This document describes how PgDog routes queries across shards at the code level. For user-facing +configuration and concepts see [Sharding basics](https://docs.pgdog.dev/features/sharding/basics/) and +[Sharding functions](https://docs.pgdog.dev/features/sharding/sharding-functions/). For the resharding +workflow see [RESHARDING.md](./RESHARDING.md). + +--- + +## Architecture overview + +```mermaid +flowchart TD + APP["Application
(unmodified)"] + + subgraph PgDog + FE["Frontend
(client connection)"] + QP["QueryParser
(SQL → Shard)"] + COPY["CopyRow
(COPY → Shard)"] + ROUTER["Router
(Shard dispatcher)"]
    end

    subgraph Shards
        S0["Shard 0"]
        S1["Shard 1"]
        SN["Shard N"]
    end

    APP -->|"SQL / COPY"| FE
    FE --> QP
    FE --> COPY
    QP --> ROUTER
    COPY --> ROUTER
    ROUTER --> S0
    ROUTER --> S1
    ROUTER --> SN
```

The universal routing token is `Shard` in [`pgdog/src/frontend/router/parser/route.rs`](../pgdog/src/frontend/router/parser/route.rs):

```rust
pub enum Shard {
    Direct(usize),     // single shard by index
    Multi(Vec<usize>), // explicit subset
    All,               // broadcast to every shard
}
```

Every routing decision — SQL, COPY, WAL replay — produces one of these three variants and nothing else.
The rest of the system only needs to know which variant it received.

---

## Router entry point

`Router` in [`pgdog/src/frontend/router/mod.rs`](../pgdog/src/frontend/router/mod.rs) holds a `QueryParser` and the last-computed `Command`.

- **SQL messages** → `Router::query()` calls `QueryParser::parse()` and stores the result as
  `Command::Query(Route)`. In COPY mode it returns the cached command without reparsing.
- **COPY data** → `Router::copy_data()` matches on `Command::Copy` and calls `Copy::shard()`,
  which returns `Vec<CopyRow>`. If the current command is not a COPY (i.e. not a sharded table),
  every row gets `CopyRow::omnishard` — `Shard::All`.

`Route` (also in [`route.rs`](../pgdog/src/frontend/router/parser/route.rs)) wraps a `Shard` together with metadata the connection pool and response
merger need: `read: bool` (primary vs replica), `order_by`, `aggregate`, `limit`, `distinct`, and
`rewrite_plan`. The connection pool reads `route.shard()` to select the backend; the response merger
reads the rest to reassemble cross-shard results.

---

## SQL routing — QueryParser

`QueryParser` in [`pgdog/src/frontend/router/parser/query/mod.rs`](../pgdog/src/frontend/router/parser/query/mod.rs) is re-created per query.

### Pre-parse shortcuts

Before touching the SQL AST, `parse()` checks two bypass conditions:

1. 
**Comment override** — `/* shard=N */` or `/* role=primary */` embedded in the SQL is extracted + from `Ast::comment_shard` and pushed onto `shards_calculator` before the AST walk. The role + override (`/* role=primary */`) sets `write_override = true`. + +2. **Parser bypass** — if a shard is already known (from a prior `SET pgdog.shard = N` command or + a sticky connection) and the cluster has only one shard, the full AST walk is skipped entirely + via `query_parser_bypass()`, which returns a `Route` directly. + +### Per-statement dispatch + +After the pre-parse phase, the root `NodeEnum` is matched: + +| NodeEnum | Handler | Notes | +|---|---|---| +| `SelectStmt` | `select()` | Key extraction + aggregation metadata | +| `InsertStmt` | `insert()` | Key from VALUES column list | +| `UpdateStmt` | `update()` | Key from SET + WHERE | +| `DeleteStmt` | `delete()` | Key from WHERE | +| `CopyStmt` | `copy()` | Sets up `Command::Copy` for row-level routing | +| `VariableSetStmt` | `set()` | Handles `SET pgdog.shard` / `SET pgdog.role` | +| `VariableShowStmt` | `show()` | Admin SHOW commands | +| `DeallocateStmt` | — | Returns `Command::Deallocate` immediately | + +Empty queries (no FROM clause, e.g. `SELECT 1` or `SELECT NOW()`) are round-robined to +`Shard::Direct(round_robin::next() % shards)` and never hit the WHERE clause walker. + +### SELECT routing decision tree + +`select()` in [`pgdog/src/frontend/router/parser/query/select.rs`](../pgdog/src/frontend/router/parser/query/select.rs) applies this decision sequence: + +1. **Already routed** — if `shards_calculator.shard().is_direct()` (set by a prior `SET` or comment), + skip the AST walk and return immediately with that shard. + +2. **WHERE clause key** — `StatementParser::from_select().shard()` walks the AST looking for an + equality predicate on the configured sharding column. If found, produces `Shard::Direct(hash % n)`. + +3. 
**Vector ORDER BY** — if the ORDER BY contains an `<->` expression (L2 distance) against a + vector-type sharding column, `Centroids::shard()` is called on the query vector. The nearest + centroid's shard index is used. + +4. **Sharded table, no key** — table is in `[[sharded_tables]]` but WHERE has no equality on the + sharding key. Result: `Shard::All`. Aggregates, ORDER BY, and LIMIT recorded for cross-shard merge. + +5. **Unsharded table (omnishard)** — table is not in `[[sharded_tables]]`: + - If marked `sticky` → routes to `sticky.omni_index` (connection-pinned, same shard for the + session). + - Otherwise → `round_robin::next() % shards`. + +6. **Single-shard cluster** — after the above, if result is `Shard::All` or `Shard::Multi` but + `context.shards == 1`, it is collapsed to `Shard::Direct(0)`. + +Cross-shard queries (`Shard::All` or `Shard::Multi`) carry `AggregateRewritePlan` in the `Route`. +The plan describes which columns are aggregated (`COUNT`, `SUM`, `MAX`, `MIN`, `AVG`, etc.) so the +response handler can merge partial results from each shard before returning them to the client. + +--- + +## Sharding functions + +### Operator selection + +`ContextBuilder` in [`pgdog/src/frontend/router/sharding/context_builder.rs`](../pgdog/src/frontend/router/sharding/context_builder.rs) reads the `ShardedTable` +config entry for the matched column and constructs an `Operator` in this priority order: + +1. `centroids` populated → `Operator::Centroids { shards, probes, centroids }` +2. `mapping` is `Mapping::Range(_)` → `Operator::Range(ranges)` +3. `mapping` is `Mapping::List(_)` → `Operator::List(lists)` +4. 
Otherwise → `Operator::Shards(n)` (hash) + +`Context::apply()` in [`context.rs`](../pgdog/src/frontend/router/sharding/context.rs) matches on the operator and calls the appropriate function, +returning `Shard::All` if the value cannot be parsed (rather than erroring): + +``` +Operator::Shards(n) → hash(value) % n → Shard::Direct +Operator::Centroids → nearest centroid index → Shard::Direct or Shard::Multi +Operator::Range → ranges.shard(value) → Shard::Direct or Shard::All (no match) +Operator::List → lists.shard(value) → Shard::Direct or Shard::All (no match) +``` + +### Hash functions + +`bigint()`, `uuid()`, `varchar()` in [`pgdog/src/frontend/router/sharding/mod.rs`](../pgdog/src/frontend/router/sharding/mod.rs) all call into +[`hashfn.c`](../pgdog/src/frontend/router/sharding/hashfn.c) via FFI ([`pgdog/src/frontend/router/sharding/ffi.rs`](../pgdog/src/frontend/router/sharding/ffi.rs)). The functions are PostgreSQL's own +`hashint8extended` and `hash_bytes_extended`, so `hash(42) % N` in PgDog produces the same shard +as PostgreSQL's own hash partitioning would. + +`shard_value()` handles text-format parameters; `shard_binary()` handles wire-format binary +parameters by decoding them first. `shard_str()` is called when the type is unknown — it tries +`i64` parse, then `Uuid` parse, then falls through to varchar. This is a best-effort path with a +TODO noting that having the type OID would be more reliable. + +For the `SHA1` hasher (configured via `hasher = "sha1"`), `Hasher::Sha1` routes through +[`pgdog/src/frontend/router/sharding/hasher.rs`](../pgdog/src/frontend/router/sharding/hasher.rs) instead of the FFI functions. + +### List and Range: unmatched values + +Neither `Lists::shard()` nor `Ranges::shard()` errors on a value that matches no rule — both return +`Shard::All`. This means a misconfigured mapping silently broadcasts instead of failing. If strict +routing is required, all possible values must be covered in the mapping. 
+ +### Vector routing + +`Centroids` lives in the `pgdog-vector` crate (re-exported from +[`pgdog/src/frontend/router/sharding/vector.rs`](../pgdog/src/frontend/router/sharding/vector.rs)). `Centroids::shard()` computes the L2 distance from +the query vector to each centroid using SIMD (AVX2 on x86-64, NEON on ARM64) and returns the index +of the nearest centroid. `centroid_probes` controls how many centroids are checked — higher values +improve recall at the cost of more shards being queried. + +--- + +## COPY routing + +`CopyRow` in [`pgdog/src/frontend/router/copy.rs`](../pgdog/src/frontend/router/copy.rs) wraps a `CopyData` message with a `Shard`. + +The `Copy` router (in [`pgdog/src/frontend/router/parser/copy.rs`](../pgdog/src/frontend/router/parser/copy.rs)) is set up when the parser sees a +`CopyStmt` targeting a sharded table. As each `CopyData` arrives, `Copy::shard()` extracts the +sharding key column from the row (text or binary format), runs it through the same +`ContextBuilder` → `Context::apply()` pipeline, and produces a tagged `CopyRow`. + +Two special cases: +- **CSV/text headers** — the header row (column names) is sent to all shards via `CopyRow::headers()`. +- **Non-sharded table** — if `Command::Copy` is not set (the target table is unsharded), every row + becomes `CopyRow::omnishard(row)` with `Shard::All`. + +The binary COPY format is handled transparently; `shard_binary()` decodes each column value from the +PostgreSQL wire format before hashing. + +--- + +## Configuration + +Config types are in [`pgdog-config/src/sharding.rs`](../pgdog-config/src/sharding.rs). The TOML keys map directly to struct fields +(all `snake_case`, `deny_unknown_fields`). 

Key fields on `ShardedTable`:

| Field | Type | Notes |
|---|---|---|
| `database` | `String` | Matches a `[[databases]]` entry |
| `name` | `Option<String>` | Restricts rule to one table; absent = all tables with this column |
| `schema` | `Option<String>` | PostgreSQL schema scope |
| `column` | `String` | Sharding key column name |
| `data_type` | `DataType` | `bigint` (default), `uuid`, `varchar`, `vector` |
| `hasher` | `Hasher` | `postgres` (default, FFI to `hashint8extended`) or `sha1` |
| `centroids` | `Vec<Vector>` | Inline centroid vectors for vector sharding |
| `centroids_path` | `Option<PathBuf>` | External JSON file for large centroid sets |
| `centroid_probes` | `usize` | Probes per query; defaults to `√(centroid count)` |
| `mapping` | `Option<Mapping>` | Resolved from `[[sharded_mappings]]` at startup; not set in TOML directly |

`[[sharded_mappings]]` entries resolve to either `Mapping::List(ListShards)` or `Mapping::Range(Ranges)`
and are joined to their `ShardedTable` at startup. A mapping that covers only some values leaves the
rest as `Shard::All` — there is no error. A `ShardedMappingKind::Default` entry acts as a catch-all. 
From 40f7d9068d09c11036175b8fa8588ad326309d33 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 20 Apr 2026 09:43:49 +0000 Subject: [PATCH 02/10] fix(copy_data): add retries for copy_data command --- docs/issues/894-897-copy-data-retry.md | 180 ++++++++++++++++++ integration/copy_data/pgdog.retry_test.toml | 32 ++++ integration/copy_data/retry_test.sh | 124 ++++++++++++ pgdog-config/src/general.rs | 22 +++ pgdog-config/src/users.rs | 2 +- pgdog/src/backend/pool/cluster.rs | 20 ++ .../src/backend/replication/logical/error.rs | 50 +++++ .../logical/publisher/parallel_sync.rs | 89 ++++++--- .../replication/logical/publisher/table.rs | 42 ++++ .../src/backend/replication/logical/status.rs | 13 ++ .../two_pc/server_transactions.rs | 6 +- pgdog/src/frontend/router/parser/context.rs | 2 +- pgdog/src/frontend/router/parser/statement.rs | 2 +- 13 files changed, 552 insertions(+), 32 deletions(-) create mode 100644 docs/issues/894-897-copy-data-retry.md create mode 100644 integration/copy_data/pgdog.retry_test.toml create mode 100644 integration/copy_data/retry_test.sh diff --git a/docs/issues/894-897-copy-data-retry.md b/docs/issues/894-897-copy-data-retry.md new file mode 100644 index 000000000..0986d34f7 --- /dev/null +++ b/docs/issues/894-897-copy-data-retry.md @@ -0,0 +1,180 @@ +# COPY_DATA Retry Reliability (Issues #894 + #897) + +## Problem + +**#894:** During `COPY_DATA` resharding, if a destination shard temporarily goes down, PgDog +continues processing the next tables without retrying. This leaves the destination incompletely +populated (276 started, 63 finished in the reported case). + +**#897:** Two distinct failure scenarios need retry coverage: +1. **Destination shard is down** — connection to dest fails or drops mid-COPY +2. **Origin shard is down** — source connection drops, temporary replication slot is lost + +The fix: add per-table retry logic with TRUNCATE-before-retry inside `ParallelSync`. 
+ +## Why a Single Top-Level Retry Handles Both #897 Scenarios + +`Table::data_sync()` is fully self-contained: + +``` +data_sync() { + CopySubscriber::new() → dest connection + ReplicationSlot::data_sync() → fresh slot name (random_string(24)), temp slot + slot.connect() + slot.create_slot() + ... copy rows ... + slot drops on disconnect (TEMPORARY) +} +``` + +On any failure and retry, `data_sync` is called from scratch: +- **Dest down** → new `CopySubscriber` reconnects to destination. +- **Origin down** → new `ReplicationSlot` with a fresh random name reconnects to source and + re-creates the temporary slot. The old slot was `TEMPORARY` → auto-dropped by Postgres when + its connection closed. No slot cleanup needed. + +## Destination Commit Model and TRUNCATE Safety + +**How the copy works per-table:** +- Source side: `slot.create_slot()` opens `BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ` + on the source. `COPY ... TO STDOUT` streams all rows from that consistent snapshot. + `COMMIT` is called after all rows are streamed (source-only commit, unrelated to dest). +- Destination side: `CopySubscriber::connect()` opens one **standalone** connection per + destination shard (no explicit `BEGIN`). `start_copy()` sends `COPY ... FROM STDIN` to + each shard. Each `COPY FROM STDIN` runs inside PostgreSQL's own **implicit transaction**. + `copy_done()` sends `CopyDone` to each shard **sequentially** — shard 0 commits, then + shard 1, etc. 
**There is no cross-shard atomicity.** + +**Failure scenarios and destination state:** + +| Failure point | Destination state | TRUNCATE effect | +|---|---|---| +| During row streaming (conn drops mid-COPY) | PG auto-rolls back implicit tx → dest empty | No-op (safe) | +| Inside `copy_done()` — some shards committed, others not | Partially committed | Wipes committed shards; retry starts clean | +| After `copy_done()`, before `data_sync` returns `Ok` | All shards committed rows | Wipes all shards; retry starts clean | + +**Why TRUNCATE before retry is required (not optional):** + +Without TRUNCATE, a retry after a partial or full `copy_done()` would attempt to insert the +same rows again via `COPY FROM STDIN`. Since destination tables have primary keys, this +produces **primary key constraint violations** → the retry fails immediately on those shards. +TRUNCATE is the only correct way to get a clean destination for retry. + +**Why TRUNCATE is safe:** + +These are fresh destination shards being populated as part of resharding. Any rows in the +destination tables for this table were put there by a previous COPY attempt. Wiping and +retrying is idempotent with respect to the final goal. + +Truncation of one table's destination does not affect parallel copies of other tables; +each `ParallelSync` handles exactly one table. + +**TRUNCATE itself fails (destination still down):** + +If the destination is still down when we try TRUNCATE, we log a warning and continue. +The retry attempt that follows will also fail to connect. Eventually the shard comes back; +at that point TRUNCATE succeeds and COPY proceeds. If the shard was down when `copy_done()` +was attempted, it has no committed data (auto-rollback) — TRUNCATE is a no-op when it +eventually runs successfully. + +**Non-retryable errors** (`CopyAborted`, `DataSyncAborted`, `NoPrimaryKey`, `NoReplicaIdentity`, +`ParallelConnection`) bypass the retry loop immediately. 
`Cluster::execute` runs against all +primaries — correct for TRUNCATE. + +## Code Path + +``` +admin/copy_data.rs → Orchestrator::data_sync() +orchestrator.rs → Publisher::data_sync() +publisher/publisher_impl.rs → per-shard ParallelSyncManager::run() +publisher/parallel_sync.rs → ParallelSync::run() ← calls run_with_retry + ParallelSync::run_with_retry() ← retry loop (new) +publisher/table.rs → Table::data_sync() ← actual COPY (self-contained) +``` + +## Implementation + +### 1. Config — add retry knobs (`pgdog-config/src/general.rs`) + +Add after `resharding_parallel_copies` in the `General` struct: +```rust +/// Maximum number of retries for a failed table copy during resharding (per-table). +/// _Default:_ `5` +#[serde(default = "General::resharding_copy_retry_max_attempts")] +pub resharding_copy_retry_max_attempts: usize, + +/// Delay in milliseconds between table copy retries. Doubles each attempt, capped at 32×. +/// _Default:_ `1000` +#[serde(default = "General::resharding_copy_retry_min_delay")] +pub resharding_copy_retry_min_delay: u64, +``` + +Private defaults and `impl Default` entries added following the existing `resharding_parallel_copies` pattern. + +### 2. Cluster — propagate and expose (`pgdog/src/backend/pool/cluster.rs`) + +Add both fields to `ClusterConfig<'a>` and the private `Cluster` struct; populate in +`ClusterConfig::new()` from `general.*`; wire through `Cluster::new()`; expose via: +```rust +pub fn resharding_copy_retry_max_attempts(&self) -> usize { ... } +pub fn resharding_copy_retry_min_delay(&self) -> Duration { ... } +``` + +### 3. Table — truncate helpers (`pgdog/src/backend/replication/logical/publisher/table.rs`) + +```rust +/// Generate a TRUNCATE SQL statement for the given schema and table name. +pub fn truncate_statement(schema: &str, name: &str) -> String { ... } + +/// Truncate this table on all destination primaries before a retry. +pub async fn truncate_destination(&self, dest: &Cluster) -> Result<(), Error> { ... 
} +``` + +### 4. Error — retryability predicate (`pgdog/src/backend/replication/logical/error.rs`) + +Whitelist approach — only connection-level wrappers and direct availability variants return `true`. +New variants default to non-retryable, which is the safe choice. + +```rust +pub fn is_retryable(&self) -> bool { + match self { + // Shard was unreachable; each retry opens a fresh connection. + // Some sub-variants (TLS, protocol errors) aren't truly transient but + // will just exhaust the budget and fail cleanly. + Self::Net(_) | Self::Pool(_) => true, + + // No connection yet, or primary is down — worth retrying. + Self::NotConnected | Self::NoPrimary => true, + + // Replication stalled; temporary slot is gone, next attempt starts fresh. + Self::ReplicationTimeout => true, + + // Abort signals, schema mismatches, protocol violations — retrying won't help. + _ => false, + } +} +``` + +### 5. ParallelSync — retry loop (`pgdog/src/backend/replication/logical/publisher/parallel_sync.rs`) + +Split `run()` into `run()` (public entry point, spawns task) and `run_with_retry()` (private). + +On each failed attempt: +1. Compute exponential backoff: `min_delay * 2^attempt`, capped at 32×. +2. Log the error and how long we are waiting, e.g. `failed (attempt 1/5): …, retrying after 500ms…` +3. Sleep for the backoff duration (gives the shard time to recover before TRUNCATE is attempted). +4. TRUNCATE the destination table; log a warning and continue if TRUNCATE itself fails. +5. Increment attempt counter and loop. 
+ +## Configuration Reference + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `resharding_copy_retry_max_attempts` | `usize` | `5` | Maximum per-table retry attempts | +| `resharding_copy_retry_min_delay` | `u64` | `1000` | Base backoff delay in milliseconds; doubles each attempt, capped at 32× | + +## Integration Test + +`integration/copy_data/retry_test.sh` — stops shard_1 before the sync starts, brings it +back after ~2 s, asserts exit 0 and correct row counts on all tables. +Requires the `integration/copy_data/` docker-compose stack to be running. +Config: `integration/copy_data/pgdog.retry_test.toml` (faster retry settings for CI speed). diff --git a/integration/copy_data/pgdog.retry_test.toml b/integration/copy_data/pgdog.retry_test.toml new file mode 100644 index 000000000..84e8f4042 --- /dev/null +++ b/integration/copy_data/pgdog.retry_test.toml @@ -0,0 +1,32 @@ +[general] +resharding_copy_format = "binary" +resharding_copy_retry_max_attempts = 5 +resharding_copy_retry_min_delay = 500 + +[[databases]] +name = "source" +host = "127.0.0.1" +port = 15432 +database_name = "pgdog" + +[[databases]] +name = "destination" +host = "127.0.0.1" +port = 15433 +database_name = "pgdog1" +shard = 0 + +[[databases]] +name = "destination" +host = "127.0.0.1" +port = 15434 +database_name = "pgdog2" +shard = 1 + +[[sharded_tables]] +database = "destination" +column = "tenant_id" +data_type = "bigint" + +[admin] +password = "pgdog" diff --git a/integration/copy_data/retry_test.sh b/integration/copy_data/retry_test.sh new file mode 100644 index 000000000..6b61afd41 --- /dev/null +++ b/integration/copy_data/retry_test.sh @@ -0,0 +1,124 @@ +#!/bin/bash +# Integration test: data-sync retries when a destination shard is temporarily unavailable. +# +# What is tested: +# - data-sync --sync-only completes (exit 0) when shard_1 is stopped before the +# sync starts and brought back while retries are in flight. 
+# - Row counts on all destination tables match the source after sync completes. +# +# Requires: the copy_data docker-compose stack to be running. +set -e + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +DEFAULT_BIN="${SCRIPT_DIR}/../../target/debug/pgdog" +PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN} +PGDOG_CONFIG="${SCRIPT_DIR}/pgdog.retry_test.toml" +USERS_CONFIG="${SCRIPT_DIR}/users.toml" +export PGPASSWORD=pgdog + +SYNC_PID="" + +cleanup() { + if [ -n "${SYNC_PID}" ]; then + kill "${SYNC_PID}" 2>/dev/null || true + wait "${SYNC_PID}" 2>/dev/null || true + fi + # Always bring shard_1 back on exit so the stack is not left broken. + cd "${SCRIPT_DIR}" && docker compose start shard_1 2>/dev/null || true +} +trap cleanup EXIT + +pushd "${SCRIPT_DIR}" + +# Reset destination and reload source data. +psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -f init.sql + +# Schema sync: create tables on destination shards. +"${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" \ + schema-sync --from-database source --to-database destination --publication pgdog + +# Stop shard_1 before the copy starts. +# Every table copy connects to all destination shards, so all tables will fail on the +# first attempt and enter the retry loop. +echo "[retry_test] Stopping shard_1..." +docker compose stop shard_1 + +# Start data-sync in the background. +"${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" \ + data-sync --sync-only \ + --from-database source \ + --to-database destination \ + --publication pgdog & +SYNC_PID=$! + +# Let data-sync start and hit connection failures on shard_1. +sleep 2 + +# Bring shard_1 back while retries are in flight. +echo "[retry_test] Starting shard_1..." +docker compose start shard_1 + +# Wait for shard_1 postgres to be ready. 
+READY_ATTEMPTS=0 +until pg_isready -h 127.0.0.1 -p 15434 -U pgdog -d pgdog2 -q; do + READY_ATTEMPTS=$((READY_ATTEMPTS + 1)) + if [ "${READY_ATTEMPTS}" -ge 120 ]; then + echo "[retry_test] FAIL: shard_1 not ready after $((READY_ATTEMPTS / 2))s" + exit 1 + fi + sleep 0.5 +done +echo "[retry_test] shard_1 is ready." + +# Wait for data-sync to complete. +set +e +wait "${SYNC_PID}" +SYNC_EXIT=$? +set -e +SYNC_PID="" + +if [ "${SYNC_EXIT}" -ne 0 ]; then + echo "[retry_test] FAIL: data-sync exited with code ${SYNC_EXIT}" + exit "${SYNC_EXIT}" +fi + +# Verify row counts. +# Sharded tables: sum across both destination shards must equal source. +SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity" +# Omni tables: each shard must have the full source row count. +OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories" + +FAILED=0 + +for TABLE in ${SHARDED_TABLES}; do + SRC=$(psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST0=$(psql -h 127.0.0.1 -p 15433 -U pgdog pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(psql -h 127.0.0.1 -p 15434 -U pgdog pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST=$((DST0 + DST1)) + if [ "${SRC}" -ne "${DST}" ]; then + echo "[retry_test] MISMATCH ${TABLE}: source=${SRC} total_dest=${DST} (shard0=${DST0} shard1=${DST1})" + FAILED=1 + else + echo "[retry_test] OK ${TABLE}: ${SRC} rows" + fi +done + +for TABLE in ${OMNI_TABLES}; do + SRC=$(psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST0=$(psql -h 127.0.0.1 -p 15433 -U pgdog pgdog1 -tAc "SELECT COUNT(*) FROM ${TABLE}") + DST1=$(psql -h 127.0.0.1 -p 15434 -U pgdog pgdog2 -tAc "SELECT COUNT(*) FROM ${TABLE}") + if [ "${SRC}" -ne "${DST0}" ] || [ "${SRC}" -ne "${DST1}" ]; then + echo "[retry_test] MISMATCH ${TABLE} (omni): source=${SRC} shard0=${DST0} shard1=${DST1}" + FAILED=1 + else + echo "[retry_test] OK ${TABLE} (omni): ${SRC} rows on 
each shard" + fi +done + +if [ "${FAILED}" -ne 0 ]; then + echo "[retry_test] FAIL: row count mismatches detected." + exit 1 +fi + +echo "[retry_test] PASS: all row counts match. Retry test succeeded." +popd diff --git a/pgdog-config/src/general.rs b/pgdog-config/src/general.rs index bb3bd6863..bcf9a2e2e 100644 --- a/pgdog-config/src/general.rs +++ b/pgdog-config/src/general.rs @@ -600,6 +600,18 @@ pub struct General { #[serde(default = "General::resharding_parallel_copies")] pub resharding_parallel_copies: usize, + /// Maximum number of retries for a failed table copy during resharding (per-table). + /// Retries use exponential backoff starting at `resharding_copy_retry_min_delay`. + /// _Default:_ `5` + #[serde(default = "General::resharding_copy_retry_max_attempts")] + pub resharding_copy_retry_max_attempts: usize, + + /// Base delay in milliseconds between table copy retries. + /// Each successive attempt doubles the delay, capped at 32×. + /// _Default:_ `1000` + #[serde(default = "General::resharding_copy_retry_min_delay")] + pub resharding_copy_retry_min_delay: u64, + /// Automatically reload the schema cache used by PgDog to route queries upon detecting DDL statements. /// /// **Note:** This setting requires PgDog Enterprise Edition to work as expected. If using the open source edition, it will only work with single-node PgDog deployments, e.g., in local development or CI. 
@@ -745,6 +757,8 @@ impl Default for General { omnisharded_sticky: bool::default(), resharding_copy_format: CopyFormat::default(), resharding_parallel_copies: Self::resharding_parallel_copies(), + resharding_copy_retry_max_attempts: Self::resharding_copy_retry_max_attempts(), + resharding_copy_retry_min_delay: Self::resharding_copy_retry_min_delay(), reload_schema_on_ddl: Self::reload_schema_on_ddl(), load_schema: Self::load_schema(), cutover_replication_lag_threshold: Self::cutover_replication_lag_threshold(), @@ -966,6 +980,14 @@ impl General { 1 } + fn resharding_copy_retry_max_attempts() -> usize { + 5 + } + + fn resharding_copy_retry_min_delay() -> u64 { + 1000 + } + fn default_shutdown_termination_timeout() -> Option { Self::env_option("PGDOG_SHUTDOWN_TERMINATION_TIMEOUT") } diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index a69962f1b..f855d5535 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -300,7 +300,7 @@ impl User { .passwords .clone() .into_iter() - .map(|p| PasswordKind::Plain(p)) + .map(PasswordKind::Plain) .collect(); if !self.password().is_empty() { passwords.push(PasswordKind::Plain(self.password().to_string())); diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 26523c44b..3886aaa3c 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -82,6 +82,8 @@ pub struct Cluster { reload_schema_on_ddl: bool, load_schema: LoadSchema, resharding_parallel_copies: usize, + resharding_copy_retry_max_attempts: usize, + resharding_copy_retry_min_delay: u64, regex_parser: RegexParser, } @@ -159,6 +161,8 @@ pub struct ClusterConfig<'a> { pub reload_schema_on_ddl: bool, pub load_schema: LoadSchema, pub resharding_parallel_copies: usize, + pub resharding_copy_retry_max_attempts: usize, + pub resharding_copy_retry_min_delay: u64, pub regex_parser_limit: usize, } @@ -213,6 +217,8 @@ impl<'a> ClusterConfig<'a> { reload_schema_on_ddl: 
general.reload_schema_on_ddl, load_schema: general.load_schema, resharding_parallel_copies: general.resharding_parallel_copies, + resharding_copy_retry_max_attempts: general.resharding_copy_retry_max_attempts, + resharding_copy_retry_min_delay: general.resharding_copy_retry_min_delay, regex_parser_limit: general.regex_parser_limit, } } @@ -251,6 +257,8 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + resharding_copy_retry_max_attempts, + resharding_copy_retry_min_delay, regex_parser_limit, } = config; @@ -301,6 +309,8 @@ impl Cluster { reload_schema_on_ddl, load_schema, resharding_parallel_copies, + resharding_copy_retry_max_attempts, + resharding_copy_retry_min_delay, regex_parser: RegexParser::new(regex_parser_limit, query_parser), } } @@ -553,6 +563,16 @@ impl Cluster { self.resharding_parallel_copies } + /// Maximum retries for a per-table copy during resharding. + pub fn resharding_copy_retry_max_attempts(&self) -> usize { + self.resharding_copy_retry_max_attempts + } + + /// Base delay between table copy retry attempts. Doubles each attempt, capped at 32×. + pub fn resharding_copy_retry_min_delay(&self) -> std::time::Duration { + std::time::Duration::from_millis(self.resharding_copy_retry_min_delay) + } + /// Launch the connection pools. pub(crate) fn launch(&self) { for shard in self.shards() { diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 3f9bbf910..13ca7dde5 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -147,3 +147,53 @@ impl From for Error { Self::SchemaSync(Box::new(value)) } } + +impl Error { + /// Whether the table copy should be retried after this error. + pub fn is_retryable(&self) -> bool { + match self { + // Shard was unreachable; each retry opens a fresh connection. 
+ // Some sub-variants (TLS, protocol errors) aren't truly transient but + // will just exhaust the budget and fail cleanly. + Self::Net(_) | Self::Pool(_) => true, + + // No connection yet, or primary is down — worth retrying. + Self::NotConnected | Self::NoPrimary => true, + + // Replication stalled; temporary slot is gone, next attempt starts fresh. + Self::ReplicationTimeout => true, + + // Abort signals, schema mismatches, protocol violations — retrying won't help. + _ => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::backend::pool::Error as PE; + use crate::backend::replication::publisher::PublicationTable; + use crate::net::Error as NE; + + #[test] + fn retryable() { + let io = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "reset"); + assert!(Error::Net(NE::Io(io)).is_retryable()); + assert!(Error::Net(NE::UnexpectedEof).is_retryable()); + assert!(Error::Pool(PE::NoPrimary).is_retryable()); + assert!(Error::Pool(PE::CheckoutTimeout).is_retryable()); + assert!(Error::NotConnected.is_retryable()); + assert!(Error::NoPrimary.is_retryable()); + assert!(Error::ReplicationTimeout.is_retryable()); + } + + #[test] + fn not_retryable() { + assert!(!Error::CopyAborted(PublicationTable::default()).is_retryable()); + assert!(!Error::DataSyncAborted.is_retryable()); + assert!(!Error::NoPrimaryKey(PublicationTable::default()).is_retryable()); + assert!(!Error::NoReplicaIdentity("s".into(), "t".into()).is_retryable()); + assert!(!Error::ParallelConnection.is_retryable()); + } +} diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs index c0cb3ef6d..9d483b61c 100644 --- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs +++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs @@ -13,7 +13,7 @@ use tokio::{ }, task::JoinHandle, }; -use tracing::info; +use tracing::{info, warn}; use super::super::Error; use 
super::AbortSignal; @@ -40,9 +40,10 @@ impl ParallelSync { // This won't acquire until we have at least 1 available permit. // Permit will be given back when this task completes. - let _permit = self - .permit - .acquire() + // acquire_owned() consumes a cloned Arc, returning an OwnedSemaphorePermit with + // no lifetime tied to `self`, which allows the subsequent `&mut self` borrow. + let _permit = Arc::clone(&self.permit) + .acquire_owned() .await .map_err(|_| Error::ParallelConnection)?; @@ -50,26 +51,60 @@ impl ParallelSync { return Err(Error::DataSyncAborted); } + self.run_with_retry(&tracker).await + }) + } + + /// Retry loop: attempt the table copy up to `max_retries` times. + /// Before each retry, TRUNCATE the destination to avoid PK violations from + /// partially-committed rows. Abort signals and schema errors are not retried. + async fn run_with_retry(&mut self, tracker: &TableCopy) -> Result<(), Error> { + let max_retries = self.dest.resharding_copy_retry_max_attempts(); + let base_delay = self.dest.resharding_copy_retry_min_delay(); + let mut attempt = 0usize; + + loop { let abort = AbortSignal::new(self.tx.clone()); - let result = match self + match self .table - .data_sync(&self.addr, &self.dest, abort, &tracker) + .data_sync(&self.addr, &self.dest, abort, tracker) .await { - Ok(_) => Ok(self.table), - Err(err) => { + Ok(_) => { + self.tx + .send(Ok(self.table.clone())) + .map_err(|_| Error::ParallelConnection)?; + return Ok(()); + } + Err(err) if !err.is_retryable() || attempt >= max_retries => { tracker.error(&err); return Err(err); } - }; - - self.tx - .send(result) - .map_err(|_| Error::ParallelConnection)?; - - Ok::<(), Error>(()) - }) + Err(err) => { + let backoff = base_delay * 2u32.pow(attempt.min(5) as u32); + attempt += 1; + + warn!( + "data sync for \"{}\".\"{}\" failed (attempt {}/{}): {err}, retrying after {}ms...", + self.table.table.schema, + self.table.table.name, + attempt, + max_retries, + backoff.as_millis(), + ); + + 
tokio::time::sleep(backoff).await; + + if let Err(trunc_err) = self.table.truncate_destination(&self.dest).await { + warn!( + "truncate before retry failed for \"{}\".\"{}\" : {trunc_err}", + self.table.table.schema, self.table.table.name, + ); + } + } + } + } } } @@ -89,6 +124,11 @@ impl ParallelSyncManager { } Ok(Self { + // TODO: this single shared semaphore cannot enforce per-replica limits — all + // permits could be consumed by tasks that round-robin happened to assign to the + // same replica, leaving others idle. Fix: replace with one Semaphore per replica, + // each sized to `parallel_copies`, and have each ParallelSync acquire from its + // assigned replica's semaphore. permit: Arc::new(Semaphore::new( replicas.len() * dest.resharding_parallel_copies(), )), @@ -106,22 +146,19 @@ impl ParallelSyncManager { self.permit.available_permits() / self.replicas.len(), ); - let mut replicas_iter = self.replicas.iter(); - // Loop through replicas, one at a time. - // This works around Rust iterators not having a "rewind" function. - let replica = loop { - if let Some(replica) = replicas_iter.next() { - break replica; - } else { - replicas_iter = self.replicas.iter(); - } - }; + // cycle() is the idiomatic "rewind": it restarts the iterator from the + // beginning once exhausted, giving round-robin distribution across replicas. + let mut replicas_iter = self.replicas.iter().cycle(); let (tx, mut rx) = unbounded_channel(); let mut tables = vec![]; let mut handles = vec![]; for table in self.tables { + // SAFETY: cycle() on a non-empty slice never returns None. 
+ let replica = replicas_iter + .next() + .expect("replicas is non-empty; checked in new()"); handles.push( ParallelSync { table, diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index ad3bc1b75..d9703eb3f 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -285,6 +285,30 @@ impl Table { Ok(self.lsn) } + + /// Generate a `TRUNCATE` SQL statement for the given schema and table name. + /// Used before retrying a failed table copy to ensure a clean destination. + pub fn truncate_statement(schema: &str, name: &str) -> String { + format!( + "TRUNCATE \"{}\".\"{}\"", + escape_identifier(schema), + escape_identifier(name), + ) + } + + /// Truncate this table on all destination primaries before a retry. + /// + /// Always safe: if the previous copy attempt auto-rolled back (mid-stream + /// failure), this is a no-op. If rows were partially or fully committed, + /// this wipes them so the retry starts clean and avoids PK violations. + pub async fn truncate_destination(&self, dest: &Cluster) -> Result<(), Error> { + dest.execute(Self::truncate_statement( + self.table.destination_schema(), + self.table.destination_name(), + )) + .await + .map_err(Error::Backend) + } } #[cfg(test)] @@ -461,3 +485,21 @@ mod test { publication.cleanup().await; } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn truncate_statement_basic() { + let sql = Table::truncate_statement("public", "users"); + assert_eq!(sql, r#"TRUNCATE "public"."users""#); + } + + #[test] + fn truncate_statement_quotes_identifiers() { + // escape_identifier doubles embedded double-quotes per Postgres convention. 
+ let sql = Table::truncate_statement("my\"schema", "my\"table"); + assert_eq!(sql, "TRUNCATE \"my\"\"schema\".\"my\"\"table\""); + } +} diff --git a/pgdog/src/backend/replication/logical/status.rs b/pgdog/src/backend/replication/logical/status.rs index b4e575031..f9560aee2 100644 --- a/pgdog/src/backend/replication/logical/status.rs +++ b/pgdog/src/backend/replication/logical/status.rs @@ -79,6 +79,19 @@ impl TableCopy { state.sql = Arc::new(sql.to_owned()); } } + + /// Reset byte and row counters before retrying a failed table copy. + /// Prevents accumulated counts from a discarded attempt inflating totals + /// and throughput calculations across retries. + pub(crate) fn reset(&self) { + if let Some(mut state) = TableCopies::get().get_mut(self) { + state.bytes = 0; + state.rows = 0; + state.bytes_per_sec = 0; + state.last_update = SystemTime::now(); + data_sync_progress(self, &state); + } + } } impl Drop for TableCopy { diff --git a/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs b/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs index ca3df408c..8e95be6f0 100644 --- a/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs +++ b/pgdog/src/frontend/client/query_engine/two_pc/server_transactions.rs @@ -32,11 +32,11 @@ impl TwoPcTransactions { let mut transactions = vec![]; for record in records { - let transaction = record.get_text(1).and_then(|name| { + let transaction = record.get_text(1).map(|name| { if let Ok(ours) = TwoPcTransaction::from_str(&name) { - Some(TwoPcServerTransaction::Ours(ours)) + TwoPcServerTransaction::Ours(ours) } else { - Some(TwoPcServerTransaction::Other { name }) + TwoPcServerTransaction::Other { name } } }); diff --git a/pgdog/src/frontend/router/parser/context.rs b/pgdog/src/frontend/router/parser/context.rs index e66702702..712a29c7e 100644 --- a/pgdog/src/frontend/router/parser/context.rs +++ b/pgdog/src/frontend/router/parser/context.rs @@ -93,7 +93,7 @@ impl<'a> 
QueryParserContext<'a> { pub(super) fn use_parser(&self) -> bool { self.router_context .cluster - .use_query_parser(&self.router_context.client_request) + .use_query_parser(self.router_context.client_request) } /// Get the query we're parsing, if any. diff --git a/pgdog/src/frontend/router/parser/statement.rs b/pgdog/src/frontend/router/parser/statement.rs index 2c0eba9a1..cec05c806 100644 --- a/pgdog/src/frontend/router/parser/statement.rs +++ b/pgdog/src/frontend/router/parser/statement.rs @@ -182,7 +182,7 @@ fn integer_arg(node: &Node, bind: Option<&Bind>) -> Option { fn is_param_ref(node: &Node) -> bool { match node.node.as_ref() { Some(NodeEnum::ParamRef(_)) => true, - Some(NodeEnum::TypeCast(cast)) => cast.arg.as_deref().map_or(false, is_param_ref), + Some(NodeEnum::TypeCast(cast)) => cast.arg.as_deref().is_some_and(is_param_ref), _ => false, } } From 49fa573beb796f1c8ffeea289cb4debad57976db Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Tue, 21 Apr 2026 09:53:03 +0000 Subject: [PATCH 03/10] fix: remove truncate of tables --- docs/RESHARDING.md | 2 +- docs/SHARDING.md | 24 +++++- docs/issues/894-897-copy-data-retry.md | 79 +++++++++++-------- pgdog/src/backend/error.rs | 18 +++++ pgdog/src/backend/pool/cluster.rs | 14 ++-- pgdog/src/backend/pool/error.rs | 60 ++++++++++++++ .../src/backend/replication/logical/error.rs | 47 ++++++++--- .../logical/publisher/parallel_sync.rs | 45 ++++++++--- .../replication/logical/publisher/table.rs | 60 +++++--------- pgdog/src/frontend/prepared_statements/mod.rs | 2 +- pgdog/src/net/error.rs | 31 ++++++++ 11 files changed, 276 insertions(+), 106 deletions(-) diff --git a/docs/RESHARDING.md b/docs/RESHARDING.md index 69da10bed..c65d7f36b 100644 --- a/docs/RESHARDING.md +++ b/docs/RESHARDING.md @@ -161,7 +161,7 @@ bulk-copied in Step 3 and is skipped. Watermarks advance per-table on `COMMIT`. **Omnisharded tables** (`statements.omni = true`): upsert is sent to all shards simultaneously. 
-**Routing**: `StreamContext::shard()` runs `ContextBuilder` + `Context::apply()` on the sharding +**Routing**: `StreamContext::shard()` in [`subscriber/context.rs`](../pgdog/src/backend/replication/logical/subscriber/context.rs) runs `ContextBuilder` + `Context::apply()` on the sharding key column extracted from the WAL tuple — identical to the live query routing path. ### Cutover phases diff --git a/docs/SHARDING.md b/docs/SHARDING.md index a3a951f51..8aa7ba503 100644 --- a/docs/SHARDING.md +++ b/docs/SHARDING.md @@ -92,7 +92,7 @@ After the pre-parse phase, the root `NodeEnum` is matched: |---|---|---| | `SelectStmt` | `select()` | Key extraction + aggregation metadata | | `InsertStmt` | `insert()` | Key from VALUES column list | -| `UpdateStmt` | `update()` | Key from SET + WHERE | +| `UpdateStmt` | `update()` | Key from SET + WHERE; may trigger shard-key rewrite (see below) | | `DeleteStmt` | `delete()` | Key from WHERE | | `CopyStmt` | `copy()` | Sets up `Command::Copy` for row-level routing | | `VariableSetStmt` | `set()` | Handles `SET pgdog.shard` / `SET pgdog.role` | @@ -131,6 +131,15 @@ Cross-shard queries (`Shard::All` or `Shard::Multi`) carry `AggregateRewritePlan The plan describes which columns are aggregated (`COUNT`, `SUM`, `MAX`, `MIN`, `AVG`, etc.) so the response handler can merge partial results from each shard before returning them to the client. +### UPDATE on the sharding key + +When an UPDATE sets the sharding key to a new value the row must move shards. `StatementRewrite` in +[`pgdog/src/frontend/router/parser/rewrite/statement/`](../pgdog/src/frontend/router/parser/rewrite/statement/) detects this case and rewrites the statement +as three operations: `SELECT` (fetch the full old row), `DELETE` (remove from the source shard), and +`INSERT ... ON CONFLICT DO UPDATE` (upsert on the destination shard). The rewrite is transparent to +the client. It is enabled by default and can be disabled via `rewrite.shard_key = "ignore"` in +`pgdog.toml`. 
+ --- ## Sharding functions @@ -138,7 +147,7 @@ response handler can merge partial results from each shard before returning them ### Operator selection `ContextBuilder` in [`pgdog/src/frontend/router/sharding/context_builder.rs`](../pgdog/src/frontend/router/sharding/context_builder.rs) reads the `ShardedTable` -config entry for the matched column and constructs an `Operator` in this priority order: +config entry for the matched column and constructs an [`Operator`](../pgdog/src/frontend/router/sharding/operator.rs) in this priority order: 1. `centroids` populated → `Operator::Centroids { shards, probes, centroids }` 2. `mapping` is `Mapping::Range(_)` → `Operator::Range(ranges)` @@ -224,7 +233,18 @@ Key fields on `ShardedTable`: | `centroids_path` | `Option` | External JSON file for large centroid sets | | `centroid_probes` | `usize` | Probes per query; defaults to `√(centroid count)` | | `mapping` | `Option` | Resolved from `[[sharded_mappings]]` at startup; not set in TOML directly | +| `primary` | `bool` | Marks this table as the sharding anchor for FK-based query resolution | `[[sharded_mappings]]` entries resolve to either `Mapping::List(ListShards)` or `Mapping::Range(Ranges)` and are joined to their `ShardedTable` at startup. A mapping that covers only some values leaves the rest as `Shard::All` — there is no error. A `ShardedMappingKind::Default` entry acts as a catch-all. + +### Schema-based sharding + +`[[sharded_schemas]]` in `pgdog.toml` maps a PostgreSQL schema name to a fixed shard index. This is +useful for schema-per-tenant deployments where each tenant's data lives in a dedicated schema. +`SchemaSharder` in [`pgdog/src/frontend/router/sharding/schema.rs`](../pgdog/src/frontend/router/sharding/schema.rs) resolves the current +`search_path` against the `ShardedSchema` list (`pgdog-config/src/sharding.rs`). A `name = null` +entry acts as the catch-all; a specific schema name overrides it even if the catch-all was matched first. 
+ +Schema routing takes effect at the connection level and is re-evaluated whenever `search_path` changes. \ No newline at end of file diff --git a/docs/issues/894-897-copy-data-retry.md b/docs/issues/894-897-copy-data-retry.md index 0986d34f7..ae1f98d8f 100644 --- a/docs/issues/894-897-copy-data-retry.md +++ b/docs/issues/894-897-copy-data-retry.md @@ -10,7 +10,7 @@ populated (276 started, 63 finished in the reported case). 1. **Destination shard is down** — connection to dest fails or drops mid-COPY 2. **Origin shard is down** — source connection drops, temporary replication slot is lost -The fix: add per-table retry logic with TRUNCATE-before-retry inside `ParallelSync`. +The fix: add per-table retry logic with exponential backoff inside `ParallelSync`. ## Why a Single Top-Level Retry Handles Both #897 Scenarios @@ -32,7 +32,7 @@ On any failure and retry, `data_sync` is called from scratch: re-creates the temporary slot. The old slot was `TEMPORARY` → auto-dropped by Postgres when its connection closed. No slot cleanup needed. 
-## Destination Commit Model and TRUNCATE Safety +## Destination Commit Model **How the copy works per-table:** - Source side: `slot.create_slot()` opens `BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ` @@ -46,39 +46,39 @@ On any failure and retry, `data_sync` is called from scratch: **Failure scenarios and destination state:** -| Failure point | Destination state | TRUNCATE effect | -|---|---|---| -| During row streaming (conn drops mid-COPY) | PG auto-rolls back implicit tx → dest empty | No-op (safe) | -| Inside `copy_done()` — some shards committed, others not | Partially committed | Wipes committed shards; retry starts clean | -| After `copy_done()`, before `data_sync` returns `Ok` | All shards committed rows | Wipes all shards; retry starts clean | +| Failure point | Destination state | +|---|---| +| During row streaming (conn drops mid-COPY) | PG auto-rolls back implicit tx → dest empty | +| Inside `copy_done()` — some shards committed, others not | Partially committed | +| After `copy_done()`, before `data_sync` returns `Ok` | All shards committed rows | -**Why TRUNCATE before retry is required (not optional):** +The common case (connection drop during streaming) leaves the destination clean — PostgreSQL +rolls back the implicit transaction automatically. The rare race is when `copy_done()` has +already committed on some or all shards and then the connection drops before `data_sync` +returns `Ok`. In that case, rows survive in the destination and a naive retry would immediately +hit primary key constraint violations. -Without TRUNCATE, a retry after a partial or full `copy_done()` would attempt to insert the -same rows again via `COPY FROM STDIN`. Since destination tables have primary keys, this -produces **primary key constraint violations** → the retry fails immediately on those shards. -TRUNCATE is the only correct way to get a clean destination for retry. 
+**Current behavior — manual TRUNCATE guidance:** -**Why TRUNCATE is safe:** +PgDog does not automatically truncate the destination before retrying. Auto-TRUNCATE is the +correct long-term fix but requires reliable "is destination" guards that don't yet exist; +running TRUNCATE on the wrong cluster would be catastrophic. That logic is stubbed out as a +commented future extension in `run_with_retry()` in `parallel_sync.rs`. -These are fresh destination shards being populated as part of resharding. Any rows in the -destination tables for this table were put there by a previous COPY attempt. Wiping and -retrying is idempotent with respect to the final goal. +Instead, when a table copy fails fatally (non-retryable error or max attempts exhausted), +`Table::destination_has_rows` queries each shard's primary with `SELECT 1 … LIMIT 1`. If +any rows are found, PgDog logs a `warn!` that includes the exact TRUNCATE statement to run: -Truncation of one table's destination does not affect parallel copies of other tables; -each `ParallelSync` handles exactly one table. - -**TRUNCATE itself fails (destination still down):** +``` +data sync for "public"."orders" failed with rows remaining in destination; +truncate manually before retrying: TRUNCATE "public"."orders_new"; +``` -If the destination is still down when we try TRUNCATE, we log a warning and continue. -The retry attempt that follows will also fail to connect. Eventually the shard comes back; -at that point TRUNCATE succeeds and COPY proceeds. If the shard was down when `copy_done()` -was attempted, it has no committed data (auto-rollback) — TRUNCATE is a no-op when it -eventually runs successfully. +If the row-count check itself fails (destination unreachable), a separate warning is emitted. +The original error is always returned regardless. **Non-retryable errors** (`CopyAborted`, `DataSyncAborted`, `NoPrimaryKey`, `NoReplicaIdentity`, -`ParallelConnection`) bypass the retry loop immediately. 
`Cluster::execute` runs against all -primaries — correct for TRUNCATE. +`ParallelConnection`) bypass the retry loop immediately and still trigger the row check. ## Code Path @@ -116,17 +116,22 @@ Add both fields to `ClusterConfig<'a>` and the private `Cluster` struct; populat `ClusterConfig::new()` from `general.*`; wire through `Cluster::new()`; expose via: ```rust pub fn resharding_copy_retry_max_attempts(&self) -> usize { ... } -pub fn resharding_copy_retry_min_delay(&self) -> Duration { ... } +pub fn resharding_copy_retry_min_delay(&self) -> &Duration { ... } ``` -### 3. Table — truncate helpers (`pgdog/src/backend/replication/logical/publisher/table.rs`) +### 3. Table — helpers (`pgdog/src/backend/replication/logical/publisher/table.rs`) ```rust /// Generate a TRUNCATE SQL statement for the given schema and table name. pub fn truncate_statement(schema: &str, name: &str) -> String { ... } -/// Truncate this table on all destination primaries before a retry. +/// Truncate this table on all destination primaries. +/// Not called automatically — preserved for future use once "is destination" guards exist. pub async fn truncate_destination(&self, dest: &Cluster) -> Result<(), Error> { ... } + +/// Returns true if any shard's primary has rows in the destination table. +/// Used after fatal failure to detect the COPY-committed-before-error race. +pub async fn destination_has_rows(&self, dest: &Cluster) -> Result { ... } ``` ### 4. Error — retryability predicate (`pgdog/src/backend/replication/logical/error.rs`) @@ -158,19 +163,23 @@ pub fn is_retryable(&self) -> bool { Split `run()` into `run()` (public entry point, spawns task) and `run_with_retry()` (private). -On each failed attempt: +On each retryable failed attempt: 1. Compute exponential backoff: `min_delay * 2^attempt`, capped at 32×. 2. Log the error and how long we are waiting, e.g. `failed (attempt 1/5): …, retrying after 500ms…` -3. 
Sleep for the backoff duration (gives the shard time to recover before TRUNCATE is attempted). -4. TRUNCATE the destination table; log a warning and continue if TRUNCATE itself fails. -5. Increment attempt counter and loop. +3. Sleep for the backoff duration. +4. Increment attempt counter and loop. + +On fatal failure (non-retryable error or attempts exhausted): +1. Record the error via `tracker.error()`. +2. Call `Table::destination_has_rows` — if rows are found, emit a `warn!` with the exact TRUNCATE SQL. +3. Return the original error to the caller. ## Configuration Reference | Key | Type | Default | Description | |-----|------|---------|-------------| | `resharding_copy_retry_max_attempts` | `usize` | `5` | Maximum per-table retry attempts | -| `resharding_copy_retry_min_delay` | `u64` | `1000` | Base backoff delay in milliseconds; doubles each attempt, capped at 32× | +| `resharding_copy_retry_min_delay` | `u64` (ms) | `1000` | Base backoff delay in milliseconds; doubles each attempt, capped at 32× | ## Integration Test diff --git a/pgdog/src/backend/error.rs b/pgdog/src/backend/error.rs index 6024079df..e75b8f527 100644 --- a/pgdog/src/backend/error.rs +++ b/pgdog/src/backend/error.rs @@ -163,4 +163,22 @@ impl Error { _ => false, } } + + /// Transient network/pool fault worth retrying. + pub fn is_retryable(&self) -> bool { + match self { + Self::Io(_) => true, + Self::Net(inner) => inner.is_retryable(), + Self::Pool(inner) => inner.is_retryable(), + // Connection dropped between operations. + Self::NotConnected + | Self::DirectToShardNotConnected + | Self::MultiShardNotConnected + | Self::CopyNotConnected + | Self::ClusterNotConnected => true, + // Server stopped responding mid-stream. 
+ Self::ReadTimeout => true, + _ => false, + } + } } diff --git a/pgdog/src/backend/pool/cluster.rs b/pgdog/src/backend/pool/cluster.rs index 3886aaa3c..b2c117a1e 100644 --- a/pgdog/src/backend/pool/cluster.rs +++ b/pgdog/src/backend/pool/cluster.rs @@ -83,7 +83,7 @@ pub struct Cluster { load_schema: LoadSchema, resharding_parallel_copies: usize, resharding_copy_retry_max_attempts: usize, - resharding_copy_retry_min_delay: u64, + resharding_copy_retry_min_delay: Duration, regex_parser: RegexParser, } @@ -310,7 +310,7 @@ impl Cluster { load_schema, resharding_parallel_copies, resharding_copy_retry_max_attempts, - resharding_copy_retry_min_delay, + resharding_copy_retry_min_delay: Duration::from_millis(resharding_copy_retry_min_delay), regex_parser: RegexParser::new(regex_parser_limit, query_parser), } } @@ -569,8 +569,8 @@ impl Cluster { } /// Base delay between table copy retry attempts. Doubles each attempt, capped at 32×. - pub fn resharding_copy_retry_min_delay(&self) -> std::time::Duration { - std::time::Duration::from_millis(self.resharding_copy_retry_min_delay) + pub fn resharding_copy_retry_min_delay(&self) -> &Duration { + &self.resharding_copy_retry_min_delay } /// Launch the connection pools. 
@@ -811,7 +811,7 @@ mod test { database: "pgdog".into(), }); - let cluster = Cluster { + Cluster { shards: vec![Shard::new(ShardConfig { number: 0, primary: &Some(PoolConfig { @@ -836,9 +836,7 @@ mod test { two_phase_commit: config.config.general.two_phase_commit, two_phase_commit_auto: config.config.general.two_phase_commit_auto.unwrap_or(false), ..Default::default() - }; - - cluster + } } pub fn set_read_write_strategy(&mut self, rw_strategy: ReadWriteStrategy) { diff --git a/pgdog/src/backend/pool/error.rs b/pgdog/src/backend/pool/error.rs index 9d854d6d5..27e32ba8b 100644 --- a/pgdog/src/backend/pool/error.rs +++ b/pgdog/src/backend/pool/error.rs @@ -80,3 +80,63 @@ pub enum Error { #[error("replica lag")] ReplicaLag, } + +impl Error { + /// Transient availability fault worth retrying. + /// + /// Non-retryable: config errors, admin decisions, programming errors. + /// Everything else (timeouts, server faults, lag, health misses) is transient. + pub fn is_retryable(&self) -> bool { + !matches!( + self, + // Config / wiring errors — retrying changes nothing. + Self::NullBytes + | Self::NoShard(_) + | Self::NoDatabases + | Self::PubSubDisabled + | Self::PoolNoHealthTarget(_) + | Self::MappingMissing(_) + // Admin decisions — respect them. + | Self::ManualBan + // Programming errors. + | Self::UntrackedConnCheckin(_) + // Deliberate shutdown. 
+ | Self::FastShutdown + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn retryable() { + assert!(Error::CheckoutTimeout.is_retryable()); + assert!(Error::ConnectTimeout.is_retryable()); + assert!(Error::ReplicaCheckoutTimeout.is_retryable()); + assert!(Error::NoPrimary.is_retryable()); + assert!(Error::AllReplicasDown.is_retryable()); + assert!(Error::Banned.is_retryable()); + assert!(Error::NoReplicas.is_retryable()); + assert!(Error::ServerError.is_retryable()); + assert!(Error::HealthcheckTimeout.is_retryable()); + assert!(Error::HealthcheckError.is_retryable()); + assert!(Error::PrimaryLsnQueryFailed.is_retryable()); + assert!(Error::ReplicaLsnQueryFailed.is_retryable()); + assert!(Error::Offline.is_retryable()); + assert!(Error::ReplicaLag.is_retryable()); + assert!(Error::PoolUnhealthy.is_retryable()); + } + + #[test] + fn not_retryable() { + assert!(!Error::ManualBan.is_retryable()); + assert!(!Error::NullBytes.is_retryable()); + assert!(!Error::NoDatabases.is_retryable()); + assert!(!Error::PubSubDisabled.is_retryable()); + assert!(!Error::FastShutdown.is_retryable()); + assert!(!Error::NoShard(0).is_retryable()); + assert!(!Error::MappingMissing(0).is_retryable()); + } +} diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 13ca7dde5..8d39ff70c 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -152,18 +152,13 @@ impl Error { /// Whether the table copy should be retried after this error. pub fn is_retryable(&self) -> bool { match self { - // Shard was unreachable; each retry opens a fresh connection. - // Some sub-variants (TLS, protocol errors) aren't truly transient but - // will just exhaust the budget and fail cleanly. - Self::Net(_) | Self::Pool(_) => true, - - // No connection yet, or primary is down — worth retrying. 
+ Self::Net(inner) => inner.is_retryable(), + Self::Pool(inner) => inner.is_retryable(), + Self::Backend(inner) => inner.is_retryable(), + // No connection yet, or primary is down. Self::NotConnected | Self::NoPrimary => true, - // Replication stalled; temporary slot is gone, next attempt starts fresh. Self::ReplicationTimeout => true, - - // Abort signals, schema mismatches, protocol violations — retrying won't help. _ => false, } } @@ -188,6 +183,40 @@ mod tests { assert!(Error::ReplicationTimeout.is_retryable()); } + #[test] + fn retryable_via_backend_wrapper() { + use crate::backend::Error as BE; + + // IO reset wrapped as Backend — the common path for network drops during COPY. + let io = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "reset"); + assert!(Error::Backend(BE::Io(io)).is_retryable()); + + // Read timeout mid-stream. + assert!(Error::Backend(BE::ReadTimeout).is_retryable()); + + // Pool couldn't hand out a connection. + assert!(Error::Backend(BE::Pool(PE::CheckoutTimeout)).is_retryable()); + assert!(Error::Backend(BE::Pool(PE::NoPrimary)).is_retryable()); + assert!(Error::Backend(BE::Pool(PE::AllReplicasDown)).is_retryable()); + + // Connection variants. + assert!(Error::Backend(BE::NotConnected).is_retryable()); + assert!(Error::Backend(BE::ClusterNotConnected).is_retryable()); + } + + #[test] + fn not_retryable_via_backend_wrapper() { + use crate::backend::Error as BE; + use crate::net::messages::ErrorResponse; + + // Postgres-level error response: permanent, not a network fault. + let pg_err = ErrorResponse::default(); + assert!(!Error::Backend(BE::ConnectionError(Box::new(pg_err))).is_retryable()); + + // Protocol violations are not transient. 
+ assert!(!Error::Backend(BE::ProtocolOutOfSync).is_retryable()); + } + #[test] fn not_retryable() { assert!(!Error::CopyAborted(PublicationTable::default()).is_retryable()); diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs index 9d483b61c..d959450f3 100644 --- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs +++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs @@ -12,6 +12,7 @@ use tokio::{ Semaphore, }, task::JoinHandle, + time::sleep, }; use tracing::{info, warn}; @@ -56,11 +57,10 @@ impl ParallelSync { } /// Retry loop: attempt the table copy up to `max_retries` times. - /// Before each retry, TRUNCATE the destination to avoid PK violations from - /// partially-committed rows. Abort signals and schema errors are not retried. + /// Abort signals and schema errors are not retried. async fn run_with_retry(&mut self, tracker: &TableCopy) -> Result<(), Error> { let max_retries = self.dest.resharding_copy_retry_max_attempts(); - let base_delay = self.dest.resharding_copy_retry_min_delay(); + let base_delay = *self.dest.resharding_copy_retry_min_delay(); let mut attempt = 0usize; loop { @@ -79,6 +79,24 @@ impl ParallelSync { } Err(err) if !err.is_retryable() || attempt >= max_retries => { tracker.error(&err); + // COPY is usually atomic, but rows may remain if the connection dropped + // after COMMIT. Warn so the user can truncate manually before retrying. 
+ match self.table.destination_has_rows(&self.dest).await { + Ok(true) => warn!( + "data sync for \"{}\".\"{}\" failed with rows remaining in destination; \ + truncate manually before retrying: TRUNCATE \"{}\".\"{}\";", + self.table.table.schema, + self.table.table.name, + self.table.table.destination_schema(), + self.table.table.destination_name(), + ), + Ok(false) => {} // destination is clean; next run starts fresh + Err(check_err) => warn!( + "could not check destination row count for \"{}\".\"{}\" after failure: {check_err}", + self.table.table.schema, + self.table.table.name, + ), + } return Err(err); } Err(err) => { @@ -94,14 +112,19 @@ impl ParallelSync { backoff.as_millis(), ); - tokio::time::sleep(backoff).await; - - if let Err(trunc_err) = self.table.truncate_destination(&self.dest).await { - warn!( - "truncate before retry failed for \"{}\".\"{}\" : {trunc_err}", - self.table.table.schema, self.table.table.name, - ); - } + // Reset counters so the next attempt's progress is reported accurately. + tracker.reset(); + + sleep(backoff).await; + // FUTURE: truncate before retry to handle the COPY-committed-but-dropped + // race (rows remain → PK violations). Safe once source-guard checks exist. 
+ // + // if let Err(trunc_err) = self.table.truncate_destination(&self.dest).await { + // warn!( + // "truncate before retry failed for \"{}\".\"{}\" : {trunc_err}", + // self.table.table.schema, self.table.table.name, + // ); + // } } } } diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index d9703eb3f..8e0dae637 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -10,10 +10,12 @@ use crate::backend::pool::Address; use crate::backend::replication::publisher::progress::Progress; use crate::backend::replication::publisher::Lsn; +use crate::backend::pool::Request; use crate::backend::replication::status::TableCopy; use crate::backend::{Cluster, Server, ShardedTables}; use crate::config::config; use crate::frontend::router::parser::Column; +use crate::net::messages::Protocol; use crate::net::replication::StatusUpdate; use crate::util::escape_identifier; @@ -286,28 +288,26 @@ impl Table { Ok(self.lsn) } - /// Generate a `TRUNCATE` SQL statement for the given schema and table name. - /// Used before retrying a failed table copy to ensure a clean destination. - pub fn truncate_statement(schema: &str, name: &str) -> String { - format!( - "TRUNCATE \"{}\".\"{}\"", - escape_identifier(schema), - escape_identifier(name), - ) - } - - /// Truncate this table on all destination primaries before a retry. + /// Returns `true` if the destination table has any rows on any shard. /// - /// Always safe: if the previous copy attempt auto-rolled back (mid-stream - /// failure), this is a no-op. If rows were partially or fully committed, - /// this wipes them so the retry starts clean and avoids PK violations. 
- pub async fn truncate_destination(&self, dest: &Cluster) -> Result<(), Error> { - dest.execute(Self::truncate_statement( - self.table.destination_schema(), - self.table.destination_name(), - )) - .await - .map_err(Error::Backend) + /// COPY is transactional and normally auto-rolls back on failure, leaving + /// the destination empty. This check catches the rare race where COPY + /// committed but an error was returned afterward (e.g., network drop during + /// CommandComplete), resulting in rows the retry would collide with. + pub async fn destination_has_rows(&self, dest: &Cluster) -> Result { + let sql = format!( + "SELECT 1 FROM \"{}\".\"{}\" LIMIT 1", + escape_identifier(self.table.destination_schema()), + escape_identifier(self.table.destination_name()), + ); + for (shard, _) in dest.shards().iter().enumerate() { + let mut server = dest.primary(shard, &Request::default()).await?; + let messages = server.execute_checked(sql.as_str()).await?; + if messages.iter().any(|m| m.code() == 'D') { + return Ok(true); + } + } + Ok(false) } } @@ -485,21 +485,3 @@ mod test { publication.cleanup().await; } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn truncate_statement_basic() { - let sql = Table::truncate_statement("public", "users"); - assert_eq!(sql, r#"TRUNCATE "public"."users""#); - } - - #[test] - fn truncate_statement_quotes_identifiers() { - // escape_identifier doubles embedded double-quotes per Postgres convention. 
- let sql = Table::truncate_statement("my\"schema", "my\"table"); - assert_eq!(sql, "TRUNCATE \"my\"\"schema\".\"my\"\"table\""); - } -} diff --git a/pgdog/src/frontend/prepared_statements/mod.rs b/pgdog/src/frontend/prepared_statements/mod.rs index 62f8bddea..af96195f2 100644 --- a/pgdog/src/frontend/prepared_statements/mod.rs +++ b/pgdog/src/frontend/prepared_statements/mod.rs @@ -300,7 +300,7 @@ mod test { let global = statements.global.read(); assert_eq!(global.statements().len(), 2); - for (_, stmt) in global.statements() { + for stmt in global.statements().values() { if stmt.name() == first_name { // Old entry: should be decremented to 0 (no longer referenced). assert_eq!(stmt.used, 0, "old entry should be decremented"); diff --git a/pgdog/src/net/error.rs b/pgdog/src/net/error.rs index 18288ed12..7d6ce0b39 100644 --- a/pgdog/src/net/error.rs +++ b/pgdog/src/net/error.rs @@ -103,3 +103,34 @@ pub enum Error { #[error("{0}")] TypeError(#[from] pgdog_postgres_types::Error), } + +impl Error { + /// Transient network fault worth retrying. + pub fn is_retryable(&self) -> bool { + matches!( + self, + Self::Io(_) | Self::UnexpectedEof | Self::ConnectionDown + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn retryable() { + let io = std::io::Error::new(std::io::ErrorKind::ConnectionReset, "reset"); + assert!(Error::Io(io).is_retryable()); + assert!(Error::UnexpectedEof.is_retryable()); + assert!(Error::ConnectionDown.is_retryable()); + } + + #[test] + fn not_retryable() { + // TLS and protocol errors are permanent. 
+ assert!(!Error::UnexpectedMessage('Z', 'Q').is_retryable()); + assert!(!Error::NotTextEncoding.is_retryable()); + assert!(!Error::UnexpectedPayload.is_retryable()); + } +} From 41bd3c4a0396d144c99c136ef513d8a4583d715e Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Tue, 21 Apr 2026 20:34:43 +0000 Subject: [PATCH 04/10] test: add integration test for retry of copy_data --- .github/workflows/ci.yml | 2 +- .rwx/integration.yml | 5 +- .../copy_data/{ => data_sync}/init.sql | 1 - .../copy_data/{ => data_sync}/pgbench.sql | 0 .../copy_data/{ => data_sync}/pgdog.toml | 0 .../copy_data/{dev.sh => data_sync/run.sh} | 48 ++++++++--- .../copy_data/{ => data_sync}/users.toml | 0 integration/copy_data/retry_test/init.sql | 6 ++ .../pgdog.toml} | 2 +- .../{retry_test.sh => retry_test/run.sh} | 82 ++++++++++++++----- integration/copy_data/retry_test/users.toml | 11 +++ integration/copy_data/run.sh | 19 +++++ .../logical/publisher/parallel_sync.rs | 7 -- 13 files changed, 139 insertions(+), 44 deletions(-) rename integration/copy_data/{ => data_sync}/init.sql (96%) rename integration/copy_data/{ => data_sync}/pgbench.sql (100%) rename integration/copy_data/{ => data_sync}/pgdog.toml (100%) rename integration/copy_data/{dev.sh => data_sync/run.sh} (57%) mode change 100644 => 100755 rename integration/copy_data/{ => data_sync}/users.toml (100%) create mode 100644 integration/copy_data/retry_test/init.sql rename integration/copy_data/{pgdog.retry_test.toml => retry_test/pgdog.toml} (92%) rename integration/copy_data/{retry_test.sh => retry_test/run.sh} (52%) mode change 100644 => 100755 create mode 100644 integration/copy_data/retry_test/users.toml create mode 100644 integration/copy_data/run.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8779fef34..f06aec6a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -171,7 +171,7 @@ jobs: - name: Stop shared PgDog run: bash -lc 'source 
integration/common.sh; stop_pgdog' - name: Data sync - run: bash integration/copy_data/dev.sh + run: bash integration/copy_data/run.sh - name: Python run: bash integration/python/run.sh - name: Load balancer diff --git a/.rwx/integration.yml b/.rwx/integration.yml index b0152d527..8051c7704 100644 --- a/.rwx/integration.yml +++ b/.rwx/integration.yml @@ -423,13 +423,14 @@ tasks: - key: integration-copy-data use: integration-build-pgdog-cov + docker: true background-processes: *postgres-bg–processes - timeout: 15m + timeout: 20m run: | export LLVM_PROFILE_FILE="$PWD/target/llvm-cov-target/profiles/copy-data-%p-%m.profraw" bash integration/setup.sh - timeout --signal=TERM --kill-after=30s 10m bash integration/copy_data/dev.sh + timeout --signal=TERM --kill-after=30s 18m bash integration/copy_data/run.sh cargo llvm-cov report --release --package pgdog --lcov --output-path copy-data.lcov outputs: diff --git a/integration/copy_data/init.sql b/integration/copy_data/data_sync/init.sql similarity index 96% rename from integration/copy_data/init.sql rename to integration/copy_data/data_sync/init.sql index 771564768..038f36787 100644 --- a/integration/copy_data/init.sql +++ b/integration/copy_data/data_sync/init.sql @@ -10,4 +10,3 @@ DROP SCHEMA IF EXISTS copy_data CASCADE; DROP SCHEMA IF EXISTS copy_data CASCADE; \c pgdog SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots; -\i setup.sql diff --git a/integration/copy_data/pgbench.sql b/integration/copy_data/data_sync/pgbench.sql similarity index 100% rename from integration/copy_data/pgbench.sql rename to integration/copy_data/data_sync/pgbench.sql diff --git a/integration/copy_data/pgdog.toml b/integration/copy_data/data_sync/pgdog.toml similarity index 100% rename from integration/copy_data/pgdog.toml rename to integration/copy_data/data_sync/pgdog.toml diff --git a/integration/copy_data/dev.sh b/integration/copy_data/data_sync/run.sh old mode 100644 new mode 100755 similarity index 57% rename from 
integration/copy_data/dev.sh rename to integration/copy_data/data_sync/run.sh index e39a6c245..94c684202 --- a/integration/copy_data/dev.sh +++ b/integration/copy_data/data_sync/run.sh @@ -1,8 +1,20 @@ #!/bin/bash -set -e +# Integration test: 0→2 and 2→2 resharding with live write traffic. +# +# Requires: +# - local postgres at port 5432 +# - databases: pgdog, pgdog1, pgdog2, shard_0, shard_1 (created by integration/setup.sh) +# - max_replication_slots >= 32 in postgresql.conf +# Each data-sync creates one permanent slot per source shard plus one temporary +# slot per parallel table copy. With resharding_parallel_copies=5 and a 2-shard +# source, peak usage is 3 permanent + 2×5 temporary = 13 slots. The default of 10 +# is not enough; set max_replication_slots = 32 in postgresql.conf and reload. +set -euo pipefail SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -DEFAULT_BIN="${SCRIPT_DIR}/../../target/debug/pgdog" +DEFAULT_BIN="${SCRIPT_DIR}/../../../target/debug/pgdog" PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN} +PGDOG_CONFIG="${SCRIPT_DIR}/pgdog.toml" +PGDOG_USERS="${SCRIPT_DIR}/users.toml" export PGUSER=pgdog export PGDATABASE=pgdog @@ -10,6 +22,9 @@ export PGHOST=127.0.0.1 export PGPORT=5432 export PGPASSWORD=pgdog +BENCH_PID="" +REPL_PID="" + cleanup() { if [ -n "${BENCH_PID}" ]; then kill ${BENCH_PID} 2>/dev/null || true @@ -24,7 +39,6 @@ trap cleanup EXIT start_pgbench() { ( - pgbench -h 127.0.0.1 -p 5432 -U pgdog pgdog \ -t 100000000 -c 3 --protocol extended \ -f "${SCRIPT_DIR}/pgbench.sql" -P 1 @@ -41,16 +55,24 @@ stop_pgbench() { fi } +SHARDED_TABLES="copy_data.users copy_data.orders copy_data.order_items copy_data.log_actions copy_data.with_identity" +OMNI_TABLES="copy_data.countries copy_data.currencies copy_data.categories" + pushd ${SCRIPT_DIR} -psql -f init.sql +# Teardown: drop stale slots and schemas. +psql -f "${SCRIPT_DIR}/init.sql" +# Setup: populate source database. 
+psql -f "${SCRIPT_DIR}/../setup.sql" # # 0 -> 2 # -${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database source --to-database destination --publication pgdog start_pgbench -${PGDOG_BIN} data-sync --from-database source --to-database destination --publication pgdog & +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + data-sync --from-database source --to-database destination --publication pgdog & REPL_PID=$! # Give replication a moment to connect. @@ -82,14 +104,18 @@ if [ ${REPL_EXIT} -ne 0 ] && [ ${REPL_EXIT} -ne 130 ] && [ ${REPL_EXIT} -ne 143 fi stop_pgbench -${PGDOG_BIN} schema-sync --from-database source --to-database destination --publication pgdog --cutover +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database source --to-database destination --publication pgdog --cutover # # 2 --> 2 # -${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog -${PGDOG_BIN} data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data_2 -${PGDOG_BIN} schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database destination --to-database destination2 --publication pgdog +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + data-sync --sync-only --from-database destination --to-database destination2 --publication pgdog --replication-slot copy_data_2 +${PGDOG_BIN} --config "${PGDOG_CONFIG}" --users "${PGDOG_USERS}" \ + schema-sync --from-database destination --to-database destination2 --publication pgdog --cutover # Check row counts: destination (pgdog1 + pgdog2) vs destination2 (shard_0 + shard_1) echo "Checking row counts: 
destination -> destination2..." @@ -118,6 +144,6 @@ for TABLE in ${OMNI_TABLES}; do echo "OK ${TABLE}: ${SRC} rows on each shard" done -psql -f init.sql +psql -f "${SCRIPT_DIR}/init.sql" popd diff --git a/integration/copy_data/users.toml b/integration/copy_data/data_sync/users.toml similarity index 100% rename from integration/copy_data/users.toml rename to integration/copy_data/data_sync/users.toml diff --git a/integration/copy_data/retry_test/init.sql b/integration/copy_data/retry_test/init.sql new file mode 100644 index 000000000..a5077e964 --- /dev/null +++ b/integration/copy_data/retry_test/init.sql @@ -0,0 +1,6 @@ +-- Source-only reset for the docker-compose retry test. +-- Each database lives in its own container, so \c cannot cross server boundaries. +-- Destination shards are reset by retry_test.sh via separate psql invocations. +DROP SCHEMA IF EXISTS copy_data CASCADE; +SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots; +\i setup.sql diff --git a/integration/copy_data/pgdog.retry_test.toml b/integration/copy_data/retry_test/pgdog.toml similarity index 92% rename from integration/copy_data/pgdog.retry_test.toml rename to integration/copy_data/retry_test/pgdog.toml index 84e8f4042..daa29a71f 100644 --- a/integration/copy_data/pgdog.retry_test.toml +++ b/integration/copy_data/retry_test/pgdog.toml @@ -1,6 +1,6 @@ [general] resharding_copy_format = "binary" -resharding_copy_retry_max_attempts = 5 +resharding_copy_retry_max_attempts = 8 resharding_copy_retry_min_delay = 500 [[databases]] diff --git a/integration/copy_data/retry_test.sh b/integration/copy_data/retry_test/run.sh old mode 100644 new mode 100755 similarity index 52% rename from integration/copy_data/retry_test.sh rename to integration/copy_data/retry_test/run.sh index 6b61afd41..bed0022a5 --- a/integration/copy_data/retry_test.sh +++ b/integration/copy_data/retry_test/run.sh @@ -1,18 +1,19 @@ #!/bin/bash -# Integration test: data-sync retries when a destination shard is 
temporarily unavailable. +# Integration test: data-sync retries when a destination shard connection drops mid-copy. # # What is tested: -# - data-sync --sync-only completes (exit 0) when shard_1 is stopped before the -# sync starts and brought back while retries are in flight. +# - data-sync --sync-only completes (exit 0) when shard_1 is killed mid-copy and +# brought back while the retry loop is running. # - Row counts on all destination tables match the source after sync completes. # -# Requires: the copy_data docker-compose stack to be running. -set -e +# Manages its own docker-compose stack — no pre-started containers required. +set -euo pipefail SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -DEFAULT_BIN="${SCRIPT_DIR}/../../target/debug/pgdog" +COMPOSE_DIR="${SCRIPT_DIR}/.." +DEFAULT_BIN="${SCRIPT_DIR}/../../../target/debug/pgdog" PGDOG_BIN=${PGDOG_BIN:-$DEFAULT_BIN} -PGDOG_CONFIG="${SCRIPT_DIR}/pgdog.retry_test.toml" +PGDOG_CONFIG="${SCRIPT_DIR}/pgdog.toml" USERS_CONFIG="${SCRIPT_DIR}/users.toml" export PGPASSWORD=pgdog @@ -23,27 +24,41 @@ cleanup() { kill "${SYNC_PID}" 2>/dev/null || true wait "${SYNC_PID}" 2>/dev/null || true fi - # Always bring shard_1 back on exit so the stack is not left broken. - cd "${SCRIPT_DIR}" && docker compose start shard_1 2>/dev/null || true + cd "${COMPOSE_DIR}" && docker compose down 2>/dev/null || true } trap cleanup EXIT -pushd "${SCRIPT_DIR}" +pushd "${COMPOSE_DIR}" + +# Start the docker-compose stack and wait for all three postgres instances. +echo "[retry_test] Starting docker-compose stack..." +docker compose up -d + +echo "[retry_test] Waiting for postgres instances to be ready..." 
+for PORT in 15432 15433 15434; do + READY_ATTEMPTS=0 + until pg_isready -h 127.0.0.1 -p "${PORT}" -q 2>/dev/null; do + READY_ATTEMPTS=$((READY_ATTEMPTS + 1)) + if [ "${READY_ATTEMPTS}" -ge 60 ]; then + echo "[retry_test] FAIL: postgres on port ${PORT} not ready after 30s" + exit 1 + fi + sleep 0.5 + done +done +echo "[retry_test] All postgres instances ready." -# Reset destination and reload source data. -psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -f init.sql +# Reset destination shards then source — each is a separate container. +psql -h 127.0.0.1 -p 15433 -U pgdog pgdog1 -c "DROP SCHEMA IF EXISTS copy_data CASCADE;" +psql -h 127.0.0.1 -p 15434 -U pgdog pgdog2 -c "DROP SCHEMA IF EXISTS copy_data CASCADE;" +psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -f "${SCRIPT_DIR}/init.sql" # Schema sync: create tables on destination shards. "${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" \ schema-sync --from-database source --to-database destination --publication pgdog -# Stop shard_1 before the copy starts. -# Every table copy connects to all destination shards, so all tables will fail on the -# first attempt and enter the retry loop. -echo "[retry_test] Stopping shard_1..." -docker compose stop shard_1 - -# Start data-sync in the background. +# Start data-sync with all shards up so the connection pool initialises cleanly +# and CopySubscriber can connect to both shards before we inject a failure. "${PGDOG_BIN}" --config "${PGDOG_CONFIG}" --users "${USERS_CONFIG}" \ data-sync --sync-only \ --from-database source \ @@ -51,10 +66,34 @@ docker compose stop shard_1 --publication pgdog & SYNC_PID=$! -# Let data-sync start and hit connection failures on shard_1. +# Wait until data-sync has created a temporary replication slot on the source. +# The slot is created after copy_sub.connect() — meaning CopySubscriber already +# holds open connections to both shards. 
Killing shard_1 now lands a mid-copy +# network error that run_with_retry() must handle, not a pre-connection timeout. +echo "[retry_test] Waiting for data-sync to start copying..." +WAIT_ATTEMPTS=0 +until psql -h 127.0.0.1 -p 15432 -U pgdog pgdog -tAc \ + "SELECT 1 FROM pg_replication_slots WHERE temporary = true LIMIT 1" 2>/dev/null | grep -q 1; do + WAIT_ATTEMPTS=$((WAIT_ATTEMPTS + 1)) + if [ "${WAIT_ATTEMPTS}" -ge 200 ]; then + echo "[retry_test] FAIL: data-sync never created a replication slot after $((WAIT_ATTEMPTS / 20))s" + exit 1 + fi + if ! kill -0 "${SYNC_PID}" 2>/dev/null; then + echo "[retry_test] FAIL: data-sync exited before copy started" + exit 1 + fi + sleep 0.05 +done + +# SIGKILL the container immediately — no grace period — so the kill lands before +# the in-flight COPY can finish. docker compose stop has a 10s grace period. +echo "[retry_test] Killing shard_1 during active copy..." +docker compose kill shard_1 + +# Let the retry loop run a few cycles before bringing shard_1 back. sleep 2 -# Bring shard_1 back while retries are in flight. echo "[retry_test] Starting shard_1..." docker compose start shard_1 @@ -121,4 +160,5 @@ if [ "${FAILED}" -ne 0 ]; then fi echo "[retry_test] PASS: all row counts match. Retry test succeeded." +docker compose down popd diff --git a/integration/copy_data/retry_test/users.toml b/integration/copy_data/retry_test/users.toml new file mode 100644 index 000000000..67142d309 --- /dev/null +++ b/integration/copy_data/retry_test/users.toml @@ -0,0 +1,11 @@ +[[users]] +database = "source" +name = "pgdog" +password = "pgdog" +schema_admin = true + +[[users]] +database = "destination" +name = "pgdog" +password = "pgdog" +schema_admin = true diff --git a/integration/copy_data/run.sh b/integration/copy_data/run.sh new file mode 100644 index 000000000..17d13a608 --- /dev/null +++ b/integration/copy_data/run.sh @@ -0,0 +1,19 @@ +#!/bin/bash +# Runs all copy_data integration tests in sequence. 
+# +# Tests: +# data_sync/run.sh — 0→2 and 2→2 resharding with live write traffic +# (uses local postgres from integration/setup.sh) +# retry_test/run.sh — data-sync retry loop under mid-copy shard failure +# (manages its own docker-compose stack) +set -e + +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +echo "=== [copy_data] data_sync ===" +bash "${SCRIPT_DIR}/data_sync/run.sh" + +echo "=== [copy_data] retry_test ===" +bash "${SCRIPT_DIR}/retry_test/run.sh" + +echo "=== [copy_data] all tests passed ===" diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs index d959450f3..3b2047f62 100644 --- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs +++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs @@ -118,13 +118,6 @@ impl ParallelSync { sleep(backoff).await; // FUTURE: truncate before retry to handle the COPY-committed-but-dropped // race (rows remain → PK violations). Safe once source-guard checks exist. 
- // - // if let Err(trunc_err) = self.table.truncate_destination(&self.dest).await { - // warn!( - // "truncate before retry failed for \"{}\".\"{}\" : {trunc_err}", - // self.table.table.schema, self.table.table.name, - // ); - // } } } } From 556027685c0a98717c1a20f81b98f128ed61d55b Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Tue, 21 Apr 2026 22:05:27 +0000 Subject: [PATCH 05/10] drop parallel_connections wrapper --- docs/issues/894-897-copy-data-retry.md | 217 +++++---------- .../src/backend/replication/logical/error.rs | 4 - .../logical/publisher/parallel_sync.rs | 9 +- .../replication/logical/subscriber/copy.rs | 69 +++-- .../replication/logical/subscriber/mod.rs | 2 - .../logical/subscriber/parallel_connection.rs | 256 ------------------ 6 files changed, 109 insertions(+), 448 deletions(-) delete mode 100644 pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs diff --git a/docs/issues/894-897-copy-data-retry.md b/docs/issues/894-897-copy-data-retry.md index ae1f98d8f..958e35909 100644 --- a/docs/issues/894-897-copy-data-retry.md +++ b/docs/issues/894-897-copy-data-retry.md @@ -1,189 +1,100 @@ # COPY_DATA Retry Reliability (Issues #894 + #897) -## Problem - -**#894:** During `COPY_DATA` resharding, if a destination shard temporarily goes down, PgDog -continues processing the next tables without retrying. This leaves the destination incompletely +**#894:** During `COPY_DATA` resharding, if a destination shard temporarily went down, PgDog +continued processing the next tables without retrying. The destination was left incompletely populated (276 started, 63 finished in the reported case). -**#897:** Two distinct failure scenarios need retry coverage: +**#897:** Two failure modes need coverage: 1. **Destination shard is down** — connection to dest fails or drops mid-COPY 2. 
**Origin shard is down** — source connection drops, temporary replication slot is lost -The fix: add per-table retry logic with exponential backoff inside `ParallelSync`. +--- -## Why a Single Top-Level Retry Handles Both #897 Scenarios +## Solution -`Table::data_sync()` is fully self-contained: +### Retry mechanism -``` -data_sync() { - CopySubscriber::new() → dest connection - ReplicationSlot::data_sync() → fresh slot name (random_string(24)), temp slot - slot.connect() + slot.create_slot() - ... copy rows ... - slot drops on disconnect (TEMPORARY) -} -``` +`Table::data_sync()` in `publisher/table.rs` opens fresh connections on every call, so a +single retry loop covers both failure modes: -On any failure and retry, `data_sync` is called from scratch: - **Dest down** → new `CopySubscriber` reconnects to destination. -- **Origin down** → new `ReplicationSlot` with a fresh random name reconnects to source and - re-creates the temporary slot. The old slot was `TEMPORARY` → auto-dropped by Postgres when - its connection closed. No slot cleanup needed. +- **Origin down** → new `ReplicationSlot` with a fresh random name. The old slot was + `TEMPORARY` and was auto-dropped by Postgres when its connection closed. -## Destination Commit Model +The retry loop lives in `ParallelSync::run_with_retry()` (`publisher/parallel_sync.rs`): +exponential backoff starting at `resharding_copy_retry_min_delay`, doubling each attempt, +capped at 32×, up to `resharding_copy_retry_max_attempts` tries. -**How the copy works per-table:** -- Source side: `slot.create_slot()` opens `BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ` - on the source. `COPY ... TO STDOUT` streams all rows from that consistent snapshot. - `COMMIT` is called after all rows are streamed (source-only commit, unrelated to dest). -- Destination side: `CopySubscriber::connect()` opens one **standalone** connection per - destination shard (no explicit `BEGIN`). `start_copy()` sends `COPY ... FROM STDIN` to - each shard. 
Each `COPY FROM STDIN` runs inside PostgreSQL's own **implicit transaction**. - `copy_done()` sends `CopyDone` to each shard **sequentially** — shard 0 commits, then - shard 1, etc. **There is no cross-shard atomicity.** +| Key | Default | Description | +|-----|---------|-------------| +| `resharding_copy_retry_max_attempts` | `5` | Per-table retry attempts | +| `resharding_copy_retry_min_delay` | `1000` ms | Base backoff; doubles each attempt, capped at 32× | -**Failure scenarios and destination state:** +### Truncation + +Each table COPY runs inside PostgreSQL's own implicit transaction — no explicit `BEGIN`. +There is no cross-shard atomicity. | Failure point | Destination state | |---|---| -| During row streaming (conn drops mid-COPY) | PG auto-rolls back implicit tx → dest empty | +| During row streaming | PG rolls back implicit tx → dest empty | | Inside `copy_done()` — some shards committed, others not | Partially committed | -| After `copy_done()`, before `data_sync` returns `Ok` | All shards committed rows | - -The common case (connection drop during streaming) leaves the destination clean — PostgreSQL -rolls back the implicit transaction automatically. The rare race is when `copy_done()` has -already committed on some or all shards and then the connection drops before `data_sync` -returns `Ok`. In that case, rows survive in the destination and a naive retry would immediately -hit primary key constraint violations. - -**Current behavior — manual TRUNCATE guidance:** - -PgDog does not automatically truncate the destination before retrying. Auto-TRUNCATE is the -correct long-term fix but requires reliable "is destination" guards that don't yet exist; -running TRUNCATE on the wrong cluster would be catastrophic. That logic is stubbed out as a -commented future extension in `run_with_retry()` in `parallel_sync.rs`. 
+| After `copy_done()`, before `data_sync` returns `Ok` | All shards have rows | -Instead, when a table copy fails fatally (non-retryable error or max attempts exhausted), -`Table::destination_has_rows` queries each shard's primary with `SELECT 1 … LIMIT 1`. If -any rows are found, PgDog logs a `warn!` that includes the exact TRUNCATE statement to run: +The common case (drop during streaming) leaves the destination clean. The rare race is a +commit that landed before the connection dropped — a retry then hits primary key violations. -``` -data sync for "public"."orders" failed with rows remaining in destination; -truncate manually before retrying: TRUNCATE "public"."orders_new"; -``` - -If the row-count check itself fails (destination unreachable), a separate warning is emitted. -The original error is always returned regardless. - -**Non-retryable errors** (`CopyAborted`, `DataSyncAborted`, `NoPrimaryKey`, `NoReplicaIdentity`, -`ParallelConnection`) bypass the retry loop immediately and still trigger the row check. +PgDog does not auto-TRUNCATE before retrying (wrong-cluster risk). On fatal failure, +`Table::destination_has_rows` checks each shard and logs a `warn!` with the exact `TRUNCATE` +statement to run manually. Auto-truncate is stubbed as a future extension in `run_with_retry()`. -## Code Path +### Error handling -``` -admin/copy_data.rs → Orchestrator::data_sync() -orchestrator.rs → Publisher::data_sync() -publisher/publisher_impl.rs → per-shard ParallelSyncManager::run() -publisher/parallel_sync.rs → ParallelSync::run() ← calls run_with_retry - ParallelSync::run_with_retry() ← retry loop (new) -publisher/table.rs → Table::data_sync() ← actual COPY (self-contained) -``` +`is_retryable()` in `error.rs` uses a whitelist: `Net`, `Pool`, `NotConnected`, `NoPrimary`, +and `ReplicationTimeout` return `true`; everything else defaults to `false`. 
Non-retryable +errors (`CopyAborted`, `DataSyncAborted`, `NoPrimaryKey`, `NoReplicaIdentity`) bypass the +retry loop immediately but still trigger the destination row check. -## Implementation +### `ParallelConnection` issue -### 1. Config — add retry knobs (`pgdog-config/src/general.rs`) +`CopySubscriber` originally held a `Vec`, each wrapping a `Server` in a +background Tokio task. The intent was parallel shard writes; the reality was a sequential +loop: -Add after `resharding_parallel_copies` in the `General` struct: ```rust -/// Maximum number of retries for a failed table copy during resharding (per-table). -/// _Default:_ `5` -#[serde(default = "General::resharding_copy_retry_max_attempts")] -pub resharding_copy_retry_max_attempts: usize, - -/// Delay in milliseconds between table copy retries. Doubles each attempt, capped at 32×. -/// _Default:_ `1000` -#[serde(default = "General::resharding_copy_retry_min_delay")] -pub resharding_copy_retry_min_delay: u64, -``` - -Private defaults and `impl Default` entries added following the existing `resharding_parallel_copies` pattern. - -### 2. Cluster — propagate and expose (`pgdog/src/backend/pool/cluster.rs`) - -Add both fields to `ClusterConfig<'a>` and the private `Cluster` struct; populate in -`ClusterConfig::new()` from `general.*`; wire through `Cluster::new()`; expose via: -```rust -pub fn resharding_copy_retry_max_attempts(&self) -> usize { ... } -pub fn resharding_copy_retry_min_delay(&self) -> &Duration { ... } -``` - -### 3. Table — helpers (`pgdog/src/backend/replication/logical/publisher/table.rs`) - -```rust -/// Generate a TRUNCATE SQL statement for the given schema and table name. -pub fn truncate_statement(schema: &str, name: &str) -> String { ... } - -/// Truncate this table on all destination primaries. -/// Not called automatically — preserved for future use once "is destination" guards exist. -pub async fn truncate_destination(&self, dest: &Cluster) -> Result<(), Error> { ... 
} - -/// Returns true if any shard's primary has rows in the destination table. -/// Used after fatal failure to detect the COPY-committed-before-error race. -pub async fn destination_has_rows(&self, dest: &Cluster) -> Result { ... } -``` - -### 4. Error — retryability predicate (`pgdog/src/backend/replication/logical/error.rs`) - -Whitelist approach — only connection-level wrappers and direct availability variants return `true`. -New variants default to non-retryable, which is the safe choice. - -```rust -pub fn is_retryable(&self) -> bool { - match self { - // Shard was unreachable; each retry opens a fresh connection. - // Some sub-variants (TLS, protocol errors) aren't truly transient but - // will just exhaust the budget and fail cleanly. - Self::Net(_) | Self::Pool(_) => true, - - // No connection yet, or primary is down — worth retrying. - Self::NotConnected | Self::NoPrimary => true, - - // Replication stalled; temporary slot is gone, next attempt starts fresh. - Self::ReplicationTimeout => true, - - // Abort signals, schema mismatches, protocol violations — retrying won't help. - _ => false, - } +for server in &mut self.connections { + server.send_one(&stmt.clone().into()).await?; // channel push — fast + server.flush().await?; // channel push — fast + let msg = server.read().await?; // BLOCKS until shard replies } ``` -### 5. ParallelSync — retry loop (`pgdog/src/backend/replication/logical/publisher/parallel_sync.rs`) - -Split `run()` into `run()` (public entry point, spawns task) and `run_with_retry()` (private). +`send_one` and `flush` pushed to the mpsc channel and returned immediately, but `read()` +blocked on the reply channel until the background task completed a full socket round-trip. +Shard 1 never started until shard 0 finished. No parallelism at all. -On each retryable failed attempt: -1. Compute exponential backoff: `min_delay * 2^attempt`, capped at 32×. -2. Log the error and how long we are waiting, e.g. 
`failed (attempt 1/5): …, retrying after 500ms…` -3. Sleep for the backoff duration. -4. Increment attempt counter and loop. +The `Listener` task's `select!` also polled `server.read()` continuously, including during +COPY IN when Postgres never sends unsolicited messages. When the socket died, the real error +surfaced inside the `Listener`, which exited — but the `JoinHandle` was fire-and-forget, so +the error was dropped. The main task found the channel closed and returned +`Error::ParallelConnection` with `is_retryable() = false`. The retry loop never fired. -On fatal failure (non-retryable error or attempts exhausted): -1. Record the error via `tracker.error()`. -2. Call `Table::destination_has_rows` — if rows are found, emit a `warn!` with the exact TRUNCATE SQL. -3. Return the original error to the caller. +#### Solution -## Configuration Reference +`ParallelConnection` was removed. `CopySubscriber` (`subscriber/copy.rs`) now holds +`Vec` directly. `start_copy`, the per-buffer `flush`, and `copy_done` all use +`futures::future::try_join_all` to drive every shard concurrently — this is the first version +that is actually parallel. Real errors propagate directly up the call stack and +`is_retryable()` sees the true failure. -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `resharding_copy_retry_max_attempts` | `usize` | `5` | Maximum per-table retry attempts | -| `resharding_copy_retry_min_delay` | `u64` (ms) | `1000` | Base backoff delay in milliseconds; doubles each attempt, capped at 32× | +| Phase | Before | After | +|---|---|---| +| `start_copy` | N × RTT | 1 × RTT | +| `flush` (per buffer) | N × RTT | 1 × RTT | +| `copy_done` | N × RTT | 1 × RTT | -## Integration Test +### Tests -`integration/copy_data/retry_test.sh` — stops shard_1 before the sync starts, brings it -back after ~2 s, asserts exit 0 and correct row counts on all tables. -Requires the `integration/copy_data/` docker-compose stack to be running. 
-Config: `integration/copy_data/pgdog.retry_test.toml` (faster retry settings for CI speed). +`integration/copy_data/retry_test/run.sh` kills shard_1 mid-sync, brings it back after ~2 s, +and asserts exit 0 with correct row counts. Uses faster retry settings in +`retry_test/pgdog.toml` for CI speed. diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 8d39ff70c..641d654a2 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -81,9 +81,6 @@ pub enum Error { #[error("shard {0} has no replication slot")] NoReplicationSlot(usize), - #[error("parallel connection error")] - ParallelConnection, - #[error("no replicas available for table sync")] NoReplicas, @@ -223,6 +220,5 @@ mod tests { assert!(!Error::DataSyncAborted.is_retryable()); assert!(!Error::NoPrimaryKey(PublicationTable::default()).is_retryable()); assert!(!Error::NoReplicaIdentity("s".into(), "t".into()).is_retryable()); - assert!(!Error::ParallelConnection.is_retryable()); } } diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs index 3b2047f62..f3d017a3a 100644 --- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs +++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs @@ -46,7 +46,7 @@ impl ParallelSync { let _permit = Arc::clone(&self.permit) .acquire_owned() .await - .map_err(|_| Error::ParallelConnection)?; + .map_err(|_| Error::DataSyncAborted)?; if self.tx.is_closed() { return Err(Error::DataSyncAborted); @@ -72,9 +72,10 @@ impl ParallelSync { .await { Ok(_) => { - self.tx - .send(Ok(self.table.clone())) - .map_err(|_| Error::ParallelConnection)?; + // Best-effort report: if the coordinator receiver is already gone, + // the whole sync is shutting down and our result is irrelevant. + // The copy succeeded; don't mask that with a channel error. 
+ let _ = self.tx.send(Ok(self.table.clone())); return Ok(()); } Err(err) if !err.is_retryable() || attempt >= max_retries => { diff --git a/pgdog/src/backend/replication/logical/subscriber/copy.rs b/pgdog/src/backend/replication/logical/subscriber/copy.rs index 262db9d58..869a63fd3 100644 --- a/pgdog/src/backend/replication/logical/subscriber/copy.rs +++ b/pgdog/src/backend/replication/logical/subscriber/copy.rs @@ -1,12 +1,13 @@ //! Shard COPY stream from one source //! between N shards. +use futures::future::try_join_all; use pg_query::{parse_raw, NodeEnum}; use pgdog_config::QueryParserEngine; use tracing::debug; use crate::{ - backend::{replication::subscriber::ParallelConnection, Cluster, ConnectReason}, + backend::{Cluster, ConnectReason, Server}, config::Role, frontend::router::parser::{CopyParser, Shard}, net::{CopyData, CopyDone, ErrorResponse, FromBytes, Protocol, Query, ToBytes}, @@ -24,7 +25,7 @@ pub struct CopySubscriber { copy: CopyParser, cluster: Cluster, buffer: Vec, - connections: Vec, + connections: Vec, stmt: CopyStatement, bytes_sharded: usize, } @@ -85,7 +86,7 @@ impl CopySubscriber { .1 .standalone(ConnectReason::Replication) .await?; - servers.push(ParallelConnection::new(primary)?); + servers.push(primary); } self.connections = servers; @@ -95,38 +96,38 @@ impl CopySubscriber { /// Disconnect from all shards. pub async fn disconnect(&mut self) -> Result<(), Error> { - for conn in std::mem::take(&mut self.connections) { - conn.reattach().await?; - } + self.connections.clear(); Ok(()) } /// Start COPY on all shards. pub async fn start_copy(&mut self) -> Result<(), Error> { - let stmt = Query::new(self.stmt.copy_in()); - if self.connections.is_empty() { self.connect().await?; } - for server in &mut self.connections { - debug!("{} [{}]", stmt.query(), server.addr()); + let stmt = Query::new(self.stmt.copy_in()); - server.send_one(&stmt.clone().into()).await?; - server.flush().await?; + // Start COPY IN on all shards concurrently. 
+ try_join_all(self.connections.iter_mut().map(|server| { + let msg: crate::net::ProtocolMessage = stmt.clone().into(); + debug!("{} [{}]", stmt.query(), server.addr()); - let msg = server.read().await?; - match msg.code() { - 'G' => (), - 'E' => { - return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - msg.to_bytes()?, - )?))) + async move { + server.send_one(&msg).await?; + server.flush().await?; + let reply = server.read().await?; + match reply.code() { + 'G' => Ok(()), + 'E' => Err(Error::PgError(Box::new(ErrorResponse::from_bytes( + reply.to_bytes()?, + )?))), + c => Err(Error::OutOfSync(c)), } - c => return Err(Error::OutOfSync(c)), } - } + })) + .await?; Ok(()) } @@ -135,20 +136,20 @@ impl CopySubscriber { pub async fn copy_done(&mut self) -> Result<(), Error> { self.flush().await?; - for server in &mut self.connections { + // Finalise COPY on all shards concurrently. + try_join_all(self.connections.iter_mut().map(|server| async move { server.send_one(&CopyDone.into()).await?; server.flush().await?; - let command_complete = server.read().await?; - match command_complete.code() { + let cc = server.read().await?; + match cc.code() { 'E' => { - let error = ErrorResponse::from_bytes(command_complete.to_bytes()?)?; + let error = ErrorResponse::from_bytes(cc.to_bytes()?)?; if error.code == "08P01" && error.message == "insufficient data left in message" { return Err(Error::BinaryFormatMismatch(Box::new(error))); - } else { - return Err(Error::PgError(Box::new(error))); } + return Err(Error::PgError(Box::new(error))); } 'C' => (), c => return Err(Error::OutOfSync(c)), @@ -158,7 +159,9 @@ impl CopySubscriber { if rfq.code() != 'Z' { return Err(Error::OutOfSync(rfq.code())); } - } + Ok(()) + })) + .await?; Ok(()) } @@ -174,12 +177,19 @@ impl CopySubscriber { } async fn flush(&mut self) -> Result<(usize, usize), Error> { + if self.buffer.is_empty() { + return Ok((0, 0)); + } + let result = self.copy.shard(&self.buffer)?; self.buffer.clear(); let rows = 
result.len(); let bytes = result.iter().map(|row| row.len()).sum::(); + self.bytes_sharded += bytes; + // Route each row to the right shard(s). send_one is a buffered write + // so this loop does no I/O — no concurrency needed here. for row in &result { for (shard, server) in self.connections.iter_mut().enumerate() { match row.shard() { @@ -198,7 +208,8 @@ impl CopySubscriber { } } - self.bytes_sharded += result.iter().map(|c| c.len()).sum::(); + // Flush all shards concurrently — this is the actual socket write. + try_join_all(self.connections.iter_mut().map(|s| s.flush())).await?; Ok((rows, bytes)) } diff --git a/pgdog/src/backend/replication/logical/subscriber/mod.rs b/pgdog/src/backend/replication/logical/subscriber/mod.rs index 9df30191b..7b5e7e0b2 100644 --- a/pgdog/src/backend/replication/logical/subscriber/mod.rs +++ b/pgdog/src/backend/replication/logical/subscriber/mod.rs @@ -1,6 +1,5 @@ pub mod context; pub mod copy; -pub mod parallel_connection; pub mod stream; #[cfg(test)] @@ -8,5 +7,4 @@ mod tests; pub use context::StreamContext; pub use copy::CopySubscriber; -pub use parallel_connection::ParallelConnection; pub use stream::StreamSubscriber; diff --git a/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs b/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs deleted file mode 100644 index 4c4b15713..000000000 --- a/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs +++ /dev/null @@ -1,256 +0,0 @@ -//! Postgres server connection running in its own task. -//! -//! This allows to queue up messages across multiple instances -//! of this connection without blocking and while maintaining protocol integrity. -//! 
-use tokio::select; -use tokio::spawn; -use tokio::sync::{ - mpsc::{channel, Receiver, Sender}, - Notify, -}; - -use crate::backend::pool::Address; -use crate::{ - backend::Server, - frontend::ClientRequest, - net::{Message, ProtocolMessage}, -}; - -use std::sync::Arc; - -use super::super::Error; - -// What we can send. -enum ParallelMessage { - // Protocol message, e.g. Bind, Execute, Sync. - ProtocolMessage(ProtocolMessage), - // Flush the socket. - Flush, -} - -// What we can receive. -enum ParallelReply { - // Message, e.g. RowDescription, DataRow, CommandComplete, etc. - Message(Message), - // The task gives back the server connection to the owner. - // Preserve connections between parallel executions. - Server(Box), -} - -// Parallel Postgres server connection. -#[derive(Debug)] -pub struct ParallelConnection { - tx: Sender, - rx: Receiver, - stop: Arc, - address: Address, -} - -impl ParallelConnection { - // Queue up message to server. - pub async fn send_one(&mut self, message: &ProtocolMessage) -> Result<(), Error> { - self.tx - .send(ParallelMessage::ProtocolMessage(message.clone())) - .await - .map_err(|_| Error::ParallelConnection)?; - - Ok(()) - } - - // Queue up the contents of the buffer. - pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { - for message in client_request.messages.iter() { - self.tx - .send(ParallelMessage::ProtocolMessage(message.clone())) - .await - .map_err(|_| Error::ParallelConnection)?; - self.tx - .send(ParallelMessage::Flush) - .await - .map_err(|_| Error::ParallelConnection)?; - } - - Ok(()) - } - - // Wait for a message from the server. - pub async fn read(&mut self) -> Result { - let reply = self.rx.recv().await.ok_or(Error::ParallelConnection)?; - match reply { - ParallelReply::Message(message) => Ok(message), - ParallelReply::Server(_) => Err(Error::ParallelConnection), - } - } - - // Request server connection performs socket flush. 
- pub async fn flush(&mut self) -> Result<(), Error> { - self.tx - .send(ParallelMessage::Flush) - .await - .map_err(|_| Error::ParallelConnection)?; - - Ok(()) - } - - /// Server address. - pub fn addr(&self) -> &Address { - &self.address - } - - // Move server connection into its own Tokio task. - pub fn new(server: Server) -> Result { - // Ideally we don't hardcode these. PgDog - // can use a lot of memory if this is high. - let (tx1, rx1) = channel(4096); - let (tx2, rx2) = channel(4096); - let stop = Arc::new(Notify::new()); - let address = server.addr().clone(); - - let listener = Listener { - stop: stop.clone(), - rx: rx1, - tx: tx2, - server: Some(Box::new(server)), - }; - - spawn(async move { - listener.run().await?; - - Ok::<(), Error>(()) - }); - - Ok(Self { - address, - tx: tx1, - rx: rx2, - stop, - }) - } - - // Get the connection back from the async task. This will - // only work if the connection is idle (ReadyForQuery received, no more traffic expected). - pub async fn reattach(mut self) -> Result { - self.stop.notify_one(); - let server = self.rx.recv().await.ok_or(Error::ParallelConnection)?; - match server { - ParallelReply::Server(server) => Ok(*server), - _ => Err(Error::ParallelConnection), - } - } -} - -// Stop the background task and kill the connection. -// Prevents leaks in case the connection is not "reattached". -impl Drop for ParallelConnection { - fn drop(&mut self) { - self.stop.notify_one(); - } -} - -// Background task performing the actual work of talking to Postgres. -struct Listener { - rx: Receiver, - tx: Sender, - server: Option>, - stop: Arc, -} - -impl Listener { - // Send message to Postgres. - async fn send(&mut self, message: ProtocolMessage) -> Result<(), Error> { - if let Some(ref mut server) = self.server { - server.send_one(&message).await?; - } - - Ok(()) - } - - // Flush socket. 
- async fn flush(&mut self) -> Result<(), Error> { - if let Some(ref mut server) = self.server { - server.flush().await?; - } - - Ok(()) - } - - // Return server to parent task. - async fn return_server(&mut self) -> Result<(), Error> { - if let Some(server) = self.server.take() { - if self.tx.is_closed() { - drop(server); - } else { - let _ = self.tx.send(ParallelReply::Server(server)).await; - } - } - - Ok(()) - } - - // Run the background task. - async fn run(mut self) -> Result<(), Error> { - loop { - select! { - message = self.rx.recv() => { - if let Some(message) = message { - match message { - ParallelMessage::ProtocolMessage(message) => self.send(message).await?, - ParallelMessage::Flush => self.flush().await?, - } - } else { - self.return_server().await?; - break; - } - } - - reply = self.server.as_mut().unwrap().read() => { - let reply = reply?; - self.tx.send(ParallelReply::Message(reply)).await.map_err(|_| Error::ParallelConnection)?; - } - - _ = self.stop.notified() => { - self.return_server().await?; - break; - } - } - } - - Ok(()) - } -} - -#[cfg(test)] -mod test { - use crate::{ - backend::server::test::test_server, - net::{Parse, Protocol, Sync}, - }; - - use super::*; - - #[tokio::test] - async fn test_parallel_connection() { - let server = test_server().await; - let mut parallel = ParallelConnection::new(server).unwrap(); - - parallel - .send( - &vec![ - Parse::named("test", "SELECT $1::bigint").into(), - Sync.into(), - ] - .into(), - ) - .await - .unwrap(); - - for c in ['1', 'Z'] { - let msg = parallel.read().await.unwrap(); - assert_eq!(msg.code(), c); - } - - let server = parallel.reattach().await.unwrap(); - assert!(server.in_sync()); - } -} From c0b8767d92a546e9b1f50b4d5e9f29c4ef4454f3 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 22 Apr 2026 12:11:23 +0000 Subject: [PATCH 06/10] ci: update jsonschema --- .schema/pgdog.schema.json | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff 
--git a/.schema/pgdog.schema.json b/.schema/pgdog.schema.json index aa70e980a..c1d94a18d 100644 --- a/.schema/pgdog.schema.json +++ b/.schema/pgdog.schema.json @@ -92,6 +92,8 @@ "regex_parser_limit": 1000, "reload_schema_on_ddl": true, "resharding_copy_format": "binary", + "resharding_copy_retry_max_attempts": 5, + "resharding_copy_retry_min_delay": 1000, "resharding_parallel_copies": 1, "rollback_timeout": 5000, "server_lifetime": 86400000, @@ -964,6 +966,20 @@ "$ref": "#/$defs/CopyFormat", "default": "binary" }, + "resharding_copy_retry_max_attempts": { + "description": "Maximum number of retries for a failed table copy during resharding (per-table).\nRetries use exponential backoff starting at `resharding_copy_retry_min_delay`.\n_Default:_ `5`", + "type": "integer", + "format": "uint", + "default": 5, + "minimum": 0 + }, + "resharding_copy_retry_min_delay": { + "description": "Base delay in milliseconds between table copy retries.\nEach successive attempt doubles the delay, capped at 32×.\n_Default:_ `1000`", + "type": "integer", + "format": "uint64", + "default": 1000, + "minimum": 0 + }, "resharding_parallel_copies": { "description": "How many parallel copies to launch, irrespective of the number of available replicas.", "type": "integer", From d4d3d6cd8da7954c77297c0d1883ef86a539d297 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 22 Apr 2026 14:13:36 +0000 Subject: [PATCH 07/10] ci: fix tests --- integration/copy_data/docker-compose.yml | 6 +++--- pgdog/src/backend/pool/shard/monitor.rs | 3 +++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/integration/copy_data/docker-compose.yml b/integration/copy_data/docker-compose.yml index d7cdaf10a..6975bc019 100644 --- a/integration/copy_data/docker-compose.yml +++ b/integration/copy_data/docker-compose.yml @@ -1,6 +1,6 @@ services: source: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog @@ 
-14,7 +14,7 @@ services: - postgres shard_0: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog @@ -26,7 +26,7 @@ services: - postgres shard_1: - image: postgres:18 + image: postgres:16 command: postgres -c wal_level=logical environment: POSTGRES_USER: pgdog diff --git a/pgdog/src/backend/pool/shard/monitor.rs b/pgdog/src/backend/pool/shard/monitor.rs index cc9dddce8..a21f19b3e 100644 --- a/pgdog/src/backend/pool/shard/monitor.rs +++ b/pgdog/src/backend/pool/shard/monitor.rs @@ -42,6 +42,9 @@ impl ShardMonitor { impl ShardMonitor { async fn spawn(&self) { + if self.shard.comms().lsn_check_interval == Duration::MAX { + return; + } let maintenance = (self.shard.comms().lsn_check_interval / 2) .clamp(Duration::from_millis(333), Duration::MAX); let mut maintenance = interval(maintenance); From 18b218ce27ecbc7a7f97bc9019f169b73f061a7a Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 22 Apr 2026 18:18:44 +0000 Subject: [PATCH 08/10] Revert "drop parallel_connections wrapper" This reverts commit 556027685c0a98717c1a20f81b98f128ed61d55b. --- docs/issues/894-897-copy-data-retry.md | 38 --- .../src/backend/replication/logical/error.rs | 4 + .../logical/publisher/parallel_sync.rs | 9 +- .../replication/logical/subscriber/copy.rs | 69 ++--- .../replication/logical/subscriber/mod.rs | 2 + .../logical/subscriber/parallel_connection.rs | 256 ++++++++++++++++++ 6 files changed, 295 insertions(+), 83 deletions(-) create mode 100644 pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs diff --git a/docs/issues/894-897-copy-data-retry.md b/docs/issues/894-897-copy-data-retry.md index 958e35909..e603b9708 100644 --- a/docs/issues/894-897-copy-data-retry.md +++ b/docs/issues/894-897-copy-data-retry.md @@ -55,44 +55,6 @@ and `ReplicationTimeout` return `true`; everything else defaults to `false`. 
Non errors (`CopyAborted`, `DataSyncAborted`, `NoPrimaryKey`, `NoReplicaIdentity`) bypass the retry loop immediately but still trigger the destination row check. -### `ParallelConnection` issue - -`CopySubscriber` originally held a `Vec`, each wrapping a `Server` in a -background Tokio task. The intent was parallel shard writes; the reality was a sequential -loop: - -```rust -for server in &mut self.connections { - server.send_one(&stmt.clone().into()).await?; // channel push — fast - server.flush().await?; // channel push — fast - let msg = server.read().await?; // BLOCKS until shard replies -} -``` - -`send_one` and `flush` pushed to the mpsc channel and returned immediately, but `read()` -blocked on the reply channel until the background task completed a full socket round-trip. -Shard 1 never started until shard 0 finished. No parallelism at all. - -The `Listener` task's `select!` also polled `server.read()` continuously, including during -COPY IN when Postgres never sends unsolicited messages. When the socket died, the real error -surfaced inside the `Listener`, which exited — but the `JoinHandle` was fire-and-forget, so -the error was dropped. The main task found the channel closed and returned -`Error::ParallelConnection` with `is_retryable() = false`. The retry loop never fired. - -#### Solution - -`ParallelConnection` was removed. `CopySubscriber` (`subscriber/copy.rs`) now holds -`Vec` directly. `start_copy`, the per-buffer `flush`, and `copy_done` all use -`futures::future::try_join_all` to drive every shard concurrently — this is the first version -that is actually parallel. Real errors propagate directly up the call stack and -`is_retryable()` sees the true failure. 
- -| Phase | Before | After | -|---|---|---| -| `start_copy` | N × RTT | 1 × RTT | -| `flush` (per buffer) | N × RTT | 1 × RTT | -| `copy_done` | N × RTT | 1 × RTT | - ### Tests `integration/copy_data/retry_test/run.sh` kills shard_1 mid-sync, brings it back after ~2 s, diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 641d654a2..8d39ff70c 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -81,6 +81,9 @@ pub enum Error { #[error("shard {0} has no replication slot")] NoReplicationSlot(usize), + #[error("parallel connection error")] + ParallelConnection, + #[error("no replicas available for table sync")] NoReplicas, @@ -220,5 +223,6 @@ mod tests { assert!(!Error::DataSyncAborted.is_retryable()); assert!(!Error::NoPrimaryKey(PublicationTable::default()).is_retryable()); assert!(!Error::NoReplicaIdentity("s".into(), "t".into()).is_retryable()); + assert!(!Error::ParallelConnection.is_retryable()); } } diff --git a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs index f3d017a3a..3b2047f62 100644 --- a/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs +++ b/pgdog/src/backend/replication/logical/publisher/parallel_sync.rs @@ -46,7 +46,7 @@ impl ParallelSync { let _permit = Arc::clone(&self.permit) .acquire_owned() .await - .map_err(|_| Error::DataSyncAborted)?; + .map_err(|_| Error::ParallelConnection)?; if self.tx.is_closed() { return Err(Error::DataSyncAborted); @@ -72,10 +72,9 @@ impl ParallelSync { .await { Ok(_) => { - // Best-effort report: if the coordinator receiver is already gone, - // the whole sync is shutting down and our result is irrelevant. - // The copy succeeded; don't mask that with a channel error. 
- let _ = self.tx.send(Ok(self.table.clone())); + self.tx + .send(Ok(self.table.clone())) + .map_err(|_| Error::ParallelConnection)?; return Ok(()); } Err(err) if !err.is_retryable() || attempt >= max_retries => { diff --git a/pgdog/src/backend/replication/logical/subscriber/copy.rs b/pgdog/src/backend/replication/logical/subscriber/copy.rs index 869a63fd3..262db9d58 100644 --- a/pgdog/src/backend/replication/logical/subscriber/copy.rs +++ b/pgdog/src/backend/replication/logical/subscriber/copy.rs @@ -1,13 +1,12 @@ //! Shard COPY stream from one source //! between N shards. -use futures::future::try_join_all; use pg_query::{parse_raw, NodeEnum}; use pgdog_config::QueryParserEngine; use tracing::debug; use crate::{ - backend::{Cluster, ConnectReason, Server}, + backend::{replication::subscriber::ParallelConnection, Cluster, ConnectReason}, config::Role, frontend::router::parser::{CopyParser, Shard}, net::{CopyData, CopyDone, ErrorResponse, FromBytes, Protocol, Query, ToBytes}, @@ -25,7 +24,7 @@ pub struct CopySubscriber { copy: CopyParser, cluster: Cluster, buffer: Vec, - connections: Vec, + connections: Vec, stmt: CopyStatement, bytes_sharded: usize, } @@ -86,7 +85,7 @@ impl CopySubscriber { .1 .standalone(ConnectReason::Replication) .await?; - servers.push(primary); + servers.push(ParallelConnection::new(primary)?); } self.connections = servers; @@ -96,38 +95,38 @@ impl CopySubscriber { /// Disconnect from all shards. pub async fn disconnect(&mut self) -> Result<(), Error> { - self.connections.clear(); + for conn in std::mem::take(&mut self.connections) { + conn.reattach().await?; + } Ok(()) } /// Start COPY on all shards. pub async fn start_copy(&mut self) -> Result<(), Error> { + let stmt = Query::new(self.stmt.copy_in()); + if self.connections.is_empty() { self.connect().await?; } - let stmt = Query::new(self.stmt.copy_in()); - - // Start COPY IN on all shards concurrently. 
- try_join_all(self.connections.iter_mut().map(|server| { - let msg: crate::net::ProtocolMessage = stmt.clone().into(); + for server in &mut self.connections { debug!("{} [{}]", stmt.query(), server.addr()); - async move { - server.send_one(&msg).await?; - server.flush().await?; - let reply = server.read().await?; - match reply.code() { - 'G' => Ok(()), - 'E' => Err(Error::PgError(Box::new(ErrorResponse::from_bytes( - reply.to_bytes()?, - )?))), - c => Err(Error::OutOfSync(c)), + server.send_one(&stmt.clone().into()).await?; + server.flush().await?; + + let msg = server.read().await?; + match msg.code() { + 'G' => (), + 'E' => { + return Err(Error::PgError(Box::new(ErrorResponse::from_bytes( + msg.to_bytes()?, + )?))) } + c => return Err(Error::OutOfSync(c)), } - })) - .await?; + } Ok(()) } @@ -136,20 +135,20 @@ impl CopySubscriber { pub async fn copy_done(&mut self) -> Result<(), Error> { self.flush().await?; - // Finalise COPY on all shards concurrently. - try_join_all(self.connections.iter_mut().map(|server| async move { + for server in &mut self.connections { server.send_one(&CopyDone.into()).await?; server.flush().await?; - let cc = server.read().await?; - match cc.code() { + let command_complete = server.read().await?; + match command_complete.code() { 'E' => { - let error = ErrorResponse::from_bytes(cc.to_bytes()?)?; + let error = ErrorResponse::from_bytes(command_complete.to_bytes()?)?; if error.code == "08P01" && error.message == "insufficient data left in message" { return Err(Error::BinaryFormatMismatch(Box::new(error))); + } else { + return Err(Error::PgError(Box::new(error))); } - return Err(Error::PgError(Box::new(error))); } 'C' => (), c => return Err(Error::OutOfSync(c)), @@ -159,9 +158,7 @@ impl CopySubscriber { if rfq.code() != 'Z' { return Err(Error::OutOfSync(rfq.code())); } - Ok(()) - })) - .await?; + } Ok(()) } @@ -177,19 +174,12 @@ impl CopySubscriber { } async fn flush(&mut self) -> Result<(usize, usize), Error> { - if self.buffer.is_empty() 
{ - return Ok((0, 0)); - } - let result = self.copy.shard(&self.buffer)?; self.buffer.clear(); let rows = result.len(); let bytes = result.iter().map(|row| row.len()).sum::(); - self.bytes_sharded += bytes; - // Route each row to the right shard(s). send_one is a buffered write - // so this loop does no I/O — no concurrency needed here. for row in &result { for (shard, server) in self.connections.iter_mut().enumerate() { match row.shard() { @@ -208,8 +198,7 @@ impl CopySubscriber { } } - // Flush all shards concurrently — this is the actual socket write. - try_join_all(self.connections.iter_mut().map(|s| s.flush())).await?; + self.bytes_sharded += result.iter().map(|c| c.len()).sum::(); Ok((rows, bytes)) } diff --git a/pgdog/src/backend/replication/logical/subscriber/mod.rs b/pgdog/src/backend/replication/logical/subscriber/mod.rs index 7b5e7e0b2..9df30191b 100644 --- a/pgdog/src/backend/replication/logical/subscriber/mod.rs +++ b/pgdog/src/backend/replication/logical/subscriber/mod.rs @@ -1,5 +1,6 @@ pub mod context; pub mod copy; +pub mod parallel_connection; pub mod stream; #[cfg(test)] @@ -7,4 +8,5 @@ mod tests; pub use context::StreamContext; pub use copy::CopySubscriber; +pub use parallel_connection::ParallelConnection; pub use stream::StreamSubscriber; diff --git a/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs b/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs new file mode 100644 index 000000000..4c4b15713 --- /dev/null +++ b/pgdog/src/backend/replication/logical/subscriber/parallel_connection.rs @@ -0,0 +1,256 @@ +//! Postgres server connection running in its own task. +//! +//! This allows to queue up messages across multiple instances +//! of this connection without blocking and while maintaining protocol integrity. +//! 
+use tokio::select; +use tokio::spawn; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Notify, +}; + +use crate::backend::pool::Address; +use crate::{ + backend::Server, + frontend::ClientRequest, + net::{Message, ProtocolMessage}, +}; + +use std::sync::Arc; + +use super::super::Error; + +// What we can send. +enum ParallelMessage { + // Protocol message, e.g. Bind, Execute, Sync. + ProtocolMessage(ProtocolMessage), + // Flush the socket. + Flush, +} + +// What we can receive. +enum ParallelReply { + // Message, e.g. RowDescription, DataRow, CommandComplete, etc. + Message(Message), + // The task gives back the server connection to the owner. + // Preserve connections between parallel executions. + Server(Box), +} + +// Parallel Postgres server connection. +#[derive(Debug)] +pub struct ParallelConnection { + tx: Sender, + rx: Receiver, + stop: Arc, + address: Address, +} + +impl ParallelConnection { + // Queue up message to server. + pub async fn send_one(&mut self, message: &ProtocolMessage) -> Result<(), Error> { + self.tx + .send(ParallelMessage::ProtocolMessage(message.clone())) + .await + .map_err(|_| Error::ParallelConnection)?; + + Ok(()) + } + + // Queue up the contents of the buffer. + pub async fn send(&mut self, client_request: &ClientRequest) -> Result<(), Error> { + for message in client_request.messages.iter() { + self.tx + .send(ParallelMessage::ProtocolMessage(message.clone())) + .await + .map_err(|_| Error::ParallelConnection)?; + self.tx + .send(ParallelMessage::Flush) + .await + .map_err(|_| Error::ParallelConnection)?; + } + + Ok(()) + } + + // Wait for a message from the server. + pub async fn read(&mut self) -> Result { + let reply = self.rx.recv().await.ok_or(Error::ParallelConnection)?; + match reply { + ParallelReply::Message(message) => Ok(message), + ParallelReply::Server(_) => Err(Error::ParallelConnection), + } + } + + // Request server connection performs socket flush. 
+ pub async fn flush(&mut self) -> Result<(), Error> { + self.tx + .send(ParallelMessage::Flush) + .await + .map_err(|_| Error::ParallelConnection)?; + + Ok(()) + } + + /// Server address. + pub fn addr(&self) -> &Address { + &self.address + } + + // Move server connection into its own Tokio task. + pub fn new(server: Server) -> Result { + // Ideally we don't hardcode these. PgDog + // can use a lot of memory if this is high. + let (tx1, rx1) = channel(4096); + let (tx2, rx2) = channel(4096); + let stop = Arc::new(Notify::new()); + let address = server.addr().clone(); + + let listener = Listener { + stop: stop.clone(), + rx: rx1, + tx: tx2, + server: Some(Box::new(server)), + }; + + spawn(async move { + listener.run().await?; + + Ok::<(), Error>(()) + }); + + Ok(Self { + address, + tx: tx1, + rx: rx2, + stop, + }) + } + + // Get the connection back from the async task. This will + // only work if the connection is idle (ReadyForQuery received, no more traffic expected). + pub async fn reattach(mut self) -> Result { + self.stop.notify_one(); + let server = self.rx.recv().await.ok_or(Error::ParallelConnection)?; + match server { + ParallelReply::Server(server) => Ok(*server), + _ => Err(Error::ParallelConnection), + } + } +} + +// Stop the background task and kill the connection. +// Prevents leaks in case the connection is not "reattached". +impl Drop for ParallelConnection { + fn drop(&mut self) { + self.stop.notify_one(); + } +} + +// Background task performing the actual work of talking to Postgres. +struct Listener { + rx: Receiver, + tx: Sender, + server: Option>, + stop: Arc, +} + +impl Listener { + // Send message to Postgres. + async fn send(&mut self, message: ProtocolMessage) -> Result<(), Error> { + if let Some(ref mut server) = self.server { + server.send_one(&message).await?; + } + + Ok(()) + } + + // Flush socket. 
+ async fn flush(&mut self) -> Result<(), Error> { + if let Some(ref mut server) = self.server { + server.flush().await?; + } + + Ok(()) + } + + // Return server to parent task. + async fn return_server(&mut self) -> Result<(), Error> { + if let Some(server) = self.server.take() { + if self.tx.is_closed() { + drop(server); + } else { + let _ = self.tx.send(ParallelReply::Server(server)).await; + } + } + + Ok(()) + } + + // Run the background task. + async fn run(mut self) -> Result<(), Error> { + loop { + select! { + message = self.rx.recv() => { + if let Some(message) = message { + match message { + ParallelMessage::ProtocolMessage(message) => self.send(message).await?, + ParallelMessage::Flush => self.flush().await?, + } + } else { + self.return_server().await?; + break; + } + } + + reply = self.server.as_mut().unwrap().read() => { + let reply = reply?; + self.tx.send(ParallelReply::Message(reply)).await.map_err(|_| Error::ParallelConnection)?; + } + + _ = self.stop.notified() => { + self.return_server().await?; + break; + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use crate::{ + backend::server::test::test_server, + net::{Parse, Protocol, Sync}, + }; + + use super::*; + + #[tokio::test] + async fn test_parallel_connection() { + let server = test_server().await; + let mut parallel = ParallelConnection::new(server).unwrap(); + + parallel + .send( + &vec![ + Parse::named("test", "SELECT $1::bigint").into(), + Sync.into(), + ] + .into(), + ) + .await + .unwrap(); + + for c in ['1', 'Z'] { + let msg = parallel.read().await.unwrap(); + assert_eq!(msg.code(), c); + } + + let server = parallel.reattach().await.unwrap(); + assert!(server.in_sync()); + } +} From ef30ad5ea38989f2ff0a3ba5167b4c266c615103 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 22 Apr 2026 18:25:34 +0000 Subject: [PATCH 09/10] fix ParallelConnection error --- pgdog/src/backend/replication/logical/error.rs | 5 +++++ 1 file changed, 5 
insertions(+) diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index 8d39ff70c..b37f9ff7a 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -159,6 +159,11 @@ impl Error { Self::NotConnected | Self::NoPrimary => true, // Replication stalled; temporary slot is gone, next attempt starts fresh. Self::ReplicationTimeout => true, + // TODO: escape-hatch when using ParallelConnection wrapper + // the underlying error could be anything and to handler it properly + // either the ParallelConnection wrapper should be removed or + // the proper error should be propagated + Self::ParallelConnection => true, _ => false, } } From f1f82e67c193edd083d25063b8a38bf248e41741 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 22 Apr 2026 18:36:35 +0000 Subject: [PATCH 10/10] fix tests --- pgdog/src/backend/pool/shard/monitor.rs | 3 --- pgdog/src/backend/replication/logical/error.rs | 1 - 2 files changed, 4 deletions(-) diff --git a/pgdog/src/backend/pool/shard/monitor.rs b/pgdog/src/backend/pool/shard/monitor.rs index a21f19b3e..cc9dddce8 100644 --- a/pgdog/src/backend/pool/shard/monitor.rs +++ b/pgdog/src/backend/pool/shard/monitor.rs @@ -42,9 +42,6 @@ impl ShardMonitor { impl ShardMonitor { async fn spawn(&self) { - if self.shard.comms().lsn_check_interval == Duration::MAX { - return; - } let maintenance = (self.shard.comms().lsn_check_interval / 2) .clamp(Duration::from_millis(333), Duration::MAX); let mut maintenance = interval(maintenance); diff --git a/pgdog/src/backend/replication/logical/error.rs b/pgdog/src/backend/replication/logical/error.rs index b37f9ff7a..362c73d7f 100644 --- a/pgdog/src/backend/replication/logical/error.rs +++ b/pgdog/src/backend/replication/logical/error.rs @@ -228,6 +228,5 @@ mod tests { assert!(!Error::DataSyncAborted.is_retryable()); 
assert!(!Error::NoPrimaryKey(PublicationTable::default()).is_retryable()); assert!(!Error::NoReplicaIdentity("s".into(), "t".into()).is_retryable()); - assert!(!Error::ParallelConnection.is_retryable()); } }