14 changes: 0 additions & 14 deletions etl/src/replication/table_sync.rs
@@ -26,7 +26,6 @@ use crate::metrics::{
 };
 use crate::replication::client::PgReplicationClient;
 use crate::replication::stream::TableCopyStream;
-use crate::state::table::RetryPolicy;
 use crate::state::table::{TableReplicationPhase, TableReplicationPhaseType};
 use crate::store::schema::SchemaStore;
 use crate::store::state::StateStore;
@@ -192,19 +191,6 @@ where
         .await?;

     if !table_schema.has_primary_keys() {
-        store
-            .update_table_replication_state(
-                table_id,
-                TableReplicationPhase::Errored {
-                    reason: "The table has no primary keys".to_string(),
-                    solution: Some(format!(
-                        "You should set at least one primary key on the table {table_id}"
-                    )),
-                    retry_policy: RetryPolicy::ManualRetry,
-                },
-            )
-            .await?;
-
         bail!(
             ErrorKind::SourceSchemaError,
             "Missing primary key",
13 changes: 4 additions & 9 deletions etl/src/workers/pool.rs
@@ -21,7 +21,7 @@ pub struct TableSyncWorkerPoolInner {
     /// Completed or failed table sync workers, preserving history for inspection.
     finished: HashMap<TableId, Vec<TableSyncWorkerHandle>>,
     /// Notification mechanism for pool state changes.
-    pool_update: Option<Arc<Notify>>,
+    pool_update: Arc<Notify>,
 }

 impl TableSyncWorkerPoolInner {
@@ -33,7 +33,7 @@ impl TableSyncWorkerPoolInner {
         Self {
             active: HashMap::new(),
             finished: HashMap::new(),
-            pool_update: None,
+            pool_update: Arc::new(Notify::new()),
         }
     }

@@ -75,9 +75,7 @@ impl TableSyncWorkerPoolInner {
     pub fn mark_worker_finished(&mut self, table_id: TableId) {
         let removed_worker = self.active.remove(&table_id);

-        if let Some(waiting) = self.pool_update.take() {
-            waiting.notify_one();
-        }
+        self.pool_update.notify_waiters();

         if let Some(removed_worker) = removed_worker {
             self.finished
@@ -117,10 +115,7 @@
         // worker within the `ReactiveFuture` will not be able to hold the lock onto the pool to
         // mark itself as finished.
         if !self.active.is_empty() {
-            let notify = Arc::new(Notify::new());
-            self.pool_update = Some(notify.clone());
-
-            return Ok(Some(notify));
[Review comment from the contributor author on lines -120 to -123: "No need to recreate this every time."]
+            return Ok(Some(self.pool_update.clone()));
         }

         let mut errors = Vec::new();
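The review comment above ("No need to recreate this every time.") captures the reasoning behind this change: `tokio::sync::Notify::notify_waiters()` wakes every task currently awaiting `notified()`, so the pool can keep a single long-lived `Arc<Notify>` and hand out clones of it, rather than allocating a fresh `Notify` on every wait as the removed code did. Below is a minimal sketch of that pattern; `Pool`, `wait_handle`, and the `usize` worker counter are illustrative stand-ins rather than the crate's actual `TableSyncWorkerPoolInner` API, and it assumes a `tokio` runtime with the usual `sync`, `time`, and `macros` features enabled.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, Notify};

// Illustrative stand-in for the worker pool: one shared Notify that outlives
// individual waits, plus a counter for the number of active workers.
struct Pool {
    active: usize,
    pool_update: Arc<Notify>,
}

impl Pool {
    fn new() -> Self {
        Self {
            active: 0,
            pool_update: Arc::new(Notify::new()),
        }
    }

    fn mark_worker_finished(&mut self) {
        self.active = self.active.saturating_sub(1);
        // Wake every task currently awaiting `notified()` on the shared Notify.
        // `notify_waiters` does not store a permit, so only tasks that are
        // already registered at this point get woken.
        self.pool_update.notify_waiters();
    }

    fn wait_handle(&self) -> Option<Arc<Notify>> {
        // Hand out the same long-lived Notify instead of creating a new one per call.
        (self.active > 0).then(|| self.pool_update.clone())
    }
}

#[tokio::main]
async fn main() {
    let pool = Arc::new(Mutex::new(Pool::new()));
    pool.lock().await.active = 1;

    let handle = pool.lock().await.wait_handle().expect("a worker is active");

    // Simulate a worker finishing shortly after we start waiting on the handle.
    let pool_clone = Arc::clone(&pool);
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        pool_clone.lock().await.mark_worker_finished();
    });

    handle.notified().await;
    println!("pool update received");
}
```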
14 changes: 7 additions & 7 deletions etl/src/workers/table_sync.rs
@@ -406,15 +406,15 @@ where
         let store = self.store.clone();
         let config = self.config.clone();

-        // Clone all the fields we need for retries
+        // Clone all the fields we need for retries.
         let pipeline_id = self.pipeline_id;
         let destination = self.destination.clone();
         let shutdown_rx = self.shutdown_rx.clone();
         let force_syncing_tables_tx = self.force_syncing_tables_tx.clone();
         let run_permit = self.run_permit.clone();

         loop {
-            // Recreate the worker for each attempt
+            // Recreate the worker for each attempt.
             let worker = TableSyncWorker {
                 pipeline_id,
                 config: config.clone(),
@@ -431,7 +431,7 @@

             match result {
                 Ok(_) => {
-                    // Worker completed successfully, mark as finished
+                    // Worker completed successfully, mark as finished.
                     let mut pool = pool.lock().await;
                     pool.mark_worker_finished(table_id);

@@ -440,16 +440,16 @@
                 Err(err) => {
                     error!("table sync worker failed for table {}: {}", table_id, err);

-                    // Convert error to table replication error to determine retry policy
+                    // Convert error to table replication error to determine retry policy.
                     let table_error =
                         TableReplicationError::from_etl_error(&config, table_id, &err);
                     let retry_policy = table_error.retry_policy().clone();

-                    // We lock both the pool and the table sync worker state to be consistent
+                    // We lock both the pool and the table sync worker state to be consistent.
                     let mut pool_guard = pool.lock().await;
                     let mut state_guard = state.lock().await;

-                    // Update the state and store with the error
+                    // Update the state and store with the error.
                     if let Err(err) = state_guard.set_and_store(table_error.into(), &store).await {
                         error!(
                             "failed to update table sync worker state for table {}: {}",
@@ -488,7 +488,7 @@ where
                         );
                     }

-                    // Before rolling back, we acquire the pool lock again for consistency
+                    // Before rolling back, we acquire the pool lock again for consistency.
                     let mut pool_guard = pool.lock().await;

                     // After sleeping, we rollback to the previous state and retry.
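The comments in this hunk describe the retry flow: every attempt rebuilds the worker, a failure is converted into a retry decision, the error is recorded, and after an optional delay the state is rolled back and the loop tries again. The sketch below shows only that control-flow shape; `Worker`, `RetryPolicy`, and `classify` are hypothetical placeholders, not the crate's real `TableSyncWorker`, `TableReplicationError`, or retry-policy API.

```rust
use std::time::Duration;

// Hypothetical stand-ins for the real worker and its error classification.
enum RetryPolicy {
    NoRetry,
    TimedRetry { after: Duration },
}

struct Worker {
    attempt: u32,
}

impl Worker {
    async fn run(&self) -> Result<(), String> {
        // Fail the first two attempts so the retry path is exercised.
        if self.attempt < 2 {
            Err(format!("attempt {} failed", self.attempt))
        } else {
            Ok(())
        }
    }
}

fn classify(_err: &str, attempt: u32) -> RetryPolicy {
    // In the real worker this decision is derived from the error kind and config.
    if attempt < 5 {
        RetryPolicy::TimedRetry {
            after: Duration::from_millis(10),
        }
    } else {
        RetryPolicy::NoRetry
    }
}

#[tokio::main]
async fn main() {
    let mut attempt = 0;
    loop {
        // Recreate the worker for each attempt.
        let worker = Worker { attempt };

        match worker.run().await {
            Ok(()) => {
                println!("worker finished after {attempt} retries");
                break;
            }
            Err(err) => {
                eprintln!("worker failed: {err}");

                // Convert the error into a retry decision (recording it would go here).
                match classify(&err, attempt) {
                    RetryPolicy::NoRetry => break,
                    RetryPolicy::TimedRetry { after } => {
                        // Sleep, then roll back to the previous state and retry.
                        tokio::time::sleep(after).await;
                        attempt += 1;
                    }
                }
            }
        }
    }
}
```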
1 change: 0 additions & 1 deletion etl/tests/integration/mod.rs
@@ -1,4 +1,3 @@
-mod no_primary_key_test;
 mod pipeline_test;
 mod postgres_store;
 mod replica_identity;
89 changes: 0 additions & 89 deletions etl/tests/integration/no_primary_key_test.rs

This file was deleted.

61 changes: 61 additions & 0 deletions etl/tests/integration/pipeline_test.rs
@@ -1,4 +1,5 @@
 use etl::destination::memory::MemoryDestination;
+use etl::error::ErrorKind;
 use etl::state::table::TableReplicationPhaseType;
 use etl::test_utils::database::{spawn_source_database, test_table_name};
 use etl::test_utils::event::group_events_by_type_and_table_id;
@@ -782,3 +783,63 @@ async fn table_processing_with_schema_change_errors_table() {
     );
     assert_events_equal(orders_inserts, &expected_orders_inserts);
 }
+
+#[tokio::test(flavor = "multi_thread")]
+async fn table_without_primary_key_is_errored() {
+    init_test_tracing();
+    let database = spawn_source_database().await;
+
+    let table_name = test_table_name("no_primary_key_table");
+    let table_id = database
+        .create_table(table_name.clone(), false, &[("name", "text")])
+        .await
+        .unwrap();
+
+    let publication_name = "test_pub".to_string();
+    database
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
+        .await
+        .expect("Failed to create publication");
+
+    // Insert a row to later check that this doesn't appear in destination's table rows.
+    database
+        .insert_values(table_name.clone(), &["name"], &[&"abc"])
+        .await
+        .unwrap();
+
+    let state_store = NotifyingStore::new();
+    let destination = TestDestinationWrapper::wrap(MemoryDestination::new());
+
+    let pipeline_id: PipelineId = random();
+    let mut pipeline = create_pipeline(
+        &database.config,
+        pipeline_id,
+        publication_name,
+        state_store.clone(),
+        destination.clone(),
+    );
+
+    // We wait for the table to be errored.
+    let errored_state = state_store
+        .notify_on_table_state(table_id, TableReplicationPhaseType::Errored)
+        .await;
+
+    pipeline.start().await.unwrap();
+
+    // Insert a row to later check that it is not processed by the apply worker.
+    database
+        .insert_values(table_name.clone(), &["name"], &[&"abc1"])
+        .await
+        .unwrap();
+
+    errored_state.notified().await;
+
+    // Wait for the pipeline expecting an error to be returned.
+    let err = pipeline.shutdown_and_wait().await.err().unwrap();
+    assert_eq!(err.kinds().len(), 1);
+    assert_eq!(err.kinds()[0], ErrorKind::SourceSchemaError);
+
+    // We expect no events to be saved.
+    let events = destination.get_events().await;
+    assert!(events.is_empty());
+}