feat[python, rust]: add scan_json and a LazyJsonReader (#4382)
universalmind303 committed Aug 18, 2022
1 parent baba89c commit e846a56
Showing 17 changed files with 322 additions and 9 deletions.
27 changes: 27 additions & 0 deletions examples/datasets/foods1.ndjson
@@ -0,0 +1,27 @@
{"category":"vegetables","calories":45,"fats_g":0.5,"sugars_g":2}
{"category":"seafood","calories":150,"fats_g":5.0,"sugars_g":0}
{"category":"meat","calories":100,"fats_g":5.0,"sugars_g":0}
{"category":"fruit","calories":60,"fats_g":0.0,"sugars_g":11}
{"category":"seafood","calories":140,"fats_g":5.0,"sugars_g":1}
{"category":"meat","calories":120,"fats_g":10.0,"sugars_g":1}
{"category":"vegetables","calories":20,"fats_g":0.0,"sugars_g":2}
{"category":"fruit","calories":30,"fats_g":0.0,"sugars_g":5}
{"category":"seafood","calories":130,"fats_g":5.0,"sugars_g":0}
{"category":"fruit","calories":50,"fats_g":4.5,"sugars_g":0}
{"category":"meat","calories":110,"fats_g":7.0,"sugars_g":0}
{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":2}
{"category":"fruit","calories":30,"fats_g":0.0,"sugars_g":3}
{"category":"vegetables","calories":22,"fats_g":0.0,"sugars_g":3}
{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":4}
{"category":"seafood","calories":100,"fats_g":5.0,"sugars_g":0}
{"category":"seafood","calories":200,"fats_g":10.0,"sugars_g":0}
{"category":"seafood","calories":200,"fats_g":7.0,"sugars_g":2}
{"category":"fruit","calories":60,"fats_g":0.0,"sugars_g":11}
{"category":"meat","calories":110,"fats_g":7.0,"sugars_g":0}
{"category":"vegetables","calories":25,"fats_g":0.0,"sugars_g":3}
{"category":"seafood","calories":200,"fats_g":7.0,"sugars_g":2}
{"category":"seafood","calories":130,"fats_g":1.5,"sugars_g":0}
{"category":"fruit","calories":130,"fats_g":0.0,"sugars_g":25}
{"category":"meat","calories":100,"fats_g":7.0,"sugars_g":0}
{"category":"vegetables","calories":30,"fats_g":0.0,"sugars_g":5}
{"category":"fruit","calories":50,"fats_g":0.0,"sugars_g":11}
3 changes: 2 additions & 1 deletion polars/Cargo.toml
@@ -40,7 +40,7 @@ strings = ["polars-core/strings", "polars-lazy/strings", "polars-ops/strings"]
object = ["polars-core/object", "polars-lazy/object"]

# support for arrows json parsing
json = ["polars-io", "polars-io/json"]
json = ["polars-io", "polars-io/json", "polars-lazy/json"]

# support for arrows ipc file parsing
ipc = ["polars-io", "polars-io/ipc", "polars-lazy/ipc"]
@@ -139,6 +139,7 @@ test = [
"parquet",
"ipc",
"ipc_streaming",
"json",
]

# don't use this
4 changes: 2 additions & 2 deletions polars/polars-io/src/json.rs
@@ -113,8 +113,8 @@ where

        match self.json_format {
            JsonFormat::JsonLines => {
-                let serializer = ndjson::write::Serializer::new(batches, vec![]);
-                let writer = ndjson::write::FileWriter::new(&mut self.buffer, serializer);
+                let serializer = arrow_ndjson::write::Serializer::new(batches, vec![]);
+                let writer = arrow_ndjson::write::FileWriter::new(&mut self.buffer, serializer);
                writer.collect::<ArrowResult<()>>()?;
            }
            JsonFormat::Json => {
7 changes: 6 additions & 1 deletion polars/polars-io/src/lib.rs
@@ -24,7 +24,12 @@ pub mod json;
#[cfg_attr(docsrs, doc(cfg(feature = "json")))]
pub mod ndjson_core;

-#[cfg(any(feature = "csv-file", feature = "parquet", feature = "ipc"))]
+#[cfg(any(
+    feature = "csv-file",
+    feature = "parquet",
+    feature = "ipc",
+    feature = "json"
+))]
pub mod mmap;
mod options;
#[cfg(feature = "parquet")]
4 changes: 2 additions & 2 deletions polars/polars-io/src/ndjson_core/ndjson.rs
@@ -3,7 +3,7 @@ use std::fs::File;
use std::io::Cursor;
use std::path::PathBuf;

-pub use arrow::{array::StructArray, io::ndjson};
+pub use arrow::{array::StructArray, io::ndjson as arrow_ndjson};
use polars_core::{prelude::*, utils::accumulate_dataframes_vertical, POOL};
use rayon::prelude::*;

@@ -157,7 +157,7 @@ impl<'a> CoreJsonReader<'a> {
let bytes: &[u8] = &reader_bytes;
let mut cursor = Cursor::new(bytes);

let data_type = ndjson::read::infer(&mut cursor, infer_schema_len).unwrap();
let data_type = arrow_ndjson::read::infer(&mut cursor, infer_schema_len).unwrap();
let schema: polars_core::prelude::Schema =
StructArray::get_fields(&data_type).into();

1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
@@ -16,6 +16,7 @@ compile = []
default = ["compile", "private"]
parquet = ["polars-core/parquet", "polars-io/parquet"]
ipc = ["polars-io/ipc"]
json = ["polars-io/json"]
csv-file = ["polars-io/csv-file"]
temporal = ["polars-core/temporal", "polars-time", "dtype-datetime"]
# debugging purposes
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
@@ -3,6 +3,8 @@
mod csv;
#[cfg(feature = "ipc")]
mod ipc;
#[cfg(feature = "json")]
mod ndjson;
#[cfg(feature = "parquet")]
mod parquet;
#[cfg(feature = "python")]
@@ -18,6 +20,8 @@ pub use anonymous_scan::*;
pub use csv::*;
#[cfg(feature = "ipc")]
pub use ipc::*;
#[cfg(feature = "json")]
pub use ndjson::*;
#[cfg(feature = "parquet")]
pub use parquet::*;
use polars_arrow::prelude::QuantileInterpolOptions;
90 changes: 90 additions & 0 deletions polars/polars-lazy/src/frame/ndjson.rs
@@ -0,0 +1,90 @@
use polars_core::prelude::*;
use polars_io::RowCount;

use super::{LazyFrame, ScanArgsAnonymous};

pub struct LazyJsonLineReader {
    pub(crate) path: String,
    pub(crate) batch_size: Option<usize>,
    pub(crate) low_memory: bool,
    pub(crate) rechunk: bool,
    pub(crate) schema: Option<Schema>,
    pub(crate) row_count: Option<RowCount>,
    pub(crate) infer_schema_length: Option<usize>,
    pub(crate) n_rows: Option<usize>,
}

impl LazyJsonLineReader {
    pub fn new(path: String) -> Self {
        LazyJsonLineReader {
            path,
            batch_size: None,
            low_memory: false,
            rechunk: true,
            schema: None,
            row_count: None,
            infer_schema_length: Some(100),
            n_rows: None,
        }
    }
    /// Add a `row_count` column.
    #[must_use]
    pub fn with_row_count(mut self, row_count: Option<RowCount>) -> Self {
        self.row_count = row_count;
        self
    }
    /// Try to stop parsing when `n` rows have been parsed. During multithreaded parsing
    /// the upper bound `n` cannot be guaranteed.
    #[must_use]
    pub fn with_n_rows(mut self, num_rows: Option<usize>) -> Self {
        self.n_rows = num_rows;
        self
    }
    /// Set the number of rows to use when inferring the JSON schema.
    /// The default is 100 rows.
    /// Setting to `None` will do a full table scan, which is very slow.
    #[must_use]
    pub fn with_infer_schema_length(mut self, num_rows: Option<usize>) -> Self {
        self.infer_schema_length = num_rows;
        self
    }
    /// Set the JSON file's schema.
    #[must_use]
    pub fn with_schema(mut self, schema: Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Reduce memory usage at the expense of performance.
    #[must_use]
    pub fn low_memory(mut self, toggle: bool) -> Self {
        self.low_memory = toggle;
        self
    }

    /// Rechunk the memory to contiguous chunks when parsing is done.
    #[must_use]
    pub fn with_rechunk(mut self, toggle: bool) -> Self {
        self.rechunk = toggle;
        self
    }

    /// Set the batch size (number of rows) used while parsing.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
        self.batch_size = batch_size;
        self
    }

    pub fn finish(self) -> Result<LazyFrame> {
        let options = ScanArgsAnonymous {
            name: "JSON SCAN",
            infer_schema_length: self.infer_schema_length,
            n_rows: self.n_rows,
            row_count: self.row_count.clone(),
            schema: self.schema.clone(),
            ..ScanArgsAnonymous::default()
        };

        LazyFrame::anonymous_scan(std::sync::Arc::new(self), options)
    }
}
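On the Python side, the keyword arguments of the new `scan_ndjson` map onto the builder methods above, presumably wired through `PyLazyFrame.new_from_ndjson` in the py-polars binding (not shown in this excerpt). A hedged sketch of the correspondence:

```python
import polars as pl

# Keyword -> LazyJsonLineReader builder call (defaults per this commit):
#   infer_schema_length -> with_infer_schema_length  (Some(100))
#   batch_size          -> with_batch_size
#   n_rows              -> with_n_rows
#   low_memory          -> low_memory                (false)
#   rechunk             -> with_rechunk              (true)
#   row_count_name / row_count_offset -> with_row_count
lf = pl.scan_ndjson(
    "examples/datasets/foods1.ndjson",
    infer_schema_length=100,
    rechunk=True,
)
```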
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -2,6 +2,8 @@
mod csv;
#[cfg(feature = "ipc")]
mod ipc;
#[cfg(feature = "json")]
mod ndjson;
#[cfg(feature = "parquet")]
mod parquet;

30 changes: 30 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan/ndjson.rs
@@ -0,0 +1,30 @@
use super::*;
use crate::prelude::{AnonymousScan, AnonymousScanOptions, LazyJsonLineReader};

impl AnonymousScan for LazyJsonLineReader {
    fn scan(&self, scan_opts: AnonymousScanOptions) -> Result<DataFrame> {
        // Prefer the (projection-pruned) output schema if the optimizer set one.
        let schema = scan_opts.output_schema.unwrap_or(scan_opts.schema);
        JsonLineReader::from_path(&self.path)?
            .with_schema(&schema)
            .with_rechunk(self.rechunk)
            .with_chunk_size(self.batch_size)
            .low_memory(self.low_memory)
            .with_n_rows(scan_opts.n_rows)
            .finish()
    }

    fn schema(&self, infer_schema_length: Option<usize>) -> Result<Schema> {
        let f = std::fs::File::open(&self.path)?;
        let mut reader = std::io::BufReader::new(f);

        let data_type = arrow_ndjson::read::infer(&mut reader, infer_schema_length)
            .map_err(|err| PolarsError::ComputeError(format!("{:#?}", err).into()))?;
        let schema: Schema = StructArray::get_fields(&data_type).into();

        Ok(schema)
    }

    fn allows_projection_pushdown(&self) -> bool {
        true
    }
}
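Because the reader is wired in through the `AnonymousScan` trait, the scan appears in the logical plan as the anonymous node named "JSON SCAN" in `ScanArgsAnonymous` above. A hedged Python sketch for inspecting this (exact plan text may vary between polars versions):

```python
import polars as pl

lf = pl.scan_ndjson("examples/datasets/foods1.ndjson").select(["category"])

# The unoptimized plan shows the scan plus a separate projection; after
# optimization the projection should be pushed into the scan itself,
# since allows_projection_pushdown() returns true.
print(lf.describe_plan())
print(lf.describe_optimized_plan())
```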
3 changes: 1 addition & 2 deletions py-polars/Cargo.toml
@@ -41,7 +41,7 @@ avro = ["polars/avro"]
parquet = ["polars/parquet"]
ipc = ["polars/ipc"]
is_in = ["polars/is_in"]
json = ["polars/serde", "serde_json"]
json = ["polars/serde", "serde_json", "polars/json"]
trigonometry = ["polars/trigonometry"]
sign = ["polars/sign"]
asof_join = ["polars/asof_join"]
@@ -57,7 +57,6 @@ all = [
"ipc",
"avro",
"is_in",
"json",
"repeat_by",
"trigonometry",
"sign",
2 changes: 2 additions & 0 deletions py-polars/polars/__init__.py
@@ -125,6 +125,7 @@ def version() -> str:
    scan_csv,
    scan_ds,
    scan_ipc,
+    scan_ndjson,
    scan_parquet,
)
from polars.show_versions import show_versions
@@ -181,6 +182,7 @@ def version() -> str:
"scan_ipc",
"scan_ds",
"scan_parquet",
"scan_ndjson",
"read_ipc_schema",
"read_parquet_schema",
"read_avro",
34 changes: 34 additions & 0 deletions py-polars/polars/internals/lazyframe/frame.py
@@ -230,6 +230,40 @@ def scan_ipc(
        )
        return self

    @classmethod
    def scan_ndjson(
        cls: type[LDF],
        file: str,
        infer_schema_length: int | None = None,
        batch_size: int | None = None,
        n_rows: int | None = None,
        low_memory: bool = False,
        rechunk: bool = True,
        row_count_name: str | None = None,
        row_count_offset: int = 0,
    ) -> LDF:
        """
        Lazily read from a newline delimited JSON file.

        Use ``pl.scan_ndjson`` to dispatch to this method.

        See Also
        --------
        polars.io.scan_ndjson

        """
        self = cls.__new__(cls)
        self._ldf = PyLazyFrame.new_from_ndjson(
            file,
            infer_schema_length,
            batch_size,
            n_rows,
            low_memory,
            rechunk,
            _prepare_row_count_args(row_count_name, row_count_offset),
        )
        return self

    @classmethod
    def from_json(cls, json: str) -> LazyFrame:
        """
52 changes: 52 additions & 0 deletions py-polars/polars/io.py
@@ -704,6 +704,58 @@ def scan_parquet(
    )


def scan_ndjson(
    file: str | Path,
    infer_schema_length: int | None = 100,
    batch_size: int | None = 1024,
    n_rows: int | None = None,
    low_memory: bool = False,
    rechunk: bool = True,
    row_count_name: str | None = None,
    row_count_offset: int = 0,
) -> LazyFrame:
    """
    Lazily read from a newline delimited JSON file.

    This allows the query optimizer to push down predicates and projections to the scan
    level, thereby potentially reducing memory overhead.

    Parameters
    ----------
    file
        Path to a file.
    infer_schema_length
        Infer the schema from the first ``infer_schema_length`` rows.
    batch_size
        Number of rows to read in each batch.
    n_rows
        Stop reading from the JSON file after reading ``n_rows``.
    low_memory
        Reduce memory pressure at the expense of performance.
    rechunk
        Reallocate to contiguous memory when all chunks/files are parsed.
    row_count_name
        If not None, this will insert a row count column with the given name into the
        DataFrame.
    row_count_offset
        Offset to start the row_count column (only used if the name is set).
    """
    if isinstance(file, (str, Path)):
        file = format_path(file)

    return LazyFrame.scan_ndjson(
        file=file,
        infer_schema_length=infer_schema_length,
        batch_size=batch_size,
        n_rows=n_rows,
        low_memory=low_memory,
        rechunk=rechunk,
        row_count_name=row_count_name,
        row_count_offset=row_count_offset,
    )
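A small usage sketch exercising the parameters documented above; the path points at the example dataset added in this commit and the option values (including the `row_nr` column name) are illustrative:

```python
import polars as pl

df = pl.scan_ndjson(
    "examples/datasets/foods1.ndjson",
    infer_schema_length=50,   # infer dtypes from the first 50 rows
    n_rows=10,                # upper bound; not exact under multithreaded parsing
    row_count_name="row_nr",  # prepend a row-count column...
    row_count_offset=1,       # ...starting at 1 instead of 0
).collect()
print(df.head())
```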


@deprecated_alias(projection="columns")
def read_avro(
    file: str | Path | BytesIO | BinaryIO,
