diff --git a/lib/bolt/core/src/context/service.rs b/lib/bolt/core/src/context/service.rs index f4c549a859..277739cd18 100644 --- a/lib/bolt/core/src/context/service.rs +++ b/lib/bolt/core/src/context/service.rs @@ -893,16 +893,6 @@ impl ServiceContextData { self.config().cockroachdb.min_connections.to_string(), )); - if self.depends_on_clickhouse() { - let clickhouse_data = terraform::output::read_clickhouse(&project_ctx).await; - let clickhouse_host = format!( - "https://{}:{}", - *clickhouse_data.host, *clickhouse_data.port_https - ); - - env.push(("CLICKHOUSE_URL".into(), clickhouse_host)); - } - if self.depends_on_prometheus_api() { env.push(( format!("PROMETHEUS_URL"), @@ -1159,6 +1149,21 @@ impl ServiceContextData { )); } + // ClickHouse + if self.depends_on_clickhouse() { + let clickhouse_data = terraform::output::read_clickhouse(&project_ctx).await; + let username = "chirp"; + let password = project_ctx + .read_secret(&["clickhouse", "users", username, "password"]) + .await?; + let uri = format!( + "https://{}:{}@{}:{}", + username, password, *clickhouse_data.host, *clickhouse_data.port_https + ); + + env.push(("CLICKHOUSE_URL".into(), uri)); + } + // Expose S3 endpoints to services that need them let s3_deps = if self.depends_on_s3() { project_ctx.all_services().await.to_vec() diff --git a/lib/chirp/worker/src/test.rs b/lib/chirp/worker/src/test.rs index 6e738b381c..c745b6526b 100644 --- a/lib/chirp/worker/src/test.rs +++ b/lib/chirp/worker/src/test.rs @@ -74,4 +74,8 @@ impl TestCtx { pub async fn redis_user_presence(&self) -> Result { self.op_ctx.redis_user_presence().await } + + pub async fn clickhouse(&self) -> Result { + self.op_ctx.clickhouse().await + } } diff --git a/lib/connection/src/lib.rs b/lib/connection/src/lib.rs index 48eb0962f1..aa8501ce54 100644 --- a/lib/connection/src/lib.rs +++ b/lib/connection/src/lib.rs @@ -88,6 +88,10 @@ impl Connection { pub fn perf(&self) -> &chirp_perf::PerfCtx { self.client.perf() } + + pub async fn clickhouse(&self) -> Result { + self.pools.clickhouse() + } } impl std::ops::Deref for Connection { diff --git a/lib/operation/core/src/lib.rs b/lib/operation/core/src/lib.rs index 67b36152ac..3036b11e57 100644 --- a/lib/operation/core/src/lib.rs +++ b/lib/operation/core/src/lib.rs @@ -283,6 +283,10 @@ where pub fn perf(&self) -> &chirp_perf::PerfCtx { self.conn.perf() } + + pub async fn clickhouse(&self) -> Result { + self.conn.clickhouse().await + } } impl std::ops::Deref for OperationContext diff --git a/lib/pools/Cargo.toml b/lib/pools/Cargo.toml index 55fd607b7c..6c1e8126fe 100644 --- a/lib/pools/Cargo.toml +++ b/lib/pools/Cargo.toml @@ -20,6 +20,7 @@ tokio = { version = "1.29", features = ["tracing"] } tokio-util = "0.7" tracing = "0.1" governor = "0.6" +url = "2.5.0" [dependencies.sqlx] version = "0.7.1" diff --git a/lib/pools/src/error.rs b/lib/pools/src/error.rs index 9d47994b64..441b8c5653 100644 --- a/lib/pools/src/error.rs +++ b/lib/pools/src/error.rs @@ -12,6 +12,9 @@ pub enum Error { #[error("missing redis pool: {key:?}")] MissingRedisPool { key: Option }, + #[error("missing clickhouse pool")] + MissingClickHousePool, + #[error("tokio join: {0}")] TokioJoin(tokio::task::JoinError), @@ -29,4 +32,10 @@ pub enum Error { #[error("build sqlx: {0}")] BuildSqlx(sqlx::Error), + + #[error("build clickhouse: {0}")] + BuildClickHouse(clickhouse::error::Error), + + #[error("build clickhouse url: {0}")] + BuildClickHouseUrl(url::ParseError), } diff --git a/lib/pools/src/lib.rs b/lib/pools/src/lib.rs index 9c2e56b8da..ae2d4d70ed 100644 --- a/lib/pools/src/lib.rs +++ b/lib/pools/src/lib.rs @@ -8,14 +8,15 @@ use std::{collections::HashMap, env, fmt::Debug, str::FromStr, sync::Arc, time:: use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use crate::pools::{CrdbPool, NatsPool, PoolsInner, RedisPool}; +use crate::pools::{ClickHousePool, CrdbPool, NatsPool, PoolsInner, RedisPool}; pub mod prelude { pub use async_nats as nats; + pub use clickhouse; pub use redis; pub use sqlx; - pub use crate::pools::{CrdbPool, NatsPool, RedisPool}; + pub use crate::pools::{ClickHousePool, CrdbPool, NatsPool, RedisPool}; pub use crate::{ __sql_query, __sql_query_as, __sql_query_as_raw, sql_execute, sql_fetch, sql_fetch_all, sql_fetch_many, sql_fetch_one, sql_fetch_optional, @@ -35,12 +36,14 @@ pub async fn from_env(client_name: impl ToString + Debug) -> Result Result, Error> { Ok(redis) } +#[tracing::instrument] +fn clickhouse_from_env() -> Result, Error> { + if let Some(url) = std::env::var("CLICKHOUSE_URL").ok() { + tracing::info!(%url, "clickhouse connecting"); + + // Build HTTP client + let mut http_connector = hyper::client::connect::HttpConnector::new(); + http_connector.enforce_http(false); + http_connector.set_keepalive(Some(Duration::from_secs(60))); + let https_connector = hyper_tls::HttpsConnector::new_with_connector(http_connector); + let http_client = hyper::Client::builder() + .pool_idle_timeout(Duration::from_secs(2)) + .build(https_connector); + + // Build ClickHouse client + let parsed_url = url::Url::parse(&url).map_err(Error::BuildClickHouseUrl)?; + let mut client = clickhouse::Client::with_http_client(http_client) + .with_url(url) + .with_user(parsed_url.username()); + if let Some(password) = parsed_url.password() { + client = client.with_password(password); + } + + Ok(Some(client)) + } else { + Ok(None) + } +} + #[tracing::instrument(level = "trace", skip(_pools))] async fn runtime(_pools: Pools, client_name: String) { // TODO: Delete this once confirmed this is no longer an issue diff --git a/lib/pools/src/pools.rs b/lib/pools/src/pools.rs index 10e90f4bdf..7b3a4cba9e 100644 --- a/lib/pools/src/pools.rs +++ b/lib/pools/src/pools.rs @@ -6,6 +6,7 @@ use crate::Error; pub type NatsPool = async_nats::Client; pub type CrdbPool = sqlx::PgPool; pub type RedisPool = redis::aio::ConnectionManager; +pub type ClickHousePool = clickhouse::Client; pub type Pools = Arc; @@ -14,6 +15,7 @@ pub struct PoolsInner { pub(crate) nats: Option, pub(crate) crdb: Option, pub(crate) redis: HashMap, + pub(crate) clickhouse: Option, } impl PoolsInner { @@ -67,6 +69,10 @@ impl PoolsInner { pub fn redis_cache(&self) -> Result { self.redis("ephemeral") } + + pub fn clickhouse(&self) -> Result { + self.clickhouse.clone().ok_or(Error::MissingClickHousePool) + } } impl PoolsInner { diff --git a/lib/pools/src/utils/clickhouse.rs b/lib/pools/src/utils/clickhouse.rs deleted file mode 100644 index 4db3030947..0000000000 --- a/lib/pools/src/utils/clickhouse.rs +++ /dev/null @@ -1,24 +0,0 @@ -use std::{env, time::Duration}; - -use hyper::client::connect::HttpConnector; -use hyper_tls::HttpsConnector; - -use crate::error::Error; - -const TCP_KEEPALIVE: Duration = Duration::from_secs(60); -const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(2); - -pub fn client() -> Result { - let mut http = HttpConnector::new(); - http.enforce_http(false); - http.set_keepalive(Some(TCP_KEEPALIVE)); - let https = HttpsConnector::new_with_connector(http); - let client = hyper::Client::builder() - .pool_idle_timeout(POOL_IDLE_TIMEOUT) - .build(https); - - let clickhouse_url = env::var("CLICKHOUSE_URL").map_err(Error::Env)?; - let client = clickhouse::Client::with_http_client(client).with_url(clickhouse_url); - - Ok(client) -} diff --git a/lib/pools/src/utils/mod.rs b/lib/pools/src/utils/mod.rs index cc661d371b..3c22d50794 100644 --- a/lib/pools/src/utils/mod.rs +++ b/lib/pools/src/utils/mod.rs @@ -1,3 +1,2 @@ -pub mod clickhouse; pub mod crdb; pub mod sql_query_macros; diff --git a/svc/Cargo.lock b/svc/Cargo.lock index 22a8637aa2..d529bf5381 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -3074,9 +3074,9 @@ checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" [[package]] name = "form_urlencoded" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] @@ -4084,9 +4084,9 @@ dependencies = [ [[package]] name = "idna" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" dependencies = [ "unicode-bidi", "unicode-normalization", @@ -5593,9 +5593,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.0" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "perf-log-get" @@ -6558,6 +6558,7 @@ dependencies = [ "tokio", "tokio-util 0.7.10", "tracing", + "url", ] [[package]] @@ -8516,9 +8517,9 @@ dependencies = [ [[package]] name = "url" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "143b538f18257fac9cad154828a57c6bf5157e1aa604d4816b5995bf6de87ae5" +checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", "idna", diff --git a/svc/pkg/job-log/ops/read/src/lib.rs b/svc/pkg/job-log/ops/read/src/lib.rs index 98aaa95154..a18905314e 100644 --- a/svc/pkg/job-log/ops/read/src/lib.rs +++ b/svc/pkg/job-log/ops/read/src/lib.rs @@ -11,10 +11,7 @@ struct LogEntry { async fn handle( ctx: OperationContext, ) -> GlobalResult { - let clickhouse = rivet_pools::utils::clickhouse::client()? - .with_user("chirp") - .with_password(util::env::read_secret(&["clickhouse", "users", "chirp", "password"]).await?) - .with_database("db_job_log"); + let clickhouse = ctx.clickhouse().await?; let run_id = unwrap_ref!(ctx.run_id).as_uuid(); let req_query = unwrap_ref!(ctx.query); @@ -49,7 +46,7 @@ async fn handle( async fn query_all( req: &job_log::read::Request, - clickhouse: &clickhouse::Client, + clickhouse: &ClickHousePool, run_id: Uuid, order_by: &str, ) -> GlobalResult> { @@ -57,7 +54,7 @@ async fn query_all( .query(&formatdoc!( " SELECT ts, message - FROM run_logs + FROM db_job_log.run_logs WHERE run_id = ? AND task = ? AND stream_type = ? ORDER BY ts {order_by} LIMIT ? @@ -79,7 +76,7 @@ async fn query_all( async fn query_before_ts( req: &job_log::read::Request, - clickhouse: &clickhouse::Client, + clickhouse: &ClickHousePool, run_id: Uuid, ts: i64, order_by: &str, @@ -88,7 +85,7 @@ async fn query_before_ts( .query(&formatdoc!( " SELECT ts, message - FROM run_logs + FROM db_job_log.run_logs WHERE run_id = ? AND task = ? AND stream_type = ? AND ts < fromUnixTimestamp64Nano(?) ORDER BY ts {order_by} LIMIT ? @@ -111,7 +108,7 @@ async fn query_before_ts( async fn query_after_ts( req: &job_log::read::Request, - clickhouse: &clickhouse::Client, + clickhouse: &ClickHousePool, run_id: Uuid, ts: i64, order_by: &str, @@ -120,7 +117,7 @@ async fn query_after_ts( .query(&formatdoc!( " SELECT ts, message - FROM run_logs + FROM db_job_log.run_logs WHERE run_id = ? AND task = ? AND stream_type = ? AND ts > fromUnixTimestamp64Nano(?) ORDER BY ts {order_by} LIMIT ? @@ -143,7 +140,7 @@ async fn query_after_ts( async fn query_ts_range( req: &job_log::read::Request, - clickhouse: &clickhouse::Client, + clickhouse: &ClickHousePool, run_id: Uuid, after_ts: i64, before_ts: i64, @@ -153,7 +150,7 @@ async fn query_ts_range( .query(&formatdoc!( " SELECT ts, message - FROM run_logs + FROM db_job_log.run_logs WHERE run_id = ? AND task = ? AND stream_type = ? AND ts > fromUnixTimestamp64Nano(?) AND ts < fromUnixTimestamp64Nano(?) ORDER BY ts {order_by} LIMIT ? diff --git a/svc/pkg/job-log/worker/src/workers/export.rs b/svc/pkg/job-log/worker/src/workers/export.rs index 0ed2af4d68..b80045625d 100644 --- a/svc/pkg/job-log/worker/src/workers/export.rs +++ b/svc/pkg/job-log/worker/src/workers/export.rs @@ -8,11 +8,6 @@ struct LogEntry { #[worker(name = "job-log-export")] async fn worker(ctx: &OperationContext) -> GlobalResult<()> { - let clickhouse = rivet_pools::utils::clickhouse::client()? - .with_user("chirp") - .with_password(util::env::read_secret(&["clickhouse", "users", "chirp", "password"]).await?) - .with_database("db_job_log"); - let request_id = unwrap_ref!(ctx.request_id).as_uuid(); let run_id = unwrap_ref!(ctx.run_id).as_uuid(); @@ -22,11 +17,13 @@ async fn worker(ctx: &OperationContext) -> Global backend::job::log::StreamType::StdErr => "stderr.txt", }; - let mut entries_cursor = clickhouse + let mut entries_cursor = ctx + .clickhouse() + .await? .query(indoc!( " SELECT message - FROM run_logs + FROM db_job_log.run_logs WHERE run_id = ? AND task = ? AND stream_type = ? ORDER BY ts ASC "