Skip to content

Commit

Permalink
refactor(executor): refactor CopyFromFile using DataChunkBuilder (#637)
Browse files Browse the repository at this point in the history
* refactor(executor): refactor CopyFromFile using DataChunkBuilder

Signed-off-by: noneback <chenlan_of_learn@126.com>

* style: improve code style

Signed-off-by: noneback <chenlan_of_learn@126.com>

* fix: update bar when a chunk is produced

Signed-off-by: noneback <chenlan_of_learn@126.com>
  • Loading branch information
noneback committed Apr 24, 2022
1 parent e1a2686 commit 65372ee
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 48 deletions.
27 changes: 26 additions & 1 deletion src/array/data_chunk_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::iter::IntoIterator;
use itertools::Itertools;

use super::{ArrayBuilderImpl, DataChunk};
use crate::types::{DataType, DataValue};
use crate::types::{ConvertError, DataType, DataValue};

/// A helper struct to build a [`DataChunk`].
///
Expand Down Expand Up @@ -57,6 +57,31 @@ impl DataChunkBuilder {
}
}

/// Push one row of raw string values into the builder.
///
/// The row is accepted as an iterator of [`&str`], and its length must match
/// the number of columns of this builder.
///
/// Every value is converted into its column's data type; the first failed
/// conversion is reported as a [`ConvertError`].
///
/// When the push makes the builder reach its capacity, the completed
/// [`DataChunk`] is returned; otherwise `None` is returned.
pub fn push_str_row<'a>(
    &mut self,
    row: impl IntoIterator<Item = &'a str>,
) -> Result<Option<DataChunk>, ConvertError> {
    // Convert and append one value per column; `?` propagates the first
    // conversion failure to the caller.
    for (builder, value) in self.array_builders.iter_mut().zip_eq(row) {
        builder.push_str(value)?;
    }

    self.size += 1;
    // Hand back a finished chunk exactly when the builder fills up.
    match self.size == self.capacity {
        true => Ok(self.take()),
        false => Ok(None),
    }
}

/// Generate a [`DataChunk`] with the remaining rows.
///
/// If there are no remaining rows, `None` will be returned.
Expand Down
86 changes: 39 additions & 47 deletions src/executor/copy_from_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::fs::File;
use std::io::BufReader;

use indicatif::{ProgressBar, ProgressStyle};
use itertools::izip;
use tokio::sync::mpsc::Sender;

use super::*;
use crate::array::ArrayBuilderImpl;
use crate::array::DataChunkBuilder;
use crate::binder::FileFormat;
use crate::optimizer::plan_nodes::PhysicalCopyFromFile;

Expand Down Expand Up @@ -75,59 +76,50 @@ impl CopyFromFileExecutor {
};

let column_count = self.plan.logical().column_types().len();
let mut iter = reader.records();
let mut finished = false;
while !finished {
// create array builders
let mut array_builders = self
.plan
.logical()
.column_types()
.iter()
.map(|ty| ArrayBuilderImpl::with_capacity(PROCESSING_WINDOW_SIZE, ty))
.collect_vec();

// read records and push to array builder
for _ in 0..PROCESSING_WINDOW_SIZE {
let record = match iter.next() {
Some(record) => record?,
None => {
finished = true;
break;
}
};
if !(record.len() == column_count
|| record.len() == column_count + 1 && record.get(column_count) == Some(""))
{
return Err(ExecutorError::LengthMismatch {
expected: column_count,
actual: record.len(),
});
}
for ((s, builder), ty) in record
.iter()
.zip(&mut array_builders)
.zip(&self.plan.logical().column_types().to_vec())
{
if !ty.is_nullable() && s.is_empty() {
return Err(ExecutorError::NotNullable);
}
builder.push_str(s)?;
}
}
// update progress bar
bar.set_position(iter.reader().position().byte());

// send data chunk
let chunk: DataChunk = array_builders.into_iter().collect();
// create chunk builder
let mut chunk_builder =
DataChunkBuilder::new(self.plan.logical().column_types(), PROCESSING_WINDOW_SIZE);
let mut size_count = 0;

for record in reader.records() {
// read records and push raw str rows into data chunk builder
let record = record?;

if !(record.len() == column_count
|| record.len() == column_count + 1 && record.get(column_count) == Some(""))
{
return Err(ExecutorError::LengthMismatch {
expected: column_count,
actual: record.len(),
});
}

#[allow(clippy::collapsible_if)]
if chunk.cardinality() > 0 {
let str_row_data: Result<Vec<&str>, _> =
izip!(record.iter(), self.plan.logical().column_types())
.map(|(v, ty)| {
if !ty.is_nullable() && v.is_empty() {
return Err(ExecutorError::NotNullable);
}
Ok(v)
})
.collect();
size_count += record.as_slice().as_bytes().len();

// push a raw str row and send it if necessary
if let Some(chunk) = chunk_builder.push_str_row(str_row_data?)? {
bar.set_position(size_count as u64);
if token.is_cancelled() || tx.blocking_send(chunk).is_err() {
return Err(ExecutorError::Abort);
}
}
}
// send left chunk
if let Some(chunk) = chunk_builder.take() {
if token.is_cancelled() || tx.blocking_send(chunk).is_err() {
return Err(ExecutorError::Abort);
}
}
bar.finish();
Ok(())
}
Expand Down

0 comments on commit 65372ee

Please sign in to comment.