From ce240a5fc622afce514cdc6b7144954e82c8ed97 Mon Sep 17 00:00:00 2001 From: Santiago Carmuega Date: Sun, 7 Aug 2022 10:00:30 -0300 Subject: [PATCH] fix: Use latest cursor when attempting a reconnect (#61) --- src/bootstrap.rs | 4 +-- src/crosscut/args.rs | 2 -- src/sources/mod.rs | 8 ++---- src/sources/n2c/chainsync.rs | 17 ++++++------ src/sources/n2c/mod.rs | 11 ++++---- src/sources/n2n/chainsync.rs | 10 +++---- src/sources/n2n/mod.rs | 10 +++---- src/sources/utils.rs | 10 +++---- src/storage/mod.rs | 26 ++++++++++++++--- src/storage/redis.rs | 54 +++++++++++++++++++++--------------- src/storage/skip.rs | 46 +++++++++++++++++++++--------- 11 files changed, 121 insertions(+), 77 deletions(-) diff --git a/src/bootstrap.rs b/src/bootstrap.rs index dd268b93..132ac632 100644 --- a/src/bootstrap.rs +++ b/src/bootstrap.rs @@ -24,7 +24,7 @@ pub fn build( mut reducer: reducers::Bootstrapper, mut storage: storage::Bootstrapper, ) -> Result { - let cursor = storage.read_cursor()?; + let cursor = storage.build_cursor(); let mut pipeline = Pipeline::new(); @@ -42,7 +42,7 @@ pub fn build( 100, ); - source.spawn_stages(&mut pipeline, &cursor); + source.spawn_stages(&mut pipeline, cursor); enrich.spawn_stages(&mut pipeline); reducer.spawn_stages(&mut pipeline); storage.spawn_stages(&mut pipeline); diff --git a/src/crosscut/args.rs b/src/crosscut/args.rs index f5da3a44..223a73b8 100644 --- a/src/crosscut/args.rs +++ b/src/crosscut/args.rs @@ -68,8 +68,6 @@ impl ToString for PointArg { } } -pub type Cursor = Option; - #[derive(Debug, Deserialize, Clone)] pub struct MagicArg(pub u64); diff --git a/src/sources/mod.rs b/src/sources/mod.rs index f619c83f..84408c67 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -1,11 +1,7 @@ use gasket::messaging::OutputPort; use serde::Deserialize; -use crate::{ - bootstrap, - crosscut::{self, PointArg}, - model, -}; +use crate::{bootstrap, crosscut, model, storage}; #[cfg(target_family = "unix")] pub mod n2c; @@ -48,7 +44,7 @@ impl Bootstrapper { } } - pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: &Option) { + pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: storage::Cursor) { match self { Bootstrapper::N2N(p) => p.spawn_stages(pipeline, cursor), Bootstrapper::N2C(p) => p.spawn_stages(pipeline, cursor), diff --git a/src/sources/n2c/chainsync.rs b/src/sources/n2c/chainsync.rs index 3a470cc5..ed0c9bdf 100644 --- a/src/sources/n2c/chainsync.rs +++ b/src/sources/n2c/chainsync.rs @@ -8,7 +8,7 @@ use gasket::{ metrics::{Counter, Gauge}, }; -use crate::{crosscut, model::RawBlockPayload, sources::utils}; +use crate::{crosscut, model, sources::utils, storage}; use super::transport::Transport; @@ -63,7 +63,8 @@ impl chainsync::Observer for ChainObserver { .remove(&point) .expect("required block not found in memory"); - self.output.send(RawBlockPayload::roll_forward(block))?; + self.output + .send(model::RawBlockPayload::roll_forward(block))?; self.block_count.inc(1); } @@ -86,7 +87,7 @@ impl chainsync::Observer for ChainObserver { chainsync::RollbackEffect::OutOfScope => { log::debug!("rollback out of buffer scope, sending event down the pipeline"); self.output - .send(RawBlockPayload::roll_back(point.clone()))?; + .send(model::RawBlockPayload::roll_back(point.clone()))?; } } @@ -94,7 +95,7 @@ impl chainsync::Observer for ChainObserver { } } -type OutputPort = gasket::messaging::OutputPort; +type OutputPort = gasket::messaging::OutputPort; type MyAgent = chainsync::BlockConsumer; pub struct Worker { @@ -102,7 +103,7 @@ pub struct Worker { min_depth: usize, chain: crosscut::ChainWellKnownInfo, intersect: crosscut::IntersectConfig, - cursor: crosscut::Cursor, + cursor: storage::Cursor, //finalize_config: Option, agent: Option, transport: Option, @@ -117,7 +118,7 @@ impl Worker { min_depth: usize, chain: crosscut::ChainWellKnownInfo, intersect: crosscut::IntersectConfig, - cursor: crosscut::Cursor, + cursor: storage::Cursor, output: OutputPort, ) -> Self { Self { @@ -146,10 +147,10 @@ impl gasket::runtime::Worker for Worker { fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { let mut transport = Transport::setup(&self.socket, self.chain.magic).or_retry()?; - let known_points = utils::define_known_points( + let known_points = utils::define_chainsync_start( &self.chain, &self.intersect, - &self.cursor, + &mut self.cursor, &mut transport.channel5, ) .or_retry()?; diff --git a/src/sources/n2c/mod.rs b/src/sources/n2c/mod.rs index edfc1f82..6045db1d 100644 --- a/src/sources/n2c/mod.rs +++ b/src/sources/n2c/mod.rs @@ -4,7 +4,8 @@ mod transport; use serde::Deserialize; use std::time::Duration; -use crate::{bootstrap::Pipeline, crosscut, model::RawBlockPayload}; +use crate::{bootstrap, crosscut, model, storage}; + use gasket::messaging::OutputPort; #[derive(Deserialize)] @@ -31,22 +32,22 @@ pub struct Bootstrapper { config: Config, intersect: crosscut::IntersectConfig, chain: crosscut::ChainWellKnownInfo, - output: OutputPort, + output: OutputPort, } impl Bootstrapper { - pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort { + pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort { &mut self.output } - pub fn spawn_stages(self, pipeline: &mut Pipeline, cursor: &Option) { + pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: storage::Cursor) { pipeline.register_stage(gasket::runtime::spawn_stage( self::chainsync::Worker::new( self.config.path.clone(), 0, self.chain, self.intersect, - cursor.clone(), + cursor, self.output, ), gasket::runtime::Policy { diff --git a/src/sources/n2n/chainsync.rs b/src/sources/n2n/chainsync.rs index 762b7990..686970e6 100644 --- a/src/sources/n2n/chainsync.rs +++ b/src/sources/n2n/chainsync.rs @@ -9,8 +9,8 @@ use gasket::{ use super::ChainSyncInternalPayload; use crate::sources::n2n::transport::Transport; -use crate::Error; use crate::{crosscut, sources::utils}; +use crate::{storage, Error}; fn to_traverse<'b>(header: &'b HeaderContent) -> Result, Error> { MultiEraHeader::decode( @@ -112,7 +112,7 @@ pub struct Worker { min_depth: usize, chain: crosscut::ChainWellKnownInfo, intersect: crosscut::IntersectConfig, - cursor: Option, + cursor: storage::Cursor, //finalize_config: Option, agent: Option>, transport: Option, @@ -127,7 +127,7 @@ impl Worker { min_depth: usize, chain: crosscut::ChainWellKnownInfo, intersect: crosscut::IntersectConfig, - cursor: Option, + cursor: storage::Cursor, output: OutputPort, ) -> Self { Self { @@ -156,10 +156,10 @@ impl gasket::runtime::Worker for Worker { fn bootstrap(&mut self) -> Result<(), gasket::error::Error> { let mut transport = Transport::setup(&self.address, self.chain.magic).or_retry()?; - let known_points = utils::define_known_points( + let known_points = utils::define_chainsync_start( &self.chain, &self.intersect, - &self.cursor, + &mut self.cursor, &mut transport.channel2, ) .or_retry()?; diff --git a/src/sources/n2n/mod.rs b/src/sources/n2n/mod.rs index 67a04d71..5fca5033 100644 --- a/src/sources/n2n/mod.rs +++ b/src/sources/n2n/mod.rs @@ -9,7 +9,7 @@ use gasket::messaging::{InputPort, OutputPort}; use pallas::network::miniprotocols::Point; use serde::Deserialize; -use crate::{bootstrap::Pipeline, crosscut, model::RawBlockPayload}; +use crate::{bootstrap, crosscut, model, storage}; #[derive(Debug)] pub enum ChainSyncInternalPayload { @@ -55,15 +55,15 @@ pub struct Bootstrapper { config: Config, intersect: crosscut::IntersectConfig, chain: crosscut::ChainWellKnownInfo, - output: OutputPort, + output: OutputPort, } impl Bootstrapper { - pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort { + pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort { &mut self.output } - pub fn spawn_stages(self, pipeline: &mut Pipeline, cursor: &Option) { + pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: storage::Cursor) { let mut headers_out = OutputPort::::default(); let mut headers_in = InputPort::::default(); gasket::messaging::connect_ports(&mut headers_out, &mut headers_in, 10); @@ -74,7 +74,7 @@ impl Bootstrapper { 0, self.chain.clone(), self.intersect, - cursor.clone(), + cursor, headers_out, ), gasket::runtime::Policy { diff --git a/src/sources/utils.rs b/src/sources/utils.rs index 66205cd6..4a6a14ea 100644 --- a/src/sources/utils.rs +++ b/src/sources/utils.rs @@ -3,7 +3,7 @@ use pallas::network::{ multiplexer::StdChannelBuffer, }; -use crate::crosscut; +use crate::{crosscut, storage}; pub fn find_end_of_chain( chain: &crosscut::ChainWellKnownInfo, @@ -24,18 +24,18 @@ pub fn find_end_of_chain( } } -pub fn define_known_points( +pub fn define_chainsync_start( chain: &crosscut::ChainWellKnownInfo, intersect: &crosscut::IntersectConfig, - cursor: &crosscut::Cursor, + cursor: &mut storage::Cursor, channel: &mut StdChannelBuffer, ) -> Result>, crate::Error> { - match cursor { + match cursor.last_point()? { Some(x) => { log::info!("found existing cursor in storage plugin: {:?}", x); return Ok(Some(vec![x.clone().try_into()?])); } - None => log::debug!("no cursor found in storage plugin"), + None => log::info!("no cursor found in storage plugin"), }; match &intersect { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 0b362220..8b9cc5ed 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,7 +4,11 @@ pub mod skip; use gasket::messaging::InputPort; use serde::Deserialize; -use crate::{bootstrap, crosscut, model}; +use crate::{ + bootstrap, + crosscut::{self, PointArg}, + model, +}; #[derive(Deserialize)] #[serde(tag = "type")] @@ -39,10 +43,10 @@ impl Bootstrapper { } } - pub fn read_cursor(&mut self) -> Result { + pub fn build_cursor(&mut self) -> Cursor { match self { - Bootstrapper::Redis(x) => x.read_cursor(), - Bootstrapper::Skip(x) => x.read_cursor(), + Bootstrapper::Redis(x) => Cursor::Redis(x.build_cursor()), + Bootstrapper::Skip(x) => Cursor::Skip(x.build_cursor()), } } @@ -53,3 +57,17 @@ impl Bootstrapper { } } } + +pub enum Cursor { + Redis(redis::Cursor), + Skip(skip::Cursor), +} + +impl Cursor { + pub fn last_point(&mut self) -> Result, crate::Error> { + match self { + Cursor::Redis(x) => x.last_point(), + Cursor::Skip(x) => x.last_point(), + } + } +} diff --git a/src/storage/redis.rs b/src/storage/redis.rs index 00324725..d3627edc 100644 --- a/src/storage/redis.rs +++ b/src/storage/redis.rs @@ -52,8 +52,38 @@ impl Bootstrapper { &mut self.input } - pub fn read_cursor(&mut self) -> Result { - let mut connection = redis::Client::open(self.config.connection_params.clone()) + pub fn build_cursor(&self) -> Cursor { + Cursor { + connection_params: self.config.connection_params.clone(), + } + } + + pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) { + let worker = Worker { + config: self.config.clone(), + connection: None, + input: self.input, + ops_count: Default::default(), + }; + + pipeline.register_stage(spawn_stage( + worker, + gasket::runtime::Policy { + tick_timeout: Some(Duration::from_secs(600)), + ..Default::default() + }, + Some("redis"), + )); + } +} + +pub struct Cursor { + connection_params: String, +} + +impl Cursor { + pub fn last_point(&mut self) -> Result, crate::Error> { + let mut connection = redis::Client::open(self.connection_params.clone()) .and_then(|x| x.get_connection()) .map_err(crate::Error::storage)?; @@ -66,26 +96,6 @@ impl Bootstrapper { Ok(point) } - - pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) { - let worker = Worker { - config: self.config.clone(), - connection: None, - input: self.input, - ops_count: Default::default(), - }; - - pipeline.register_stage( - spawn_stage( - worker, - gasket::runtime::Policy { - tick_timeout: Some(Duration::from_secs(600)), - ..Default::default() - }, - Some("redis"), - ), - ); - } } pub struct Worker { diff --git a/src/storage/skip.rs b/src/storage/skip.rs index 96be8144..bcd65785 100644 --- a/src/storage/skip.rs +++ b/src/storage/skip.rs @@ -1,4 +1,7 @@ -use std::time::Duration; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; use gasket::runtime::{spawn_stage, WorkOutcome}; @@ -15,12 +18,14 @@ impl Config { pub fn boostrapper(self) -> Bootstrapper { Bootstrapper { input: Default::default(), + last_point: Arc::new(Mutex::new(None)), } } } pub struct Bootstrapper { input: InputPort, + last_point: Arc>>, } impl Bootstrapper { @@ -28,32 +33,45 @@ impl Bootstrapper { &mut self.input } - pub fn read_cursor(&mut self) -> Result { - Ok(None) + pub fn build_cursor(&mut self) -> Cursor { + Cursor { + last_point: self.last_point.clone(), + } } pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) { let worker = Worker { input: self.input, ops_count: Default::default(), + last_point: self.last_point.clone(), }; - pipeline.register_stage( - spawn_stage( - worker, - gasket::runtime::Policy { - tick_timeout: Some(Duration::from_secs(5)), - ..Default::default() - }, - Some("skip") - ), - ); + pipeline.register_stage(spawn_stage( + worker, + gasket::runtime::Policy { + tick_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }, + Some("skip"), + )); + } +} + +pub struct Cursor { + last_point: Arc>>, +} + +impl Cursor { + pub fn last_point(&self) -> Result, crate::Error> { + let value = self.last_point.lock().unwrap(); + Ok(value.clone()) } } pub struct Worker { ops_count: gasket::metrics::Counter, input: InputPort, + last_point: Arc>>, } impl gasket::runtime::Worker for Worker { @@ -96,6 +114,8 @@ impl gasket::runtime::Worker for Worker { } model::CRDTCommand::BlockFinished(point) => { log::debug!("block finished {:?}", point); + let mut last_point = self.last_point.lock().unwrap(); + *last_point = Some(crosscut::PointArg::from(point)); } };