Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error reading unbounded Avro list (#1253)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Sep 20, 2022
1 parent c615095 commit e972df0
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 6 deletions.
46 changes: 40 additions & 6 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,32 @@ fn deserialize_value<'a>(
.as_mut_any()
.downcast_mut::<DynMutableListArray<i32>>()
.unwrap();
// Arrays are encoded as a series of blocks.
loop {
let len = util::zigzag_i64(&mut block)? as usize;
// Each block consists of a long count value, followed by that many array items.
let len = util::zigzag_i64(&mut block)?;
let len = if len < 0 {
// Avro spec: If a block's count is negative, its absolute value is used,
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
let _ = util::zigzag_i64(&mut block)?;

-len
} else {
len
};

// A block with count zero indicates the end of the array.
if len == 0 {
break;
}

// Each item is encoded per the array’s item schema.
let values = array.mut_values();
for _ in 0..len {
block = deserialize_item(values, is_nullable, avro_inner, block)?;
}
array.try_push_valid()?;
}
array.try_push_valid()?;
}
DataType::Struct(inner_fields) => {
let fields = match avro_field {
Expand Down Expand Up @@ -341,14 +354,35 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
};

loop {
let len = util::zigzag_i64(&mut block)? as usize;
let len = util::zigzag_i64(&mut block)?;
let (len, bytes) = if len < 0 {
// Avro spec: If a block's count is negative, its absolute value is used,
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
let bytes = util::zigzag_i64(&mut block)?;

(-len, Some(bytes))
} else {
(len, None)
};

let bytes: Option<usize> = bytes
.map(|bytes| {
bytes
.try_into()
.map_err(|_| Error::oos("Avro block size negative or too large"))
})
.transpose()?;

if len == 0 {
break;
}

for _ in 0..len {
block = skip_item(inner, avro_inner, block)?;
if let Some(bytes) = bytes {
block = &block[bytes..];
} else {
for _ in 0..len {
block = skip_item(inner, avro_inner, block)?;
}
}
}
}
Expand Down Expand Up @@ -489,7 +523,7 @@ pub fn deserialize(
arrays
.iter_mut()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.filter_map(|x| x.1.then(|| x.0))
.map(|array| array.as_box())
.collect(),
)
Expand Down
73 changes: 73 additions & 0 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,3 +311,76 @@ fn test_projected() -> Result<()> {
}
Ok(())
}

/// Returns the pair of schemas used by the list round-trip test.
///
/// The first element is the Avro schema parsed from an inline JSON record
/// definition with a single field `h` whose type is an array of `int`.
/// The second element is the Arrow-side [`Schema`] the reader is expected to
/// produce for it: one field `h` of type `List` over an `Int32` field named
/// `item`.
///
/// Panics if the inline JSON fails to parse as an Avro schema.
fn schema_list() -> (AvroSchema, Schema) {
    // Avro-side definition of the record, kept verbatim as JSON.
    let avro_json = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "h", "type": {
"type": "array",
"items": {
"name": "item",
"type": "int"
}
}}
]
}
"#;

    // Arrow-side counterpart, built inside-out: list item, then the list column.
    let item = Field::new("item", DataType::Int32, false);
    let column = Field::new("h", DataType::List(Box::new(item)), false);
    let arrow_schema = Schema::from(vec![column]);

    let avro_schema = AvroSchema::parse_str(avro_json).unwrap();
    (avro_schema, arrow_schema)
}

/// Builds the chunk that reading the list test file is expected to yield.
///
/// The chunk holds a single list column with two rows: `[1, 2, 3]` and an
/// empty list.
///
/// Panics if extending the list array or assembling the chunk fails.
pub(super) fn data_list() -> Chunk<Box<dyn Array>> {
    let list_type = DataType::List(Box::new(Field::new("item", DataType::Int32, false)));
    let mut list = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new_from(
        Default::default(),
        list_type,
        0,
    );

    // One populated row followed by one empty row.
    let rows = [Some(vec![Some(1i32), Some(2), Some(3)]), Some(vec![])];
    list.try_extend(rows).unwrap();

    Chunk::try_new(vec![list.into_box()]).unwrap()
}

/// Serializes the list test data to an in-memory Avro file using `codec`.
///
/// Two records are appended, in order, each setting field `h`: first the
/// array `[1, 2, 3]`, then an empty array. The bytes accumulated by the
/// writer are returned.
///
/// # Errors
/// Propagates any error returned while appending a record.
///
/// # Panics
/// Panics if a record cannot be created from the writer's schema or if the
/// writer's inner buffer cannot be recovered.
pub(super) fn write_list(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::Error> {
    let (avro_schema, _) = schema_list();
    // The writer serializes against the Avro schema into a growable byte buffer.
    let mut writer = Writer::with_codec(&avro_schema, Vec::new(), codec);

    // One entry per record: the items to store in field `h`.
    let rows = [vec![Value::Int(1), Value::Int(2), Value::Int(3)], vec![]];
    for items in rows {
        let mut record = Record::new(writer.schema()).unwrap();
        record.put("h", Value::Array(items));
        writer.append(record)?;
    }

    Ok(writer.into_inner().unwrap())
}

/// Round-trips the list fixture: writes it with the null codec, reads it back
/// with `read_avro`, and checks both the resulting schema and the data against
/// the expected values.
#[test]
fn test_list() -> Result<()> {
    let bytes = write_list(Codec::Null).unwrap();
    let expected_chunk = data_list();
    let expected_schema = schema_list().1;

    let (chunk, schema) = read_avro(&bytes, None)?;

    assert_eq!(schema, expected_schema);
    assert_eq!(chunk, expected_chunk);
    Ok(())
}

0 comments on commit e972df0

Please sign in to comment.