Skip to content

Commit

Permalink
Add redis eviction-policy check/warning (#80)
Browse files Browse the repository at this point in the history
We want to warn users if queues are at potential risk of deletion
in low-memory situations. Add a startup check that will log a
warning if the eviction policy can't be determined or if an unsafe
eviction policy is found.

Note: The `warn` is likely to fire all the time on cloud providers,
which tend to disable the `CONFIG` command entirely.
  • Loading branch information
jaymell committed Apr 25, 2024
1 parent 5ae2200 commit 12f0e4b
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use redis::{
};
use serde::Serialize;
use svix_ksuid::KsuidLike;
use thiserror::Error;
use tokio::task::JoinSet;
use tracing::{debug, error, trace, warn};

Expand Down Expand Up @@ -90,6 +91,49 @@ impl RedisConnection for RedisClusterConnectionManager {
}
}

#[derive(Debug, Error)]
enum EvictionCheckError {
#[error("Unable to verify eviction policy. Ensure `maxmemory-policy` set to `noeviction` or `volatile-*`")]
CheckEvictionPolicyFailed,
#[error("Unsafe eviction policy found. Your queue is at risk of data loss. Please ensure `maxmemory-policy` set to `noeviction` or `volatile-*`")]
UnsafeEvictionPolicy,
}

async fn check_eviction_policy<R: RedisConnection>(
pool: bb8::Pool<R>,
) -> std::result::Result<(), EvictionCheckError> {
let mut conn = pool
.get()
.await
.map_err(|_| EvictionCheckError::CheckEvictionPolicyFailed)?;

let results: Vec<String> = redis::cmd("CONFIG")
.arg("GET")
.arg("maxmemory-policy")
.query_async::<<R as RedisConnection>::Connection, Vec<String>>(&mut *conn)
.await
.map_err(|_| EvictionCheckError::CheckEvictionPolicyFailed)?;

let eviction_policy = results
.get(1)
.ok_or(EvictionCheckError::CheckEvictionPolicyFailed)?;

if [
"noeviction",
"volatile-lru",
"volatile-lfu",
"volatile-random",
"volatile-ttl",
]
.contains(&eviction_policy.as_str())
{
tracing::debug!("Eviction policy `{eviction_policy}` found");
Ok(())
} else {
Err(EvictionCheckError::UnsafeEvictionPolicy)
}
}

pub struct RedisConfig {
pub dsn: String,
pub max_connections: u16,
Expand Down Expand Up @@ -326,6 +370,15 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
}
});

join_set.spawn({
async move {
if let Err(e) = check_eviction_policy(redis.clone()).await {
tracing::warn!("{e}");
}
Ok(())
}
});

Arc::new(join_set)
}
}
Expand Down

0 comments on commit 12f0e4b

Please sign in to comment.