Skip to content

Commit

Permalink
Replicator: flush buffered actions on error
Browse files Browse the repository at this point in the history
Ensures that we flush buffered actions when we receive an error in
PostgresWalConnector::next_action(). Previously, we returned the error
and dropped anything in `actions`, unless the error was an
`UnsupportedTypeConversion`.

This behavior is implemented by storing either an event or a WalError in
`PostgresWalConnector::peek`. The behavior is tested by a new
readyset-psql fallback test.

Co-authored-by: Ethan Donowitz <ethan@readyset.io>
Fixes: ENG-2832
Change-Id: I0de405d1239bd66458b53cd7648276c62baa9a74
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4708
Tested-by: Buildkite CI
Reviewed-by: Ethan Donowitz <ethan@readyset.io>
  • Loading branch information
Dan Wilbanks and ethan-readyset committed May 13, 2023
1 parent fd08ff1 commit 1101215
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 37 deletions.
60 changes: 60 additions & 0 deletions readyset-psql/tests/fallback.rs
Expand Up @@ -1075,3 +1075,63 @@ async fn replication_failure_retries_if_failed_to_drop(failpoint: &str) {

shutdown_tx.shutdown().await;
}

/// Checks that a write to `cats` is still visible after a write to `cats2`
/// that stores the special timestamp value `'infinity'`.
///
/// Both inserts are sent in a single `simple_query` call. The `'infinity'`
/// insert is presumably what triggers the replication error this test is named
/// after (TODO confirm against the replicator); the assertion only checks that
/// the `cats` row can be read back afterwards.
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn replication_of_other_tables_succeeds_even_after_error() {
    use std::time::Duration;

    readyset_tracing::init_test_logging();

    let (config, _handle, shutdown_tx) = TestBuilder::default()
        .recreate_database(false)
        .fallback_url(PostgreSQLAdapter::url())
        .migration_mode(MigrationMode::InRequestPath)
        .build::<PostgreSQLAdapter>()
        .await;

    let client = connect(config).await;

    // Drop any leftovers from earlier runs, then create both tables. The
    // statements are issued one at a time, in this exact order.
    let setup_statements = [
        "DROP TABLE IF EXISTS cats CASCADE",
        "DROP TABLE IF EXISTS cats2 CASCADE",
        "CREATE TABLE cats (id SERIAL PRIMARY KEY, cuteness int)",
        "CREATE TABLE cats2 (id SERIAL PRIMARY KEY, ts TIMESTAMP)",
    ];
    for statement in setup_statements {
        client.simple_query(statement).await.unwrap();
    }

    // NOTE(review): presumably gives the table creation time to propagate
    // before writing; `sleep` is a helper defined elsewhere in this file.
    sleep().await;
    sleep().await;

    // One ordinary write and one write carrying the 'infinity' timestamp,
    // submitted together in a single query string.
    client
        .simple_query(
            "INSERT INTO cats (cuteness) VALUES (100); INSERT INTO cats2 (ts) VALUES ('infinity')",
        )
        .await
        .unwrap();

    // Fixed wait before reading the data back.
    tokio::time::sleep(Duration::from_secs(5)).await;
    sleep().await;

    // Gather the first column (`id`) of every returned row, ignoring any
    // non-row messages in the response.
    let messages = client.simple_query("SELECT * FROM cats").await.unwrap();
    let mut ids: Vec<u32> = Vec::new();
    for message in &messages {
        if let SimpleQueryMessage::Row(row) = message {
            ids.push(row.get(0).unwrap().parse().unwrap());
        }
    }
    assert_eq!(ids, [1]);

    shutdown_tx.shutdown().await;
}
78 changes: 41 additions & 37 deletions replicators/src/postgres_connector/connector.rs
Expand Up @@ -43,8 +43,8 @@ pub struct PostgresWalConnector {
connection_handle: tokio::task::JoinHandle<Result<(), pgsql::Error>>,
/// Reader is a decoder for binlog events
reader: Option<WalReader>,
/// Stores an event that was read but not handled
peek: Option<(WalEvent, Lsn)>,
/// Stores an event or error that was read but not handled
peek: Option<Result<(WalEvent, Lsn), WalError>>,
/// If we just want to continue reading the log from a previous point
next_position: Option<PostgresPosition>,
/// The replication slot, if one was created for this connector
Expand Down Expand Up @@ -514,39 +514,20 @@ impl Connector for PostgresWalConnector {
"We should either have no current table, or the current table should have a schema"
);

let (mut event, lsn) = match self.peek.take() {
Some(event) => event,
None => match self.next_event().await {
Ok(ev) => ev,
Err(WalError::TableError {
kind: TableErrorKind::UnsupportedTypeConversion { type_oid },
schema,
table,
}) => {
warn!(
type_oid,
schema = schema,
table = table,
"Ignoring write with value of unsupported type"
);
// ReadySet will skip replicating tables with unsupported types (e.g.
// Postgres's user-defined types). RS will leverage the fallback instead.
continue;
}
Err(err) => {
return Err(err.into());
}
},
// Get the next buffered event or error, or read a new event from the WAL stream.
let event = match self.peek.take() {
Some(buffered) => buffered,
None => self.next_event().await,
};

// Try not to accumulate too many actions between calls, but if we're collecting a
// series of events that reuse the same LSN we have to make sure they all end up in the
// same batch:
// If our next event is an error, flush any buffered actions. Otherwise, buffer up to
// MAX_QUEUED_INDEPENDENT_ACTIONS unless we're collecting a series of events that reuse
// the same LSN, as we have to make sure they all end up in the same batch.
//
// If we have no actions but our offset exceeds the given 'until', report our
// If we have no buffered actions but our offset exceeds the given 'until', report our
// position in the logs.
if actions.len() >= MAX_QUEUED_INDEPENDENT_ACTIONS && lsn != cur_lsn.lsn {
self.peek = Some((event, lsn));
if event.is_err() && !actions.is_empty() || actions.len() >= MAX_QUEUED_INDEPENDENT_ACTIONS && matches!(event, Ok((_, lsn)) if lsn != cur_lsn.lsn) {
self.peek = Some(event);
return Ok((
ReplicationAction::TableAction {
table: cur_table,
Expand All @@ -565,6 +546,29 @@ impl Connector for PostgresWalConnector {
info!(target: "replicator_statement", "{:?}", event);
}

// Handle errors or extract the event and LSN.
let (mut event, lsn) = match event {
Ok(e) => e,
// ReadySet will not snapshot tables with unsupported types (e.g., Postgres
// user defined types).
Err(WalError::TableError {
kind: TableErrorKind::UnsupportedTypeConversion { type_oid },
schema,
table,
}) => {
warn!(
type_oid,
schema = schema,
table = table,
"Ignoring write with value of unsupported type"
);
continue;
}
Err(e) => {
return Err(e.into());
}
};

// Check if next event is for another table, in which case we have to flush the events
// accumulated for this table and store the next event in `peek`.
match &mut event {
Expand All @@ -579,12 +583,12 @@ impl Connector for PostgresWalConnector {
invariant!(matching.is_empty());
if let Some((schema, name)) = other_tables.pop() {
if !other_tables.is_empty() {
self.peek = Some((
self.peek = Some(Ok((
WalEvent::Truncate {
tables: other_tables,
},
lsn,
));
)));
}

actions.push(TableOperation::Truncate);
Expand All @@ -605,12 +609,12 @@ impl Connector for PostgresWalConnector {
}
} else {
if !other_tables.is_empty() {
self.peek = Some((
self.peek = Some(Ok((
WalEvent::Truncate {
tables: other_tables,
},
lsn,
));
)));
}

if !matching.is_empty() {
Expand All @@ -636,7 +640,7 @@ impl Connector for PostgresWalConnector {
|| cur_table.name != table.as_str() =>
{
if !actions.is_empty() {
self.peek = Some((event, lsn));
self.peek = Some(Ok((event, lsn)));
return Ok((
ReplicationAction::TableAction {
table: cur_table,
Expand Down Expand Up @@ -668,7 +672,7 @@ impl Connector for PostgresWalConnector {
PostgresPosition::from(lsn).into(),
));
} else {
self.peek = Some((WalEvent::DdlEvent { ddl_event }, lsn));
self.peek = Some(Ok((WalEvent::DdlEvent { ddl_event }, lsn)));
return Ok((
ReplicationAction::TableAction {
table: cur_table,
Expand Down

0 comments on commit 1101215

Please sign in to comment.