Skip to content

Commit

Permalink
replicators: Set replication offsets for newly-created tables
Browse files Browse the repository at this point in the history
After replicating a DDL change including one or more CREATE TABLE
statements, set the replication offset for those tables to the
replication offset of that DDL change, so that newly-created tables
start with a replication offset. This avoids the issue where if we
restart the replicator for any reason after creating a table but before
that table gets any writes, we'll end up with a None replication offset
and have to resnapshot everything.

This is all happening non-atomically (via multiple RPC calls to the
controller) primarily for code simplicity - it's still correct though
because as a last resort in the case where we fail before setting the
replication offsets for all tables we'll still just resnapshot
everything.

Refs: ENG-2655
Change-Id: Iec264764d6826b41e3567a072ecfafb29dc497e5
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/4470
Tested-by: Buildkite CI
Reviewed-by: Fran Noriega <fran@readyset.io>
Reviewed-by: Nick Marino <nick@readyset.io>
  • Loading branch information
glittershark committed Mar 13, 2023
1 parent 719375d commit fb1b1c1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 2 deletions.
28 changes: 27 additions & 1 deletion replicators/src/noria_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,15 @@ impl NoriaAdapter {

changelist = changelist.with_schema_search_path(vec![schema.into()]);

// Collect a list of all tables we're creating for later
let tables = changelist
.changes()
.filter_map(|change| match change {
Change::CreateTable(t) => Some(t.table.clone()),
_ => None,
})
.collect::<Vec<_>>();

match self
.noria
.extend_recipe_with_offset(changelist.clone(), &pos, false)
Expand Down Expand Up @@ -691,9 +700,26 @@ impl NoriaAdapter {
}
Ok(_) => {}
}
self.replication_offsets.schema = Some(pos);

self.replication_offsets.schema = Some(pos.clone());
self.clear_mutator_cache();

// Set the replication offset for each table we just created to this replication offset
// (since otherwise they'll get initialized without an offset)
for table in &tables {
self.replication_offsets
.tables
.insert(table.clone(), Some(pos.clone()));
if let Some(mutator) = self.mutator_for_table(table).await? {
mutator.set_replication_offset(pos.clone()).await?;
} else {
warn!(
table = %table.display_unquoted(),
"Just-created table missing replication offset"
)
}
}

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion replicators/src/postgres_connector/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ impl<'a> PostgresReplicator<'a> {
})
}

/// Snapshoht the contents of the upstream database to ReadySet, starting with the DDL, followed
/// Snapshot the contents of the upstream database to ReadySet, starting with the DDL, followed
/// by each table's contents.
///
/// If `full_snapshot` is set to `true`, *all* tables will be snapshotted, even those that
Expand Down

0 comments on commit fb1b1c1

Please sign in to comment.