Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inx): stream-based mapper #528

Merged
merged 6 commits into from Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
228 changes: 95 additions & 133 deletions bin/inx-chronicle/src/stardust_inx/mod.rs
Expand Up @@ -5,7 +5,7 @@ mod config;
mod error;

use async_trait::async_trait;
use bee_inx::client::Inx;
use bee_inx::{client::Inx, LedgerUpdate};
use chronicle::{
db::MongoDb,
runtime::{Actor, ActorContext, HandleEvent},
Expand All @@ -16,15 +16,21 @@ use chronicle::{
};
pub use config::InxConfig;
pub use error::InxError;
use futures::{Stream, StreamExt, TryStreamExt};
use pin_project::pin_project;
use futures::{StreamExt, TryStreamExt};
use tokio::time::Instant;

pub struct InxWorker {
db: MongoDb,
config: InxConfig,
}

#[derive(Debug)]
pub struct MilestoneState {
outputs: Vec<OutputWithMetadata>,
milestone_index: MilestoneIndex,
start_time: tokio::time::Instant,
}

impl InxWorker {
/// Creates an [`Inx`] client by connecting to the endpoint specified in `inx_config`.
pub fn new(db: &MongoDb, inx_config: &InxConfig) -> Self {
Expand Down Expand Up @@ -60,7 +66,7 @@ impl InxWorker {

#[async_trait]
impl Actor for InxWorker {
type State = Inx;
type State = (Inx, Option<MilestoneState>);
type Error = InxError;

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
Expand Down Expand Up @@ -151,160 +157,91 @@ impl Actor for InxWorker {

session.commit_transaction().await?;

let ledger_update_stream =
LedgerUpdateStream::new(inx.listen_to_ledger_updates((start_index.0..).into()).await?);
let ledger_update_stream = inx.listen_to_ledger_updates((start_index.0..).into()).await?;

cx.add_stream(ledger_update_stream);

Ok(inx)
Ok((inx, None))
}

fn name(&self) -> std::borrow::Cow<'static, str> {
"Inx Worker".into()
}
}

#[derive(Debug)]
pub struct LedgerUpdateRecord {
milestone_index: MilestoneIndex,
outputs: Vec<OutputWithMetadata>,
}

#[pin_project]
pub struct LedgerUpdateStream<S> {
#[pin]
inner: S,
#[pin]
record: Option<LedgerUpdateRecord>,
}

impl<S> LedgerUpdateStream<S> {
fn new(inner: S) -> Self {
Self { inner, record: None }
impl InxWorker {
fn handle_milestone_begin(&mut self, begin: bee_inx::Marker) -> MilestoneState {
let start_time = Instant::now();
MilestoneState {
outputs: Vec::with_capacity(begin.created_count + begin.consumed_count),
milestone_index: begin.milestone_index.into(),
start_time,
}
}
}

impl<S: Stream<Item = Result<bee_inx::LedgerUpdate, bee_inx::Error>>> Stream for LedgerUpdateStream<S> {
type Item = Result<LedgerUpdateRecord, InxError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use std::task::Poll;

use bee_inx::LedgerUpdate;

let this = self.project();
if let Poll::Ready(next) = this.inner.poll_next(cx) {
if let Some(res) = next {
match res {
Ok(ledger_update) => match ledger_update {
LedgerUpdate::Begin(marker) => {
// We shouldn't already have a record. If we do, that's bad.
let record = this.record.get_mut();
if let Some(record) = record.take() {
return Poll::Ready(Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.outputs.len(),
expected: record.outputs.capacity(),
})));
} else {
*record = Some(LedgerUpdateRecord {
milestone_index: marker.milestone_index.into(),
outputs: Vec::with_capacity(marker.created_count + marker.consumed_count),
});
}
}
LedgerUpdate::Consumed(consumed) => {
if let Some(record) = this.record.get_mut() {
match OutputWithMetadata::try_from(consumed) {
Ok(consumed) => {
record.outputs.push(consumed);
}
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
}
} else {
return Poll::Ready(Some(Err(InxError::InvalidMilestoneState)));
}
}
LedgerUpdate::Created(created) => {
if let Some(record) = this.record.get_mut() {
match OutputWithMetadata::try_from(created) {
Ok(created) => {
record.outputs.push(created);
}
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
}
} else {
return Poll::Ready(Some(Err(InxError::InvalidMilestoneState)));
}
}
LedgerUpdate::End(marker) => {
if let Some(record) = this.record.get_mut().take() {
if record.outputs.len() != marker.consumed_count + marker.created_count {
return Poll::Ready(Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.outputs.len(),
expected: marker.consumed_count + marker.created_count,
})));
}
return Poll::Ready(Some(Ok(record)));
} else {
return Poll::Ready(Some(Err(InxError::InvalidMilestoneState)));
}
}
},
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
}
} else {
// If we were supposed to be in the middle of a milestone, something went wrong.
if let Some(record) = this.record.get_mut().take() {
return Poll::Ready(Some(Err(InxError::InvalidLedgerUpdateCount {
received: record.outputs.len(),
expected: record.outputs.capacity(),
})));
}
}
}
Poll::Pending
fn handle_milestone_consumed(
&mut self,
milestone_state: &mut Option<MilestoneState>,
consumed: bee_inx::LedgerSpent,
) -> Result<(), InxError> {
let milestone_state = if let Some(milestone_state) = milestone_state {
milestone_state
} else {
return Err(InxError::InvalidMilestoneState);
};
milestone_state.outputs.push(consumed.try_into()?);
Ok(())
}
}

#[async_trait]
impl HandleEvent<Result<LedgerUpdateRecord, InxError>> for InxWorker {
async fn handle_event(
fn handle_milestone_created(
&mut self,
_cx: &mut ActorContext<Self>,
ledger_update_result: Result<LedgerUpdateRecord, InxError>,
inx: &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received ledger update event {:#?}", ledger_update_result);
milestone_state: &mut Option<MilestoneState>,
created: bee_inx::LedgerOutput,
) -> Result<(), InxError> {
let milestone_state = if let Some(milestone_state) = milestone_state {
milestone_state
} else {
return Err(InxError::InvalidMilestoneState);
};

// TODO: Use tracing here.
let start_time = Instant::now();
milestone_state.outputs.push(created.try_into()?);
Ok(())
}

async fn handle_milestone_end(
&mut self,
inx: &mut Inx,
milestone_state: Option<MilestoneState>,
end: bee_inx::Marker,
) -> Result<Option<MilestoneState>, InxError> {
let milestone_state = if let Some(milestone_state) = milestone_state {
milestone_state
} else {
return Err(InxError::InvalidMilestoneState);
};

let ledger_update = ledger_update_result?;
if milestone_state.outputs.len() != end.consumed_count + end.created_count {
return Err(InxError::InvalidLedgerUpdateCount {
received: milestone_state.outputs.len(),
expected: end.consumed_count + end.created_count,
});
}

let mut session = self.db.start_transaction(None).await?;

self.db
.insert_ledger_updates(&mut session, ledger_update.outputs.into_iter())
.insert_ledger_updates(&mut session, milestone_state.outputs.into_iter())
.await?;

let milestone = inx.read_milestone(ledger_update.milestone_index.0.into()).await?;
let milestone = inx.read_milestone(milestone_state.milestone_index.0.into()).await?;
let parameters: ProtocolParameters = inx
.read_protocol_parameters(ledger_update.milestone_index.0.into())
.read_protocol_parameters(milestone_state.milestone_index.0.into())
.await?
.inner()?
.into();

self.db
.update_latest_protocol_parameters(&mut session, ledger_update.milestone_index, parameters)
.update_latest_protocol_parameters(&mut session, milestone_state.milestone_index, parameters)
.await?;

log::trace!("Received milestone: `{:?}`", milestone);
Expand All @@ -314,20 +251,20 @@ impl HandleEvent<Result<LedgerUpdateRecord, InxError>> for InxWorker {
let milestone_id = milestone
.milestone_info
.milestone_id
.ok_or(Self::Error::MissingMilestoneInfo(milestone_index))?
.ok_or(InxError::MissingMilestoneInfo(milestone_index))?
.into();
let payload = Into::into(
&milestone
.milestone
.ok_or(Self::Error::MissingMilestoneInfo(milestone_index))?,
.ok_or(InxError::MissingMilestoneInfo(milestone_index))?,
);

let cone_stream = inx.read_milestone_cone(milestone_index.0.into()).await?;

let blocks_with_metadata = cone_stream
.map(|res| {
let bee_inx::BlockWithMetadata { block, metadata } = res?;
Result::<_, Self::Error>::Ok((block.clone().inner()?.into(), block.data(), metadata.into()))
Result::<_, InxError>::Ok((block.clone().inner()?.into(), block.data(), metadata.into()))
})
.try_collect::<Vec<_>>()
.await?;
Expand All @@ -348,13 +285,38 @@ impl HandleEvent<Result<LedgerUpdateRecord, InxError>> for InxWorker {

session.commit_transaction().await?;

let duration = start_time.elapsed();
let duration = milestone_state.start_time.elapsed();
log::debug!(
"Milestone `{}` synced in {}.",
milestone_index,
humantime::Duration::from(duration)
);

Ok(None)
}
}

#[async_trait]
impl HandleEvent<Result<bee_inx::LedgerUpdate, bee_inx::Error>> for InxWorker {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
ledger_update_result: Result<bee_inx::LedgerUpdate, bee_inx::Error>,
(inx, milestone_state): &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received ledger update event {:#?}", ledger_update_result);

match ledger_update_result? {
LedgerUpdate::Begin(marker) => *milestone_state = Some(self.handle_milestone_begin(marker)),
LedgerUpdate::Consumed(consumed) => self.handle_milestone_consumed(milestone_state, consumed)?,
LedgerUpdate::Created(created) => self.handle_milestone_created(milestone_state, created)?,
LedgerUpdate::End(marker) => {
*milestone_state = self
.handle_milestone_end(inx, std::mem::take(milestone_state), marker)
.await?
}
}

Ok(())
}
}
61 changes: 61 additions & 0 deletions docker/docker-compose.private.yml
@@ -0,0 +1,61 @@
version: '3'
services:

mongo1:
image: mongo:latest
container_name: mongo1
# Warning: We don't keep logs to make development simpler
command: ["--replSet", "my-replica-set", "--bind_ip_all", "--port", "27017", "--quiet", "--logpath", "/dev/null"]
volumes:
- ./chronicle_data/mongo-1:/data/db
ports:
- 27017:27017
healthcheck:
test: test $$(echo "rs.initiate({_id:'my-replica-set',members:[{_id:0,host:\"mongo1:27017\"},{_id:1,host:\"mongo2:27019\"},{_id:2,host:\"mongo3:27020\"}]}).ok || rs.status().ok" | mongo --port 27017 --quiet) -eq 1
interval: 10s
start_period: 5s

mongo2:
image: mongo:latest
container_name: mongo2
# Warning: We don't keep logs to make development simpler
command: ["--replSet", "my-replica-set", "--bind_ip_all", "--port", "27019", "--quiet", "--logpath", "/dev/null"]
volumes:
- ./chronicle_data/mongo-2:/data/db
ports:
- 27019:27019

mongo3:
image: mongo:latest
container_name: mongo3
# Warning: We don't keep logs to make development simpler
command: ["--replSet", "my-replica-set", "--bind_ip_all", "--port", "27020", "--quiet", "--logpath", "/dev/null"]
volumes:
- ./chronicle_data/mongo-3:/data/db
ports:
- 27020:27020


inx-chronicle:
container_name: inx-chronicle
depends_on: [mongo1, mongo2, mongo3]
build:
context: ..
dockerfile: docker/Dockerfile
image: inx-chronicle:dev
ports:
- "8042:8042/tcp" # REST API
- "9100:9100/tcp" # Metrics
environment:
- RUST_LOG=warn,inx_chronicle=debug
tty: true
command:
- "--inx"
- "http://host.docker.internal:9029"
- "--db"
- "mongodb://mongo1:27017"
- "--config"
- "config.toml"
volumes:
- ../bin/inx-chronicle/config.template.toml:/app/config.toml:ro