diff --git a/database-utils/src/lib.rs b/database-utils/src/lib.rs index 0c1adb68e0..e3cff386dd 100644 --- a/database-utils/src/lib.rs +++ b/database-utils/src/lib.rs @@ -123,6 +123,16 @@ pub struct UpstreamConfig { #[clap(long, env = "IGNORE_ULIMIT_CHECK")] #[serde(default)] pub ignore_ulimit_check: bool, + + /// Sets the time (in seconds) between status updates sent to the upstream database + #[clap( + long, + default_value = "10", + hide = true, + env = "STATUS_UPDATE_INTERVAL_SECS" + )] + #[serde(default = "default_status_update_interval_secs")] + pub status_update_interval_secs: u16, } impl UpstreamConfig { @@ -160,6 +170,10 @@ fn default_snapshot_report_interval_secs() -> u16 { UpstreamConfig::default().snapshot_report_interval_secs } +fn default_status_update_interval_secs() -> u16 { + UpstreamConfig::default().status_update_interval_secs +} + fn duration_from_seconds(i: &str) -> Result { i.parse::().map(Duration::from_secs) } @@ -178,6 +192,7 @@ impl Default for UpstreamConfig { ssl_root_cert: None, replication_pool_size: 50, ignore_ulimit_check: false, + status_update_interval_secs: 10, } } } diff --git a/readyset-server/tests/config_versions/1de70c2ead1edaedaf9808ac4bc854e18833fd4b.json b/readyset-server/tests/config_versions/1de70c2ead1edaedaf9808ac4bc854e18833fd4b.json new file mode 100644 index 0000000000..296b6a0ee2 --- /dev/null +++ b/readyset-server/tests/config_versions/1de70c2ead1edaedaf9808ac4bc854e18833fd4b.json @@ -0,0 +1 @@ +{"sharding":null,"materialization_config":{"packet_filters_enabled":false,"allow_full_materialization":false,"frontier_strategy":"None","partial_enabled":true},"domain_config":{"aggressively_update_state_sizes":false,"view_request_timeout":{"secs":5,"nanos":0},"table_request_timeout":{"secs":1800,"nanos":0},"eviction_kind":"Random"},"persistence":{"mode":"MemoryOnly","db_filename_prefix":"readyset","persistence_threads":1,"db_dir":null},"min_workers":1,"reuse":null,"abort_on_task_failure":true,"mir_config":{"allow_topk":false,"allow_paginate":false,"allow_mixed_comparisons":false},"upstream_db_url":null,"disable_upstream_ssl_verification":false,"ssl_root_cert":null,"disable_setup_ddl_replication":false,"replication_server_id":null,"replicator_restart_timeout":{"secs":1,"nanos":0},"replication_tables":null,"replication_tables_ignore":null,"snapshot_report_interval_secs":30,"replication_pool_size":50,"ignore_ulimit_check":false,"replicator_statement_logging":false,"replication_strategy":"Never","upquery_timeout":{"secs":5,"nanos":0},"worker_request_timeout":{"secs":1800,"nanos":0},"background_recovery_interval":{"secs":20,"nanos":0}} \ No newline at end of file diff --git a/replicators/src/postgres_connector/connector.rs b/replicators/src/postgres_connector/connector.rs index 4a979e2798..3b5cc37014 100644 --- a/replicators/src/postgres_connector/connector.rs +++ b/replicators/src/postgres_connector/connector.rs @@ -61,6 +61,8 @@ pub struct PostgresWalConnector { in_transaction: bool, /// A handle to the controller controller: ReadySetHandle, + /// The interval on which we should send status updates to the upstream Postgres instance + status_update_interval: Duration, } /// The decoded response to `IDENTIFY_SYSTEM` @@ -106,11 +108,6 @@ impl PostgresWalConnector { /// we receive many consecutive events that share the same LSN. const MAX_QUEUED_INDEPENDENT_ACTIONS: usize = 100; - /// The interval at which we will send status updates to the upstream database. This matches the - /// default value for Postgres's `wal_receiver_status_interval`, which is the interval between - /// status updates given by Postgres's own WAL receiver during replication. - const STATUS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); - /// Connects to postgres and if needed creates a new replication slot for itself with an /// exported snapshot. #[allow(clippy::too_many_arguments)] @@ -135,6 +132,7 @@ impl PostgresWalConnector { .await .map_err(|e| ReadySetError::ReplicationFailed(format!("Failed to connect: {e}")))?; let connection_handle = tokio::spawn(connection); + let status_update_interval = Duration::from_secs(config.status_update_interval_secs as u64); let mut connector = PostgresWalConnector { client, @@ -145,10 +143,10 @@ impl PostgresWalConnector { replication_slot: None, enable_statement_logging, // We initialize `time_last_position_reported` to be more than - // `Self::STATUS_UPDATE_INTERVAL` seconds ago to ensure that we report our position in + // `Self.status_update_interval` seconds ago to ensure that we report our position in // the logs the next time we have an opportunity to time_last_position_reported: Instant::now() - - Self::STATUS_UPDATE_INTERVAL + - status_update_interval - Duration::from_secs(1), // We'll never start replicating in the middle of a transaction, since Postgres's // logical replication protocol only streams whole transactions to us at a time. This @@ -156,6 +154,7 @@ impl PostgresWalConnector { // database, we'll always start replicating outside of a transaction in_transaction: false, controller, + status_update_interval, }; if full_resnapshot || next_position.is_none() { @@ -578,7 +577,7 @@ impl Connector for PostgresWalConnector { // If it has been longer than the defined status update interval, send a status update to // the upstream database to report our position in the WAL as the LSN of the last event we // successfully applied to ReadySet - if self.time_last_position_reported.elapsed() > Self::STATUS_UPDATE_INTERVAL { + if self.time_last_position_reported.elapsed() > self.status_update_interval { let lsn: Lsn = last_pos.try_into()?; self.send_standby_status_update(lsn).await?;