Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Support for deserializing JSON from Utf8Array (#989)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjermain committed May 17, 2022
1 parent 9913732 commit aafba7b
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 7 deletions.
6 changes: 5 additions & 1 deletion src/io/json/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,21 @@ fn deserialize_struct<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Stru
.map(|f| (&f.name, (f.data_type(), vec![])))
.collect::<HashMap<_, _>>();

let mut validity = MutableBitmap::with_capacity(rows.len());

rows.iter().for_each(|row| {
match row.borrow() {
Value::Object(value) => {
values
.iter_mut()
.for_each(|(s, (_, inner))| inner.push(value.get(*s).unwrap_or(&Value::Null)));
validity.push(true);
}
_ => {
values
.iter_mut()
.for_each(|(_, (_, inner))| inner.push(&Value::Null));
validity.push(false);
}
};
});
Expand All @@ -166,7 +170,7 @@ fn deserialize_struct<A: Borrow<Value>>(rows: &[A], data_type: DataType) -> Stru
.map(|(_, (data_type, values))| _deserialize(&values, data_type.clone()))
.collect::<Vec<_>>();

StructArray::new(data_type, values, None)
StructArray::new(data_type, values, validity.into())
}

fn deserialize_dictionary<K: DictionaryKey, A: Borrow<Value>>(
Expand Down
14 changes: 13 additions & 1 deletion src/io/ndjson/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,21 @@ pub fn deserialize(rows: &[String], data_type: DataType) -> Result<Arc<dyn Array
));
}

deserialize_iter(rows.iter(), data_type)
}

/// Deserializes an iterator of rows into an [`Array`] of [`DataType`].
/// # Implementation
/// This function is CPU-bounded.
/// This function is guaranteed to return an array of length equal to the leng
/// # Errors
/// This function errors iff any of the rows is not a valid JSON (i.e. the format is not valid NDJSON).
pub fn deserialize_iter<A: AsRef<str>>(
rows: impl Iterator<Item = A>,
data_type: DataType,
) -> Result<Arc<dyn Array>, ArrowError> {
// deserialize strings to `Value`s
let rows = rows
.iter()
.map(|row| serde_json::from_str(row.as_ref()).map_err(ArrowError::from))
.collect::<Result<Vec<Value>, ArrowError>>()?;

Expand Down
19 changes: 19 additions & 0 deletions src/io/ndjson/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,22 @@ pub fn infer<R: std::io::BufRead>(
let v: Vec<&DataType> = data_types.iter().collect();
Ok(coerce_data_type(&v))
}

/// Infers the [`DataType`] from an iterator of JSON strings. A limited number of
/// rows can be used by passing `rows.take(number_of_rows)` as an input.
///
/// # Implementation
/// This implementation infers each row by going through the entire iterator.
pub fn infer_iter<A: AsRef<str>>(rows: impl Iterator<Item = A>) -> Result<DataType> {
let mut data_types = HashSet::new();
for row in rows {
let v: Value = serde_json::from_str(row.as_ref())?;
let data_type = infer_json(&v)?;
if data_type != DataType::Null {
data_types.insert(data_type);
}
}

let v: Vec<&DataType> = data_types.iter().collect();
Ok(coerce_data_type(&v))
}
4 changes: 2 additions & 2 deletions src/io/ndjson/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ pub use fallible_streaming_iterator::FallibleStreamingIterator;

mod deserialize;
mod file;
pub use deserialize::deserialize;
pub use file::{infer, FileReader};
pub use deserialize::{deserialize, deserialize_iter};
pub use file::{infer, infer_iter, FileReader};
18 changes: 15 additions & 3 deletions tests/it/io/ndjson/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,19 @@ fn case_struct() -> (String, Arc<dyn Array>) {

// build expected output
let d = Utf8Array::<i32>::from(&vec![Some("text"), None, Some("text"), None]);
let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None);
let c = StructArray::from_data(
DataType::Struct(vec![d_field]),
vec![Arc::new(d)],
Some(Bitmap::from_u8_slice([0b11111101], 4)),
);

let b = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]);
let inner = DataType::Struct(vec![Field::new("b", DataType::Boolean, true), c_field]);
let expected = StructArray::from_data(inner, vec![Arc::new(b), Arc::new(c)], None);
let expected = StructArray::from_data(
inner,
vec![Arc::new(b), Arc::new(c)],
Some(Bitmap::from_u8_slice([0b11110111], 4)),
);

let data_type = DataType::Struct(fields);

Expand Down Expand Up @@ -268,7 +276,11 @@ fn case_nested_list() -> (String, Arc<dyn Array>) {
None,
]);

let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None);
let c = StructArray::from_data(
DataType::Struct(vec![d_field]),
vec![Arc::new(d)],
Some(Bitmap::from_u8_slice([0b11111011], 6)),
);

let b = BooleanArray::from(vec![
Some(true),
Expand Down
25 changes: 25 additions & 0 deletions tests/it/io/ndjson/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,28 @@ fn skip_empty_lines() -> Result<()> {
assert_eq!(3, arrays[0].len());
Ok(())
}

#[test]
fn utf8_array() -> Result<()> {
let array = Utf8Array::<i64>::from([
Some(r#"{"a": 1, "b": [{"c": 0}, {"c": 1}]}"#),
None,
Some(r#"{"a": 2, "b": [{"c": 2}, {"c": 5}]}"#),
None,
]);
let data_type = ndjson_read::infer_iter(array.iter().map(|x| x.unwrap_or("null"))).unwrap();
let new_array =
ndjson_read::deserialize_iter(array.iter().map(|x| x.unwrap_or("null")), data_type)
.unwrap();

// Explicitly cast as StructArray
let new_array = new_array.as_any().downcast_ref::<StructArray>().unwrap();

assert_eq!(array.len(), new_array.len());
assert_eq!(array.null_count(), new_array.null_count());
assert_eq!(array.validity().unwrap(), new_array.validity().unwrap());

let field_names: Vec<String> = new_array.fields().iter().map(|f| f.name.clone()).collect();
assert_eq!(field_names, vec!["a".to_string(), "b".to_string()]);
Ok(())
}

0 comments on commit aafba7b

Please sign in to comment.