Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/src/routes/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use config::shared::{
DestinationConfig, PgConnectionConfig, PipelineConfig as SharedPipelineConfig,
ReplicatorConfig, SupabaseConfig, TlsConfig,
};
use postgres::schema::TableId;
use serde::{Deserialize, Serialize};
use sqlx::{PgPool, PgTransaction};
use std::ops::DerefMut;
Expand Down Expand Up @@ -718,7 +719,8 @@ pub async fn get_pipeline_replication_status(
let mut tables: Vec<TableReplicationStatus> = Vec::new();
for row in state_rows {
let table_id = row.table_id.0;
let table_name = get_table_name_from_oid(&source_pool, table_id).await?;
let table_name =
get_table_name_from_oid(&source_pool, TableId::new(row.table_id.0)).await?;
tables.push(TableReplicationStatus {
table_id,
table_name: table_name.to_string(),
Expand Down
5 changes: 4 additions & 1 deletion etl/benches/table_copies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,10 @@ async fn start_pipeline(args: RunArgs) -> Result<(), Box<dyn Error>> {
let mut table_copied_notifications = vec![];
for table_id in &args.table_ids {
let table_copied = state_store
.notify_on_table_state(*table_id, TableReplicationPhaseType::FinishedCopy)
.notify_on_table_state(
TableId::new(*table_id),
TableReplicationPhaseType::FinishedCopy,
)
.await;
table_copied_notifications.push(table_copied);
}
Expand Down
18 changes: 11 additions & 7 deletions etl/src/conversions/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ impl RelationEvent {
.iter()
.map(Self::build_column_schema)
.collect::<Result<Vec<ColumnSchema>, _>>()?;
let table_schema = TableSchema::new(relation_body.rel_id(), table_name, column_schemas);
let table_schema = TableSchema::new(
TableId::new(relation_body.rel_id()),
table_name,
column_schemas,
);

Ok(Self {
start_lsn,
Expand Down Expand Up @@ -319,7 +323,7 @@ async fn convert_insert_to_event(
insert_body: &protocol::InsertBody,
) -> Result<InsertEvent, EventConversionError> {
let table_id = insert_body.rel_id();
let table_schema = get_table_schema(schema_cache, table_id).await?;
let table_schema = get_table_schema(schema_cache, TableId::new(table_id)).await?;

let table_row = convert_tuple_to_row(
&table_schema.column_schemas,
Expand All @@ -329,7 +333,7 @@ async fn convert_insert_to_event(
Ok(InsertEvent {
start_lsn,
commit_lsn,
table_id,
table_id: TableId::new(table_id),
table_row,
})
}
Expand All @@ -341,7 +345,7 @@ async fn convert_update_to_event(
update_body: &protocol::UpdateBody,
) -> Result<UpdateEvent, EventConversionError> {
let table_id = update_body.rel_id();
let table_schema = get_table_schema(schema_cache, table_id).await?;
let table_schema = get_table_schema(schema_cache, TableId::new(table_id)).await?;

let table_row = convert_tuple_to_row(
&table_schema.column_schemas,
Expand All @@ -364,7 +368,7 @@ async fn convert_update_to_event(
Ok(UpdateEvent {
start_lsn,
commit_lsn,
table_id,
table_id: TableId::new(table_id),
table_row,
old_table_row,
})
Expand All @@ -377,7 +381,7 @@ async fn convert_delete_to_event(
delete_body: &protocol::DeleteBody,
) -> Result<DeleteEvent, EventConversionError> {
let table_id = delete_body.rel_id();
let table_schema = get_table_schema(schema_cache, table_id).await?;
let table_schema = get_table_schema(schema_cache, TableId::new(table_id)).await?;

// We try to extract the old tuple by either taking the entire old tuple or the key of the old
// tuple.
Expand All @@ -395,7 +399,7 @@ async fn convert_delete_to_event(
Ok(DeleteEvent {
start_lsn,
commit_lsn,
table_id,
table_id: TableId::new(table_id),
old_table_row,
})
}
Expand Down
12 changes: 7 additions & 5 deletions etl/src/destination/bigquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ impl BigQueryDestination {

let mut result = Vec::new();
for (table_id, (table_name, column_schemas)) in table_schemas {
let table_schema = TableSchema::new(table_id, table_name, column_schemas);
let table_schema = TableSchema::new(TableId::new(table_id), table_name, column_schemas);
result.push(table_schema);
}

Expand Down Expand Up @@ -557,7 +557,9 @@ impl BigQueryDestination {
.lock_inner()
.await;

if let Some(table_schema) = schema_cache.get_table_schema_ref(&table_id) {
if let Some(table_schema) =
schema_cache.get_table_schema_ref(&TableId::new(table_id))
{
inner
.client
.truncate_table(
Expand All @@ -582,7 +584,7 @@ impl BigQueryDestination {
/// Extracts table ID, schema name, and table name for storage in `etl_table_schemas`.
fn table_schema_to_table_row(table_schema: &TableSchema) -> TableRow {
let columns = vec![
Cell::U32(table_schema.id),
Cell::U32(table_schema.id.into()),
Cell::String(table_schema.name.schema.clone()),
Cell::String(table_schema.name.name.clone()),
];
Expand All @@ -598,7 +600,7 @@ impl BigQueryDestination {

for (column_order, column_schema) in table_schema.column_schemas.iter().enumerate() {
let columns = vec![
Cell::U32(table_schema.id),
Cell::U32(table_schema.id.into()),
Cell::String(column_schema.name.clone()),
Cell::String(Self::postgres_type_to_string(&column_schema.typ)),
Cell::I32(column_schema.modifier),
Expand Down Expand Up @@ -924,7 +926,7 @@ mod tests {
ColumnSchema::new("data".to_string(), Type::JSONB, -1, true, false),
ColumnSchema::new("active".to_string(), Type::BOOL, -1, false, false),
];
let table_schema = TableSchema::new(456, table_name, columns);
let table_schema = TableSchema::new(TableId::new(456), table_name, columns);

let schema_row = BigQueryDestination::table_schema_to_table_row(&table_schema);
assert_eq!(schema_row.values[0], Cell::U32(456));
Expand Down
20 changes: 12 additions & 8 deletions etl/src/replication/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,7 @@ where
};

if !hook
.should_apply_changes(message.rel_id(), remote_final_lsn)
.should_apply_changes(TableId::new(message.rel_id()), remote_final_lsn)
.await?
{
return Ok(HandleMessageResult::default());
Expand All @@ -756,8 +756,12 @@ where
// dealt with differently based on the worker type.
// TODO: explore how to deal with applying relation messages to the schema (creating it if missing).
let schema_cache = schema_cache.lock_inner().await;
let Some(existing_table_schema) = schema_cache.get_table_schema_ref(&message.rel_id()) else {
return Err(ApplyLoopError::MissingTableSchema(message.rel_id()));
let Some(existing_table_schema) =
schema_cache.get_table_schema_ref(&TableId::new(message.rel_id()))
else {
return Err(ApplyLoopError::MissingTableSchema(TableId::new(
message.rel_id(),
)));
};

// We compare the table schema from the relation message with the existing schema (if any).
Expand All @@ -766,7 +770,7 @@ where
if !existing_table_schema.partial_eq(&event.table_schema) {
return Ok(HandleMessageResult {
end_batch: Some(EndBatch::Exclusive),
skip_table: Some(message.rel_id()),
skip_table: Some(TableId::new(message.rel_id())),
..Default::default()
});
}
Expand Down Expand Up @@ -803,7 +807,7 @@ where
};

if !hook
.should_apply_changes(message.rel_id(), remote_final_lsn)
.should_apply_changes(TableId::new(message.rel_id()), remote_final_lsn)
.await?
{
return Ok(HandleMessageResult::default());
Expand Down Expand Up @@ -841,7 +845,7 @@ where
};

if !hook
.should_apply_changes(message.rel_id(), remote_final_lsn)
.should_apply_changes(TableId::new(message.rel_id()), remote_final_lsn)
.await?
{
return Ok(HandleMessageResult::default());
Expand Down Expand Up @@ -879,7 +883,7 @@ where
};

if !hook
.should_apply_changes(message.rel_id(), remote_final_lsn)
.should_apply_changes(TableId::new(message.rel_id()), remote_final_lsn)
.await?
{
return Ok(HandleMessageResult::default());
Expand Down Expand Up @@ -919,7 +923,7 @@ where
let mut rel_ids = Vec::with_capacity(message.rel_ids().len());
for &table_id in message.rel_ids().iter() {
if hook
.should_apply_changes(table_id, remote_final_lsn)
.should_apply_changes(TableId::new(table_id), remote_final_lsn)
.await?
{
rel_ids.push(table_id)
Expand Down
9 changes: 8 additions & 1 deletion etl/src/replication/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub fn get_slot_name(
#[cfg(test)]
mod tests {
use super::*;
use postgres::schema::TableId;

#[test]
fn test_apply_worker_slot_name() {
Expand All @@ -55,7 +56,13 @@ mod tests {
#[test]
fn test_table_sync_slot_name() {
let pipeline_id = 1;
let result = get_slot_name(pipeline_id, WorkerType::TableSync { table_id: 123 }).unwrap();
let result = get_slot_name(
pipeline_id,
WorkerType::TableSync {
table_id: TableId::new(123),
},
)
.unwrap();
assert!(result.starts_with(TABLE_SYNC_PREFIX));
assert!(result.len() <= MAX_SLOT_NAME_LENGTH);
}
Expand Down
7 changes: 4 additions & 3 deletions etl/src/state/store/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc};
use config::shared::PgConnectionConfig;
use postgres::replication::{
TableReplicationState, TableReplicationStateRow, connect_to_source_database,
update_replication_state,
get_table_replication_state_rows, update_replication_state,
};
use postgres::schema::TableId;
use sqlx::PgPool;
Expand Down Expand Up @@ -96,7 +96,7 @@ impl PostgresStateStore {
pool: &PgPool,
pipeline_id: PipelineId,
) -> sqlx::Result<Vec<TableReplicationStateRow>> {
postgres::replication::get_table_replication_state_rows(pool, pipeline_id as i64).await
get_table_replication_state_rows(pool, pipeline_id as i64).await
}

async fn update_replication_state(
Expand Down Expand Up @@ -165,7 +165,7 @@ impl StateStore for PostgresStateStore {
let phase = self
.replication_phase_from_state(&row.state, row.sync_done_lsn)
.await?;
table_states.insert(row.table_id.0, phase);
table_states.insert(TableId::new(row.table_id.0), phase);
}
let mut inner = self.inner.lock().await;
inner.table_states = table_states.clone();
Expand All @@ -188,6 +188,7 @@ impl StateStore for PostgresStateStore {
.await?;
let mut inner = self.inner.lock().await;
inner.table_states.insert(table_id, state);

Ok(())
}
}
8 changes: 4 additions & 4 deletions etl/src/test_utils/test_schema.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use postgres::schema::{ColumnSchema, Oid, TableName, TableSchema};
use postgres::schema::{ColumnSchema, TableId, TableName, TableSchema};
use postgres::tokio::test_utils::{PgDatabase, id_column_schema};
use std::ops::RangeInclusive;
use tokio_postgres::types::{PgLsn, Type};
Expand Down Expand Up @@ -188,7 +188,7 @@ pub async fn insert_mock_data(

pub async fn get_users_age_sum_from_rows<D>(
destination: &TestDestinationWrapper<D>,
table_id: Oid,
table_id: TableId,
) -> i32 {
let mut actual_sum = 0;

Expand Down Expand Up @@ -250,7 +250,7 @@ pub fn events_equal_excluding_fields(left: &Event, right: &Event) -> bool {

pub fn build_expected_users_inserts(
mut starting_id: i64,
users_table_id: Oid,
users_table_id: TableId,
expected_rows: Vec<(&str, i32)>,
) -> Vec<Event> {
let mut events = Vec::new();
Expand All @@ -277,7 +277,7 @@ pub fn build_expected_users_inserts(

pub fn build_expected_orders_inserts(
mut starting_id: i64,
orders_table_id: Oid,
orders_table_id: TableId,
expected_rows: Vec<&str>,
) -> Vec<Event> {
let mut events = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion etl/src/workers/table_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ where
"table_sync_worker",
pipeline_id = self.pipeline_id,
publication_name = self.config.publication_name,
table_id = self.table_id,
table_id = %self.table_id,
);
let table_sync_worker = async move {
debug!(
Expand Down
2 changes: 2 additions & 0 deletions postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ edition = "2024"

[dependencies]
config = { workspace = true }

bytes = { workspace = true }
pg_escape = { workspace = true }
rustls = { workspace = true }
serde = { workspace = true, features = ["derive"] }
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/replication/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn get_table_name_from_oid(
";

let row = sqlx::query(query)
.bind(table_id as i64)
.bind(table_id.into_inner() as i64)
.fetch_optional(pool)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion postgres/src/replication/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn update_replication_state(
"#,
)
.bind(pipeline_id as i64)
.bind(SqlxTableId(table_id))
.bind(SqlxTableId(table_id.into_inner()))
.bind(state)
.bind(sync_done_lsn)
.execute(pool)
Expand Down
Loading