This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Changed IPC FileWriter to own the writer. #420

Merged
merged 3 commits on Sep 22, 2021
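
A minimal sketch of how the API reads after this change, distilled from the updated examples and tests below (the helper `write_to_buffer` and its `Int32` column are illustrative, not part of the crate): `FileWriter` now takes the writer by value, `finish()` has to be called explicitly since the `Drop`-based auto-finish is removed, and `into_inner()` hands the writer back afterwards.

```rust
use std::io::Cursor;
use std::sync::Arc;

use arrow2::array::Int32Array;
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write::FileWriter;
use arrow2::record_batch::RecordBatch;

fn write_to_buffer() -> Result<Vec<u8>> {
    let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
    let array = Int32Array::from_slice([1, 2, 3]);
    let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(array)])?;

    // The buffer is moved into the writer instead of being passed as `&mut`.
    let buffer = Cursor::new(vec![]);
    let mut writer = FileWriter::try_new(buffer, &schema)?;

    writer.write(&batch)?;

    // `finish()` must be called explicitly; the `Drop` impl that did this is removed.
    writer.finish()?;

    // Take the owned writer back and unwrap the underlying bytes.
    Ok(writer.into_inner().into_inner())
}
```
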
4 changes: 2 additions & 2 deletions benches/write_ipc.rs
@@ -15,8 +15,8 @@ fn write(array: &dyn Array) -> Result<()> {
     let schema = Schema::new(vec![field]);
     let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![clone(array).into()])?;

-    let mut writer = Cursor::new(vec![]);
-    let mut writer = FileWriter::try_new(&mut writer, &schema)?;
+    let writer = Cursor::new(vec![]);
+    let mut writer = FileWriter::try_new(writer, &schema)?;

     writer.write(&batch)
 }
12 changes: 7 additions & 5 deletions examples/extension.rs
@@ -17,13 +17,13 @@ fn main() -> Result<()> {
     let array = UInt16Array::from_slice([1, 2]).to(extension_type.clone());

     // from here on, it works as usual
-    let mut buffer = Cursor::new(vec![]);
+    let buffer = Cursor::new(vec![]);

     // write to IPC
-    write_ipc(&mut buffer, array)?;
+    let result_buffer = write_ipc(buffer, array)?;

     // read it back
-    let batch = read_ipc(&buffer.into_inner())?;
+    let batch = read_ipc(&result_buffer.into_inner())?;

     // and verify that the datatype is preserved.
     let array = &batch.columns()[0];
@@ -34,14 +34,16 @@ fn main() -> Result<()> {
     Ok(())
 }

-fn write_ipc<W: Write + Seek>(writer: &mut W, array: impl Array + 'static) -> Result<()> {
+fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<W> {
     let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);

     let mut writer = write::FileWriter::try_new(writer, &schema)?;

     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;

-    writer.write(&batch)
+    writer.write(&batch)?;
+
+    Ok(writer.into_inner())
 }

 fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
4 changes: 2 additions & 2 deletions examples/ipc_file_write.rs
@@ -8,9 +8,9 @@ use arrow2::io::ipc::write;
 use arrow2::record_batch::RecordBatch;

 fn write_batches(path: &str, schema: &Schema, batches: &[RecordBatch]) -> Result<()> {
-    let mut file = File::create(path)?;
+    let file = File::create(path)?;

-    let mut writer = write::FileWriter::try_new(&mut file, schema)?;
+    let mut writer = write::FileWriter::try_new(file, schema)?;

     for batch in batches {
         writer.write(batch)?
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
@@ -80,8 +80,8 @@ fn json_to_arrow(json_name: &str, arrow_name: &str, verbose: bool) -> Result<()>

     let json_file = read_json_file(json_name)?;

-    let mut arrow_file = File::create(arrow_name)?;
-    let mut writer = FileWriter::try_new(&mut arrow_file, &json_file.schema)?;
+    let arrow_file = File::create(arrow_name)?;
+    let mut writer = FileWriter::try_new(arrow_file, &json_file.schema)?;

     for b in json_file.batches {
         writer.write(&b)?;
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-stream-to-file.rs
@@ -27,9 +27,9 @@ fn main() -> Result<()> {
     let mut arrow_stream_reader = read::StreamReader::new(reader, metadata);
     let schema = arrow_stream_reader.schema();

-    let mut writer = io::stdout();
+    let writer = io::stdout();

-    let mut writer = FileWriter::try_new(&mut writer, schema)?;
+    let mut writer = FileWriter::try_new(writer, schema)?;

     arrow_stream_reader.try_for_each(|batch| writer.write(&batch?.unwrap()))?;
     writer.finish()?;
25 changes: 10 additions & 15 deletions src/io/ipc/write/writer.rs
@@ -37,9 +37,9 @@ use crate::datatypes::*;
 use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;

-pub struct FileWriter<'a, W: Write> {
+pub struct FileWriter<W: Write> {
     /// The object to write to
-    writer: &'a mut W,
+    writer: W,
     /// IPC write options
     write_options: IpcWriteOptions,
     /// A reference to the schema, used in validating record batches
@@ -56,16 +56,16 @@ pub struct FileWriter<'a, W: Write> {
     dictionary_tracker: DictionaryTracker,
 }

-impl<'a, W: Write> FileWriter<'a, W> {
+impl<W: Write> FileWriter<W> {
     /// Try create a new writer, with the schema written as part of the header
-    pub fn try_new(writer: &'a mut W, schema: &Schema) -> Result<Self> {
+    pub fn try_new(writer: W, schema: &Schema) -> Result<Self> {
         let write_options = IpcWriteOptions::default();
         Self::try_new_with_options(writer, schema, write_options)
     }

     /// Try create a new writer with IpcWriteOptions
     pub fn try_new_with_options(
-        writer: &'a mut W,
+        mut writer: W,
         schema: &Schema,
         write_options: IpcWriteOptions,
     ) -> Result<Self> {
@@ -78,7 +78,7 @@ impl<'a, W: Write> FileWriter<'a, W> {
             ipc_message: schema_to_bytes(schema, *write_options.metadata_version()),
             arrow_data: vec![],
         };
-        let (meta, data) = write_message(writer, encoded_message, &write_options)?;
+        let (meta, data) = write_message(&mut writer, encoded_message, &write_options)?;
         Ok(Self {
             writer,
             write_options,
@@ -91,6 +91,10 @@ impl<'a, W: Write> FileWriter<'a, W> {
         })
     }

+    pub fn into_inner(self) -> W {
+        self.writer
+    }
+
     /// Write a record batch to the file
     pub fn write(&mut self, batch: &RecordBatch) -> Result<()> {
         if self.finished {
@@ -153,12 +157,3 @@ impl<'a, W: Write> FileWriter<'a, W> {
         Ok(())
     }
 }
-
-/// Finish the file if it is not 'finished' when it goes out of scope
-impl<'a, W: Write> Drop for FileWriter<'a, W> {
-    fn drop(&mut self) {
-        if !self.finished {
-            self.finish().unwrap();
-        }
-    }
-}
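
The core of this change is visible in the constructor above: `try_new_with_options` now takes `mut writer: W`, writes the IPC header through `&mut writer`, and then moves the writer into the struct, with `into_inner` handing it back to the caller. A self-contained sketch of that owned-writer pattern (the `OwnedWriter` type and its header bytes are hypothetical, not the arrow2 API):

```rust
use std::io::{Result, Write};

// A wrapper that owns its writer rather than borrowing it.
struct OwnedWriter<W: Write> {
    writer: W,
}

impl<W: Write> OwnedWriter<W> {
    // Take the writer by value, write a header through `&mut writer`,
    // then move it into the struct.
    fn try_new(mut writer: W) -> Result<Self> {
        writer.write_all(b"HEADER")?;
        Ok(Self { writer })
    }

    // Consume the wrapper and hand the underlying writer back.
    fn into_inner(self) -> W {
        self.writer
    }
}

fn main() -> Result<()> {
    let writer = OwnedWriter::try_new(Vec::<u8>::new())?;
    let bytes = writer.into_inner();
    assert_eq!(bytes, b"HEADER".to_vec());
    Ok(())
}
```
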
22 changes: 12 additions & 10 deletions tests/it/io/ipc/write/file.rs
@@ -9,16 +9,17 @@ use arrow2::record_batch::RecordBatch;
 use crate::io::ipc::common::read_gzip_json;

 fn round_trip(batch: RecordBatch) -> Result<()> {
-    let mut result = Vec::<u8>::new();
+    let result = Vec::<u8>::new();

     // write IPC version 5
-    {
+    let written_result = {
         let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
-        let mut writer = FileWriter::try_new_with_options(&mut result, batch.schema(), options)?;
+        let mut writer = FileWriter::try_new_with_options(result, batch.schema(), options)?;
         writer.write(&batch)?;
         writer.finish()?;
-    }
-    let mut reader = Cursor::new(result);
+        writer.into_inner()
+    };
+    let mut reader = Cursor::new(written_result);
     let metadata = read_file_metadata(&mut reader)?;
     let schema = metadata.schema().clone();

@@ -38,18 +39,19 @@ fn round_trip(batch: RecordBatch) -> Result<()> {
 fn test_file(version: &str, file_name: &str) -> Result<()> {
     let (schema, batches) = read_gzip_json(version, file_name)?;

-    let mut result = Vec::<u8>::new();
+    let result = Vec::<u8>::new();

     // write IPC version 5
-    {
+    let written_result = {
         let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5)?;
-        let mut writer = FileWriter::try_new_with_options(&mut result, &schema, options)?;
+        let mut writer = FileWriter::try_new_with_options(result, &schema, options)?;
         for batch in batches {
             writer.write(&batch)?;
         }
         writer.finish()?;
-    }
-    let mut reader = Cursor::new(result);
+        writer.into_inner()
+    };
+    let mut reader = Cursor::new(written_result);
     let metadata = read_file_metadata(&mut reader)?;
     let schema = metadata.schema().clone();
