Skip to content

Commit

Permalink
Add map_columns method to SyncSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jun 14, 2024
1 parent 4edd50a commit 52fca2d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 26 deletions.
12 changes: 12 additions & 0 deletions src/frontend/flight/sync/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ impl SyncSchema {
field: field.clone(),
})
}

// Map over all columns with a specific role
pub fn map_columns<F, T>(&self, role: ColumnRole, f: F) -> Vec<T>
where
Self: Sized,
F: FnMut(SyncColumn) -> T,
{
self.columns()
.filter(|sc| sc.role == role)
.map(f)
.collect::<Vec<T>>()
}
}

pub struct SyncColumn {
Expand Down
30 changes: 4 additions & 26 deletions src/frontend/flight/sync/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,29 +412,13 @@ impl SeafowlDataSyncWriter {
// Skip rows where both old and new primary keys are NULL, meaning a row inserted/updated
// and deleted within the same sync message (so it shouldn't be in the input nor output)
let old_pk_nulls = sync_schema
.columns()
.filter_map(|column| {
if column.role() == ColumnRole::OldPk {
Some(is_null(col(column.field().name())))
} else {
None
}
})
.collect::<Vec<Expr>>()
.map_columns(ColumnRole::OldPk, |c| is_null(col(c.field().name())))
.into_iter()
.reduce(|e1: Expr, e2| e1.and(e2))
.unwrap();

let new_pk_nulls = sync_schema
.columns()
.filter_map(|column| {
if column.role() == ColumnRole::NewPk {
Some(is_null(col(column.field().name())))
} else {
None
}
})
.collect::<Vec<Expr>>()
.map_columns(ColumnRole::NewPk, |c| is_null(col(c.field().name())))
.into_iter()
.reduce(|e1: Expr, e2| e1.and(e2))
.unwrap();
Expand All @@ -449,15 +433,9 @@ impl SeafowlDataSyncWriter {
// These differ since the physical column names are reflected in the ColumnDescriptor,
// while logical column names are found in the arrow fields
let (input_pk_cols, sync_pk_cols): (Vec<String>, Vec<String>) = sync_schema
.columns()
.filter_map(|column| {
if column.role() == ColumnRole::OldPk {
Some((column.name().clone(), column.field().name().clone()))
} else {
None
}
.map_columns(ColumnRole::OldPk, |c| {
(c.name().clone(), c.field().name().clone())
})
.collect::<Vec<(String, String)>>()
.into_iter()
.unzip();

Expand Down

0 comments on commit 52fca2d

Please sign in to comment.