8 changes: 4 additions & 4 deletions etl-api/src/k8s_client.rs
```diff
@@ -553,10 +553,10 @@ impl K8sClient for HttpK8sClient {
         };

         // Check last terminated state for non-zero exit code
-        if let Some(last_state) = &container_status.last_state {
-            if let Some(terminated) = &last_state.terminated {
-                return Ok(terminated.exit_code != 0);
-            }
+        if let Some(last_state) = &container_status.last_state
+            && let Some(terminated) = &last_state.terminated
+        {
+            return Ok(terminated.exit_code != 0);
         }

         Ok(false)
```
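This hunk, like most of the hunks below, converts nested `if let` blocks into a let chain: `&&` joins pattern matches and boolean tests into a single `if` condition (stable since Rust 1.88 on the 2024 edition). The two forms are equivalent; the chain only removes a level of nesting. A minimal standalone sketch of the pattern, with hypothetical types that are not from this codebase:

```rust
// Sketch of the let-chain refactor (requires edition 2024); `Terminated` and
// `LastState` are hypothetical stand-ins for the k8s container-status types.
struct Terminated {
    exit_code: i32,
}

struct LastState {
    terminated: Option<Terminated>,
}

fn crashed(last_state: Option<&LastState>) -> bool {
    // Before: two nested `if let` blocks. After: one chained condition whose
    // body runs only when both patterns match.
    if let Some(state) = last_state
        && let Some(terminated) = &state.terminated
    {
        return terminated.exit_code != 0;
    }

    false
}

fn main() {
    let state = LastState {
        terminated: Some(Terminated { exit_code: 137 }),
    };
    assert!(crashed(Some(&state)));
    assert!(!crashed(None));
}
```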
8 changes: 4 additions & 4 deletions etl-api/src/span_builder.rs
```diff
@@ -53,10 +53,10 @@ impl RootSpanBuilder for ApiRootSpanBuilder {

         // In case we have a positive response, we want to log the success. In case of error, it will
         // be logged automatically since we have the `emit_event_on_error` feature enabled.
-        if let Ok(response) = outcome {
-            if response.response().error().is_none() {
-                info!("HTTP request completed successfully");
-            }
+        if let Ok(response) = outcome
+            && response.response().error().is_none()
+        {
+            info!("HTTP request completed successfully");
         }
     }
 }
```
8 changes: 4 additions & 4 deletions etl-destinations/tests/integration/bigquery_test.rs
```diff
@@ -516,7 +516,7 @@ async fn table_nullable_scalar_columns() {

     let publication_name = "test_pub".to_string();
     database
-        .create_publication(&publication_name, &[table_name.clone()])
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
         .await
         .expect("Failed to create publication");

@@ -725,7 +725,7 @@ async fn table_nullable_array_columns() {

     let publication_name = "test_pub_array".to_string();
     database
-        .create_publication(&publication_name, &[table_name.clone()])
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
         .await
         .expect("Failed to create publication");

@@ -960,7 +960,7 @@ async fn table_non_nullable_scalar_columns() {

     let publication_name = "test_pub_non_null".to_string();
     database
-        .create_publication(&publication_name, &[table_name.clone()])
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
         .await
         .expect("Failed to create publication");

@@ -1210,7 +1210,7 @@ async fn table_non_nullable_array_columns() {

     let publication_name = "test_pub_non_null_array".to_string();
     database
-        .create_publication(&publication_name, &[table_name.clone()])
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
         .await
         .expect("Failed to create publication");
```
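These four test hunks replace `&[table_name.clone()]` with `std::slice::from_ref(&table_name)`; the same change appears in `no_primary_key_test.rs` at the bottom of the diff. `from_ref` turns a `&T` into a one-element `&[T]` that borrows the existing value, so no clone and no temporary array are needed. A small sketch, using `String` as a stand-in for the actual table-name type:

```rust
// `std::slice::from_ref` borrows a single value as a one-element slice.
fn takes_slice(names: &[String]) -> usize {
    names.len()
}

fn main() {
    let table_name = String::from("orders");

    // Before: clones the String just to build a temporary one-element array.
    assert_eq!(takes_slice(&[table_name.clone()]), 1);

    // After: borrows the existing String; no allocation, no clone.
    assert_eq!(takes_slice(std::slice::from_ref(&table_name)), 1);

    // `table_name` is still usable afterwards because it was only borrowed.
    assert_eq!(table_name, "orders");
}
```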
56 changes: 28 additions & 28 deletions etl-telemetry/src/lib.rs
```diff
@@ -90,34 +90,34 @@ where
 {
     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
         // Only try to inject project field if we have one and the content looks like JSON
-        if let Some(project_ref) = get_global_project_ref() {
-            if let Ok(json_str) = std::str::from_utf8(buf) {
-                // Try to parse as JSON
-                if let Ok(serde_json::Value::Object(mut map)) =
-                    serde_json::from_str::<serde_json::Value>(json_str)
-                {
-                    // Only inject if "project" field doesn't already exist
-                    if !map.contains_key(PROJECT_KEY_IN_LOG) {
-                        map.insert(
-                            PROJECT_KEY_IN_LOG.to_string(),
-                            serde_json::Value::String(project_ref.to_string()),
-                        );
-
-                        // Try to serialize back to JSON
-                        if let Ok(modified) = serde_json::to_string(&map) {
-                            // Add new line if it was there
-                            let output = if json_str.ends_with('\n') {
-                                format!("{modified}\n")
-                            } else {
-                                modified
-                            };
-
-                            // Write the modified JSON and return the original buffer length
-                            return match self.inner.write(output.as_bytes()) {
-                                Ok(_) => Ok(buf.len()),
-                                Err(e) => Err(e),
-                            };
-                        }
+        if let Some(project_ref) = get_global_project_ref()
+            && let Ok(json_str) = std::str::from_utf8(buf)
+        {
+            // Try to parse as JSON
+            if let Ok(serde_json::Value::Object(mut map)) =
+                serde_json::from_str::<serde_json::Value>(json_str)
+            {
+                // Only inject if "project" field doesn't already exist
+                if !map.contains_key(PROJECT_KEY_IN_LOG) {
+                    map.insert(
+                        PROJECT_KEY_IN_LOG.to_string(),
+                        serde_json::Value::String(project_ref.to_string()),
+                    );
+
+                    // Try to serialize back to JSON
+                    if let Ok(modified) = serde_json::to_string(&map) {
+                        // Add new line if it was there
+                        let output = if json_str.ends_with('\n') {
+                            format!("{modified}\n")
+                        } else {
+                            modified
+                        };
+
+                        // Write the modified JSON and return the original buffer length
+                        return match self.inner.write(output.as_bytes()) {
+                            Ok(_) => Ok(buf.len()),
+                            Err(e) => Err(e),
+                        };
+                    }
                 }
             }
```
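Only the outer two `if let`s collapse in this hunk: a let chain is a single condition, so it cannot interleave statements (here, the `map.insert`) between pattern matches, and the inner `if let`s stay nested. A reduced sketch of that constraint, assuming `serde_json` as in the surrounding code:

```rust
use serde_json::Value;

// A let chain cannot express "match, run a statement, then match again",
// so the innermost `if let` below has to remain nested.
fn inject(field: &str, buf: &[u8]) -> Option<String> {
    if let Ok(text) = std::str::from_utf8(buf)
        && let Ok(Value::Object(mut map)) = serde_json::from_str::<Value>(text)
    {
        // This statement sits between the matches above and the match below,
        // which is why the next `if let` cannot join the chain.
        map.insert(field.to_string(), Value::String("demo".into()));

        if let Ok(modified) = serde_json::to_string(&map) {
            return Some(modified);
        }
    }

    None
}

fn main() {
    let out = inject("project", br#"{"msg":"hello"}"#).unwrap();
    assert!(out.contains("project"));
}
```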
12 changes: 6 additions & 6 deletions etl/src/concurrency/stream.rs
```diff
@@ -136,13 +136,13 @@ impl<B, S: Stream<Item = B>> Stream for BatchStream<B, S> {

         // If there are items, we want to check the deadline, if it's met, we return the batch
         // we currently have in memory, otherwise, we return.
-        if !this.items.is_empty() {
-            if let Some(deadline) = this.deadline.as_pin_mut() {
-                ready!(deadline.poll(cx));
-                *this.reset_timer = true;
+        if !this.items.is_empty()
+            && let Some(deadline) = this.deadline.as_pin_mut()
+        {
+            ready!(deadline.poll(cx));
+            *this.reset_timer = true;

-                return Poll::Ready(Some(ShutdownResult::Ok(std::mem::take(this.items))));
-            }
+            return Poll::Ready(Some(ShutdownResult::Ok(std::mem::take(this.items))));
         }

         Poll::Pending
```
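The `ready!` inside the chain is untouched, but it is what makes this hunk behave: it early-returns `Poll::Pending` unless the deadline future has completed, so the batch is flushed only once the timer fires. A tiny sketch of that mechanic with `std::task::ready!`, not the actual stream:

```rust
use std::task::{Poll, ready};

// `ready!(p)` unwraps `Poll::Ready` or returns `Poll::Pending` from the
// enclosing function, so the flush below runs only after the deadline.
fn poll_step(deadline: Poll<()>) -> Poll<&'static str> {
    ready!(deadline);
    Poll::Ready("deadline elapsed, flush the batch")
}

fn main() {
    assert_eq!(poll_step(Poll::Pending), Poll::Pending);
    assert_eq!(
        poll_step(Poll::Ready(())),
        Poll::Ready("deadline elapsed, flush the batch")
    );
}
```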
5 changes: 0 additions & 5 deletions etl/src/conversions/event.rs
```diff
@@ -167,11 +167,6 @@ impl TruncateEvent {
     }
 }

-#[derive(Debug, Clone, PartialEq)]
-pub struct KeepAliveEvent {
-    pub reply: bool,
-}
-
 #[derive(Debug, Clone, PartialEq)]
 pub enum Event {
     Begin(BeginEvent),
```
52 changes: 26 additions & 26 deletions etl/src/replication/client.rs
```diff
@@ -306,22 +306,22 @@ impl PgReplicationClient {
                 Ok(())
             }
             Err(err) => {
-                if let Some(code) = err.code() {
-                    if *code == SqlState::UNDEFINED_OBJECT {
-                        warn!(
-                            "attempted to delete non-existent replication slot '{}'",
-                            slot_name
-                        );
-
-                        bail!(
-                            ErrorKind::ReplicationSlotNotFound,
-                            "Replication slot not found",
-                            format!(
-                                "Replication slot '{}' not found in database while attempting its deletion",
-                                slot_name
-                            )
-                        );
-                    }
+                if let Some(code) = err.code()
+                    && *code == SqlState::UNDEFINED_OBJECT
+                {
+                    warn!(
+                        "attempted to delete non-existent replication slot '{}'",
+                        slot_name
+                    );
+
+                    bail!(
+                        ErrorKind::ReplicationSlotNotFound,
+                        "Replication slot not found",
+                        format!(
+                            "Replication slot '{}' not found in database while attempting its deletion",
+                            slot_name
+                        )
+                    );
                 }

                 error!("failed to delete replication slot '{}': {}", slot_name, err);
@@ -496,17 +496,17 @@ impl PgReplicationClient {
             }
         }
         Err(err) => {
-            if let Some(code) = err.code() {
-                if *code == SqlState::DUPLICATE_OBJECT {
-                    bail!(
-                        ErrorKind::ReplicationSlotAlreadyExists,
-                        "Replication slot already exists",
-                        format!(
-                            "Replication slot '{}' already exists in database",
-                            slot_name
-                        )
-                    );
-                }
+            if let Some(code) = err.code()
+                && *code == SqlState::DUPLICATE_OBJECT
+            {
+                bail!(
+                    ErrorKind::ReplicationSlotAlreadyExists,
+                    "Replication slot already exists",
+                    format!(
+                        "Replication slot '{}' already exists in database",
+                        slot_name
+                    )
+                );
             }

             return Err(err.into());
```
38 changes: 19 additions & 19 deletions etl/src/replication/stream.rs
```diff
@@ -103,28 +103,28 @@ impl EventsStream {
         let this = self.project();

         // If we are not forced to send an update, we can willingly do so based on a set of conditions.
-        if !force {
-            if let (Some(last_update), Some(last_flush), Some(last_apply)) = (
+        if !force
+            && let (Some(last_update), Some(last_flush), Some(last_apply)) = (
                 this.last_update.as_mut(),
                 this.last_flush_lsn.as_mut(),
                 this.last_apply_lsn.as_mut(),
-            ) {
-                // The reason for only checking `flush_lsn` and `apply_lsn` is that if we are not
-                // forced to send a status update to Postgres (when reply is requested), we want to just
-                // notify it in case we actually durably flushed and persisted events, which is signalled via
-                // the two aforementioned fields. The `write_lsn` field is mostly used by Postgres for
-                // tracking what was received by the replication client but not what the client actually
-                // safely stored.
-                //
-                // If we were to check `write_lsn` too, we would end up sending updates more frequently
-                // when they are not requested, simply because the `write_lsn` is updated for every
-                // incoming message in the apply loop.
-                if flush_lsn == *last_flush
-                    && apply_lsn == *last_apply
-                    && last_update.elapsed() < STATUS_UPDATE_INTERVAL
-                {
-                    return Ok(());
-                }
+            )
+        {
+            // The reason for only checking `flush_lsn` and `apply_lsn` is that if we are not
+            // forced to send a status update to Postgres (when reply is requested), we want to just
+            // notify it in case we actually durably flushed and persisted events, which is signalled via
+            // the two aforementioned fields. The `write_lsn` field is mostly used by Postgres for
+            // tracking what was received by the replication client but not what the client actually
+            // safely stored.
+            //
+            // If we were to check `write_lsn` too, we would end up sending updates more frequently
+            // when they are not requested, simply because the `write_lsn` is updated for every
+            // incoming message in the apply loop.
+            if flush_lsn == *last_flush
+                && apply_lsn == *last_apply
+                && last_update.elapsed() < STATUS_UPDATE_INTERVAL
+            {
+                return Ok(());
            }
        }
```
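This hunk also shows that a chain may start with a plain boolean: conditions evaluate left to right and short-circuit, so the tuple pattern is matched only when `force` is false, exactly as in the old nested form. A reduced sketch with hypothetical names (`u64` standing in for `PgLsn`, a single `Option` instead of the triple):

```rust
// A boolean can lead a let chain; later conditions are skipped when it fails.
fn should_skip(force: bool, last_flush: Option<u64>, flush_lsn: u64) -> bool {
    if !force
        && let Some(last) = last_flush
        && flush_lsn == last
    {
        return true;
    }

    false
}

fn main() {
    assert!(should_skip(false, Some(42), 42));
    assert!(!should_skip(true, Some(42), 42)); // forced update: never skip
    assert!(!should_skip(false, None, 42)); // nothing flushed yet: no skip
}
```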
2 changes: 1 addition & 1 deletion etl/src/schema/cache.rs
```diff
@@ -48,7 +48,7 @@ impl SchemaCache {
         inner.table_schemas.get(table_id).cloned()
     }

-    pub async fn lock_inner(&self) -> MutexGuard<Inner> {
+    pub async fn lock_inner(&'_ self) -> MutexGuard<'_, Inner> {
         self.inner.lock().await
     }
 }
```
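This one-line change only spells out the elided lifetime: `MutexGuard<'_, Inner>` makes it explicit that the returned guard borrows `self`, which is what lints like `elided_lifetimes_in_paths` ask for. A sketch of the signature, assuming tokio's async `Mutex` (the `.lock().await` suggests it) and `u32` standing in for `Inner`:

```rust
use tokio::sync::{Mutex, MutexGuard};

struct SchemaCache {
    inner: Mutex<u32>, // `u32` is a stand-in for the real `Inner`
}

impl SchemaCache {
    // Writing `-> MutexGuard<u32>` would hide the guard's lifetime;
    // `'_` ties it to the borrow of `self` explicitly.
    async fn lock_inner(&self) -> MutexGuard<'_, u32> {
        self.inner.lock().await
    }
}

#[tokio::main]
async fn main() {
    let cache = SchemaCache {
        inner: Mutex::new(7),
    };
    assert_eq!(*cache.lock_inner().await, 7);
}
```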
40 changes: 20 additions & 20 deletions etl/src/workers/table_sync.rs
```diff
@@ -606,28 +606,28 @@ where
         let mut inner = self.table_sync_worker_state.lock().await;

         // If we caught up with the lsn, we mark this table as `SyncDone` and stop the worker.
-        if let TableReplicationPhase::Catchup { lsn } = inner.replication_phase() {
-            if current_lsn >= lsn {
-                // If we are told to update the state, we mark the phase as actually changed. We do
-                // this because we want to update the actual state only when we are sure that the
-                // progress has been persisted to the destination. When `update_state` is `false` this
-                // function is used as a lookahead, to determine whether the worker should be stopped.
-                if update_state {
-                    inner
-                        .set_and_store(
-                            TableReplicationPhase::SyncDone { lsn: current_lsn },
-                            &self.state_store,
-                        )
-                        .await?;
-
-                    info!(
-                        "table sync worker for table {} is in sync with the apply worker, the worker will terminate",
-                        self.table_id
-                    );
-                }
-
-                return Ok(false);
-            }
+        if let TableReplicationPhase::Catchup { lsn } = inner.replication_phase()
+            && current_lsn >= lsn
+        {
+            // If we are told to update the state, we mark the phase as actually changed. We do
+            // this because we want to update the actual state only when we are sure that the
+            // progress has been persisted to the destination. When `update_state` is `false` this
+            // function is used as a lookahead, to determine whether the worker should be stopped.
+            if update_state {
+                inner
+                    .set_and_store(
+                        TableReplicationPhase::SyncDone { lsn: current_lsn },
+                        &self.state_store,
+                    )
+                    .await?;
+
+                info!(
+                    "table sync worker for table {} is in sync with the apply worker, the worker will terminate",
+                    self.table_id
+                );
+            }
+
+            return Ok(false);
         }

         Ok(true)
```
2 changes: 1 addition & 1 deletion etl/tests/integration/no_primary_key_test.rs
```diff
@@ -21,7 +21,7 @@ async fn tables_without_primary_key_are_errored() {

     let publication_name = "test_pub".to_string();
     database
-        .create_publication(&publication_name, &[table_name.clone()])
+        .create_publication(&publication_name, std::slice::from_ref(&table_name))
         .await
         .expect("Failed to create publication");
```