Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions lib/bolt/core/src/context/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,16 +893,6 @@ impl ServiceContextData {
self.config().cockroachdb.min_connections.to_string(),
));

if self.depends_on_clickhouse() {
let clickhouse_data = terraform::output::read_clickhouse(&project_ctx).await;
let clickhouse_host = format!(
"https://{}:{}",
*clickhouse_data.host, *clickhouse_data.port_https
);

env.push(("CLICKHOUSE_URL".into(), clickhouse_host));
}

if self.depends_on_prometheus_api() {
env.push((
format!("PROMETHEUS_URL"),
Expand Down Expand Up @@ -1159,6 +1149,21 @@ impl ServiceContextData {
));
}

// ClickHouse
if self.depends_on_clickhouse() {
let clickhouse_data = terraform::output::read_clickhouse(&project_ctx).await;
let username = "chirp";
let password = project_ctx
.read_secret(&["clickhouse", "users", username, "password"])
.await?;
let uri = format!(
"https://{}:{}@{}:{}",
username, password, *clickhouse_data.host, *clickhouse_data.port_https
);

env.push(("CLICKHOUSE_URL".into(), uri));
}

// Expose S3 endpoints to services that need them
let s3_deps = if self.depends_on_s3() {
project_ctx.all_services().await.to_vec()
Expand Down
4 changes: 4 additions & 0 deletions lib/chirp/worker/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,8 @@ impl TestCtx {
pub async fn redis_user_presence(&self) -> Result<RedisPool, rivet_pools::Error> {
self.op_ctx.redis_user_presence().await
}

pub async fn clickhouse(&self) -> Result<ClickHousePool, rivet_pools::Error> {
self.op_ctx.clickhouse().await
}
}
4 changes: 4 additions & 0 deletions lib/connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ impl Connection {
pub fn perf(&self) -> &chirp_perf::PerfCtx {
self.client.perf()
}

pub async fn clickhouse(&self) -> Result<ClickHousePool, rivet_pools::Error> {
self.pools.clickhouse()
}
}

impl std::ops::Deref for Connection {
Expand Down
4 changes: 4 additions & 0 deletions lib/operation/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ where
pub fn perf(&self) -> &chirp_perf::PerfCtx {
self.conn.perf()
}

pub async fn clickhouse(&self) -> Result<ClickHousePool, rivet_pools::Error> {
self.conn.clickhouse().await
}
}

impl<B> std::ops::Deref for OperationContext<B>
Expand Down
1 change: 1 addition & 0 deletions lib/pools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio = { version = "1.29", features = ["tracing"] }
tokio-util = "0.7"
tracing = "0.1"
governor = "0.6"
url = "2.5.0"

[dependencies.sqlx]
version = "0.7.1"
Expand Down
9 changes: 9 additions & 0 deletions lib/pools/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub enum Error {
#[error("missing redis pool: {key:?}")]
MissingRedisPool { key: Option<String> },

#[error("missing clickhouse pool")]
MissingClickHousePool,

#[error("tokio join: {0}")]
TokioJoin(tokio::task::JoinError),

Expand All @@ -29,4 +32,10 @@ pub enum Error {

#[error("build sqlx: {0}")]
BuildSqlx(sqlx::Error),

#[error("build clickhouse: {0}")]
BuildClickHouse(clickhouse::error::Error),

#[error("build clickhouse url: {0}")]
BuildClickHouseUrl(url::ParseError),
}
36 changes: 34 additions & 2 deletions lib/pools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use std::{collections::HashMap, env, fmt::Debug, str::FromStr, sync::Arc, time::
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;

use crate::pools::{CrdbPool, NatsPool, PoolsInner, RedisPool};
use crate::pools::{ClickHousePool, CrdbPool, NatsPool, PoolsInner, RedisPool};

pub mod prelude {
pub use async_nats as nats;
pub use clickhouse;
pub use redis;
pub use sqlx;

pub use crate::pools::{CrdbPool, NatsPool, RedisPool};
pub use crate::pools::{ClickHousePool, CrdbPool, NatsPool, RedisPool};
pub use crate::{
__sql_query, __sql_query_as, __sql_query_as_raw, sql_execute, sql_fetch, sql_fetch_all,
sql_fetch_many, sql_fetch_one, sql_fetch_optional,
Expand All @@ -35,12 +36,14 @@ pub async fn from_env(client_name: impl ToString + Debug) -> Result<Pools, Error
crdb_from_env(client_name.clone()),
redis_from_env(),
)?;
let clickhouse = clickhouse_from_env()?;

let pool = Arc::new(PoolsInner {
_guard: token.clone().drop_guard(),
nats,
crdb,
redis,
clickhouse,
});
pool.clone().start(token);

Expand Down Expand Up @@ -225,6 +228,35 @@ async fn redis_from_env() -> Result<HashMap<String, RedisPool>, Error> {
Ok(redis)
}

#[tracing::instrument]
fn clickhouse_from_env() -> Result<Option<ClickHousePool>, Error> {
if let Some(url) = std::env::var("CLICKHOUSE_URL").ok() {
tracing::info!(%url, "clickhouse connecting");

// Build HTTP client
let mut http_connector = hyper::client::connect::HttpConnector::new();
http_connector.enforce_http(false);
http_connector.set_keepalive(Some(Duration::from_secs(60)));
let https_connector = hyper_tls::HttpsConnector::new_with_connector(http_connector);
let http_client = hyper::Client::builder()
.pool_idle_timeout(Duration::from_secs(2))
.build(https_connector);

// Build ClickHouse client
let parsed_url = url::Url::parse(&url).map_err(Error::BuildClickHouseUrl)?;
let mut client = clickhouse::Client::with_http_client(http_client)
.with_url(url)
.with_user(parsed_url.username());
if let Some(password) = parsed_url.password() {
client = client.with_password(password);
}

Ok(Some(client))
} else {
Ok(None)
}
}

#[tracing::instrument(level = "trace", skip(_pools))]
async fn runtime(_pools: Pools, client_name: String) {
// TODO: Delete this once confirmed this is no longer an issue
Expand Down
6 changes: 6 additions & 0 deletions lib/pools/src/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::Error;
pub type NatsPool = async_nats::Client;
pub type CrdbPool = sqlx::PgPool;
pub type RedisPool = redis::aio::ConnectionManager;
pub type ClickHousePool = clickhouse::Client;

pub type Pools = Arc<PoolsInner>;

Expand All @@ -14,6 +15,7 @@ pub struct PoolsInner {
pub(crate) nats: Option<NatsPool>,
pub(crate) crdb: Option<CrdbPool>,
pub(crate) redis: HashMap<String, RedisPool>,
pub(crate) clickhouse: Option<clickhouse::Client>,
}

impl PoolsInner {
Expand Down Expand Up @@ -67,6 +69,10 @@ impl PoolsInner {
pub fn redis_cache(&self) -> Result<RedisPool, Error> {
self.redis("ephemeral")
}

pub fn clickhouse(&self) -> Result<ClickHousePool, Error> {
self.clickhouse.clone().ok_or(Error::MissingClickHousePool)
}
}

impl PoolsInner {
Expand Down
24 changes: 0 additions & 24 deletions lib/pools/src/utils/clickhouse.rs

This file was deleted.

1 change: 0 additions & 1 deletion lib/pools/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
pub mod clickhouse;
pub mod crdb;
pub mod sql_query_macros;
17 changes: 9 additions & 8 deletions svc/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 9 additions & 12 deletions svc/pkg/job-log/ops/read/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ struct LogEntry {
async fn handle(
ctx: OperationContext<job_log::read::Request>,
) -> GlobalResult<job_log::read::Response> {
let clickhouse = rivet_pools::utils::clickhouse::client()?
.with_user("chirp")
.with_password(util::env::read_secret(&["clickhouse", "users", "chirp", "password"]).await?)
.with_database("db_job_log");
let clickhouse = ctx.clickhouse().await?;

let run_id = unwrap_ref!(ctx.run_id).as_uuid();
let req_query = unwrap_ref!(ctx.query);
Expand Down Expand Up @@ -49,15 +46,15 @@ async fn handle(

async fn query_all(
req: &job_log::read::Request,
clickhouse: &clickhouse::Client,
clickhouse: &ClickHousePool,
run_id: Uuid,
order_by: &str,
) -> GlobalResult<Vec<backend::job::log::LogEntry>> {
let mut entries_cursor = clickhouse
.query(&formatdoc!(
"
SELECT ts, message
FROM run_logs
FROM db_job_log.run_logs
WHERE run_id = ? AND task = ? AND stream_type = ?
ORDER BY ts {order_by}
LIMIT ?
Expand All @@ -79,7 +76,7 @@ async fn query_all(

async fn query_before_ts(
req: &job_log::read::Request,
clickhouse: &clickhouse::Client,
clickhouse: &ClickHousePool,
run_id: Uuid,
ts: i64,
order_by: &str,
Expand All @@ -88,7 +85,7 @@ async fn query_before_ts(
.query(&formatdoc!(
"
SELECT ts, message
FROM run_logs
FROM db_job_log.run_logs
WHERE run_id = ? AND task = ? AND stream_type = ? AND ts < fromUnixTimestamp64Nano(?)
ORDER BY ts {order_by}
LIMIT ?
Expand All @@ -111,7 +108,7 @@ async fn query_before_ts(

async fn query_after_ts(
req: &job_log::read::Request,
clickhouse: &clickhouse::Client,
clickhouse: &ClickHousePool,
run_id: Uuid,
ts: i64,
order_by: &str,
Expand All @@ -120,7 +117,7 @@ async fn query_after_ts(
.query(&formatdoc!(
"
SELECT ts, message
FROM run_logs
FROM db_job_log.run_logs
WHERE run_id = ? AND task = ? AND stream_type = ? AND ts > fromUnixTimestamp64Nano(?)
ORDER BY ts {order_by}
LIMIT ?
Expand All @@ -143,7 +140,7 @@ async fn query_after_ts(

async fn query_ts_range(
req: &job_log::read::Request,
clickhouse: &clickhouse::Client,
clickhouse: &ClickHousePool,
run_id: Uuid,
after_ts: i64,
before_ts: i64,
Expand All @@ -153,7 +150,7 @@ async fn query_ts_range(
.query(&formatdoc!(
"
SELECT ts, message
FROM run_logs
FROM db_job_log.run_logs
WHERE run_id = ? AND task = ? AND stream_type = ? AND ts > fromUnixTimestamp64Nano(?) AND ts < fromUnixTimestamp64Nano(?)
ORDER BY ts {order_by}
LIMIT ?
Expand Down
11 changes: 4 additions & 7 deletions svc/pkg/job-log/worker/src/workers/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ struct LogEntry {

#[worker(name = "job-log-export")]
async fn worker(ctx: &OperationContext<job_log::msg::export::Message>) -> GlobalResult<()> {
let clickhouse = rivet_pools::utils::clickhouse::client()?
.with_user("chirp")
.with_password(util::env::read_secret(&["clickhouse", "users", "chirp", "password"]).await?)
.with_database("db_job_log");

let request_id = unwrap_ref!(ctx.request_id).as_uuid();
let run_id = unwrap_ref!(ctx.run_id).as_uuid();

Expand All @@ -22,11 +17,13 @@ async fn worker(ctx: &OperationContext<job_log::msg::export::Message>) -> Global
backend::job::log::StreamType::StdErr => "stderr.txt",
};

let mut entries_cursor = clickhouse
let mut entries_cursor = ctx
.clickhouse()
.await?
.query(indoc!(
"
SELECT message
FROM run_logs
FROM db_job_log.run_logs
WHERE run_id = ? AND task = ? AND stream_type = ?
ORDER BY ts ASC
"
Expand Down