Skip to content

Commit

Permalink
parallel csv writer (#3652)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 10, 2022
1 parent e08e6ec commit 24d0cd0
Show file tree
Hide file tree
Showing 21 changed files with 377 additions and 71 deletions.
12 changes: 6 additions & 6 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
keywords = ["dataframe", "query-engine", "arrow"]
Expand Down Expand Up @@ -242,11 +242,11 @@ bench = [
]

[dependencies]
polars-core = { version = "0.22.5", path = "./polars-core", features = ["docs", "private"], default-features = false }
polars-io = { version = "0.22.5", path = "./polars-io", features = ["private"], default-features = false, optional = true }
polars-lazy = { version = "0.22.5", path = "./polars-lazy", features = ["private"], default-features = false, optional = true }
polars-ops = { version = "0.22.5", path = "./polars-ops" }
polars-time = { version = "0.22.5", path = "./polars-time", default-features = false, optional = true }
polars-core = { version = "0.22.6", path = "./polars-core", features = ["docs", "private"], default-features = false }
polars-io = { version = "0.22.6", path = "./polars-io", features = ["private"], default-features = false, optional = true }
polars-lazy = { version = "0.22.6", path = "./polars-lazy", features = ["private"], default-features = false, optional = true }
polars-ops = { version = "0.22.6", path = "./polars-ops" }
polars-time = { version = "0.22.6", path = "./polars-time", default-features = false, optional = true }

[dev-dependencies]
ahash = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-arrow"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-core"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -158,8 +158,8 @@ jsonpath_lib = { version = "0.3.0", optional = true, git = "https://github.com/r
ndarray = { version = "0.15", optional = true, default_features = false }
num = "^0.4"
once_cell = "1"
polars-arrow = { version = "0.22.5", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.22.6", path = "../polars-utils" }
rand = { version = "0.8", optional = true, features = ["small_rng", "std"] }
rand_distr = { version = "0.4", optional = true }
rayon = "1.5"
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/chunked_array/ops/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) unsafe fn arr_to_any_value<'a>(
idx: usize,
dtype: &'a DataType,
) -> AnyValue<'a> {
debug_assert!(idx < arr.len());
if arr.is_null(idx) {
return AnyValue::Null;
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/series/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ impl<'a> Iterator for SeriesIter<'a> {

fn next(&mut self) -> Option<Self::Item> {
let idx = self.idx;
self.idx += 1;

if idx == self.len {
None
} else {
self.idx += 1;
unsafe { Some(arr_to_any_value(self.arr, idx, self.dtype)) }
}
}
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::Arc;

pub use iterator::SeriesIter;
pub use series_trait::IsSorted;

/// # Series
Expand Down
13 changes: 7 additions & 6 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-io"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand All @@ -23,7 +23,7 @@ dtype-datetime = ["polars-core/dtype-datetime", "polars-core/temporal", "polars-
dtype-date = ["polars-core/dtype-date", "polars-time/dtype-date"]
dtype-time = ["polars-core/dtype-time", "polars-core/temporal", "polars-time/dtype-time"]
dtype-categorical = ["polars-core/dtype-categorical"]
csv-file = ["csv-core", "memmap", "lexical", "arrow/io_csv_write"]
csv-file = ["csv-core", "memmap", "lexical", "polars-core/rows", "lexical-core"]
fmt = ["polars-core/fmt"]
decompress = ["flate2/miniz_oxide"]
decompress-fast = ["flate2/zlib-ng-compat"]
Expand All @@ -43,14 +43,15 @@ csv-core = { version = "0.1.10", optional = true }
dirs = "4.0"
flate2 = { version = "1", optional = true, default-features = false }
lexical = { version = "6", optional = true, default-features = false, features = ["std", "parse-floats", "parse-integers"] }
lexical-core = { version = "0.8", optional = true }
memchr = "2.4"
memmap = { package = "memmap2", version = "0.5.2", optional = true }
num = "^0.4"
once_cell = "1"
polars-arrow = { version = "0.22.5", path = "../polars-arrow" }
polars-core = { version = "0.22.5", path = "../polars-core", features = ["private"], default-features = false }
polars-time = { version = "0.22.5", path = "../polars-time", features = ["private"], default-features = false, optional = true }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow" }
polars-core = { version = "0.22.6", path = "../polars-core", features = ["private"], default-features = false }
polars-time = { version = "0.22.6", path = "../polars-time", features = ["private"], default-features = false, optional = true }
polars-utils = { version = "0.22.6", path = "../polars-utils" }
rayon = "1.5"
regex = "1.5"
serde = { version = "1", features = ["derive"], optional = true }
Expand Down
35 changes: 20 additions & 15 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::utils::resolve_homedir;
use crate::{RowCount, SerReader, SerWriter};
pub use arrow::io::csv::write;

use polars_core::prelude::*;
#[cfg(feature = "temporal")]
use polars_time::prelude::*;
Expand All @@ -60,14 +60,18 @@ use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

use super::csv_core::write;

/// Write a DataFrame to csv.
///
/// Don't use a `Buffered` writer; the `CsvWriter` already buffers writes internally.
#[must_use]
pub struct CsvWriter<W: Write> {
/// File or Stream handler
buffer: W,
/// arrow specific options
options: write::SerializeOptions,
header: bool,
batch_size: usize,
}

impl<W> SerWriter<W> for CsvWriter<W>
Expand All @@ -77,29 +81,26 @@ where
fn new(buffer: W) -> Self {
// 9f: all nanoseconds
let options = write::SerializeOptions {
time64_format: Some("%T%.9f".to_string()),
timestamp_format: Some("%FT%H:%M:%S.%9f".to_string()),
time_format: Some("%T%.9f".to_string()),
datetime_format: Some("%FT%H:%M:%S.%9f".to_string()),
..Default::default()
};

CsvWriter {
buffer,
options,
header: true,
batch_size: 1024,
}
}

fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
df.rechunk();
df.as_single_chunk_par();
let names = df.get_column_names();
let iter = df.iter_chunks();
if self.header {
write::write_header(&mut self.buffer, &names, &self.options)?;
}
for batch in iter {
write::write_chunk(&mut self.buffer, &batch, &self.options)?;
}
Ok(())
write::write(&mut self.buffer, df, self.batch_size, &self.options)
}
}

Expand All @@ -119,22 +120,26 @@ where
self
}

/// Set the batch size used when writing the CSV.
///
/// The value is stored on the writer and passed to `write::write` when
/// `finish` is called. A writer created with `new` starts with a batch
/// size of 1024.
///
/// NOTE(review): presumably the number of rows serialized per batch —
/// confirm against `csv_core::write`.
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}

/// Set the CSV file's date format
pub fn with_date_format(mut self, format: Option<String>) -> Self {
self.options.date32_format = format;
self.options.date_format = format;
self
}

/// Set the CSV file's time format
pub fn with_time_format(mut self, format: Option<String>) -> Self {
self.options.time32_format = format.clone();
self.options.time64_format = format;
self.options.time_format = format;
self
}

/// Set the CSV file's timestamp format
pub fn with_timestamp_format(mut self, format: Option<String>) -> Self {
self.options.timestamp_format = format;
pub fn with_datetime(mut self, format: Option<String>) -> Self {
self.options.datetime_format = format;
self
}

Expand Down
1 change: 1 addition & 0 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::RowCount;
use polars_arrow::array::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{prelude::*, POOL};
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use polars_utils::flatten;
use rayon::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions polars/polars-io/src/csv_core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub(crate) mod parser;
pub(crate) mod utils;
#[cfg(feature = "private")]
pub mod utils;
pub(super) mod write;
36 changes: 26 additions & 10 deletions polars/polars-io/src/csv_core/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::prelude::NullValues;
use once_cell::sync::Lazy;
use polars_core::datatypes::PlHashSet;
use polars_core::prelude::*;
#[cfg(feature = "polars-time")]
use polars_time::chunkedarray::utf8::infer as date_infer;
#[cfg(feature = "polars-time")]
use polars_time::prelude::utf8::Pattern;
use regex::{Regex, RegexBuilder};
use std::borrow::Cow;
Expand Down Expand Up @@ -94,12 +96,19 @@ fn infer_field_schema(string: &str, parse_dates: bool) -> DataType {
// Utf8 for them
if string.starts_with('"') {
if parse_dates {
match date_infer::infer_pattern_single(&string[1..string.len() - 1]) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
#[cfg(feature = "polars-time")]
{
match date_infer::infer_pattern_single(&string[1..string.len() - 1]) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
#[cfg(not(feature = "polars-time"))]
{
panic!("activate one of {{'dtype-date', 'dtype-datetime', dtype-time'}} features")
}
} else {
DataType::Utf8
Expand All @@ -113,12 +122,19 @@ fn infer_field_schema(string: &str, parse_dates: bool) -> DataType {
} else if INTEGER_RE.is_match(string) {
DataType::Int64
} else if parse_dates {
match date_infer::infer_pattern_single(string) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
#[cfg(feature = "polars-time")]
{
match date_infer::infer_pattern_single(string) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
#[cfg(not(feature = "polars-time"))]
{
panic!("activate one of {{'dtype-date', 'dtype-datetime', dtype-time'}} features")
}
} else {
DataType::Utf8
Expand Down

0 comments on commit 24d0cd0

Please sign in to comment.