Skip to content

Commit

Permalink
Specify the Cleanup function in Sledconfig
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Dec 28, 2023
1 parent 4db3fbd commit fd3f6b9
Show file tree
Hide file tree
Showing 2 changed files with 441 additions and 177 deletions.
89 changes: 87 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,14 @@ pub(crate) fn timestamp_millis() -> TimestampMillis {
mod tests {
use super::storage::*;
use super::*;
use convert::Bytesize;
use std::time::Duration;

fn get_cfg(name: &str) -> Config {
let cfg = Config {
typ: StorageType::Sled,
sled: SledConfig {
path: format!("./.catch/{}", name),
gc_at_hour: 0,
gc_at_minute: 1,
..Default::default()
},
redis: RedisConfig {
Expand All @@ -121,6 +120,92 @@ mod tests {
cfg
}

#[tokio::main]
#[test]
#[cfg(feature = "ttl")]
async fn test_sled_cleanup() {
use super::{SledStorageDB, StorageDB};
let cfg = Config {
typ: StorageType::Sled,
sled: SledConfig {
path: format!("./.catch/{}", "sled_cleanup"),
cache_capacity: Bytesize::from(1024 * 1024 * 1024 * 3),
cleanup_f: Some(|_db| {
#[cfg(feature = "ttl")]
{
let db = _db.clone();
std::thread::spawn(move || {
let limit = 1000;
loop {
std::thread::sleep(std::time::Duration::from_secs(10));
let mut total_cleanups = 0;
let now = std::time::Instant::now();
loop {
let count = db.cleanup(limit);
total_cleanups += count;
println!(
"def_cleanup: {}, total cleanups: {}, cost time: {:?}",
count,
total_cleanups,
now.elapsed()
);

if count < limit {
break;
}
}
println!(
"total cleanups: {}, cost time: {:?}",
total_cleanups,
now.elapsed()
);
}
});
}
}),
..Default::default()
},
redis: RedisConfig {
url: "redis://127.0.0.1:6379/".into(),
prefix: "sled_cleanup".to_owned(),
},
};

let db = SledStorageDB::new(cfg.sled.clone()).await.unwrap();
let max = 30000;

for i in 0..max {
let map = db.map(format!("map_{}", i));
map.insert("k_1", &1).await.unwrap();
map.insert("k_2", &2).await.unwrap();
map.expire(1000).await.unwrap();
}

for i in 0..max {
let list = db.list(format!("list_{}", i));
list.push(&1).await.unwrap();
list.push(&2).await.unwrap();
list.expire(1000).await.unwrap();
}

tokio::time::sleep(Duration::from_millis(1200)).await;

println!(
"db_size: {:?}, map_size: {}, list_size: {}",
db.db_size(),
db.map_size(),
db.list_size()
);

tokio::time::sleep(Duration::from_secs(30)).await;
println!(
"db_size: {:?}, map_size: {}, list_size: {}",
db.db_size(),
db.map_size(),
db.list_size()
);
}

#[tokio::main]
#[test]
async fn test_stress() {
Expand Down

0 comments on commit fd3f6b9

Please sign in to comment.