Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: avoid cache dead-lock
Browse files Browse the repository at this point in the history
Also added tests for excess entry, and resolved failing test of
zero duration expiry.
  • Loading branch information
maqi authored and Yoga07 committed Jun 6, 2021
1 parent 2ff32ce commit 8aa62ba
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 17 deletions.
1 change: 1 addition & 0 deletions src/cache/item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ mod tests {
#[tokio::test]
async fn expired_when_duration_is_zero() {
let item = Item::new(OBJECT, Some(Duration::new(0, 0)));
tokio::time::sleep(Duration::new(0, 0)).await;
assert_eq!(item.expired(), true);
}
}
61 changes: 44 additions & 17 deletions src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::RwLock;
#[derive(Debug)]
pub struct Cache<T, V>
where
T: Hash + Eq,
T: Hash + Eq + Copy,
{
items: RwLock<BTreeMap<T, Item<V>>>,
item_duration: Option<Duration>,
Expand All @@ -29,7 +29,7 @@ where
#[allow(clippy::len_without_is_empty)]
impl<T, V> Cache<T, V>
where
T: Ord + Hash,
T: Ord + Hash + Copy,
{
/// Creating capacity based `Cache`.
pub fn with_capacity(capacity: usize) -> Self {
Expand Down Expand Up @@ -110,33 +110,42 @@ where
}

///
#[allow(unused_assignments)]
pub async fn remove_expired(&self) {
let read_items = self.items.read().await;
let expired_keys: Vec<_> = read_items
.iter()
.filter(|(_, item)| item.expired())
.map(|(key, _)| key)
.collect();
let mut expired_keys = Vec::new();
{
let read_items = self.items.read().await;
expired_keys = read_items
.iter()
.filter(|(_, item)| item.expired())
.map(|(key, _)| *key)
.collect();
}

for key in expired_keys {
let _ = self.items.write().await.remove(key);
let _ = self.items.write().await.remove(&key);
}
}

/// removes keys beyond capacity
#[allow(unused_assignments)]
async fn drop_excess(&self) {
let len = self.len().await;
if len > self.capacity {
let excess = len - self.capacity;
let read_items = self.items.read().await;
let mut items = read_items.iter().collect_vec();
let mut excess_keys = Vec::new();
{
let read_items = self.items.read().await;
let mut items = read_items.iter().collect_vec();

// reversed sort
items.sort_by(|(_, item_a), (_, item_b)| item_b.elapsed().cmp(&item_a.elapsed()));
// reversed sort
items.sort_by(|(_, item_a), (_, item_b)| item_b.elapsed().cmp(&item_a.elapsed()));

// take the excess
for (key, _) in items.iter().take(excess) {
let _ = self.items.write().await.remove(key);
// take the excess
excess_keys = items.iter().take(excess).map(|(key, _)| **key).collect();
}
for key in excess_keys {
let _ = self.items.write().await.remove(&key);
}
}
}
Expand Down Expand Up @@ -208,7 +217,7 @@ mod tests {
#[tokio::test]
async fn remove_expired_item() {
let cache = Cache::with_expiry_duration(Duration::from_secs(0));
let _ = cache.set(KEY, VALUE, None).await;
assert!(cache.set(KEY, VALUE, None).await.is_none());
cache.remove_expired().await;
assert!(
cache.items.read().await.get(&KEY).is_none(),
Expand Down Expand Up @@ -260,4 +269,22 @@ mod tests {
"some value was returned from remove"
);
}

#[tokio::test]
async fn drop_excess_entry_zero_entry() {
let cache = Cache::with_capacity(0);
let _ = cache.set(KEY, VALUE, None).await;
assert!(cache.get(&KEY).await.is_none());
}

#[tokio::test]
async fn drop_excess_entry_one_entry() {
let cache = Cache::with_capacity(1);
let _ = cache.set(KEY, VALUE, None).await;
let key: i8 = 1;
let value: &str = "hello";
let _ = cache.set(key, value, None).await;
assert!(cache.get(&KEY).await.is_none());
assert_eq!(cache.get(&key).await, Some(value));
}
}

0 comments on commit 8aa62ba

Please sign in to comment.