Skip to content

Commit

Permalink
Merge branch 'main' into crfa
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Aug 7, 2022
2 parents 98546ff + ce240a5 commit 859883b
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 77 deletions.
4 changes: 2 additions & 2 deletions src/bootstrap.rs
Expand Up @@ -24,7 +24,7 @@ pub fn build(
mut reducer: reducers::Bootstrapper,
mut storage: storage::Bootstrapper,
) -> Result<Pipeline, crate::Error> {
let cursor = storage.read_cursor()?;
let cursor = storage.build_cursor();

let mut pipeline = Pipeline::new();

Expand All @@ -42,7 +42,7 @@ pub fn build(
100,
);

source.spawn_stages(&mut pipeline, &cursor);
source.spawn_stages(&mut pipeline, cursor);
enrich.spawn_stages(&mut pipeline);
reducer.spawn_stages(&mut pipeline);
storage.spawn_stages(&mut pipeline);
Expand Down
2 changes: 0 additions & 2 deletions src/crosscut/args.rs
Expand Up @@ -68,8 +68,6 @@ impl ToString for PointArg {
}
}

pub type Cursor = Option<PointArg>;

#[derive(Debug, Deserialize, Clone)]
pub struct MagicArg(pub u64);

Expand Down
8 changes: 2 additions & 6 deletions src/sources/mod.rs
@@ -1,11 +1,7 @@
use gasket::messaging::OutputPort;
use serde::Deserialize;

use crate::{
bootstrap,
crosscut::{self, PointArg},
model,
};
use crate::{bootstrap, crosscut, model, storage};

#[cfg(target_family = "unix")]
pub mod n2c;
Expand Down Expand Up @@ -48,7 +44,7 @@ impl Bootstrapper {
}
}

pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: &Option<PointArg>) {
pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: storage::Cursor) {
match self {
Bootstrapper::N2N(p) => p.spawn_stages(pipeline, cursor),
Bootstrapper::N2C(p) => p.spawn_stages(pipeline, cursor),
Expand Down
17 changes: 9 additions & 8 deletions src/sources/n2c/chainsync.rs
Expand Up @@ -8,7 +8,7 @@ use gasket::{
metrics::{Counter, Gauge},
};

use crate::{crosscut, model::RawBlockPayload, sources::utils};
use crate::{crosscut, model, sources::utils, storage};

use super::transport::Transport;

Expand Down Expand Up @@ -63,7 +63,8 @@ impl chainsync::Observer<chainsync::BlockContent> for ChainObserver {
.remove(&point)
.expect("required block not found in memory");

self.output.send(RawBlockPayload::roll_forward(block))?;
self.output
.send(model::RawBlockPayload::roll_forward(block))?;
self.block_count.inc(1);
}

Expand All @@ -86,23 +87,23 @@ impl chainsync::Observer<chainsync::BlockContent> for ChainObserver {
chainsync::RollbackEffect::OutOfScope => {
log::debug!("rollback out of buffer scope, sending event down the pipeline");
self.output
.send(RawBlockPayload::roll_back(point.clone()))?;
.send(model::RawBlockPayload::roll_back(point.clone()))?;
}
}

Ok(chainsync::Continuation::Proceed)
}
}

type OutputPort = gasket::messaging::OutputPort<RawBlockPayload>;
type OutputPort = gasket::messaging::OutputPort<model::RawBlockPayload>;
type MyAgent = chainsync::BlockConsumer<ChainObserver>;

pub struct Worker {
socket: String,
min_depth: usize,
chain: crosscut::ChainWellKnownInfo,
intersect: crosscut::IntersectConfig,
cursor: crosscut::Cursor,
cursor: storage::Cursor,
//finalize_config: Option<FinalizeConfig>,
agent: Option<MyAgent>,
transport: Option<Transport>,
Expand All @@ -117,7 +118,7 @@ impl Worker {
min_depth: usize,
chain: crosscut::ChainWellKnownInfo,
intersect: crosscut::IntersectConfig,
cursor: crosscut::Cursor,
cursor: storage::Cursor,
output: OutputPort,
) -> Self {
Self {
Expand Down Expand Up @@ -146,10 +147,10 @@ impl gasket::runtime::Worker for Worker {
fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
let mut transport = Transport::setup(&self.socket, self.chain.magic).or_retry()?;

let known_points = utils::define_known_points(
let known_points = utils::define_chainsync_start(
&self.chain,
&self.intersect,
&self.cursor,
&mut self.cursor,
&mut transport.channel5,
)
.or_retry()?;
Expand Down
11 changes: 6 additions & 5 deletions src/sources/n2c/mod.rs
Expand Up @@ -4,7 +4,8 @@ mod transport;
use serde::Deserialize;
use std::time::Duration;

use crate::{bootstrap::Pipeline, crosscut, model::RawBlockPayload};
use crate::{bootstrap, crosscut, model, storage};

use gasket::messaging::OutputPort;

#[derive(Deserialize)]
Expand All @@ -31,22 +32,22 @@ pub struct Bootstrapper {
config: Config,
intersect: crosscut::IntersectConfig,
chain: crosscut::ChainWellKnownInfo,
output: OutputPort<RawBlockPayload>,
output: OutputPort<model::RawBlockPayload>,
}

impl Bootstrapper {
pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort<RawBlockPayload> {
pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort<model::RawBlockPayload> {
&mut self.output
}

pub fn spawn_stages(self, pipeline: &mut Pipeline, cursor: &Option<crosscut::PointArg>) {
pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: storage::Cursor) {
pipeline.register_stage(gasket::runtime::spawn_stage(
self::chainsync::Worker::new(
self.config.path.clone(),
0,
self.chain,
self.intersect,
cursor.clone(),
cursor,
self.output,
),
gasket::runtime::Policy {
Expand Down
10 changes: 5 additions & 5 deletions src/sources/n2n/chainsync.rs
Expand Up @@ -9,8 +9,8 @@ use gasket::{

use super::ChainSyncInternalPayload;
use crate::sources::n2n::transport::Transport;
use crate::Error;
use crate::{crosscut, sources::utils};
use crate::{storage, Error};

fn to_traverse<'b>(header: &'b HeaderContent) -> Result<MultiEraHeader<'b>, Error> {
MultiEraHeader::decode(
Expand Down Expand Up @@ -112,7 +112,7 @@ pub struct Worker {
min_depth: usize,
chain: crosscut::ChainWellKnownInfo,
intersect: crosscut::IntersectConfig,
cursor: Option<crosscut::PointArg>,
cursor: storage::Cursor,
//finalize_config: Option<FinalizeConfig>,
agent: Option<chainsync::HeaderConsumer<ChainObserver>>,
transport: Option<Transport>,
Expand All @@ -127,7 +127,7 @@ impl Worker {
min_depth: usize,
chain: crosscut::ChainWellKnownInfo,
intersect: crosscut::IntersectConfig,
cursor: Option<crosscut::PointArg>,
cursor: storage::Cursor,
output: OutputPort,
) -> Self {
Self {
Expand Down Expand Up @@ -156,10 +156,10 @@ impl gasket::runtime::Worker for Worker {
fn bootstrap(&mut self) -> Result<(), gasket::error::Error> {
let mut transport = Transport::setup(&self.address, self.chain.magic).or_retry()?;

let known_points = utils::define_known_points(
let known_points = utils::define_chainsync_start(
&self.chain,
&self.intersect,
&self.cursor,
&mut self.cursor,
&mut transport.channel2,
)
.or_retry()?;
Expand Down
10 changes: 5 additions & 5 deletions src/sources/n2n/mod.rs
Expand Up @@ -9,7 +9,7 @@ use gasket::messaging::{InputPort, OutputPort};
use pallas::network::miniprotocols::Point;
use serde::Deserialize;

use crate::{bootstrap::Pipeline, crosscut, model::RawBlockPayload};
use crate::{bootstrap, crosscut, model, storage};

#[derive(Debug)]
pub enum ChainSyncInternalPayload {
Expand Down Expand Up @@ -55,15 +55,15 @@ pub struct Bootstrapper {
config: Config,
intersect: crosscut::IntersectConfig,
chain: crosscut::ChainWellKnownInfo,
output: OutputPort<RawBlockPayload>,
output: OutputPort<model::RawBlockPayload>,
}

impl Bootstrapper {
pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort<RawBlockPayload> {
pub fn borrow_output_port(&mut self) -> &'_ mut OutputPort<model::RawBlockPayload> {
&mut self.output
}

pub fn spawn_stages(self, pipeline: &mut Pipeline, cursor: &Option<crosscut::PointArg>) {
pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline, cursor: storage::Cursor) {
let mut headers_out = OutputPort::<ChainSyncInternalPayload>::default();
let mut headers_in = InputPort::<ChainSyncInternalPayload>::default();
gasket::messaging::connect_ports(&mut headers_out, &mut headers_in, 10);
Expand All @@ -74,7 +74,7 @@ impl Bootstrapper {
0,
self.chain.clone(),
self.intersect,
cursor.clone(),
cursor,
headers_out,
),
gasket::runtime::Policy {
Expand Down
10 changes: 5 additions & 5 deletions src/sources/utils.rs
Expand Up @@ -3,7 +3,7 @@ use pallas::network::{
multiplexer::StdChannelBuffer,
};

use crate::crosscut;
use crate::{crosscut, storage};

pub fn find_end_of_chain(
chain: &crosscut::ChainWellKnownInfo,
Expand All @@ -24,18 +24,18 @@ pub fn find_end_of_chain(
}
}

pub fn define_known_points(
pub fn define_chainsync_start(
chain: &crosscut::ChainWellKnownInfo,
intersect: &crosscut::IntersectConfig,
cursor: &crosscut::Cursor,
cursor: &mut storage::Cursor,
channel: &mut StdChannelBuffer,
) -> Result<Option<Vec<Point>>, crate::Error> {
match cursor {
match cursor.last_point()? {
Some(x) => {
log::info!("found existing cursor in storage plugin: {:?}", x);
return Ok(Some(vec![x.clone().try_into()?]));
}
None => log::debug!("no cursor found in storage plugin"),
None => log::info!("no cursor found in storage plugin"),
};

match &intersect {
Expand Down
26 changes: 22 additions & 4 deletions src/storage/mod.rs
Expand Up @@ -4,7 +4,11 @@ pub mod skip;
use gasket::messaging::InputPort;
use serde::Deserialize;

use crate::{bootstrap, crosscut, model};
use crate::{
bootstrap,
crosscut::{self, PointArg},
model,
};

#[derive(Deserialize)]
#[serde(tag = "type")]
Expand Down Expand Up @@ -39,10 +43,10 @@ impl Bootstrapper {
}
}

pub fn read_cursor(&mut self) -> Result<crosscut::Cursor, crate::Error> {
pub fn build_cursor(&mut self) -> Cursor {
match self {
Bootstrapper::Redis(x) => x.read_cursor(),
Bootstrapper::Skip(x) => x.read_cursor(),
Bootstrapper::Redis(x) => Cursor::Redis(x.build_cursor()),
Bootstrapper::Skip(x) => Cursor::Skip(x.build_cursor()),
}
}

Expand All @@ -53,3 +57,17 @@ impl Bootstrapper {
}
}
}

pub enum Cursor {
Redis(redis::Cursor),
Skip(skip::Cursor),
}

impl Cursor {
pub fn last_point(&mut self) -> Result<Option<PointArg>, crate::Error> {
match self {
Cursor::Redis(x) => x.last_point(),
Cursor::Skip(x) => x.last_point(),
}
}
}
54 changes: 32 additions & 22 deletions src/storage/redis.rs
Expand Up @@ -52,8 +52,38 @@ impl Bootstrapper {
&mut self.input
}

pub fn read_cursor(&mut self) -> Result<crosscut::Cursor, crate::Error> {
let mut connection = redis::Client::open(self.config.connection_params.clone())
pub fn build_cursor(&self) -> Cursor {
Cursor {
connection_params: self.config.connection_params.clone(),
}
}

pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) {
let worker = Worker {
config: self.config.clone(),
connection: None,
input: self.input,
ops_count: Default::default(),
};

pipeline.register_stage(spawn_stage(
worker,
gasket::runtime::Policy {
tick_timeout: Some(Duration::from_secs(600)),
..Default::default()
},
Some("redis"),
));
}
}

pub struct Cursor {
connection_params: String,
}

impl Cursor {
pub fn last_point(&mut self) -> Result<Option<crosscut::PointArg>, crate::Error> {
let mut connection = redis::Client::open(self.connection_params.clone())
.and_then(|x| x.get_connection())
.map_err(crate::Error::storage)?;

Expand All @@ -66,26 +96,6 @@ impl Bootstrapper {

Ok(point)
}

pub fn spawn_stages(self, pipeline: &mut bootstrap::Pipeline) {
let worker = Worker {
config: self.config.clone(),
connection: None,
input: self.input,
ops_count: Default::default(),
};

pipeline.register_stage(
spawn_stage(
worker,
gasket::runtime::Policy {
tick_timeout: Some(Duration::from_secs(600)),
..Default::default()
},
Some("redis"),
),
);
}
}

pub struct Worker {
Expand Down

0 comments on commit 859883b

Please sign in to comment.