This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

split IO-bounded from CPU-bounded tasks (#706)
jorgecarleitao committed Dec 24, 2021
1 parent 39695e9 commit dfa6370
Showing 12 changed files with 453 additions and 796 deletions.
46 changes: 46 additions & 0 deletions examples/json_read.rs
@@ -0,0 +1,46 @@
use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::json::read;
use arrow2::record_batch::RecordBatch;

fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<RecordBatch> {
    // Example of reading a JSON file.
    let mut reader = BufReader::new(File::open(path)?);

    let fields = read::infer_and_reset(&mut reader, None)?;

    let fields = if let Some(projection) = projection {
        fields
            .into_iter()
            .filter(|field| projection.contains(&field.name().as_ref()))
            .collect()
    } else {
        fields
    };

    // at most 1024 rows. This container can be re-used across batches.
    let mut rows = vec![String::default(); 1024];

    // Reads up to 1024 rows.
    // this is IO-intensive and performs minimal CPU work. In particular,
    // no deserialization is performed.
    let read = read::read_rows(&mut reader, &mut rows)?;
    let rows = &rows[..read];

    // deserialize `rows` into a `RecordBatch`. This is CPU-intensive, has no IO,
    // and can be performed on a different thread pool via a channel.
    read::deserialize(&rows, fields)
}

fn main() -> Result<()> {
    use std::env;
    let args: Vec<String> = env::args().collect();

    let file_path = &args[1];

    let batch = read_path(file_path, None)?;
    println!("{:#?}", batch);
    Ok(())
}
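
The comments in the example above note that `read_rows` is IO-intensive while `deserialize` is CPU-intensive and "can be performed on a different thread pool via a channel". Below is a minimal sketch of that split, not part of the commit: the IO stays on the main thread and each batch of rows is sent over an `mpsc` channel to a worker thread that deserializes it. The file name `rows.json`, the batch size of 1024, the channel bound of 2, and the assumption that `read_rows` returns 0 once the reader is exhausted are all illustrative.

```rust
use std::fs::File;
use std::io::BufReader;
use std::sync::mpsc;
use std::thread;

use arrow2::error::Result;
use arrow2::io::json::read;

fn main() -> Result<()> {
    // IO-bounded side: open the file and infer the schema (illustrative path).
    let mut reader = BufReader::new(File::open("rows.json")?);
    let fields = read::infer_and_reset(&mut reader, None)?;

    // Bounded channel so the reader cannot run arbitrarily far ahead.
    let (tx, rx) = mpsc::sync_channel::<Vec<String>>(2);

    // CPU-bounded side: deserialize whatever the IO thread sends.
    let consumer = thread::spawn(move || -> Result<()> {
        for rows in rx {
            let batch = read::deserialize(&rows, fields.clone())?;
            println!("{:#?}", batch);
        }
        Ok(())
    });

    // IO-bounded loop: read rows and hand them off without deserializing.
    let mut rows = vec![String::default(); 1024];
    loop {
        let read = read::read_rows(&mut reader, &mut rows)?;
        if read == 0 {
            break;
        }
        tx.send(rows[..read].to_vec()).expect("receiver alive");
    }
    drop(tx); // closing the channel lets the consumer finish

    consumer.join().expect("consumer thread panicked")
}
```
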
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
@@ -18,3 +18,4 @@
- [Write Arrow](./io/ipc_write.md)
- [Read Avro](./io/avro_read.md)
- [Write Avro](./io/avro_write.md)
- [Read JSON](./io/json_read.md)
10 changes: 10 additions & 0 deletions guide/src/io/json_read.md
@@ -0,0 +1,10 @@
# JSON read

When compiled with feature `io_json`, you can use this crate to read JSON files.

```rust
{{#include ../../../examples/json_read.rs}}
```

Note how deserialization can be performed on a separate thread pool to avoid
blocking the runtime (see also [here](https://ryhl.io/blog/async-what-is-blocking/)).
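
As a companion to that note, here is a hypothetical sketch of pushing the CPU-bounded `read::deserialize` onto a blocking thread pool from async code. It assumes an application that already depends on tokio; neither the runtime nor the `deserialize_off_runtime` helper is part of this crate or of this commit.

```rust
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::json::read;
use arrow2::record_batch::RecordBatch;

/// Hypothetical helper: runs deserialization on tokio's blocking pool so the
/// async worker threads are not blocked by CPU-bounded work.
async fn deserialize_off_runtime(rows: Vec<String>, fields: Vec<Field>) -> Result<RecordBatch> {
    tokio::task::spawn_blocking(move || read::deserialize(&rows, fields))
        .await
        .expect("deserialization task panicked")
}
```
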
3 changes: 1 addition & 2 deletions src/io/json/mod.rs
@@ -2,10 +2,9 @@
#![forbid(unsafe_code)]
//! Convert data between the Arrow memory format and JSON line-delimited records.

mod read;
pub mod read;
mod write;

pub use read::*;
pub use write::*;

use crate::error::ArrowError;
154 changes: 84 additions & 70 deletions src/io/json/read/deserialize.rs
@@ -1,20 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::borrow::Borrow;
use std::hash::Hasher;
use std::{collections::hash_map::DefaultHasher, sync::Arc};

@@ -23,6 +7,9 @@ use indexmap::map::IndexMap as HashMap;
use num_traits::NumCast;
use serde_json::Value;

use crate::datatypes::{Field, Schema};
use crate::error::ArrowError;
use crate::record_batch::RecordBatch;
use crate::{
array::*,
bitmap::MutableBitmap,
@@ -73,42 +60,48 @@ fn build_extract(data_type: &DataType) -> Extract {
}
}

fn read_int<T: NativeType + NumCast>(rows: &[&Value], data_type: DataType) -> PrimitiveArray<T> {
let iter = rows.iter().map(|row| match row {
fn deserialize_boolean<A: Borrow<Value>>(rows: &[A]) -> BooleanArray {
let iter = rows.iter().map(|row| match row.borrow() {
Value::Bool(v) => Some(v),
_ => None,
});
BooleanArray::from_trusted_len_iter(iter)
}

fn deserialize_int<T: NativeType + NumCast, A: Borrow<Value>>(
rows: &[A],
data_type: DataType,
) -> PrimitiveArray<T> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::Number(number) => number.as_i64().and_then(num_traits::cast::<i64, T>),
Value::Bool(number) => num_traits::cast::<i32, T>(*number as i32),
_ => None,
});
PrimitiveArray::from_trusted_len_iter(iter).to(data_type)
}

fn read_float<T: NativeType + NumCast>(rows: &[&Value], data_type: DataType) -> PrimitiveArray<T> {
let iter = rows.iter().map(|row| match row {
fn deserialize_float<T: NativeType + NumCast, A: Borrow<Value>>(
rows: &[A],
data_type: DataType,
) -> PrimitiveArray<T> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::Number(number) => number.as_f64().and_then(num_traits::cast::<f64, T>),
Value::Bool(number) => num_traits::cast::<i32, T>(*number as i32),
_ => None,
});
PrimitiveArray::from_trusted_len_iter(iter).to(data_type)
}

fn read_binary<O: Offset>(rows: &[&Value]) -> BinaryArray<O> {
let iter = rows.iter().map(|row| match row {
fn deserialize_binary<O: Offset, A: Borrow<Value>>(rows: &[A]) -> BinaryArray<O> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::String(v) => Some(v.as_bytes()),
_ => None,
});
BinaryArray::from_trusted_len_iter(iter)
}

fn read_boolean(rows: &[&Value]) -> BooleanArray {
let iter = rows.iter().map(|row| match row {
Value::Bool(v) => Some(v),
_ => None,
});
BooleanArray::from_trusted_len_iter(iter)
}

fn read_utf8<O: Offset>(rows: &[&Value]) -> Utf8Array<O> {
let iter = rows.iter().map(|row| match row {
fn deserialize_utf8<O: Offset, A: Borrow<Value>>(rows: &[A]) -> Utf8Array<O> {
let iter = rows.iter().map(|row| match row.borrow() {
Value::String(v) => Some(v.clone()),
Value::Number(v) => Some(v.to_string()),
Value::Bool(v) => Some(v.to_string()),
@@ -117,15 +110,15 @@ fn read_utf8<O: Offset>(rows: &[&Value]) -> Utf8Array<O> {
Utf8Array::<O>::from_trusted_len_iter(iter)
}

fn read_list<O: Offset>(rows: &[&Value], data_type: DataType) -> ListArray<O> {
fn deserialize_list<O: Offset, A: Borrow<Value>>(rows: &[A], data_type: DataType) -> ListArray<O> {
let child = ListArray::<O>::get_child_type(&data_type);

let mut validity = MutableBitmap::with_capacity(rows.len());
let mut inner = Vec::<&Value>::with_capacity(rows.len());
let mut offsets = Vec::<O>::with_capacity(rows.len() + 1);
let mut inner = vec![];
offsets.push(O::zero());
rows.iter().fold(O::zero(), |mut length, row| {
match row {
match row.borrow() {
Value::Array(value) => {
inner.extend(value.iter());
validity.push(true);
@@ -142,26 +135,21 @@ fn read_list<O: Offset>(rows: &[&Value], data_type: DataType) -> ListArray<O> {
}
});

let values = read(&inner, child.clone());
let values = _deserialize(&inner, child.clone());

ListArray::<O>::from_data(data_type, offsets.into(), values, validity.into())
}

fn read_struct(rows: &[&Value], data_type: DataType) -> StructArray {
fn deserialize_struct<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> StructArray {
let fields = StructArray::get_fields(&data_type);

let mut values = fields
.iter()
.map(|f| {
(
f.name(),
(f.data_type(), Vec::<&Value>::with_capacity(rows.len())),
)
})
.map(|f| (f.name(), (f.data_type(), vec![])))
.collect::<HashMap<_, _>>();

rows.iter().for_each(|row| {
match row {
match row.borrow() {
Value::Object(value) => {
values
.iter_mut()
@@ -177,23 +165,26 @@ fn read_struct(rows: &[&Value], data_type: DataType) -> StructArray {

let values = values
.into_iter()
.map(|(_, (data_type, values))| read(&values, data_type.clone()))
.map(|(_, (data_type, values))| _deserialize(&values, data_type.clone()))
.collect::<Vec<_>>();

StructArray::from_data(data_type, values, None)
}

fn read_dictionary<K: DictionaryKey>(rows: &[&Value], data_type: DataType) -> DictionaryArray<K> {
fn deserialize_dictionary<K: DictionaryKey, A: Borrow<Value>>(
rows: &[A],
data_type: DataType,
) -> DictionaryArray<K> {
let child = DictionaryArray::<K>::get_child(&data_type);

let mut map = HashedMap::<u64, K>::default();

let extractor = build_extract(child);

let mut inner = Vec::<&Value>::with_capacity(rows.len());
let mut inner = vec![];
let keys = rows
.iter()
.map(|x| extractor(x))
.map(|x| extractor(x.borrow()))
.map(|item| match item {
Some((hash, v)) => match map.get(&hash) {
Some(key) => Some(*key),
@@ -209,45 +200,47 @@ fn read_dictionary<K: DictionaryKey>(rows: &[&Value], data_type: DataType) -> Di
})
.collect::<PrimitiveArray<K>>();

let values = read(&inner, child.clone());
let values = _deserialize(&inner, child.clone());
DictionaryArray::<K>::from_data(keys, values)
}

pub fn read(rows: &[&Value], data_type: DataType) -> Arc<dyn Array> {
fn _deserialize<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Arc<dyn Array> {
match &data_type {
DataType::Null => Arc::new(NullArray::from_data(data_type, rows.len())),
DataType::Boolean => Arc::new(read_boolean(rows)),
DataType::Int8 => Arc::new(read_int::<i8>(rows, data_type)),
DataType::Int16 => Arc::new(read_int::<i16>(rows, data_type)),
DataType::Boolean => Arc::new(deserialize_boolean(rows)),
DataType::Int8 => Arc::new(deserialize_int::<i8, _>(rows, data_type)),
DataType::Int16 => Arc::new(deserialize_int::<i16, _>(rows, data_type)),
DataType::Int32
| DataType::Date32
| DataType::Time32(_)
| DataType::Interval(IntervalUnit::YearMonth) => Arc::new(read_int::<i32>(rows, data_type)),
| DataType::Interval(IntervalUnit::YearMonth) => {
Arc::new(deserialize_int::<i32, _>(rows, data_type))
}
DataType::Interval(IntervalUnit::DayTime) => {
unimplemented!("There is no natural representation of DayTime in JSON.")
}
DataType::Int64
| DataType::Date64
| DataType::Time64(_)
| DataType::Timestamp(_, _)
| DataType::Duration(_) => Arc::new(read_int::<i64>(rows, data_type)),
DataType::UInt8 => Arc::new(read_int::<u8>(rows, data_type)),
DataType::UInt16 => Arc::new(read_int::<u16>(rows, data_type)),
DataType::UInt32 => Arc::new(read_int::<u32>(rows, data_type)),
DataType::UInt64 => Arc::new(read_int::<u64>(rows, data_type)),
| DataType::Duration(_) => Arc::new(deserialize_int::<i64, _>(rows, data_type)),
DataType::UInt8 => Arc::new(deserialize_int::<u8, _>(rows, data_type)),
DataType::UInt16 => Arc::new(deserialize_int::<u16, _>(rows, data_type)),
DataType::UInt32 => Arc::new(deserialize_int::<u32, _>(rows, data_type)),
DataType::UInt64 => Arc::new(deserialize_int::<u64, _>(rows, data_type)),
DataType::Float16 => unreachable!(),
DataType::Float32 => Arc::new(read_float::<f32>(rows, data_type)),
DataType::Float64 => Arc::new(read_float::<f64>(rows, data_type)),
DataType::Utf8 => Arc::new(read_utf8::<i32>(rows)),
DataType::LargeUtf8 => Arc::new(read_utf8::<i64>(rows)),
DataType::List(_) => Arc::new(read_list::<i32>(rows, data_type)),
DataType::LargeList(_) => Arc::new(read_list::<i64>(rows, data_type)),
DataType::Binary => Arc::new(read_binary::<i32>(rows)),
DataType::LargeBinary => Arc::new(read_binary::<i64>(rows)),
DataType::Struct(_) => Arc::new(read_struct(rows, data_type)),
DataType::Float32 => Arc::new(deserialize_float::<f32, _>(rows, data_type)),
DataType::Float64 => Arc::new(deserialize_float::<f64, _>(rows, data_type)),
DataType::Utf8 => Arc::new(deserialize_utf8::<i32, _>(rows)),
DataType::LargeUtf8 => Arc::new(deserialize_utf8::<i64, _>(rows)),
DataType::List(_) => Arc::new(deserialize_list::<i32, _>(rows, data_type)),
DataType::LargeList(_) => Arc::new(deserialize_list::<i64, _>(rows, data_type)),
DataType::Binary => Arc::new(deserialize_binary::<i32, _>(rows)),
DataType::LargeBinary => Arc::new(deserialize_binary::<i64, _>(rows)),
DataType::Struct(_) => Arc::new(deserialize_struct(rows, data_type)),
DataType::Dictionary(key_type, _) => {
match_integer_type!(key_type, |$T| {
Arc::new(read_dictionary::<$T>(rows, data_type))
Arc::new(deserialize_dictionary::<$T, _>(rows, data_type))
})
}
_ => todo!(),
@@ -258,3 +251,24 @@ pub fn read(rows: &[&Value], data_type: DataType) -> Arc<dyn Array> {
*/
}
}

/// Deserializes `rows` into a [`RecordBatch`] according to `fields`.
/// This is CPU-bounded.
pub fn deserialize<A: AsRef<str>>(
    rows: &[A],
    fields: Vec<Field>,
) -> Result<RecordBatch, ArrowError> {
    let data_type = DataType::Struct(fields);

    // convert rows to `Value`
    let rows = rows
        .iter()
        .map(|row| {
            let row: Value = serde_json::from_str(row.as_ref()).map_err(ArrowError::from)?;
            Ok(row)
        })
        .collect::<Result<Vec<_>, ArrowError>>()?;

    let (fields, columns, _) = deserialize_struct(&rows, data_type).into_data();
    RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
}
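
For reference, a small sketch (not from the commit) of calling the new `read::deserialize` on rows that are already in memory, which highlights that it performs no IO. The field names, types, and values are made up, and `Field::new(name, data_type, nullable)` is assumed to be the constructor.

```rust
use arrow2::datatypes::{DataType, Field};
use arrow2::error::Result;
use arrow2::io::json::read;

fn main() -> Result<()> {
    // Two line-delimited JSON records, e.g. just received over a channel.
    let rows = vec![
        r#"{"a": 1, "b": "x"}"#.to_string(),
        r#"{"a": 2, "b": "y"}"#.to_string(),
    ];

    // The schema the rows are deserialized into (hypothetical fields).
    let fields = vec![
        Field::new("a", DataType::Int64, true),
        Field::new("b", DataType::Utf8, true),
    ];

    // Pure CPU work: parse the strings and build a `RecordBatch`.
    let batch = read::deserialize(&rows, fields)?;
    println!("{:#?}", batch);
    Ok(())
}
```
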
