Skip to content

Commit

Permalink
replicators: Move version query before ddl setup
Browse files Browse the repository at this point in the history
If we are connecting to postgres 13, the ddl setup commands we run cause
our connection to be considered a "replication connection", which if the
upstream is configured in a certain way (in a way that Aurora Postgres
is configured), would cause the query for the server version to fail.

Moving the query to before the ddl setup allows the query to succeed.

Release-Note-Core: Fixed a bug that would cause replication to fail on
  Aurora Postgres version 13 databases.
Refs: ENG-2208
Change-Id: Ie3f506e593c9bbd11802852472b0d29cbc5b0671
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4015
Tested-by: Buildkite CI
Reviewed-by: Sif Hall <sif@readyset.io>
  • Loading branch information
lukoktonos committed Dec 15, 2022
1 parent 3107da9 commit 9be6443
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 10 deletions.
24 changes: 22 additions & 2 deletions replicators/src/noria_adapter.rs
Expand Up @@ -411,6 +411,26 @@ impl NoriaAdapter {
None,
)?;

// For Postgres 13, once we setup ddl replication, the following query can be rejected, so
// run it ahead of time.
// TODO: (luke): We can probably consolidate this query with the db_version string query
// below
let version_num: u32 = {
let (client, connection) = pgsql_opts.connect(tls_connector.clone()).await?;
let _connection_handle = tokio::spawn(connection);
client
.query_one("SHOW server_version_num", &[])
.await
.and_then(|row| row.try_get::<_, String>(0))
.map_err(|e| {
ReadySetError::Internal(format!("Unable to determine postgres version: {}", e))
})?
.parse()
.map_err(|e| {
ReadySetError::Internal(format!("Unable to parse postgres version: {}", e))
})?
};

let mut connector = Box::new(
PostgresWalConnector::connect(
pgsql_opts.clone(),
Expand Down Expand Up @@ -451,7 +471,7 @@ impl NoriaAdapter {
let snapshot_start = Instant::now();
// If snapshot name exists, it means we need to make a snapshot to noria

let (mut client, connection) = pgsql_opts.connect(tls_connector).await?;
let (mut client, connection) = pgsql_opts.connect(tls_connector.clone()).await?;

let connection_handle = tokio::spawn(connection);
let db_version = client
Expand Down Expand Up @@ -504,7 +524,7 @@ impl NoriaAdapter {
}

connector
.start_replication(REPLICATION_SLOT, PUBLICATION_NAME)
.start_replication(REPLICATION_SLOT, PUBLICATION_NAME, version_num)
.await?;

let replication_offsets = noria.replication_offsets().await?;
Expand Down
9 changes: 1 addition & 8 deletions replicators/src/postgres_connector/connector.rs
Expand Up @@ -260,15 +260,8 @@ impl PostgresWalConnector {
&mut self,
slot: &str,
publication: &str,
version: u32,
) -> ReadySetResult<()> {
let version: u32 = self
.one_row_query("SHOW server_version_num", 1)
.await?
.get(0)
.unwrap()
.parse()
.unwrap_or(0);

let inner_client = self.client.inner();
let wal_position = self.next_position.unwrap_or_default();
let messages_support = if version >= 140000 {
Expand Down

0 comments on commit 9be6443

Please sign in to comment.