Skip to content

Commit

Permalink
replicators: Allow alphanumeric replication server ids
Browse files Browse the repository at this point in the history
It may be useful to use alphabetic characters in a replication server id
to uniquely identify a postgres replication slot--this commit expands
replication_server_id to allow for that.

Currently, mysql still requires a numeric server id, so that branch
still enforces that.

Release-Note-Core: REPLICATION_SERVER_ID can now be configured to use
  alphanumeric characters for postgres.

Change-Id: I6a9137f91d25029333f0f827e43ec099db6bbbf0
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/6704
Reviewed-by: Tamas Juhasz <tamas@readyset.io>
Tested-by: Buildkite CI
  • Loading branch information
lukoktonos committed Jan 19, 2024
1 parent 4bed68e commit b3c0d46
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 13 deletions.
45 changes: 43 additions & 2 deletions database-utils/src/lib.rs
Expand Up @@ -67,9 +67,14 @@ pub struct UpstreamConfig {
///
/// This can be used to differentiate different ReadySet deployments connected to the same
/// upstream DB.
#[arg(long, env = "REPLICATION_SERVER_ID", hide = true)]
///
/// This ends up being a suffix of the replication slot or resnapshot replication slot, which
/// have prefixes of 'readyset_' and 'readyset_resnapshot_', respectively. Since a replication
/// slot is limited by postgres to a length of 63 bytes, that means this server id must be 43
/// bytes or fewer.
#[arg(long, env = "REPLICATION_SERVER_ID", hide = true, value_parser = parse_repl_server_id)]
#[serde(default)]
pub replication_server_id: Option<u32>,
pub replication_server_id: Option<ReplicationServerId>,

/// The time to wait before restarting the replicator in seconds.
#[arg(long, hide = true, default_value = "1", value_parser = duration_from_seconds)]
Expand Down Expand Up @@ -165,6 +170,42 @@ impl UpstreamConfig {
}
}

#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub struct ReplicationServerId(pub String);

impl Display for ReplicationServerId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0[..])
}
}

impl From<&std::ffi::OsStr> for ReplicationServerId {
fn from(os_s: &std::ffi::OsStr) -> Self {
let mut s = os_s.to_string_lossy().to_string();
s.truncate(43);
Self(s)
}
}

fn parse_repl_server_id(s: &str) -> Result<ReplicationServerId, String> {
let s = s.trim();
// Postgres restricts identifiers to 63 bytes or fewer, and we add a prefix of at most 20 bytes
// ("readyset_resnapshot_")
if s.len() > 43 {
return Err("Replication server id must be 43 characters or fewer".to_string());
}

// Replication slot names may only contain lower case letters, numbers, and the underscore
// character.
if s.chars()
.any(|c| !(c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_'))
{
return Err("Replication server id may only contain lower case letters, numbers, and the underscore character".to_string());
}

Ok(ReplicationServerId(s.to_string()))
}

fn default_replicator_restart_timeout() -> Duration {
UpstreamConfig::default().replicator_restart_timeout
}
Expand Down
8 changes: 4 additions & 4 deletions readyset-client-test-helpers/src/lib.rs
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use database_utils::DatabaseURL;
use database_utils::{DatabaseURL, ReplicationServerId};
use nom_sql::Relation;
use readyset_adapter::backend::noria_connector::{NoriaConnector, ReadBehavior};
use readyset_adapter::backend::{BackendBuilder, MigrationMode};
Expand Down Expand Up @@ -88,7 +88,7 @@ pub struct TestBuilder {
durability_mode: DurabilityMode,
storage_dir_path: Option<PathBuf>,
authority: Option<Arc<Authority>>,
replication_server_id: Option<u32>,
replication_server_id: Option<ReplicationServerId>,
}

impl Default for TestBuilder {
Expand Down Expand Up @@ -180,8 +180,8 @@ impl TestBuilder {
self
}

pub fn replication_server_id(mut self, replication_server_id: u32) -> Self {
self.replication_server_id = Some(replication_server_id);
pub fn replication_server_id(mut self, replication_server_id: String) -> Self {
self.replication_server_id = Some(ReplicationServerId(replication_server_id));
self
}

Expand Down
2 changes: 1 addition & 1 deletion readyset-psql/tests/fallback.rs
Expand Up @@ -2391,7 +2391,7 @@ mod failure_injection_tests {

let (config, mut handle, shutdown_tx) = TestBuilder::default()
.migration_mode(MigrationMode::InRequestPath)
.replication_server_id(123)
.replication_server_id("readyset_123".into())
.fallback(true)
.build::<PostgreSQLAdapter>()
.await;
Expand Down
4 changes: 2 additions & 2 deletions readyset-server/src/builder.rs
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::{self, Duration};

use database_utils::UpstreamConfig;
use database_utils::{ReplicationServerId, UpstreamConfig};
use dataflow::PersistenceParameters;
use readyset_client::consensus::{
Authority, LocalAuthority, LocalAuthorityStore, NodeTypeSchedulingRestriction,
Expand Down Expand Up @@ -257,7 +257,7 @@ impl Builder {
}

/// Set the server ID for replication
pub fn set_replicator_server_id(&mut self, server_id: u32) {
pub fn set_replicator_server_id(&mut self, server_id: ReplicationServerId) {
self.config.replicator_config.replication_server_id = Some(server_id);
}

Expand Down
14 changes: 13 additions & 1 deletion replicators/src/noria_adapter.rs
Expand Up @@ -428,11 +428,23 @@ impl NoriaAdapter {
(Some(pos), _) => pos.clone().try_into()?,
};

let server_id = config
.replication_server_id
.as_ref()
.map(|id| id.0.parse::<u32>())
.transpose()
.map_err(|_| {
ReadySetError::ReplicationFailed(format!(
"{} is an invalid server id--it must be a valid u32.",
config.replication_server_id.unwrap()
))
})?;

let connector = Box::new(
MySqlBinlogConnector::connect(
mysql_options.clone(),
pos.clone(),
config.replication_server_id,
server_id,
enable_statement_logging,
)
.await?,
Expand Down
6 changes: 3 additions & 3 deletions replicators/tests/tests.rs
Expand Up @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

use database_utils::UpstreamConfig as Config;
use database_utils::{ReplicationServerId, UpstreamConfig as Config};
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_time::MySqlTime;
Expand Down Expand Up @@ -499,11 +499,11 @@ async fn replication_test_multiple(url: &str) -> ReadySetResult<()> {
client.query(POPULATE_SCHEMA).await?;

let config_one = Config {
replication_server_id: Some(1),
replication_server_id: Some(ReplicationServerId("1".into())),
..Default::default()
};
let config_two = Config {
replication_server_id: Some(2),
replication_server_id: Some(ReplicationServerId("2".into())),
..Default::default()
};
let (mut ctx_one, shutdown_tx_one) =
Expand Down

0 comments on commit b3c0d46

Please sign in to comment.