Skip to content

Commit

Permalink
feat(metrics): add channel metrics to runtime (#169)
Browse files Browse the repository at this point in the history
* Add channel metrics to runtime

* No need to send messages to the metrics worker

* Fix metric naming and add sender traits

* Add actor loop timings metric

* Fix deps

* Fix runtime metric names

* Remove prometheus_client dependency

* Clippy

* fmt

Co-authored-by: Jochen Görtler <grtlr@users.noreply.github.com>
  • Loading branch information
Alexandcoats and grtlr committed May 25, 2022
1 parent d0be40e commit afbf3a4
Show file tree
Hide file tree
Showing 18 changed files with 254 additions and 102 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ async-trait = { version = "0.1", default-features = false }
clap = { version = "3.1", default-features = false, features = [ "derive", "std" ] }
derive_more = { version = "0.99", default-features = false, features = [ "add", "add_assign", "deref", "deref_mut" ] }
dotenv = { version = "0.15", default-features = false }
dyn-clone = { version = "1.0", default-features = false }
env_logger = { version = "0.9", default-features = false, features = ["termcolor", "atty", "humantime"] }
futures = { version = "0.3", default-features = false }
humantime-serde = { version = "1.1", default-features = false }
Expand Down Expand Up @@ -56,7 +57,7 @@ tower-http = { version = "0.3", default-features = false, features = ["cors", "c
inx = { git = "https://github.com/iotaledger/inx", version = "0.4", default-features = false, features = ["types"], optional = true }

# Metrics
bee-metrics = { git = "https://github.com/iotaledger/bee", branch = "mainnet-develop-0.4", default-features = false, optional = true }
bee-metrics = { git = "https://github.com/iotaledger/bee", branch = "mainnet-develop-0.4", default-features = false, features = ["sync"], optional = true }

# Stardust types
bee-block-stardust = { package = "bee-block", git = "https://github.com/iotaledger/bee.git", branch = "shimmer-develop", default-features = false, features = ["std", "serde", "dto"], optional = true }
Expand Down
14 changes: 5 additions & 9 deletions bin/inx-chronicle/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,14 @@ impl Actor for ApiWorker {
#[cfg(feature = "metrics")]
let routes = {
use self::metrics::MetricsLayer;
use crate::metrics::{MetricsWorker, RegisterMetric};

let layer = MetricsLayer::default();

let metrics_worker = cx.addr::<MetricsWorker>().await;
metrics_worker
.send(RegisterMetric {
name: "incoming_requests".to_string(),
help: "incoming_requests".to_string(),
metric: layer.metrics.incoming_requests.clone(),
})
.unwrap();
cx.metrics_registry().register(
"incoming_requests",
"Incoming API Requests",
layer.metrics.incoming_requests.clone(),
);

routes.layer(layer)
};
Expand Down
9 changes: 2 additions & 7 deletions bin/inx-chronicle/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,8 @@ impl Actor for Collector {
#[cfg(feature = "metrics")]
let solid_counter = {
let solid_counter = bee_metrics::metrics::counter::Counter::default();
cx.addr::<crate::metrics::MetricsWorker>()
.await
.send(crate::metrics::RegisterMetric {
name: "solid_count".to_string(),
help: "Count of solidified milestones".to_string(),
metric: solid_counter.clone(),
})?;
cx.metrics_registry()
.register("solid_count", "Count of solidified milestones", solid_counter.clone());
solid_counter
};
let mut solidifiers = Vec::with_capacity(self.config.solidifier_count);
Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/collector/stardust_inx/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use std::{fmt::Debug, marker::PhantomData};

use async_trait::async_trait;
use chronicle::runtime::{Actor, ActorContext, ActorError, ConfigureActor, HandleEvent, Report};
use chronicle::runtime::{Actor, ActorContext, ActorError, ConfigureActor, HandleEvent, Report, Sender};
use inx::{
client::InxClient,
proto::NoParams,
Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/collector/stardust_inx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use chronicle::{
sync::SyncRecord,
tangle::MilestoneIndex,
},
runtime::{ActorContext, ActorError, Addr, HandleEvent, Report},
runtime::{ActorContext, ActorError, Addr, HandleEvent, Report, Sender},
};

pub(super) use self::{config::InxConfig, worker::InxWorker};
Expand Down
2 changes: 1 addition & 1 deletion bin/inx-chronicle/src/collector/stardust_inx/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::fmt::Debug;
use async_trait::async_trait;
use chronicle::{
db::model::stardust::block::BlockId,
runtime::{Actor, ActorContext, ActorError, Addr, HandleEvent, Report, SpawnActor},
runtime::{Actor, ActorContext, ActorError, Addr, HandleEvent, Report, Sender, SpawnActor},
};
use inx::{client::InxClient, proto::NoParams, tonic::Channel};

Expand Down
38 changes: 6 additions & 32 deletions bin/inx-chronicle/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
use std::{
convert::Infallible,
net::{IpAddr, SocketAddr},
sync::Arc,
};

use async_trait::async_trait;
use bee_metrics::{encoding::SendSyncEncodeMetric, metrics::process::ProcessMetrics, serve_metrics, Registry};
use chronicle::runtime::{Actor, ActorContext, HandleEvent};
use bee_metrics::{metrics::process::ProcessMetrics, serve_metrics};
use chronicle::runtime::{Actor, ActorContext};
use serde::{Deserialize, Serialize};
use tokio::{
sync::oneshot,
Expand All @@ -28,7 +27,6 @@ impl MetricsWorker {
}

pub struct MetricsState {
registry: Arc<Registry>,
server_handle: (JoinHandle<()>, Option<oneshot::Sender<()>>),
process_metrics_handle: JoinHandle<()>,
}
Expand All @@ -51,25 +49,23 @@ impl Default for MetricsConfig {
#[async_trait]
impl Actor for MetricsWorker {
type State = MetricsState;

type Error = Infallible;

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
let registry = Arc::new(Registry::default());

let addr = SocketAddr::new(self.config.address, self.config.port);

let metrics = ProcessMetrics::new(std::process::id());
let (mem_metric, cpu_metric) = metrics.metrics();

registry.register("memory_usage", "Memory usage", mem_metric);
registry.register("cpu_usage", "CPU usage", cpu_metric);
cx.metrics_registry()
.register("memory_usage", "Memory usage", mem_metric);
cx.metrics_registry().register("cpu_usage", "CPU usage", cpu_metric);

let (send, recv) = oneshot::channel();
let metrics_handle = cx.handle().clone();

let server_fut = {
let registry = registry.clone();
let registry = cx.metrics_registry().clone();
tokio::spawn(async move {
let fut = serve_metrics(addr, registry);

Expand Down Expand Up @@ -111,7 +107,6 @@ impl Actor for MetricsWorker {
};

Ok(MetricsState {
registry,
server_handle: (server_fut, Some(send)),
process_metrics_handle,
})
Expand All @@ -121,29 +116,8 @@ impl Actor for MetricsWorker {
log::debug!("{} shutting down ({})", self.name(), cx.id());

state.process_metrics_handle.abort();

state.server_handle.1.take().map(|send| send.send(()));

Ok(())
}
}

pub struct RegisterMetric<M: 'static + SendSyncEncodeMetric> {
pub name: String,
pub help: String,
pub metric: M,
}

#[async_trait]
impl<M: 'static + SendSyncEncodeMetric> HandleEvent<RegisterMetric<M>> for MetricsWorker {
async fn handle_event(
&mut self,
_cx: &mut ActorContext<Self>,
event: RegisterMetric<M>,
state: &mut Self::State,
) -> Result<(), Self::Error> {
state.registry.register(event.name, event.help, event.metric);

Ok(())
}
}
2 changes: 1 addition & 1 deletion src/db/model/ledger/conflict_reason.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use bee_block_stardust::semantic as bee;
use serde::{Deserialize, Serialize};

#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[repr(u8)]
pub enum ConflictReason {
None = 0,
Expand Down
2 changes: 1 addition & 1 deletion src/db/model/ledger/inclusion_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
pub struct UnexpectedLedgerInclusionState(u8);

/// A block's ledger inclusion state.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[repr(u8)]
pub enum LedgerInclusionState {
/// A conflicting block, ex. a double spend
Expand Down
55 changes: 15 additions & 40 deletions src/runtime/actor/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
use std::ops::Deref;

use thiserror::Error;
use tokio::sync::mpsc::UnboundedSender;

use super::{
event::{DynEvent, Envelope},
Actor,
};
use crate::runtime::{error::RuntimeError, registry::ScopeId, scope::ScopeView};
use super::{event::Envelope, sender::CloneSender, Actor};
use crate::runtime::{registry::ScopeId, scope::ScopeView};

/// Error sending a block to an actor
#[derive(Error, Debug)]
Expand All @@ -31,15 +27,17 @@ impl<S: Into<String>> From<S> for SendError {
}

/// An actor handle, used to send events.
#[derive(Debug)]
pub struct Addr<A: Actor> {
pub(crate) scope: ScopeView,
pub(crate) sender: UnboundedSender<Envelope<A>>,
pub(crate) sender: Box<dyn CloneSender<Envelope<A>>>,
}

impl<A: Actor> Addr<A> {
pub(crate) fn new(scope: ScopeView, sender: UnboundedSender<Envelope<A>>) -> Self {
Self { scope, sender }
pub(crate) fn new(scope: ScopeView, sender: impl CloneSender<Envelope<A>> + 'static) -> Self {
Self {
scope,
sender: Box::new(sender) as _,
}
}

/// Shuts down the actor. Use with care!
Expand All @@ -56,21 +54,6 @@ impl<A: Actor> Addr<A> {
pub fn scope_id(&self) -> ScopeId {
self.scope.id()
}

/// Sends a block to the actor
pub fn send<E: 'static + DynEvent<A>>(&self, event: E) -> Result<(), RuntimeError>
where
Self: Sized,
{
self.sender
.send(Box::new(event))
.map_err(|_| RuntimeError::SendError("Failed to send event".into()))
}

/// Returns whether the actor's event channel is closed.
pub fn is_closed(&self) -> bool {
self.sender.is_closed()
}
}

impl<A: Actor> Clone for Addr<A> {
Expand All @@ -82,24 +65,16 @@ impl<A: Actor> Clone for Addr<A> {
}
}

/// An optional address, which allows sending events.
#[derive(Debug, Clone)]
pub struct OptionalAddr<A: Actor>(Option<Addr<A>>);

impl<A: Actor> OptionalAddr<A> {
/// Sends an event if the address exists. Returns an error if the address is not set.
pub fn send<E>(&self, event: E) -> Result<(), RuntimeError>
where
A: 'static + Actor,
E: 'static + DynEvent<A>,
{
self.0
.as_ref()
.ok_or_else(|| SendError::new(format!("No open address for {}", std::any::type_name::<A>())))?
.send(event)
impl<A: Actor> std::fmt::Debug for Addr<A> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Addr").field("scope", &self.scope).finish()
}
}

/// An optional address, which allows sending events.
#[derive(Debug, Clone)]
pub struct OptionalAddr<A: Actor>(pub(crate) Option<Addr<A>>);

impl<A: Actor> From<Option<Addr<A>>> for OptionalAddr<A> {
fn from(opt_addr: Option<Addr<A>>) -> Self {
Self(opt_addr)
Expand Down
2 changes: 1 addition & 1 deletion src/runtime/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use super::{
util::DelayedEvent,
Actor,
};
use crate::runtime::{config::SpawnConfig, error::RuntimeError, scope::RuntimeScope, shutdown::ShutdownStream};
use crate::runtime::{config::SpawnConfig, error::RuntimeError, scope::RuntimeScope, shutdown::ShutdownStream, Sender};

type Receiver<A> = ShutdownStream<EnvelopeStream<A>>;

Expand Down
22 changes: 22 additions & 0 deletions src/runtime/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub(crate) mod error;
pub(crate) mod event;
/// Module containing actor exit report types.
pub(crate) mod report;
/// Module containing event sender traits.
pub(crate) mod sender;
/// Module containing utilities.
pub(crate) mod util;

Expand Down Expand Up @@ -39,10 +41,30 @@ pub trait Actor: Send + Sync + Sized {

/// Run the actor event loop
async fn run(&mut self, cx: &mut ActorContext<Self>, state: &mut Self::State) -> Result<(), Self::Error> {
#[cfg(feature = "metrics")]
let histogram = {
let histogram = bee_metrics::metrics::histogram::Histogram::new(
bee_metrics::metrics::histogram::exponential_buckets(1.0, 2.0, 10),
);
cx.metrics_registry().register(
format!("{}_loop_time", util::sanitize_metric_name(self.name().as_ref())),
format!("{} loop timings", self.name()),
histogram.clone(),
);
histogram
};
while let Some(evt) = cx.inbox().next().await {
#[cfg(feature = "metrics")]
let start_time = std::time::Instant::now();
// Handle the event
evt.handle(cx, self, state).await?;
#[cfg(feature = "metrics")]
{
let elapsed = start_time.elapsed();
histogram.observe(elapsed.as_secs() as f64 + elapsed.subsec_nanos() as f64 / 1_000_000_000.0);
}
}

log::debug!("{} exited event loop ({})", self.name(), cx.id());
Ok(())
}
Expand Down
Loading

0 comments on commit afbf3a4

Please sign in to comment.