Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(rust, python, cli): use simd-json for all json parsing #8922

Merged
merged 11 commits into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ num-traits = "0.2"
ahash = "0.8"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
hashbrown = { version = "0.13.1", features = ["rayon", "ahash"] }
indexmap = { version = "1", features = ["std"] }
bitflags = "1.3"
once_cell = "1"
memchr = "2"
Expand All @@ -45,7 +46,7 @@ package = "arrow2"
git = "https://github.com/ritchie46/arrow2"
# rev = "1491c6e8f4fd100f53c358e4f3ef1536d9e75090"
# path = "../arrow2"
branch = "polars_2023-05-16"
branch = "polars_2023-05-19"
version = "0.17"
default-features = false
features = [
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ chrono-tz = { version = "0.8", optional = true }
comfy-table = { version = "6.1.4", optional = true, default_features = false }
either.workspace = true
hashbrown.workspace = true
indexmap = { version = "1", features = ["std"] }
indexmap.workspace = true
itoap = { version = "1", optional = true }
ndarray = { version = "0.15", optional = true, default_features = false }
num-traits.workspace = true
Expand Down
4 changes: 1 addition & 3 deletions polars/polars-core/src/series/any_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,9 +309,7 @@ impl Series {
};
series_fields.push(s)
}
return Ok(StructChunked::new(name, &series_fields)
.unwrap()
.into_series());
return StructChunked::new(name, &series_fields).map(|ca| ca.into_series());
}
#[cfg(feature = "object")]
DataType::Object(_) => {
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ description = "IO related logic for the Polars DataFrame library"

[features]
# support for arrows json parsing
json = ["arrow/io_json", "simd-json", "memmap", "lexical", "lexical-core", "serde_json"]
json = ["arrow/io_json_write", "polars-json", "simd-json", "memmap", "lexical", "lexical-core", "serde_json"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression", "memmap"]
# support for arrows streaming ipc file parsing
Expand Down Expand Up @@ -71,6 +71,7 @@ once_cell = "1"
polars-arrow = { version = "0.29.0", path = "../polars-arrow" }
polars-core = { version = "0.29.0", path = "../polars-core", features = ["private"], default-features = false }
polars-error = { version = "0.29.0", path = "../polars-error", default-features = false }
polars-json = { version = "0.29.0", optional = true, path = "../polars-json" }
polars-time = { version = "0.29.0", path = "../polars-time", features = ["private"], default-features = false, optional = true }
polars-utils = { version = "0.29.0", path = "../polars-utils" }
rayon.workspace = true
Expand Down
16 changes: 9 additions & 7 deletions polars/polars-io/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ use polars_arrow::utils::CustomIterTools;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_core::utils::try_get_supertype;
use polars_json::json::infer;
use simd_json::BorrowedValue;

use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::prelude::*;
Expand Down Expand Up @@ -199,18 +201,18 @@ where

let out = match self.json_format {
JsonFormat::Json => {
use arrow::io::json::read::json_deserializer::Value;
let bytes = rb.deref();
let mut bytes = rb.deref().to_vec();
let json_value =
json::read::json_deserializer::parse(bytes).map_err(to_compute_err)?;
simd_json::to_borrowed_value(&mut bytes).map_err(to_compute_err)?;

// likely struct type
let dtype = if let Value::Array(values) = &json_value {
let dtype = if let BorrowedValue::Array(values) = &json_value {
// struct types may have missing fields so find supertype
let dtype = values
.iter()
.take(self.infer_schema_len.unwrap_or(usize::MAX))
.map(|value| {
json::read::infer(value)
infer(value)
.map_err(PolarsError::from)
.map(|dt| DataType::from(&dt))
})
Expand All @@ -223,9 +225,9 @@ where
let dtype = DataType::List(Box::new(dtype));
dtype.to_arrow()
} else {
json::read::infer(&json_value)?
infer(&json_value)?
};
let arr = json::read::deserialize(&json_value, dtype)?;
let arr = polars_json::json::deserialize(&json_value, dtype)?;
let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(
|| polars_err!(ComputeError: "can only deserialize json objects"),
)?;
Expand Down
37 changes: 16 additions & 21 deletions polars/polars-io/src/ndjson/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ impl<'a> CoreJsonReader<'a> {
let bytes: &[u8] = &reader_bytes;
let mut cursor = Cursor::new(bytes);

let data_type = arrow_ndjson::read::infer(&mut cursor, infer_schema_len)?;
let data_type = polars_json::ndjson::infer(&mut cursor, infer_schema_len)?;
dbg!(&data_type);
let schema = StructArray::get_fields(&data_type).iter().collect();

Cow::Owned(schema)
Expand Down Expand Up @@ -213,7 +214,7 @@ impl<'a> CoreJsonReader<'a> {
.into_par_iter()
.map(|(start_pos, stop_at_nbytes)| {
let mut buffers = init_buffers(&self.schema, capacity)?;
let _ = parse_lines(&bytes[start_pos..stop_at_nbytes], &mut buffers);
parse_lines(&bytes[start_pos..stop_at_nbytes], &mut buffers)?;
DataFrame::new(
buffers
.into_values()
Expand Down Expand Up @@ -247,26 +248,27 @@ impl<'a> CoreJsonReader<'a> {
fn parse_impl(
bytes: &[u8],
buffers: &mut PlIndexMap<BufferKey, Buffer>,
line: &mut Vec<u8>,
scratch: &mut Vec<u8>,
) -> PolarsResult<usize> {
line.clear();
line.extend_from_slice(bytes);
let n = line.len();
scratch.clear();
scratch.extend_from_slice(bytes);
let n = scratch.len();
let all_good = match n {
0 => true,
1 => line[0] == NEWLINE,
2 => line[0] == NEWLINE && line[1] == RETURN,
1 => scratch[0] == NEWLINE,
2 => scratch[0] == NEWLINE && scratch[1] == RETURN,
_ => {
let value: simd_json::BorrowedValue = simd_json::to_borrowed_value(line)
let value: simd_json::BorrowedValue = simd_json::to_borrowed_value(scratch)
.map_err(|e| polars_err!(ComputeError: "error parsing line: {}", e))?;
match value {
simd_json::BorrowedValue::Object(value) => {
buffers
.iter_mut()
.for_each(|(s, inner)| match s.0.map_lookup(&value) {
Some(v) => inner.add(v).expect("inner.add(v)"),
buffers.iter_mut().try_for_each(|(s, inner)| {
match s.0.map_lookup(&value) {
Some(v) => inner.add(v)?,
None => inner.add_null(),
});
}
PolarsResult::Ok(())
})?;
}
_ => {
buffers.iter_mut().for_each(|(_, inner)| inner.add_null());
Expand All @@ -282,21 +284,14 @@ fn parse_impl(
fn parse_lines(bytes: &[u8], buffers: &mut PlIndexMap<BufferKey, Buffer>) -> PolarsResult<()> {
let mut buf = vec![];

let total_bytes = bytes.len();
let mut offset = 0;
// The `RawValue` is a pointer to the original JSON string and does not perform any deserialization.
// It is used to properly iterate over the lines without re-implementing the splitlines logic when this does the same thing.
let mut iter =
serde_json::Deserializer::from_slice(bytes).into_iter::<Box<serde_json::value::RawValue>>();
while let Some(Ok(value)) = iter.next() {
let bytes = value.get().as_bytes();
offset += bytes.len();
parse_impl(bytes, buffers, &mut buf)?;
}
polars_ensure!(
offset == total_bytes,
ComputeError: "expected {} bytes, but only parsed {}", total_bytes, offset,
);
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions polars/polars-json/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ edition = "2021"
[dependencies]
ahash.workspace = true
arrow.workspace = true
fallible-streaming-iterator = "0.1"
hashbrown.workspace = true
indexmap.workspace = true
num-traits.workspace = true
polars-arrow = { version = "0.29.0", path = "../polars-arrow", default-features = false }
polars-error = { version = "0.29.0", path = "../polars-error" }
Expand Down
194 changes: 194 additions & 0 deletions polars/polars-json/src/json/infer_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
use std::borrow::Borrow;

use arrow::datatypes::{DataType, Field, Schema};
use indexmap::map::Entry;
use indexmap::IndexMap;
use simd_json::borrowed::Object;
use simd_json::{BorrowedValue, StaticNode};

use super::*;

const ITEM_NAME: &str = "item";

/// Infers the arrow [`DataType`] of a single JSON value.
///
/// Signed and unsigned integers both map to `Int64`, floats to `Float64`
/// and strings to `LargeUtf8`; arrays and objects are inferred recursively
/// through the `infer_array` / `infer_object` helpers.
pub fn infer(json: &BorrowedValue) -> PolarsResult<DataType> {
    let dtype = match json {
        BorrowedValue::Static(StaticNode::Null) => DataType::Null,
        BorrowedValue::Static(StaticNode::Bool(_)) => DataType::Boolean,
        BorrowedValue::Static(StaticNode::I64(_)) | BorrowedValue::Static(StaticNode::U64(_)) => {
            DataType::Int64
        }
        BorrowedValue::Static(StaticNode::F64(_)) => DataType::Float64,
        BorrowedValue::String(_) => DataType::LargeUtf8,
        BorrowedValue::Array(values) => infer_array(values)?,
        BorrowedValue::Object(fields) => infer_object(fields)?,
    };
    Ok(dtype)
}

/// Infers a [`Schema`] from a JSON value laid out in (pandas-compatible)
/// records format, i.e. an outer array of objects.
///
/// Only the first record is inspected; each of its fields becomes a
/// nullable `LargeList` column in the resulting schema. An empty outer
/// array yields an empty schema.
pub fn infer_records_schema(json: &BorrowedValue) -> PolarsResult<Schema> {
    let records = match json {
        BorrowedValue::Array(array) => array,
        _ => {
            return Err(PolarsError::ComputeError(
                "outer type is not an array".into(),
            ))
        }
    };

    let fields = match records.iter().next() {
        None => vec![],
        Some(BorrowedValue::Object(record)) => {
            let mut fields = Vec::with_capacity(record.len());
            for (name, value) in record.iter() {
                // Each column is a list of the inferred per-record dtype.
                let inner = Field {
                    name: format!("{name}-records"),
                    data_type: infer(value)?,
                    is_nullable: true,
                    metadata: Default::default(),
                };
                fields.push(Field {
                    name: name.to_string(),
                    data_type: DataType::LargeList(Box::new(inner)),
                    is_nullable: true,
                    metadata: Default::default(),
                });
            }
            fields
        }
        Some(_) => {
            return Err(PolarsError::ComputeError(
                "first element in array is not a record".into(),
            ))
        }
    };

    Ok(Schema {
        fields,
        metadata: Default::default(),
    })
}

/// Maps `DataType::Null` to `None` so null-typed entries can be dropped
/// during inference; any other dtype passes through as `Some`.
fn filter_map_nulls(dt: DataType) -> Option<DataType> {
    match dt {
        DataType::Null => None,
        other => Some(other),
    }
}

/// Infers a `Struct` dtype from a JSON object.
///
/// Fields whose inferred dtype is `Null` are skipped; any inference error
/// short-circuits.
fn infer_object(inner: &Object) -> PolarsResult<DataType> {
    let mut fields = Vec::with_capacity(inner.len());
    for (key, value) in inner.iter() {
        if let Some(dt) = filter_map_nulls(infer(value)?) {
            fields.push(Field::new(key.as_ref(), dt, true));
        }
    }
    Ok(DataType::Struct(fields))
}

fn infer_array(values: &[BorrowedValue]) -> PolarsResult<DataType> {
let types = values
.iter()
.map(infer)
.filter_map(|x| x.map(filter_map_nulls).transpose())
// deduplicate entries
.collect::<PolarsResult<PlHashSet<_>>>()?;

let dt = if !types.is_empty() {
let types = types.into_iter().collect::<Vec<_>>();
coerce_data_type(&types)
} else {
DataType::Null
};

// if a record contains only nulls, it is not
// added to values
Ok(if dt == DataType::Null {
dt
} else {
DataType::LargeList(Box::new(Field::new(ITEM_NAME, dt, true)))
})
}

/// Coerce an heterogeneous set of [`DataType`] into a single one. Rules:
/// * The empty set is coerced to `Null`
/// * `Int64` and `Float64` are `Float64`
/// * `Boolean` and `Int64` are `Int64`
/// * Lists and scalars are coerced to a list of a compatible scalar
/// * Structs contain the union of all fields
/// * All other types are coerced to `LargeUtf8`
pub(crate) fn coerce_data_type<A: Borrow<DataType>>(datatypes: &[A]) -> DataType {
    use DataType::*;

    if datatypes.is_empty() {
        return Null;
    }

    // Fast path: all dtypes are identical.
    let are_all_equal = datatypes.windows(2).all(|w| w[0].borrow() == w[1].borrow());

    if are_all_equal {
        return datatypes[0].borrow().clone();
    }

    let are_all_structs = datatypes.iter().all(|x| matches!(x.borrow(), Struct(_)));

    if are_all_structs {
        // all are structs => union of all fields (that may have equal names)
        let fields = datatypes.iter().fold(vec![], |mut acc, dt| {
            if let Struct(new_fields) = dt.borrow() {
                acc.extend(new_fields);
            };
            acc
        });
        // group field dtypes by field name, preserving first-seen order
        let fields = fields.iter().fold(
            IndexMap::<&str, PlHashSet<&DataType>, ahash::RandomState>::default(),
            |mut acc, field| {
                match acc.entry(field.name.as_str()) {
                    Entry::Occupied(mut v) => {
                        v.get_mut().insert(&field.data_type);
                    }
                    Entry::Vacant(v) => {
                        let mut a = PlHashSet::default();
                        a.insert(&field.data_type);
                        v.insert(a);
                    }
                }
                acc
            },
        );
        // and finally, coerce each of the fields within the same name
        let fields = fields
            .into_iter()
            .map(|(name, dts)| {
                let dts = dts.into_iter().collect::<Vec<_>>();
                Field::new(name, coerce_data_type(&dts), true)
            })
            .collect();
        return Struct(fields);
    } else if datatypes.len() > 2 {
        // more than two distinct non-struct dtypes: fall back to strings
        return LargeUtf8;
    }
    let (lhs, rhs) = (datatypes[0].borrow(), datatypes[1].borrow());

    match (lhs, rhs) {
        (lhs, rhs) if lhs == rhs => lhs.clone(),
        (LargeList(lhs), LargeList(rhs)) => {
            let inner = coerce_data_type(&[lhs.data_type(), rhs.data_type()]);
            LargeList(Box::new(Field::new(ITEM_NAME, inner, true)))
        }
        // NOTE(fix): this arm previously matched the 32-bit-offset `List`
        // variant, which `infer` in this module never produces (it only
        // emits `LargeList`), so a (scalar, LargeList) pair fell through to
        // `LargeUtf8` while the symmetric (LargeList, scalar) coerced
        // correctly. Match `LargeList` to restore the symmetry.
        (scalar, LargeList(list)) => {
            let inner = coerce_data_type(&[scalar, list.data_type()]);
            LargeList(Box::new(Field::new(ITEM_NAME, inner, true)))
        }
        (LargeList(list), scalar) => {
            let inner = coerce_data_type(&[scalar, list.data_type()]);
            LargeList(Box::new(Field::new(ITEM_NAME, inner, true)))
        }
        (Float64, Int64) | (Int64, Float64) => Float64,
        (Int64, Boolean) | (Boolean, Int64) => Int64,
        (_, _) => LargeUtf8,
    }
}
4 changes: 4 additions & 0 deletions polars/polars-json/src/json/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
pub mod deserialize;
pub(crate) mod infer_schema;

pub use deserialize::deserialize;
pub use infer_schema::{infer, infer_records_schema};
use polars_error::*;
use polars_utils::aliases::*;
Loading