Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Updated IPC Record type to support reference types.
Browse files Browse the repository at this point in the history
  • Loading branch information
dexterduck committed Mar 3, 2022
1 parent 819e4df commit 6ac0865
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 37 deletions.
49 changes: 36 additions & 13 deletions src/io/ipc/write/common.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::borrow::{Borrow, Cow};
use std::sync::Arc;

use arrow_format::ipc::planus::Builder;
Expand Down Expand Up @@ -382,30 +383,52 @@ pub(crate) fn pad_to_8(len: usize) -> usize {

/// An array [`Chunk`] with optional accompanying IPC fields.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
/// Chunk of Arrow columns to be written in IPC format.
pub columns: Chunk<Arc<dyn Array>>,
/// Optional IPC field list used to map Arrow columns to IPC dictionaries.
pub fields: Option<Vec<IpcField>>,
pub struct Record<'a> {
columns: Cow<'a, Chunk<Arc<dyn Array>>>,
fields: Option<Cow<'a, [IpcField]>>,
}

impl From<Chunk<Arc<dyn Array>>> for Record {
impl<'a> Record<'a> {
/// Get the IPC fields for this record.
pub fn fields(&self) -> Option<&[IpcField]> {
self.fields.as_deref()
}

/// Get the Arrow columns in this record.
pub fn columns(&self) -> &Chunk<Arc<dyn Array>> {
self.columns.borrow()
}
}

impl From<Chunk<Arc<dyn Array>>> for Record<'static> {
fn from(columns: Chunk<Arc<dyn Array>>) -> Self {
Self {
columns,
columns: Cow::Owned(columns),
fields: None,
}
}
}

impl From<(Chunk<Arc<dyn Array>>, Option<Vec<IpcField>>)> for Record {
fn from((columns, fields): (Chunk<Arc<dyn Array>>, Option<Vec<IpcField>>)) -> Self {
Self { columns, fields }
impl<'a, F> From<(Chunk<Arc<dyn Array>>, Option<F>)> for Record<'a>
where
F: Into<Cow<'a, [IpcField]>>,
{
fn from((columns, fields): (Chunk<Arc<dyn Array>>, Option<F>)) -> Self {
Self {
columns: Cow::Owned(columns),
fields: fields.map(|f| f.into()),
}
}
}

impl From<Record> for (Chunk<Arc<dyn Array>>, Option<Vec<IpcField>>) {
fn from(record: Record) -> Self {
(record.columns, record.fields)
impl<'a, F> From<(&'a Chunk<Arc<dyn Array>>, Option<F>)> for Record<'a>
where
F: Into<Cow<'a, [IpcField]>>,
{
fn from((columns, fields): (&'a Chunk<Arc<dyn Array>>, Option<F>)) -> Self {
Self {
columns: Cow::Borrowed(columns),
fields: fields.map(|f| f.into()),
}
}
}
19 changes: 10 additions & 9 deletions src/io/ipc/write/file_async.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
//! Async writer for IPC files.

use std::task::Poll;

use arrow_format::ipc::{planus::Builder, Block, Footer, MetadataVersion};
use futures::{future::BoxFuture, AsyncWrite, AsyncWriteExt, FutureExt, Sink};

use super::common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions};
use super::common_async::{write_continuation, write_message};
use super::schema::serialize_schema;
use super::{default_ipc_fields, schema_to_bytes, Record};
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
use crate::io::ipc::{IpcField, ARROW_MAGIC};
use arrow_format::ipc::{planus::Builder, Block, Footer, MetadataVersion};
use futures::{future::BoxFuture, AsyncWrite, AsyncWriteExt, FutureExt, Sink};
use std::task::Poll;

type WriteOutput<W> = (usize, Option<Block>, Vec<Block>, Option<W>);

Expand Down Expand Up @@ -175,7 +177,7 @@ where
}
}

impl<'a, W> Sink<Record> for FileSink<'a, W>
impl<'a, W> Sink<Record<'_>> for FileSink<'a, W>
where
W: AsyncWrite + Unpin + Send + 'a,
{
Expand All @@ -188,16 +190,15 @@ where
self.get_mut().poll_write(cx)
}

fn start_send(self: std::pin::Pin<&mut Self>, item: Record) -> Result<()> {
fn start_send(self: std::pin::Pin<&mut Self>, item: Record<'_>) -> Result<()> {
let this = self.get_mut();

if let Some(writer) = this.writer.take() {
let Record { columns, fields } = item;
let fields = fields.unwrap_or_else(|| this.fields.clone());
let fields = item.fields().unwrap_or_else(|| &this.fields[..]);

let (dictionaries, record) = encode_chunk(
&columns,
&fields[..],
item.columns(),
fields,
&mut this.dictionary_tracker,
&this.options,
)?;
Expand Down
26 changes: 15 additions & 11 deletions src/io/ipc/write/stream_async.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
//! `async` writing of arrow streams

use std::{pin::Pin, task::Poll};

use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink};

use super::super::IpcField;
pub use super::common::WriteOptions;
use super::common::{encode_chunk, DictionaryTracker, EncodedData};
use super::common_async::{write_continuation, write_message};
use super::{default_ipc_fields, schema_to_bytes, Record};

use futures::{future::BoxFuture, AsyncWrite, FutureExt, Sink};
use std::{pin::Pin, task::Poll};

use crate::datatypes::*;
use crate::error::{ArrowError, Result};

Expand Down Expand Up @@ -93,11 +94,14 @@ where
.boxed()
}

fn write(&mut self, record: &Record) -> Result<()> {
let Record { columns, fields } = record;
let fields = fields.as_ref().unwrap_or(&self.fields);
let (dictionaries, message) =
encode_chunk(columns, fields, &mut self.dictionary_tracker, &self.options)?;
fn write(&mut self, record: Record<'_>) -> Result<()> {
let fields = record.fields().unwrap_or(&self.fields[..]);
let (dictionaries, message) = encode_chunk(
record.columns(),
fields,
&mut self.dictionary_tracker,
&self.options,
)?;

if let Some(mut writer) = self.writer.take() {
self.task = Some(
Expand Down Expand Up @@ -138,7 +142,7 @@ where
}
}

impl<'a, W> Sink<Record> for StreamSink<'a, W>
impl<'a, W> Sink<Record<'_>> for StreamSink<'a, W>
where
W: AsyncWrite + Unpin + Send,
{
Expand All @@ -148,8 +152,8 @@ where
self.get_mut().poll_complete(cx)
}

fn start_send(self: Pin<&mut Self>, item: Record) -> Result<()> {
self.get_mut().write(&item)
fn start_send(self: Pin<&mut Self>, item: Record<'_>) -> Result<()> {
self.get_mut().write(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> {
Expand Down
3 changes: 1 addition & 2 deletions tests/it/io/ipc/write_file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ async fn write_(
let options = WriteOptions { compression: None };
let mut sink = FileSink::new(&mut result, schema, Some(ipc_fields.to_vec()), options);
for batch in batches {
sink.feed((batch.clone(), Some(ipc_fields.to_vec())).into())
.await?;
sink.feed((batch, Some(ipc_fields)).into()).await?;
}
sink.close().await?;
drop(sink);
Expand Down
3 changes: 1 addition & 2 deletions tests/it/io/ipc/write_stream_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ async fn write_(
let options = stream_async::WriteOptions { compression: None };
let mut sink = StreamSink::new(&mut result, schema, Some(ipc_fields.to_vec()), options);
for batch in batches {
sink.feed((batch.clone(), Some(ipc_fields.to_vec())).into())
.await?;
sink.feed((batch, Some(ipc_fields)).into()).await?;
}
sink.close().await?;
drop(sink);
Expand Down

0 comments on commit 6ac0865

Please sign in to comment.