feat: add destination/writer trait #838

Merged 4 commits on Oct 31, 2023
Changes from 2 commits
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,5 +1,6 @@
[workspace]
members = ["crates/*"]
resolver = "2"

[workspace.package]
authors = ["Kaskada Developers"]
5 changes: 3 additions & 2 deletions crates/sparrow-interfaces/Cargo.toml
@@ -12,18 +12,19 @@ Common interfaces for the Sparrow compilation and runtime.
[dependencies]
arrow-array.workspace = true
arrow-schema.workspace = true
async-trait.workspace = true
derive_more.workspace = true
error-stack.workspace = true
futures.workspace = true
hashbrown.workspace = true
index_vec.workspace = true
inventory.workspace = true
itertools.workspace = true
serde.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
static_init.workspace = true

[dev-dependencies]

[lib]
bench = false
doctest = false
64 changes: 64 additions & 0 deletions crates/sparrow-interfaces/src/destination.rs
@@ -0,0 +1,64 @@
use sparrow_batch::Batch;
use std::fmt::Debug;

use crate::types::Partition;

/// Trait implemented by destinations.
pub trait Destination: Send + Sync + Debug {
    /// Creates a new writer to this destination.
    fn new_writer(
        &self,
        partition: Partition,
    ) -> error_stack::Result<Box<dyn Writer>, DestinationError>;
}

pub trait Writer: Send + Sync + Debug {
    /// Writes a batch to this writer.
    ///
    /// NOTE: Some destinations (such as Parquet) may actually rotate files
    /// during or after calls to `write_batch`.
    /// TODO: FRAZ - `write_batch` doesn't need an output config because at
    /// execution time we're able to create the writer _with_ the output
    /// config already (I think).
    fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError>;

    /// Close this writer.
    fn close(&self) -> error_stack::Result<(), WriteError>;
}

#[non_exhaustive]
#[derive(derive_more::Display, Debug)]
pub enum DestinationError {
    #[display(fmt = "internal error: {}", _0)]
    Internal(&'static str),
}

impl error_stack::Context for DestinationError {}

impl DestinationError {
    pub fn internal() -> Self {
        DestinationError::Internal("no additional context")
    }

    pub fn internal_msg(msg: &'static str) -> Self {
        DestinationError::Internal(msg)
    }
}

#[non_exhaustive]
#[derive(derive_more::Display, Debug)]
pub enum WriteError {
    #[display(fmt = "internal error on write: {}", _0)]
    Internal(&'static str),
}

impl error_stack::Context for WriteError {}

impl WriteError {
    pub fn internal() -> Self {
        WriteError::Internal("no additional context")
    }

    pub fn internal_msg(msg: &'static str) -> Self {
        WriteError::Internal(msg)
    }
}
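To make the shape of the new API concrete, here is a minimal sketch (not part of this PR) of a destination that collects batches in memory. `CollectingDestination` and `CollectingWriter` are hypothetical names; the trait signatures and the error-construction pattern follow the code above.

```rust
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

use sparrow_batch::Batch;
use sparrow_interfaces::destination::{Destination, DestinationError, WriteError, Writer};
use sparrow_interfaces::types::Partition;

/// Hypothetical destination collecting batches in memory, one Vec per partition.
#[derive(Debug)]
struct CollectingDestination {
    partitions: Vec<Arc<Mutex<Vec<Batch>>>>,
}

impl Destination for CollectingDestination {
    fn new_writer(
        &self,
        partition: Partition,
    ) -> error_stack::Result<Box<dyn Writer>, DestinationError> {
        let index: usize = partition.into();
        let sink = self
            .partitions
            .get(index)
            .ok_or_else(|| DestinationError::internal_msg("no slot for partition"))?
            .clone();
        Ok(Box::new(CollectingWriter { sink }))
    }
}

#[derive(Debug)]
struct CollectingWriter {
    sink: Arc<Mutex<Vec<Batch>>>,
}

impl Writer for CollectingWriter {
    fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError> {
        // Append the batch; a poisoned lock surfaces as an internal write error.
        self.sink
            .lock()
            .map_err(|_| WriteError::internal_msg("poisoned lock"))?
            .push(batch);
        Ok(())
    }

    fn close(&self) -> error_stack::Result<(), WriteError> {
        // Nothing to flush for an in-memory sink.
        Ok(())
    }
}
```

Because `new_writer` returns `Box<dyn Writer>`, each partition's task can own its writer independently while sharing the destination itself behind an `Arc`.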
2 changes: 2 additions & 0 deletions crates/sparrow-interfaces/src/lib.rs
@@ -7,8 +7,10 @@
clippy::print_stderr
)]

pub mod destination;
mod execution_options;
pub mod expression;
pub mod source;
pub mod types;

pub use execution_options::*;
2 changes: 2 additions & 0 deletions crates/sparrow-interfaces/src/types.rs
@@ -0,0 +1,2 @@
mod partition;
pub use partition::*;
3 changes: 3 additions & 0 deletions crates/sparrow-io/Cargo.toml
@@ -13,9 +13,12 @@ Defines the IO implementations.
arrow-array.workspace = true
async-broadcast.workspace = true
async-stream.workspace = true
async-trait.workspace = true
arrow-schema.workspace = true
error-stack.workspace = true
futures.workspace = true
parking_lot.workspace = true
tokio.workspace = true
sparrow-merge = { path = "../sparrow-merge" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
5 changes: 2 additions & 3 deletions crates/sparrow-io/src/channel.rs
@@ -1,4 +1,3 @@
mod source;
mod destination;

pub use source::InMemoryBatches;
pub use source::InMemorySource;
pub use destination::*;
73 changes: 73 additions & 0 deletions crates/sparrow-io/src/channel/destination.rs
@@ -0,0 +1,73 @@
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use error_stack::{IntoReport, ResultExt};
use sparrow_batch::Batch;
use sparrow_interfaces::{
    destination::{Destination, DestinationError, WriteError, Writer},
    types::Partition,
};

#[derive(Debug)]
pub struct ChannelDestination {
    /// The expected output schema.
    schema: SchemaRef,
    // TODO: The output currently expects [RecordBatch] instead of [Batch],
    // but we should standardize on [Batch].
    txs: Vec<tokio::sync::mpsc::Sender<RecordBatch>>,
}

impl ChannelDestination {
    /// Construct a new channel destination with the given senders.
    ///
    /// The number of `txs` should equal the number of partitions the
    /// output is produced from.
    pub fn new(schema: SchemaRef, txs: Vec<tokio::sync::mpsc::Sender<RecordBatch>>) -> Self {
        Self { schema, txs }
    }
}

impl Destination for ChannelDestination {
    fn new_writer(
        &self,
        partition: Partition,
    ) -> error_stack::Result<Box<dyn Writer>, DestinationError> {
        let partition: usize = partition.into();
        let tx = self
            .txs
            .get(partition)
            .ok_or_else(|| {
                // NOTE: `internal_msg` takes a `&'static str`, so `{partition}`
                // is not interpolated into the message here.
                DestinationError::internal_msg("expected channel for partition {partition}")
            })?
            .clone(); // Senders can be cloned cheaply.
        Ok(Box::new(ChannelWriter {
            schema: self.schema.clone(),
            tx,
        }))
    }
}

#[derive(Debug)]
struct ChannelWriter {
    schema: SchemaRef,
    tx: tokio::sync::mpsc::Sender<RecordBatch>,
}

impl Writer for ChannelWriter {
    fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError> {
        // HACK: This converts `Batch` to `RecordBatch` because the current execution logic
        // expects `RecordBatch` outputs. This should be changed to standardize on `Batch`,
        // which makes it easier to carry a primitive value out.
        if let Some(batch) = batch.into_record_batch(self.schema.clone()) {
            self.tx
                .blocking_send(batch)
                .into_report()
                .change_context(WriteError::internal())?;
        }
        Ok(())
    }

    fn close(&self) -> error_stack::Result<(), WriteError> {
        // NOTE: `downgrade` returns a `WeakSender` without consuming `self.tx`,
        // so the channel actually closes when this writer is dropped.
        self.tx.downgrade();
        Ok(())
    }
}
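For orientation, a hedged usage sketch (not in this PR): wire one sender per partition into a `ChannelDestination`, write from a blocking task (since `write_batch` uses `blocking_send`), and drain the receiver on the async side. The function name `collect_output` and the channel capacity are arbitrary, and `Partition::from(0usize)` assumes `Partition` converts from an index, mirroring the `.into()` conversion above.

```rust
use std::sync::Arc;

use arrow_schema::SchemaRef;
use sparrow_batch::Batch;
use sparrow_interfaces::{destination::Destination, types::Partition};
use sparrow_io::channel::ChannelDestination;

/// Hypothetical driver: one output partition feeding one channel.
async fn collect_output(schema: SchemaRef, batch: Batch) {
    let (tx, mut rx) = tokio::sync::mpsc::channel(8);
    let destination = Arc::new(ChannelDestination::new(schema, vec![tx]));

    // `write_batch` uses `blocking_send`, so run the writer off the async
    // runtime (here via `spawn_blocking`) to avoid stalling a runtime thread.
    let writer_task = tokio::task::spawn_blocking(move || {
        let mut writer = destination
            .new_writer(Partition::from(0usize))
            .expect("writer for partition 0");
        writer.write_batch(batch).expect("write");
        writer.close().expect("close");
        // Dropping the writer (and the destination) releases the senders,
        // which ends the receive loop below.
    });

    // The receiving half sees the converted `RecordBatch`es.
    while let Some(record_batch) = rx.recv().await {
        println!("received {} rows", record_batch.num_rows());
    }
    writer_task.await.expect("writer task");
}
```

Keeping the blocking send on a worker thread matches how the scheduler's pipeline tasks, rather than async tasks, are expected to drive writers.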
1 change: 1 addition & 0 deletions crates/sparrow-io/src/lib.rs
@@ -7,4 +7,5 @@
clippy::print_stderr
)]

pub mod channel;
pub mod in_memory;
1 change: 1 addition & 0 deletions crates/sparrow-merge/Cargo.toml
@@ -33,6 +33,7 @@ sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-core = { path = "../sparrow-core" }
sparrow-instructions = { path = "../sparrow-instructions" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
sparrow-physical = { path = "../sparrow-physical" }
sparrow-scheduler = { path = "../sparrow-scheduler" }
tokio.workspace = true
5 changes: 2 additions & 3 deletions crates/sparrow-merge/src/merge_pipeline.rs
@@ -2,10 +2,9 @@ use arrow_schema::DataType;
use error_stack::{IntoReport, ResultExt};
use parking_lot::Mutex;
use sparrow_batch::Batch;
use sparrow_interfaces::types::{Partition, Partitioned};
use sparrow_physical::StepId;
use sparrow_scheduler::{
InputHandles, Partition, Partitioned, Pipeline, PipelineError, Scheduler, TaskRef,
};
use sparrow_scheduler::{InputHandles, Pipeline, PipelineError, Scheduler, TaskRef};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc::error::TryRecvError;

11 changes: 7 additions & 4 deletions crates/sparrow-plan-execution/src/lib.rs
@@ -15,10 +15,12 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use hashbrown::HashMap;
use sparrow_interfaces::source::Source;
use sparrow_interfaces::ExecutionOptions;
use sparrow_io::channel::ChannelDestination;
use sparrow_merge::MergePipeline;
use sparrow_physical::StepId;
use sparrow_transforms::TransformPipeline;
use std::sync::Arc;
use write_pipeline::WritePipeline;

use error_stack::ResultExt;
use itertools::Itertools;
@@ -27,15 +29,14 @@ use uuid::Uuid;

mod error;
mod source_tasks;
mod write_channel_pipeline;
mod write_pipeline;

#[cfg(test)]
mod tests;

pub use error::*;

use crate::source_tasks::SourceTasks;
use crate::write_channel_pipeline::WriteChannelPipeline;

pub struct PlanExecutor {
worker_pool: WorkerPoolBuilder,
@@ -85,9 +86,11 @@ impl PlanExecutor {
let last_step = plan.steps.last().expect("at least one step");
let output_schema = result_type_to_output_schema(&last_step.result_type);

// Creates a single channel destination for the output.
let destination = Arc::new(ChannelDestination::new(output_schema.clone(), vec![output]));
let sink_pipeline = executor
.worker_pool
.add_pipeline(1, WriteChannelPipeline::new(output, output_schema));
.add_pipeline(1, WritePipeline::new(destination, output_schema));

// Map from the producing step ID to the consumers.
let mut step_consumers: HashMap<StepId, InputHandles> = HashMap::new();
@@ -112,7 +115,7 @@

let first_step_inputs = &plan.steps[first_step_id].inputs;

// Create a transform pipeline (if possible), othrewise create the
// Create a transform pipeline (if possible), otherwise create the
// appropriate non-transform pipeline.
let pipeline = if pipeline
.steps
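The `write_pipeline` module referenced above is not included in this two-commit view. Purely as an illustration of where the `Destination` trait slots in, and not the PR's actual implementation, a pipeline of this shape would create one writer per partition up front and forward batches to them. All names here are hypothetical, and `Partition::from(p)` again assumes conversion from an index.

```rust
use sparrow_batch::Batch;
use sparrow_interfaces::destination::{Destination, DestinationError, WriteError, Writer};
use sparrow_interfaces::types::Partition;

/// Hypothetical outline only; the real `write_pipeline` module is not
/// shown in this diff view.
struct WritePipelineSketch {
    /// One writer per output partition, created from the destination.
    writers: Vec<Box<dyn Writer>>,
}

impl WritePipelineSketch {
    fn new(
        destination: &dyn Destination,
        partitions: usize,
    ) -> error_stack::Result<Self, DestinationError> {
        // Ask the destination for a writer per partition; any failure
        // surfaces as a `DestinationError` report.
        let writers = (0..partitions)
            .map(|p| destination.new_writer(Partition::from(p)))
            .collect::<Result<Vec<_>, _>>()?;
        Ok(Self { writers })
    }

    /// Forward a batch arriving on `partition` to that partition's writer.
    fn write(&mut self, partition: usize, batch: Batch) -> error_stack::Result<(), WriteError> {
        self.writers[partition].write_batch(batch)
    }
}
```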