Refactored JSON IO (better support for JSON and NDJSON)
jorgecarleitao authored and dexterduck committed Mar 2, 2022
1 parent d1c43b9 commit df75eb2
Showing 27 changed files with 1,174 additions and 1,206 deletions.
29 changes: 9 additions & 20 deletions benches/write_json.rs
@@ -1,55 +1,44 @@
-use std::sync::Arc;
-
 use criterion::{criterion_group, criterion_main, Criterion};
 
 use arrow2::array::*;
-use arrow2::chunk::Chunk;
-use arrow2::error::Result;
+use arrow2::error::ArrowError;
 use arrow2::io::json::write;
 use arrow2::util::bench_util::*;
 
-fn write_batch(columns: &Chunk<Arc<dyn Array>>) -> Result<()> {
+fn write_array(array: Box<dyn Array>) -> Result<(), ArrowError> {
     let mut writer = vec![];
-    let format = write::Format::Json;
 
-    let batches = vec![Ok(columns.clone())].into_iter();
+    let arrays = vec![Ok(array)].into_iter();
 
-    // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
-    let blocks = write::Serializer::new(batches, vec!["c1".to_string()], vec![], format);
+    // Advancing this iterator serializes the next array to its internal buffer (i.e. CPU-bounded)
+    let blocks = write::Serializer::new(arrays, vec![]);
 
     // the operation of writing is IO-bounded.
-    write::write(&mut writer, format, blocks)?;
+    write::write(&mut writer, blocks)?;
 
     Ok(())
 }
 
-fn make_chunk(array: impl Array + 'static) -> Chunk<Arc<dyn Array>> {
-    Chunk::new(vec![Arc::new(array) as Arc<dyn Array>])
-}
-
 fn add_benchmark(c: &mut Criterion) {
     (10..=18).step_by(2).for_each(|log2_size| {
         let size = 2usize.pow(log2_size);
 
         let array = create_primitive_array::<i32>(size, 0.1);
-        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write i32 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
         });
 
         let array = create_string_array::<i32>(size, 100, 0.1, 42);
-        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write utf8 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
         });
 
         let array = create_primitive_array::<f64>(size, 0.1);
-        let columns = make_chunk(array);
 
         c.bench_function(&format!("json write f64 2^{}", log2_size), |b| {
-            b.iter(|| write_batch(&columns))
+            b.iter(|| write_array(Box::new(array.clone())))
         });
     });
 }
14 changes: 4 additions & 10 deletions examples/json_read.rs
@@ -3,23 +3,17 @@ use std::io::BufReader;
 use std::sync::Arc;
 
 use arrow2::array::Array;
-use arrow2::error::{ArrowError, Result};
+use arrow2::error::Result;
 use arrow2::io::json::read;
 
 fn read_path(path: &str) -> Result<Arc<dyn Array>> {
     // Example of reading a JSON file.
     let reader = BufReader::new(File::open(path)?);
-    let data = serde_json::from_reader(reader)?;
+    let json = serde_json::from_reader(reader)?;
 
-    let values = if let serde_json::Value::Array(values) = data {
-        Ok(values)
-    } else {
-        Err(ArrowError::InvalidArgumentError("".to_string()))
-    }?;
-
-    let data_type = read::infer_rows(&values)?;
+    let data_type = read::infer(&json)?;
 
-    Ok(read::deserialize_json(&values, data_type))
+    read::deserialize(&json, data_type)
 }
 
 fn main() -> Result<()> {
40 changes: 15 additions & 25 deletions examples/json_write.rs
@@ -1,42 +1,32 @@
 use std::fs::File;
-use std::sync::Arc;
 
 use arrow2::{
     array::{Array, Int32Array},
-    chunk::Chunk,
-    error::Result,
+    error::ArrowError,
     io::json::write,
 };
 
-fn write_batches(path: &str, names: Vec<String>, batches: &[Chunk<Arc<dyn Array>>]) -> Result<()> {
+fn write_array(path: &str, array: Box<dyn Array>) -> Result<(), ArrowError> {
     let mut writer = File::create(path)?;
-    let format = write::Format::Json;
 
-    let batches = batches.iter().cloned().map(Ok);
+    let arrays = vec![Ok(array)].into_iter();
 
-    // Advancing this iterator serializes the next batch to its internal buffer (i.e. CPU-bounded)
-    let blocks = write::Serializer::new(batches, names, vec![], format);
+    // Advancing this iterator serializes the next array to its internal buffer (i.e. CPU-bounded)
+    let blocks = write::Serializer::new(arrays, vec![]);
 
     // the operation of writing is IO-bounded.
-    write::write(&mut writer, format, blocks)?;
+    write::write(&mut writer, blocks)?;
 
     Ok(())
 }
 
-fn main() -> Result<()> {
-    let array = Arc::new(Int32Array::from(&[
-        Some(0),
-        None,
-        Some(2),
-        Some(3),
-        Some(4),
-        Some(5),
-        Some(6),
-    ])) as Arc<dyn Array>;
-
-    write_batches(
-        "example.json",
-        vec!["c1".to_string()],
-        &[Chunk::new(vec![array.clone()]), Chunk::new(vec![array])],
-    )
+fn main() -> Result<(), ArrowError> {
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let file_path = &args[1];
+
+    let array = Int32Array::from(&[Some(0), None, Some(2), Some(3), Some(4), Some(5), Some(6)]);
+
+    write_array(file_path, Box::new(array))
 }
52 changes: 22 additions & 30 deletions examples/ndjson_read.rs
@@ -1,48 +1,40 @@
 use std::fs::File;
-use std::io::BufReader;
+use std::io::{BufReader, Seek};
 use std::sync::Arc;
 
 use arrow2::array::Array;
-use arrow2::chunk::Chunk;
 use arrow2::error::Result;
-use arrow2::io::json::read;
+use arrow2::io::ndjson::read;
+use arrow2::io::ndjson::read::FallibleStreamingIterator;
 
-fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<Chunk<Arc<dyn Array>>> {
-    // Example of reading a NDJSON file.
+fn read_path(path: &str) -> Result<Vec<Arc<dyn Array>>> {
+    let batch_size = 1024; // number of rows per array
     let mut reader = BufReader::new(File::open(path)?);
 
-    let fields = read::infer_and_reset(&mut reader, None)?;
-
-    let fields = if let Some(projection) = projection {
-        fields
-            .into_iter()
-            .filter(|field| projection.contains(&field.name.as_ref()))
-            .collect()
-    } else {
-        fields
-    };
-
-    // at most 1024 rows. This container can be re-used across batches.
-    let mut rows = vec![String::default(); 1024];
-
-    // Reads up to 1024 rows.
-    // this is IO-intensive and performs minimal CPU work. In particular,
-    // no deserialization is performed.
-    let read = read::read_rows(&mut reader, &mut rows)?;
-    let rows = &rows[..read];
-
-    // deserialize `rows` into `Chunk`. This is CPU-intensive, has no IO,
-    // and can be performed on a different thread pool via a channel.
-    read::deserialize(rows, &fields)
+    let data_type = read::infer(&mut reader, None)?;
+    reader.rewind()?;
+
+    let mut reader = read::FileReader::new(reader, vec!["".to_string(); batch_size], None);
+
+    let mut arrays = vec![];
+    // `next` is IO-bounded
+    while let Some(rows) = reader.next()? {
+        // `deserialize` is CPU-bounded
+        let array = read::deserialize(rows, data_type.clone())?;
+        arrays.push(array);
+    }
+
+    Ok(arrays)
 }
 
 fn main() -> Result<()> {
+    // Example of reading a NDJSON file from a path
     use std::env;
     let args: Vec<String> = env::args().collect();
 
     let file_path = &args[1];
 
-    let batch = read_path(file_path, None)?;
-    println!("{:#?}", batch);
+    let arrays = read_path(file_path)?;
+    println!("{:#?}", arrays);
     Ok(())
 }
35 changes: 35 additions & 0 deletions examples/ndjson_write.rs
@@ -0,0 +1,35 @@
+use std::fs::File;
+
+use arrow2::array::{Array, Int32Array};
+use arrow2::error::Result;
+use arrow2::io::ndjson::write;
+
+fn write_path(path: &str, array: Box<dyn Array>) -> Result<()> {
+    let writer = File::create(path)?;
+
+    let serializer = write::Serializer::new(vec![Ok(array)].into_iter(), vec![]);
+
+    let mut writer = write::FileWriter::new(writer, serializer);
+    writer.by_ref().collect::<Result<()>>()
+}
+
+fn main() -> Result<()> {
+    // Example of writing a NDJSON file to a path
+    use std::env;
+    let args: Vec<String> = env::args().collect();
+
+    let file_path = &args[1];
+
+    let array = Box::new(Int32Array::from(&[
+        Some(0),
+        None,
+        Some(2),
+        Some(3),
+        Some(4),
+        Some(5),
+        Some(6),
+    ]));
+
+    write_path(file_path, array)?;
+    Ok(())
+}
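Running this example with a path argument should produce newline-delimited JSON, i.e. one JSON value per line. For the `Int32Array` in `main` above, the written file would presumably read:

```
0
null
2
3
4
5
6
```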
6 changes: 4 additions & 2 deletions guide/src/README.md
@@ -5,8 +5,10 @@ interoperability with the arrow format.

 The typical use-case for this library is to perform CPU and memory-intensive analytics in a format that supports heterogeneous data structures, null values, and IPC and FFI interfaces across languages.
 
-Arrow2 is divided into three main parts:
+Arrow2 is divided into 5 main parts:
 
 * a [low-level API](./low_level.md) to efficiently operate with contiguous memory regions;
 * a [high-level API](./high_level.md) to operate with arrow arrays;
-* a [metadata API](./metadata.md) to declare and operate with logical types and metadata.
+* a [metadata API](./metadata.md) to declare and operate with logical types and metadata;
+* a [compute API](./compute.md) with operators to operate over arrays;
+* an [IO API](./io/README.md) with interfaces to read from, and write to, other formats.
13 changes: 13 additions & 0 deletions guide/src/io/json_read.md
@@ -14,3 +14,16 @@ This crate also supports reading JSON, at the expense of being unable to read th
 ```rust
 {{#include ../../../examples/json_read.rs}}
 ```
+
+## Metadata and inference
+
+This crate uses the following mapping between Arrow's data type and JSON:
+
+| `JSON` | `DataType` |
+| ------ | ---------- |
+| Bool   | Boolean    |
+| Int    | Int64      |
+| Float  | Float64    |
+| String | Utf8       |
+| List   | List       |
+| Object | Struct     |
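As a concrete illustration of this mapping, here is a minimal sketch (not part of the commit) that infers and deserializes an in-memory `serde_json::Value` with the `infer` and `deserialize` functions introduced in this diff; the `json!` input is illustrative:

```rust
use arrow2::error::Result;
use arrow2::io::json::read;

fn main() -> Result<()> {
    // a JSON List of Int: per the table above, its items should infer as Int64
    let json = serde_json::json!([1, 2, null]);

    // `infer` maps the JSON value to an Arrow `DataType`
    let data_type = read::infer(&json)?;

    // `deserialize` is CPU-bounded and materializes an `Arc<dyn Array>`
    let array = read::deserialize(&json, data_type)?;
    println!("{:#?}", array);
    Ok(())
}
```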
10 changes: 8 additions & 2 deletions guide/src/io/json_write.md
@@ -1,8 +1,14 @@
 # Write JSON
 
-When compiled with feature `io_json`, you can use this crate to write JSON files.
-The following example writes a batch as a JSON file:
+When compiled with feature `io_json`, you can use this crate to write JSON.
+The following example writes an array to JSON:
 
 ```rust
 {{#include ../../../examples/json_write.rs}}
 ```
+
+Likewise, you can also use it to write to NDJSON:
+
+```rust
+{{#include ../../../examples/ndjson_write.rs}}
+```
45 changes: 17 additions & 28 deletions src/io/json/read/deserialize.rs
@@ -10,8 +10,7 @@ use serde_json::Value;
 use crate::{
     array::*,
     bitmap::MutableBitmap,
-    chunk::Chunk,
-    datatypes::{DataType, Field, IntervalUnit},
+    datatypes::{DataType, IntervalUnit},
     error::ArrowError,
     types::NativeType,
 };
@@ -203,7 +202,7 @@ fn deserialize_dictionary<K: DictionaryKey, A: Borrow<Value>>(
     DictionaryArray::<K>::from_data(keys, values)
 }
 
-fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
+pub(crate) fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
     match &data_type {
         DataType::Null => Arc::new(NullArray::from_data(data_type, rows.len())),
         DataType::Boolean => Arc::new(deserialize_boolean(rows)),
@@ -251,30 +250,20 @@ fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
     }
 }
 
-/// Deserializes `rows` into a [`Chunk`] according to `fields`.
+/// Deserializes a `json` [`Value`] into an [`Array`] of [`DataType`]
 /// This is CPU-bounded.
-pub fn deserialize<A: AsRef<str>>(
-    rows: &[A],
-    fields: &[Field],
-) -> Result<Chunk<Arc<dyn Array>>, ArrowError> {
-    let data_type = DataType::Struct(fields.to_vec());
-
-    // convert rows to `Value`
-    let rows = rows
-        .iter()
-        .map(|row| {
-            let row: Value = serde_json::from_str(row.as_ref()).map_err(ArrowError::from)?;
-            Ok(row)
-        })
-        .collect::<Result<Vec<_>, ArrowError>>()?;
-
-    let (_, columns, _) = deserialize_struct(&rows, data_type).into_data();
-    Ok(Chunk::new(columns))
-}
-
-/// Deserializes a slice of [`Value`] to an Array of logical type [`DataType`].
-///
-/// This function allows consuming deserialized JSON to Arrow.
-pub fn deserialize_json(rows: &[Value], data_type: DataType) -> Arc<dyn Array> {
-    _deserialize(rows, data_type)
+/// # Error
+/// This function errors iff either:
+/// * `json` is not a [`Value::Array`]
+/// * `data_type` is neither [`DataType::List`] nor [`DataType::LargeList`]
+pub fn deserialize(json: &Value, data_type: DataType) -> Result<Arc<dyn Array>, ArrowError> {
+    match json {
+        Value::Array(rows) => match data_type {
+            DataType::List(inner) | DataType::LargeList(inner) => {
+                Ok(_deserialize(rows, inner.data_type))
+            }
+            _ => Err(ArrowError::nyi("read an Array from a non-Array data type")),
+        },
+        _ => Err(ArrowError::nyi("read an Array from a non-Array JSON")),
+    }
 }
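A minimal sketch of the contract documented above — `deserialize` accepts a JSON array together with a List-like `DataType` and errors on anything else (the inputs are illustrative, assuming the `infer`/`deserialize` signatures from this diff):

```rust
use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::json::read;

fn example() -> Result<()> {
    let json = serde_json::json!([1, 2, 3]);

    // inference over a JSON array yields a List-like data type...
    let data_type = read::infer(&json)?;
    // ...which `deserialize` unwraps to build the inner array
    let array = read::deserialize(&json, data_type)?;
    println!("{:#?}", array);

    // per the doc comment, a non-List data type is rejected...
    assert!(read::deserialize(&json, DataType::Int64).is_err());
    // ...as is a JSON value that is not an array
    assert!(read::deserialize(&serde_json::json!(1), DataType::Int64).is_err());
    Ok(())
}
```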
