This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Change IPC FileReader to own the underlying reader #518

Merged
merged 1 commit into from Oct 10, 2021
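This PR changes the IPC `FileReader` to take the underlying reader by value (`R`) instead of borrowing it (`&'a mut R`), and adds an `into_inner` method to get the reader back. A minimal sketch of the new call site, assuming the crate is `arrow2` and using a hypothetical file path (the batch printing is illustrative only):

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read;

fn main() -> Result<()> {
    // Hypothetical input path.
    let mut file = File::open("example.arrow")?;
    let metadata = read::read_file_metadata(&mut file)?;

    // Before this change the reader was borrowed:
    //     read::FileReader::new(&mut file, metadata, None)
    // Now the file is moved into the FileReader:
    let reader = read::FileReader::new(file, metadata, None);

    for batch in reader {
        let batch = batch?;
        println!("read a batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```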
8 changes: 4 additions & 4 deletions examples/extension.rs
@@ -46,9 +46,9 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
     Ok(writer.into_inner())
 }

-fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
-    let mut reader = Cursor::new(reader);
-    let metadata = read::read_file_metadata(&mut reader)?;
-    let mut reader = read::FileReader::new(&mut reader, metadata, None);
+fn read_ipc(buf: &[u8]) -> Result<RecordBatch> {
+    let mut cursor = Cursor::new(buf);
+    let metadata = read::read_file_metadata(&mut cursor)?;
+    let mut reader = read::FileReader::new(cursor, metadata, None);
     reader.next().unwrap()
 }
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
@@ -12,7 +12,7 @@ fn read_batches(path: &str) -> Result<Vec<RecordBatch>> {
     let metadata = read_file_metadata(&mut file)?;

     // Simplest way: use the reader, an iterator over batches.
-    let reader = FileReader::new(&mut file, metadata, None);
+    let reader = FileReader::new(file, metadata, None);

     reader.collect()
 }
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
@@ -27,7 +27,7 @@ fn main() -> Result<()> {
     let filename = &args[1];
     let mut f = File::open(filename)?;
     let metadata = read::read_file_metadata(&mut f)?;
-    let mut reader = read::FileReader::new(&mut f, metadata, None);
+    let mut reader = read::FileReader::new(f, metadata, None);
     let schema = reader.schema();

     let mut writer = StreamWriter::try_new(std::io::stdout(), schema)?;
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
@@ -99,7 +99,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>

     let mut arrow_file = File::open(arrow_name)?;
     let metadata = read::read_file_metadata(&mut arrow_file)?;
-    let reader = read::FileReader::new(&mut arrow_file, metadata, None);
+    let reader = read::FileReader::new(arrow_file, metadata, None);

     let mut fields: Vec<ArrowJsonField> = vec![];
     for f in reader.schema().fields() {
@@ -137,7 +137,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
     // open Arrow file
     let mut arrow_file = File::open(arrow_name)?;
     let metadata = read::read_file_metadata(&mut arrow_file)?;
-    let reader = read::FileReader::new(&mut arrow_file, metadata, None);
+    let reader = read::FileReader::new(arrow_file, metadata, None);
     let arrow_schema = reader.schema().as_ref().to_owned();

     // compare schemas
17 changes: 11 additions & 6 deletions src/io/ipc/read/reader.rs
@@ -62,8 +62,8 @@ impl FileMetadata {
 }

 /// Arrow File reader
-pub struct FileReader<'a, R: Read + Seek> {
-    reader: &'a mut R,
+pub struct FileReader<R: Read + Seek> {
+    reader: R,
     metadata: FileMetadata,
     current_block: usize,
     projection: Option<(Vec<usize>, Arc<Schema>)>,
@@ -231,11 +231,11 @@ pub fn read_batch<R: Read + Seek>(
     }
 }

-impl<'a, R: Read + Seek> FileReader<'a, R> {
+impl<R: Read + Seek> FileReader<R> {
     /// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
     /// # Panic
     /// Panics iff the projection is not in increasing order (e.g. `[1, 0]` nor `[0, 1, 1]` are valid)
-    pub fn new(reader: &'a mut R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self {
+    pub fn new(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self {
         if let Some(projection) = projection.as_ref() {
             let _ = projection.iter().fold(0, |mut acc, v| {
                 assert!(
@@ -270,9 +270,9 @@ impl<'a, R: Read + Seek> FileReader<'a, R> {
             .map(|x| &x.1)
             .unwrap_or(&self.metadata.schema)
     }
+
+    /// Consumes this FileReader, returning the underlying reader
+    pub fn into_inner(self) -> R {
+        self.reader
+    }
 }

-impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> {
+impl<R: Read + Seek> Iterator for FileReader<R> {
     type Item = Result<RecordBatch>;

     fn next(&mut self) -> Option<Self::Item> {
@@ -295,7 +300,7 @@ impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> {
     }
 }

-impl<'a, R: Read + Seek> RecordBatchReader for FileReader<'a, R> {
+impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
     fn schema(&self) -> &Schema {
         self.schema().as_ref()
     }
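Because the reader is now owned, a caller that still needs the file after reading can recover it through the new `into_inner`. A sketch of that pattern, assuming the crate is `arrow2`; the projection indices and the assumption that the file has at least three columns are illustrative only:

```rust
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read;

/// Reads columns 0 and 2 of an IPC file, then hands the `File` back to the caller.
fn read_projected(path: &str) -> Result<File> {
    let mut file = File::open(path)?;
    let metadata = read::read_file_metadata(&mut file)?;

    // The projection must be in increasing order (see the doc comment above);
    // `[0, 2]` is valid, while `[1, 0]` or `[0, 1, 1]` would panic.
    let mut reader = read::FileReader::new(file, metadata, Some(vec![0, 2]));

    // Iterate by reference so the FileReader is not consumed by the loop.
    for batch in reader.by_ref() {
        let _batch = batch?;
    }

    // New in this PR: consume the FileReader and recover the underlying File.
    Ok(reader.into_inner())
}
```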
2 changes: 1 addition & 1 deletion tests/it/io/ipc/read/file.rs
@@ -16,7 +16,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
     let (schema, batches) = read_gzip_json(version, file_name)?;

     let metadata = read_file_metadata(&mut file)?;
-    let reader = FileReader::new(&mut file, metadata, None);
+    let reader = FileReader::new(file, metadata, None);

     assert_eq!(&schema, reader.schema().as_ref());

4 changes: 2 additions & 2 deletions tests/it/io/ipc/write/file.rs
@@ -23,7 +23,7 @@ fn round_trip(batch: RecordBatch) -> Result<()> {
     let metadata = read_file_metadata(&mut reader)?;
     let schema = metadata.schema().clone();

-    let reader = FileReader::new(&mut reader, metadata, None);
+    let reader = FileReader::new(reader, metadata, None);

     // read expected JSON output
     let (expected_schema, expected_batches) = (batch.schema().clone(), vec![batch]);
@@ -55,7 +55,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
     let metadata = read_file_metadata(&mut reader)?;
     let schema = metadata.schema().clone();

-    let reader = FileReader::new(&mut reader, metadata, None);
+    let reader = FileReader::new(reader, metadata, None);

     // read expected JSON output
     let (expected_schema, expected_batches) = read_gzip_json(version, file_name)?;