feat(tmp): changes to enable ParquetSink poc
wiedld committed Feb 27, 2024
1 parent 441a435 commit cf54ed5
Showing 2 changed files with 36 additions and 19 deletions.
50 changes: 31 additions & 19 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -26,6 +26,7 @@ use parquet::arrow::arrow_writer::{
ArrowLeafColumn,
};
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use std::any::Any;
use std::fmt;
use std::fmt::Debug;
@@ -540,6 +541,8 @@ async fn fetch_statistics(
pub struct ParquetSink {
/// Config options for writing data
config: FileSinkConfig,
/// File metadata from successfully produced parquet files.
written: Arc<parking_lot::Mutex<Vec<FileMetaData>>>,
}

impl Debug for ParquetSink {
@@ -563,13 +566,22 @@ impl DisplayAs for ParquetSink {
impl ParquetSink {
/// Create from config.
pub fn new(config: FileSinkConfig) -> Self {
Self { config }
Self {
config,
written: Default::default(),
}
}

/// Retrieve the inner [`FileSinkConfig`].
pub fn config(&self) -> &FileSinkConfig {
&self.config
}

/// Retrieve the file metadata from the last write.
pub fn written(&self) -> Vec<FileMetaData> {
self.written.lock().clone()
}

/// Converts table schema to writer schema, which may differ in the case
/// of hive style partitioning where some columns are removed from the
/// underlying files.
@@ -667,8 +679,10 @@ impl DataSink for ParquetSink {
"parquet".into(),
);

let mut file_write_tasks: JoinSet<std::result::Result<usize, DataFusionError>> =
JoinSet::new();
let mut file_write_tasks: JoinSet<
std::result::Result<FileMetaData, DataFusionError>,
> = JoinSet::new();

while let Some((path, mut rx)) = file_stream_rx.recv().await {
if !allow_single_file_parallelism {
let mut writer = self
@@ -679,13 +693,10 @@
)
.await?;
file_write_tasks.spawn(async move {
let mut row_count = 0;
while let Some(batch) = rx.recv().await {
row_count += batch.num_rows();
writer.write(&batch).await?;
}
writer.close().await?;
Ok(row_count)
writer.close().await.map_err(DataFusionError::ParquetError)
});
} else {
let writer = create_writer(
@@ -716,7 +727,11 @@
while let Some(result) = file_write_tasks.join_next().await {
match result {
Ok(r) => {
row_count += r?;
let r = r?;
row_count += r.num_rows;
let mut written_files = self.written.lock();
written_files.push(r);
drop(written_files);
}
Err(e) => {
if e.is_panic() {
@@ -935,7 +950,7 @@ async fn concatenate_parallel_row_groups(
schema: Arc<Schema>,
writer_props: Arc<WriterProperties>,
mut object_store_writer: AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>,
) -> Result<usize> {
) -> Result<FileMetaData> {
let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);

let schema_desc = arrow_to_parquet_schema(schema.as_ref())?;
@@ -945,15 +960,12 @@
writer_props,
)?;

let mut row_count = 0;

while let Some(handle) = serialize_rx.recv().await {
let join_result = handle.await;
match join_result {
Ok(result) => {
let mut rg_out = parquet_writer.next_row_group()?;
let (serialized_columns, cnt) = result?;
row_count += cnt;
let (serialized_columns, _cnt) = result?;
for chunk in serialized_columns {
chunk.append_to_row_group(&mut rg_out)?;
let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
@@ -976,13 +988,13 @@
}
}

let inner_writer = parquet_writer.into_inner()?;
let final_buff = inner_writer.buffer.try_lock().unwrap();
let file_metadata = parquet_writer.close()?;
let final_buff = merged_buff.buffer.try_lock().unwrap();

object_store_writer.write_all(final_buff.as_slice()).await?;
object_store_writer.shutdown().await?;

Ok(row_count)
Ok(file_metadata)
}

/// Parallelizes the serialization of a single parquet file, by first serializing N
@@ -995,7 +1007,7 @@ async fn output_single_parquet_file_parallelized(
output_schema: Arc<Schema>,
parquet_props: &WriterProperties,
parallel_options: ParallelParquetWriterOptions,
) -> Result<usize> {
) -> Result<FileMetaData> {
let max_rowgroups = parallel_options.max_parallel_row_groups;
// Buffer size of this channel limits maximum number of RowGroups being worked on in parallel
let (serialize_tx, serialize_rx) =
@@ -1009,7 +1021,7 @@
arc_props.clone(),
parallel_options,
);
let row_count = concatenate_parallel_row_groups(
let file_metadata = concatenate_parallel_row_groups(
serialize_rx,
output_schema.clone(),
arc_props.clone(),
@@ -1029,7 +1041,7 @@
}
};

Ok(row_count)
Ok(file_metadata)
}

#[cfg(test)]
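Taken together, the parquet.rs changes let a caller inspect the metadata of every parquet file a `ParquetSink` produced, instead of only an aggregate row count. A minimal sketch of how the new `written()` accessor might be used once a write has completed — `report_written_files` is a hypothetical helper, not part of this commit, the sink's construction and execution are elided, and the module path is assumed from the file location shown in this diff:

use datafusion::datasource::file_format::parquet::ParquetSink;

/// Hypothetical helper: summarize the files a ParquetSink recorded after
/// its write has run to completion.
fn report_written_files(sink: &ParquetSink) {
    for meta in sink.written() {
        // `num_rows` and `row_groups` are fields of parquet::format::FileMetaData,
        // the type this commit now returns per written file.
        println!(
            "wrote {} rows across {} row groups",
            meta.num_rows,
            meta.row_groups.len()
        );
    }
}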
5 changes: 5 additions & 0 deletions datafusion/execution/src/object_store.rs
@@ -60,6 +60,11 @@ impl ObjectStoreUrl {
pub fn as_str(&self) -> &str {
self.as_ref()
}

/// Returns this [`ObjectStoreUrl`] as a [`Url`]
pub fn as_url(&self) -> &Url {
&self.url
}
}

impl AsRef<str> for ObjectStoreUrl {
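The object_store.rs change simply exposes the parsed `Url` held inside an `ObjectStoreUrl`. A small sketch of the accessor in use — the `s3://my-bucket` URL is illustrative only, and the crate path `datafusion_execution::object_store` is assumed from the file location in this diff:

use datafusion_execution::object_store::ObjectStoreUrl;

fn main() {
    // Parse a bucket-style object store URL, then read back the underlying
    // url::Url through the new as_url() accessor.
    let store_url = ObjectStoreUrl::parse("s3://my-bucket").unwrap();
    let url = store_url.as_url();
    assert_eq!(url.scheme(), "s3");
    assert_eq!(url.host_str(), Some("my-bucket"));
}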
