Skip to content

Commit

Permalink
refactor(analytics): Adds tenant ID to Analytics Provider
Browse files Browse the repository at this point in the history
  • Loading branch information
RaghavRox committed May 10, 2024
1 parent cf0e3da commit 1396a90
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 8 deletions.
2 changes: 2 additions & 0 deletions crates/analytics/src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ use crate::{
outgoing_webhook_event::events::OutgoingWebhookLogsResult,
sdk_events::events::SdkEventsResult,
types::TableEngine,
TenantID,
};

pub type ClickhouseResult<T> = error_stack::Result<T, ClickhouseError>;

#[derive(Clone, Debug)]
pub struct ClickhouseClient {
pub config: Arc<ClickhouseConfig>,
pub tenant_id: TenantID,
}

#[derive(Clone, Debug, serde::Deserialize)]
Expand Down
21 changes: 17 additions & 4 deletions crates/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ use self::{
types::MetricsError,
};

#[derive(Clone, Debug)]
pub struct TenantID(String);
impl Default for TenantID {
fn default() -> Self {
Self(String::from("default"))
}
}

#[derive(Clone, Debug)]
pub enum AnalyticsProvider {
Sqlx(SqlxClient),
Expand Down Expand Up @@ -601,22 +609,27 @@ impl AnalyticsProvider {
}
}

pub async fn from_conf(config: &AnalyticsConfig) -> Self {
pub async fn from_conf(config: &AnalyticsConfig, tenant_id: TenantID) -> Self {
match config {
AnalyticsConfig::Sqlx { sqlx } => Self::Sqlx(SqlxClient::from_conf(sqlx).await),
AnalyticsConfig::Sqlx { sqlx } => {
Self::Sqlx(SqlxClient::from_conf(sqlx, tenant_id).await)
}
AnalyticsConfig::Clickhouse { clickhouse } => Self::Clickhouse(ClickhouseClient {
config: Arc::new(clickhouse.clone()),
tenant_id,
}),
AnalyticsConfig::CombinedCkh { sqlx, clickhouse } => Self::CombinedCkh(
SqlxClient::from_conf(sqlx).await,
SqlxClient::from_conf(sqlx, tenant_id.clone()).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
tenant_id,
},
),
AnalyticsConfig::CombinedSqlx { sqlx, clickhouse } => Self::CombinedSqlx(
SqlxClient::from_conf(sqlx).await,
SqlxClient::from_conf(sqlx, tenant_id.clone()).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
tenant_id,
},
),
}
Expand Down
8 changes: 6 additions & 2 deletions crates/analytics/src/sqlx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use sqlx::{
use storage_impl::config::Database;
use time::PrimitiveDateTime;

use crate::TenantID;

use super::{
health_check::HealthCheck,
query::{Aggregate, ToSql, Window},
Expand All @@ -31,6 +33,7 @@ use super::{
#[derive(Debug, Clone)]
pub struct SqlxClient {
pool: Pool<Postgres>,
tenant_id: TenantID,
}

impl Default for SqlxClient {
Expand All @@ -44,12 +47,13 @@ impl Default for SqlxClient {
pool: PgPoolOptions::new()
.connect_lazy(&database_url)
.expect("SQLX Pool Creation failed"),
tenant_id: TenantID::default(),
}
}
}

impl SqlxClient {
pub async fn from_conf(conf: &Database) -> Self {
pub async fn from_conf(conf: &Database, tenant_id: TenantID) -> Self {
let password = &conf.password.peek();
let database_url = format!(
"postgres://{}:{}@{}:{}/{}",
Expand All @@ -61,7 +65,7 @@ impl SqlxClient {
.acquire_timeout(std::time::Duration::from_secs(conf.connection_timeout))
.connect_lazy(&database_url)
.expect("SQLX Pool Creation failed");
Self { pool }
Self { pool, tenant_id }
}
}

Expand Down
8 changes: 6 additions & 2 deletions crates/router/src/routes/app.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use actix_web::{web, Scope};
use analytics::TenantID;
#[cfg(all(feature = "business_profile_routing", feature = "olap"))]
use api_models::routing::RoutingRetrieveQuery;
#[cfg(feature = "olap")]
Expand Down Expand Up @@ -218,8 +219,11 @@ impl AppState {
};

#[cfg(feature = "olap")]
let pool =
crate::analytics::AnalyticsProvider::from_conf(conf.analytics.get_inner()).await;
let pool = crate::analytics::AnalyticsProvider::from_conf(
conf.analytics.get_inner(),
TenantID::default(),
)
.await;

#[cfg(feature = "email")]
let email_client = Arc::new(create_email_client(&conf).await);
Expand Down

0 comments on commit 1396a90

Please sign in to comment.