From 0cd627cfaf9f43e28b0fb28e589d94c6223b6367 Mon Sep 17 00:00:00 2001 From: Alexandcoats Date: Wed, 16 Nov 2022 12:54:07 -0500 Subject: [PATCH] feat(cli): split `influxdb` feature properly (#870) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Split influxdb feature properly * Update src/db/mod.rs Co-authored-by: Jochen Görtler * Fix features * Add config to influxdb and check flags properly * Remove timer `Option` * Properly use `analytics-enabled` CLI flag Co-authored-by: Jochen Görtler Co-authored-by: Jochen Görtler --- Cargo.toml | 4 +- config.template.toml | 3 +- src/bin/inx-chronicle/cli.rs | 39 +++++++++++----- src/bin/inx-chronicle/config.rs | 2 +- src/bin/inx-chronicle/error.rs | 2 +- src/bin/inx-chronicle/main.rs | 6 +-- src/bin/inx-chronicle/stardust_inx/error.rs | 2 +- src/bin/inx-chronicle/stardust_inx/mod.rs | 50 ++++++++++++--------- src/db/collections/analytics/mod.rs | 1 - src/db/collections/metrics/mod.rs | 10 ++--- src/db/influxdb/mod.rs | 26 ++++++++--- src/db/mod.rs | 2 +- src/types/stardust/milestone.rs | 2 +- src/types/tangle/milestone.rs | 2 +- 14 files changed, 92 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9556063c8..2eb8f60de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,7 +99,7 @@ default = [ "stardust", ] analytics = [ - "influxdb", + "dep:influxdb", ] api = [ "dep:auth-helper", @@ -130,7 +130,7 @@ loki = [ "dep:tracing-loki", ] metrics = [ - "influxdb", + "dep:influxdb", "dep:chrono", ] opentelemetry = [ diff --git a/config.template.toml b/config.template.toml index d73a6cbfa..26c1e0794 100644 --- a/config.template.toml +++ b/config.template.toml @@ -15,7 +15,8 @@ min_pool_size = 2 [influxdb] ### Whether influx time-series data will be written. -enabled = true +metrics_enabled = true +analytics_enabled = true ### URL pointing to the InfluxDB instance. url = "http://localhost:8086" diff --git a/src/bin/inx-chronicle/cli.rs b/src/bin/inx-chronicle/cli.rs index 3ece00541..fe1f2f73e 100644 --- a/src/bin/inx-chronicle/cli.rs +++ b/src/bin/inx-chronicle/cli.rs @@ -20,7 +20,7 @@ pub struct ClArgs { #[command(flatten)] pub api: ApiArgs, /// InfluxDb arguments. - #[cfg(feature = "influxdb")] + #[cfg(any(feature = "analytics", feature = "metrics"))] #[command(flatten)] pub influxdb: InfluxDbArgs, /// INX arguments. @@ -81,18 +81,21 @@ pub struct MongoDbArgs { pub mongodb_conn_str: Option, } -#[cfg(feature = "influxdb")] +#[cfg(any(feature = "analytics", feature = "metrics"))] #[derive(Args, Debug)] pub struct InfluxDbArgs { - /// Toggle InfluxDb time-series writes. - #[arg(long, env = "INFLUXDB_ENABLED")] - pub influxdb_enabled: Option, + /// Toggle InfluxDb time-series metrics writes. + #[arg(long, env = "METRICS_ENABLED")] + pub metrics_enabled: Option, + /// Toggle InfluxDb time-series analytics writes. + #[arg(long, env = "ANALYTICS_ENABLED")] + pub analytics_enabled: Option, /// The url pointing to an InfluxDb instance. #[arg(long, env = "INFLUXDB_URL")] pub influxdb_url: Option, } -#[cfg(feature = "influxdb")] +#[cfg(feature = "loki")] #[derive(Args, Debug)] pub struct LokiArgs { /// Toggle Grafana Loki log writes. @@ -130,11 +133,22 @@ impl ClArgs { } } - #[cfg(feature = "influxdb")] + #[cfg(feature = "analytics")] { - if let Some(enabled) = self.influxdb.influxdb_enabled { - config.influxdb.enabled = enabled; + if let Some(enabled) = self.influxdb.analytics_enabled { + config.influxdb.analytics_enabled = enabled; } + } + + #[cfg(feature = "metrics")] + { + if let Some(enabled) = self.influxdb.metrics_enabled { + config.influxdb.metrics_enabled = enabled; + } + } + + #[cfg(any(feature = "analytics", feature = "metrics"))] + { if let Some(url) = &self.influxdb.influxdb_url { config.influxdb.url = url.clone(); } @@ -203,7 +217,7 @@ impl ClArgs { ); return Ok(PostCommand::Exit); } - #[cfg(feature = "analytics")] + #[cfg(all(feature = "analytics", feature = "stardust"))] Subcommands::FillAnalytics { start_milestone, end_milestone, @@ -269,6 +283,7 @@ impl ClArgs { return Ok(PostCommand::Exit); } } + #[cfg(feature = "stardust")] Subcommands::BuildIndexes => { tracing::info!("Connecting to database using hosts: `{}`.", config.mongodb.hosts_str()?); let db = chronicle::db::MongoDb::connect(&config.mongodb).await?; @@ -288,8 +303,7 @@ pub enum Subcommands { /// Generate a JWT token using the available config. #[cfg(feature = "api")] GenerateJWT, - /// Manually fill the analytics database. - #[cfg(feature = "analytics")] + #[cfg(all(feature = "analytics", feature = "stardust"))] FillAnalytics { /// The inclusive starting milestone index. #[arg(short, long)] @@ -309,6 +323,7 @@ pub enum Subcommands { run: bool, }, /// Manually build indexes. + #[cfg(feature = "stardust")] BuildIndexes, } diff --git a/src/bin/inx-chronicle/config.rs b/src/bin/inx-chronicle/config.rs index 56f8a1159..32d2c7b9a 100644 --- a/src/bin/inx-chronicle/config.rs +++ b/src/bin/inx-chronicle/config.rs @@ -23,7 +23,7 @@ pub enum ConfigError { #[serde(default)] pub struct ChronicleConfig { pub mongodb: MongoDbConfig, - #[cfg(feature = "influxdb")] + #[cfg(any(feature = "analytics", feature = "metrics"))] pub influxdb: chronicle::db::influxdb::InfluxDbConfig, #[cfg(feature = "api")] pub api: crate::api::ApiConfig, diff --git a/src/bin/inx-chronicle/error.rs b/src/bin/inx-chronicle/error.rs index c4fed862c..2f52e871e 100644 --- a/src/bin/inx-chronicle/error.rs +++ b/src/bin/inx-chronicle/error.rs @@ -11,7 +11,7 @@ pub enum Error { Config(#[from] ConfigError), #[error(transparent)] MongoDb(#[from] mongodb::error::Error), - #[cfg(feature = "influxdb")] + #[cfg(any(feature = "analytics", feature = "metrics"))] #[error(transparent)] InfluxDb(#[from] influxdb::Error), #[cfg(feature = "api")] diff --git a/src/bin/inx-chronicle/main.rs b/src/bin/inx-chronicle/main.rs index 9f760cfd1..26eeaa107 100644 --- a/src/bin/inx-chronicle/main.rs +++ b/src/bin/inx-chronicle/main.rs @@ -60,8 +60,8 @@ async fn main() -> Result<(), Error> { #[cfg(all(feature = "inx", feature = "stardust"))] if config.inx.enabled { - #[cfg(feature = "influxdb")] - let influx_db = if config.influxdb.enabled { + #[cfg(any(feature = "analytics", feature = "metrics"))] + let influx_db = if config.influxdb.analytics_enabled || config.influxdb.metrics_enabled { info!("Connecting to influx database at address `{}`", config.influxdb.url); let influx_db = chronicle::db::influxdb::InfluxDb::connect(&config.influxdb).await?; info!("Connected to influx database `{}`", influx_db.database_name()); @@ -72,7 +72,7 @@ async fn main() -> Result<(), Error> { let mut worker = stardust_inx::InxWorker::new( &db, - #[cfg(feature = "influxdb")] + #[cfg(any(feature = "analytics", feature = "metrics"))] influx_db.as_ref(), &config.inx, ); diff --git a/src/bin/inx-chronicle/stardust_inx/error.rs b/src/bin/inx-chronicle/stardust_inx/error.rs index a249fa436..f73aef0ae 100644 --- a/src/bin/inx-chronicle/stardust_inx/error.rs +++ b/src/bin/inx-chronicle/stardust_inx/error.rs @@ -8,7 +8,7 @@ use thiserror::Error; pub enum InxWorkerError { #[error("failed to establish connection")] ConnectionError, - #[cfg(feature = "influxdb")] + #[cfg(any(feature = "analytics", feature = "metrics"))] #[error("InfluxDb error: {0}")] InfluxDb(#[from] influxdb::Error), #[error("expected INX address with format `http://
:`, but found `{0}`")] diff --git a/src/bin/inx-chronicle/stardust_inx/mod.rs b/src/bin/inx-chronicle/stardust_inx/mod.rs index 4f7220e4c..eb360be56 100644 --- a/src/bin/inx-chronicle/stardust_inx/mod.rs +++ b/src/bin/inx-chronicle/stardust_inx/mod.rs @@ -35,7 +35,7 @@ pub const INSERT_BATCH_SIZE: usize = 1000; pub struct InxWorker { db: MongoDb, - #[cfg(feature = "influxdb")] + #[cfg(any(feature = "analytics", feature = "metrics"))] influx_db: Option, config: InxConfig, } @@ -44,12 +44,12 @@ impl InxWorker { /// Creates an [`Inx`] client by connecting to the endpoint specified in `inx_config`. pub fn new( db: &MongoDb, - #[cfg(feature = "influxdb")] influx_db: Option<&chronicle::db::influxdb::InfluxDb>, + #[cfg(any(feature = "analytics", feature = "metrics"))] influx_db: Option<&chronicle::db::influxdb::InfluxDb>, inx_config: &InxConfig, ) -> Self { Self { db: db.clone(), - #[cfg(feature = "influxdb")] + #[cfg(any(feature = "analytics", feature = "metrics"))] influx_db: influx_db.cloned(), config: inx_config.clone(), } @@ -239,7 +239,7 @@ impl InxWorker { stream: &mut (impl futures::Stream> + Unpin), ) -> Result<(), InxWorkerError> { #[cfg(feature = "metrics")] - let start_time = self.influx_db.is_some().then(std::time::Instant::now); + let start_time = std::time::Instant::now(); let MarkerMessage { milestone_index, @@ -333,29 +333,35 @@ impl InxWorker { let analytics_start_time = std::time::Instant::now(); #[cfg(feature = "analytics")] if let Some(influx_db) = &self.influx_db { - let analytics = self.db.get_all_analytics(milestone_index).await?; - influx_db - .insert_all_analytics(milestone_timestamp, milestone_index, analytics) - .await?; + if influx_db.config().analytics_enabled { + let analytics = self.db.get_all_analytics(milestone_index).await?; + influx_db + .insert_all_analytics(milestone_timestamp, milestone_index, analytics) + .await?; + } } #[cfg(all(feature = "analytics", feature = "metrics"))] - let analytics_elapsed = analytics_start_time.elapsed(); + let analytics_elapsed = self + .influx_db + .as_ref() + .and_then(|db| db.config().analytics_enabled.then_some(analytics_start_time.elapsed())); #[cfg(feature = "metrics")] if let Some(influx_db) = &self.influx_db { - // Unwrap: Safe because we checked above - let elapsed = start_time.unwrap().elapsed(); - influx_db - .insert(chronicle::db::collections::metrics::SyncMetrics { - time: chrono::Utc::now(), - milestone_index, - milestone_time: elapsed.as_millis() as u64, - #[cfg(feature = "analytics")] - analytics_time: analytics_elapsed.as_millis() as u64, - network_name, - chronicle_version: std::env!("CARGO_PKG_VERSION").to_string(), - }) - .await?; + if influx_db.config().metrics_enabled { + let elapsed = start_time.elapsed(); + influx_db + .insert(chronicle::db::collections::metrics::SyncMetrics { + time: chrono::Utc::now(), + milestone_index, + milestone_time: elapsed.as_millis() as u64, + #[cfg(feature = "analytics")] + analytics_time: analytics_elapsed.map(|d| d.as_millis() as u64), + network_name, + chronicle_version: std::env!("CARGO_PKG_VERSION").to_string(), + }) + .await?; + } } Ok(()) diff --git a/src/db/collections/analytics/mod.rs b/src/db/collections/analytics/mod.rs index 813c9ccaa..f5ff1a04e 100644 --- a/src/db/collections/analytics/mod.rs +++ b/src/db/collections/analytics/mod.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 /// Schema implementation for InfluxDb. -#[cfg(feature = "influxdb")] pub mod influx; use decimal::d128; diff --git a/src/db/collections/metrics/mod.rs b/src/db/collections/metrics/mod.rs index 4e4429584..73daa3c43 100644 --- a/src/db/collections/metrics/mod.rs +++ b/src/db/collections/metrics/mod.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 /// Schema implementation for InfluxDb. -#[cfg(feature = "influxdb")] pub mod influx; use chrono::{DateTime, Utc}; @@ -12,17 +11,16 @@ use serde::{Deserialize, Serialize}; use crate::types::tangle::MilestoneIndex; -#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] -#[cfg_attr(feature = "influxdb", derive(InfluxDbWriteable))] +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, InfluxDbWriteable)] #[allow(missing_docs)] pub struct SyncMetrics { pub time: DateTime, pub milestone_index: MilestoneIndex, pub milestone_time: u64, #[cfg(feature = "analytics")] - pub analytics_time: u64, - #[cfg_attr(feature = "influxdb", influxdb(tag))] + pub analytics_time: Option, + #[influxdb(tag)] pub chronicle_version: String, - #[cfg_attr(feature = "influxdb", influxdb(tag))] + #[influxdb(tag)] pub network_name: String, } diff --git a/src/db/influxdb/mod.rs b/src/db/influxdb/mod.rs index 5893278cd..2151844da 100644 --- a/src/db/influxdb/mod.rs +++ b/src/db/influxdb/mod.rs @@ -12,14 +12,20 @@ pub use self::measurement::InfluxDbMeasurement; /// A wrapper for the influxdb [`Client`]. #[derive(Clone, Debug)] -pub struct InfluxDb(Client); +pub struct InfluxDb { + client: Client, + config: InfluxDbConfig, +} impl InfluxDb { /// Create a new influx connection from config. pub async fn connect(config: &InfluxDbConfig) -> Result { let client = Client::new(&config.url, &config.database_name).with_auth(&config.username, &config.password); client.ping().await?; - Ok(Self(client)) + Ok(Self { + client, + config: config.clone(), + }) } /// Insert a measurement value. @@ -42,13 +48,18 @@ impl InfluxDb { .map(|mut res| res.values.remove(0)), )) } + + /// Get the config used to create the connection. + pub fn config(&self) -> &InfluxDbConfig { + &self.config + } } impl Deref for InfluxDb { type Target = Client; fn deref(&self) -> &Self::Target { - &self.0 + &self.client } } @@ -65,8 +76,10 @@ pub struct InfluxDbConfig { pub password: String, /// The name of the database to connect to. pub database_name: String, - /// Whether to enable influx writes. - pub enabled: bool, + /// Whether to enable influx metrics writes. + pub metrics_enabled: bool, + /// Whether to enable influx analytics writes. + pub analytics_enabled: bool, } impl Default for InfluxDbConfig { @@ -76,7 +89,8 @@ impl Default for InfluxDbConfig { database_name: "chronicle_analytics".to_string(), username: "root".to_string(), password: "password".to_string(), - enabled: true, + metrics_enabled: true, + analytics_enabled: true, } } } diff --git a/src/db/mod.rs b/src/db/mod.rs index 90250b897..ebf150670 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -7,8 +7,8 @@ #[cfg(feature = "stardust")] pub mod collections; -#[cfg(feature = "influxdb")] /// Module containing InfluxDb types and traits. +#[cfg(any(feature = "analytics", feature = "metrics"))] pub mod influxdb; mod mongodb; diff --git a/src/types/stardust/milestone.rs b/src/types/stardust/milestone.rs index e3ee47e57..79ea442c1 100644 --- a/src/types/stardust/milestone.rs +++ b/src/types/stardust/milestone.rs @@ -24,7 +24,7 @@ impl From for Bson { } } -#[cfg(feature = "influxdb")] +#[cfg(any(feature = "analytics", feature = "metrics"))] impl From for influxdb::Timestamp { fn from(value: MilestoneTimestamp) -> Self { Self::Seconds(value.0 as _) diff --git a/src/types/tangle/milestone.rs b/src/types/tangle/milestone.rs index 2ad5c238a..b2fbed3ba 100644 --- a/src/types/tangle/milestone.rs +++ b/src/types/tangle/milestone.rs @@ -85,7 +85,7 @@ impl From for Bson { } } -#[cfg(feature = "influxdb")] +#[cfg(any(feature = "analytics", feature = "metrics"))] impl From for influxdb::Type { fn from(value: MilestoneIndex) -> Self { Self::UnsignedInteger(value.0 as _)