update arrow (#2805)
ritchie46 committed Mar 2, 2022
1 parent 3687cb4 commit fda66f0
Showing 13 changed files with 166 additions and 93 deletions.
4 changes: 2 additions & 2 deletions polars/polars-arrow/Cargo.toml
@@ -9,8 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "fb5d4330544d9d746dfc9bdacab5f1f5c1de9203", default-features = false }
-arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "csv_write", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "5d0db548142e5a7944728fe81d00ed048f7114d9", default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "csv_write", default-features = false }
 # arrow = { package = "arrow2", version = "0.9", default-features = false, features = ["compute_concatenate"] }
 hashbrown = "0.12"
 num = "^0.4"
8 changes: 8 additions & 0 deletions polars/polars-arrow/src/conversion.rs
@@ -0,0 +1,8 @@
+use arrow::array::{ArrayRef, StructArray};
+use arrow::chunk::Chunk;
+use arrow::datatypes::{DataType, Field};
+
+pub fn chunk_to_struct(chunk: Chunk<ArrayRef>, fields: Vec<Field>) -> StructArray {
+    let dtype = DataType::Struct(fields);
+    StructArray::from_data(dtype, chunk.into_arrays(), None)
+}
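
For context, a minimal sketch of how the new helper could be called (not part of the commit). The column name, the values, and the exact arrow2 constructor calls are assumptions and may differ slightly between arrow2 versions:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StructArray};
use arrow::chunk::Chunk;
use arrow::datatypes::{DataType, Field};
use polars_arrow::conversion::chunk_to_struct;

fn example() -> StructArray {
    // One Int32 column named "a": the Chunk holds the column arrays,
    // the Fields carry the matching names and data types.
    let a = Arc::new(Int32Array::from_slice(&[1, 2, 3])) as ArrayRef;
    let chunk = Chunk::new(vec![a]);
    let fields = vec![Field::new("a", DataType::Int32, true)];
    // Wraps the chunk's arrays in a StructArray without copying the data.
    chunk_to_struct(chunk, fields)
}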
1 change: 1 addition & 0 deletions polars/polars-arrow/src/lib.rs
@@ -2,6 +2,7 @@ pub mod array;
 pub mod bit_util;
 #[cfg(feature = "compute")]
 pub mod compute;
+pub mod conversion;
 pub mod error;
 pub mod export;
 pub mod index;
10 changes: 5 additions & 5 deletions polars/polars-core/Cargo.toml
@@ -158,15 +158,15 @@ rayon = "1.5"
 regex = { version = "1.5", optional = true }
 # activate if you want serde support for Series and DataFrames
 serde = { version = "1", features = ["derive"], optional = true }
-serde_json = { version = "1.0", optional = true }
+serde_json = { version = "1", optional = true }
 thiserror = "^1.0"

 [dependencies.arrow]
 package = "arrow2"
-# git = "https://github.com/jorgecarleitao/arrow2"
-git = "https://github.com/ritchie46/arrow2"
-# rev = "fb5d4330544d9d746dfc9bdacab5f1f5c1de9203"
-branch = "csv_write"
+git = "https://github.com/jorgecarleitao/arrow2"
+# git = "https://github.com/ritchie46/arrow2"
+rev = "5d0db548142e5a7944728fe81d00ed048f7114d9"
+# branch = "csv_write"
 # version = "0.9"
 default-features = false
 features = [
25 changes: 25 additions & 0 deletions polars/polars-core/src/frame/from.rs
@@ -0,0 +1,25 @@
+use crate::prelude::*;
+use arrow::array::StructArray;
+
+impl TryFrom<StructArray> for DataFrame {
+    type Error = PolarsError;
+
+    fn try_from(arr: StructArray) -> Result<Self> {
+        let (fld, arrs, nulls) = arr.into_data();
+        if nulls.is_some() {
+            return Err(PolarsError::ComputeError(
+                "cannot deserialize struct with nulls into a DataFrame".into(),
+            ));
+        }
+        let columns = fld
+            .iter()
+            .zip(arrs)
+            .map(|(fld, arr)| {
+                // Safety
+                // reported data type is correct
+                unsafe { Series::try_from_arrow_unchecked(&fld.name, vec![arr], fld.data_type()) }
+            })
+            .collect::<Result<Vec<_>>>()?;
+        DataFrame::new(columns)
+    }
+}
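
A hedged sketch of the new conversion in use (illustrative only; the field name and values are made up, and StructArray::from_data mirrors the constructor used in conversion.rs above):

use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::{ArrayRef, StructArray, Utf8Array};
use arrow::datatypes::{DataType, Field};
use polars_core::prelude::*;

fn struct_to_df() -> Result<DataFrame> {
    // A StructArray with one Utf8 child becomes a one-column DataFrame.
    let values = vec![Arc::new(Utf8Array::<i32>::from_slice(&["x", "y"])) as ArrayRef];
    let fields = vec![Field::new("s", DataType::Utf8, true)];
    let arr = StructArray::from_data(DataType::Struct(fields), values, None);
    // Errors if the struct carries a validity bitmap, per the impl above.
    DataFrame::try_from(arr)
}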
1 change: 1 addition & 0 deletions polars/polars-core/src/frame/mod.rs
@@ -20,6 +20,7 @@ mod chunks;
 #[cfg(feature = "cross_join")]
 pub(crate) mod cross_join;
 pub mod explode;
+mod from;
 pub mod groupby;
 pub mod hash_join;
 #[cfg(feature = "rows")]
7 changes: 4 additions & 3 deletions polars/polars-io/Cargo.toml
@@ -11,7 +11,7 @@ description = "IO related logic for the Polars DataFrame library"

 [features]
 # support for arrows json parsing
-json = ["arrow/io_json"]
+json = ["arrow/io_json", "serde_json"]
 # support for arrows ipc file parsing
 ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
 # support for arrow avro parsing
@@ -34,8 +34,8 @@ private = []
 [dependencies]
 ahash = "0.7"
 anyhow = "1.0"
-# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "fb5d4330544d9d746dfc9bdacab5f1f5c1de9203", default-features = false }
-arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "csv_write", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "5d0db548142e5a7944728fe81d00ed048f7114d9", default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "csv_write", default-features = false }
 # arrow = { package = "arrow2", version = "0.9", default-features = false }
 csv-core = { version = "0.1.10", optional = true }
 dirs = "4.0"
@@ -51,6 +51,7 @@ polars-core = { version = "0.19.1", path = "../polars-core", features = ["privat
 polars-utils = { version = "0.1.0", path = "../polars-utils", optional = true }
 rayon = "1.5"
 regex = "1.5"
+serde_json = { version = "1", optional = true }
 simdutf8 = "0.1"

 [package.metadata.docs.rs]
123 changes: 62 additions & 61 deletions polars/polars-io/src/json.rs
@@ -23,6 +23,7 @@
 //! {"a":1, "b":0.6, "c":false, "d":"text"}"#;
 //! let file = Cursor::new(basic_json);
 //! let df = JsonReader::new(file)
+//!     .with_json_format(JsonFormat::JsonLines)
 //!     .infer_schema_len(Some(3))
 //!     .with_batch_size(3)
 //!     .finish()
@@ -61,9 +62,15 @@
 //! ```
 //!
 use crate::prelude::*;
-pub use arrow::{error::Result as ArrowResult, io::json::read, io::json::write};
+use arrow::array::{ArrayRef, StructArray};
+use arrow::io::ndjson::read::FallibleStreamingIterator;
+pub use arrow::{
+    error::Result as ArrowResult,
+    io::{json, ndjson},
+};
+use polars_arrow::conversion::chunk_to_struct;
+use polars_arrow::kernels::concatenate::concatenate_owned_unchecked;
 use polars_core::prelude::*;
-use polars_core::utils::accumulate_dataframes_vertical;
 use std::convert::TryFrom;
 use std::io::{BufRead, Seek, Write};

@@ -100,22 +107,20 @@ where

     fn finish(mut self, df: &mut DataFrame) -> Result<()> {
         df.rechunk();
-        let batches = df.iter_chunks().map(Ok);
-        let names = df.get_column_names_owned();
+        let fields = df.iter().map(|s| s.field().to_arrow()).collect::<Vec<_>>();
+        let batches = df
+            .iter_chunks()
+            .map(|chunk| Ok(Arc::new(chunk_to_struct(chunk, fields.clone())) as ArrayRef));

         match self.json_format {
             JsonFormat::JsonLines => {
-                let format = write::LineDelimited::default();
-
-                let blocks =
-                    write::Serializer::new(batches, names, Vec::with_capacity(1024), format);
-                write::write(&mut self.buffer, format, blocks)?;
+                let serializer = ndjson::write::Serializer::new(batches, vec![]);
+                let writer = ndjson::write::FileWriter::new(&mut self.buffer, serializer);
+                writer.collect::<ArrowResult<()>>()?;
             }
             JsonFormat::Json => {
-                let format = write::JsonArray::default();
-                let blocks =
-                    write::Serializer::new(batches, names, Vec::with_capacity(1024), format);
-                write::write(&mut self.buffer, format, blocks)?;
+                let serializer = json::write::Serializer::new(batches, vec![]);
+                json::write::write(&mut self.buffer, serializer)?;
             }
         }

@@ -134,6 +139,7 @@ where
     batch_size: usize,
     projection: Option<Vec<String>>,
     schema: Option<ArrowSchema>,
+    json_format: JsonFormat,
 }

 impl<R> SerReader<R> for JsonReader<R>
@@ -148,6 +154,7 @@ where
             batch_size: 8192,
             projection: None,
             schema: None,
+            json_format: JsonFormat::Json,
         }
     }

@@ -157,58 +164,47 @@
     }

     fn finish(mut self) -> Result<DataFrame> {
-        let rechunk = self.rechunk;
-
-        let fields = if let Some(schema) = self.schema {
-            schema.fields
-        } else {
-            read::infer_and_reset(&mut self.reader, self.infer_schema_len)?
-        };
-        let projection = self
-            .projection
-            .map(|projection| {
-                projection
-                    .iter()
-                    .map(|name| {
-                        fields
-                            .iter()
-                            .position(|fld| &fld.name == name)
-                            .ok_or_else(|| PolarsError::NotFound(name.into()))
-                    })
-                    .collect::<Result<Vec<_>>>()
-            })
-            .transpose()?;
-
-        let mut dfs = vec![];
-
-        // at most rows. This container can be re-used across batches.
-        let mut rows = vec![String::default(); self.batch_size];
-        loop {
-            let read = read::read_rows(&mut self.reader, &mut rows)?;
-            if read == 0 {
-                break;
-            }
-            let read_rows = &rows[..read];
-            let rb = read::deserialize(read_rows, &fields)?;
-            let df = DataFrame::try_from((rb, fields.as_slice()))?;
-            let cols = df.get_columns();
-
-            if let Some(projection) = &projection {
-                let cols = projection
-                    .iter()
-                    .map(|idx| cols[*idx].clone())
-                    .collect::<Vec<_>>();
-                dfs.push(DataFrame::new_no_checks(cols))
-            } else {
-                dfs.push(df)
-            }
-        }
-
-        let mut out = accumulate_dataframes_vertical(dfs.into_iter())?;
-        if rechunk {
-            out.rechunk();
-        }
-        Ok(out)
+        let out = match self.json_format {
+            JsonFormat::Json => {
+                let v = serde_json::from_reader(&mut self.reader)
+                    .map_err(|e| PolarsError::ComputeError(format!("{:?}", e).into()))?;
+                // likely struct type
+                let dtype = json::read::infer(&v)?;
+                let arr = json::read::deserialize(&v, dtype)?;
+                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(|| {
+                    PolarsError::ComputeError("only can deserialize json objects".into())
+                })?;
+                DataFrame::try_from(arr.clone())
+            }
+            JsonFormat::JsonLines => {
+                let dtype = ndjson::read::infer(&mut self.reader, self.infer_schema_len)?;
+                self.reader.rewind()?;
+
+                let mut reader = ndjson::read::FileReader::new(
+                    &mut self.reader,
+                    vec!["".to_string(); self.batch_size],
+                    None,
+                );
+                let mut arrays = vec![];
+                // `next` is IO-bounded
+                while let Some(rows) = reader.next()? {
+                    // `deserialize` is CPU-bounded
+                    let array = ndjson::read::deserialize(rows, dtype.clone())?;
+                    arrays.push(array);
+                }
+                let arr = concatenate_owned_unchecked(&arrays)?;
+                let arr = arr.as_any().downcast_ref::<StructArray>().ok_or_else(|| {
+                    PolarsError::ComputeError("only can deserialize json objects".into())
+                })?;
+                DataFrame::try_from(arr.clone())
+            }
+        }?;
+
+        if let Some(proj) = &self.projection {
+            out.select(proj)
+        } else {
+            Ok(out)
+        }
     }
 }
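
A read-side usage sketch, mirroring the doc-comment example at the top of this file (the data is made up; the builder methods are the ones shown in this diff):

use polars_core::prelude::*;
use polars_io::json::{JsonFormat, JsonReader};
use polars_io::SerReader;
use std::io::Cursor;

fn read_ndjson() -> Result<DataFrame> {
    let data = r#"{"a": 1, "b": "x"}
{"a": 2, "b": "y"}"#;
    // JsonLines first infers the dtype, rewinds the reader, then
    // deserializes batch by batch and concatenates the arrays.
    JsonReader::new(Cursor::new(data))
        .with_json_format(JsonFormat::JsonLines)
        .infer_schema_len(Some(100))
        .finish()
}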

@@ -240,6 +236,11 @@ where
         self.projection = projection;
         self
     }
+
+    pub fn with_json_format(mut self, format: JsonFormat) -> Self {
+        self.json_format = format;
+        self
+    }
 }

 #[cfg(test)]
@@ -264,11 +265,11 @@ mod test {
         let file = Cursor::new(basic_json);
         let df = JsonReader::new(file)
             .infer_schema_len(Some(3))
+            .with_json_format(JsonFormat::JsonLines)
             .with_batch_size(3)
             .finish()
             .unwrap();

-        println!("{:?}", df);
         assert_eq!("a", df.get_columns()[0].name());
         assert_eq!("d", df.get_columns()[3].name());
         assert_eq!((12, 4), df.shape());
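To round out the picture, a write-side sketch (not part of the commit). It assumes JsonWriter exposes a with_json_format builder symmetric to the reader's, so treat the exact method name as an assumption:

use polars_core::prelude::*;
use polars_io::json::{JsonFormat, JsonWriter};
use polars_io::SerWriter;

fn write_ndjson(df: &mut DataFrame) -> Result<()> {
    // Each chunk is wrapped into a StructArray (see chunk_to_struct) and
    // serialized as one JSON object per line.
    let mut buf = Vec::new();
    JsonWriter::new(&mut buf)
        .with_json_format(JsonFormat::JsonLines)
        .finish(df)?;
    println!("{}", String::from_utf8(buf).expect("valid utf8"));
    Ok(())
}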
3 changes: 2 additions & 1 deletion py-polars/Cargo.lock


11 changes: 3 additions & 8 deletions py-polars/polars/internals/frame.py
@@ -631,20 +631,15 @@ def _read_ipc(
         return self

     @staticmethod
-    def _read_json(file: Union[str, IOBase]) -> "DataFrame":
+    def _read_json(file: Union[str, IOBase], json_lines: bool = False) -> "DataFrame":
         """
         Read into a DataFrame from JSON format.

         Parameters
         ----------
         file
             Path to a file or a file-like object.
-
-        See Also pl.read_json
         """
-        if isinstance(file, StringIO):
-            file = BytesIO(file.getvalue().encode())
-
         self = DataFrame.__new__(DataFrame)
-        self._df = PyDataFrame.read_json(file)
+        self._df = PyDataFrame.read_json(file, json_lines)
         return self

     def to_arrow(self) -> "pa.Table":
6 changes: 4 additions & 2 deletions py-polars/polars/io.py
@@ -878,16 +878,18 @@ def read_parquet(
     )


-def read_json(source: Union[str, IOBase]) -> DataFrame:
+def read_json(source: Union[str, IOBase], json_lines: bool = False) -> DataFrame:
     """
     Read into a DataFrame from JSON format.

     Parameters
     ----------
     source
         Path to a file or a file-like object.
+    json_lines
+        Toggle between "JSON" and "NDJSON" format
     """
-    return DataFrame._read_json(source)
+    return DataFrame._read_json(source, json_lines)


 def read_sql(
