update arrow and json writers (#2201)
ritchie46 committed Dec 28, 2021
1 parent ff2d2f8 commit a982b50
Showing 13 changed files with 180 additions and 62 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
@@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "830bf5fb519010a9b6faa347a06cedab2044206b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "d14ae86c69cd76957adec3b14bb62d93732b43c9", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", default-features = false, features = ["compute"], branch = "offset_pub" }
# arrow = { package = "arrow2", version = "0.8", default-features = false }
num = "^0.4"
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -158,7 +158,7 @@ unsafe_unwrap = "^0.1.0"
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "830bf5fb519010a9b6faa347a06cedab2044206b"
rev = "d14ae86c69cd76957adec3b14bb62d93732b43c9"
# branch = "offset_pub"
# version = "0.8"
default-features = false
17 changes: 13 additions & 4 deletions polars/polars-core/src/datatypes.rs
@@ -781,6 +781,7 @@ impl Schema {
ArrowDataType::Dictionary(
IntegerType::UInt32,
Box::new(ArrowDataType::LargeUtf8),
false,
),
true,
),
@@ -846,7 +847,7 @@ impl From<&ArrowDataType> for DataType {
ArrowDataType::LargeUtf8 => DataType::Utf8,
ArrowDataType::Utf8 => DataType::Utf8,
ArrowDataType::Time64(_) | ArrowDataType::Time32(_) => DataType::Time,
ArrowDataType::Dictionary(_, _) => DataType::Categorical,
ArrowDataType::Dictionary(_, _, _) => DataType::Categorical,
ArrowDataType::Extension(name, _, _) if name == "POLARS_EXTENSION_TYPE" => {
#[cfg(feature = "object")]
{
@@ -944,15 +945,23 @@ mod test {
DataType::List(DataType::Float64.into()),
),
(
ArrowDataType::Dictionary(IntegerType::UInt32, ArrowDataType::Utf8.into()),
ArrowDataType::Dictionary(IntegerType::UInt32, ArrowDataType::Utf8.into(), false),
DataType::Categorical,
),
(
ArrowDataType::Dictionary(IntegerType::UInt32, ArrowDataType::LargeUtf8.into()),
ArrowDataType::Dictionary(
IntegerType::UInt32,
ArrowDataType::LargeUtf8.into(),
false,
),
DataType::Categorical,
),
(
ArrowDataType::Dictionary(IntegerType::UInt64, ArrowDataType::LargeUtf8.into()),
ArrowDataType::Dictionary(
IntegerType::UInt64,
ArrowDataType::LargeUtf8.into(),
false,
),
DataType::Categorical,
),
];
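The change that runs through this file (and through from.rs below) is that `ArrowDataType::Dictionary` carries a third field in the new arrow2 revision. The diff only ever passes `false`, so the field is presumably an "is ordered" flag for the dictionary; that reading is an assumption. A minimal sketch of how a downstream match adapts, assuming arrow2 is imported as `arrow` as in this repo, with a hypothetical helper:

use arrow::datatypes::{DataType as ArrowDataType, IntegerType};

// Hypothetical helper: recognise string dictionaries under the new three-field
// `Dictionary(key_type, value_type, is_ordered)` variant. The third field is
// ignored here, just as the updated polars match arms ignore it with `_`.
fn is_string_dictionary(dtype: &ArrowDataType) -> bool {
    match dtype {
        ArrowDataType::Dictionary(IntegerType::UInt32, values, _is_ordered) => {
            matches!(**values, ArrowDataType::Utf8 | ArrowDataType::LargeUtf8)
        }
        _ => false,
    }
}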
4 changes: 2 additions & 2 deletions polars/polars-core/src/series/from.rs
@@ -152,11 +152,11 @@ impl TryFrom<(&str, Vec<ArrayRef>)> for Series {
Ok(UInt32Chunked::full_null(name, len).into_series())
}
#[cfg(not(feature = "dtype-categorical"))]
ArrowDataType::Dictionary(_, _) => {
ArrowDataType::Dictionary(_, _, _) => {
panic!("activate dtype-categorical to convert dictionary arrays")
}
#[cfg(feature = "dtype-categorical")]
ArrowDataType::Dictionary(key_type, value_type) => {
ArrowDataType::Dictionary(key_type, value_type, _) => {
use crate::chunked_array::categorical::CategoricalChunkedBuilder;
use arrow::datatypes::IntegerType;
let chunks = chunks.iter().map(|arr| &**arr).collect::<Vec<_>>();
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
@@ -31,7 +31,7 @@ private = []
[dependencies]
ahash = "0.7"
anyhow = "1.0"
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "830bf5fb519010a9b6faa347a06cedab2044206b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "d14ae86c69cd76957adec3b14bb62d93732b43c9", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", default-features = false, features = ["compute"], branch = "offset_pub" }
# arrow = { package = "arrow2", version = "0.8", default-features = false }
csv-core = { version = "0.1.10", optional = true }
1 change: 0 additions & 1 deletion polars/polars-io/src/csv.rs
@@ -601,7 +601,6 @@ fn parse_dates(df: DataFrame, fixed_schema: &Schema) -> DataFrame {

#[cfg(test)]
mod test {
use crate::csv_core::utils::get_file_chunks;
use crate::prelude::*;
use polars_core::datatypes::AnyValue;
use polars_core::prelude::*;
3 changes: 2 additions & 1 deletion polars/polars-io/src/ipc.rs
@@ -248,6 +248,7 @@ where
let mut ipc_writer = write::FileWriter::try_new(
&mut self.writer,
&df.schema().to_arrow(),
None,
WriteOptions {
compression: self.compression,
},
@@ -256,7 +257,7 @@ where
let iter = df.iter_record_batches();

for batch in iter {
ipc_writer.write(&batch)?
ipc_writer.write(&batch, None)?
}
let _ = ipc_writer.finish()?;
Ok(())
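The IPC writer changes mirror the new arrow2 signatures: `FileWriter::try_new` takes one extra argument and `write` takes a second one. The diff passes `None` in both places; those arguments are presumably optional custom IPC fields, which plain Arrow schemas don't need — that interpretation is an assumption. A rough sketch of the call shape (the `write_ipc` helper is hypothetical, and `iter_record_batches` is an internal polars API):

use arrow::io::ipc::write::{self, WriteOptions};
use polars_core::prelude::*;
use std::io::Write;

// Hypothetical helper mirroring the updated calls in this diff.
fn write_ipc<W: Write>(mut writer: W, df: &DataFrame) -> Result<()> {
    let mut ipc_writer = write::FileWriter::try_new(
        &mut writer,
        &df.schema().to_arrow(),
        None, // new argument: optional custom IPC fields
        WriteOptions { compression: None },
    )?;
    for batch in df.iter_record_batches() {
        // `write` now also takes the optional IPC fields, per batch.
        ipc_writer.write(&batch, None)?;
    }
    ipc_writer.finish()?;
    Ok(())
}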
152 changes: 108 additions & 44 deletions polars/polars-io/src/json.rs
@@ -23,7 +23,7 @@
//! {"a":100000000000000, "b":0.6, "c":false, "d":"text"}"#;
//! let file = Cursor::new(basic_json);
//! let df = JsonReader::new(file)
//! .infer_schema(Some(3))
//! .infer_schema_len(Some(3))
//! .with_batch_size(3)
//! .finish()
//! .unwrap();
@@ -61,70 +61,88 @@
//! ```
//!
use crate::prelude::*;
use crate::{finish_reader, ArrowReader};
pub use arrow::{
error::Result as ArrowResult,
io::json::{Reader as ArrowJsonReader, ReaderBuilder},
record_batch::RecordBatch,
error::Result as ArrowResult, io::json::read, io::json::write, record_batch::RecordBatch,
};
use polars_core::prelude::*;
use std::io::Write;
use std::io::{Read, Seek};
use std::sync::Arc;
use polars_core::utils::accumulate_dataframes_vertical;
use std::convert::TryFrom;
use std::io::{BufRead, Seek, Write};

pub enum JsonFormat {
Json,
JsonLines,
}

// Write a DataFrame to JSON
pub struct JsonWriter<W: Write> {
/// File or Stream handler
buffer: W,
json_format: JsonFormat,
}

impl<W: Write> JsonWriter<W> {
pub fn with_json_format(mut self, format: JsonFormat) -> Self {
self.json_format = format;
self
}
}

impl<W> SerWriter<W> for JsonWriter<W>
where
W: Write,
{
fn new(buffer: W) -> Self {
JsonWriter { buffer }
JsonWriter {
buffer,
json_format: JsonFormat::JsonLines,
}
}

fn finish(self, df: &DataFrame) -> Result<()> {
let mut json_writer = arrow::io::json::LineDelimitedWriter::new(self.buffer);
fn finish(mut self, df: &DataFrame) -> Result<()> {
let batches = df.iter_record_batches().map(Ok);

let batches = df.as_record_batches()?;
json_writer.write_batches(&batches)?;
json_writer.finish()?;
match self.json_format {
JsonFormat::JsonLines => {
let format = write::LineDelimited::default();
let blocks = write::Serializer::new(batches, Vec::with_capacity(1024), format);
write::write(&mut self.buffer, format, blocks)?;
}
JsonFormat::Json => {
let format = write::JsonArray::default();
let blocks = write::Serializer::new(batches, Vec::with_capacity(1024), format);
write::write(&mut self.buffer, format, blocks)?;
}
}

Ok(())
}
}

impl<R: Read> ArrowReader for ArrowJsonReader<R> {
fn next_record_batch(&mut self) -> ArrowResult<Option<RecordBatch>> {
self.next()
}

fn schema(&self) -> Arc<Schema> {
Arc::new((&**self.schema()).into())
}
}

pub struct JsonReader<R>
where
R: Read + Seek,
R: BufRead + Seek,
{
reader: R,
reader_builder: ReaderBuilder,
rechunk: bool,
infer_schema_len: Option<usize>,
batch_size: usize,
projection: Option<Vec<String>>,
schema: Option<ArrowSchema>,
}

impl<R> SerReader<R> for JsonReader<R>
where
R: Read + Seek,
R: BufRead + Seek,
{
fn new(reader: R) -> Self {
JsonReader {
reader,
reader_builder: ReaderBuilder::new(),
rechunk: true,
infer_schema_len: Some(100),
batch_size: 8192,
projection: None,
schema: None,
}
}

@@ -133,44 +151,90 @@ where
self
}

fn finish(self) -> Result<DataFrame> {
fn finish(mut self) -> Result<DataFrame> {
let rechunk = self.rechunk;
finish_reader(
self.reader_builder.build(self.reader)?,
rechunk,
None,
None,
None,
)

let fields = if let Some(schema) = self.schema {
schema.fields
} else {
read::infer_and_reset(&mut self.reader, self.infer_schema_len)?
};
let projection = self
.projection
.map(|projection| {
Some(
projection
.iter()
.map(|name| {
fields
.iter()
.position(|fld| fld.name() == name)
.ok_or_else(|| PolarsError::NotFound(name.into()))
})
.collect::<Result<Vec<_>>>(),
)
})
.flatten()
.transpose()?;

let mut dfs = vec![];

// holds at most `batch_size` rows; this container can be re-used across batches.
let mut rows = vec![String::default(); self.batch_size];
loop {
let read = read::read_rows(&mut self.reader, &mut rows)?;
if read == 0 {
break;
}
let read_rows = &rows[..read];
let rb = read::deserialize(read_rows, fields.clone())?;
let df = DataFrame::try_from(rb)?;

if let Some(projection) = &projection {
let cols = projection
.iter()
.map(|idx| df.get_columns()[*idx].clone())
.collect::<Vec<_>>();
dfs.push(DataFrame::new_no_checks(cols))
} else {
dfs.push(df)
}
}

let mut out = accumulate_dataframes_vertical(dfs.into_iter())?;
if rechunk {
out.rechunk();
}
Ok(out)
}
}

impl<R> JsonReader<R>
where
R: Read + Seek,
R: BufRead + Seek,
{
/// Set the JSON file's schema
pub fn with_schema(mut self, schema: &Schema) -> Self {
self.reader_builder = self.reader_builder.with_schema(Arc::new(schema.to_arrow()));
self.schema = Some(schema.to_arrow());
self
}

/// Set the JSON reader to infer the schema of the file
pub fn infer_schema(mut self, max_records: Option<usize>) -> Self {
self.reader_builder = self.reader_builder.infer_schema(max_records);
pub fn infer_schema_len(mut self, max_records: Option<usize>) -> Self {
self.infer_schema_len = max_records;
self
}

/// Set the batch size (number of records to load at one time)
/// This heavily influences loading time.
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.reader_builder = self.reader_builder.with_batch_size(batch_size);
self.batch_size = batch_size;
self
}

/// Set the reader's column projection
pub fn with_projection(mut self, projection: Vec<String>) -> Self {
self.reader_builder = self.reader_builder.with_projection(projection);
pub fn with_projection(mut self, projection: Option<Vec<String>>) -> Self {
self.projection = projection;
self
}
}
@@ -196,7 +260,7 @@ mod test {
{"a":100000000000000, "b":0.6, "c":false, "d":"text"}"#;
let file = Cursor::new(basic_json);
let df = JsonReader::new(file)
.infer_schema(Some(3))
.infer_schema_len(Some(3))
.with_batch_size(3)
.finish()
.unwrap();
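Put together, the rewritten JSON reader and writer look roughly like this from the caller's side. The sketch below is assembled from the doc example and the new builder methods in this diff; the module paths and the visibility of `JsonFormat` outside the crate are assumptions:

use polars_core::prelude::*;
use polars_io::json::{JsonFormat, JsonReader, JsonWriter};
use polars_io::{SerReader, SerWriter};
use std::io::Cursor;

fn json_round_trip() -> Result<()> {
    let basic_json = r#"{"a":1, "b":2.0, "c":false, "d":"4"}
{"a":7, "b":8.0, "c":true, "d":"8"}"#;

    // Reading: schema inference is now capped with `infer_schema_len`, and the
    // projection is an `Option<Vec<String>>` resolved against the inferred fields.
    let df = JsonReader::new(Cursor::new(basic_json))
        .infer_schema_len(Some(3))
        .with_batch_size(3)
        .with_projection(Some(vec!["a".to_string(), "d".to_string()]))
        .finish()?;

    // Writing: the default is newline-delimited JSON; `with_json_format` switches
    // the writer to a single JSON array.
    let mut buf = Vec::new();
    JsonWriter::new(&mut buf)
        .with_json_format(JsonFormat::Json)
        .finish(&df)?;

    Ok(())
}

Defaulting to `JsonFormat::JsonLines` keeps the old line-delimited behaviour, while `JsonFormat::Json` produces a single JSON array, matching the two serializer branches in `finish` above.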
6 changes: 5 additions & 1 deletion py-polars/Cargo.lock


1 change: 1 addition & 0 deletions py-polars/Cargo.toml
@@ -86,6 +86,7 @@ features = [
"ewma",
"dot_diagram",
"dataframe_arithmetic",
"json",
]

# [patch.crates-io]
