Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 10, 2022
1 parent e08e6ec commit 8463f8b
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 31 deletions.
12 changes: 6 additions & 6 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
keywords = ["dataframe", "query-engine", "arrow"]
Expand Down Expand Up @@ -242,11 +242,11 @@ bench = [
]

[dependencies]
polars-core = { version = "0.22.5", path = "./polars-core", features = ["docs", "private"], default-features = false }
polars-io = { version = "0.22.5", path = "./polars-io", features = ["private"], default-features = false, optional = true }
polars-lazy = { version = "0.22.5", path = "./polars-lazy", features = ["private"], default-features = false, optional = true }
polars-ops = { version = "0.22.5", path = "./polars-ops" }
polars-time = { version = "0.22.5", path = "./polars-time", default-features = false, optional = true }
polars-core = { version = "0.22.6", path = "./polars-core", features = ["docs", "private"], default-features = false }
polars-io = { version = "0.22.6", path = "./polars-io", features = ["private"], default-features = false, optional = true }
polars-lazy = { version = "0.22.6", path = "./polars-lazy", features = ["private"], default-features = false, optional = true }
polars-ops = { version = "0.22.6", path = "./polars-ops" }
polars-time = { version = "0.22.6", path = "./polars-time", default-features = false, optional = true }

[dev-dependencies]
ahash = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-arrow"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-core"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -158,8 +158,8 @@ jsonpath_lib = { version = "0.3.0", optional = true, git = "https://github.com/r
ndarray = { version = "0.15", optional = true, default_features = false }
num = "^0.4"
once_cell = "1"
polars-arrow = { version = "0.22.5", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow", features = ["compute"] }
polars-utils = { version = "0.22.6", path = "../polars-utils" }
rand = { version = "0.8", optional = true, features = ["small_rng", "std"] }
rand_distr = { version = "0.4", optional = true }
rayon = "1.5"
Expand Down
1 change: 1 addition & 0 deletions polars/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::ops::Deref;
use std::sync::Arc;

pub use series_trait::IsSorted;
pub use iterator::SeriesIter;

/// # Series
/// The columnar data type for a DataFrame.
Expand Down
10 changes: 5 additions & 5 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-io"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -47,10 +47,10 @@ memchr = "2.4"
memmap = { package = "memmap2", version = "0.5.2", optional = true }
num = "^0.4"
once_cell = "1"
polars-arrow = { version = "0.22.5", path = "../polars-arrow" }
polars-core = { version = "0.22.5", path = "../polars-core", features = ["private"], default-features = false }
polars-time = { version = "0.22.5", path = "../polars-time", features = ["private"], default-features = false, optional = true }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow" }
polars-core = { version = "0.22.6", path = "../polars-core", features = ["private"], default-features = false }
polars-time = { version = "0.22.6", path = "../polars-time", features = ["private"], default-features = false, optional = true }
polars-utils = { version = "0.22.6", path = "../polars-utils" }
rayon = "1.5"
regex = "1.5"
serde = { version = "1", features = ["derive"], optional = true }
Expand Down
1 change: 1 addition & 0 deletions polars/polars-io/src/csv_core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ pub(crate) mod parser;
pub(crate) mod utils;
#[cfg(feature = "private")]
pub mod utils;
mod write;
212 changes: 212 additions & 0 deletions polars/polars-io/src/csv_core/write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::io::Write;
use memchr::memchr;
use polars_core::{
prelude::*,
series::SeriesIter,
POOL
};
use super::*;
use rayon::prelude::*;
use polars_utils::contention_pool::LowContentionPool;
use arrow::temporal_conversions;
use polars_core::export::chrono::FixedOffset;

/// Append the string `v` to `f` as a single CSV field.
///
/// The field is wrapped in `options.quote` when it contains the delimiter, the quote
/// character, or a line break (`\n` / `\r`); otherwise it is appended unchanged. Inside a
/// quoted field every occurrence of the quote character is doubled.
///
/// All work is done on bytes, so `options.quote` and `options.delimiter` are expected to be
/// ASCII; a non-ASCII byte could match in the middle of a multi-byte UTF-8 sequence.
///
/// Appending to a `Vec<u8>` cannot fail, so this always returns `Ok(())`. The `io::Result`
/// return type is kept so the call sites in `write_anyvalue` line up with `write!`.
fn fmt_and_escape_str(f: &mut Vec<u8>, v: &str, options: &SerializeOptions) -> std::io::Result<()> {
    let bytes = v.as_bytes();
    let quote = options.quote;

    let has_quote = memchr(quote, bytes).is_some();
    let needs_quoting = has_quote
        || memchr(options.delimiter, bytes).is_some()
        || memchr(b'\n', bytes).is_some()
        || memchr(b'\r', bytes).is_some();

    if !needs_quoting {
        f.extend_from_slice(bytes);
        return Ok(());
    }

    // two surrounding quotes; doubled quotes may still grow the buffer further
    f.reserve(bytes.len() + 2);
    f.push(quote);
    if has_quote {
        for &byte in bytes {
            if byte == quote {
                // escape an embedded quote by doubling it
                f.push(quote);
            }
            f.push(byte);
        }
    } else {
        f.extend_from_slice(bytes);
    }
    f.push(quote);

    Ok(())
}

/// Append the CSV text of one [`AnyValue`] to `f`.
///
/// * `Null` produces no output.
/// * Integers, floats and booleans are written with their `Display` output.
/// * `Utf8` and `Categorical` values are passed to `fmt_and_escape_str`.
/// * `Date`, `Datetime` and `Time` are converted via `temporal_conversions` and rendered with
///   the matching format string in `options` when it is set, or with `Display` otherwise.
///
/// # Panics
///
/// Panics when the value is of any other variant, when the time zone of a `Datetime` is not
/// accepted by `parse_offset`, or when formatting into the buffer reports an error.
fn write_anyvalue(f: &mut Vec<u8>, value: AnyValue, options: &SerializeOptions) {
    let outcome = match value {
        AnyValue::Null => write!(f, ""),
        AnyValue::Int8(n) => write!(f, "{}", n),
        AnyValue::Int16(n) => write!(f, "{}", n),
        AnyValue::Int32(n) => write!(f, "{}", n),
        AnyValue::Int64(n) => write!(f, "{}", n),
        AnyValue::UInt8(n) => write!(f, "{}", n),
        AnyValue::UInt16(n) => write!(f, "{}", n),
        AnyValue::UInt32(n) => write!(f, "{}", n),
        AnyValue::UInt64(n) => write!(f, "{}", n),
        AnyValue::Float32(x) => write!(f, "{}", x),
        AnyValue::Float64(x) => write!(f, "{}", x),
        AnyValue::Boolean(flag) => write!(f, "{}", flag),
        AnyValue::Utf8(text) => fmt_and_escape_str(f, text, options),
        AnyValue::Categorical(idx, rev_map) => fmt_and_escape_str(f, rev_map.get(idx), options),
        AnyValue::Date(days) => {
            let date = temporal_conversions::date32_to_date(days);
            if let Some(pattern) = &options.date_format {
                write!(f, "{}", date.format(pattern))
            } else {
                write!(f, "{}", date)
            }
        }
        AnyValue::Datetime(stamp, unit, zone) => match zone {
            Some(zone) => {
                // zone-aware: resolve the offset first, then convert with it
                let offset = temporal_conversions::parse_offset(&zone).unwrap();
                let moment =
                    temporal_conversions::timestamp_to_datetime(stamp, unit.to_arrow(), &offset);
                if let Some(pattern) = &options.datetime_format {
                    write!(f, "{}", moment.format(pattern))
                } else {
                    write!(f, "{}", moment)
                }
            }
            None => {
                // no zone: pick the conversion that matches the time unit
                let moment = match unit {
                    TimeUnit::Nanoseconds => temporal_conversions::timestamp_ns_to_datetime(stamp),
                    TimeUnit::Microseconds => temporal_conversions::timestamp_us_to_datetime(stamp),
                    TimeUnit::Milliseconds => temporal_conversions::timestamp_ms_to_datetime(stamp),
                };
                if let Some(pattern) = &options.datetime_format {
                    write!(f, "{}", moment.format(pattern))
                } else {
                    write!(f, "{}", moment)
                }
            }
        },
        AnyValue::Time(nanos) => {
            let time = temporal_conversions::time64ns_to_time(nanos);
            if let Some(pattern) = &options.time_format {
                write!(f, "{}", time.format(pattern))
            } else {
                write!(f, "{}", time)
            }
        }
        other => panic!("DataType: {} not supported in writing to csv", other),
    };

    outcome.unwrap();
}

/// Options that control how values are rendered when writing CSV.
///
/// Each `*_format` field is an optional format string. When it is `Some`, `write_anyvalue`
/// passes it to the `format` method of the converted date/time value; when it is `None`, the
/// value is written the way the `chrono` crate displays it by default.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct SerializeOptions {
/// Format string applied to `AnyValue::Date` values.
pub date_format: Option<String>,
/// Format string applied to `AnyValue::Time` values.
pub time_format: Option<String>,
/// Format string applied to `AnyValue::Datetime` values, with or without a time zone.
pub datetime_format: Option<String>,
/// Field separator/delimiter byte. `fmt_and_escape_str` checks strings for this byte to
/// decide whether they must be wrapped in quotes.
pub delimiter: u8,
/// Quoting character, as a single byte.
pub quote: u8,
}

impl Default for SerializeOptions {
fn default() -> Self {
SerializeOptions {
date_format: None,
time_format: None,
datetime_format: None,
delimiter: b',',
quote: b'"',
}
}
}

/// Adapter that lets `std::fmt` formatting append its text to a borrowed `Vec<u8>`.
struct StringWrap<'a>(pub &'a mut Vec<u8>);

impl<'a> std::fmt::Write for StringWrap<'a> {
    /// Appends the bytes of `s` to the wrapped buffer and always returns `Ok(())`.
    fn write_str(&mut self, s: &str) -> std::fmt::Result {
        let StringWrap(buffer) = self;
        buffer.extend(s.as_bytes());
        Ok(())
    }
}

pub(super) fn write<W: Write>(writer: &mut W, df: &DataFrame, chunk_size: usize, options: &SerializeOptions) -> Result<()> {
// check that the double quote is valid utf8
std::str::from_utf8(&[options.quote, options.quote]).map_err(|_| PolarsError::ComputeError("quote char leads invalid utf8".into()));

let len = df.height();
let n_threads = POOL.current_num_threads();

let total_rows_per_pool_iter = n_threads * chunk_size;

let mut any_value_iter_pool = LowContentionPool::<Vec<_>>::new(df.width());
let mut write_buffer_pool = LowContentionPool::<Vec<_>>::new(df.width());

let mut n_rows_finished = 0;

// holds the buffers that will be written
let mut result_buf = Vec::with_capacity(n_threads);
while n_rows_finished < len {


let par_iter = (0..n_threads)
.into_par_iter()
.map(|thread_no| {
let thread_offset = thread_no * chunk_size;
let total_offset = n_rows_finished + thread_offset;
let df = df.slice(total_offset as i64, chunk_size);
let cols = df.get_columns();
let any_value_iters = cols.iter().map(|s| s.iter());
let mut col_iters = any_value_iter_pool.get();
col_iters.extend(any_value_iters);

let mut write_buffer = write_buffer_pool.get();

let last_ptr = &col_iters[col_iters.len() - 1] as *const SeriesIter;
// loop rows
loop {
for col in &mut col_iters {
match col.next() {
Some(value) => {
write_anyvalue(&mut write_buffer, value, options);

},
None => {
break
}
}
let current_ptr = col as *const SeriesIter;
if current_ptr != last_ptr {
write!(&mut write_buffer, ",").unwrap()
}
}
}

// return buffers to the pool
col_iters.clear();
any_value_iter_pool.set(col_iters);

write_buffer
});

// rayon will ensure the right order
result_buf.par_extend(par_iter);

for mut buf in result_buf.drain(..) {
writer.write(&buf)?;
buf.clear();
write_buffer_pool.set(buf);
}



n_rows_finished += total_rows_per_pool_iter;
}

Ok(())
}
14 changes: 7 additions & 7 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-lazy"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down Expand Up @@ -123,12 +123,12 @@ rayon = "1.5"
regex = { version = "1.5", optional = true }
serde = { version = "1", features = ["derive", "rc"], optional = true }

polars-arrow = { version = "0.22.5", path = "../polars-arrow" }
polars-core = { version = "0.22.5", path = "../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false }
polars-io = { version = "0.22.5", path = "../polars-io", features = ["lazy", "csv-file", "private"], default-features = false }
polars-ops = { version = "0.22.5", path = "../polars-ops", default-features = false }
polars-time = { version = "0.22.5", path = "../polars-time", optional = true }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow" }
polars-core = { version = "0.22.6", path = "../polars-core", features = ["lazy", "private", "zip_with", "random"], default-features = false }
polars-io = { version = "0.22.6", path = "../polars-io", features = ["lazy", "csv-file", "private"], default-features = false }
polars-ops = { version = "0.22.6", path = "../polars-ops", default-features = false }
polars-time = { version = "0.22.6", path = "../polars-time", optional = true }
polars-utils = { version = "0.22.6", path = "../polars-utils" }

[package.metadata.docs.rs]
all-features = true
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-ops/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-ops"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand All @@ -10,8 +10,8 @@ description = "More operations on polars data structures"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
polars-arrow = { version = "0.22.5", path = "../polars-arrow", default-features = false }
polars-core = { version = "0.22.5", path = "../polars-core", features = ["private"], default-features = false }
polars-arrow = { version = "0.22.6", path = "../polars-arrow", default-features = false }
polars-core = { version = "0.22.6", path = "../polars-core", features = ["private"], default-features = false }

[features]
dtype-categorical = ["polars-core/dtype-categorical"]
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-time/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-time"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand All @@ -11,9 +11,9 @@ description = "Time related code for the polars dataframe library"
[dependencies]
chrono = "0.4"
lexical = { version = "6", default-features = false, features = ["std", "parse-floats", "parse-integers"] }
polars-arrow = { version = "0.22.5", path = "../polars-arrow", features = ["compute", "temporal"] }
polars-core = { version = "0.22.5", path = "../polars-core", default-features = false, features = ["private", "dtype-datetime", "dtype-duration", "dtype-time", "dtype-date"] }
polars-utils = { version = "0.22.5", path = "../polars-utils" }
polars-arrow = { version = "0.22.6", path = "../polars-arrow", features = ["compute", "temporal"] }
polars-core = { version = "0.22.6", path = "../polars-core", default-features = false, features = ["private", "dtype-datetime", "dtype-duration", "dtype-time", "dtype-date"] }
polars-utils = { version = "0.22.6", path = "../polars-utils" }
serde = { version = "1", features = ["derive"], optional = true }

[features]
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-utils/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "polars-utils"
version = "0.22.5"
version = "0.22.6"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2021"
license = "MIT"
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-utils/src/contention_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<T: Default> LowContentionPool<T> {
let size = self.size.fetch_add(1, Ordering::AcqRel);
// implementation error if this fails
assert!(size <= self.stack.len());
let mut locked = self.stack[size - 1].lock();
let mut locked = self.stack[size].lock();
*locked = value;
}
}

0 comments on commit 8463f8b

Please sign in to comment.