Skip to content

Commit

Permalink
feat: Exit pipeline on stage error
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Jun 26, 2022
1 parent ac52eb9 commit 47b7589
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 35 deletions.
40 changes: 5 additions & 35 deletions src/bin/scrolls/daemon.rs
@@ -1,9 +1,9 @@
use std::time::Duration;

use clap::ArgMatches;
use scrolls::{bootstrap, crosscut, enrich, reducers, sources, storage};
use serde::Deserialize;

use crate::monitor;

#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum ChainConfig {
Expand Down Expand Up @@ -83,47 +83,17 @@ pub fn run(args: &ArgMatches) -> Result<(), scrolls::Error> {

let source = config.source.bootstrapper(&chain, &config.intersect);

let enrich = config.enrich.unwrap_or_default().bootstrapper();
let enrich = config.enrich.unwrap_or_default().bootstrapper(&policy);

let reducer = reducers::Bootstrapper::new(config.reducers, &chain, &policy);

let storage = config.storage.plugin(&chain, &config.intersect);

let pipeline = bootstrap::build(source, enrich, reducer, storage)?;

loop {
for (name, tether) in pipeline.tethers.iter() {
match tether.check_state() {
gasket::runtime::TetherState::Dropped => log::warn!("{} stage dropped", name),
gasket::runtime::TetherState::Blocked(x) => {
log::warn!("{} stage blocked, state: {:?}", name, x);
}
gasket::runtime::TetherState::Alive(x) => {
log::info!("{} stage alive, state: {:?}", name, x);
}
}

match tether.read_metrics() {
Ok(readings) => {
for (key, value) in readings {
log::info!("stage {}, metric {}: {:?}", name, key, value);
}
}
Err(err) => {
println!("couldn't read metrics");
dbg!(err);
}
}
}

std::thread::sleep(Duration::from_secs(5));
}
monitor::monitor_while_alive(pipeline);

//for (name, tether) in tethers {
// log::warn!("{}", name);
// tether.dismiss_stage().expect("stage stops");
// tether.join_stage();
//}
Ok(())
}

/// Creates the clap definition for this sub-command
Expand Down
1 change: 1 addition & 0 deletions src/bin/scrolls/main.rs
Expand Up @@ -2,6 +2,7 @@ use clap::Command;
use std::process;

mod daemon;
mod monitor;

fn main() {
let args = Command::new("app")
Expand Down
54 changes: 54 additions & 0 deletions src/bin/scrolls/monitor.rs
@@ -0,0 +1,54 @@
use std::time::Duration;

use scrolls::bootstrap::Pipeline;

fn should_stop(pipeline: &Pipeline) -> bool {
pipeline
.tethers
.iter()
.any(|(_, tether)| match tether.check_state() {
gasket::runtime::TetherState::Alive(x) => match x {
gasket::runtime::StageState::StandBy => true,
_ => false,
},
_ => true,
})
}

pub fn print_metrics(pipeline: &Pipeline) {
for (name, tether) in pipeline.tethers.iter() {
match tether.read_metrics() {
Ok(readings) => {
for (key, value) in readings {
log::info!("stage {}, metric {}: {:?}", name, key, value);
}
}
Err(err) => {
println!("couldn't read metrics");
dbg!(err);
}
}
}
}

pub fn monitor_while_alive(pipeline: Pipeline) {
while !should_stop(&pipeline) {
print_metrics(&pipeline);
std::thread::sleep(Duration::from_secs(5));
}

for (name, tether) in pipeline.tethers {
let state = tether.check_state();
log::warn!("dismissing stage: {} with state {:?}", name, state);
tether.dismiss_stage().expect("stage stops");

// Can't join the stage because there's a risk of deadlock, usually
// because a stage gets stuck sending into a port which depends on a
// different stage not yet dismissed. The solution is to either create a
// DAG of dependencies and dismiss in the correct order, or implement a
// 2-phase teardown where ports are disconnected and flushed
// before joining the stage.

//tether.join_stage();
}
}

0 comments on commit 47b7589

Please sign in to comment.