187 changes: 104 additions & 83 deletions wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs
@@ -47,7 +47,7 @@ pub(crate) struct IcebergFdw {
catalog: Box<dyn Catalog>,
table: Option<Table>,
predicate: Option<Predicate>,
batch_size: Option<usize>,
batch_size: usize,

// copy of target columns
tgt_cols: Vec<Column>,
@@ -65,6 +65,9 @@ pub(crate) struct IcebergFdw {
num_rows: usize,
bytes_in: usize,

// for insertion: whether this is a direct insert, i.e. the table has no partitioning and no sorting
is_direct_insert: bool,

// for insertion: buffer for accumulating input rows before sorting
input_rows: Vec<InputRow>,
}
@@ -139,7 +142,7 @@ impl IcebergFdw {
let mut scan_builder = table
.scan()
.select(self.tgt_cols.iter().map(|c| c.name.clone()))
.with_batch_size(self.batch_size);
.with_batch_size(Some(self.batch_size));
if let Some(predicate) = &self.predicate {
scan_builder = scan_builder.with_filter(predicate.clone());
}
@@ -278,6 +281,86 @@ impl IcebergFdw {

Ok(record_batch)
}

fn write_rows_to_iceberg(&mut self) -> IcebergFdwResult<()> {
// only write if we have rows
if self.input_rows.is_empty() {
return Ok(());
}

// clone the table to avoid borrowing conflicts
let table = match &self.table {
Some(table) => table.clone(),
None => return Ok(()),
};

let metadata = table.metadata();
let schema = metadata.current_schema();

// sort input_rows by partition column values
let sorted_rows = self.sort_rows_by_partition(metadata, schema)?;

// build record batch from sorted rows
let record_batch = self.build_record_batch_from_rows(schema, &sorted_rows)?;

// split the record batch by partition values
let partition_batches = utils::split_record_batch_by_partition(metadata, record_batch)?;

let mut data_files = Vec::new();

// write each partition batch separately
for partition_batch in partition_batches.iter() {
let location_generator = LocationGenerator::new(metadata, partition_batch)?;
let file_name_generator = FileNameGenerator::new(DataFileFormat::Parquet);

// get partition value from location generator
let partition_value = location_generator.partition_value();

let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
schema.clone(),
table.file_io().clone(),
location_generator,
file_name_generator,
);
let data_file_writer_builder = DataFileWriterBuilder::new(
parquet_writer_builder,
partition_value,
metadata.default_partition_spec().spec_id(),
);
let mut data_file_writer = self.rt.block_on(data_file_writer_builder.build())?;

// write the record batch to Iceberg, then close the writer to collect
// the resulting data files
self.rt
.block_on(data_file_writer.write(partition_batch.clone()))?;
let mut part_data_files = self.rt.block_on(data_file_writer.close())?;

data_files.append(&mut part_data_files);
}

// create transaction and commit the changes to update table metadata
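// (a fast append writes the new files into fresh manifests instead of merging
// existing ones, which keeps append-only commits cheap)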
let tx = Transaction::new(&table);
let append_action = tx.fast_append().add_data_files(data_files.clone());
let tx = append_action.apply(tx)?;
let updated_table = self.rt.block_on(tx.commit(self.catalog.as_ref()))?;

// update the cached table reference with the new metadata
self.table = Some(updated_table);

if cfg!(debug_assertions) {
for data_file in &data_files {
report_info(&format!(
"Data file: {}, records: {}, size: {} bytes",
data_file.file_path(),
data_file.record_count(),
data_file.file_size_in_bytes()
));
}
}

Ok(())
}
}
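
For illustration only, a minimal sketch of the per-partition write idea used above: buffered rows are grouped by their partition key, and each group becomes its own data file. The helper and types below are hypothetical stand-ins; the real code works on Arrow RecordBatch values via utils::split_record_batch_by_partition and the Iceberg Parquet writer.

use std::collections::BTreeMap;

// Hypothetical helper mirroring the grouping step in write_rows_to_iceberg:
// rows sharing a partition key end up in the same group, and each group is
// then written out separately.
fn split_by_partition<R>(rows: Vec<(String, R)>) -> BTreeMap<String, Vec<R>> {
    let mut groups: BTreeMap<String, Vec<R>> = BTreeMap::new();
    for (partition_key, row) in rows {
        groups.entry(partition_key).or_default().push(row);
    }
    groups
}

fn main() {
    let rows = vec![
        ("2024-01".to_string(), "row a"),
        ("2024-02".to_string(), "row b"),
        ("2024-01".to_string(), "row c"),
    ];
    for (partition, group) in split_by_partition(rows) {
        // in the FDW, each group would become one Parquet data file
        println!("partition {partition}: {} rows", group.len());
    }
}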

impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
@@ -346,14 +429,15 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
catalog,
table: None,
predicate: None,
batch_size: batch_size.into(),
batch_size,
tgt_cols: Vec::new(),
stream: None,
row_data: VecDeque::new(),
src_fields: Vec::new(),
mapper: Mapper::default(),
num_rows: 0,
bytes_in: 0,
is_direct_insert: false,
input_rows: Vec::new(),
})
}
@@ -416,10 +500,12 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {

fn begin_modify(&mut self, options: &HashMap<String, String>) -> IcebergFdwResult<()> {
let tbl_ident = TableIdent::from_strs(require_option("table", options)?.split("."))?;
self.table = self
.rt
.block_on(self.catalog.load_table(&tbl_ident))?
.into();
let table = self.rt.block_on(self.catalog.load_table(&tbl_ident))?;
let metadata = table.metadata();

self.is_direct_insert = metadata.default_partition_spec().is_unpartitioned()
&& metadata.default_sort_order().is_unsorted();
self.table = table.into();
self.input_rows.clear();

Ok(())
@@ -431,86 +517,21 @@ impl ForeignDataWrapper<IcebergFdwError> for IcebergFdw {
cells: src.cells.clone(),
});

// Direct insert optimization: when the table has no partitioning and no sorting,
// rows can be written in batches to avoid buffering the entire dataset in memory.
// Each batch is written to a separate parquet file, so larger batch sizes are
// recommended for better performance when inserting large datasets.
if self.is_direct_insert && self.input_rows.len() >= self.batch_size {
self.write_rows_to_iceberg()?;
self.input_rows.clear();
}

Ok(())
}
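
For illustration only, a minimal sketch of the buffer-and-flush pattern that insert() uses for direct inserts. The types here are hypothetical stand-ins; the real code buffers InputRow values and flushes them through write_rows_to_iceberg.

// Hypothetical stand-in for the FDW's buffering; not the actual Wrappers API.
struct BufferedSink {
    batch_size: usize,
    buffer: Vec<String>,
    flushed_batches: usize,
}

impl BufferedSink {
    fn insert(&mut self, row: String) {
        self.buffer.push(row);
        // flush once the buffer reaches batch_size so memory use stays bounded;
        // each flush corresponds to one Parquet data file in the real FDW
        if self.buffer.len() >= self.batch_size {
            self.flush();
        }
    }

    fn flush(&mut self) {
        if self.buffer.is_empty() {
            return;
        }
        // the real FDW writes the buffered rows to Iceberg here
        self.flushed_batches += 1;
        self.buffer.clear();
    }
}

With batch_size = 3, six inserted rows would trigger two flushes, i.e. two data files; a partitioned or sorted table skips this path and buffers everything until end_modify.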

fn end_modify(&mut self) -> IcebergFdwResult<()> {
// only write if we have rows
if self.input_rows.is_empty() {
return Ok(());
}

// clone the table to avoid borrowing conflicts
let table = match &self.table {
Some(table) => table.clone(),
None => return Ok(()),
};

let metadata = table.metadata();
let schema = metadata.current_schema();

// sort input_rows by partition column values
let sorted_rows = self.sort_rows_by_partition(metadata, schema)?;

// build record batch from sorted rows
let record_batch = self.build_record_batch_from_rows(schema, &sorted_rows)?;

// split the record batch by partition values
let partition_batches = utils::split_record_batch_by_partition(metadata, record_batch)?;

let mut data_files = Vec::new();

// write each partition batch separately
for partition_batch in partition_batches.iter() {
let location_generator = LocationGenerator::new(metadata, partition_batch)?;
let file_name_generator = FileNameGenerator::new(DataFileFormat::Parquet);

// get partition value from location generator
let partition_value = location_generator.partition_value();

let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
schema.clone(),
table.file_io().clone(),
location_generator,
file_name_generator,
);
let data_file_writer_builder = DataFileWriterBuilder::new(
parquet_writer_builder,
partition_value,
metadata.default_partition_spec().spec_id(),
);
let mut data_file_writer = self.rt.block_on(data_file_writer_builder.build())?;

// write the record batch to Iceberg and close the writer and get
// the data file
self.rt
.block_on(data_file_writer.write(partition_batch.clone()))?;
let mut part_data_files = self.rt.block_on(data_file_writer.close())?;

data_files.append(&mut part_data_files);
}

// create transaction and commit the changes to update table metadata
let tx = Transaction::new(&table);
let append_action = tx.fast_append().add_data_files(data_files.clone());
let tx = append_action.apply(tx)?;
let updated_table = self.rt.block_on(tx.commit(self.catalog.as_ref()))?;

// update the cached table reference with the new metadata
self.table = Some(updated_table);

if cfg!(debug_assertions) {
for data_file in &data_files {
report_info(&format!(
"Data file: {}, records: {}, size: {} bytes",
data_file.file_path(),
data_file.record_count(),
data_file.file_size_in_bytes()
));
}
}

self.write_rows_to_iceberg()?;
self.input_rows.clear();
Ok(())
}
