Skip to content

Commit

Permalink
feat(multitenancy): add tenant_id as a field for data pipeline and su…
Browse files Browse the repository at this point in the history
…pport individual database for clickhouse (#4867)

Co-authored-by: hyperswitch-bot[bot] <148525504+hyperswitch-bot[bot]@users.noreply.github.com>
Co-authored-by: Arun Raj M <jarnura47@gmail.com>
Co-authored-by: Sampras Lopes <sampras.lopes@juspay.in>
  • Loading branch information
4 people authored Jun 18, 2024
1 parent d2092dc commit 776ddb8
Show file tree
Hide file tree
Showing 15 changed files with 55 additions and 68 deletions.
3 changes: 2 additions & 1 deletion config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ sdk_eligible_payment_methods = "card"

[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""} # schema -> Postgres db schema, redis_key_prefix -> redis key distinguisher, base_url -> url of the tenant
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"} # schema -> Postgres db schema, redis_key_prefix -> redis key distinguisher, base_url -> url of the tenant
3 changes: 2 additions & 1 deletion config/deployments/env_specific.toml
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ region = "kms_region" # The AWS region used by the KMS SDK for decrypting data.

[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""}
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"}
3 changes: 2 additions & 1 deletion config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,7 @@ sdk_eligible_payment_methods = "card"

[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""}
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"}
3 changes: 2 additions & 1 deletion config/docker_compose.toml
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ sdk_eligible_payment_methods = "card"

[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""}
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"}
5 changes: 2 additions & 3 deletions crates/analytics/src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ pub type ClickhouseResult<T> = error_stack::Result<T, ClickhouseError>;
#[derive(Clone, Debug)]
pub struct ClickhouseClient {
pub config: Arc<ClickhouseConfig>,
pub database: String,
}

#[derive(Clone, Debug, serde::Deserialize)]
pub struct ClickhouseConfig {
username: String,
password: Option<String>,
host: String,
database_name: String,
}

impl Default for ClickhouseConfig {
Expand All @@ -51,7 +51,6 @@ impl Default for ClickhouseConfig {
username: "default".to_string(),
password: None,
host: "http://localhost:8123".to_string(),
database_name: "default".to_string(),
}
}
}
Expand All @@ -63,7 +62,7 @@ impl ClickhouseClient {
let params = CkhQuery {
date_time_output_format: String::from("iso"),
output_format_json_quote_64bit_integers: 0,
database: self.config.database_name.clone(),
database: self.database.clone(),
};
let response = client
.post(&self.config.host)
Expand Down
16 changes: 12 additions & 4 deletions crates/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -601,22 +601,30 @@ impl AnalyticsProvider {
}
}

pub async fn from_conf(config: &AnalyticsConfig, tenant: &str) -> Self {
pub async fn from_conf(
config: &AnalyticsConfig,
tenant: &dyn storage_impl::config::ClickHouseConfig,
) -> Self {
match config {
AnalyticsConfig::Sqlx { sqlx } => Self::Sqlx(SqlxClient::from_conf(sqlx, tenant).await),
AnalyticsConfig::Sqlx { sqlx } => {
Self::Sqlx(SqlxClient::from_conf(sqlx, tenant.get_schema()).await)
}
AnalyticsConfig::Clickhouse { clickhouse } => Self::Clickhouse(ClickhouseClient {
config: Arc::new(clickhouse.clone()),
database: tenant.get_clickhouse_database().to_string(),
}),
AnalyticsConfig::CombinedCkh { sqlx, clickhouse } => Self::CombinedCkh(
SqlxClient::from_conf(sqlx, tenant).await,
SqlxClient::from_conf(sqlx, tenant.get_schema()).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
database: tenant.get_clickhouse_database().to_string(),
},
),
AnalyticsConfig::CombinedSqlx { sqlx, clickhouse } => Self::CombinedSqlx(
SqlxClient::from_conf(sqlx, tenant).await,
SqlxClient::from_conf(sqlx, tenant.get_schema()).await,
ClickhouseClient {
config: Arc::new(clickhouse.clone()),
database: tenant.get_clickhouse_database().to_string(),
},
),
}
Expand Down
4 changes: 2 additions & 2 deletions crates/common_utils/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ pub const MAX_TTL_FOR_EXTENDED_CARD_INFO: u16 = 60 * 60 * 2;
/// Default tenant to be used when multitenancy is disabled
pub const DEFAULT_TENANT: &str = "public";

/// Global tenant to be used when multitenancy is enabled
pub const GLOBAL_TENANT: &str = "global";
/// Default tenant to be used when multitenancy is disabled
pub const TENANT_HEADER: &str = "x-tenant-id";

/// Max Length for MerchantReferenceId
pub const MAX_ALLOWED_MERCHANT_REFERENCE_ID_LENGTH: u8 = 64;
Expand Down
39 changes: 2 additions & 37 deletions crates/diesel_models/src/query/user.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,9 @@
use async_bb8_diesel::AsyncRunQueryDsl;
use common_utils::pii;
use diesel::{
associations::HasTable, debug_query, result::Error as DieselError, ExpressionMethods,
JoinOnDsl, QueryDsl,
};
use error_stack::report;
use router_env::logger;
use diesel::{associations::HasTable, ExpressionMethods};
pub mod sample_data;

use crate::{
errors::{self},
query::generics,
schema::{
user_roles::{self, dsl as user_roles_dsl},
users::dsl as users_dsl,
},
user::*,
user_role::UserRole,
PgPooledConn, StorageResult,
query::generics, schema::users::dsl as users_dsl, user::*, PgPooledConn, StorageResult,
};

impl UserNew {
Expand Down Expand Up @@ -90,27 +76,6 @@ impl User {
.await
}

pub async fn find_joined_users_and_roles_by_merchant_id(
conn: &PgPooledConn,
mid: &str,
) -> StorageResult<Vec<(Self, UserRole)>> {
let query = Self::table()
.inner_join(user_roles::table.on(user_roles_dsl::user_id.eq(users_dsl::user_id)))
.filter(user_roles_dsl::merchant_id.eq(mid.to_owned()));

logger::debug!(query = %debug_query::<diesel::pg::Pg,_>(&query).to_string());

query
.get_results_async::<(Self, UserRole)>(conn)
.await
.map_err(|err| match err {
DieselError::NotFound => {
report!(err).change_context(errors::DatabaseError::NotFound)
}
_ => report!(err).change_context(errors::DatabaseError::Others),
})
}

pub async fn find_users_by_user_ids(
conn: &PgPooledConn,
user_ids: Vec<String>,
Expand Down
1 change: 1 addition & 0 deletions crates/drainer/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ pub struct Tenant {
pub base_url: String,
pub schema: String,
pub redis_key_prefix: String,
pub clickhouse_database: String,
}

#[derive(Debug, Deserialize, Clone)]
Expand Down
8 changes: 8 additions & 0 deletions crates/router/src/configs/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub struct Settings<S: SecretState> {
pub struct Multitenancy {
pub tenants: TenantConfig,
pub enabled: bool,
pub global_tenant: GlobalTenant,
}

impl Multitenancy {
Expand All @@ -153,6 +154,7 @@ pub struct Tenant {
pub base_url: String,
pub schema: String,
pub redis_key_prefix: String,
pub clickhouse_database: String,
}

impl storage_impl::config::TenantConfig for Tenant {
Expand All @@ -164,6 +166,12 @@ impl storage_impl::config::TenantConfig for Tenant {
}
}

impl storage_impl::config::ClickHouseConfig for Tenant {
fn get_clickhouse_database(&self) -> &str {
self.clickhouse_database.as_str()
}
}

#[derive(Debug, Deserialize, Clone, Default)]
pub struct GlobalTenant {
pub schema: String,
Expand Down
10 changes: 9 additions & 1 deletion crates/router/src/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use common_utils::consts::TENANT_HEADER;
use futures::StreamExt;
use router_env::{
logger,
Expand Down Expand Up @@ -140,10 +141,17 @@ where
// TODO: have a common source of truth for the list of top level fields
// /crates/router_env/src/logger/storage.rs also has a list of fields called PERSISTENT_KEYS
fn call(&self, req: actix_web::dev::ServiceRequest) -> Self::Future {
let tenant_id = req
.headers()
.get(TENANT_HEADER)
.and_then(|i| i.to_str().ok())
.map(|s| s.to_owned());
let response_fut = self.service.call(req);

Box::pin(
async move {
if let Some(tenant_id) = tenant_id {
router_env::tracing::Span::current().record("tenant_id", &tenant_id);
}
let response = response_fut.await;
router_env::tracing::Span::current().record("golden_log_line", true);
response
Expand Down
15 changes: 2 additions & 13 deletions crates/router/src/routes/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use actix_web::{web, Scope};
use api_models::routing::RoutingRetrieveQuery;
#[cfg(feature = "olap")]
use common_enums::TransactionType;
use common_utils::consts::{DEFAULT_TENANT, GLOBAL_TENANT};
#[cfg(feature = "email")]
use external_services::email::{ses::AwsSes, EmailService};
use external_services::file_storage::FileStorageInterface;
Expand Down Expand Up @@ -257,19 +256,11 @@ impl AppState {
let cache_store = get_cache_store(&conf.clone(), shut_down_signal, testable)
.await
.expect("Failed to create store");
let global_tenant = if conf.multitenancy.enabled {
GLOBAL_TENANT
} else {
DEFAULT_TENANT
};
let global_store: Box<dyn GlobalStorageInterface> = Self::get_store_interface(
&storage_impl,
&event_handler,
&conf,
&settings::GlobalTenant {
schema: global_tenant.to_string(),
redis_key_prefix: String::default(),
},
&conf.multitenancy.global_tenant,
Arc::clone(&cache_store),
testable,
)
Expand All @@ -288,9 +279,7 @@ impl AppState {
.get_storage_interface();
stores.insert(tenant_name.clone(), store);
#[cfg(feature = "olap")]
let pool =
AnalyticsProvider::from_conf(conf.analytics.get_inner(), tenant_name.as_str())
.await;
let pool = AnalyticsProvider::from_conf(conf.analytics.get_inner(), tenant).await;
#[cfg(feature = "olap")]
pools.insert(tenant_name.clone(), pool);
}
Expand Down
6 changes: 3 additions & 3 deletions crates/router/src/services/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use api_models::enums::{CaptureMethod, PaymentMethodType};
pub use client::{proxy_bypass_urls, ApiClient, MockApiClient, ProxyClient};
pub use common_utils::request::{ContentType, Method, Request, RequestBuilder};
use common_utils::{
consts::X_HS_LATENCY,
consts::{DEFAULT_TENANT, TENANT_HEADER, X_HS_LATENCY},
errors::{ErrorSwitch, ReportSwitchExt},
request::RequestContent,
};
Expand Down Expand Up @@ -783,10 +783,10 @@ where
.into_iter()
.collect();
let tenant_id = if !state.conf.multitenancy.enabled {
common_utils::consts::DEFAULT_TENANT.to_string()
DEFAULT_TENANT.to_string()
} else {
incoming_request_header
.get("x-tenant-id")
.get(TENANT_HEADER)
.and_then(|value| value.to_str().ok())
.ok_or_else(|| errors::ApiErrorResponse::MissingTenantId.switch())
.map(|req_tenant_id| {
Expand Down
4 changes: 4 additions & 0 deletions crates/storage_impl/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ pub trait TenantConfig: Send + Sync {
fn get_redis_key_prefix(&self) -> &str;
}

pub trait ClickHouseConfig: TenantConfig + Send + Sync {
fn get_clickhouse_database(&self) -> &str;
}

#[derive(Debug, serde::Deserialize, Clone, Copy, Default)]
#[serde(rename_all = "PascalCase")]
pub enum QueueStrategy {
Expand Down
3 changes: 2 additions & 1 deletion loadtest/config/development.toml
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ keys = "user-agent"

[multitenancy]
enabled = false
global_tenant = { schema = "public", redis_key_prefix = "" }

[multitenancy.tenants]
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = ""}
public = { name = "hyperswitch", base_url = "http://localhost:8080", schema = "public", redis_key_prefix = "", clickhouse_database = "default"}

0 comments on commit 776ddb8

Please sign in to comment.