Fix a bug in filter generation and add a test
gruuya committed Jun 17, 2024
1 parent 9a736f1 commit 0596a2e
Showing 2 changed files with 116 additions and 38 deletions.
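
The substance of the fix is the change from lit(pk_col) to col(pk_col) when
assembling the pruning qualifier (see the utils.rs hunk at old line 270 below).
lit() wraps its argument in a literal expression, so the old code compared the
column name as a string constant against the min/max bounds instead of
referencing the column itself. A minimal sketch of the two expression shapes,
using only the datafusion_expr builders that already appear in this diff (the
column name and bounds are illustrative, taken from the new test):

    use datafusion_expr::{col, lit, Expr};

    fn main() {
        // Buggy shape: lit("c1") builds the string literal 'c1', so the
        // predicate is 'c1' BETWEEN 0 AND 6, a constant expression that
        // never references the primary key column and cannot prune anything.
        let buggy: Expr = lit("c1").between(lit(0), lit(6));

        // Fixed shape: col("c1") builds a column reference, so the predicate
        // is c1 BETWEEN 0 AND 6 and can drive partition pruning and scan
        // filtering.
        let fixed: Expr = col("c1").between(lit(0), lit(6));

        println!("buggy: {buggy}, fixed: {fixed}");
    }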
148 changes: 113 additions & 35 deletions src/frontend/flight/sync/utils.rs
@@ -1,14 +1,12 @@
 use crate::frontend::flight::sync::schema::SyncSchema;
-use crate::frontend::flight::sync::writer::DataSyncCollection;
+use crate::frontend::flight::sync::writer::DataSyncItem;
 use arrow::array::{new_null_array, Array, ArrayRef, RecordBatch, UInt64Array};
 use arrow::compute::{concat_batches, take};
 use arrow_row::{Row, RowConverter, SortField};
-use arrow_schema::SchemaRef;
 use clade::sync::ColumnRole;
 use datafusion::physical_expr::expressions::{MaxAccumulator, MinAccumulator};
 use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::{lit, Accumulator, Expr};
-use itertools::Itertools;
+use datafusion_expr::{col, lit, Accumulator, Expr};
 use std::collections::{HashMap, HashSet, VecDeque};
 
 // Compact a set of record batches into a single one, squashing any chain of changes to a given row
@@ -217,48 +215,51 @@ pub(super) fn compact_batches(
 // Generate a qualifier expression that, when applied to the table, will only return
 // rows whose primary keys are affected by the changes in `entry`. This is so that
 // we can only read the partitions from Delta Lake that we need to rewrite.
-pub(super) fn construct_qualifier(
-    full_schema: SchemaRef,
-    entry: &DataSyncCollection,
-) -> Result<Expr> {
+pub(super) fn construct_qualifier(syncs: &[DataSyncItem]) -> Result<Expr> {
     // Initialize the min/max accumulators for the primary key columns needed to prune the table
-    // files
-    let mut min_max_values: HashMap<String, (MinAccumulator, MaxAccumulator)> = entry
-        .syncs
-        .iter()
-        .flat_map(|sync| {
-            sync.sync_schema
-                .columns()
-                .iter()
-                .filter_map(|col| {
-                    if matches!(col.role(), ColumnRole::OldPk | ColumnRole::NewPk) {
-                        Some(col.name().to_string())
-                    } else {
-                        None
-                    }
-                })
-                .collect::<Vec<String>>()
-        })
-        .unique()
-        .map(|col_name| {
-            // The validation in the interface should have ensured these fields exist.
-            let f = full_schema.column_with_name(&col_name).unwrap().1;
+    // files.
+    // The assumption here is that the primary key columns are the same across all syncs.
+    let mut min_max_values: HashMap<String, (MinAccumulator, MaxAccumulator)> = syncs
+        .first()
+        .expect("At least one sync item in the queue")
+        .sync_schema
+        .columns()
+        .iter()
+        .filter(|col| matches!(col.role(), ColumnRole::OldPk | ColumnRole::NewPk))
+        .map(|col| {
             Ok((
-                col_name,
+                col.name().clone(),
                 (
-                    MinAccumulator::try_new(f.data_type())?,
-                    MaxAccumulator::try_new(f.data_type())?,
+                    MinAccumulator::try_new(col.field().data_type())?,
+                    MaxAccumulator::try_new(col.field().data_type())?,
                 ),
             ))
         })
         .collect::<Result<_>>()?;
 
     // Collect all min/max stats for PK columns
-    for sync in &entry.syncs {
+    for sync in syncs {
         min_max_values
             .iter_mut()
             .try_for_each(|(pk_col, (min_value, max_value))| {
-                if let Some(pk_array) = sync.batch.column_by_name(pk_col) {
+                let old_field = sync
+                    .sync_schema
+                    .column(pk_col, ColumnRole::OldPk)
+                    .unwrap()
+                    .field();
+
+                if let Some(pk_array) = sync.batch.column_by_name(old_field.name()) {
+                    min_value.update_batch(&[pk_array.clone()])?;
+                    max_value.update_batch(&[pk_array.clone()])?;
+                }
+
+                let new_field = sync
+                    .sync_schema
+                    .column(pk_col, ColumnRole::NewPk)
+                    .unwrap()
+                    .field();
+
+                if let Some(pk_array) = sync.batch.column_by_name(new_field.name()) {
                     min_value.update_batch(&[pk_array.clone()])?;
                     max_value.update_batch(&[pk_array.clone()])?;
                 }
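
For context on the accumulator plumbing above: MinAccumulator and
MaxAccumulator fold Arrow arrays into running bounds via update_batch() and
yield a ScalarValue from evaluate(), which lit() then turns into the BETWEEN
endpoints in the hunk below. A sketch of that flow for a single Int32 primary
key column, using the same APIs the function above calls (array contents are
illustrative):

    use arrow::array::{ArrayRef, Int32Array};
    use arrow_schema::DataType;
    use datafusion::physical_expr::expressions::{MaxAccumulator, MinAccumulator};
    use datafusion_common::Result;
    use datafusion_expr::{col, lit, Accumulator, Expr};
    use std::sync::Arc;

    fn bounds_for_pk_column() -> Result<Expr> {
        let mut min = MinAccumulator::try_new(&DataType::Int32)?;
        let mut max = MaxAccumulator::try_new(&DataType::Int32)?;

        // Both the old-PK and new-PK arrays feed the same accumulator pair,
        // mirroring the two update_batch call sites in construct_qualifier.
        let old_pks: ArrayRef = Arc::new(Int32Array::from(vec![1, 0, 4]));
        let new_pks: ArrayRef = Arc::new(Int32Array::from(vec![2, 6, 1, 3]));
        min.update_batch(&[old_pks.clone()])?;
        max.update_batch(&[old_pks])?;
        min.update_batch(&[new_pks.clone()])?;
        max.update_batch(&[new_pks])?;

        // evaluate() returns ScalarValues (Int32(0) and Int32(6) here), which
        // lit() lifts into literal expressions for the qualifier.
        Ok(col("c1").between(lit(min.evaluate()?), lit(max.evaluate()?)))
    }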
@@ -270,7 +271,7 @@ pub(super) fn construct_qualifier(
     Ok(min_max_values
         .iter_mut()
         .map(|(pk_col, (min_value, max_value))| {
-            Ok(lit(pk_col)
+            Ok(col(pk_col)
                 .between(lit(min_value.evaluate()?), lit(max_value.evaluate()?)))
         })
         .collect::<Result<Vec<Expr>>>()?
@@ -282,13 +283,15 @@ pub(super) fn construct_qualifier(
 #[cfg(test)]
 mod tests {
     use crate::frontend::flight::sync::schema::SyncSchema;
-    use crate::frontend::flight::sync::utils::compact_batches;
+    use crate::frontend::flight::sync::utils::{compact_batches, construct_qualifier};
+    use crate::frontend::flight::sync::writer::DataSyncItem;
    use arrow::array::{
         BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray, UInt8Array,
     };
     use arrow_schema::{DataType, Field, Schema};
     use clade::sync::{ColumnDescriptor, ColumnRole};
     use datafusion_common::assert_batches_eq;
+    use datafusion_expr::{col, lit};
     use itertools::Itertools;
     use rand::distributions::{Alphanumeric, DistString, Distribution, WeightedIndex};
     use rand::seq::IteratorRandom;
@@ -564,4 +567,79 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_sync_filter() -> Result<(), Box<dyn std::error::Error>> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("old_c1", DataType::Int32, true),
+            Field::new("old_c2", DataType::Float64, true),
+            Field::new("new_c1", DataType::Int32, true),
+            Field::new("new_c2", DataType::Float64, true),
+        ]));
+
+        let column_descriptors = vec![
+            ColumnDescriptor {
+                role: ColumnRole::OldPk as i32,
+                name: "c1".to_string(),
+            },
+            ColumnDescriptor {
+                role: ColumnRole::OldPk as i32,
+                name: "c2".to_string(),
+            },
+            ColumnDescriptor {
+                role: ColumnRole::NewPk as i32,
+                name: "c1".to_string(),
+            },
+            ColumnDescriptor {
+                role: ColumnRole::NewPk as i32,
+                name: "c2".to_string(),
+            },
+        ];
+
+        let sync_schema = SyncSchema::try_new(column_descriptors, schema.clone())?;
+
+        let batch_1 = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![Some(1), None])),
+                Arc::new(Float64Array::from(vec![Some(1.1), None])),
+                Arc::new(Int32Array::from(vec![2, 6])),
+                Arc::new(Float64Array::from(vec![2.1, 2.2])),
+            ],
+        )?;
+
+        let batch_2 = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![0, 4])),
+                Arc::new(Float64Array::from(vec![3.1, 3.2])),
+                Arc::new(Int32Array::from(vec![1, 3])),
+                Arc::new(Float64Array::from(vec![4.1, 0.1])),
+            ],
+        )?;
+
+        let expr = construct_qualifier(&[
+            DataSyncItem {
+                origin: 0,
+                sequence_number: 0,
+                sync_schema: sync_schema.clone(),
+                batch: batch_1,
+            },
+            DataSyncItem {
+                origin: 0,
+                sequence_number: 0,
+                sync_schema: sync_schema.clone(),
+                batch: batch_2,
+            },
+        ])?;
+
+        assert_eq!(
+            expr,
+            col("c1")
+                .between(lit(0), lit(6))
+                .and(col("c2").between(lit(0.1), lit(4.1))),
+        );
+
+        Ok(())
+    }
 }
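
Note how the expected bounds in the assertion fall out of the four primary key
arrays: across both batches c1 takes the values {1, 2, 6, 0, 4, 1, 3} (the
null is skipped by the min/max accumulators), giving BETWEEN 0 AND 6, while c2
takes {1.1, 2.1, 2.2, 3.1, 3.2, 4.1, 0.1}, giving BETWEEN 0.1 AND 4.1; the
per-column predicates are then AND-ed together.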
6 changes: 3 additions & 3 deletions src/frontend/flight/sync/writer.rs
@@ -116,9 +116,9 @@ pub(super) struct DataSyncCollection {
 #[derive(Debug, Clone)]
 pub(super) struct DataSyncItem {
     // Identifier of the origin where the change stems from
-    origin: Origin,
+    pub(super) origin: Origin,
     // Sequence number of this particular change and origin
-    sequence_number: SequenceNumber,
+    pub(super) sequence_number: SequenceNumber,
     // Old and new primary keys, changed and value columns
     pub(super) sync_schema: SyncSchema,
     // Record batch to replicate
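
The visibility bump on origin and sequence_number is what allows the new
test_sync_filter test in utils.rs to construct DataSyncItem values directly.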
@@ -304,7 +304,7 @@ impl SeafowlDataSyncWriter {
         let full_schema = TableProvider::schema(&table);
 
         // Generate a qualifier expression for pruning partition files and filtering the base scan
-        let qualifier = construct_qualifier(full_schema.clone(), entry)?;
+        let qualifier = construct_qualifier(&entry.syncs)?;
 
         // Iterate through all syncs for this table and construct a full plan by applying each
         // individual sync
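
With the new signature the caller no longer threads the full table schema
through: construct_qualifier receives the syncs slice directly and reads each
column's data type from the sync schema (via col.field().data_type()) rather
than looking it up in full_schema.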