
feat: add destination/writer trait (#838)
And implements them for tokio channels.
jordanrfrazier committed Oct 31, 2023
1 parent 944b648 commit 7508a6d
Showing 26 changed files with 412 additions and 131 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,5 +1,6 @@
[workspace]
members = ["crates/*"]
resolver = "2"

[workspace.package]
authors = ["Kaskada Developers"]
5 changes: 4 additions & 1 deletion crates/sparrow-interfaces/Cargo.toml
@@ -16,13 +16,16 @@ derive_more.workspace = true
error-stack.workspace = true
futures.workspace = true
hashbrown.workspace = true
index_vec.workspace = true
inventory.workspace = true
itertools.workspace = true
serde.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
static_init.workspace = true

[dev-dependencies]
[package.metadata.cargo-machete]
ignored = ["serde"]

[lib]
bench = false
47 changes: 47 additions & 0 deletions crates/sparrow-interfaces/src/destination.rs
@@ -0,0 +1,47 @@
use sparrow_batch::Batch;
use std::fmt::Debug;

use crate::types::Partition;

/// Interface defining how output is written to a particular destination.
pub trait Destination: Send + Sync + Debug {
/// Creates a new writer to this destination.
///
/// Arguments:
/// * partition: The partition this writer will write to.
fn new_writer(&self, partition: Partition) -> error_stack::Result<Box<dyn Writer>, WriteError>;
}

/// Writer for a specific destination.
pub trait Writer: Send + Sync + Debug {
/// Write a batch to this destination.
///
/// NOTE: Some destinations (such as Parquet) may rotate files
/// during or after calls to `write_batch`.
///
/// Arguments:
/// * batch: The batch to write.
fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError>;

/// Close this writer.
fn close(&self) -> error_stack::Result<(), WriteError>;
}

#[non_exhaustive]
#[derive(derive_more::Display, Debug)]
pub enum WriteError {
#[display(fmt = "internal error on write: {}", _0)]
Internal(&'static str),
}

impl error_stack::Context for WriteError {}

impl WriteError {
pub fn internal() -> Self {
WriteError::Internal("no additional context")
}

pub fn internal_msg(msg: &'static str) -> Self {
WriteError::Internal(msg)
}
}
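
For context, here is a minimal sketch of implementing these traits for a trivial in-memory sink. `VecDestination` and `VecWriter` are hypothetical illustrations, not part of this commit, and assume `Batch` is `Debug + Send`:

use std::sync::{Arc, Mutex};

use sparrow_batch::Batch;
use sparrow_interfaces::destination::{Destination, WriteError, Writer};
use sparrow_interfaces::types::Partition;

/// Hypothetical destination that buffers batches in memory, one buffer per partition.
#[derive(Debug)]
pub struct VecDestination {
    buffers: Vec<Arc<Mutex<Vec<Batch>>>>,
}

impl Destination for VecDestination {
    fn new_writer(&self, partition: Partition) -> error_stack::Result<Box<dyn Writer>, WriteError> {
        let partition: usize = partition.into();
        let buffer = self
            .buffers
            .get(partition)
            .ok_or_else(|| WriteError::internal_msg("no buffer for partition"))?
            .clone();
        Ok(Box::new(VecWriter { buffer }))
    }
}

#[derive(Debug)]
struct VecWriter {
    buffer: Arc<Mutex<Vec<Batch>>>,
}

impl Writer for VecWriter {
    fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError> {
        self.buffer
            .lock()
            .map_err(|_| WriteError::internal_msg("poisoned buffer lock"))?
            .push(batch);
        Ok(())
    }

    fn close(&self) -> error_stack::Result<(), WriteError> {
        // Nothing to flush for an in-memory buffer.
        Ok(())
    }
}

The one-writer-per-`Partition` split lets the execution layer drive each output partition from its own worker thread without sharing mutable state between writers.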
2 changes: 2 additions & 0 deletions crates/sparrow-interfaces/src/lib.rs
@@ -7,8 +7,10 @@
clippy::print_stderr
)]

pub mod destination;
mod execution_options;
pub mod expression;
pub mod source;
pub mod types;

pub use execution_options::*;
2 changes: 2 additions & 0 deletions crates/sparrow-interfaces/src/types.rs
@@ -0,0 +1,2 @@
mod partition;
pub use partition::*;
File renamed without changes.
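
The `index_vec` dependency added to this crate's Cargo.toml suggests `Partition` is a typed index. A plausible shape for the renamed file, stated as an assumption since its contents are not rendered here:

index_vec::define_index_type! {
    /// The index of an output partition.
    pub struct Partition = u32;
}

/// A collection containing one `T` per partition.
pub type Partitioned<T> = index_vec::IndexVec<Partition, T>;

This would account for the `let partition: usize = partition.into();` conversion in the channel destination below, since `index_vec` index types convert to and from `usize`.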
1 change: 1 addition & 0 deletions crates/sparrow-io/Cargo.toml
@@ -16,6 +16,7 @@ async-stream.workspace = true
arrow-schema.workspace = true
error-stack.workspace = true
futures.workspace = true
tokio.workspace = true
sparrow-merge = { path = "../sparrow-merge" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
5 changes: 2 additions & 3 deletions crates/sparrow-io/src/channel.rs
@@ -1,4 +1,3 @@
mod source;
mod destination;

pub use source::InMemoryBatches;
pub use source::InMemorySource;
pub use destination::*;
68 changes: 68 additions & 0 deletions crates/sparrow-io/src/channel/destination.rs
@@ -0,0 +1,68 @@
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use error_stack::{IntoReport, ResultExt};
use sparrow_batch::Batch;
use sparrow_interfaces::{
destination::{Destination, WriteError, Writer},
types::Partition,
};

#[derive(Debug)]
pub struct ChannelDestination {
/// The expected output schema.
schema: SchemaRef,
// TODO: The output currently expects [RecordBatch] instead of [Batch],
// but we should standardize on [Batch].
txs: Vec<tokio::sync::mpsc::Sender<RecordBatch>>,
}

impl ChannelDestination {
/// Construct a new channel destination with the given senders.
///
/// The number of `txs` should equal the number of partitions the
/// output is produced from.
pub fn new(schema: SchemaRef, txs: Vec<tokio::sync::mpsc::Sender<RecordBatch>>) -> Self {
Self { schema, txs }
}
}

impl Destination for ChannelDestination {
fn new_writer(&self, partition: Partition) -> error_stack::Result<Box<dyn Writer>, WriteError> {
let partition: usize = partition.into();
let tx = self
.txs
.get(partition)
// NOTE: `internal_msg` takes a `&'static str`, so `{partition}` below is
// literal text, not an interpolated value.
.ok_or_else(|| WriteError::internal_msg("expected channel for partition {partition}"))?
.clone(); // Senders can be cloned cheaply
Ok(Box::new(ChannelWriter {
schema: self.schema.clone(),
tx,
}))
}
}

#[derive(Debug)]
struct ChannelWriter {
schema: SchemaRef,
tx: tokio::sync::mpsc::Sender<RecordBatch>,
}

impl Writer for ChannelWriter {
fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError> {
// HACK: This converts `Batch` to `RecordBatch` because the current execution logic
// expects `RecordBatch` outputs. This should be changed to standardize on `Batch`
// which makes it easier to carry a primitive value out.
if let Some(batch) = batch.into_record_batch(self.schema.clone()) {
self.tx
.blocking_send(batch)
.into_report()
.change_context(WriteError::internal())?;
}
Ok(())
}

fn close(&self) -> error_stack::Result<(), WriteError> {
// NOTE: `downgrade` returns a `WeakSender` that is dropped immediately; the
// strong sender held by this writer keeps the channel open until the writer
// itself is dropped.
self.tx.downgrade();
Ok(())
}
}
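
A usage sketch for the channel destination. The wiring below is illustrative; the channel capacity, the `Partition::from(0usize)` conversion, and the consumer task are assumptions rather than code from this commit, and the snippet assumes it runs inside a tokio runtime:

use arrow_schema::SchemaRef;
use sparrow_interfaces::destination::Destination;
use sparrow_interfaces::types::Partition;
use sparrow_io::channel::ChannelDestination;

fn wire_single_partition_output(schema: SchemaRef) {
    // One sender per output partition; a single partition here.
    let (tx, mut rx) = tokio::sync::mpsc::channel(16);
    let destination = ChannelDestination::new(schema, vec![tx]);

    // The execution layer requests one writer per partition.
    let _writer = destination
        .new_writer(Partition::from(0usize))
        .expect("writer for partition 0");

    // A tokio task drains the RecordBatches produced by `write_batch`.
    tokio::spawn(async move {
        while let Some(record_batch) = rx.recv().await {
            let _ = record_batch; // consume the output
        }
    });
}

Note that `write_batch` uses `blocking_send`, which matches `Writer::write_batch` being a synchronous trait method driven from worker threads; tokio's `blocking_send` panics if called from within the async runtime itself, so writers must stay off it.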
1 change: 1 addition & 0 deletions crates/sparrow-io/src/lib.rs
@@ -7,4 +7,5 @@
clippy::print_stderr
)]

pub mod channel;
pub mod in_memory;
1 change: 1 addition & 0 deletions crates/sparrow-merge/Cargo.toml
@@ -33,6 +33,7 @@ sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-core = { path = "../sparrow-core" }
sparrow-instructions = { path = "../sparrow-instructions" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
sparrow-physical = { path = "../sparrow-physical" }
sparrow-scheduler = { path = "../sparrow-scheduler" }
tokio.workspace = true
25 changes: 13 additions & 12 deletions crates/sparrow-merge/src/merge_pipeline.rs
@@ -2,10 +2,9 @@ use arrow_schema::DataType;
use error_stack::{IntoReport, ResultExt};
use parking_lot::Mutex;
use sparrow_batch::Batch;
use sparrow_interfaces::types::{Partition, Partitioned};
use sparrow_physical::StepId;
use sparrow_scheduler::{
InputHandles, Partition, Partitioned, Pipeline, PipelineError, Scheduler, TaskRef,
};
use sparrow_scheduler::{InputHandles, Pipeline, PipelineError, Scheduler, TaskRef};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc::error::TryRecvError;

@@ -271,15 +270,15 @@ impl Pipeline for MergePipeline {
let partition = &self.partitions[input_partition];
let _enter = tracing::trace_span!("MergePipeline::do_work", %partition.task).entered();

let mut handler = partition.state.lock();
if let Some(active_input) = handler.merger.blocking_input() {
let receiver = &mut handler.rxs[active_input];
let mut state = partition.state.lock();
if let Some(active_input) = state.merger.blocking_input() {
let receiver = &mut state.rxs[active_input];
match receiver.try_recv() {
Ok(batch) => {
// Add the batch to the active input
let ready_to_produce = handler.merger.add_batch(active_input, batch);
let ready_to_produce = state.merger.add_batch(active_input, batch);
if ready_to_produce {
let merged_batch = handler
let merged_batch = state
.merger
.merge()
.change_context(PipelineError::Execution)?;
@@ -289,6 +288,8 @@
.add_input(input_partition, merged_batch, scheduler)
.change_context(PipelineError::Execution)?;
}

// TODO: Reschedule the task if the channel is non-empty.
}
Err(TryRecvError::Empty) => {
error_stack::ensure!(
@@ -308,12 +309,12 @@
} else {
// Though nothing is blocking the gatherer (in this case, all inputs must be closed),
// the merger may have batches remaining to flush.
assert!(handler.merger.all_closed());
assert!(state.merger.all_closed());
tracing::info!("Inputs are closed. Flushing merger.");

// Check whether we need to flush the leftovers
if handler.merger.can_produce() {
let last_batch = handler
if state.merger.can_produce() {
let last_batch = state
.merger
.merge()
.change_context(PipelineError::Execution)?;
@@ -322,7 +323,7 @@
.add_input(input_partition, last_batch, scheduler)
.change_context(PipelineError::Execution)?;
}
assert!(!handler.merger.can_produce(), "expected only one batch");
assert!(!state.merger.can_produce(), "expected only one batch");
}

tracing::info!(
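
The control flow in `do_work` above (drain one input with `try_recv`, merge once enough is buffered, flush leftovers when all inputs close) can be sketched against a stand-in merger. The toy `SumMerger` below is illustrative only; the real merger API lives in `sparrow-merge`, and the flush is attached to the `Disconnected` arm here for brevity, whereas the pipeline flushes when its gatherer reports no blocking input:

use tokio::sync::mpsc::error::TryRecvError;

/// Toy stand-in for the merger: buffers values and "merges" by summing them.
struct SumMerger {
    pending: Vec<i64>,
}

impl SumMerger {
    /// Returns true once enough input is buffered to produce output.
    fn add_batch(&mut self, value: i64) -> bool {
        self.pending.push(value);
        self.pending.len() >= 4
    }

    fn can_produce(&self) -> bool {
        !self.pending.is_empty()
    }

    fn merge(&mut self) -> i64 {
        self.pending.drain(..).sum()
    }
}

fn do_work(merger: &mut SumMerger, rx: &mut tokio::sync::mpsc::Receiver<i64>) {
    match rx.try_recv() {
        Ok(value) => {
            if merger.add_batch(value) {
                println!("produced {}", merger.merge());
            }
            // As in the TODO above: reschedule if the channel is non-empty.
        }
        Err(TryRecvError::Empty) => {
            // Nothing to do until new input wakes this task.
        }
        Err(TryRecvError::Disconnected) => {
            // All inputs closed: flush whatever the merger still holds.
            if merger.can_produce() {
                println!("flushed {}", merger.merge());
            }
        }
    }
}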
11 changes: 7 additions & 4 deletions crates/sparrow-plan-execution/src/lib.rs
@@ -15,10 +15,12 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use hashbrown::HashMap;
use sparrow_interfaces::source::Source;
use sparrow_interfaces::ExecutionOptions;
use sparrow_io::channel::ChannelDestination;
use sparrow_merge::MergePipeline;
use sparrow_physical::StepId;
use sparrow_transforms::TransformPipeline;
use std::sync::Arc;
use write_pipeline::WritePipeline;

use error_stack::ResultExt;
use itertools::Itertools;
@@ -27,15 +29,14 @@ use uuid::Uuid;

mod error;
mod source_tasks;
mod write_channel_pipeline;
mod write_pipeline;

#[cfg(test)]
mod tests;

pub use error::*;

use crate::source_tasks::SourceTasks;
use crate::write_channel_pipeline::WriteChannelPipeline;

pub struct PlanExecutor {
worker_pool: WorkerPoolBuilder,
@@ -85,9 +86,11 @@ impl PlanExecutor {
let last_step = plan.steps.last().expect("at least one step");
let output_schema = result_type_to_output_schema(&last_step.result_type);

// Creates a single channel destination for the output.
let destination = Arc::new(ChannelDestination::new(output_schema.clone(), vec![output]));
let sink_pipeline = executor
.worker_pool
.add_pipeline(1, WriteChannelPipeline::new(output, output_schema));
.add_pipeline(1, WritePipeline::new(destination, output_schema));

// Map from the producing step ID to the consumers.
let mut step_consumers: HashMap<StepId, InputHandles> = HashMap::new();
@@ -112,7 +115,7 @@

let first_step_inputs = &plan.steps[first_step_id].inputs;

// Create a transform pipeline (if possible), othrewise create the
// Create a transform pipeline (if possible), otherwise create the
// appropriate non-transform pipeline.
let pipeline = if pipeline
.steps
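
One consequence of swapping `WriteChannelPipeline` for the destination-based `WritePipeline` is that this call site no longer cares what kind of sink it writes to. A hypothetical helper, assuming `Partition` converts from `usize`, shows the trait object being exercised the way any sink pipeline would:

use std::sync::Arc;

use sparrow_interfaces::destination::{Destination, WriteError, Writer};
use sparrow_interfaces::types::Partition;

// Open one writer per output partition against any destination implementation,
// whether it is backed by a channel, Parquet files, or anything else.
fn open_writers(
    destination: Arc<dyn Destination>,
    partitions: usize,
) -> error_stack::Result<Vec<Box<dyn Writer>>, WriteError> {
    (0..partitions)
        .map(|p| destination.new_writer(Partition::from(p)))
        .collect()
}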