Skip to content

Commit

Permalink
replicators: Configure status update interval
Browse files Browse the repository at this point in the history
Adds a configuration option that lets users configure the interval on
which we send updates to the upstream Postgres instance. This is notable
because we include in the status update the min LSN up to which we've
persisted data to disk across every base table. For higher write
throughputs, users may want to configure a lower status update interval to
ensure the size of the replication slot remains small.

Refs: REA-3434
Release-Note-Core: Added a config option to allow users to configure the
  interval upon which we report our current position to Postgres
Change-Id: I285cb1c658324178edadfc10b5b79b6c6dfb826b
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6087
Tested-by: Buildkite CI
Reviewed-by: Luke Osborne <luke@readyset.io>
  • Loading branch information
ethan-readyset committed Oct 23, 2023
1 parent 9cdd4d6 commit a3f88c5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
15 changes: 15 additions & 0 deletions database-utils/src/lib.rs
Expand Up @@ -123,6 +123,16 @@ pub struct UpstreamConfig {
#[clap(long, env = "IGNORE_ULIMIT_CHECK")]
#[serde(default)]
pub ignore_ulimit_check: bool,

/// Sets the time (in seconds) between status updates sent to the upstream database
#[clap(
long,
default_value = "10",
hide = true,
env = "STATUS_UPDATE_INTERVAL_SECS"
)]
#[serde(default = "default_status_update_interval_secs")]
pub status_update_interval_secs: u16,
}

impl UpstreamConfig {
Expand Down Expand Up @@ -160,6 +170,10 @@ fn default_snapshot_report_interval_secs() -> u16 {
UpstreamConfig::default().snapshot_report_interval_secs
}

/// Serde default for `status_update_interval_secs`; mirrors the value
/// produced by [`UpstreamConfig::default`] so deserialized and default-built
/// configs agree.
fn default_status_update_interval_secs() -> u16 {
    let config = UpstreamConfig::default();
    config.status_update_interval_secs
}

/// Parses a decimal string of whole seconds into a [`Duration`].
///
/// Propagates the [`ParseIntError`] when `i` is not a valid `u64`.
fn duration_from_seconds(i: &str) -> Result<Duration, ParseIntError> {
    let secs = i.parse::<u64>()?;
    Ok(Duration::from_secs(secs))
}
Expand All @@ -178,6 +192,7 @@ impl Default for UpstreamConfig {
ssl_root_cert: None,
replication_pool_size: 50,
ignore_ulimit_check: false,
status_update_interval_secs: 10,
}
}
}
Expand Down
@@ -0,0 +1 @@
{"sharding":null,"materialization_config":{"packet_filters_enabled":false,"allow_full_materialization":false,"frontier_strategy":"None","partial_enabled":true},"domain_config":{"aggressively_update_state_sizes":false,"view_request_timeout":{"secs":5,"nanos":0},"table_request_timeout":{"secs":1800,"nanos":0},"eviction_kind":"Random"},"persistence":{"mode":"MemoryOnly","db_filename_prefix":"readyset","persistence_threads":1,"db_dir":null},"min_workers":1,"reuse":null,"abort_on_task_failure":true,"mir_config":{"allow_topk":false,"allow_paginate":false,"allow_mixed_comparisons":false},"upstream_db_url":null,"disable_upstream_ssl_verification":false,"ssl_root_cert":null,"disable_setup_ddl_replication":false,"replication_server_id":null,"replicator_restart_timeout":{"secs":1,"nanos":0},"replication_tables":null,"replication_tables_ignore":null,"snapshot_report_interval_secs":30,"replication_pool_size":50,"ignore_ulimit_check":false,"replicator_statement_logging":false,"replication_strategy":"Never","upquery_timeout":{"secs":5,"nanos":0},"worker_request_timeout":{"secs":1800,"nanos":0},"background_recovery_interval":{"secs":20,"nanos":0}}
15 changes: 7 additions & 8 deletions replicators/src/postgres_connector/connector.rs
Expand Up @@ -61,6 +61,8 @@ pub struct PostgresWalConnector {
in_transaction: bool,
/// A handle to the controller
controller: ReadySetHandle,
/// The interval on which we should send status updates to the upstream Postgres instance
status_update_interval: Duration,
}

/// The decoded response to `IDENTIFY_SYSTEM`
Expand Down Expand Up @@ -106,11 +108,6 @@ impl PostgresWalConnector {
/// we receive many consecutive events that share the same LSN.
const MAX_QUEUED_INDEPENDENT_ACTIONS: usize = 100;

/// The interval at which we will send status updates to the upstream database. This matches the
/// default value for Postgres's `wal_receiver_status_interval`, which is the interval between
/// status updates given by Postgres's own WAL receiver during replication.
const STATUS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

/// Connects to postgres and if needed creates a new replication slot for itself with an
/// exported snapshot.
#[allow(clippy::too_many_arguments)]
Expand All @@ -135,6 +132,7 @@ impl PostgresWalConnector {
.await
.map_err(|e| ReadySetError::ReplicationFailed(format!("Failed to connect: {e}")))?;
let connection_handle = tokio::spawn(connection);
let status_update_interval = Duration::from_secs(config.status_update_interval_secs as u64);

let mut connector = PostgresWalConnector {
client,
Expand All @@ -145,17 +143,18 @@ impl PostgresWalConnector {
replication_slot: None,
enable_statement_logging,
// We initialize `time_last_position_reported` to be more than
// `Self::STATUS_UPDATE_INTERVAL` seconds ago to ensure that we report our position in
// `self.status_update_interval` ago to ensure that we report our position in
// the logs the next time we have an opportunity to
time_last_position_reported: Instant::now()
- Self::STATUS_UPDATE_INTERVAL
- status_update_interval
- Duration::from_secs(1),
// We'll never start replicating in the middle of a transaction, since Postgres's
// logical replication protocol only streams whole transactions to us at a time. This
// means we know that when we first initiate a replication connection to the upstream
// database, we'll always start replicating outside of a transaction
in_transaction: false,
controller,
status_update_interval,
};

if full_resnapshot || next_position.is_none() {
Expand Down Expand Up @@ -578,7 +577,7 @@ impl Connector for PostgresWalConnector {
// If it has been longer than the defined status update interval, send a status update to
// the upstream database to report our position in the WAL as the LSN of the last event we
// successfully applied to ReadySet
if self.time_last_position_reported.elapsed() > Self::STATUS_UPDATE_INTERVAL {
if self.time_last_position_reported.elapsed() > self.status_update_interval {
let lsn: Lsn = last_pos.try_into()?;

self.send_standby_status_update(lsn).await?;
Expand Down

0 comments on commit a3f88c5

Please sign in to comment.