Skip to content

Commit

Permalink
perf[ndjson]: use simd_json (#4424)
Browse files Browse the repository at this point in the history
  • Loading branch information
universalmind303 committed Aug 16, 2022
1 parent 26ddba0 commit 18faa47
Show file tree
Hide file tree
Showing 10 changed files with 298 additions and 129 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "3b9d86bfb65bc786bf9315a41737a77895716443", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "6abd0c4164676f7b17865c8def875401b5bbd5fc", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "ipc_meta", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false, features = ["compute_concatenate"] }
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ thiserror = "^1.0"
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "3b9d86bfb65bc786bf9315a41737a77895716443"
rev = "6abd0c4164676f7b17865c8def875401b5bbd5fc"
# path = "../../../arrow2"
# branch = "ipc_meta"
# version = "0.12"
Expand Down
21 changes: 11 additions & 10 deletions polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,33 @@ description = "IO related logic for the Polars DataFrame library"

[features]
# support for arrows json parsing
json = ["arrow/io_json", "serde_json"]
json = ["arrow/io_json", "simd-json", "memmap", "lexical", "lexical-core", "csv-core"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression", "memmap"]
# support for arrows streaming ipc file parsing
ipc_streaming = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrow avro parsing
avro = ["arrow/io_avro", "arrow/io_avro_compression"]
# ipc = []
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"]
dtype-datetime = ["polars-core/dtype-datetime", "polars-core/temporal", "polars-time/dtype-datetime"]
dtype-date = ["polars-core/dtype-date", "polars-time/dtype-date"]
dtype-time = ["polars-core/dtype-time", "polars-core/temporal", "polars-time/dtype-time"]
dtype-categorical = ["polars-core/dtype-categorical"]
csv-file = ["csv-core", "memmap", "lexical", "polars-core/rows", "lexical-core"]
fmt = ["polars-core/fmt"]
decompress = ["flate2/miniz_oxide"]
decompress-fast = ["flate2/zlib-ng-compat"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
dtype-categorical = ["polars-core/dtype-categorical"]
dtype-date = ["polars-core/dtype-date", "polars-time/dtype-date"]
dtype-datetime = ["polars-core/dtype-datetime", "polars-core/temporal", "polars-time/dtype-datetime"]
dtype-time = ["polars-core/dtype-time", "polars-core/temporal", "polars-time/dtype-time"]
fmt = ["polars-core/fmt"]
lazy = []
parquet = ["polars-core/parquet", "arrow/io_parquet", "arrow/io_parquet_compression", "memmap"]
partition = ["polars-core/partition_by"]
temporal = ["dtype-datetime", "dtype-date", "dtype-time"]
# don't use this
private = ["polars-time/private"]

[dependencies]
ahash = "0.7"
anyhow = "1.0"
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "3b9d86bfb65bc786bf9315a41737a77895716443", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "6abd0c4164676f7b17865c8def875401b5bbd5fc", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "ipc_meta", default-features = false }
# arrow = { package = "arrow2", version = "0.12", default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
Expand All @@ -58,6 +58,7 @@ rayon = "1.5"
regex = "1.5"
serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true, default-features = false, features = ["alloc"] }
simd-json = { version = "0.6.0", optional = true, features = ["allow-non-simd", "known-key"] }
simdutf8 = "0.1"

[dev-dependencies]
Expand Down
34 changes: 0 additions & 34 deletions polars/polars-io/src/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,37 +239,3 @@ where
self
}
}

#[cfg(test)]
mod test {
use std::io::Cursor;

use crate::prelude::*;

#[test]
fn read_json() {
let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":-10, "b":-3.5, "c":true, "d":"4"}
{"a":2, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":7, "b":-3.5, "c":true, "d":"4"}
{"a":1, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":5, "b":-3.5, "c":true, "d":"4"}
{"a":1, "b":0.6, "c":false, "d":"text"}
{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":1, "b":-3.5, "c":true, "d":"4"}
{"a":100000000000000, "b":0.6, "c":false, "d":"text"}"#;
let file = Cursor::new(basic_json);
let df = JsonReader::new(file)
.infer_schema_len(Some(3))
.with_json_format(JsonFormat::JsonLines)
.with_batch_size(3)
.finish()
.unwrap();

assert_eq!("a", df.get_columns()[0].name());
assert_eq!("d", df.get_columns()[3].name());
assert_eq!((12, 4), df.shape());
}
}
2 changes: 1 addition & 1 deletion polars/polars-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub(crate) mod aggregations;
#[cfg(feature = "avro")]
#[cfg_attr(docsrs, doc(cfg(feature = "avro")))]
pub mod avro;
#[cfg(feature = "csv-file")]
#[cfg(any(feature = "csv-file", feature = "json"))]
#[cfg_attr(docsrs, doc(cfg(feature = "csv-file")))]
pub mod csv;
#[cfg(feature = "parquet")]
Expand Down
77 changes: 40 additions & 37 deletions polars/polars-io/src/ndjson_core/buffer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
use std::hash::Hash;
use std::hash::Hasher;

use arrow::types::NativeType;
use num::traits::NumCast;
use polars_core::prelude::*;
use polars_time::prelude::utf8::infer::infer_pattern_single;
use polars_time::prelude::utf8::infer::DatetimeInfer;
use polars_time::prelude::utf8::Pattern;
use serde_json::Value;
pub(crate) fn init_buffers(schema: &Schema, capacity: usize) -> Result<PlIndexMap<String, Buffer>> {
use simd_json::{BorrowedValue as Value, KnownKey, StaticNode};

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct BufferKey<'a>(pub(crate) KnownKey<'a>);
impl<'a> Eq for BufferKey<'a> {}

impl<'a> Hash for BufferKey<'a> {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.key().hash(state)
}
}

pub(crate) fn init_buffers(
schema: &Schema,
capacity: usize,
) -> Result<PlIndexMap<BufferKey, Buffer>> {
schema
.iter()
.map(|(name, dtype)| {
Expand All @@ -28,7 +45,9 @@ pub(crate) fn init_buffers(schema: &Schema, capacity: usize) -> Result<PlIndexMa
&DataType::Date => Buffer::Date(PrimitiveChunkedBuilder::new(name, capacity)),
_ => Buffer::All((Vec::with_capacity(capacity), name)),
};
Ok((name.clone(), builder))
let key = KnownKey::from(name);

Ok((BufferKey(key), builder))
})
.collect()
}
Expand Down Expand Up @@ -98,7 +117,7 @@ impl<'a> Buffer<'a> {
match self {
Boolean(buf) => {
match value {
Value::Bool(v) => buf.append_value(*v),
Value::Static(StaticNode::Bool(b)) => buf.append_value(*b),
_ => buf.append_null(),
}
Ok(())
Expand Down Expand Up @@ -144,7 +163,7 @@ impl<'a> Buffer<'a> {
Ok(())
}
Float64(buf) => {
let n = deserialize_float::<f64>(value);
let n = deserialize_number::<f64>(value);
match n {
Some(v) => buf.append_value(v),
None => buf.append_null(),
Expand Down Expand Up @@ -180,18 +199,12 @@ impl<'a> Buffer<'a> {
}
}

fn deserialize_float<T: NativeType + NumCast>(value: &Value) -> Option<T> {
match value {
Value::Number(number) => number.as_f64().and_then(num::traits::cast::<f64, T>),
Value::Bool(number) => num::traits::cast::<i32, T>(*number as i32),
_ => None,
}
}

fn deserialize_number<T: NativeType + NumCast>(value: &Value) -> Option<T> {
match value {
Value::Number(v) => v.as_i64().and_then(num::traits::cast::<i64, T>),
Value::Bool(number) => num::traits::cast::<i32, T>(*number as i32),
Value::Static(StaticNode::F64(f)) => num::traits::cast::<f64, T>(*f),
Value::Static(StaticNode::I64(i)) => num::traits::cast::<i64, T>(*i),
Value::Static(StaticNode::U64(u)) => num::traits::cast::<u64, T>(*u),
Value::Static(StaticNode::Bool(b)) => num::traits::cast::<i32, T>(*b as i32),
_ => None,
}
}
Expand All @@ -217,17 +230,12 @@ where
#[cfg(feature = "dtype-struct")]
fn value_to_dtype(val: &Value) -> DataType {
match val {
Value::Null => DataType::Null,
Value::Bool(_) => DataType::Boolean,
Value::Number(n) => {
if n.is_i64() {
DataType::Int64
} else if n.is_u64() {
DataType::UInt64
} else {
DataType::Float64
}
}
Value::Static(StaticNode::Bool(_)) => DataType::Boolean,
Value::Static(StaticNode::I64(_)) => DataType::Int64,
Value::Static(StaticNode::U64(_)) => DataType::UInt64,
Value::Static(StaticNode::F64(_)) => DataType::Float64,
Value::Static(StaticNode::Null) => DataType::Null,
Value::String(_) => DataType::Utf8,
Value::Array(arr) => {
let dtype = value_to_dtype(&arr[0]);

Expand All @@ -247,23 +255,18 @@ fn value_to_dtype(val: &Value) -> DataType {

fn deserialize_all<'a, 'b>(json: &'b Value) -> AnyValue<'a> {
match json {
Value::Bool(b) => AnyValue::Boolean(*b),
Value::Number(n) => {
if n.is_i64() {
AnyValue::Int64(n.as_i64().unwrap())
} else if n.is_u64() {
AnyValue::UInt64(n.as_u64().unwrap())
} else {
AnyValue::Float64(n.as_f64().unwrap())
}
}
Value::Static(StaticNode::Bool(b)) => AnyValue::Boolean(*b),
Value::Static(StaticNode::I64(i)) => AnyValue::Int64(*i),
Value::Static(StaticNode::U64(u)) => AnyValue::UInt64(*u),
Value::Static(StaticNode::F64(f)) => AnyValue::Float64(*f),
Value::Static(StaticNode::Null) => AnyValue::Null,
Value::String(s) => AnyValue::Utf8Owned(s.to_string()),
Value::Array(arr) => {
let vals: Vec<AnyValue> = arr.iter().map(deserialize_all).collect();

let s = Series::new("", vals);
AnyValue::List(s)
}
Value::Null => AnyValue::Null,
#[cfg(feature = "dtype-struct")]
Value::Object(doc) => {
let vals: (Vec<AnyValue>, Vec<Field>) = doc
Expand Down

0 comments on commit 18faa47

Please sign in to comment.