Skip to content

Commit

Permalink
feat(cli): split influxdb feature properly (#870)
Browse files Browse the repository at this point in the history
* Split influxdb feature properly

* Update src/db/mod.rs

Co-authored-by: Jochen Görtler <grtlr@users.noreply.github.com>

* Fix features

* Add config to influxdb and check flags properly

* Remove timer `Option`

* Properly use `analytics-enabled` CLI flag

Co-authored-by: Jochen Görtler <grtlr@users.noreply.github.com>
Co-authored-by: Jochen Görtler <jochen.goertler@iota.org>
  • Loading branch information
3 people committed Nov 16, 2022
1 parent e94c171 commit 0cd627c
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 59 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Expand Up @@ -99,7 +99,7 @@ default = [
"stardust",
]
analytics = [
"influxdb",
"dep:influxdb",
]
api = [
"dep:auth-helper",
Expand Down Expand Up @@ -130,7 +130,7 @@ loki = [
"dep:tracing-loki",
]
metrics = [
"influxdb",
"dep:influxdb",
"dep:chrono",
]
opentelemetry = [
Expand Down
3 changes: 2 additions & 1 deletion config.template.toml
Expand Up @@ -15,7 +15,8 @@ min_pool_size = 2

[influxdb]
### Whether influx time-series data will be written.
enabled = true
metrics_enabled = true
analytics_enabled = true

### URL pointing to the InfluxDB instance.
url = "http://localhost:8086"
Expand Down
39 changes: 27 additions & 12 deletions src/bin/inx-chronicle/cli.rs
Expand Up @@ -20,7 +20,7 @@ pub struct ClArgs {
#[command(flatten)]
pub api: ApiArgs,
/// InfluxDb arguments.
#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
#[command(flatten)]
pub influxdb: InfluxDbArgs,
/// INX arguments.
Expand Down Expand Up @@ -81,18 +81,21 @@ pub struct MongoDbArgs {
pub mongodb_conn_str: Option<String>,
}

#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
#[derive(Args, Debug)]
pub struct InfluxDbArgs {
/// Toggle InfluxDb time-series writes.
#[arg(long, env = "INFLUXDB_ENABLED")]
pub influxdb_enabled: Option<bool>,
/// Toggle InfluxDb time-series metrics writes.
#[arg(long, env = "METRICS_ENABLED")]
pub metrics_enabled: Option<bool>,
/// Toggle InfluxDb time-series analytics writes.
#[arg(long, env = "ANALYTICS_ENABLED")]
pub analytics_enabled: Option<bool>,
/// The url pointing to an InfluxDb instance.
#[arg(long, env = "INFLUXDB_URL")]
pub influxdb_url: Option<String>,
}

#[cfg(feature = "influxdb")]
#[cfg(feature = "loki")]
#[derive(Args, Debug)]
pub struct LokiArgs {
/// Toggle Grafana Loki log writes.
Expand Down Expand Up @@ -130,11 +133,22 @@ impl ClArgs {
}
}

#[cfg(feature = "influxdb")]
#[cfg(feature = "analytics")]
{
if let Some(enabled) = self.influxdb.influxdb_enabled {
config.influxdb.enabled = enabled;
if let Some(enabled) = self.influxdb.analytics_enabled {
config.influxdb.analytics_enabled = enabled;
}
}

#[cfg(feature = "metrics")]
{
if let Some(enabled) = self.influxdb.metrics_enabled {
config.influxdb.metrics_enabled = enabled;
}
}

#[cfg(any(feature = "analytics", feature = "metrics"))]
{
if let Some(url) = &self.influxdb.influxdb_url {
config.influxdb.url = url.clone();
}
Expand Down Expand Up @@ -203,7 +217,7 @@ impl ClArgs {
);
return Ok(PostCommand::Exit);
}
#[cfg(feature = "analytics")]
#[cfg(all(feature = "analytics", feature = "stardust"))]
Subcommands::FillAnalytics {
start_milestone,
end_milestone,
Expand Down Expand Up @@ -269,6 +283,7 @@ impl ClArgs {
return Ok(PostCommand::Exit);
}
}
#[cfg(feature = "stardust")]
Subcommands::BuildIndexes => {
tracing::info!("Connecting to database using hosts: `{}`.", config.mongodb.hosts_str()?);
let db = chronicle::db::MongoDb::connect(&config.mongodb).await?;
Expand All @@ -288,8 +303,7 @@ pub enum Subcommands {
/// Generate a JWT token using the available config.
#[cfg(feature = "api")]
GenerateJWT,
/// Manually fill the analytics database.
#[cfg(feature = "analytics")]
#[cfg(all(feature = "analytics", feature = "stardust"))]
FillAnalytics {
/// The inclusive starting milestone index.
#[arg(short, long)]
Expand All @@ -309,6 +323,7 @@ pub enum Subcommands {
run: bool,
},
/// Manually build indexes.
#[cfg(feature = "stardust")]
BuildIndexes,
}

Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/config.rs
Expand Up @@ -23,7 +23,7 @@ pub enum ConfigError {
#[serde(default)]
pub struct ChronicleConfig {
pub mongodb: MongoDbConfig,
#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
pub influxdb: chronicle::db::influxdb::InfluxDbConfig,
#[cfg(feature = "api")]
pub api: crate::api::ApiConfig,
Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/error.rs
Expand Up @@ -11,7 +11,7 @@ pub enum Error {
Config(#[from] ConfigError),
#[error(transparent)]
MongoDb(#[from] mongodb::error::Error),
#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
#[error(transparent)]
InfluxDb(#[from] influxdb::Error),
#[cfg(feature = "api")]
Expand Down
6 changes: 3 additions & 3 deletions src/bin/inx-chronicle/main.rs
Expand Up @@ -60,8 +60,8 @@ async fn main() -> Result<(), Error> {

#[cfg(all(feature = "inx", feature = "stardust"))]
if config.inx.enabled {
#[cfg(feature = "influxdb")]
let influx_db = if config.influxdb.enabled {
#[cfg(any(feature = "analytics", feature = "metrics"))]
let influx_db = if config.influxdb.analytics_enabled || config.influxdb.metrics_enabled {
info!("Connecting to influx database at address `{}`", config.influxdb.url);
let influx_db = chronicle::db::influxdb::InfluxDb::connect(&config.influxdb).await?;
info!("Connected to influx database `{}`", influx_db.database_name());
Expand All @@ -72,7 +72,7 @@ async fn main() -> Result<(), Error> {

let mut worker = stardust_inx::InxWorker::new(
&db,
#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
influx_db.as_ref(),
&config.inx,
);
Expand Down
2 changes: 1 addition & 1 deletion src/bin/inx-chronicle/stardust_inx/error.rs
Expand Up @@ -8,7 +8,7 @@ use thiserror::Error;
pub enum InxWorkerError {
#[error("failed to establish connection")]
ConnectionError,
#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
#[error("InfluxDb error: {0}")]
InfluxDb(#[from] influxdb::Error),
#[error("expected INX address with format `http://<address>:<port>`, but found `{0}`")]
Expand Down
50 changes: 28 additions & 22 deletions src/bin/inx-chronicle/stardust_inx/mod.rs
Expand Up @@ -35,7 +35,7 @@ pub const INSERT_BATCH_SIZE: usize = 1000;

pub struct InxWorker {
db: MongoDb,
#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
influx_db: Option<chronicle::db::influxdb::InfluxDb>,
config: InxConfig,
}
Expand All @@ -44,12 +44,12 @@ impl InxWorker {
/// Creates an [`Inx`] client by connecting to the endpoint specified in `inx_config`.
pub fn new(
db: &MongoDb,
#[cfg(feature = "influxdb")] influx_db: Option<&chronicle::db::influxdb::InfluxDb>,
#[cfg(any(feature = "analytics", feature = "metrics"))] influx_db: Option<&chronicle::db::influxdb::InfluxDb>,
inx_config: &InxConfig,
) -> Self {
Self {
db: db.clone(),
#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
influx_db: influx_db.cloned(),
config: inx_config.clone(),
}
Expand Down Expand Up @@ -239,7 +239,7 @@ impl InxWorker {
stream: &mut (impl futures::Stream<Item = Result<LedgerUpdateMessage, InxError>> + Unpin),
) -> Result<(), InxWorkerError> {
#[cfg(feature = "metrics")]
let start_time = self.influx_db.is_some().then(std::time::Instant::now);
let start_time = std::time::Instant::now();

let MarkerMessage {
milestone_index,
Expand Down Expand Up @@ -333,29 +333,35 @@ impl InxWorker {
let analytics_start_time = std::time::Instant::now();
#[cfg(feature = "analytics")]
if let Some(influx_db) = &self.influx_db {
let analytics = self.db.get_all_analytics(milestone_index).await?;
influx_db
.insert_all_analytics(milestone_timestamp, milestone_index, analytics)
.await?;
if influx_db.config().analytics_enabled {
let analytics = self.db.get_all_analytics(milestone_index).await?;
influx_db
.insert_all_analytics(milestone_timestamp, milestone_index, analytics)
.await?;
}
}
#[cfg(all(feature = "analytics", feature = "metrics"))]
let analytics_elapsed = analytics_start_time.elapsed();
let analytics_elapsed = self
.influx_db
.as_ref()
.and_then(|db| db.config().analytics_enabled.then_some(analytics_start_time.elapsed()));

#[cfg(feature = "metrics")]
if let Some(influx_db) = &self.influx_db {
// Unwrap: Safe because we checked above
let elapsed = start_time.unwrap().elapsed();
influx_db
.insert(chronicle::db::collections::metrics::SyncMetrics {
time: chrono::Utc::now(),
milestone_index,
milestone_time: elapsed.as_millis() as u64,
#[cfg(feature = "analytics")]
analytics_time: analytics_elapsed.as_millis() as u64,
network_name,
chronicle_version: std::env!("CARGO_PKG_VERSION").to_string(),
})
.await?;
if influx_db.config().metrics_enabled {
let elapsed = start_time.elapsed();
influx_db
.insert(chronicle::db::collections::metrics::SyncMetrics {
time: chrono::Utc::now(),
milestone_index,
milestone_time: elapsed.as_millis() as u64,
#[cfg(feature = "analytics")]
analytics_time: analytics_elapsed.map(|d| d.as_millis() as u64),
network_name,
chronicle_version: std::env!("CARGO_PKG_VERSION").to_string(),
})
.await?;
}
}

Ok(())
Expand Down
1 change: 0 additions & 1 deletion src/db/collections/analytics/mod.rs
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

/// Schema implementation for InfluxDb.
#[cfg(feature = "influxdb")]
pub mod influx;

use decimal::d128;
Expand Down
10 changes: 4 additions & 6 deletions src/db/collections/metrics/mod.rs
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

/// Schema implementation for InfluxDb.
#[cfg(feature = "influxdb")]
pub mod influx;

use chrono::{DateTime, Utc};
Expand All @@ -12,17 +11,16 @@ use serde::{Deserialize, Serialize};

use crate::types::tangle::MilestoneIndex;

#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "influxdb", derive(InfluxDbWriteable))]
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, InfluxDbWriteable)]
#[allow(missing_docs)]
pub struct SyncMetrics {
pub time: DateTime<Utc>,
pub milestone_index: MilestoneIndex,
pub milestone_time: u64,
#[cfg(feature = "analytics")]
pub analytics_time: u64,
#[cfg_attr(feature = "influxdb", influxdb(tag))]
pub analytics_time: Option<u64>,
#[influxdb(tag)]
pub chronicle_version: String,
#[cfg_attr(feature = "influxdb", influxdb(tag))]
#[influxdb(tag)]
pub network_name: String,
}
26 changes: 20 additions & 6 deletions src/db/influxdb/mod.rs
Expand Up @@ -12,14 +12,20 @@ pub use self::measurement::InfluxDbMeasurement;

/// A wrapper for the influxdb [`Client`].
#[derive(Clone, Debug)]
pub struct InfluxDb(Client);
pub struct InfluxDb {
client: Client,
config: InfluxDbConfig,
}

impl InfluxDb {
/// Create a new influx connection from config.
pub async fn connect(config: &InfluxDbConfig) -> Result<Self, influxdb::Error> {
let client = Client::new(&config.url, &config.database_name).with_auth(&config.username, &config.password);
client.ping().await?;
Ok(Self(client))
Ok(Self {
client,
config: config.clone(),
})
}

/// Insert a measurement value.
Expand All @@ -42,13 +48,18 @@ impl InfluxDb {
.map(|mut res| res.values.remove(0)),
))
}

/// Get the config used to create the connection.
pub fn config(&self) -> &InfluxDbConfig {
&self.config
}
}

impl Deref for InfluxDb {
type Target = Client;

fn deref(&self) -> &Self::Target {
&self.0
&self.client
}
}

Expand All @@ -65,8 +76,10 @@ pub struct InfluxDbConfig {
pub password: String,
/// The name of the database to connect to.
pub database_name: String,
/// Whether to enable influx writes.
pub enabled: bool,
/// Whether to enable influx metrics writes.
pub metrics_enabled: bool,
/// Whether to enable influx analytics writes.
pub analytics_enabled: bool,
}

impl Default for InfluxDbConfig {
Expand All @@ -76,7 +89,8 @@ impl Default for InfluxDbConfig {
database_name: "chronicle_analytics".to_string(),
username: "root".to_string(),
password: "password".to_string(),
enabled: true,
metrics_enabled: true,
analytics_enabled: true,
}
}
}
2 changes: 1 addition & 1 deletion src/db/mod.rs
Expand Up @@ -7,8 +7,8 @@
#[cfg(feature = "stardust")]
pub mod collections;

#[cfg(feature = "influxdb")]
/// Module containing InfluxDb types and traits.
#[cfg(any(feature = "analytics", feature = "metrics"))]
pub mod influxdb;
mod mongodb;

Expand Down
2 changes: 1 addition & 1 deletion src/types/stardust/milestone.rs
Expand Up @@ -24,7 +24,7 @@ impl From<MilestoneTimestamp> for Bson {
}
}

#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
impl From<MilestoneTimestamp> for influxdb::Timestamp {
fn from(value: MilestoneTimestamp) -> Self {
Self::Seconds(value.0 as _)
Expand Down
2 changes: 1 addition & 1 deletion src/types/tangle/milestone.rs
Expand Up @@ -85,7 +85,7 @@ impl From<MilestoneIndex> for Bson {
}
}

#[cfg(feature = "influxdb")]
#[cfg(any(feature = "analytics", feature = "metrics"))]
impl From<MilestoneIndex> for influxdb::Type {
fn from(value: MilestoneIndex) -> Self {
Self::UnsignedInteger(value.0 as _)
Expand Down

0 comments on commit 0cd627c

Please sign in to comment.