ndjson reader complex types support (#3665)
* fallback to "anyvalue" for ndjson parsing

* wip: ndjson reader updates

* update ndjson reader

* chore: code cleanup

* chore: code cleanup

* chore: code cleanup

* use indexmap instead of btreemap

* default to utf8 for not(dtype-struct)

* fix feature flagging

* try again with a default to utf8

* cargo fmt

* try again with a default to utf8
universalmind303 committed Jun 13, 2022
1 parent d369f45 commit 5b9a432
Showing 3 changed files with 104 additions and 28 deletions.
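In short, this commit teaches the NDJSON reader to handle complex types: columns whose dtype has no dedicated builder now fall back to an AnyValue buffer, nested JSON is inferred as Struct/List (or Utf8 when the dtype-struct feature is off), column order is preserved via an IndexMap instead of a hash map, and the Node.js scan now forwards n_rows. A minimal usage sketch, using only the reader calls that appear in this diff (the file path and the json/ndjson feature flags are assumptions, not part of the commit):

use polars::prelude::*;

// Rows such as {"id": 1, "point": {"x": 1.0, "y": 2.0}, "tags": [1, 2]}
// previously failed with "Unsupported data type ..."; with this change the
// nested columns are read as Struct/List (or Utf8 without "dtype-struct").
fn read_nested_ndjson() -> Result<DataFrame> {
    JsonLineReader::from_path("data.ndjson")
        .expect("unable to read file")
        .finish()
}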
20 changes: 8 additions & 12 deletions nodejs-polars/src/lazy/dataframe.rs
@@ -663,18 +663,14 @@ struct JsonScan {

impl AnonymousScan for JsonScan {
fn scan(&self, scan_opts: AnonymousScanOptions) -> polars::prelude::Result<DataFrame> {
if let Some(s) = scan_opts.output_schema {
JsonLineReader::from_path(&self.path)
.expect("unable to read file")
.with_schema(&s)
.with_chunk_size(self.batch_size)
.finish()
} else {
JsonLineReader::from_path(&self.path)
.expect("unable to read file")
.with_chunk_size(self.batch_size)
.finish()
}
let schema = scan_opts.output_schema.unwrap_or(scan_opts.schema);
JsonLineReader::from_path(&self.path)
.expect("unable to read file")
.with_schema(&schema)
.with_chunk_size(self.batch_size)
.with_n_rows(scan_opts.n_rows)
.finish()

}

fn schema(&self, infer_schema_length: Option<usize>) -> polars::prelude::Result<Schema> {
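In JsonScan::scan the reader construction is no longer duplicated per branch: the schema is resolved once (the projected output_schema when the optimizer pushed one down, otherwise the full schema) and n_rows is now forwarded to the reader. The selection itself is just an Option fallback; a simplified illustration of the pattern (generic stand-in types, not the real Schema):

// Hypothetical stand-in showing the unwrap_or fallback used above.
fn pick_schema<T>(projected: Option<T>, full: T) -> T {
    projected.unwrap_or(full)
}

fn main() {
    assert_eq!(pick_schema(Some("projected"), "full"), "projected");
    assert_eq!(pick_schema(None, "full"), "full");
}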
90 changes: 80 additions & 10 deletions polars/polars-io/src/ndjson_core/buffer.rs
@@ -1,12 +1,11 @@
use arrow::types::NativeType;
use num::traits::NumCast;
use polars_core::prelude::*;
use polars_time::prelude::utf8::infer::infer_pattern_single;
use polars_time::prelude::utf8::infer::DatetimeInfer;
use polars_time::prelude::utf8::Pattern;
use serde_json::Value;

use arrow::types::NativeType;
pub(crate) fn init_buffers(schema: &Schema, capacity: usize) -> Result<PlHashMap<String, Buffer>> {
pub(crate) fn init_buffers(schema: &Schema, capacity: usize) -> Result<PlIndexMap<String, Buffer>> {
schema
.iter()
.map(|(name, dtype)| {
@@ -27,19 +26,15 @@ pub(crate) fn init_buffers(schema: &Schema, capacity: usize) -> Result<PlHashMap
}
#[cfg(feature = "dtype-date")]
&DataType::Date => Buffer::Date(PrimitiveChunkedBuilder::new(name, capacity)),
other => {
return Err(PolarsError::ComputeError(
format!("Unsupported data type {:?} when reading a csv", other).into(),
))
}
_ => Buffer::All((Vec::with_capacity(capacity), name)),
};
Ok((name.clone(), builder))
})
.collect()
}

#[allow(clippy::large_enum_variant)]
pub(crate) enum Buffer {
pub(crate) enum Buffer<'a> {
Boolean(BooleanChunkedBuilder),
Int32(PrimitiveChunkedBuilder<Int32Type>),
Int64(PrimitiveChunkedBuilder<Int64Type>),
@@ -52,9 +47,10 @@ pub(crate) enum Buffer {
Datetime(PrimitiveChunkedBuilder<Int64Type>),
#[cfg(feature = "dtype-date")]
Date(PrimitiveChunkedBuilder<Int32Type>),
All((Vec<AnyValue<'a>>, &'a str)),
}

impl Buffer {
impl<'a> Buffer<'a> {
pub(crate) fn into_series(self) -> Result<Series> {
let s = match self {
Buffer::Boolean(v) => v.finish().into_series(),
@@ -73,6 +69,7 @@ impl Buffer {
#[cfg(feature = "dtype-date")]
Buffer::Date(v) => v.finish().into_series().cast(&DataType::Date).unwrap(),
Buffer::Utf8(v) => v.finish().into_series(),
Buffer::All((vals, name)) => Series::new(name, vals),
};
Ok(s)
}
@@ -91,6 +88,7 @@ impl Buffer {
Buffer::Datetime(v) => v.append_null(),
#[cfg(feature = "dtype-date")]
Buffer::Date(v) => v.append_null(),
Buffer::All((v, _)) => v.push(AnyValue::Null),
};
}

@@ -173,6 +171,11 @@ impl Buffer {
buf.append_option(v);
Ok(())
}
All((buf, _)) => {
let av = deserialize_all(value);
buf.push(av);
Ok(())
}
}
}
}
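Rather than rejecting unknown dtypes up front, init_buffers now falls through to a catch-all Buffer::All variant that accumulates a Vec<AnyValue> alongside the column name; add pushes the deserialized AnyValue, add_null pushes AnyValue::Null, and into_series hands the collected values to Series::new. A minimal sketch of that final step, assuming polars_core's AnyValue/Series constructors as used in this diff:

use polars_core::prelude::*;

fn finish_anyvalue_column() {
    // What Buffer::All does in into_series(): collect heterogeneous
    // AnyValues for one column and let Series::new unify them.
    let vals = vec![AnyValue::Int64(1), AnyValue::Null, AnyValue::Int64(3)];
    let s = Series::new("a", vals);
    assert_eq!(s.len(), 3);
}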
@@ -210,3 +213,70 @@ where
},
}
}

#[cfg(feature = "dtype-struct")]
fn value_to_dtype(val: &Value) -> DataType {
match val {
Value::Null => DataType::Null,
Value::Bool(_) => DataType::Boolean,
Value::Number(n) => {
if n.is_i64() {
DataType::Int64
} else if n.is_u64() {
DataType::UInt64
} else {
DataType::Float64
}
}
Value::Array(arr) => {
let dtype = value_to_dtype(&arr[0]);

DataType::List(Box::new(dtype))
}
#[cfg(feature = "dtype-struct")]
Value::Object(doc) => {
let fields = doc.iter().map(|(key, value)| {
let dtype = value_to_dtype(value);
Field::new(key, dtype)
});
DataType::Struct(fields.collect())
}
_ => DataType::Utf8,
}
}

fn deserialize_all<'a, 'b>(json: &'b Value) -> AnyValue<'a> {
match json {
Value::Bool(b) => AnyValue::Boolean(*b),
Value::Number(n) => {
if n.is_i64() {
AnyValue::Int64(n.as_i64().unwrap())
} else if n.is_u64() {
AnyValue::UInt64(n.as_u64().unwrap())
} else {
AnyValue::Float64(n.as_f64().unwrap())
}
}
Value::Array(arr) => {
let vals: Vec<AnyValue> = arr.iter().map(deserialize_all).collect();

let s = Series::new("", vals);
AnyValue::List(s)
}
Value::Null => AnyValue::Null,
#[cfg(feature = "dtype-struct")]
Value::Object(doc) => {
let vals: (Vec<AnyValue>, Vec<Field>) = doc
.into_iter()
.map(|(key, value)| {
let dt = value_to_dtype(value);
let fld = Field::new(key, dt);
let av: AnyValue<'a> = deserialize_all(value);
(av, fld)
})
.unzip();
AnyValue::StructOwned(Box::new(vals))
}
val => AnyValue::Utf8Owned(format!("{:#?}", val)),
}
}
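The two new helpers bridge serde_json and Polars: value_to_dtype infers a DataType from a JSON value (recursing into arrays and, behind dtype-struct, objects), while deserialize_all converts the value itself into an AnyValue, producing List/StructOwned for nested data and formatting anything else into Utf8Owned. A conceptual sketch of what one nested NDJSON row maps to (serde_json::json! is used here purely for illustration, assuming the dtype-struct feature):

use serde_json::json;

fn sketch() {
    let row = json!({"id": 1, "point": {"x": 1.0, "y": 2.0}, "tags": [1, 2, 3]});

    // value_to_dtype(&row) would infer, conceptually:
    //   Struct([id: Int64, point: Struct([x: Float64, y: Float64]),
    //           tags: List(Int64)])
    //
    // deserialize_all(&row) would yield an AnyValue::StructOwned whose values
    // are AnyValue::Int64(1), a nested AnyValue::StructOwned for "point",
    // and AnyValue::List(Series of Int64 [1, 2, 3]) for "tags".
    let _ = row;
}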
22 changes: 16 additions & 6 deletions polars/polars-io/src/ndjson_core/ndjson.rs
@@ -14,7 +14,6 @@ use std::borrow::Cow;
use std::fs::File;
use std::io::Cursor;
use std::path::PathBuf;

const QUOTE_CHAR: u8 = "\"".as_bytes()[0];
const SEP: u8 = ",".as_bytes()[0];

@@ -171,10 +170,24 @@ impl<'a> CoreJsonReader<'a> {
})
}
fn parse_json(&mut self, mut n_threads: usize, bytes: &[u8]) -> Result<DataFrame> {
let mut bytes = bytes;
let mut total_rows = 128;

if let Some((mean, std)) = get_line_stats(bytes, self.sample_size) {
let line_length_upper_bound = mean + 1.1 * std;

total_rows = (bytes.len() as f32 / (mean - 0.01 * std)) as usize;
if let Some(n_rows) = self.n_rows {
total_rows = std::cmp::min(n_rows, total_rows);
// the guessed upper bound of the no. of bytes in the file
let n_bytes = (line_length_upper_bound * (n_rows as f32)) as usize;

if n_bytes < bytes.len() {
if let Some(pos) = next_line_position_naive(&bytes[n_bytes..]) {
bytes = &bytes[..n_bytes + pos]
}
}
}
}

if total_rows == 128 {
@@ -222,7 +235,6 @@ impl<'a> CoreJsonReader<'a> {
})
.collect::<Result<Vec<_>>>()
})?;

accumulate_dataframes_vertical(dfs)
}
pub fn as_df(&mut self) -> Result<DataFrame> {
@@ -243,7 +255,7 @@ impl<'a> CoreJsonReader<'a> {
}
}

fn parse_lines(bytes: &[u8], buffers: &mut PlHashMap<String, Buffer>) -> Result<usize> {
fn parse_lines<'a>(bytes: &[u8], buffers: &mut PlIndexMap<String, Buffer<'a>>) -> Result<usize> {
let mut stream = Deserializer::from_slice(bytes).into_iter::<Value>();
for value in stream.by_ref() {
let v = value.unwrap_or(Value::Null);
@@ -252,9 +264,7 @@ fn parse_lines(bytes: &[u8], buffers: &mut PlHashMap<String, Buffer>) -> Result<
buffers
.iter_mut()
.for_each(|(s, inner)| match value.get(s) {
Some(v) => {
inner.add(v).expect("inner.add(v)");
}
Some(v) => inner.add(v).expect("inner.add(v)"),
None => inner.add_null(),
});
}
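In parse_json, the sampled line statistics now do double duty: they still estimate the total row count, and when n_rows is set they also bound how many bytes need to be parsed, truncating the input at the next newline past that estimate so a limited read no longer scans the whole file. A worked sketch of the arithmetic with illustrative numbers (mean/std would really come from get_line_stats):

fn sketch() {
    let bytes_len = 1_000_000_f32;            // size of the input in bytes
    let (mean, std) = (100.0_f32, 10.0_f32);  // sampled line-length stats

    let line_length_upper_bound = mean + 1.1 * std;                  // 111.0
    let estimated_rows = (bytes_len / (mean - 0.01 * std)) as usize; // ~10_010

    // Asking for n_rows = 1_000 caps both the row count and the byte range:
    let n_rows = 1_000_usize;
    let total_rows = n_rows.min(estimated_rows);                      // 1_000
    let n_bytes = (line_length_upper_bound * n_rows as f32) as usize; // 111_000
    assert!(total_rows == 1_000 && n_bytes < bytes_len as usize);
}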
