Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel csv writer #3652

Merged
merged 1 commit into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
keywords = ["dataframe", "query-engine", "arrow"]
Expand Down Expand Up @@ -242,11 +242,11 @@ bench = [
]

[dependencies]
polars-core = { version = "0.22.5", path = "./polars-core", features = ["docs", "private"], default-features = false }
polars-io = { version = "0.22.5", path = "./polars-io", features = ["private"], default-features = false, optional = true }
polars-lazy = { version = "0.22.5", path = "./polars-lazy", features = ["private"], default-features = false, optional = true }
polars-ops = { version = "0.22.5", path = "./polars-ops" }
polars-time = { version = "0.22.5", path = "./polars-time", default-features = false, optional = true }
polars-core = { version = "0.22.6", path = "./polars-core", features = ["docs", "private"], default-features = false }
polars-io = { version = "0.22.6", path = "./polars-io", features = ["private"], default-features = false, optional = true }
polars-lazy = { version = "0.22.6", path = "./polars-lazy", features = ["private"], default-features = false, optional = true }
polars-ops = { version = "0.22.6", path = "./polars-ops" }
polars-time = { version = "0.22.6", path = "./polars-time", default-features = false, optional = true }

[dev-dependencies]
ahash = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-arrow"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-core"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -158,8 +158,8 @@ jsonpath_lib = { version = "0.3.0", optional = true, git = "https://github.com/r
ndarray = { version = "0.15", optional = true, default_features = false }
num = "^0.4"
once_cell = "1"
polars-arrow = { version = "0.22.5", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.22.6", path = "../polars-utils" }
rand = { version = "0.8", optional = true, features = ["small_rng", "std"] }
rand_distr = { version = "0.4", optional = true }
rayon = "1.5"
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/chunked_array/ops/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub(crate) unsafe fn arr_to_any_value<'a>(
idx: usize,
dtype: &'a DataType,
) -> AnyValue<'a> {
debug_assert!(idx < arr.len());
if arr.is_null(idx) {
return AnyValue::Null;
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/series/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ impl<'a> Iterator for SeriesIter<'a> {

fn next(&mut self) -> Option<Self::Item> {
let idx = self.idx;
self.idx += 1;

if idx == self.len {
None
} else {
self.idx += 1;
unsafe { Some(arr_to_any_value(self.arr, idx, self.dtype)) }
}
}
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::Arc;

pub use iterator::SeriesIter;
pub use series_trait::IsSorted;

/// # Series
Expand Down
13 changes: 7 additions & 6 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-io"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand All @@ -23,7 +23,7 @@ dtype-datetime = ["polars-core/dtype-datetime", "polars-core/temporal", "polars-
dtype-date = ["polars-core/dtype-date", "polars-time/dtype-date"]
dtype-time = ["polars-core/dtype-time", "polars-core/temporal", "polars-time/dtype-time"]
dtype-categorical = ["polars-core/dtype-categorical"]
csv-file = ["csv-core", "memmap", "lexical", "arrow/io_csv_write"]
csv-file = ["csv-core", "memmap", "lexical", "polars-core/rows", "lexical-core"]
fmt = ["polars-core/fmt"]
decompress = ["flate2/miniz_oxide"]
decompress-fast = ["flate2/zlib-ng-compat"]
Expand All @@ -43,14 +43,15 @@ csv-core = { version = "0.1.10", optional = true }
dirs = "4.0"
flate2 = { version = "1", optional = true, default-features = false }
lexical = { version = "6", optional = true, default-features = false, features = ["std", "parse-floats", "parse-integers"] }
lexical-core = { version = "0.8", optional = true }
memchr = "2.4"
memmap = { package = "memmap2", version = "0.5.2", optional = true }
num = "^0.4"
once_cell = "1"
polars-arrow = { version = "0.22.5", path = "../polars-arrow" }
polars-core = { version = "0.22.5", path = "../polars-core", features = ["private"], default-features = false }
polars-time = { version = "0.22.5", path = "../polars-time", features = ["private"], default-features = false, optional = true }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow" }
polars-core = { version = "0.22.6", path = "../polars-core", features = ["private"], default-features = false }
polars-time = { version = "0.22.6", path = "../polars-time", features = ["private"], default-features = false, optional = true }
polars-utils = { version = "0.22.6", path = "../polars-utils" }
rayon = "1.5"
regex = "1.5"
serde = { version = "1", features = ["derive"], optional = true }
Expand Down
35 changes: 20 additions & 15 deletions polars/polars-io/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::mmap::MmapBytesReader;
use crate::predicates::PhysicalIoExpr;
use crate::utils::resolve_homedir;
use crate::{RowCount, SerReader, SerWriter};
pub use arrow::io::csv::write;

use polars_core::prelude::*;
#[cfg(feature = "temporal")]
use polars_time::prelude::*;
Expand All @@ -60,14 +60,18 @@ use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

use super::csv_core::write;

/// Write a DataFrame to csv.
///
/// Don't use a `Buffered` writer, the `CsvWriter` internally already buffers writes.
#[must_use]
pub struct CsvWriter<W: Write> {
/// File or Stream handler
buffer: W,
/// arrow specific options
options: write::SerializeOptions,
header: bool,
batch_size: usize,
}

impl<W> SerWriter<W> for CsvWriter<W>
Expand All @@ -77,29 +81,26 @@ where
fn new(buffer: W) -> Self {
// 9f: all nanoseconds
let options = write::SerializeOptions {
time64_format: Some("%T%.9f".to_string()),
timestamp_format: Some("%FT%H:%M:%S.%9f".to_string()),
time_format: Some("%T%.9f".to_string()),
datetime_format: Some("%FT%H:%M:%S.%9f".to_string()),
..Default::default()
};

CsvWriter {
buffer,
options,
header: true,
batch_size: 1024,
}
}

fn finish(&mut self, df: &mut DataFrame) -> Result<()> {
df.rechunk();
df.as_single_chunk_par();
let names = df.get_column_names();
let iter = df.iter_chunks();
if self.header {
write::write_header(&mut self.buffer, &names, &self.options)?;
}
for batch in iter {
write::write_chunk(&mut self.buffer, &batch, &self.options)?;
}
Ok(())
write::write(&mut self.buffer, df, self.batch_size, &self.options)
}
}

Expand All @@ -119,22 +120,26 @@ where
self
}

pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}

/// Set the CSV file's date format
pub fn with_date_format(mut self, format: Option<String>) -> Self {
self.options.date32_format = format;
self.options.date_format = format;
self
}

/// Set the CSV file's time format
pub fn with_time_format(mut self, format: Option<String>) -> Self {
self.options.time32_format = format.clone();
self.options.time64_format = format;
self.options.time_format = format;
self
}

/// Set the CSV file's timestamp format array in
pub fn with_timestamp_format(mut self, format: Option<String>) -> Self {
self.options.timestamp_format = format;
pub fn with_datetime(mut self, format: Option<String>) -> Self {
self.options.datetime_format = format;
self
}

Expand Down
1 change: 1 addition & 0 deletions polars/polars-io/src/csv_core/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::RowCount;
use polars_arrow::array::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::{prelude::*, POOL};
#[cfg(feature = "polars-time")]
use polars_time::prelude::*;
use polars_utils::flatten;
use rayon::prelude::*;
Expand Down
1 change: 1 addition & 0 deletions polars/polars-io/src/csv_core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub(crate) mod parser;
pub(crate) mod utils;
#[cfg(feature = "private")]
pub mod utils;
pub(super) mod write;
36 changes: 26 additions & 10 deletions polars/polars-io/src/csv_core/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::prelude::NullValues;
use once_cell::sync::Lazy;
use polars_core::datatypes::PlHashSet;
use polars_core::prelude::*;
#[cfg(feature = "polars-time")]
use polars_time::chunkedarray::utf8::infer as date_infer;
#[cfg(feature = "polars-time")]
use polars_time::prelude::utf8::Pattern;
use regex::{Regex, RegexBuilder};
use std::borrow::Cow;
Expand Down Expand Up @@ -94,12 +96,19 @@ fn infer_field_schema(string: &str, parse_dates: bool) -> DataType {
// Utf8 for them
if string.starts_with('"') {
if parse_dates {
match date_infer::infer_pattern_single(&string[1..string.len() - 1]) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
#[cfg(feature = "polars-time")]
{
match date_infer::infer_pattern_single(&string[1..string.len() - 1]) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
#[cfg(not(feature = "polars-time"))]
{
panic!("activate one of {{'dtype-date', 'dtype-datetime', dtype-time'}} features")
}
} else {
DataType::Utf8
Expand All @@ -113,12 +122,19 @@ fn infer_field_schema(string: &str, parse_dates: bool) -> DataType {
} else if INTEGER_RE.is_match(string) {
DataType::Int64
} else if parse_dates {
match date_infer::infer_pattern_single(string) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
#[cfg(feature = "polars-time")]
{
match date_infer::infer_pattern_single(string) {
Some(Pattern::DatetimeYMD | Pattern::DatetimeDMY) => {
DataType::Datetime(TimeUnit::Microseconds, None)
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
Some(Pattern::DateYMD | Pattern::DateDMY) => DataType::Date,
None => DataType::Utf8,
}
#[cfg(not(feature = "polars-time"))]
{
panic!("activate one of {{'dtype-date', 'dtype-datetime', dtype-time'}} features")
}
} else {
DataType::Utf8
Expand Down