Merge branch 'master' into clear_if_apply_snap_failed
LykxSassinator committed May 24, 2024
2 parents 4a4fa89 + fba1fb2 commit b34a49f
Showing 2 changed files with 138 additions and 44 deletions.
74 changes: 72 additions & 2 deletions components/raftstore/src/store/entry_storage.rs
@@ -81,6 +81,11 @@ impl CachedEntries {
pub fn take_entries(&self) -> (Vec<Entry>, usize) {
mem::take(&mut *self.entries.lock().unwrap())
}

#[cfg(test)]
pub fn has_entries(&self) -> bool {
!self.entries.lock().unwrap().0.is_empty()
}
}

struct EntryCache {
@@ -236,10 +241,16 @@ impl EntryCache {

// Clean cached entries which have already been sent to apply threads. For
// example, if entries [1, 10), [10, 20), [20, 30) are sent to apply threads and
// `compact_to(15)` is called, only [20, 30) will still be kept in cache.
// `compact_to(15)` is called:
// - if persisted >= 19, then only [20, 30) will still be kept in cache.
// - if persisted < 19, then [10, 20), [20, 30) will still be kept in cache.
let old_trace_cap = self.trace.capacity();
while let Some(cached_entries) = self.trace.pop_front() {
if cached_entries.range.start >= idx {
// Do not evict cached entries unless all of them have been persisted.
// After PR #16626, entries may be sent to apply before they are fully
// persisted, so the cache must not free these entries until they are
// completely persisted.
if cached_entries.range.start >= idx || cached_entries.range.end > self.persisted + 1 {
self.trace.push_front(cached_entries);
let trace_len = self.trace.len();
let trace_cap = self.trace.capacity();
@@ -1884,4 +1895,63 @@ pub mod tests {
// Cache should be warmed up.
assert_eq!(store.entry_cache_first_index().unwrap(), 5);
}

#[test]
fn test_evict_cached_entries() {
let ents = vec![new_entry(3, 3)];
let td = Builder::new().prefix("tikv-store-test").tempdir().unwrap();
let worker = LazyWorker::new("snap-manager");
let sched = worker.scheduler();
let (dummy_scheduler, _) = dummy_scheduler();
let mut store = new_storage_from_ents(sched, dummy_scheduler, &td, &ents);

// initial cache
for i in 4..10 {
append_ents(&mut store, &[new_entry(i, 4)]);
}

let cached_entries = vec![
CachedEntries::new(vec![new_entry(4, 4)]),
CachedEntries::new(vec![new_entry(5, 4)]),
CachedEntries::new(vec![new_entry(6, 4), new_entry(7, 4), new_entry(8, 4)]),
CachedEntries::new(vec![new_entry(9, 4)]),
];
for ents in &cached_entries {
store.trace_cached_entries(ents.clone());
}
assert_eq!(store.cache.first_index().unwrap(), 4);

store.evict_entry_cache(false);
assert_eq!(store.cache.first_index().unwrap(), 4);
assert!(cached_entries[0].has_entries());

store.cache.persisted = 4;
store.evict_entry_cache(false);
assert_eq!(store.cache.first_index().unwrap(), 5);
assert!(!cached_entries[0].has_entries());
assert!(cached_entries[1].has_entries());

store.cache.persisted = 5;
store.evict_entry_cache(false);
assert_eq!(store.cache.first_index().unwrap(), 6);
assert!(!cached_entries[1].has_entries());
assert!(cached_entries[2].has_entries());

for idx in [6, 7] {
store.cache.persisted = idx;
store.evict_entry_cache(false);
assert_eq!(store.cache.first_index().unwrap(), idx + 1);
assert!(cached_entries[2].has_entries());
}

store.cache.persisted = 8;
store.evict_entry_cache(false);
assert_eq!(store.cache.first_index().unwrap(), 9);
assert!(!cached_entries[2].has_entries());

store.cache.persisted = 9;
store.evict_entry_cache(false);
assert!(store.cache.first_index().is_none());
assert!(!cached_entries[3].has_entries());
}
}
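
The eviction condition introduced above (keep a cached block when `range.start >= idx` or `range.end > persisted + 1`) can be illustrated outside the diff. The following is a minimal, self-contained Rust sketch, not TiKV code: `Range` and `compact_trace` are hypothetical stand-ins for `CachedEntries` and the trace handling in `EntryCache::compact_to`, using the [1, 10), [10, 20), [20, 30) example from the comment above.

```rust
use std::collections::VecDeque;

// Half-open range [start, end) of raft log indexes covered by one cached block.
#[derive(Clone, Copy)]
struct Range {
    start: u64,
    end: u64,
}

// Drop leading blocks that are both below the compaction index `idx` and fully
// persisted (end <= persisted + 1); keep the first block that fails either
// check and stop, mirroring (in simplified form) the keep-and-stop behavior above.
fn compact_trace(trace: &mut VecDeque<Range>, idx: u64, persisted: u64) {
    while let Some(block) = trace.pop_front() {
        if block.start >= idx || block.end > persisted + 1 {
            trace.push_front(block);
            break;
        }
    }
}

fn main() {
    let blocks = [(1u64, 10u64), (10, 20), (20, 30)];

    // compact_to(15) with persisted >= 19: only [20, 30) survives.
    let mut trace: VecDeque<Range> = blocks.iter().map(|&(start, end)| Range { start, end }).collect();
    compact_trace(&mut trace, 15, 19);
    assert_eq!(trace.len(), 1);

    // compact_to(15) with persisted < 19: [10, 20) and [20, 30) survive.
    let mut trace: VecDeque<Range> = blocks.iter().map(|&(start, end)| Range { start, end }).collect();
    compact_trace(&mut trace, 15, 18);
    assert_eq!(trace.len(), 2);
}
```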
108 changes: 66 additions & 42 deletions components/raftstore/src/store/snap.rs
@@ -2,7 +2,6 @@
use std::{
borrow::Cow,
cmp::{self, Ordering as CmpOrdering, Reverse},
collections::VecDeque,
error::Error as StdError,
fmt::{self, Display, Formatter},
io::{self, ErrorKind, Read, Write},
@@ -1884,14 +1883,14 @@ impl SnapManager {
/// function to consult the concurrency limiter, determining if it can
/// receive a new snapshot. If the precheck is successful, the leader will
/// proceed to generate and send the snapshot.
pub fn recv_snap_precheck(&self) -> bool {
self.core.recv_concurrency_limiter.try_recv()
pub fn recv_snap_precheck(&self, region_id: u64) -> bool {
self.core.recv_concurrency_limiter.try_recv(region_id)
}

/// recv_snap_complete is part of the snapshot recv precheck process, and
/// should be called when a follower finishes receiving a snapshot.
pub fn recv_snap_complete(&self) {
self.core.recv_concurrency_limiter.finish_recv()
pub fn recv_snap_complete(&self, region_id: u64) {
self.core.recv_concurrency_limiter.finish_recv(region_id)
}
}

@@ -2000,7 +1999,7 @@ impl SnapManagerCore {
pub struct SnapRecvConcurrencyLimiter {
limit: Arc<AtomicUsize>,
ttl_secs: u64,
timestamps: Arc<Mutex<VecDeque<Instant>>>,
timestamps: Arc<Mutex<HashMap<u64, Instant>>>,
}

impl SnapRecvConcurrencyLimiter {
@@ -2009,46 +2008,65 @@ impl SnapRecvConcurrencyLimiter {
SnapRecvConcurrencyLimiter {
limit: Arc::new(AtomicUsize::new(limit)),
ttl_secs,
timestamps: Arc::new(Mutex::new(VecDeque::with_capacity(limit))),
timestamps: Arc::new(Mutex::new(HashMap::with_capacity_and_hasher(
limit,
Default::default(),
))),
}
}

// Attempts to add a snapshot receive operation if below the concurrency
// limit. Returns true if the operation is allowed, false otherwise.
pub fn try_recv(&self) -> bool {
pub fn try_recv(&self, region_id: u64) -> bool {
let mut timestamps = self.timestamps.lock().unwrap();
let current_time = Instant::now();
self.evict_expired_timestamps(&mut timestamps, current_time);

let limit = self.limit.load(Ordering::Relaxed);
if limit == 0 {
// 0 means no limit. In that case, we avoid pushing into the
// VecDeque to prevent it from growing indefinitely.
// 0 means no limit. In that case, we avoid inserting into the hash
// map to prevent it from growing indefinitely.
return true;
}

if timestamps.len() < limit {
timestamps.push_back(current_time);
// Insert into the map if its size is within limit. If the region id is
// already present in the map, update its timestamp.
if timestamps.len() < limit || timestamps.contains_key(&region_id) {
timestamps.insert(region_id, current_time);
return true;
}
false
}

fn evict_expired_timestamps(&self, timestamps: &mut VecDeque<Instant>, current_time: Instant) {
while let Some(&timestamp) = timestamps.front()
&& current_time.duration_since(timestamp) > Duration::from_secs(self.ttl_secs)
{
timestamps.pop_front();
}
fn evict_expired_timestamps(
&self,
timestamps: &mut HashMap<u64, Instant>,
current_time: Instant,
) {
timestamps.retain(|region_id, timestamp| {
if current_time.duration_since(*timestamp) <= Duration::from_secs(self.ttl_secs) {
true
} else {
// This shouldn't happen if the TTL is set properly. When it
// does happen, the limiter may permit more snapshots than the
// configured limit to be sent and trigger the receiver busy
// error.
warn!(
"region {} expired in the snap recv concurrency limiter",
region_id
);
false
}
});
timestamps.shrink_to(self.limit.load(Ordering::Relaxed));
}

// Completes a snapshot receive operation by removing the region's
// timestamp from the map.
pub fn finish_recv(&self) {
self.timestamps.lock().unwrap().pop_front();
pub fn finish_recv(&self, region_id: u64) {
self.timestamps.lock().unwrap().remove(&region_id);
}

pub fn set_limit(&self, limit: usize) {
@@ -3503,48 +3521,54 @@ pub mod tests {
let ttl_secs = 60;

let limiter = SnapRecvConcurrencyLimiter::new(1, ttl_secs);
limiter.finish_recv(); // calling finish_recv() on an empty limiter is fine.
assert!(limiter.try_recv()); // first recv should succeed
limiter.finish_recv(10); // calling finish_recv() on an empty limiter is fine.
assert!(limiter.try_recv(1)); // first recv should succeed

// Second call should fail because we've reached the limit.
assert_eq!(limiter.try_recv(), false);
// limiter.try_recv(2) should fail because we've reached the limit. But
// calling limiter.try_recv(1) should succeed again due to idempotence.
assert!(!limiter.try_recv(2));
assert!(limiter.try_recv(1));

// After finish_recv() is called, try_recv() should succeed again.
limiter.finish_recv();
assert!(limiter.try_recv());
// After finish_recv(1) is called, try_recv(2) should succeed.
limiter.finish_recv(1);
assert!(limiter.try_recv(2));

// Dynamically change the limit to 2, which will allow one more receive.
limiter.set_limit(2);
assert!(limiter.try_recv());
assert_eq!(limiter.try_recv(), false);
assert!(limiter.try_recv(1));
assert!(!limiter.try_recv(3));

// Test the evict_expired_timestamps function.
let t_now = Instant::now();
let mut timestamps = VecDeque::from(vec![
t_now - Duration::from_secs(ttl_secs + 2), // expired
t_now - Duration::from_secs(ttl_secs + 1), // expired
t_now - Duration::from_secs(ttl_secs - 1), // alive
t_now, // alive
]);
let mut timestamps = [
(1, t_now - Duration::from_secs(ttl_secs + 2)), // expired
(2, t_now - Duration::from_secs(ttl_secs + 1)), // expired
(3, t_now - Duration::from_secs(ttl_secs - 1)), // alive
(4, t_now), // alive
]
.iter()
.cloned()
.collect();

limiter.evict_expired_timestamps(&mut timestamps, t_now);
assert_eq!(timestamps.len(), 2);

assert!(timestamps.contains_key(&3));
assert!(timestamps.contains_key(&4));

// Test the expiring logic in try_recv() with a 0s TTL, which
// Test the expiring logic in try_recv(1) with a 0s TTL, which
// effectively means there's no limit.
let limiter = SnapRecvConcurrencyLimiter::new(1, 0);
assert!(limiter.try_recv());
assert!(limiter.try_recv());
assert!(limiter.try_recv());
assert!(limiter.try_recv(1));
assert!(limiter.try_recv(2));
assert!(limiter.try_recv(3));

// After canceling the limit, the capacity of the timestamps map should be 0.
limiter.set_limit(0);
assert!(limiter.try_recv());
assert!(limiter.try_recv(1));
assert!(limiter.timestamps.lock().unwrap().capacity() == 0);

// Test initializing a limiter with no limit.
let limiter = SnapRecvConcurrencyLimiter::new(0, 0);
assert!(limiter.try_recv());
assert!(limiter.try_recv(1));
assert!(limiter.timestamps.lock().unwrap().capacity() == 0);
}
}
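
For reference, here is a minimal standalone sketch of the per-region bookkeeping the test above exercises. `RecvLimiter` is a hypothetical simplification, not the actual `SnapRecvConcurrencyLimiter` API: it shows why keying the map by region id makes `try_recv` idempotent for retries from the same region and lets `finish_recv` release exactly the slot that region held, while TTL-based eviction prevents a lost `finish_recv` from holding a slot forever.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct RecvLimiter {
    limit: usize,                     // 0 means "no limit"
    ttl: Duration,                    // how long a precheck slot stays valid
    in_flight: HashMap<u64, Instant>, // region id -> time of last successful precheck
}

impl RecvLimiter {
    fn try_recv(&mut self, region_id: u64) -> bool {
        let now = Instant::now();
        // Drop entries whose TTL has elapsed so a missing finish_recv cannot
        // occupy a slot indefinitely.
        let ttl = self.ttl;
        self.in_flight.retain(|_, t| now.duration_since(*t) <= ttl);

        if self.limit == 0 {
            return true; // unlimited: avoid growing the map at all
        }
        // Admit when a slot is free, or refresh the slot this region already holds.
        if self.in_flight.len() < self.limit || self.in_flight.contains_key(&region_id) {
            self.in_flight.insert(region_id, now);
            return true;
        }
        false
    }

    fn finish_recv(&mut self, region_id: u64) {
        self.in_flight.remove(&region_id);
    }
}

fn main() {
    let mut limiter = RecvLimiter {
        limit: 1,
        ttl: Duration::from_secs(60),
        in_flight: HashMap::new(),
    };
    assert!(limiter.try_recv(1));  // region 1 takes the only slot
    assert!(!limiter.try_recv(2)); // region 2 is rejected at the limit
    assert!(limiter.try_recv(1));  // a retry by region 1 is idempotent
    limiter.finish_recv(1);
    assert!(limiter.try_recv(2));  // the freed slot is available again
}
```

Compared with the previous VecDeque of anonymous timestamps, the map lets TTL eviction drop exactly the stale regions (logging a warning in the real code) instead of an arbitrary queue prefix.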
