feat(executor): support reading and writing csv files #112

Merged (2 commits) on Nov 8, 2021
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -14,6 +14,7 @@ async-trait = "0.1"
bitvec = {version = "0.22", features = ["serde"]}
bytes = "1"
core_simd = {git = "https://github.com/rust-lang/portable-simd", rev = "c2f59483f96cf1ab1e92cf10e0f9094432a8374c", optional = true}
csv = "1.1"
enum_dispatch = "0.3"
env_logger = "0.9"
futures = {version = "0.3", default-features = false, features = ["alloc"]}
5 changes: 5 additions & 0 deletions src/array/data_chunk.rs
@@ -53,6 +53,11 @@ impl DataChunk {
&self.arrays[idx]
}

/// Get all arrays.
pub fn arrays(&self) -> &[ArrayImpl] {
&self.arrays
}

/// Filter elements and create a new chunk.
pub fn filter(&self, visibility: impl Iterator<Item = bool> + Clone) -> Self {
let arrays = self
39 changes: 36 additions & 3 deletions src/array/mod.rs
@@ -1,4 +1,4 @@
use crate::types::{DataType, DataTypeKind, DataValue};
use crate::types::{ConvertError, DataType, DataTypeKind, DataValue};
use serde::{Deserialize, Serialize};
use std::convert::TryFrom;
use std::ops::{Bound, RangeBounds};
@@ -208,9 +208,11 @@ impl ArrayBuilderImpl {
match ty.kind() {
DataTypeKind::Boolean => Self::Bool(PrimitiveArrayBuilder::<bool>::new(0)),
DataTypeKind::Int => Self::Int32(PrimitiveArrayBuilder::<i32>::new(0)),
DataTypeKind::BigInt => Self::Int64(PrimitiveArrayBuilder::<i64>::new(0)),
DataTypeKind::Double => Self::Float64(PrimitiveArrayBuilder::<f64>::new(0)),
DataTypeKind::Char(_) => Self::UTF8(UTF8ArrayBuilder::new(0)),
DataTypeKind::Varchar(_) => Self::UTF8(UTF8ArrayBuilder::new(0)),
DataTypeKind::Char(_) | DataTypeKind::Varchar(_) | DataTypeKind::String => {
Self::UTF8(UTF8ArrayBuilder::new(0))
}
_ => panic!("unsupported data type"),
}
}
@@ -222,6 +224,7 @@ impl ArrayBuilderImpl {
DataValue::Int32(_) => Self::Int32(PrimitiveArrayBuilder::<i32>::new(0)),
DataValue::Int64(_) => Self::Int64(PrimitiveArrayBuilder::<i64>::new(0)),
DataValue::Float64(_) => Self::Float64(PrimitiveArrayBuilder::<f64>::new(0)),
DataValue::String(_) => Self::UTF8(UTF8ArrayBuilder::new(0)),
_ => panic!("unsupported data type"),
}
}
@@ -254,6 +257,36 @@ impl ArrayBuilderImpl {
}
}

/// Appends an element parsed from a string. An empty string is pushed as null.
pub fn push_str(&mut self, s: &str) -> Result<(), ConvertError> {
let null = s.is_empty();
match self {
Self::Bool(a) if null => a.push(None),
Self::Int32(a) if null => a.push(None),
Self::Int64(a) if null => a.push(None),
Self::Float64(a) if null => a.push(None),
Self::UTF8(a) if null => a.push(None),
Self::Bool(a) => a.push(Some(
&s.parse::<bool>()
.map_err(|e| ConvertError::ParseBool(s.to_string(), e))?,
)),
Self::Int32(a) => a.push(Some(
&s.parse::<i32>()
.map_err(|e| ConvertError::ParseInt(s.to_string(), e))?,
)),
Self::Int64(a) => a.push(Some(
&s.parse::<i64>()
.map_err(|e| ConvertError::ParseInt(s.to_string(), e))?,
)),
Self::Float64(a) => a.push(Some(
&s.parse::<f64>()
.map_err(|e| ConvertError::ParseFloat(s.to_string(), e))?,
)),
Self::UTF8(a) => a.push(Some(s)),
}
Ok(())
}

/// Finish build and return a new array.
pub fn finish(self) -> ArrayImpl {
match self {
118 changes: 118 additions & 0 deletions src/executor/copy_from_file.rs
@@ -0,0 +1,118 @@
// TODO(wrj): remove this once linked to plan
#![allow(dead_code)]

use super::*;
use crate::{
array::ArrayBuilderImpl,
physical_planner::{FileFormat, PhysicalCopyFromFile},
};
use std::fs::File;

/// The executor of loading file data.
pub struct CopyFromFileExecutor {
plan: PhysicalCopyFromFile,
}

impl CopyFromFileExecutor {
pub fn execute(self) -> impl Stream<Item = Result<DataChunk, ExecutorError>> {
try_stream! {
let chunk = tokio::task::spawn_blocking(|| self.read_file_blocking()).await.unwrap()?;
yield chunk;
}
}

// TODO(wrj): process a window at a time
fn read_file_blocking(self) -> Result<DataChunk, ExecutorError> {
let mut array_builders = self
.plan
.column_types
.iter()
.map(ArrayBuilderImpl::new)
.collect::<Vec<ArrayBuilderImpl>>();

let file = File::open(&self.plan.path)?;
let mut reader = match self.plan.format {
FileFormat::Csv {
delimiter,
quote,
escape,
header,
} => csv::ReaderBuilder::new()
.delimiter(delimiter)
.quote(quote)
.escape(escape)
.has_headers(header)
.from_reader(file),
};

for result in reader.records() {
let record = result?;
if record.len() != array_builders.len() {
return Err(ExecutorError::LengthMismatch {
expected: array_builders.len(),
actual: record.len(),
});
}
for ((s, builder), ty) in record
.iter()
.zip(&mut array_builders)
.zip(&self.plan.column_types)
{
if !ty.is_nullable() && s.is_empty() {
return Err(ExecutorError::NotNullable);
}
builder.push_str(s)?;
}
}
let chunk = array_builders
.into_iter()
.map(|builder| builder.finish())
.collect();
Ok(chunk)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
array::ArrayImpl,
types::{DataTypeExt, DataTypeKind},
};
use std::io::Write;

#[tokio::test]
async fn read_csv() {
let csv = "1,1.5,one\n2,2.5,two\n";

let mut file = tempfile::NamedTempFile::new().expect("failed to create temp file");
write!(file, "{}", csv).expect("failed to write file");

let executor = CopyFromFileExecutor {
plan: PhysicalCopyFromFile {
path: file.path().into(),
format: FileFormat::Csv {
delimiter: b',',
quote: b'"',
escape: None,
header: false,
},
column_types: vec![
DataTypeKind::Int.not_null(),
DataTypeKind::Double.not_null(),
DataTypeKind::String.not_null(),
],
},
};
let actual = executor.execute().boxed().next().await.unwrap().unwrap();

let expected: DataChunk = [
ArrayImpl::Int32([1, 2].into_iter().collect()),
ArrayImpl::Float64([1.5, 2.5].into_iter().collect()),
ArrayImpl::UTF8(["one", "two"].iter().map(Some).collect()),
]
.into_iter()
.collect();
assert_eq!(actual, expected);
}
}
97 changes: 97 additions & 0 deletions src/executor/copy_to_file.rs
@@ -0,0 +1,97 @@
// TODO(wrj): remove this once linked to plan
#![allow(dead_code)]

use super::*;
use crate::physical_planner::FileFormat;
use std::{fs::File, path::PathBuf};
use tokio::sync::mpsc;

/// The executor of saving data to file.
pub struct CopyToFileExecutor {
pub path: PathBuf,
pub format: FileFormat,
pub child: BoxedExecutor,
}

impl CopyToFileExecutor {
pub fn execute(self) -> impl Stream<Item = Result<DataChunk, ExecutorError>> {
try_stream! {
let Self { path, format, child } = self;
let (sender, recver) = mpsc::channel(1);
let writer = tokio::task::spawn_blocking(move || Self::write_file_blocking(path, format, recver));
for await batch in child {
sender.send(batch?).await.unwrap();
}
drop(sender);
writer.await.unwrap()?;
yield DataChunk::single(1);
}
}

fn write_file_blocking(
path: PathBuf,
format: FileFormat,
mut recver: mpsc::Receiver<DataChunk>,
) -> Result<(), ExecutorError> {
let file = File::create(&path)?;
let mut writer = match format {
FileFormat::Csv {
delimiter,
quote,
escape,
header,
} => csv::WriterBuilder::new()
.delimiter(delimiter)
.quote(quote)
.escape(escape.unwrap_or(b'\\'))
.has_headers(header)
.from_writer(file),
};

while let Some(chunk) = recver.blocking_recv() {
for i in 0..chunk.cardinality() {
// TODO(wrj): avoid dynamic memory allocation (String)
let row = chunk.arrays().iter().map(|a| a.get_to_string(i));
writer.write_record(row)?;
}
writer.flush()?;
}
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::array::ArrayImpl;

#[tokio::test]
async fn write_csv() {
let file = tempfile::NamedTempFile::new().expect("failed to create temp file");

let executor = CopyToFileExecutor {
path: file.path().into(),
format: FileFormat::Csv {
delimiter: b',',
quote: b'"',
escape: None,
header: false,
},
child: try_stream! {
yield [
ArrayImpl::Int32([1, 2].into_iter().collect()),
ArrayImpl::Float64([1.5, 2.5].into_iter().collect()),
ArrayImpl::UTF8(["one", "two"].iter().map(Some).collect()),
]
.into_iter()
.collect();
}
.boxed(),
};
executor.execute().boxed().next().await.unwrap().unwrap();

let actual = std::fs::read_to_string(file.path()).unwrap();
let expected = "1,1.5,one\n2,2.5,two\n";
assert_eq!(actual, expected);
}
}
12 changes: 12 additions & 0 deletions src/executor/mod.rs
@@ -21,6 +21,8 @@ use futures::stream::{BoxStream, Stream, StreamExt};
use std::sync::Arc;

mod aggregation;
mod copy_from_file;
mod copy_to_file;
mod create;
mod delete;
mod drop;
@@ -37,6 +39,8 @@ mod seq_scan;
mod values;

pub use self::aggregation::*;
// use self::copy_from_file::*;
// use self::copy_to_file::*;
use self::create::*;
use self::delete::*;
use self::drop::*;
@@ -60,6 +64,14 @@ pub enum ExecutorError {
Storage(#[from] StorageError),
#[error("conversion error: {0}")]
Convert(#[from] ConvertError),
#[error("tuple length mismatch: expected {expected} but got {actual}")]
LengthMismatch { expected: usize, actual: usize },
#[error("io error")]
Io(#[from] std::io::Error),
#[error("csv error")]
Csv(#[from] csv::Error),
#[error("value can not be null")]
NotNullable,
}

/// Reference type of the global environment.
28 changes: 28 additions & 0 deletions src/physical_planner/copy_from_file.rs
@@ -0,0 +1,28 @@
use crate::types::DataType;
use std::path::PathBuf;

/// The physical plan of `copy`.
#[derive(Debug, PartialEq, Clone)]
pub struct PhysicalCopyFromFile {
/// The file path to copy from.
pub path: PathBuf,
/// The file format.
pub format: FileFormat,
/// The column types.
pub column_types: Vec<DataType>,
}

/// File format.
#[derive(Debug, PartialEq, Clone)]
pub enum FileFormat {
Csv {
/// Delimiter to parse.
delimiter: u8,
/// Quote to use.
quote: u8,
/// Escape character to use.
escape: Option<u8>,
/// Whether or not the file has a header line.
header: bool,
},
}
16 changes: 16 additions & 0 deletions src/physical_planner/copy_to_file.rs
@@ -0,0 +1,16 @@
use super::{FileFormat, PhysicalPlan};
use crate::types::DataType;
use std::path::PathBuf;

/// The physical plan of `copy`.
#[derive(Debug, PartialEq, Clone)]
pub struct PhysicalCopyToFile {
/// The file path to copy to.
pub path: PathBuf,
/// The file format.
pub format: FileFormat,
/// The column types.
pub column_types: Vec<DataType>,
Member: How can we know the column types when constructing the plan? Do we need to open the file before actually executing?

Member (Author): The column types can be inferred from the table catalog or the query results, so we don't need to open the file.
Btw, I have another question: if we are going to export data to an existing file, should we append to it or truncate it first?
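A rough sketch of that idea (ColumnDesc and TableCatalog below are assumed placeholders, not APIs from this PR): the planner can collect the plan's column_types from the target table's catalog entry, so the data file is never opened during planning.

```rust
use crate::types::DataType;

// Assumed placeholder catalog types, for illustration only.
struct ColumnDesc {
    datatype: DataType,
}
struct TableCatalog {
    columns: Vec<ColumnDesc>,
}

// Collect the column types for a copy plan from catalog metadata,
// without touching the data file.
fn column_types_from_catalog(table: &TableCatalog) -> Vec<DataType> {
    table.columns.iter().map(|c| c.datatype.clone()).collect()
}
```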

Member: We should only allow exporting to a new file, e.g. OpenOptions::default().create_new(true).

Member: ...if exporting to an existing file, we should truncate the content, as it might contain data from other tables. Btw, we should warn users before actually overwriting a file.
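For reference, a minimal sketch of the two policies using std::fs::OpenOptions; note that the File::create call currently in write_file_blocking already behaves like the second variant (create or truncate).

```rust
use std::fs::{File, OpenOptions};
use std::io;
use std::path::Path;

// Policy 1: only allow exporting to a new file; the open call fails with
// `AlreadyExists` if the path is already taken.
fn open_new(path: &Path) -> io::Result<File> {
    OpenOptions::new().write(true).create_new(true).open(path)
}

// Policy 2: overwrite an existing file; create it if missing and truncate
// any existing content (this is what `File::create` does).
fn open_truncate(path: &Path) -> io::Result<File> {
    OpenOptions::new().write(true).create(true).truncate(true).open(path)
}
```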

/// The child plan.
pub child: PhysicalPlan,
}