This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Panic-free read of IPC files #1075

Merged · 2 commits · Jun 18, 2022
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -46,7 +46,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "5.0", optional = true, default-features = false }

-arrow-format = { version = "0.6", optional = true, features = ["ipc"] }
+arrow-format = { version = "0.7", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

@@ -101,6 +101,8 @@ tokio-util = { version = "0.6", features = ["compat"] }
# used to run formal property testing
proptest = { version = "1", default_features = false, features = ["std"] }
avro-rs = { version = "0.13", features = ["snappy"] }
+# used for flaky testing
+rand = "0.8"

[package.metadata.docs.rs]
features = ["full"]
8 changes: 6 additions & 2 deletions README.md
@@ -69,8 +69,12 @@ Most uses of `unsafe` fall into 3 categories:
We actively monitor for vulnerabilities in Rust's advisory and either patch or mitigate
them (see e.g. `.cargo/audit.yaml` and `.github/workflows/security.yaml`).

-Reading parquet and IPC currently `panic!` when they receive invalid. We are
-actively addressing this.
+Reading from untrusted data currently _may_ `panic!` on the following formats:
+
+* parquet
+* avro
+
+We are actively addressing this.

## Integration tests

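The end state this PR works toward: reading a corrupt or truncated IPC file should return an `Err` instead of panicking. A minimal sketch, assuming the crate's `read_file_metadata`/`FileReader` entry points (the exact `FileReader::new` signature varies between arrow2 releases):

```rust
use std::io::Cursor;

use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Reads IPC bytes of unknown provenance; malformed input becomes an `Err`.
fn try_read_untrusted(bytes: &[u8]) -> Result<()> {
    let mut cursor = Cursor::new(bytes);
    // Footer/metadata validation errors out instead of panicking.
    let metadata = read::read_file_metadata(&mut cursor)?;
    let reader = read::FileReader::new(cursor, metadata, None);
    for maybe_chunk in reader {
        let _chunk = maybe_chunk?; // propagate decoding errors
    }
    Ok(())
}
```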
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
@@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
-arrow-format = { version = "0.6", features = ["full"] }
+arrow-format = { version = "0.7", features = ["full"] }
async-trait = "0.1.41"
clap = { version = "^3", features = ["derive"] }
futures = "0.3"
@@ -274,8 +274,9 @@ async fn receive_batch_flight_data(
.expect("Header to be valid flatbuffers")
.expect("Header to be present")
{
+let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);
-read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0)
+read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64)
.expect("Error reading dictionary");

data = resp.next().await?.ok()?;
@@ -283,6 +283,7 @@ async fn record_batch_from_message(
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<Chunk<Box<dyn Array>>, Status> {
+let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

let arrow_batch_result = ipc::read::read_record_batch(
@@ -294,6 +295,7 @@
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
0,
+length as u64,
);

arrow_batch_result.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e)))
@@ -306,10 +308,11 @@ async fn dictionary_from_message(
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<(), Status> {
+let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

let dictionary_batch_result =
-ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0);
+ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length as u64);
dictionary_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
}
7 changes: 6 additions & 1 deletion src/array/fixed_size_binary/mod.rs
@@ -228,7 +228,12 @@ impl FixedSizeBinaryArray {
impl FixedSizeBinaryArray {
pub(crate) fn maybe_get_size(data_type: &DataType) -> Result<usize, Error> {
match data_type.to_logical_type() {
-DataType::FixedSizeBinary(size) => Ok(*size),
+DataType::FixedSizeBinary(size) => {
+if *size == 0 {
+return Err(Error::oos("FixedSizeBinaryArray expects a positive size"));
+}
+Ok(*size)
+}
_ => Err(Error::oos(
"FixedSizeBinaryArray expects DataType::FixedSizeBinary",
)),
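The new zero-size check matters because downstream length computations divide by `size`, so a zero coming from an untrusted file would otherwise panic with a division by zero. A hypothetical reduction of the hazard (not the crate's code):

```rust
// The row count of a fixed-size array is derived by dividing the values
// length by `size`; `size == 0` would divide by zero and panic.
fn rows(values_len: usize, size: usize) -> usize {
    values_len / size // panics when size == 0
}

fn main() {
    assert_eq!(rows(12, 4), 3);
    // rows(12, 0) would panic: "attempt to divide by zero"
}
```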
7 changes: 6 additions & 1 deletion src/array/fixed_size_list/mod.rs
@@ -213,7 +213,12 @@ impl FixedSizeListArray {
impl FixedSizeListArray {
pub(crate) fn try_child_and_size(data_type: &DataType) -> Result<(&Field, usize), Error> {
match data_type.to_logical_type() {
-DataType::FixedSizeList(child, size) => Ok((child.as_ref(), *size as usize)),
+DataType::FixedSizeList(child, size) => {
+if *size == 0 {
+return Err(Error::oos("FixedSizeListArray expects a positive size"));
+}
+Ok((child.as_ref(), *size as usize))
+}
_ => Err(Error::oos(
"FixedSizeListArray expects DataType::FixedSizeList",
)),
2 changes: 2 additions & 0 deletions src/io/flight/mod.rs
@@ -118,6 +118,7 @@ pub fn deserialize_batch(
let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header)
.map_err(|err| Error::OutOfSpec(format!("Unable to get root as message: {:?}", err)))?;

+let length = data.data_body.len();
let mut reader = std::io::Cursor::new(&data.data_body);

match message.header()?.ok_or_else(|| {
@@ -132,6 +133,7 @@
message.version()?,
&mut reader,
0,
+length as u64,
),
_ => Err(Error::nyi(
"flight currently only supports reading RecordBatch messages",
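Passing `data_body.len()` down gives the readers an explicit upper bound, so buffer extents declared in untrusted metadata can be rejected before anything is allocated or read. A sketch of the kind of check this enables (names are illustrative, not the crate's API):

```rust
/// Rejects a declared buffer that cannot fit inside the message body.
fn check_buffer(offset: u64, len: u64, body_length: u64) -> Result<(), String> {
    let end = offset
        .checked_add(len)
        .ok_or_else(|| "buffer offset + length overflows u64".to_string())?;
    if end > body_length {
        return Err(format!(
            "buffer of {} bytes at offset {} exceeds message body of {} bytes",
            len, offset, body_length
        ));
    }
    Ok(())
}
```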
8 changes: 0 additions & 8 deletions src/io/ipc/mod.rs
@@ -73,8 +73,6 @@
//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs),
//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)).

-use crate::error::Error;

mod compression;
mod endianess;

@@ -103,9 +101,3 @@ pub struct IpcSchema {
/// Endianness of the file
pub is_little_endian: bool,
}

-impl From<arrow_format::ipc::planus::Error> for Error {
-fn from(error: arrow_format::ipc::planus::Error) -> Self {
-Error::OutOfSpec(error.to_string())
-}
-}
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/binary.rs
@@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_binary<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
@@ -34,9 +34,14 @@
compression,
)?;

+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets: Buffer<O> = read_buffer(
buffers,
-1 + field_node.length() as usize,
+1 + length,
reader,
block_offset,
is_little_endian,
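This `try_into` pattern recurs throughout the PR: flatbuffers encodes lengths as signed 64-bit integers, and the old `as usize` cast silently wrapped a negative length into a huge value. A standalone illustration (hypothetical helper, not the crate's API):

```rust
use std::convert::TryInto;

// A flatbuffers-declared length is a signed i64; `try_into` turns any
// negative value into a recoverable error instead of a wrap-around cast.
fn checked_length(raw: i64) -> Result<usize, String> {
    raw.try_into()
        .map_err(|_| format!("out-of-spec length: {}", raw))
}

fn main() {
    assert_eq!(checked_length(42), Ok(42));
    assert!(checked_length(-1).is_err());
    assert_eq!(-1i64 as usize, usize::MAX); // what the old cast produced
}
```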
8 changes: 6 additions & 2 deletions src/io/ipc/read/array/boolean.rs
@@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_boolean<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
@@ -24,7 +24,11 @@
))
})?;

-let length = field_node.length() as usize;
+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let validity = read_validity(
buffers,
field_node,
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/fixed_size_binary.rs
@@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_fixed_size_binary<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
@@ -33,7 +33,12 @@
compression,
)?;

-let length = field_node.length() as usize * FixedSizeBinaryArray::get_size(&data_type);
+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+let length = length.saturating_mul(FixedSizeBinaryArray::get_size(&data_type));
let values = read_buffer(
buffers,
length,
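`saturating_mul` is used here because `rows * width` on attacker-controlled metadata can overflow, which panics in debug builds and wraps in release builds; saturating clamps the product, and the subsequent buffer read then fails cleanly because no file can actually hold that many bytes. With illustrative values:

```rust
fn main() {
    // 3 * (usize::MAX / 2) overflows a plain multiply; saturating_mul
    // clamps to usize::MAX instead of panicking or wrapping.
    let rows: usize = 3;
    let width: usize = usize::MAX / 2;
    assert_eq!(rows.saturating_mul(width), usize::MAX);
}
```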
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/list.rs
@@ -11,7 +11,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
-use super::super::{Compression, IpcBuffer, Node, Version};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
@@ -45,9 +45,14 @@ where
compression,
)?;

+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets = read_buffer::<O, _>(
buffers,
-1 + field_node.length() as usize,
+1 + length,
reader,
block_offset,
is_little_endian,
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/map.rs
@@ -10,7 +10,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
-use super::super::{Compression, IpcBuffer, Node, Version};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
@@ -41,9 +41,14 @@
compression,
)?;

+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets = read_buffer::<i32, _>(
buffers,
-1 + field_node.length() as usize,
+1 + length,
reader,
block_offset,
is_little_endian,
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/null.rs
@@ -6,7 +6,7 @@ use crate::{
error::{Error, Result},
};

-use super::super::Node;
+use super::super::{Node, OutOfSpecKind};

pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Result<NullArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
@@ -16,7 +16,12 @@ pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Resul
))
})?;

-NullArray::try_new(data_type, field_node.length() as usize)
+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
+
+NullArray::try_new(data_type, length)
}

pub fn skip_null(field_nodes: &mut VecDeque<Node>) -> Result<()> {
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/primitive.rs
@@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_primitive<T: NativeType, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
@@ -36,9 +36,14 @@ where
compression,
)?;

+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let values = read_buffer(
buffers,
-field_node.length() as usize,
+length,
reader,
block_offset,
is_little_endian,
11 changes: 8 additions & 3 deletions src/io/ipc/read/array/union.rs
@@ -10,7 +10,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
-use super::super::{Compression, IpcBuffer, Node, Version};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
@@ -38,9 +38,14 @@
.ok_or_else(|| Error::oos("IPC: missing validity buffer."))?;
};

+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let types = read_buffer(
buffers,
-field_node.length() as usize,
+length,
reader,
block_offset,
is_little_endian,
@@ -51,7 +56,7 @@
if !mode.is_sparse() {
Some(read_buffer(
buffers,
-field_node.length() as usize,
+length,
reader,
block_offset,
is_little_endian,
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/utf8.rs
@@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node};
+use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_utf8<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
@@ -34,9 +34,14 @@
compression,
)?;

+let length: usize = field_node
+.length()
+.try_into()
+.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets: Buffer<O> = read_buffer(
buffers,
-1 + field_node.length() as usize,
+1 + length,
reader,
block_offset,
is_little_endian,