From aafba7b4eb4991e016638cbc1d4df676912e8236 Mon Sep 17 00:00:00 2001 From: Colin Jermain Date: Tue, 17 May 2022 11:59:06 -0400 Subject: [PATCH] Support for deserializing JSON from Utf8Array (#989) --- src/io/json/read/deserialize.rs | 6 +++++- src/io/ndjson/read/deserialize.rs | 14 +++++++++++++- src/io/ndjson/read/file.rs | 19 +++++++++++++++++++ src/io/ndjson/read/mod.rs | 4 ++-- tests/it/io/ndjson/mod.rs | 18 +++++++++++++++--- tests/it/io/ndjson/read.rs | 25 +++++++++++++++++++++++++ 6 files changed, 79 insertions(+), 7 deletions(-) diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index ee8068d0819..8d6ddb160e2 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -146,17 +146,21 @@ fn deserialize_struct>(rows: &[A], data_type: DataType) -> Stru .map(|f| (&f.name, (f.data_type(), vec![]))) .collect::>(); + let mut validity = MutableBitmap::with_capacity(rows.len()); + rows.iter().for_each(|row| { match row.borrow() { Value::Object(value) => { values .iter_mut() .for_each(|(s, (_, inner))| inner.push(value.get(*s).unwrap_or(&Value::Null))); + validity.push(true); } _ => { values .iter_mut() .for_each(|(_, (_, inner))| inner.push(&Value::Null)); + validity.push(false); } }; }); @@ -166,7 +170,7 @@ fn deserialize_struct>(rows: &[A], data_type: DataType) -> Stru .map(|(_, (data_type, values))| _deserialize(&values, data_type.clone())) .collect::>(); - StructArray::new(data_type, values, None) + StructArray::new(data_type, values, validity.into()) } fn deserialize_dictionary>( diff --git a/src/io/ndjson/read/deserialize.rs b/src/io/ndjson/read/deserialize.rs index e02927d0248..5960b164989 100644 --- a/src/io/ndjson/read/deserialize.rs +++ b/src/io/ndjson/read/deserialize.rs @@ -22,9 +22,21 @@ pub fn deserialize(rows: &[String], data_type: DataType) -> Result>( + rows: impl Iterator, + data_type: DataType, +) -> Result, ArrowError> { // deserialize strings to `Value`s let rows = rows - .iter() .map(|row| serde_json::from_str(row.as_ref()).map_err(ArrowError::from)) .collect::, ArrowError>>()?; diff --git a/src/io/ndjson/read/file.rs b/src/io/ndjson/read/file.rs index c45dc139e1d..9bf6129543e 100644 --- a/src/io/ndjson/read/file.rs +++ b/src/io/ndjson/read/file.rs @@ -125,3 +125,22 @@ pub fn infer( let v: Vec<&DataType> = data_types.iter().collect(); Ok(coerce_data_type(&v)) } + +/// Infers the [`DataType`] from an iterator of JSON strings. A limited number of +/// rows can be used by passing `rows.take(number_of_rows)` as an input. +/// +/// # Implementation +/// This implementation infers each row by going through the entire iterator. +pub fn infer_iter>(rows: impl Iterator) -> Result { + let mut data_types = HashSet::new(); + for row in rows { + let v: Value = serde_json::from_str(row.as_ref())?; + let data_type = infer_json(&v)?; + if data_type != DataType::Null { + data_types.insert(data_type); + } + } + + let v: Vec<&DataType> = data_types.iter().collect(); + Ok(coerce_data_type(&v)) +} diff --git a/src/io/ndjson/read/mod.rs b/src/io/ndjson/read/mod.rs index 5c52bd183fc..6e7da6131bf 100644 --- a/src/io/ndjson/read/mod.rs +++ b/src/io/ndjson/read/mod.rs @@ -4,5 +4,5 @@ pub use fallible_streaming_iterator::FallibleStreamingIterator; mod deserialize; mod file; -pub use deserialize::deserialize; -pub use file::{infer, FileReader}; +pub use deserialize::{deserialize, deserialize_iter}; +pub use file::{infer, infer_iter, FileReader}; diff --git a/tests/it/io/ndjson/mod.rs b/tests/it/io/ndjson/mod.rs index 9f9cd0ab6b6..1e25de73875 100644 --- a/tests/it/io/ndjson/mod.rs +++ b/tests/it/io/ndjson/mod.rs @@ -220,11 +220,19 @@ fn case_struct() -> (String, Arc) { // build expected output let d = Utf8Array::::from(&vec![Some("text"), None, Some("text"), None]); - let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None); + let c = StructArray::from_data( + DataType::Struct(vec![d_field]), + vec![Arc::new(d)], + Some(Bitmap::from_u8_slice([0b11111101], 4)), + ); let b = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]); let inner = DataType::Struct(vec![Field::new("b", DataType::Boolean, true), c_field]); - let expected = StructArray::from_data(inner, vec![Arc::new(b), Arc::new(c)], None); + let expected = StructArray::from_data( + inner, + vec![Arc::new(b), Arc::new(c)], + Some(Bitmap::from_u8_slice([0b11110111], 4)), + ); let data_type = DataType::Struct(fields); @@ -268,7 +276,11 @@ fn case_nested_list() -> (String, Arc) { None, ]); - let c = StructArray::from_data(DataType::Struct(vec![d_field]), vec![Arc::new(d)], None); + let c = StructArray::from_data( + DataType::Struct(vec![d_field]), + vec![Arc::new(d)], + Some(Bitmap::from_u8_slice([0b11111011], 6)), + ); let b = BooleanArray::from(vec![ Some(true), diff --git a/tests/it/io/ndjson/read.rs b/tests/it/io/ndjson/read.rs index 4b74248c569..5e19c1f351c 100644 --- a/tests/it/io/ndjson/read.rs +++ b/tests/it/io/ndjson/read.rs @@ -273,3 +273,28 @@ fn skip_empty_lines() -> Result<()> { assert_eq!(3, arrays[0].len()); Ok(()) } + +#[test] +fn utf8_array() -> Result<()> { + let array = Utf8Array::::from([ + Some(r#"{"a": 1, "b": [{"c": 0}, {"c": 1}]}"#), + None, + Some(r#"{"a": 2, "b": [{"c": 2}, {"c": 5}]}"#), + None, + ]); + let data_type = ndjson_read::infer_iter(array.iter().map(|x| x.unwrap_or("null"))).unwrap(); + let new_array = + ndjson_read::deserialize_iter(array.iter().map(|x| x.unwrap_or("null")), data_type) + .unwrap(); + + // Explicitly cast as StructArray + let new_array = new_array.as_any().downcast_ref::().unwrap(); + + assert_eq!(array.len(), new_array.len()); + assert_eq!(array.null_count(), new_array.null_count()); + assert_eq!(array.validity().unwrap(), new_array.validity().unwrap()); + + let field_names: Vec = new_array.fields().iter().map(|f| f.name.clone()).collect(); + assert_eq!(field_names, vec!["a".to_string(), "b".to_string()]); + Ok(()) +}