This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Refactored JSON IO (better support for JSON and NDJSON) #870

Merged · 6 commits · Feb 27, 2022

29 changes: 9 additions & 20 deletions benches/write_json.rs
@@ -1,55 +1,44 @@
-use std::sync::Arc;
-
use criterion::{criterion_group, criterion_main, Criterion};

use arrow2::array::*;
-use arrow2::chunk::Chunk;
-use arrow2::error::Result;
+use arrow2::error::ArrowError;
use arrow2::io::json::write;
use arrow2::util::bench_util::*;

-fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
+fn write_array(array: Box<dyn Array>) -> Result<(), ArrowError> {
    let mut writer = vec![];
-    let format = write::Format::Json;

-    let batches = vec![Ok(columns.clone())].into_iter();
+    let arrays = vec![Ok(array)].into_iter();

-    // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
-    let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format);
+    // Advancing this iterator serializes the next array to its internal buffer (i.e. CPU-bounded)
+    let blocks = write::Serializer::new(arrays, vec![]);

    // the operation of writing is IO-bounded.
-    write::write(&mut writer, format, blocks)?;
+    write::write(&mut writer, blocks)?;

    Ok(())
}

-fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
-    Chunk::new(vec![Arc::new(array) as Arc<dyn Array>])
-}
-
fn add_benchmark(c: &mut Criterion) {
    (10..=18).step_by(2).for_each(|log2_size| {
        let size = 2usize.pow(log2_size);

        let array = create_primitive_array::<i32>(size, 0.1);
-        let columns = make_chunk(array);

        c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
        });

        let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let columns = make_chunk(array);

        c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
        });

        let array = create_primitive_array::<f64>(size, 0.1);
-        let columns = make_chunk(array);

        c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
        });
    });
}
14 changes: 4 additions & 10 deletions examples/json_read.rs
@@ -3,23 +3,17 @@ use std::io::BufReader;
use std::sync::Arc;

use arrow2::array::Array;
-use arrow2::error::{ArrowError, Result};
+use arrow2::error::Result;
use arrow2::io::json::read;

fn read_path(path: &str) -> Result<Arc<dyn Array>> {
    // Example of reading a JSON file.
    let reader = BufReader::new(File::open(path)?);
-    let data = serde_json::from_reader(reader)?;
+    let json = serde_json::from_reader(reader)?;

-    let values = if let serde_json::Value::Array(values) = data {
-        Ok(values)
-    } else {
-        Err(ArrowError::InvalidArgumentError("".to_string()))
-    }?;
+    let data_type = read::infer(&json)?;

-    let data_type = read::infer_rows(&values)?;
-
-    Ok(read::deserialize_json(&values, data_type))
+    read::deserialize(&json, data_type)
}

fn main() -> Result<()> {
40 changes: 15 additions & 25 deletions examples/json_write.rs
@@ -1,42 +1,32 @@
use std::fs::File;
-use std::sync::Arc;

use arrow2::{
    array::{Array, Int32Array},
-    chunk::Chunk,
-    error::Result,
+    error::ArrowError,
    io::json::write,
};

-fn write_batches(path: &str, names: Vec<String>, batches: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
+fn write_array(path: &str, array: Box<dyn Array>) -> Result<(), ArrowError> {
    let mut writer = File::create(path)?;
-    let format = write::Format::Json;

-    let batches = batches.iter().cloned().map(Ok);
+    let arrays = vec![Ok(array)].into_iter();

-    // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
-    let blocks = write::Serializer::new(batches, names, vec![], format);
+    // Advancing this iterator serializes the next array to its internal buffer (i.e. CPU-bounded)
+    let blocks = write::Serializer::new(arrays, vec![]);

    // the operation of writing is IO-bounded.
-    write::write(&mut writer, format, blocks)?;
+    write::write(&mut writer, blocks)?;

    Ok(())
}

-fn main() -> Result<()> {
-    let array = Arc::new(Int32Array::from(&[
-        Some(0),
-        None,
-        Some(2),
-        Some(3),
-        Some(4),
-        Some(5),
-        Some(6),
-    ])) as Arc<dyn Array>;
-
-    write_batches(
-        "example.json",
-        vec!["c1".to_string()],
-        &[Chunk::new(vec![array.clone()]), Chunk::new(vec![array])],
-    )
+fn main() -> Result<(), ArrowError> {
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let file_path = &args[1];
+
+    let array = Int32Array::from(&[Some(0), None, Some(2), Some(3), Some(4), Some(5), Some(6)]);
+
+    write_array(file_path, Box::new(array))
}
52 changes: 22 additions & 30 deletions examples/ndjson_read.rs
@@ -1,48 +1,40 @@
use std::fs::File;
-use std::io::BufReader;
+use std::io::{BufReader, Seek};
use std::sync::Arc;

use arrow2::array::Array;
-use arrow2::chunk::Chunk;
use arrow2::error::Result;
-use arrow2::io::json::read;
+use arrow2::io::ndjson::read;
+use arrow2::io::ndjson::read::FallibleStreamingIterator;

-fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
-    // Example of reading a NDJSON file.
+fn read_path(path: &str) -> Result<Vec<Arc<dyn Array>>> {
+    let batch_size = 1024; // number of rows per array
    let mut reader = BufReader::new(File::open(path)?);

-    let fields = read::infer_and_reset(&mut reader, None)?;
-
-    let fields = if let Some(projection) = projection {
-        fields
-            .into_iter()
-            .filter(|field| projection.contains(&field.name.as_ref()))
-            .collect()
-    } else {
-        fields
-    };
-
-    // at most 1024 rows. This container can be re-used across batches.
-    let mut rows = vec![String::default(); 1024];
-
-    // Reads up to 1024 rows.
-    // this is IO-intensive and performs minimal CPU work. In particular,
-    // no deserialization is performed.
-    let read = read::read_rows(&mut reader, &mut rows)?;
-    let rows = &rows[..read];
-
-    // deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
-    // and can be performed on a different thread pool via a channel.
-    read::deserialize(rows, &fields)
+    let data_type = read::infer(&mut reader, None)?;
+    reader.rewind()?;
+
+    let mut reader = read::FileReader::new(reader, vec!["".to_string(); batch_size], None);
+
+    let mut arrays = vec![];
+    // `next` is IO-bounded
+    while let Some(rows) = reader.next()? {
+        // `deserialize` is CPU-bounded
+        let array = read::deserialize(rows, data_type.clone())?;
+        arrays.push(array);
+    }
+
+    Ok(arrays)
}

fn main() -> Result<()> {
+    // Example of reading a NDJSON file from a path
    use std::env;
    let args: Vec<String> = env::args().collect();

    let file_path = &args[1];

-    let batch = read_path(file_path, None)?;
-    println!("{:#?}", batch);
+    let arrays = read_path(file_path)?;
+    println!("{:#?}", arrays);
    Ok(())
}
35 changes: 35 additions & 0 deletions examples/ndjson_write.rs
@@ -0,0 +1,35 @@
use std::fs::File;

use arrow2::array::{Array, Int32Array};
use arrow2::error::Result;
use arrow2::io::ndjson::write;

fn write_path(path: &str, array: Box<dyn Array>) -> Result<()> {
    let writer = File::create(path)?;

    let serializer = write::Serializer::new(vec![Ok(array)].into_iter(), vec![]);

    let mut writer = write::FileWriter::new(writer, serializer);
    writer.by_ref().collect::<Result<()>>()
}

fn main() -> Result<()> {
    // Example of writing an NDJSON file to a path
    use std::env;
    let args: Vec<String> = env::args().collect();

    let file_path = &args[1];

    let array = Box::new(Int32Array::from(&[
        Some(0),
        None,
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
    ]));

    write_path(file_path, array)?;
    Ok(())
}
6 changes: 4 additions & 2 deletions guide/src/README.md
@@ -5,8 +5,10 @@ interoperability with the arrow format.

The typical use-case for this library is to perform CPU and memory-intensive analytics in a format that supports heterogeneous data structures, null values, and IPC and FFI interfaces across languages.

-Arrow2 is divided into three main parts:
+Arrow2 is divided into 5 main parts:

* a [low-level API](./low_level.md) to efficiently operate with contiguous memory regions;
* a [high-level API](./high_level.md) to operate with arrow arrays;
-* a [metadata API](./metadata.md) to declare and operate with logical types and metadata.
+* a [metadata API](./metadata.md) to declare and operate with logical types and metadata;
+* a [compute API](./compute.md) with operators to operate over arrays;
+* an [IO API](./io/README.md) with interfaces to read from, and write to, other formats.
13 changes: 13 additions & 0 deletions guide/src/io/json_read.md
@@ -14,3 +14,16 @@ This crate also supports reading JSON, at the expense of being unable to read th…
```rust
{{#include ../../../examples/json_read.rs}}
```

## Metadata and inference

This crate uses the following mapping between Arrow's data type and JSON:

| `JSON` | `DataType` |
| ------ | ---------- |
| Bool | Boolean |
| Int | Int64 |
| Float | Float64 |
| String | Utf8 |
| List | List |
| Object | Struct |
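
As a quick illustration of this mapping (not part of the diff), here is a minimal sketch in the spirit of the updated `json_read` example; it assumes the `io_json` feature, a `serde_json` dependency, and the `read::infer` / `read::deserialize` entry points introduced in this PR:

```rust
use arrow2::error::Result;
use arrow2::io::json::read;

fn main() -> Result<()> {
    // A JSON array of objects; per the table above, integers infer to Int64,
    // strings to Utf8, and each object becomes a Struct row.
    let json = serde_json::json!([
        {"a": 1, "b": "x"},
        {"a": 2, "b": "y"}
    ]);

    // Infer the logical type of the whole value (a list of structs here),
    // then deserialize it into a single Arrow array.
    let data_type = read::infer(&json)?;
    let array = read::deserialize(&json, data_type)?;
    println!("{:?}", array);
    Ok(())
}
```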
10 changes: 8 additions & 2 deletions guide/src/io/json_write.md
@@ -1,8 +1,14 @@
# Write JSON

-When compiled with feature `io_json`, you can use this crate to write JSON files.
-The following example writes a batch as a JSON file:
+When compiled with feature `io_json`, you can use this crate to write JSON.
+The following example writes an array to JSON:

```rust
{{#include ../../../examples/json_write.rs}}
```

Likewise, you can also use it to write to NDJSON:

```rust
{{#include ../../../examples/ndjson_write.rs}}
```
45 changes: 17 additions & 28 deletions src/io/json/read/deserialize.rs
@@ -10,8 +10,7 @@ use serde_json::Value;
use crate::{
    array::*,
    bitmap::MutableBitmap,
-    chunk::Chunk,
-    datatypes::{DataType, Field, IntervalUnit},
+    datatypes::{DataType, IntervalUnit},
    error::ArrowError,
    types::NativeType,
};
@@ -203,7 +202,7 @@ fn deserialize_dictionary<K: DictionaryKey, A: Borrow<Value>>(
    DictionaryArray::<K>::from_data(keys, values)
}

-fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
+pub(crate) fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
    match &data_type {
        DataType::Null => Arc::new(NullArray::from_data(data_type, rows.len())),
        DataType::Boolean => Arc::new(deserialize_boolean(rows)),
@@ -251,30 +250,20 @@ fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Ar…
    }
}

-/// Deserializes `rows` into a [`Chunk`] according to `fields`.
+/// Deserializes a `json` [`Value`] into an [`Array`] of [`DataType`]
/// This is CPU-bounded.
-pub fn deserialize<A: AsRef<str>>(
-    rows: &[A],
-    fields: &[Field],
-) -> Result<Chunk<Arc<dyn Array>>, ArrowError> {
-    let data_type = DataType::Struct(fields.to_vec());
-
-    // convert rows to `Value`
-    let rows = rows
-        .iter()
-        .map(|row| {
-            let row: Value = serde_json::from_str(row.as_ref()).map_err(ArrowError::from)?;
-            Ok(row)
-        })
-        .collect::<Result<Vec<_>, ArrowError>>()?;
-
-    let (_, columns, _) = deserialize_struct(&rows, data_type).into_data();
-    Ok(Chunk::new(columns))
-}
-
-/// Deserializes a slice of [`Value`] to an Array of logical type [`DataType`].
-///
-/// This function allows consuming deserialized JSON to Arrow.
-pub fn deserialize_json(rows: &[Value], data_type: DataType) -> Arc<dyn Array> {
-    _deserialize(rows, data_type)
+/// # Error
+/// This function errors iff either:
+/// * `json` is not a [`Value::Array`]
+/// * `data_type` is neither [`DataType::List`] nor [`DataType::LargeList`]
+pub fn deserialize(json: &Value, data_type: DataType) -> Result<Arc<dyn Array>, ArrowError> {
+    match json {
+        Value::Array(rows) => match data_type {
+            DataType::List(inner) | DataType::LargeList(inner) => {
+                Ok(_deserialize(rows, inner.data_type))
+            }
+            _ => Err(ArrowError::nyi("read an Array from a non-Array data type")),
+        },
+        _ => Err(ArrowError::nyi("read an Array from a non-Array JSON")),
+    }
}
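
A short sketch (not part of the diff) of the error behaviour described in the new doc comment, assuming the `io_json` feature and a `serde_json` dependency:

```rust
use arrow2::datatypes::DataType;
use arrow2::io::json::read;

fn main() {
    // A non-array JSON value is rejected regardless of the requested data type...
    let object = serde_json::json!({"a": 1});
    assert!(read::deserialize(&object, DataType::Int64).is_err());

    // ...and a JSON array paired with a non-List/LargeList data type is rejected too.
    let array = serde_json::json!([1, 2, 3]);
    assert!(read::deserialize(&array, DataType::Int64).is_err());
}
```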