From e972df0fbe9aa1481831db53f47771b0ca1954a5 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Tue, 20 Sep 2022 21:10:46 +0200 Subject: [PATCH] Fixed error reading unbounded Avro list (#1253) --- src/io/avro/read/deserialize.rs | 46 ++++++++++++++++++--- tests/it/io/avro/read.rs | 73 +++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 6 deletions(-) diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index 5f535650140..54b7950a771 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -122,19 +122,32 @@ fn deserialize_value<'a>( .as_mut_any() .downcast_mut::>() .unwrap(); + // Arrays are encoded as a series of blocks. loop { - let len = util::zigzag_i64(&mut block)? as usize; + // Each block consists of a long count value, followed by that many array items. + let len = util::zigzag_i64(&mut block)?; + let len = if len < 0 { + // Avro spec: If a block's count is negative, its absolute value is used, + // and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields. + let _ = util::zigzag_i64(&mut block)?; + + -len + } else { + len + }; + // A block with count zero indicates the end of the array. if len == 0 { break; } + // Each item is encoded per the array’s item schema. let values = array.mut_values(); for _ in 0..len { block = deserialize_item(values, is_nullable, avro_inner, block)?; } - array.try_push_valid()?; } + array.try_push_valid()?; } DataType::Struct(inner_fields) => { let fields = match avro_field { @@ -341,14 +354,35 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> }; loop { - let len = util::zigzag_i64(&mut block)? as usize; + let len = util::zigzag_i64(&mut block)?; + let (len, bytes) = if len < 0 { + // Avro spec: If a block's count is negative, its absolute value is used, + // and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields. + let bytes = util::zigzag_i64(&mut block)?; + + (-len, Some(bytes)) + } else { + (len, None) + }; + + let bytes: Option = bytes + .map(|bytes| { + bytes + .try_into() + .map_err(|_| Error::oos("Avro block size negative or too large")) + }) + .transpose()?; if len == 0 { break; } - for _ in 0..len { - block = skip_item(inner, avro_inner, block)?; + if let Some(bytes) = bytes { + block = &block[bytes..]; + } else { + for _ in 0..len { + block = skip_item(inner, avro_inner, block)?; + } } } } @@ -489,7 +523,7 @@ pub fn deserialize( arrays .iter_mut() .zip(projection.iter()) - .filter_map(|x| if *x.1 { Some(x.0) } else { None }) + .filter_map(|x| x.1.then(|| x.0)) .map(|array| array.as_box()) .collect(), ) diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index 64ff34c7fee..1deb984a852 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -311,3 +311,76 @@ fn test_projected() -> Result<()> { } Ok(()) } + +fn schema_list() -> (AvroSchema, Schema) { + let raw_schema = r#" + { + "type": "record", + "name": "test", + "fields": [ + {"name": "h", "type": { + "type": "array", + "items": { + "name": "item", + "type": "int" + } + }} + ] + } +"#; + + let schema = Schema::from(vec![Field::new( + "h", + DataType::List(Box::new(Field::new("item", DataType::Int32, false))), + false, + )]); + + (AvroSchema::parse_str(raw_schema).unwrap(), schema) +} + +pub(super) fn data_list() -> Chunk> { + let data = [Some(vec![Some(1i32), Some(2), Some(3)]), Some(vec![])]; + + let mut array = MutableListArray::>::new_from( + Default::default(), + DataType::List(Box::new(Field::new("item", DataType::Int32, false))), + 0, + ); + array.try_extend(data).unwrap(); + + let columns = vec![array.into_box()]; + + Chunk::try_new(columns).unwrap() +} + +pub(super) fn write_list(codec: Codec) -> std::result::Result, avro_rs::Error> { + let (avro, _) = schema_list(); + // a writer needs a schema and something to write to + let mut writer = Writer::with_codec(&avro, Vec::new(), codec); + + // the Record type models our Record schema + let mut record = Record::new(writer.schema()).unwrap(); + record.put( + "h", + Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]), + ); + writer.append(record)?; + + let mut record = Record::new(writer.schema()).unwrap(); + record.put("h", Value::Array(vec![])); + writer.append(record)?; + Ok(writer.into_inner().unwrap()) +} + +#[test] +fn test_list() -> Result<()> { + let avro = write_list(Codec::Null).unwrap(); + let expected = data_list(); + let (_, expected_schema) = schema_list(); + + let (result, schema) = read_avro(&avro, None)?; + + assert_eq!(schema, expected_schema); + assert_eq!(result, expected); + Ok(()) +}