Skip to content

Commit

Permalink
Automatically clear expired items in SortedSet.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Mar 2, 2024
1 parent 66fda04 commit 93f05d8
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 49 deletions.
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ mod tests {

fn get_cfg(name: &str) -> Config {
let cfg = Config {
typ: StorageType::Sled,
typ: StorageType::Redis,
sled: SledConfig {
path: format!("./.catch/{}", name),
cleanup_f: |_db| {},
Expand Down Expand Up @@ -1759,5 +1759,6 @@ mod tests {
.unwrap();

assert_eq!(db.len().await.unwrap(), 2);
println!("test_len len: {:?}", db.len().await);
}
}
113 changes: 65 additions & 48 deletions src/storage_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,26 @@ impl RedisStorageDB {
return Err(anyhow!(e));
}
};
Ok(Self { prefix, async_conn })
let db = Self { prefix, async_conn }.cleanup();
Ok(db)
}

fn cleanup(self) -> Self {
let db = self.clone();
std::thread::spawn(move || loop {
std::thread::sleep(std::time::Duration::from_secs(30));
let mut async_conn = db.async_conn();
let db_zkey = db.make_len_sortedset_key();
futures::executor::block_on(async move {
if let Err(e) = async_conn
.zrembyscore::<'_, _, _, _, ()>(db_zkey.as_slice(), 0, timestamp_millis())
.await
{
log::error!("{:?}", e);
}
});
});
self
}

#[inline]
Expand Down Expand Up @@ -165,7 +184,7 @@ impl RedisStorageDB {
K: AsRef<[u8]> + Sync + Send,
V: serde::ser::Serialize + Sync + Send,
{
let full_key = self.make_full_key(key);
let full_key = self.make_full_key(key.as_ref());

#[cfg(not(feature = "len"))]
{
Expand All @@ -192,14 +211,14 @@ impl RedisStorageDB {
.atomic()
.set(full_key.as_slice(), bincode::serialize(val)?)
.pexpire(full_key.as_slice(), expire_interval)
.zadd(db_zkey, full_key, timestamp_millis() + expire_interval)
.zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
.query_async::<_, ()>(&mut async_conn)
.await?;
} else {
pipe()
.atomic()
.set(full_key.as_slice(), bincode::serialize(val)?)
.zadd(db_zkey, full_key, i64::MAX)
.zadd(db_zkey, key.as_ref(), i64::MAX)
.query_async::<_, ()>(&mut async_conn)
.await?;
}
Expand All @@ -211,19 +230,20 @@ impl RedisStorageDB {
#[inline]
async fn _batch_insert(
&self,
key_vals: Vec<(Key, Vec<u8>, Option<TimestampMillis>)>,
key_val_expires: Vec<(Key, Vec<u8>, Option<TimestampMillis>)>,
) -> Result<()> {
// let full_key = self.make_full_key(k);
#[cfg(not(feature = "len"))]
{
let keys_and_values: Vec<(&Key, &Vec<u8>)> = key_vals
let keys_vals: Vec<(Key, &Vec<u8>)> = key_val_expires
.iter()
.map(|(key_ref, value, _)| (key_ref, value))
.map(|(key_ref, value, _)| (self.make_full_key(key_ref), value))
.collect();

let mut async_conn = self.async_conn();
let mut p = pipe();
let mut rpipe = p.atomic().mset(keys_and_values.as_slice());
for (k, _, at) in key_vals {
let mut rpipe = p.atomic().mset(keys_vals.as_slice());
for (k, _, at) in key_val_expires {
if let Some(at) = at {
rpipe = rpipe.expire(k, at);
}
Expand All @@ -233,31 +253,29 @@ impl RedisStorageDB {

#[cfg(feature = "len")]
{
let (keys_and_values, keys_and_expire_at): (
Vec<(&Key, &Vec<u8>)>,
Vec<(TimestampMillis, &Key)>,
) = key_vals
.iter()
.map(|(key_ref, value, timestamp)| {
let value_tuple = (key_ref, value);
let timestamp_tuple = (
timestamp
.map(|t| timestamp_millis() + t)
.unwrap_or(i64::MAX),
key_ref,
);
(value_tuple, timestamp_tuple)
})
.unzip();
let (full_key_vals, expire_keys): (Vec<(Key, &Vec<u8>)>, Vec<(TimestampMillis, &Key)>) =
key_val_expires
.iter()
.map(|(key_ref, value, timestamp)| {
let full_key_vals = (self.make_full_key(key_ref), value);
let expire_keys = (
timestamp
.map(|t| timestamp_millis() + t)
.unwrap_or(i64::MAX),
key_ref,
);
(full_key_vals, expire_keys)
})
.unzip();

let db_zkey = self.make_len_sortedset_key();
let mut async_conn = self.async_conn();
let mut p = pipe();
let mut rpipe = p
.atomic()
.mset(keys_and_values.as_slice())
.zadd_multiple(db_zkey, keys_and_expire_at.as_slice());
for (k, _, at) in key_vals {
.mset(full_key_vals.as_slice())
.zadd_multiple(db_zkey, expire_keys.as_slice());
for (k, _, at) in key_val_expires {
if let Some(at) = at {
rpipe = rpipe.expire(k, at);
}
Expand All @@ -268,7 +286,11 @@ impl RedisStorageDB {
}

#[inline]
async fn _batch_remove(&self, full_keys: Vec<Key>) -> Result<()> {
async fn _batch_remove(&self, keys: Vec<Key>) -> Result<()> {
let full_keys = keys
.iter()
.map(|k| self.make_full_key(k))
.collect::<Vec<_>>();
#[cfg(not(feature = "len"))]
{
self.async_conn().del(full_keys).await?;
Expand All @@ -280,7 +302,7 @@ impl RedisStorageDB {
pipe()
.atomic()
.del(full_keys.as_slice())
.zrem(db_zkey, full_keys)
.zrem(db_zkey, keys)
.query_async::<_, ()>(&mut async_conn)
.await?;
}
Expand All @@ -297,7 +319,7 @@ impl RedisStorageDB {
where
K: AsRef<[u8]> + Sync + Send,
{
let full_key = self.make_full_key(key);
let full_key = self.make_full_key(key.as_ref());
#[cfg(not(feature = "len"))]
{
if let Some(expire_interval) = expire_interval {
Expand All @@ -321,14 +343,14 @@ impl RedisStorageDB {
.atomic()
.incr(full_key.as_slice(), increment)
.pexpire(full_key.as_slice(), expire_interval)
.zadd(db_zkey, full_key, timestamp_millis() + expire_interval)
.zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
.query_async::<_, ()>(&mut async_conn)
.await?;
} else {
pipe()
.atomic()
.incr(full_key.as_slice(), increment)
.zadd(db_zkey, full_key, i64::MAX)
.zadd(db_zkey, key.as_ref(), i64::MAX)
.query_async::<_, ()>(&mut async_conn)
.await?;
}
Expand All @@ -346,7 +368,7 @@ impl RedisStorageDB {
where
K: AsRef<[u8]> + Sync + Send,
{
let full_key = self.make_full_key(key);
let full_key = self.make_full_key(key.as_ref());

#[cfg(not(feature = "len"))]
{
Expand All @@ -371,14 +393,14 @@ impl RedisStorageDB {
.atomic()
.decr(full_key.as_slice(), decrement)
.pexpire(full_key.as_slice(), expire_interval)
.zadd(db_zkey, full_key, timestamp_millis() + expire_interval)
.zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
.query_async::<_, ()>(&mut async_conn)
.await?;
} else {
pipe()
.atomic()
.decr(full_key.as_slice(), decrement)
.zadd(db_zkey, full_key, i64::MAX)
.zadd(db_zkey, key.as_ref(), i64::MAX)
.query_async::<_, ()>(&mut async_conn)
.await?;
}
Expand All @@ -396,7 +418,7 @@ impl RedisStorageDB {
where
K: AsRef<[u8]> + Sync + Send,
{
let full_key = self.make_full_key(key);
let full_key = self.make_full_key(key.as_ref());
#[cfg(not(feature = "len"))]
{
if let Some(expire_interval) = expire_interval {
Expand All @@ -420,14 +442,14 @@ impl RedisStorageDB {
.atomic()
.set(full_key.as_slice(), val)
.pexpire(full_key.as_slice(), expire_interval)
.zadd(db_zkey, full_key, timestamp_millis() + expire_interval)
.zadd(db_zkey, key.as_ref(), timestamp_millis() + expire_interval)
.query_async::<_, ()>(&mut async_conn)
.await?;
} else {
pipe()
.atomic()
.set(full_key.as_slice(), val)
.zadd(db_zkey, full_key, i64::MAX)
.zadd(db_zkey, key.as_ref(), i64::MAX)
.query_async::<_, ()>(&mut async_conn)
.await?;
}
Expand All @@ -454,7 +476,7 @@ impl RedisStorageDB {
pipe()
.atomic()
.del(full_key.as_slice())
.zrem(db_zkey, full_key)
.zrem(db_zkey, key.as_ref())
.query_async::<_, ()>(&mut async_conn)
.await?;
}
Expand Down Expand Up @@ -569,9 +591,8 @@ impl StorageDB for RedisStorageDB {
let keys_vals_expires = key_vals
.into_iter()
.map(|(k, v)| {
let full_key = self.make_full_key(k);
bincode::serialize(&v)
.map(move |v| (full_key, v, None))
.map(move |v| (k, v, None))
.map_err(|e| anyhow!(e))
})
.collect::<Result<Vec<_>>>()?;
Expand All @@ -583,10 +604,6 @@ impl StorageDB for RedisStorageDB {
#[inline]
async fn batch_remove(&self, keys: Vec<Key>) -> Result<()> {
if !keys.is_empty() {
let keys = keys
.into_iter()
.map(|k| self.make_full_key(k))
.collect::<Vec<_>>();
self._batch_remove(keys).await?;
}
Ok(())
Expand Down Expand Up @@ -686,7 +703,7 @@ impl StorageDB for RedisStorageDB {
let mut async_conn = self.async_conn();
let (_, res) = pipe()
.atomic()
.zadd(db_zkey, full_name.as_slice(), at)
.zadd(db_zkey, key.as_ref(), at)
.pexpire_at(full_name.as_slice(), at)
.query_async::<_, (i64, bool)>(&mut async_conn)
.await?;
Expand All @@ -713,7 +730,7 @@ impl StorageDB for RedisStorageDB {
let mut async_conn = self.async_conn();
let (_, res) = pipe()
.atomic()
.zadd(db_zkey, full_name.as_slice(), timestamp_millis() + dur)
.zadd(db_zkey, key.as_ref(), timestamp_millis() + dur)
.pexpire(full_name.as_slice(), dur)
.query_async::<_, (i64, bool)>(&mut async_conn)
.await?;
Expand Down

0 comments on commit 93f05d8

Please sign in to comment.