Skip to content

Commit

Permalink
fix(logger): don't block when deleting old logs (#1690)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaro00 authored Mar 21, 2024
1 parent ee7809d commit cb9559f
Showing 1 changed file with 12 additions and 15 deletions.
27 changes: 12 additions & 15 deletions logger/src/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use prost_types::Timestamp;
use shuttle_proto::logger::{LogItem, LogLine};
use sqlx::{
migrate::Migrator,
postgres::PgConnectOptions,
types::chrono::{DateTime, Utc},
Executor, FromRow, PgPool, QueryBuilder,
};
Expand Down Expand Up @@ -60,14 +59,7 @@ impl Postgres {
pub async fn new(connection_uri: &Uri) -> Self {
let pool = PgPool::connect(connection_uri.to_string().as_str())
.await
.expect("to be able to connect to the postgres db using the connection url");
Self::from_pool(pool).await
}

pub async fn with_options(options: PgConnectOptions) -> Self {
let pool = PgPool::connect_with(options)
.await
.expect("to be able to connect to the postgres db using the pg connect options");
.expect("to connect to the db");
Self::from_pool(pool).await
}

Expand All @@ -77,13 +69,17 @@ impl Postgres {
.await
.expect("to run migrations successfully");

// Perform cleaning of old logs on startup
pool.execute("DELETE FROM logs WHERE tx_timestamp < (NOW() - INTERVAL '1 month')")
.await
.expect("to clean old logs successfully");
let pool_clone = pool.clone();
tokio::spawn(async move {
info!("cleaning old logs");
pool_clone
.execute("DELETE FROM logs WHERE tx_timestamp < (NOW() - INTERVAL '1 month')")
.await
.expect("to clean old logs successfully");
info!("done cleaning old logs");
});

let (tx, mut rx) = broadcast::channel::<(Vec<Log>, Span)>(1000);
let pool_spawn = pool.clone();

let interval_tx = tx.clone();
tokio::spawn(async move {
Expand All @@ -95,6 +91,7 @@ impl Postgres {
}
});

let pool_clone = pool.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Expand Down Expand Up @@ -124,7 +121,7 @@ impl Postgres {
});
let query = builder.build();

if let Err(error) = query.execute(&pool_spawn).instrument(parent_span).await
if let Err(error) = query.execute(&pool_clone).instrument(parent_span).await
{
error!(
error = &error as &dyn std::error::Error,
Expand Down

0 comments on commit cb9559f

Please sign in to comment.