Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed error reading unbounded Avro list #1253

Merged
merged 2 commits into from
Sep 20, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions src/io/avro/read/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,16 @@ fn deserialize_value<'a>(
.downcast_mut::<DynMutableListArray<i32>>()
.unwrap();
loop {
let len = util::zigzag_i64(&mut block)? as usize;
let len = util::zigzag_i64(&mut block)?;
let len = if len < 0 {
// Avro spec: If a block's count is negative, its absolute value is used,
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
let _ = util::zigzag_i64(&mut block)?;

-len
} else {
len
};
Copy link
Contributor

@shaeqahmed shaeqahmed Sep 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR looks correct to me, but it actually fixes a different bug than the one I was referring to in #1252. The fix here addresses the handling the case when an avro list item block encodes an optional byte_size for data skipping as <-1*block_count><byte_size>....

The issue I was referring to, however, is the call to array.try_push_valid()?; on line 145 (136 original), which is called for each block deserialised. Shouldn't this call be moved right outside of the loop {...} onto line 147, so that we only push valid once per list item?

Thinking out loud, for an empty list, looks like the if len == 0 { break; } causes this loop to short circuit currently meaning we won't push a valid for that case too. I wonder if this is also an unintended bug, of having this call to try_push_valid in the block reading loop as opposed to outside.

To clarify, I am concerned about the handling of the following two cases:

  • List [1,2,3,4], encoded as two "blocks"

    • Currently we make 2 calls to try_push_valid whereas I would expect 1
  • List [], encoded as 0 blocks, indicated by a prefixed byte of len == 0

    • Currently we make 0 calls to try_push_valid whereas I would expect 1, as you mentioned before that empty lists/structs are still "valid" in Arrow so as to maintain O(1) validity checks.

Again, I am not well versed in the Avro format, so feel free to correct me if I have an incorrect understanding here. I came across these potential bugs while reading the deserialisation code thoroughly, as I was considering taking a stab at implementing Map type support for Avro, which will be very similar to List. Thank you

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Really good call.

Did another push with an extra fix

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, LGTM


if len == 0 {
break;
Expand Down Expand Up @@ -340,14 +349,35 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) ->
};

loop {
let len = util::zigzag_i64(&mut block)? as usize;
let len = util::zigzag_i64(&mut block)?;
let (len, bytes) = if len < 0 {
// Avro spec: If a block's count is negative, its absolute value is used,
// and the count is followed immediately by a long block size indicating the number of bytes in the block. This block size permits fast skipping through data, e.g., when projecting a record to a subset of its fields.
let bytes = util::zigzag_i64(&mut block)?;

(-len, Some(bytes))
} else {
(len, None)
};

let bytes: Option<usize> = bytes
.map(|bytes| {
bytes
.try_into()
.map_err(|_| Error::oos("Avro block size negative or too large"))
})
.transpose()?;

if len == 0 {
break;
}

for _ in 0..len {
block = skip_item(inner, avro_inner, block)?;
if let Some(bytes) = bytes {
block = &block[bytes..];
} else {
for _ in 0..len {
block = skip_item(inner, avro_inner, block)?;
}
}
}
}
Expand Down Expand Up @@ -488,7 +518,7 @@ pub fn deserialize(
arrays
.iter_mut()
.zip(projection.iter())
.filter_map(|x| if *x.1 { Some(x.0) } else { None })
.filter_map(|x| x.1.then(|| x.0))
.map(|array| array.as_box())
.collect(),
)
Expand Down