Skip to content

Commit

Permalink
Construct the SyncColumns during SyncSchema init
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Jun 14, 2024
1 parent 52fca2d commit 68c9092
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 47 deletions.
39 changes: 21 additions & 18 deletions src/frontend/flight/sync/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use arrow_schema::{DataType, FieldRef, SchemaRef};
use clade::sync::{ColumnDescriptor, ColumnRole};
use std::collections::HashSet;

#[derive(Debug, Clone)]
#[derive(Clone, Debug)]
pub struct SyncSchema {
column_descriptors: Vec<ColumnDescriptor>,
schema: SchemaRef,
columns: Vec<SyncColumn>,
}

impl SyncSchema {
Expand Down Expand Up @@ -81,46 +81,49 @@ impl SyncSchema {
});
}

Ok(Self {
column_descriptors,
schema,
})
let columns = column_descriptors
.iter()
.zip(schema.fields())
.map(|(column_descriptor, field)| SyncColumn {
role: column_descriptor.role(),
name: column_descriptor.name.clone(),
field: field.clone(),
})
.collect();

Ok(Self { schema, columns })
}

#[allow(dead_code)]
pub fn arrow_schema(&self) -> SchemaRef {
self.schema.clone()
}

pub fn column(&self, name: &str, role: ColumnRole) -> Option<SyncColumn> {
pub fn column(&self, name: &str, role: ColumnRole) -> Option<&SyncColumn> {
self.columns()
.iter()
.find(|col| col.name == name && col.role == role)
}

pub fn columns(&self) -> impl Iterator<Item = SyncColumn> + '_ {
self.column_descriptors
.iter()
.zip(self.schema.fields())
.map(|(column_descriptor, field)| SyncColumn {
role: column_descriptor.role(),
name: column_descriptor.name.clone(),
field: field.clone(),
})
pub fn columns(&self) -> &[SyncColumn] {
&self.columns
}

// Map over all columns with a specific role
pub fn map_columns<F, T>(&self, role: ColumnRole, f: F) -> Vec<T>
where
Self: Sized,
F: FnMut(SyncColumn) -> T,
F: FnMut(&SyncColumn) -> T,
{
self.columns()
self.columns
.iter()
.filter(|sc| sc.role == role)
.map(f)
.collect::<Vec<T>>()
}
}

#[derive(Clone, Debug)]
pub struct SyncColumn {
role: ColumnRole,
name: String,
Expand Down
62 changes: 34 additions & 28 deletions src/frontend/flight/sync/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(super) fn compact_batches(
let columns = |role: ColumnRole| -> (Vec<ArrayRef>, (Vec<SortField>, Vec<ArrayRef>)) {
sync_schema
.columns()
.iter()
.zip(batch.columns().iter())
.filter_map(|(col, array)| {
if col.role() == role {
Expand Down Expand Up @@ -150,36 +151,40 @@ pub(super) fn compact_batches(
.collect::<Vec<_>>();
let mut changed_pos = 0;
let mut indices: HashMap<usize, &UInt64Array> = HashMap::new();
sync_schema.columns().enumerate().for_each(|(col_id, col)| {
match col.role() {
ColumnRole::OldPk => {
indices.insert(col_id, &old_pk_indices);
}
ColumnRole::NewPk => {
indices.insert(col_id, &new_pk_indices);
}
ColumnRole::Changed => {
// Insert the indices for both this column...
indices.insert(col_id, &changed_indices[0]);
sync_schema
.columns()
.iter()
.enumerate()
.for_each(|(col_id, col)| {
match col.role() {
ColumnRole::OldPk => {
indices.insert(col_id, &old_pk_indices);
}
ColumnRole::NewPk => {
indices.insert(col_id, &new_pk_indices);
}
ColumnRole::Changed => {
// Insert the indices for both this column...
indices.insert(col_id, &changed_indices[0]);

// ... as well as for the actual changed Value column
let changed_col_id = schema
.index_of(
sync_schema
.column(col.name(), ColumnRole::Value)
.unwrap()
.field()
.name(),
)
.expect("Field exists");
indices.insert(changed_col_id, &changed_indices[0]);
changed_pos += 1;
}
ColumnRole::Value => {
indices.entry(col_id).or_insert(&new_pk_indices);
// ... as well as for the actual changed Value column
let changed_col_id = schema
.index_of(
sync_schema
.column(col.name(), ColumnRole::Value)
.unwrap()
.field()
.name(),
)
.expect("Field exists");
indices.insert(changed_col_id, &changed_indices[0]);
changed_pos += 1;
}
ColumnRole::Value => {
indices.entry(col_id).or_insert(&new_pk_indices);
}
}
}
});
});

// Finally take the designated rows from each old PK, new PK, value and changed columns from the
// batch
Expand Down Expand Up @@ -213,6 +218,7 @@ pub(super) fn construct_qualifier(
.flat_map(|sync| {
sync.sync_schema
.columns()
.iter()
.filter_map(|col| {
if matches!(col.role(), ColumnRole::OldPk | ColumnRole::NewPk) {
Some(col.name().to_string())
Expand Down
2 changes: 1 addition & 1 deletion src/frontend/flight/sync/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ impl SeafowlDataSyncWriter {
) -> Result<DeltaTable> {
// Get the actual table schema by removing the OldPk and Changed column roles from the schema.
let mut builder = SchemaBuilder::new();
sync_schema.columns().for_each(|col| {
sync_schema.columns().iter().for_each(|col| {
if matches!(col.role(), ColumnRole::NewPk | ColumnRole::Value) {
let field = col.field().as_ref().clone().with_name(col.name());
builder.push(field);
Expand Down

0 comments on commit 68c9092

Please sign in to comment.