Skip to content

Commit

Permalink
feat: Provide a way to skip enrich step
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Jun 25, 2022
1 parent 39e0d5a commit 7889bdb
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/enrich/mod.rs
@@ -1,3 +1,4 @@
pub mod skip;
pub mod sled;

use gasket::messaging::{InputPort, OutputPort};
Expand All @@ -8,36 +9,42 @@ use crate::{bootstrap, model};
#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum Config {
Skip,
Sled(sled::Config),
}

impl Config {
pub fn bootstrapper(self) -> Bootstrapper {
match self {
Config::Skip => Bootstrapper::Skip(skip::Bootstrapper::default()),
Config::Sled(c) => Bootstrapper::Sled(c.boostrapper()),
}
}
}

pub enum Bootstrapper {
Skip(skip::Bootstrapper),
Sled(sled::Bootstrapper),
}

impl Bootstrapper {
pub fn borrow_input_port(&mut self) -> &'_ mut InputPort<model::RawBlockPayload> {
match self {
Bootstrapper::Skip(x) => x.borrow_input_port(),
Bootstrapper::Sled(x) => x.borrow_input_port(),
}
}

pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort<model::EnrichedBlockPayload> {
match self {
Bootstrapper::Skip(x) => x.borrow_output_port(),
Bootstrapper::Sled(x) => x.borrow_output_port(),
}
}

pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) {
match self {
Bootstrapper::Skip(x) => x.spawn_stages(pipeline),
Bootstrapper::Sled(x) => x.spawn_stages(pipeline),
}
}
Expand Down
72 changes: 72 additions & 0 deletions src/enrich/skip.rs
@@ -0,0 +1,72 @@
use gasket::runtime::{spawn_stage, WorkOutcome};

use crate::{
bootstrap,
model::{self, BlockContext},
};

type InputPort = gasket::messaging::InputPort<model::RawBlockPayload>;
type OutputPort = gasket::messaging::OutputPort<model::EnrichedBlockPayload>;

pub struct Bootstrapper {
input: InputPort,
output: OutputPort,
}

impl Default for Bootstrapper {
fn default() -> Self {
Self {
input: Default::default(),
output: Default::default(),
}
}
}

impl Bootstrapper {
pub fn borrow_input_port(&mut self) -> &'_ mut InputPort {
&mut self.input
}

pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort {
&mut self.output
}

pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) {
let worker = Worker {
input: self.input,
output: self.output,
};

pipeline.register_stage("enrich-skip", spawn_stage(worker, Default::default()));
}
}

pub struct Worker {
input: InputPort,
output: OutputPort,
}

impl gasket::runtime::Worker for Worker {
fn metrics(&self) -> gasket::metrics::Registry {
gasket::metrics::Builder::new().build()
}

fn work(&mut self) -> gasket::runtime::WorkResult {
let msg = self.input.recv()?;

match msg.payload {
model::RawBlockPayload::RollForward(cbor) => {
self.output.send(model::EnrichedBlockPayload::roll_forward(
cbor,
BlockContext::default(),
))?;
}
model::RawBlockPayload::RollBack(x) => {
self.output
.send(model::EnrichedBlockPayload::roll_back(x))?;
}
};

Ok(WorkOutcome::Partial)
}
}

0 comments on commit 7889bdb

Please sign in to comment.