diff --git a/docs/FIX_ISSUE_914.md b/docs/FIX_ISSUE_914.md new file mode 100644 index 000000000..f603bb43c --- /dev/null +++ b/docs/FIX_ISSUE_914.md @@ -0,0 +1,337 @@ +# Issue #914: `pool is shut down` during resharding COPY + +## The issue + +A large-scale reshard (276 tables, ~180 GB) completed its parallel COPY phase +successfully then emitted: + +``` +2026-04-18T02:31:14.592Z ERROR [task: 1] pool: pool is shut down +``` + +The replication stream never started. The named replication slot +`__pgdog_repl_200ydsmxvvvbtco6kfu_0` remained inactive with a **2855 GB WAL +lag** on the source. + +A second problem was also present: two sharded tables had no primary key. +`REPLICA IDENTITY FULL` had been applied as a workaround on the assumption that +pgdog would accept it. No validation error was raised before the copy started; +the failure would only have appeared after the full multi-hour copy completed. + +--- + +## Footprints + +### Log sequence + +``` +02:31:14.553Z INFO closing server connection [postgres@:5440/db, reason: other] +02:31:14.553Z INFO closing server connection [postgres@:5441/db, reason: other] +02:31:14.553Z INFO closing server connection [postgres@:5442/db, reason: other] +02:31:14.558Z INFO closing server connection [postgres@:5434/db, reason: closed by server] +02:31:14.564Z INFO table sync for 276 tables complete [db_name, shard: 0] +02:31:14.592Z INFO closing server connection [postgres@:5434/db, state: idle, reason: other] +02:31:14.592Z ERROR [task: 1] pool: pool is shut down +``` + +The lines at `.553Z` are the `CopySubscriber` shard connections closing normally +after the last table sync — expected. + +The line at `.558Z` is the per-table replication slot connection closing after +draining the temporary WAL replay — expected. + +The line at `.564Z` confirms all 276 tables finished successfully. + +The line at `.592Z` with `reason: other` on a **source** connection is the +diagnostic. 
`DisconnectReason::Other` is the `#[default]` value, set when +`inner.rs::maybe_check_in()` finds `!self.online` and returns early without +recording a proper reason: + +```rust +if !self.online && !moving || self.paused { + result.replenish = false; + return Ok(result); // no disconnect_reason set → default = Other +} +``` + +This means a `Guard` (a checked-out connection) was being returned to a pool +that was **already offline**. The pool went offline while the Guard was in use, +not before `pool.get()` was called. + +The error at `.592Z` is the immediately following `pool.get()` call on that +same offline pool. + +### Task ID + +`AsyncTasks` assigns monotonically increasing IDs starting at 0: + +| ID | Task | +|---|---| +| 0 | `COPY_DATA` admin command | +| 1 | Replication waiter, registered inside COPY_DATA after `orchestrator.replicate()` returns | + +`[task: 1]` means `orchestrator.replicate()` was called and the replication +`Waiter` was registered — then the waiter surfaced an error from inside the +spawned replication stream. + +### How `pool is shut down` is produced + +`pool::Error::Offline` (`error.rs`: `#[error("pool is shut down")] Offline`) +is emitted from five sites: + +| Site | File | Condition | +|---|---|---| +| `Pool::get_internal()` fast path | `pool_impl.rs:132` | `pool.get()` called; `!guard.online` at lock time | +| `Waiting::new()` slow path | `waiting.rs:32` | No idle conn; tries to queue; `!guard.online` before queuing | +| Monitor request loop | `monitor.rs:110` | Monitor wakes; pool already offline; drains queued waiters | +| Monitor maintenance loop | `monitor.rs:200` | Periodic tick; `!guard.online`; drains waiters, stops monitor | +| `Pool::shutdown()` | `pool_impl.rs:365` | Called directly; drains all current waiters immediately | + +The fast-path and slow-path sites are what a `pool.get()` caller observes as a +returned `Err`. 
The monitor sites cover waiters that were already queued: the pool went
offline between their enqueue and their wakeup, so the monitor drains them
with the same error.

### How a pool goes offline

Two mechanisms set `guard.online = false`:

**Mechanism A — `Pool::move_conns_to()`** (used on config reload). Sets
`from_guard.online = false`, migrates idle connections to the new pool, then
calls `shutdown()`. Checked-out connections are not forcibly closed; they reach
the new pool when their `Guard` is dropped.

**Mechanism B — `Pool::shutdown()`** (used on reconnect and as the final step
after migration):

```rust
pub fn shutdown(&self) {
    let mut guard = self.lock();
    guard.online = false;
    guard.dump_idle(); // close idle connections now
    guard.close_waiters(Error::Offline); // unblock queued waiters
    self.comms().shutdown.notify_waiters();
    self.comms().ready.notify_waiters();
}
```

### `replace_databases()` — the single dispatch point

Every reload path calls `databases.rs::replace_databases(new, reload)`. It
always marks the old pool generation offline:

```
replace_databases(new, reload=true)  ← config reload, DDL, cutover
  old.move_conns_to(&new)  → mechanism A: old pools offline, idle conns migrate
  new.launch()
  DATABASES.store(new)     → atomic global swap
  old.shutdown()           → mechanism B: any remaining old-pool handles offline

replace_databases(new, reload=false) ← RECONNECT, init
  new.launch()
  DATABASES.store(new)
  old.shutdown()           → mechanism B only; no connection migration
```

After `DATABASES.store(new)`, `databases().get_cluster()` returns the new
generation. Any `Arc` captured before the swap is still valid memory but
`guard.online == false`. Any subsequent `pool.get()` on it returns
`Error::Offline`.
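The stale-handle failure mode can be reproduced with a minimal model built
from std types. This sketch is illustrative only; `Pool`, `PoolError`, and
`get()` here are stand-ins for the behavior described above, not pgdog's
real API:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Debug, PartialEq)]
enum PoolError {
    Offline, // rendered as "pool is shut down"
}

struct Pool {
    online: AtomicBool,
}

impl Pool {
    fn new() -> Arc<Self> {
        Arc::new(Pool {
            online: AtomicBool::new(true),
        })
    }

    // Stand-in for the fast-path offline check in Pool::get_internal().
    fn get(&self) -> Result<(), PoolError> {
        if self.online.load(Ordering::Acquire) {
            Ok(())
        } else {
            Err(PoolError::Offline)
        }
    }

    // Stand-in for Pool::shutdown(): mark this generation offline.
    fn shutdown(&self) {
        self.online.store(false, Ordering::Release);
    }
}

fn main() {
    // A long-lived task (the publisher) captures generation v2 at start.
    let v2 = Pool::new();
    let captured = Arc::clone(&v2);
    assert!(captured.get().is_ok());

    // A reload creates v3 and shuts v2 down. The captured Arc is still
    // valid memory, but every subsequent get() on it fails.
    let _v3 = Pool::new();
    v2.shutdown();
    assert_eq!(captured.get(), Err(PoolError::Offline));
}
```

The fix below is the moral equivalent of re-reading the global before the
long-lived task resumes: fetch the current generation from `databases()`
instead of reusing the handle captured hours earlier.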
+ +### All triggers of `replace_databases` + +``` +Client DDL routed through pgdog + query.rs: after transaction commit where cluster.reload_schema() == true + (CreateStmt, DropStmt for table/index/view/sequence, ViewStmt, + CreateTableAsStmt, AlterTableStmt set schema_changed = true on the Route) + → schema_changed() + → reload_from_existing() + → replace_databases(reload=true) + +Passthrough authentication + databases::add() — new user or password change + → reload_from_existing() + → replace_databases(reload=true) + +Resharding pre-data schema sync + orchestrator::schema_sync_pre() — after pre-data DDL applied to destination + → reload_from_existing() + → replace_databases(reload=true) + +Admin RELOAD / SIGHUP + admin/reload.rs, net/tls.rs + → reload() (re-reads pgdog.toml) + → replace_databases(reload=true) + +Admin RECONNECT + admin/reconnect.rs + → reconnect() + → replace_databases(reload=false) ← no connection migration + +Resharding cutover + orchestrator::replicate_and_cutover() → databases::cutover() + → replace_databases(reload=true) +``` + +--- + +## Exact bugs + +### Bug 1 — Publisher holds a stale pool reference across a long COPY + +`Publisher` is constructed during `schema_sync_pre()`. At that point +`schema_sync_pre()` calls `reload_from_existing()` (creating pool generation +**v2**) and the publisher stores a `Cluster` that wraps v2. This is correct at +construction time. + +`data_sync()` runs for hours using only standalone connections and raw addresses +extracted at task start — it never calls `pool.get()` and is unaffected by pool +reloads. + +During that window, any client DDL routed through pgdog calls +`reload_from_existing()` again, creating generation **v3** and marking v2 +offline. The publisher still holds v2. + +When `data_sync()` completes and `publisher.replicate()` is called: + +``` +publisher.replicate() + sync_tables(false, dest) + shard.primary(&Request::default()).await? 
← pool.get() on v2 + v2.guard.online == false + → Error::Offline + → "pool is shut down" +``` + +The `reason: other` log entry at `.592Z` is the v2 `Guard` that was checked out +while v2 was still online, used during `Table::load()` (~28 ms), and then +returned to the now-offline v2 pool. The error immediately after is the next +`pool.get()` call hitting the same offline pool. + +### Bug 2 — `valid()` called too late + +`Table::valid()` checks that at least one column carries a replica identity: + +```rust +pub fn valid(&self) -> Result<(), Error> { + if !self.columns.iter().any(|c| c.identity) { + return Err(Error::NoPrimaryKey(self.table.clone())); + } + Ok(()) +} +``` + +Before this fix it was only called from `stream.rs` when the replication stream +processed the first row for a table — after the entire COPY phase had already +completed. + +The column-level `identity` flag is set by the `COLUMNS` query via +`pg_get_replica_identity_index()`. For `REPLICA IDENTITY FULL`, +`pg_get_replica_identity_index()` returns `NULL`, the `LEFT JOIN` finds no +matching index, and **all columns get `identity=false`**. This means `valid()` +correctly returns `NoPrimaryKey` for a `REPLICA IDENTITY FULL` table — it does +not silently pass. The user in issue #914 would have seen this error eventually, +but only after the multi-hour copy had already completed and the replication +stream attempted to process the first row. + +--- + +## Solution + +### Fix 1 — Refresh the pool reference before replication starts + +`Publisher::update_cluster(&Cluster)` replaces only `self.cluster`, leaving +`self.tables` (with per-table LSN watermarks) and `self.slots` untouched: + +```rust +/// Replace the source cluster reference without disturbing accumulated +/// table LSN state or replication slot state. +/// +/// Called after a long data_sync to pick up any pool reload that occurred +/// while the copy was running (e.g. triggered by a client DDL). 
+pub fn update_cluster(&mut self, cluster: &Cluster) { + self.cluster = cluster.clone(); +} +``` + +`Orchestrator::refresh_before_replicate()` re-fetches source and destination +from `databases()` (always the current live generation) and calls +`update_cluster()`: + +```rust +pub(crate) async fn refresh_before_replicate(&mut self) -> Result<(), Error> { + self.source = databases().schema_owner(&self.source.identifier().database)?; + self.destination = databases().schema_owner(&self.destination.identifier().database)?; + self.publisher.lock().await.update_cluster(&self.source); + Ok(()) +} +``` + +This is intentionally **not** `Orchestrator::refresh()`. `refresh()` recreates +the `Publisher` entirely, discarding the per-table LSN watermarks accumulated +during the copy. Those watermarks gate which WAL messages the replication stream +applies — discarding them causes the stream to re-apply all WAL from slot +creation, producing duplicates or conflicting updates. `update_cluster()` is +the minimal, safe swap. + +`refresh_before_replicate()` is called at two points in the pipeline: + +- `copy_data.rs` — between `orchestrator.data_sync().await?` and + `orchestrator.replicate().await?` (the `COPY_DATA` admin command path) +- `orchestrator.rs::replicate_and_cutover()` — between `schema_sync_post()` + and `replicate()` (the `RESHARD` command path) + +### Fix 2 — Validate tables before the copy starts + +`Publisher::data_sync()` now calls `table.valid()` on every table after +`sync_tables(true, dest)` resolves the table list but before any copy handle is +spawned: + +```rust +// Validate all tables support replication before committing to +// what can be a multi-hour copy. A table with no primary key or +// unique replica-identity index cannot be replicated correctly. +for tables in self.tables.values() { + for table in tables { + table.valid()?; + } +} +``` + +`Error::NoPrimaryKey(table)` is returned immediately with the offending table +name. 
No data has moved, no replication slot has been created, and no cleanup is +required. + +--- + +## Changed files + +| File | Change | +|---|---| +| `pgdog/src/backend/replication/logical/publisher/publisher_impl.rs` | Add `Publisher::update_cluster()`; add `valid()` pre-check in `data_sync()` | +| `pgdog/src/backend/replication/logical/orchestrator.rs` | Add `Orchestrator::refresh_before_replicate()`; call it in `replicate_and_cutover()` | +| `pgdog/src/admin/copy_data.rs` | Call `refresh_before_replicate()` between `data_sync()` and `replicate()` | + +--- + +## Guidance + +**Can a primary key be added on the destination shards after schema sync but +before the copy?** + +Yes. The safe window is between `schema_sync_pre()` completing and +`data_sync()` starting. Issue `ALTER TABLE … ADD PRIMARY KEY` on each +destination shard during that window. The source publication must then expose a +compatible replica identity — either a matching PK, or a unique index configured +with `ALTER TABLE … REPLICA IDENTITY USING INDEX`. + +**Is a unique index sufficient instead of a primary key?** + +Yes, if the table uses `ALTER TABLE t REPLICA IDENTITY USING INDEX unique_idx`. +That sets the indexed columns as the WAL replica identity. `valid()` passes, the +`ON CONFLICT` target is deterministic, and the upsert SQL is well-formed. + +`REPLICA IDENTITY FULL` is not sufficient. `pg_get_replica_identity_index()` +returns `NULL` for FULL mode, so no columns are marked with `identity=true` +and `valid()` fails with `NoPrimaryKey` — the same error as a table with no +primary key at all. Use `REPLICA IDENTITY USING INDEX` instead. 
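The rule `valid()` enforces can be restated as a standalone sketch. The types
and the `flag_columns` helper below are hypothetical, not pgdog's code: a
column is flagged `identity = true` only when `pg_get_replica_identity_index()`
yields an index to join against, which happens for `relreplident = 'd'`
(default, with a PK) and `'i'` (USING INDEX) but never for `'f'` (FULL) or
`'n'` (NOTHING):

```rust
#[derive(Debug, PartialEq)]
enum ValidError {
    NoPrimaryKey,
}

#[allow(dead_code)]
struct Column {
    name: String,
    identity: bool,
}

// relreplident: 'd' default, 'i' using index, 'f' full, 'n' nothing.
// key_cols: columns of the PK / replica-identity index, if one exists.
fn flag_columns(all: &[&str], relreplident: char, key_cols: &[&str]) -> Vec<Column> {
    all.iter()
        .map(|c| Column {
            name: c.to_string(),
            // Only 'd' and 'i' have an identity index for the LEFT JOIN to hit.
            identity: matches!(relreplident, 'd' | 'i') && key_cols.contains(c),
        })
        .collect()
}

// Mirrors Table::valid(): at least one identity column is required.
fn valid(columns: &[Column]) -> Result<(), ValidError> {
    if columns.iter().any(|c| c.identity) {
        Ok(())
    } else {
        Err(ValidError::NoPrimaryKey)
    }
}

fn main() {
    let cols = ["id", "name"];
    // PK on id: replicable.
    assert!(valid(&flag_columns(&cols, 'd', &["id"])).is_ok());
    // REPLICA IDENTITY USING INDEX on id: also replicable.
    assert!(valid(&flag_columns(&cols, 'i', &["id"])).is_ok());
    // REPLICA IDENTITY FULL: no identity index, so the same error as no PK.
    assert_eq!(
        valid(&flag_columns(&cols, 'f', &[])),
        Err(ValidError::NoPrimaryKey)
    );
}
```

Running the check over this model before any copy starts is exactly the
ordering Fix 2 establishes: the cheap metadata check fails in milliseconds
instead of surfacing after a multi-hour COPY.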
diff --git a/pgdog/src/admin/copy_data.rs b/pgdog/src/admin/copy_data.rs index 069007166..6ba9a9ce3 100644 --- a/pgdog/src/admin/copy_data.rs +++ b/pgdog/src/admin/copy_data.rs @@ -61,6 +61,13 @@ impl Command for CopyData { orchestrator.load_schema().await?; orchestrator.schema_sync_pre(true).await?; orchestrator.data_sync().await?; + // Refresh cluster references before starting replication. + // data_sync can run for hours; a config reload triggered by any + // client DDL during that window shuts down the pools the publisher + // was constructed with. refresh_before_replicate() swaps in the + // current live pool without discarding the table LSN state. + orchestrator.refresh_before_replicate().await?; + AsyncTasks::insert(TaskType::Replication(Box::new( orchestrator.replicate().await?, ))); diff --git a/pgdog/src/backend/databases.rs b/pgdog/src/backend/databases.rs index 47a61c733..4db325d54 100644 --- a/pgdog/src/backend/databases.rs +++ b/pgdog/src/backend/databases.rs @@ -71,7 +71,7 @@ pub fn replace_databases(new_databases: Databases, reload: bool) -> Result<(), E pub fn reconnect() -> Result<(), Error> { let config = config(); let databases = from_config(&config); - + warn!("reconnect: dropping all server connections and re-creating pools"); replace_databases(databases, false)?; Ok(()) } @@ -80,10 +80,9 @@ pub fn reconnect() -> Result<(), Error> { /// preserving connections. 
pub fn reload_from_existing() -> Result<(), Error> { let _lock = lock(); - let config = config(); let databases = from_config(&config); - + info!("reloading pools from current config (connections preserved where possible)"); replace_databases(databases, true)?; Ok(()) } @@ -126,19 +125,18 @@ pub fn reload() -> Result<(), Error> { let old_config = config(); let new_config = load(&old_config.config_path, &old_config.users_path)?; let databases = from_config(&new_config); - + info!( + "reloading pools from config file: {}", + old_config.config_path.display() + ); replace_databases(databases, true)?; - tls::reload()?; - // Remove any unused prepared statements. PreparedStatements::global() .write() .close_unused(new_config.config.general.prepared_statements_limit); - // Resize query cache Cache::resize(new_config.config.general.query_cache_limit); - Ok(()) } diff --git a/pgdog/src/backend/pool/pool_impl.rs b/pgdog/src/backend/pool/pool_impl.rs index 10bca5928..3e00e4d17 100644 --- a/pgdog/src/backend/pool/pool_impl.rs +++ b/pgdog/src/backend/pool/pool_impl.rs @@ -9,7 +9,7 @@ use once_cell::sync::{Lazy, OnceCell}; use parking_lot::RwLock; use parking_lot::{lock_api::MutexGuard, Mutex, RawMutex}; use tokio::time::{timeout, Instant}; -use tracing::error; +use tracing::{error, warn}; use crate::backend::pool::LsnStats; use crate::backend::{ConnectReason, DisconnectReason, Server, ServerOptions}; @@ -358,8 +358,15 @@ impl Pool { /// Shutdown the pool. 
pub fn shutdown(&self) { + let addr = self.addr(); + warn!( + host = %addr.host, + port = addr.port, + database = %addr.database_name, + user = %addr.user, + "pool offline: connection pool shut down" + ); let mut guard = self.lock(); - guard.online = false; guard.dump_idle(); guard.close_waiters(Error::Offline); diff --git a/pgdog/src/backend/replication/logical/orchestrator.rs b/pgdog/src/backend/replication/logical/orchestrator.rs index 58bf2ab20..c811ae3e1 100644 --- a/pgdog/src/backend/replication/logical/orchestrator.rs +++ b/pgdog/src/backend/replication/logical/orchestrator.rs @@ -74,6 +74,19 @@ impl Orchestrator { self.publisher = Arc::new(Mutex::new(publisher)); } + /// Refresh cluster references and the publisher's source pool after a + /// long-running operation (e.g. data_sync). Must NOT recreate the + /// publisher — that would discard the table LSN state accumulated during + /// the copy. Instead, only the cluster reference inside the existing + /// publisher is updated so that subsequent pool.get() calls target the + /// live pool rather than a stale, potentially-offline one. + pub(crate) async fn refresh_before_replicate(&mut self) -> Result<(), Error> { + self.source = databases().schema_owner(&self.source.identifier().database)?; + self.destination = databases().schema_owner(&self.destination.identifier().database)?; + self.publisher.lock().await.update_cluster(&self.source); + Ok(()) + } + pub(crate) fn replication_slot(&self) -> &str { &self.replication_slot } @@ -183,6 +196,10 @@ impl Orchestrator { self.schema_sync_post(true).await?; // Start replication to catch up and cutover once done. + // Refresh cluster references: data_sync can take hours and the pools + // may have been reloaded (e.g. by a client DDL) in the meantime. 
+ self.refresh_before_replicate().await?; + self.replicate().await?.cutover().await?; Ok(()) diff --git a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs index 5a0675b8a..6939ef00b 100644 --- a/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs +++ b/pgdog/src/backend/replication/logical/publisher/publisher_impl.rs @@ -289,6 +289,15 @@ impl Publisher { }) } + /// Replace the source cluster reference without disturbing accumulated + /// table LSN state or replication slot state. + /// + /// Called after a long data_sync to pick up any pool reload that occurred + /// while the copy was running (e.g. triggered by a client DDL). + pub fn update_cluster(&mut self, cluster: &Cluster) { + self.cluster = cluster.clone(); + } + /// Request the publisher to stop replication. pub fn request_stop(&self) { self.stop.notify_one(); @@ -314,6 +323,15 @@ impl Publisher { // Fetch schema. self.sync_tables(true, dest).await?; + // Validate all tables support replication before committing to + // what can be a multi-hour copy. A table with no primary key or + // unique replica-identity index cannot be replicated correctly. + for tables in self.tables.values() { + for table in tables { + table.valid()?; + } + } + let mut handles = vec![]; for (number, shard) in self.cluster.shards().iter().enumerate() { diff --git a/pgdog/src/backend/replication/logical/publisher/table.rs b/pgdog/src/backend/replication/logical/publisher/table.rs index ad3bc1b75..e9b886082 100644 --- a/pgdog/src/backend/replication/logical/publisher/table.rs +++ b/pgdog/src/backend/replication/logical/publisher/table.rs @@ -72,11 +72,14 @@ impl Table { } /// Check that the table supports replication. + /// + /// Requires at least one column with a replica identity flag. Tables with + /// REPLICA IDENTITY FULL or NOTHING have no identity columns and fail here + /// with NoPrimaryKey. 
pub fn valid(&self) -> Result<(), Error> { if !self.columns.iter().any(|c| c.identity) { return Err(Error::NoPrimaryKey(self.table.clone())); } - Ok(()) } @@ -289,8 +292,12 @@ impl Table { #[cfg(test)] mod test { - use crate::backend::replication::logical::publisher::test::setup_publication; + use crate::backend::{ + replication::logical::publisher::queries::{PublicationTableColumn, ReplicaIdentity}, + server::test::test_server, + Server, + }; use crate::config::config; use super::*; @@ -324,6 +331,18 @@ mod test { } } + #[test] + fn valid_with_pk() { + let t = make_table(vec![("id", true), ("name", false)]); + assert!(t.valid().is_ok()); + } + + #[test] + fn valid_without_pk() { + let t = make_table(vec![("id", false), ("name", false)]); + assert!(matches!(t.valid(), Err(Error::NoPrimaryKey(_)))); + } + #[test] fn test_sql_generation_simple() { let table = make_table(vec![("id", true), ("name", false), ("value", false)]); @@ -460,4 +479,109 @@ mod test { publication.cleanup().await; } + + // Load identity + columns for a named table that already exists in the current session. + // Uses pg_catalog directly — no publication required. 
+ async fn load_table(server: &mut Server, name: &str) -> Table { + let pub_table = PublicationTable { + schema: "public".into(), + name: name.into(), + attributes: "".into(), + parent_schema: "".into(), + parent_name: "".into(), + }; + let identity = ReplicaIdentity::load(&pub_table, server).await.unwrap(); + let columns = PublicationTableColumn::load(&identity, server) + .await + .unwrap(); + Table { + publication: "".into(), + table: pub_table, + identity, + columns, + lsn: Lsn::default(), + query_parser_engine: QueryParserEngine::default(), + } + } + + #[tokio::test] + async fn test_valid_pk() { + crate::logger(); + let mut s = test_server().await; + s.execute("BEGIN").await.unwrap(); + s.execute("CREATE TABLE public.valid_test_pk (id BIGSERIAL PRIMARY KEY, name TEXT)") + .await + .unwrap(); + assert!(load_table(&mut s, "valid_test_pk").await.valid().is_ok()); + s.execute("ROLLBACK").await.unwrap(); + } + + #[tokio::test] + async fn test_valid_no_pk() { + crate::logger(); + let mut s = test_server().await; + s.execute("BEGIN").await.unwrap(); + s.execute("CREATE TABLE public.valid_test_nopk (id BIGINT, name TEXT)") + .await + .unwrap(); + assert!(matches!( + load_table(&mut s, "valid_test_nopk").await.valid(), + Err(Error::NoPrimaryKey(_)) + )); + s.execute("ROLLBACK").await.unwrap(); + } + + #[tokio::test] + async fn test_valid_replica_identity_full() { + crate::logger(); + let mut s = test_server().await; + s.execute("BEGIN").await.unwrap(); + s.execute("CREATE TABLE public.valid_test_full (id BIGINT, name TEXT)") + .await + .unwrap(); + s.execute("ALTER TABLE valid_test_full REPLICA IDENTITY FULL") + .await + .unwrap(); + let table = load_table(&mut s, "valid_test_full").await; + assert_eq!(table.identity.identity, "f"); + assert!(matches!(table.valid(), Err(Error::NoPrimaryKey(_)))); + s.execute("ROLLBACK").await.unwrap(); + } + + #[tokio::test] + async fn test_valid_replica_identity_nothing() { + crate::logger(); + let mut s = test_server().await; + 
s.execute("BEGIN").await.unwrap(); + s.execute("CREATE TABLE public.valid_test_nothing (id BIGSERIAL PRIMARY KEY, name TEXT)") + .await + .unwrap(); + s.execute("ALTER TABLE valid_test_nothing REPLICA IDENTITY NOTHING") + .await + .unwrap(); + let table = load_table(&mut s, "valid_test_nothing").await; + assert_eq!(table.identity.identity, "n"); + assert!(matches!(table.valid(), Err(Error::NoPrimaryKey(_)))); + s.execute("ROLLBACK").await.unwrap(); + } + + #[tokio::test] + async fn test_valid_replica_identity_using_index() { + crate::logger(); + let mut s = test_server().await; + s.execute("BEGIN").await.unwrap(); + s.execute("CREATE TABLE public.valid_test_idx (email TEXT NOT NULL, name TEXT)") + .await + .unwrap(); + s.execute("CREATE UNIQUE INDEX valid_test_idx_uidx ON valid_test_idx (email)") + .await + .unwrap(); + s.execute("ALTER TABLE valid_test_idx REPLICA IDENTITY USING INDEX valid_test_idx_uidx") + .await + .unwrap(); + let table = load_table(&mut s, "valid_test_idx").await; + assert_eq!(table.identity.identity, "i"); + assert!(table.valid().is_ok()); + s.execute("ROLLBACK").await.unwrap(); + } } diff --git a/pgdog/src/frontend/client/query_engine/hooks/schema.rs b/pgdog/src/frontend/client/query_engine/hooks/schema.rs index b00d79013..429098be6 100644 --- a/pgdog/src/frontend/client/query_engine/hooks/schema.rs +++ b/pgdog/src/frontend/client/query_engine/hooks/schema.rs @@ -1,5 +1,8 @@ +use tracing::info; + use crate::backend::{databases::reload_from_existing, Error}; pub(crate) fn schema_changed() -> Result<(), Error> { + info!("schema change detected: reloading pools to refresh schema cache"); reload_from_existing() }