From 9d48887868af782c7e06c082a596b5bc1ad48ede Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Sun, 7 Nov 2021 13:50:03 +0800 Subject: [PATCH 1/2] implement copy-from-file executor --- Cargo.lock | 1 + Cargo.toml | 1 + src/array/mod.rs | 39 ++++++++- src/executor/copy_from_file.rs | 115 +++++++++++++++++++++++++ src/executor/mod.rs | 10 +++ src/physical_planner/copy_from_file.rs | 28 ++++++ src/physical_planner/mod.rs | 2 + 7 files changed, 193 insertions(+), 3 deletions(-) create mode 100644 src/executor/copy_from_file.rs create mode 100644 src/physical_planner/copy_from_file.rs diff --git a/Cargo.lock b/Cargo.lock index bb26b509..7d614cb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1254,6 +1254,7 @@ dependencies = [ "bytes", "core_simd", "criterion", + "csv", "enum_dispatch", "env_logger", "futures", diff --git a/Cargo.toml b/Cargo.toml index 2f3e8f11..e3adbdd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ async-trait = "0.1" bitvec = {version = "0.22", features = ["serde"]} bytes = "1" core_simd = {git = "https://github.com/rust-lang/portable-simd", rev = "c2f59483f96cf1ab1e92cf10e0f9094432a8374c", optional = true} +csv = "1.1" enum_dispatch = "0.3" env_logger = "0.9" futures = {version = "0.3", default-features = false, features = ["alloc"]} diff --git a/src/array/mod.rs b/src/array/mod.rs index b0da1e10..86d67f4c 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -1,4 +1,4 @@ -use crate::types::{DataType, DataTypeKind, DataValue}; +use crate::types::{ConvertError, DataType, DataTypeKind, DataValue}; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; use std::ops::{Bound, RangeBounds}; @@ -208,9 +208,11 @@ impl ArrayBuilderImpl { match ty.kind() { DataTypeKind::Boolean => Self::Bool(PrimitiveArrayBuilder::::new(0)), DataTypeKind::Int => Self::Int32(PrimitiveArrayBuilder::::new(0)), + DataTypeKind::BigInt => Self::Int64(PrimitiveArrayBuilder::::new(0)), DataTypeKind::Double => Self::Float64(PrimitiveArrayBuilder::::new(0)), - DataTypeKind::Char(_) => Self::UTF8(UTF8ArrayBuilder::new(0)), - DataTypeKind::Varchar(_) => Self::UTF8(UTF8ArrayBuilder::new(0)), + DataTypeKind::Char(_) | DataTypeKind::Varchar(_) | DataTypeKind::String => { + Self::UTF8(UTF8ArrayBuilder::new(0)) + } _ => panic!("unsupported data type"), } } @@ -222,6 +224,7 @@ impl ArrayBuilderImpl { DataValue::Int32(_) => Self::Int32(PrimitiveArrayBuilder::::new(0)), DataValue::Int64(_) => Self::Int64(PrimitiveArrayBuilder::::new(0)), DataValue::Float64(_) => Self::Float64(PrimitiveArrayBuilder::::new(0)), + DataValue::String(_) => Self::UTF8(UTF8ArrayBuilder::new(0)), _ => panic!("unsupported data type"), } } @@ -254,6 +257,36 @@ impl ArrayBuilderImpl { } } + /// Appends an element in string. + pub fn push_str(&mut self, s: &str) -> Result<(), ConvertError> { + let null = s.is_empty(); + match self { + Self::Bool(a) if null => a.push(None), + Self::Int32(a) if null => a.push(None), + Self::Int64(a) if null => a.push(None), + Self::Float64(a) if null => a.push(None), + Self::UTF8(a) if null => a.push(None), + Self::Bool(a) => a.push(Some( + &s.parse::() + .map_err(|e| ConvertError::ParseBool(s.to_string(), e))?, + )), + Self::Int32(a) => a.push(Some( + &s.parse::() + .map_err(|e| ConvertError::ParseInt(s.to_string(), e))?, + )), + Self::Int64(a) => a.push(Some( + &s.parse::() + .map_err(|e| ConvertError::ParseInt(s.to_string(), e))?, + )), + Self::Float64(a) => a.push(Some( + &s.parse::() + .map_err(|e| ConvertError::ParseFloat(s.to_string(), e))?, + )), + Self::UTF8(a) => a.push(Some(s)), + } + Ok(()) + } + /// Finish build and return a new array. pub fn finish(self) -> ArrayImpl { match self { diff --git a/src/executor/copy_from_file.rs b/src/executor/copy_from_file.rs new file mode 100644 index 00000000..fc72149b --- /dev/null +++ b/src/executor/copy_from_file.rs @@ -0,0 +1,115 @@ +use super::*; +use crate::{ + array::ArrayBuilderImpl, + physical_planner::{FileFormat, PhysicalCopyFromFile}, +}; +use std::fs::File; + +/// The executor of loading file data. +pub struct CopyFromFileExecutor { + plan: PhysicalCopyFromFile, +} + +impl CopyFromFileExecutor { + pub fn execute(self) -> impl Stream> { + try_stream! { + let chunk = tokio::task::spawn_blocking(|| self.read_file_blocking()).await.unwrap()?; + yield chunk; + } + } + + // TODO(wrj): process a window at a time + fn read_file_blocking(self) -> Result { + let mut array_builders = self + .plan + .column_types + .iter() + .map(ArrayBuilderImpl::new) + .collect::>(); + + let file = File::open(&self.plan.path)?; + let mut reader = match self.plan.format { + FileFormat::Csv { + delimiter, + quote, + escape, + header, + } => csv::ReaderBuilder::new() + .delimiter(delimiter) + .quote(quote) + .escape(escape) + .has_headers(header) + .from_reader(file), + }; + + for result in reader.records() { + let record = result?; + if record.len() != array_builders.len() { + return Err(ExecutorError::LengthMismatch { + expected: array_builders.len(), + actual: record.len(), + }); + } + for ((s, builder), ty) in record + .iter() + .zip(&mut array_builders) + .zip(&self.plan.column_types) + { + if !ty.is_nullable() && s.is_empty() { + return Err(ExecutorError::NotNullable); + } + builder.push_str(s)?; + } + } + let chunk = array_builders + .into_iter() + .map(|builder| builder.finish()) + .collect(); + Ok(chunk) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + array::ArrayImpl, + types::{DataTypeExt, DataTypeKind}, + }; + use std::io::Write; + + #[tokio::test] + async fn read_csv() { + let csv = "1,1.5,one\n2,2.5,two\n"; + + let mut file = tempfile::NamedTempFile::new().expect("failed to create temp file"); + write!(file, "{}", csv).expect("failed to write file"); + + let executor = CopyFromFileExecutor { + plan: PhysicalCopyFromFile { + path: file.path().into(), + format: FileFormat::Csv { + delimiter: b',', + quote: b'"', + escape: None, + header: false, + }, + column_types: vec![ + DataTypeKind::Int.not_null(), + DataTypeKind::Double.not_null(), + DataTypeKind::String.not_null(), + ], + }, + }; + let actual = executor.execute().boxed().next().await.unwrap().unwrap(); + + let expected: DataChunk = [ + ArrayImpl::Int32([1, 2].into_iter().collect()), + ArrayImpl::Float64([1.5, 2.5].into_iter().collect()), + ArrayImpl::UTF8(["one", "two"].iter().map(Some).collect()), + ] + .into_iter() + .collect(); + assert_eq!(actual, expected); + } +} diff --git a/src/executor/mod.rs b/src/executor/mod.rs index 26a0fd76..f5fda952 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -21,6 +21,7 @@ use futures::stream::{BoxStream, Stream, StreamExt}; use std::sync::Arc; mod aggregation; +mod copy_from_file; mod create; mod delete; mod drop; @@ -37,6 +38,7 @@ mod seq_scan; mod values; pub use self::aggregation::*; +use self::copy_from_file::*; use self::create::*; use self::delete::*; use self::drop::*; @@ -60,6 +62,14 @@ pub enum ExecutorError { Storage(#[from] StorageError), #[error("conversion error: {0}")] Convert(#[from] ConvertError), + #[error("tuple length mismatch: expected {expected} but got {actual}")] + LengthMismatch { expected: usize, actual: usize }, + #[error("io error")] + Io(#[from] std::io::Error), + #[error("csv error")] + Csv(#[from] csv::Error), + #[error("value can not be null")] + NotNullable, } /// Reference type of the global environment. diff --git a/src/physical_planner/copy_from_file.rs b/src/physical_planner/copy_from_file.rs new file mode 100644 index 00000000..07af42c4 --- /dev/null +++ b/src/physical_planner/copy_from_file.rs @@ -0,0 +1,28 @@ +use crate::types::DataType; +use std::path::PathBuf; + +/// The physical plan of `copy`. +#[derive(Debug, PartialEq, Clone)] +pub struct PhysicalCopyFromFile { + /// The file path to copy from. + pub path: PathBuf, + /// The file format. + pub format: FileFormat, + /// The column types. + pub column_types: Vec, +} + +/// File format. +#[derive(Debug, PartialEq, Clone)] +pub enum FileFormat { + Csv { + /// Delimiter to parse. + delimiter: u8, + /// Quote to use. + quote: u8, + /// Escape character to use. + escape: Option, + /// Whether or not the file has a header line. + header: bool, + }, +} diff --git a/src/physical_planner/mod.rs b/src/physical_planner/mod.rs index fd2bf0f6..96f0e352 100644 --- a/src/physical_planner/mod.rs +++ b/src/physical_planner/mod.rs @@ -1,3 +1,4 @@ +mod copy_from_file; mod create; mod delete; mod drop; @@ -10,6 +11,7 @@ mod order; mod projection; mod seq_scan; +pub use copy_from_file::*; pub use create::*; pub use delete::*; pub use drop::*; From 7b0dd9dabcbf483ca32789c13f3960a50ef3d20f Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Sun, 7 Nov 2021 21:39:12 +0800 Subject: [PATCH 2/2] implement copy-to-file executor --- src/array/data_chunk.rs | 5 ++ src/executor/copy_from_file.rs | 3 + src/executor/copy_to_file.rs | 97 ++++++++++++++++++++++++++++ src/executor/mod.rs | 4 +- src/physical_planner/copy_to_file.rs | 16 +++++ src/physical_planner/mod.rs | 2 + 6 files changed, 126 insertions(+), 1 deletion(-) create mode 100644 src/executor/copy_to_file.rs create mode 100644 src/physical_planner/copy_to_file.rs diff --git a/src/array/data_chunk.rs b/src/array/data_chunk.rs index 630a6468..7a0dacb1 100644 --- a/src/array/data_chunk.rs +++ b/src/array/data_chunk.rs @@ -53,6 +53,11 @@ impl DataChunk { &self.arrays[idx] } + /// Get all arrays. + pub fn arrays(&self) -> &[ArrayImpl] { + &self.arrays + } + /// Filter elements and create a new chunk. pub fn filter(&self, visibility: impl Iterator + Clone) -> Self { let arrays = self diff --git a/src/executor/copy_from_file.rs b/src/executor/copy_from_file.rs index fc72149b..e6e1781f 100644 --- a/src/executor/copy_from_file.rs +++ b/src/executor/copy_from_file.rs @@ -1,3 +1,6 @@ +// TODO(wrj): remove this once linked to plan +#![allow(dead_code)] + use super::*; use crate::{ array::ArrayBuilderImpl, diff --git a/src/executor/copy_to_file.rs b/src/executor/copy_to_file.rs new file mode 100644 index 00000000..d2e05a17 --- /dev/null +++ b/src/executor/copy_to_file.rs @@ -0,0 +1,97 @@ +// TODO(wrj): remove this once linked to plan +#![allow(dead_code)] + +use super::*; +use crate::physical_planner::FileFormat; +use std::{fs::File, path::PathBuf}; +use tokio::sync::mpsc; + +/// The executor of saving data to file. +pub struct CopyToFileExecutor { + pub path: PathBuf, + pub format: FileFormat, + pub child: BoxedExecutor, +} + +impl CopyToFileExecutor { + pub fn execute(self) -> impl Stream> { + try_stream! { + let Self { path, format, child } = self; + let (sender, recver) = mpsc::channel(1); + let writer = tokio::task::spawn_blocking(move || Self::write_file_blocking(path, format, recver)); + for await batch in child { + sender.send(batch?).await.unwrap(); + } + drop(sender); + writer.await.unwrap()?; + yield DataChunk::single(1); + } + } + + fn write_file_blocking( + path: PathBuf, + format: FileFormat, + mut recver: mpsc::Receiver, + ) -> Result<(), ExecutorError> { + let file = File::create(&path)?; + let mut writer = match format { + FileFormat::Csv { + delimiter, + quote, + escape, + header, + } => csv::WriterBuilder::new() + .delimiter(delimiter) + .quote(quote) + .escape(escape.unwrap_or(b'\\')) + .has_headers(header) + .from_writer(file), + }; + + while let Some(chunk) = recver.blocking_recv() { + for i in 0..chunk.cardinality() { + // TODO(wrj): avoid dynamic memory allocation (String) + let row = chunk.arrays().iter().map(|a| a.get_to_string(i)); + writer.write_record(row)?; + } + writer.flush()?; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::array::ArrayImpl; + + #[tokio::test] + async fn write_csv() { + let file = tempfile::NamedTempFile::new().expect("failed to create temp file"); + + let executor = CopyToFileExecutor { + path: file.path().into(), + format: FileFormat::Csv { + delimiter: b',', + quote: b'"', + escape: None, + header: false, + }, + child: try_stream! { + yield [ + ArrayImpl::Int32([1, 2].into_iter().collect()), + ArrayImpl::Float64([1.5, 2.5].into_iter().collect()), + ArrayImpl::UTF8(["one", "two"].iter().map(Some).collect()), + ] + .into_iter() + .collect(); + } + .boxed(), + }; + executor.execute().boxed().next().await.unwrap().unwrap(); + + let actual = std::fs::read_to_string(file.path()).unwrap(); + let expected = "1,1.5,one\n2,2.5,two\n"; + assert_eq!(actual, expected); + } +} diff --git a/src/executor/mod.rs b/src/executor/mod.rs index f5fda952..85e513f3 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -22,6 +22,7 @@ use std::sync::Arc; mod aggregation; mod copy_from_file; +mod copy_to_file; mod create; mod delete; mod drop; @@ -38,7 +39,8 @@ mod seq_scan; mod values; pub use self::aggregation::*; -use self::copy_from_file::*; +// use self::copy_from_file::*; +// use self::copy_to_file::*; use self::create::*; use self::delete::*; use self::drop::*; diff --git a/src/physical_planner/copy_to_file.rs b/src/physical_planner/copy_to_file.rs new file mode 100644 index 00000000..41b11d2c --- /dev/null +++ b/src/physical_planner/copy_to_file.rs @@ -0,0 +1,16 @@ +use super::{FileFormat, PhysicalPlan}; +use crate::types::DataType; +use std::path::PathBuf; + +/// The physical plan of `copy`. +#[derive(Debug, PartialEq, Clone)] +pub struct PhysicalCopyToFile { + /// The file path to copy to. + pub path: PathBuf, + /// The file format. + pub format: FileFormat, + /// The column types. + pub column_types: Vec, + /// The child plan. + pub child: PhysicalPlan, +} diff --git a/src/physical_planner/mod.rs b/src/physical_planner/mod.rs index 96f0e352..144a642b 100644 --- a/src/physical_planner/mod.rs +++ b/src/physical_planner/mod.rs @@ -1,4 +1,5 @@ mod copy_from_file; +mod copy_to_file; mod create; mod delete; mod drop; @@ -12,6 +13,7 @@ mod projection; mod seq_scan; pub use copy_from_file::*; +pub use copy_to_file::*; pub use create::*; pub use delete::*; pub use drop::*;