Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed error reading unbounded Avro list #1253

Merged
merged 2 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,19 +122,32 @@ fn deserialize_value<'a>(
.as_mut_any()
.downcast_mut::<DynMutableListArray<i32>>()
.unwrap();
// Arrays are encoded as a series of blocks.
loop {
let len = util::zigzag_i64(&mut block)? as usize;
// Each block consists of a long count value, followed by that many array items.
let len = util::zigzag_i64(&mut block)?;
let len = if len < 0 {
// Avro spec: If a block's count is negative, its absolute value is used,
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
let _ = util::zigzag_i64(&mut block)?;

-len
} else {
len
};
Copy link
Contributor

@shaeqahmed shaeqahmed Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR looks correct to me, but it actually fixes a different bug than the one I was referring to in #1252. The fix here addresses the handling the case when an avro list item block encodes an optional byte_size for data skipping as <-1*block_count><byte_size>....

The issue I was referring to, however, is the call to array.try_push_valid()?; on line 145 (136 original), which is called for each block deserialised. Shouldn't this call be moved right outside of the loop {...} onto line 147, so that we only push valid once per list item?

Thinking out loud, for an empty list, looks like the if len == 0 { break; } causes this loop to short circuit currently meaning we won't push a valid for that case too. I wonder if this is also an unintended bug, of having this call to try_push_valid in the block reading loop as opposed to outside.

To clarify, I am concerned about the handling of the following two cases:

  • List [1,2,3,4], encoded as two "blocks"

    • Currently we make 2 calls to try_push_valid whereas I would expect 1
  • List [], encoded as 0 blocks, indicated by a prefixed byte of len == 0

    • Currently we make 0 calls to try_push_valid whereas I would expect 1, as you mentioned before that empty lists/structs are still "valid" in Arrow so as to maintain O(1) validity checks.

Again, I am not well versed in the Avro format, so feel free to correct me if I have an incorrect understanding here. I came across these potential bugs while reading the deserialisation code thoroughly, as I was considering taking a stab at implementing Map type support for Avro, which will be very similar to List. Thank you

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Really good call.

Did another push with an extra fix

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, LGTM


// A block with count zero indicates the end of the array.
if len == 0 {
break;
}

// Each item is encoded per the array’s item schema.
let values = array.mut_values();
for _ in 0..len {
block = deserialize_item(values, is_nullable, avro_inner, block)?;
}
array.try_push_valid()?;
}
array.try_push_valid()?;
}
DataType::Struct(inner_fields) => {
let fields = match avro_field {
Expand Down Expand Up @@ -340,14 +353,35 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
};

loop {
let len = util::zigzag_i64(&mut block)? as usize;
let len = util::zigzag_i64(&mut block)?;
let (len, bytes) = if len < 0 {
// Avro spec: If a block's count is negative, its absolute value is used,
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
let bytes = util::zigzag_i64(&mut block)?;

(-len, Some(bytes))
} else {
(len, None)
};

let bytes: Option<usize> = bytes
.map(|bytes| {
bytes
.try_into()
.map_err(|_| Error::oos("Avro block size negative or too large"))
})
.transpose()?;

if len == 0 {
break;
}

for _ in 0..len {
block = skip_item(inner, avro_inner, block)?;
if let Some(bytes) = bytes {
block = &block[bytes..];
} else {
for _ in 0..len {
block = skip_item(inner, avro_inner, block)?;
}
}
}
}
Expand Down Expand Up @@ -488,7 +522,7 @@ pub fn deserialize(
arrays
.iter_mut()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.filter_map(|x| x.1.then(|| x.0))
.map(|array| array.as_box())
.collect(),
)
Expand Down
73 changes: 73 additions & 0 deletions tests/it/io/avro/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,76 @@ fn test_projected() -> Result<()> {
}
Ok(())
}

fn schema_list() -> (AvroSchema, Schema) {
let raw_schema = r#"
{
"type": "record",
"name": "test",
"fields": [
{"name": "h", "type": {
"type": "array",
"items": {
"name": "item",
"type": "int"
}
}}
]
}
"#;

let schema = Schema::from(vec![Field::new(
"h",
DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
false,
)]);

(AvroSchema::parse_str(raw_schema).unwrap(), schema)
}

pub(super) fn data_list() -> Chunk<Box<dyn Array>> {
let data = [Some(vec![Some(1i32), Some(2), Some(3)]), Some(vec![])];

let mut array = MutableListArray::<i32, MutablePrimitiveArray<i32>>::new_from(
Default::default(),
DataType::List(Box::new(Field::new("item", DataType::Int32, false))),
0,
);
array.try_extend(data).unwrap();

let columns = vec![array.into_box()];

Chunk::try_new(columns).unwrap()
}

pub(super) fn write_list(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::Error> {
let (avro, _) = schema_list();
// a writer needs a schema and something to write to
let mut writer = Writer::with_codec(&avro, Vec::new(), codec);

// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
record.put(
"h",
Value::Array(vec![Value::Int(1), Value::Int(2), Value::Int(3)]),
);
writer.append(record)?;

let mut record = Record::new(writer.schema()).unwrap();
record.put("h", Value::Array(vec![]));
writer.append(record)?;
Ok(writer.into_inner().unwrap())
}

#[test]
fn test_list() -> Result<()> {
let avro = write_list(Codec::Null).unwrap();
let expected = data_list();
let (_, expected_schema) = schema_list();

let (result, schema) = read_avro(&avro, None)?;

assert_eq!(schema, expected_schema);
assert_eq!(result, expected);
Ok(())
}