Skip to content

Commit

Permalink
feat(analytics): selective analytics for INX connections (#1035)
Browse files Browse the repository at this point in the history
* Start impl

* Format and fixes

* Remove unwraps

* Fix feature attr

* change analytics cli flag

Co-authored-by: Jochen Görtler <grtlr@users.noreply.github.com>

* Small refactor

Co-authored-by: Jochen Görtler <grtlr@users.noreply.github.com>
  • Loading branch information
Alex6323 and grtlr committed Jan 17, 2023
1 parent b02ad42 commit b76c425
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 49 deletions.
61 changes: 14 additions & 47 deletions src/bin/inx-chronicle/cli.rs
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use chronicle::db::mongodb::config as mongodb;
use clap::{Args, Parser, Subcommand, ValueEnum};
use clap::{Args, Parser, Subcommand};

use crate::config::ChronicleConfig;

Expand Down Expand Up @@ -82,6 +82,10 @@ pub struct InfluxDbArgs {
#[cfg(feature = "analytics")]
#[arg(long, default_value_t = !influxdb::DEFAULT_ANALYTICS_ENABLED)]
pub disable_analytics: bool,
/// Select a subset of analytics to compute. If unset, all analytics will be computed.
#[cfg(feature = "analytics")]
#[arg(long, value_name = "ANALYTICS")]
analytics: Vec<chronicle::db::influxdb::AnalyticsChoice>,
/// Disable InfluxDb time-series metrics writes.
#[cfg(feature = "metrics")]
#[arg(long, default_value_t = !influxdb::DEFAULT_METRICS_ENABLED)]
Expand All @@ -99,6 +103,8 @@ impl From<&InfluxDbArgs> for chronicle::db::influxdb::InfluxDbConfig {
analytics_enabled: !value.disable_analytics,
#[cfg(feature = "analytics")]
analytics_database_name: value.analytics_database_name.clone(),
#[cfg(feature = "analytics")]
analytics: value.analytics.clone(),
#[cfg(feature = "metrics")]
metrics_enabled: !value.disable_metrics,
#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -280,15 +286,16 @@ impl ClArgs {
let influx_db = influx_db.clone();
let analytics_choice = analytics.clone();
join_set.spawn(async move {
let mut selected_analytics = if analytics_choice.is_empty() {
let mut analytics = if analytics_choice.is_empty() {
chronicle::db::collections::analytics::all_analytics()
} else {
let mut tmp: std::collections::HashSet<AnalyticsChoice> =
analytics_choice.iter().copied().collect();
let mut tmp: std::collections::HashSet<
chronicle::db::influxdb::config::AnalyticsChoice,
> = analytics_choice.iter().copied().collect();
tmp.drain().map(Into::into).collect()
};

tracing::info!("Computing the following analytics: {:?}", selected_analytics);
tracing::info!("Computing the following analytics: {:?}", analytics);

for index in (*start_milestone..*end_milestone).skip(i).step_by(num_tasks) {
let milestone_index = index.into();
Expand All @@ -303,7 +310,7 @@ impl ClArgs {
super::stardust_inx::gather_analytics(
&db,
&influx_db,
&mut selected_analytics,
&mut analytics,
milestone_index,
milestone_timestamp,
)
Expand Down Expand Up @@ -361,46 +368,6 @@ impl ClArgs {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, ValueEnum)]
pub enum AnalyticsChoice {
// Please keep the alphabetic order.
Addresses,
BaseToken,
BlockActivity,
DailyActiveAddresses,
LedgerOutputs,
LedgerSize,
OutputActivity,
ProtocolParameters,
UnclaimedTokens,
UnlockConditions,
}

#[cfg(feature = "analytics")]
impl From<AnalyticsChoice> for Box<dyn chronicle::db::collections::analytics::Analytic> {
fn from(value: AnalyticsChoice) -> Self {
use chronicle::db::collections::analytics::{
AddressAnalytics, BaseTokenActivityAnalytics, BlockActivityAnalytics, DailyActiveAddressesAnalytics,
LedgerOutputAnalytics, LedgerSizeAnalytics, OutputActivityAnalytics, ProtocolParametersAnalytics,
UnclaimedTokenAnalytics, UnlockConditionAnalytics,
};

match value {
// Please keep the alphabetic order.
AnalyticsChoice::Addresses => Box::new(AddressAnalytics),
AnalyticsChoice::BaseToken => Box::new(BaseTokenActivityAnalytics),
AnalyticsChoice::BlockActivity => Box::new(BlockActivityAnalytics),
AnalyticsChoice::DailyActiveAddresses => Box::<DailyActiveAddressesAnalytics>::default(),
AnalyticsChoice::LedgerOutputs => Box::new(LedgerOutputAnalytics),
AnalyticsChoice::LedgerSize => Box::new(LedgerSizeAnalytics),
AnalyticsChoice::OutputActivity => Box::new(OutputActivityAnalytics),
AnalyticsChoice::ProtocolParameters => Box::new(ProtocolParametersAnalytics),
AnalyticsChoice::UnclaimedTokens => Box::new(UnclaimedTokenAnalytics),
AnalyticsChoice::UnlockConditions => Box::new(UnlockConditionAnalytics),
}
}
}

#[derive(Debug, Subcommand)]
pub enum Subcommands {
/// Generate a JWT token using the available config.
Expand All @@ -419,7 +386,7 @@ pub enum Subcommands {
num_tasks: Option<usize>,
/// Select a subset of analytics to compute.
#[arg(long)]
analytics: Vec<AnalyticsChoice>,
analytics: Vec<chronicle::db::influxdb::AnalyticsChoice>,
},
/// Clear the chronicle database.
#[cfg(debug_assertions)]
Expand Down
14 changes: 13 additions & 1 deletion src/bin/inx-chronicle/stardust_inx/mod.rs
Expand Up @@ -113,7 +113,19 @@ impl InxWorker {
debug!("Started listening to ledger updates via INX.");

#[cfg(feature = "analytics")]
let mut analytics = chronicle::db::collections::analytics::all_analytics();
let mut analytics = match self.influx_db.as_ref() {
None => Vec::new(),
Some(influx_db) if influx_db.config().analytics.is_empty() => {
chronicle::db::collections::analytics::all_analytics()
}
Some(influx_db) => {
tracing::info!("Computing the following analytics: {:?}", influx_db.config().analytics);

let mut tmp: std::collections::HashSet<chronicle::db::influxdb::config::AnalyticsChoice> =
influx_db.config().analytics.iter().copied().collect();
tmp.drain().map(Into::into).collect()
}
};

while let Some(ledger_update) = stream.try_next().await? {
self.handle_ledger_update(
Expand Down
46 changes: 46 additions & 0 deletions src/db/influxdb/config.rs
Expand Up @@ -38,6 +38,9 @@ pub struct InfluxDbConfig {
/// The name of the database to insert analytics.
#[cfg(feature = "analytics")]
pub analytics_database_name: String,
/// The selected analytics to compute.
#[cfg(feature = "analytics")]
pub analytics: Vec<AnalyticsChoice>,
/// Whether to enable influx metrics writes.
#[cfg(feature = "metrics")]
pub metrics_enabled: bool,
Expand All @@ -56,10 +59,53 @@ impl Default for InfluxDbConfig {
analytics_enabled: DEFAULT_ANALYTICS_ENABLED,
#[cfg(feature = "analytics")]
analytics_database_name: DEFAULT_ANALYTICS_DATABASE_NAME.to_string(),
#[cfg(feature = "analytics")]
analytics: Vec::new(),
#[cfg(feature = "metrics")]
metrics_enabled: DEFAULT_METRICS_ENABLED,
#[cfg(feature = "metrics")]
metrics_database_name: DEFAULT_METRICS_DATABASE_NAME.to_string(),
}
}
}

#[allow(missing_docs)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, clap::ValueEnum)]
pub enum AnalyticsChoice {
// Please keep the alphabetic order.
Addresses,
BaseToken,
BlockActivity,
DailyActiveAddresses,
LedgerOutputs,
LedgerSize,
OutputActivity,
ProtocolParameters,
UnclaimedTokens,
UnlockConditions,
}

#[cfg(feature = "analytics")]
impl From<AnalyticsChoice> for Box<dyn crate::db::collections::analytics::Analytic> {
fn from(value: AnalyticsChoice) -> Self {
use crate::db::collections::analytics::{
AddressAnalytics, BaseTokenActivityAnalytics, BlockActivityAnalytics, DailyActiveAddressesAnalytics,
LedgerOutputAnalytics, LedgerSizeAnalytics, OutputActivityAnalytics, ProtocolParametersAnalytics,
UnclaimedTokenAnalytics, UnlockConditionAnalytics,
};

match value {
// Please keep the alphabetic order.
AnalyticsChoice::Addresses => Box::new(AddressAnalytics),
AnalyticsChoice::BaseToken => Box::new(BaseTokenActivityAnalytics),
AnalyticsChoice::BlockActivity => Box::new(BlockActivityAnalytics),
AnalyticsChoice::DailyActiveAddresses => Box::<DailyActiveAddressesAnalytics>::default(),
AnalyticsChoice::LedgerOutputs => Box::new(LedgerOutputAnalytics),
AnalyticsChoice::LedgerSize => Box::new(LedgerSizeAnalytics),
AnalyticsChoice::OutputActivity => Box::new(OutputActivityAnalytics),
AnalyticsChoice::ProtocolParameters => Box::new(ProtocolParametersAnalytics),
AnalyticsChoice::UnclaimedTokens => Box::new(UnclaimedTokenAnalytics),
AnalyticsChoice::UnlockConditions => Box::new(UnlockConditionAnalytics),
}
}
}
5 changes: 4 additions & 1 deletion src/db/influxdb/mod.rs
Expand Up @@ -9,7 +9,10 @@ use std::ops::Deref;
use influxdb::{Client, ReadQuery};
use serde::de::DeserializeOwned;

pub use self::{config::InfluxDbConfig, measurement::InfluxDbMeasurement};
pub use self::{
config::{AnalyticsChoice, InfluxDbConfig},
measurement::InfluxDbMeasurement,
};

/// A wrapper for an InfluxDb [`Client`].
#[derive(Clone, Debug)]
Expand Down

0 comments on commit b76c425

Please sign in to comment.