Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Audited all IPC read code
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jun 16, 2022
1 parent 5aeee0e commit 1ef454f
Show file tree
Hide file tree
Showing 22 changed files with 496 additions and 209 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "5.0", optional = true, default-features = false }

arrow-format = { version = "0.6", optional = true, features = ["ipc"] }
arrow-format = { version = "0.7", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

Expand Down
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "0.6", features = ["full"] }
arrow-format = { version = "0.7", features = ["full"] }
async-trait = "0.1.41"
clap = { version = "^3", features = ["derive"] }
futures = "0.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ async fn record_batch_from_message(
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<Chunk<Box<dyn Array>>, Status> {
let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

let arrow_batch_result = ipc::read::read_record_batch(
Expand All @@ -294,6 +295,7 @@ async fn record_batch_from_message(
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
0,
length,
);

arrow_batch_result.map_err(|e| Status::internal(format!("Could not convert to Chunk: {:?}", e)))
Expand All @@ -306,10 +308,11 @@ async fn dictionary_from_message(
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<(), Status> {
let length = data_body.len();
let mut reader = std::io::Cursor::new(data_body);

let dictionary_batch_result =
ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0);
ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0, length);
dictionary_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
}
Expand Down
8 changes: 0 additions & 8 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@
//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs),
//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)).

use crate::error::Error;

mod compression;
mod endianess;

Expand Down Expand Up @@ -103,9 +101,3 @@ pub struct IpcSchema {
/// Endianness of the file
pub is_little_endian: bool,
}

impl From<arrow_format::ipc::planus::Error> for Error {
fn from(error: arrow_format::ipc::planus::Error) -> Self {
Error::OutOfSpec(error.to_string())
}
}
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_binary<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -34,9 +34,14 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets: Buffer<O> = read_buffer(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
8 changes: 6 additions & 2 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_boolean<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -24,7 +24,11 @@ pub fn read_boolean<R: Read + Seek>(
))
})?;

let length = field_node.length() as usize;
let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let validity = read_validity(
buffers,
field_node,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_fixed_size_binary<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -33,7 +33,12 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
compression,
)?;

let length = field_node.length() as usize * FixedSizeBinaryArray::get_size(&data_type);
let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let length = length * FixedSizeBinaryArray::get_size(&data_type);
let values = read_buffer(
buffers,
length,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
Expand Down Expand Up @@ -45,9 +45,14 @@ where
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets = read_buffer::<O, _>(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_map<R: Read + Seek>(
Expand Down Expand Up @@ -41,9 +41,14 @@ pub fn read_map<R: Read + Seek>(
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets = read_buffer::<i32, _>(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
error::{Error, Result},
};

use super::super::Node;
use super::super::{Node, OutOfSpecKind};

pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Result<NullArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
Expand All @@ -16,7 +16,12 @@ pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> Resul
))
})?;

NullArray::try_new(data_type, field_node.length() as usize)
let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

NullArray::try_new(data_type, length)
}

pub fn skip_null(field_nodes: &mut VecDeque<Node>) -> Result<()> {
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::error::{Error, Result};
use crate::{array::PrimitiveArray, types::NativeType};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_primitive<T: NativeType, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand Down Expand Up @@ -36,9 +36,14 @@ where
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let values = read_buffer(
buffers,
field_node.length() as usize,
length,
reader,
block_offset,
is_little_endian,
Expand Down
11 changes: 8 additions & 3 deletions src/io/ipc/read/array/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::Dictionaries;
use super::super::{Compression, IpcBuffer, Node, Version};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind, Version};

#[allow(clippy::too_many_arguments)]
pub fn read_union<R: Read + Seek>(
Expand Down Expand Up @@ -38,9 +38,14 @@ pub fn read_union<R: Read + Seek>(
.ok_or_else(|| Error::oos("IPC: missing validity buffer."))?;
};

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let types = read_buffer(
buffers,
field_node.length() as usize,
length,
reader,
block_offset,
is_little_endian,
Expand All @@ -51,7 +56,7 @@ pub fn read_union<R: Read + Seek>(
if !mode.is_sparse() {
Some(read_buffer(
buffers,
field_node.length() as usize,
length,
reader,
block_offset,
is_little_endian,
Expand Down
9 changes: 7 additions & 2 deletions src/io/ipc/read/array/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::datatypes::DataType;
use crate::error::{Error, Result};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node};
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};

pub fn read_utf8<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
Expand All @@ -34,9 +34,14 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
compression,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let offsets: Buffer<O> = read_buffer(
buffers,
1 + field_node.length() as usize,
1 + length,
reader,
block_offset,
is_little_endian,
Expand Down
Loading

0 comments on commit 1ef454f

Please sign in to comment.