Skip to content

Commit

Permalink
fix(collector): merge the collector and inx (#141)
Browse files Browse the repository at this point in the history
* Fix features

* whut to heck ????

* Clippy proper features

* Feature gate the collector as it should not be included when running only the API. Clean up some other clippy warnings.

* Move inx underneath the collector and clean up feature gates.

* Cleanup

* Fix config

* >:I

* Add a `collector` feature and rename `api-analytics`

Co-authored-by: Jochen Görtler <jochen.goertler@iota.org>
  • Loading branch information
Alexandcoats and grtlr authored May 13, 2022
1 parent ea77593 commit 1406a9f
Show file tree
Hide file tree
Showing 15 changed files with 389 additions and 418 deletions.
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,34 +67,34 @@ console-subscriber = { version = "0.1", default-features = false, optional = tru

[features]
default = [
"api-analytics",
"analytics",
"api-explorer",
"api-node",
"inx",
"stardust",
]
api = [
analytics = []
api = [
"dep:axum",
"derive_more/from",
"dep:hyper",
"dep:serde_urlencoded",
"dep:tower",
"dep:tower-http",
]
api-analytics = [
"api",
]
api-explorer = [
"api",
]
api-node = [
"api",
]
collector = []
console = [
"dep:console-subscriber",
"tokio/tracing",
]
inx = [
"collector",
"dep:inx",
]
metrics = [
Expand Down
8 changes: 4 additions & 4 deletions bin/inx-chronicle/config.template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ connect_url = "mongodb://localhost:27017"
username = "root"
password = "root"

[inx]
connect_url = "http://localhost:9029"
connection_retry_interval = "5s"

[api]
port = 9092
allow_origins = ["0.0.0.0"]

[collector.inx]
connect_url = "http://localhost:9029"
connection_retry_interval = "5s"

[collector]
solidifier_count = 10

Expand Down
4 changes: 2 additions & 2 deletions bin/inx-chronicle/src/api/stardust/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use axum::Router;

#[cfg(feature = "api-analytics")]
#[cfg(feature = "analytics")]
pub mod analytics;
#[cfg(feature = "api-explorer")]
pub mod explorer;
Expand All @@ -14,7 +14,7 @@ pub fn routes() -> Router {
#[allow(unused_mut)]
let mut router = Router::new();

#[cfg(feature = "api-analytics")]
#[cfg(feature = "analytics")]
{
router = router.nest("/analytics", analytics::routes());
}
Expand Down
11 changes: 9 additions & 2 deletions bin/inx-chronicle/src/collector/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,27 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectorConfig {
pub solidifier_count: usize,
#[cfg(all(feature = "stardust", feature = "inx"))]
pub inx: super::stardust_inx::InxConfig,
}

impl CollectorConfig {
const MAX_SOLIDIFIERS: usize = 100;

pub fn new(solidifier_count: usize) -> Self {
pub fn new(solidifier_count: usize, #[cfg(all(feature = "stardust", feature = "inx"))] inx: String) -> Self {
Self {
solidifier_count: solidifier_count.clamp(1, Self::MAX_SOLIDIFIERS),
inx: super::stardust_inx::InxConfig::new(inx),
}
}
}

impl Default for CollectorConfig {
fn default() -> Self {
Self::new(10)
Self::new(
10,
#[cfg(all(feature = "stardust", feature = "inx"))]
Default::default(),
)
}
}
190 changes: 9 additions & 181 deletions bin/inx-chronicle/src/collector/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
// Copyright 2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::collections::{HashMap, VecDeque};
mod config;
pub mod solidifier;
#[cfg(all(feature = "stardust", feature = "inx"))]
pub(crate) mod stardust_inx;

use std::collections::HashMap;

use async_trait::async_trait;
use chronicle::{
db::{bson::DocError, MongoDb},
runtime::{Actor, ActorContext, ActorError, Addr, ConfigureActor, HandleEvent, Report, RuntimeError},
types::{ledger::Metadata, stardust::message::MessageId},
};
pub use config::CollectorConfig;
use mongodb::bson::document::ValueAccessError;
use solidifier::Solidifier;
use thiserror::Error;

mod config;
pub mod solidifier;

#[derive(Debug, Error)]
pub enum CollectorError {
#[error(transparent)]
Expand Down Expand Up @@ -55,6 +56,9 @@ impl Actor for Collector {
.await,
);
}
#[cfg(all(feature = "stardust", feature = "inx"))]
cx.spawn_child(stardust_inx::InxWorker::new(self.config.inx.clone()))
.await;
Ok(solidifiers)
}
}
Expand Down Expand Up @@ -90,179 +94,3 @@ impl HandleEvent<Report<Solidifier>> for Collector {
Ok(())
}
}

#[cfg(all(feature = "stardust", feature = "inx"))]
pub mod stardust_inx {
use std::collections::HashSet;

use chronicle::db::model::stardust::{message::MessageRecord, milestone::MilestoneRecord};

use super::*;

#[derive(Debug)]
pub struct MilestoneState {
pub milestone_index: u32,
pub process_queue: VecDeque<MessageId>,
pub visited: HashSet<MessageId>,
}

impl MilestoneState {
pub fn new(milestone_index: u32) -> Self {
Self {
milestone_index,
process_queue: VecDeque::new(),
visited: HashSet::new(),
}
}
}

#[derive(Debug)]
pub struct RequestedMessage {
raw: Option<inx::proto::RawMessage>,
metadata: inx::proto::MessageMetadata,
solidifier: Addr<Solidifier>,
ms_state: MilestoneState,
}

impl RequestedMessage {
pub fn new(
raw: Option<inx::proto::RawMessage>,
metadata: inx::proto::MessageMetadata,
solidifier: Addr<Solidifier>,
ms_state: MilestoneState,
) -> Self {
Self {
raw,
metadata,
solidifier,
ms_state,
}
}
}

#[async_trait]
impl HandleEvent<inx::proto::Message> for Collector {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
message: inx::proto::Message,
_solidifiers: &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received Stardust Message Event");
match MessageRecord::try_from(message) {
Ok(rec) => {
self.db.upsert_message_record(&rec).await?;
}
Err(e) => {
log::error!("Could not read message: {:?}", e);
}
};
Ok(())
}
}

#[async_trait]
impl HandleEvent<inx::proto::MessageMetadata> for Collector {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
metadata: inx::proto::MessageMetadata,
_solidifiers: &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received Stardust Message Referenced Event");
match inx::MessageMetadata::try_from(metadata) {
Ok(rec) => {
let message_id = rec.message_id;
self.db
.update_message_metadata(&message_id.into(), &Metadata::from(rec))
.await?;
}
Err(e) => {
log::error!("Could not read message metadata: {:?}", e);
}
};
Ok(())
}
}

#[async_trait]
impl HandleEvent<inx::proto::Milestone> for Collector {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
milestone: inx::proto::Milestone,
solidifiers: &mut Self::State,
) -> Result<(), Self::Error> {
log::trace!("Received Stardust Milestone Event");
match MilestoneRecord::try_from(milestone) {
Ok(rec) => {
self.db.upsert_milestone_record(&rec).await?;
// Get or create the milestone state
let mut state = MilestoneState::new(rec.milestone_index);
state
.process_queue
.extend(Vec::from(rec.payload.essence.parents).into_iter());
solidifiers
// Divide solidifiers fairly by milestone
.get(&(rec.milestone_index as usize % self.config.solidifier_count))
// Unwrap: We never remove solidifiers, so they should always exist
.unwrap()
.send(state)?;
}
Err(e) => {
log::error!("Could not read milestone: {:?}", e);
}
}
Ok(())
}
}

#[async_trait]
impl HandleEvent<RequestedMessage> for Collector {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
RequestedMessage {
raw,
metadata,
solidifier,
ms_state,
}: RequestedMessage,
_solidifiers: &mut Self::State,
) -> Result<(), Self::Error> {
match raw {
Some(raw) => {
log::trace!("Received Stardust Requested Message and Metadata");
match MessageRecord::try_from((raw, metadata)) {
Ok(rec) => {
self.db.upsert_message_record(&rec).await?;
// Send this directly to the solidifier that requested it
solidifier.send(ms_state)?;
}
Err(e) => {
log::error!("Could not read message: {:?}", e);
}
};
}
None => {
log::trace!("Received Stardust Requested Metadata");
match inx::MessageMetadata::try_from(metadata) {
Ok(rec) => {
let message_id = rec.message_id;
self.db
.update_message_metadata(&message_id.into(), &Metadata::from(rec))
.await?;
// Send this directly to the solidifier that requested it
solidifier.send(ms_state)?;
}
Err(e) => {
log::error!("Could not read message metadata: {:?}", e);
}
};
}
}

Ok(())
}
}
}
Loading

0 comments on commit 1406a9f

Please sign in to comment.