feat: add destination/writer trait #838

Merged · 4 commits · Oct 31, 2023
6 changes: 5 additions & 1 deletion Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -1,5 +1,6 @@
[workspace]
members = ["crates/*"]
resolver = "2"

[workspace.package]
authors = ["Kaskada Developers"]
5 changes: 4 additions & 1 deletion crates/sparrow-interfaces/Cargo.toml
@@ -16,13 +16,16 @@ derive_more.workspace = true
error-stack.workspace = true
futures.workspace = true
hashbrown.workspace = true
index_vec.workspace = true
inventory.workspace = true
itertools.workspace = true
serde.workspace = true
sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
static_init.workspace = true

[dev-dependencies]
[package.metadata.cargo-machete]
ignored = ["serde"]

[lib]
bench = false
47 changes: 47 additions & 0 deletions crates/sparrow-interfaces/src/destination.rs
@@ -0,0 +1,47 @@
use sparrow_batch::Batch;
use std::fmt::Debug;

use crate::types::Partition;

/// Interface defining how output is written to a particular destination.
pub trait Destination: Send + Sync + Debug {
/// Creates a new writer to this destination.
///
/// Arguments:
/// * partition: The partition this writer will write to.
fn new_writer(&self, partition: Partition) -> error_stack::Result<Box<dyn Writer>, WriteError>;
}

/// Writer for a specific destination.
pub trait Writer: Send + Sync + Debug {
/// Write a batch to this writer.
///
/// NOTE: Some destinations (such as Parquet) may rotate files
/// during or after calls to `write_batch`.
///
/// Arguments:
/// * batch: The batch to write.
fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError>;

/// Close this writer.
fn close(&self) -> error_stack::Result<(), WriteError>;
}

#[non_exhaustive]
#[derive(derive_more::Display, Debug)]
pub enum WriteError {
#[display(fmt = "internal error on write: {}", _0)]
Internal(&'static str),
}

impl error_stack::Context for WriteError {}

impl WriteError {
pub fn internal() -> Self {
WriteError::Internal("no additional context")
}

pub fn internal_msg(msg: &'static str) -> Self {
WriteError::Internal(msg)
}
}
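
To make the new contract concrete, here is a minimal sketch of an implementer. The `VecDestination`/`VecWriter` names and the in-memory `Mutex<Vec<Batch>>` storage are illustrative only, not part of this PR:

```rust
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

use sparrow_batch::Batch;
use sparrow_interfaces::destination::{Destination, WriteError, Writer};
use sparrow_interfaces::types::Partition;

/// Illustrative destination that buffers batches in memory, one buffer per partition.
#[derive(Debug)]
struct VecDestination {
    buffers: Vec<Arc<Mutex<Vec<Batch>>>>,
}

impl Destination for VecDestination {
    fn new_writer(&self, partition: Partition) -> error_stack::Result<Box<dyn Writer>, WriteError> {
        let index: usize = partition.into();
        let buffer = self
            .buffers
            .get(index)
            .ok_or_else(|| error_stack::Report::new(WriteError::internal_msg("no buffer for partition")))?
            .clone();
        Ok(Box::new(VecWriter { buffer }))
    }
}

#[derive(Debug)]
struct VecWriter {
    buffer: Arc<Mutex<Vec<Batch>>>,
}

impl Writer for VecWriter {
    fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError> {
        // An in-memory buffer cannot fail in interesting ways; real writers
        // would surface I/O errors as `WriteError` here.
        self.buffer
            .lock()
            .map_err(|_| error_stack::Report::new(WriteError::internal_msg("poisoned lock")))?
            .push(batch);
        Ok(())
    }

    fn close(&self) -> error_stack::Result<(), WriteError> {
        // Nothing to flush or rotate for an in-memory buffer.
        Ok(())
    }
}
```

Note that `close` takes `&self`, so writers that buffer internally need interior mutability to flush on close.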
2 changes: 2 additions & 0 deletions crates/sparrow-interfaces/src/lib.rs
@@ -7,8 +7,10 @@
clippy::print_stderr
)]

pub mod destination;
mod execution_options;
pub mod expression;
pub mod source;
pub mod types;

pub use execution_options::*;
2 changes: 2 additions & 0 deletions crates/sparrow-interfaces/src/types.rs
@@ -0,0 +1,2 @@
mod partition;
pub use partition::*;
1 change: 1 addition & 0 deletions crates/sparrow-io/Cargo.toml
@@ -16,6 +16,7 @@ async-stream.workspace = true
arrow-schema.workspace = true
error-stack.workspace = true
futures.workspace = true
tokio.workspace = true
sparrow-merge = { path = "../sparrow-merge" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
5 changes: 2 additions & 3 deletions crates/sparrow-io/src/channel.rs
@@ -1,4 +1,3 @@
mod source;
mod destination;

pub use source::InMemoryBatches;
pub use source::InMemorySource;
pub use destination::*;
68 changes: 68 additions & 0 deletions crates/sparrow-io/src/channel/destination.rs
@@ -0,0 +1,68 @@
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use error_stack::{IntoReport, ResultExt};
use sparrow_batch::Batch;
use sparrow_interfaces::{
destination::{Destination, WriteError, Writer},
types::Partition,
};

#[derive(Debug)]
pub struct ChannelDestination {
/// The expected output schema.
schema: SchemaRef,
// TODO: The output currently expects [RecordBatch] instead of [Batch]
// but we should standardize on [Batch].
txs: Vec<tokio::sync::mpsc::Sender<RecordBatch>>,
}

impl ChannelDestination {
/// Construct a new channel destination with the given senders.
///
/// The number of `txs` should equal the number of partitions producing
/// output.
pub fn new(schema: SchemaRef, txs: Vec<tokio::sync::mpsc::Sender<RecordBatch>>) -> Self {
Self { schema, txs }
}
}

impl Destination for ChannelDestination {
fn new_writer(&self, partition: Partition) -> error_stack::Result<Box<dyn Writer>, WriteError> {
let partition: usize = partition.into();
let tx = self
.txs
.get(partition)
.ok_or_else(|| WriteError::internal_msg("expected channel for partition {partition}"))?
.clone(); // Senders can be cloned cheaply
Ok(Box::new(ChannelWriter {
schema: self.schema.clone(),
tx,
}))
}
}

#[derive(Debug)]
struct ChannelWriter {
schema: SchemaRef,
tx: tokio::sync::mpsc::Sender<RecordBatch>,
}

impl Writer for ChannelWriter {
fn write_batch(&mut self, batch: Batch) -> error_stack::Result<(), WriteError> {
// HACK: This converts `Batch` to `RecordBatch` because the current execution logic
// expects `RecordBatch` outputs. This should be changed to standardize on `Batch`,
// which makes it easier to carry a primitive value out.
if let Some(batch) = batch.into_record_batch(self.schema.clone()) {
self.tx
.blocking_send(batch)
.into_report()
.change_context(WriteError::internal())?;
}
Ok(())
}

fn close(&self) -> error_stack::Result<(), WriteError> {
self.tx.downgrade();
Ok(())
}
}
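
A hedged usage sketch of the channel destination follows. The `Partition::from(0usize)` construction is an assumption (this PR only shows the `Partition` → `usize` direction), and the surrounding `demo` function is illustrative:

```rust
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use sparrow_interfaces::destination::{Destination, WriteError};
use sparrow_interfaces::types::Partition;
use sparrow_io::channel::ChannelDestination;

fn demo(schema: SchemaRef) -> error_stack::Result<(), WriteError> {
    // One sender per output partition; a single partition here.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<RecordBatch>(16);
    let destination = ChannelDestination::new(schema, vec![tx]);

    // Assumed: `Partition` can be built from a `usize`.
    let writer = destination.new_writer(Partition::from(0usize))?;

    // The pipeline would call `writer.write_batch(batch)` for each batch.
    writer.close()?;

    // Consumer side (inside a tokio task):
    // `while let Some(batch) = rx.recv().await { /* use batch */ }`
    rx.close();
    Ok(())
}
```

Because `write_batch` uses `blocking_send`, this writer is meant to be driven from the scheduler's worker threads rather than from within an async task.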
1 change: 1 addition & 0 deletions crates/sparrow-io/src/lib.rs
@@ -7,4 +7,5 @@
clippy::print_stderr
)]

pub mod channel;
pub mod in_memory;
1 change: 1 addition & 0 deletions crates/sparrow-merge/Cargo.toml
@@ -33,6 +33,7 @@ sparrow-arrow = { path = "../sparrow-arrow" }
sparrow-batch = { path = "../sparrow-batch" }
sparrow-core = { path = "../sparrow-core" }
sparrow-instructions = { path = "../sparrow-instructions" }
sparrow-interfaces = { path = "../sparrow-interfaces" }
sparrow-physical = { path = "../sparrow-physical" }
sparrow-scheduler = { path = "../sparrow-scheduler" }
tokio.workspace = true
25 changes: 13 additions & 12 deletions crates/sparrow-merge/src/merge_pipeline.rs
@@ -2,10 +2,9 @@ use arrow_schema::DataType;
use error_stack::{IntoReport, ResultExt};
use parking_lot::Mutex;
use sparrow_batch::Batch;
use sparrow_interfaces::types::{Partition, Partitioned};
use sparrow_physical::StepId;
use sparrow_scheduler::{
InputHandles, Partition, Partitioned, Pipeline, PipelineError, Scheduler, TaskRef,
};
use sparrow_scheduler::{InputHandles, Pipeline, PipelineError, Scheduler, TaskRef};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::mpsc::error::TryRecvError;

@@ -271,15 +270,15 @@ impl Pipeline for MergePipeline {
let partition = &self.partitions[input_partition];
let _enter = tracing::trace_span!("MergePipeline::do_work", %partition.task).entered();

let mut handler = partition.state.lock();
if let Some(active_input) = handler.merger.blocking_input() {
let receiver = &mut handler.rxs[active_input];
let mut state = partition.state.lock();
if let Some(active_input) = state.merger.blocking_input() {
let receiver = &mut state.rxs[active_input];
match receiver.try_recv() {
Ok(batch) => {
// Add the batch to the active input
let ready_to_produce = handler.merger.add_batch(active_input, batch);
let ready_to_produce = state.merger.add_batch(active_input, batch);
if ready_to_produce {
let merged_batch = handler
let merged_batch = state
.merger
.merge()
.change_context(PipelineError::Execution)?;
@@ -289,6 +288,8 @@
.add_input(input_partition, merged_batch, scheduler)
.change_context(PipelineError::Execution)?;
}

// TODO: Reschedule the task if the channel is non-empty.
}
Err(TryRecvError::Empty) => {
error_stack::ensure!(
@@ -308,12 +309,12 @@
} else {
// Though nothing is blocking the gatherer (in this case, all inputs must be closed),
// the merger may have batches remaining to flush.
assert!(handler.merger.all_closed());
assert!(state.merger.all_closed());
tracing::info!("Inputs are closed. Flushing merger.");

// Check whether we need to flush the leftovers
if handler.merger.can_produce() {
let last_batch = handler
if state.merger.can_produce() {
let last_batch = state
.merger
.merge()
.change_context(PipelineError::Execution)?;
@@ -322,7 +323,7 @@
.add_input(input_partition, last_batch, scheduler)
.change_context(PipelineError::Execution)?;
}
assert!(!handler.merger.can_produce(), "expected only one batch");
assert!(!state.merger.can_produce(), "expected only one batch");
}

tracing::info!(
11 changes: 7 additions & 4 deletions crates/sparrow-plan-execution/src/lib.rs
@@ -15,10 +15,12 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use hashbrown::HashMap;
use sparrow_interfaces::source::Source;
use sparrow_interfaces::ExecutionOptions;
use sparrow_io::channel::ChannelDestination;
use sparrow_merge::MergePipeline;
use sparrow_physical::StepId;
use sparrow_transforms::TransformPipeline;
use std::sync::Arc;
use write_pipeline::WritePipeline;

use error_stack::ResultExt;
use itertools::Itertools;
@@ -27,15 +29,14 @@ use uuid::Uuid;

mod error;
mod source_tasks;
mod write_channel_pipeline;
mod write_pipeline;

#[cfg(test)]
mod tests;

pub use error::*;

use crate::source_tasks::SourceTasks;
use crate::write_channel_pipeline::WriteChannelPipeline;

pub struct PlanExecutor {
worker_pool: WorkerPoolBuilder,
@@ -85,9 +86,11 @@ impl PlanExecutor {
let last_step = plan.steps.last().expect("at least one step");
let output_schema = result_type_to_output_schema(&last_step.result_type);

// Creates a single channel destination for the output.
let destination = Arc::new(ChannelDestination::new(output_schema.clone(), vec![output]));
let sink_pipeline = executor
.worker_pool
.add_pipeline(1, WriteChannelPipeline::new(output, output_schema));
.add_pipeline(1, WritePipeline::new(destination, output_schema));

// Map from the producing step ID to the consumers.
let mut step_consumers: HashMap<StepId, InputHandles> = HashMap::new();
@@ -112,7 +115,7 @@

let first_step_inputs = &plan.steps[first_step_id].inputs;

// Create a transform pipeline (if possible), othrewise create the
// Create a transform pipeline (if possible), otherwise create the
// appropriate non-transform pipeline.
let pipeline = if pipeline
.steps
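
For orientation, a sketch of the receiving side that pairs with the single `ChannelDestination` created above. The `collect_output` driver is hypothetical; the real wiring lives in `PlanExecutor`:

```rust
use arrow_array::RecordBatch;
use tokio::sync::mpsc;

// Hypothetical driver; `PlanExecutor` owns the real wiring.
async fn collect_output(mut rx: mpsc::Receiver<RecordBatch>) -> Vec<RecordBatch> {
    let mut results = Vec::new();
    // Each merged batch flows through the WritePipeline into the channel;
    // `recv` returns `None` once every sender (writer) has been dropped.
    while let Some(batch) = rx.recv().await {
        results.push(batch);
    }
    results
}
```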