Skip to content

Commit

Permalink
Fix the caches mutating a deque node through a ptr derived
Browse files Browse the repository at this point in the history
from a shared ref

- Add `Deque::peek_front_ptr` method that returns a `NonNull` pointer (instead of a
  shared reference) to the front node.
- Remove `DeqNode::next` method.
- Add `DeqNode::next_ptr` _function_ that returns a `NonNull` pointers to the next
  node.
- Add `Deque::move_front_to_back` method that moves the front node to the back of the
  deque, instead of moving the node through a shared reference to the node.
  • Loading branch information
tatsuya6502 committed Apr 27, 2023
1 parent 03c75ff commit 57e13c7
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 67 deletions.
2 changes: 2 additions & 0 deletions src/common/concurrent/deques.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,5 @@ impl<K> Deques<K> {
}
}
}

// TODO: Add tests and run Miri with them.
80 changes: 45 additions & 35 deletions src/common/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl<T> DeqNode<T> {
}
}

pub(crate) fn next_node(&self) -> Option<&DeqNode<T>> {
self.next.as_ref().map(|node| unsafe { node.as_ref() })
pub(crate) fn next_node_ptr(this: NonNull<Self>) -> Option<NonNull<DeqNode<T>>> {
unsafe { this.as_ref() }.next
}
}

Expand Down Expand Up @@ -126,11 +126,13 @@ impl<T> Deque<T> {
}

pub(crate) fn peek_front(&self) -> Option<&DeqNode<T>> {
// This method takes care not to create mutable references to whole nodes,
// to maintain validity of aliasing pointers into `element`.
self.head.as_ref().map(|node| unsafe { node.as_ref() })
}

pub(crate) fn peek_front_ptr(&self) -> Option<NonNull<DeqNode<T>>> {
self.head.as_ref().cloned()
}

/// Removes and returns the node at the front of the list.
pub(crate) fn pop_front(&mut self) -> Option<Box<DeqNode<T>>> {
// This method takes care not to create mutable references to whole nodes,
Expand Down Expand Up @@ -158,8 +160,6 @@ impl<T> Deque<T> {
}

pub(crate) fn peek_back(&self) -> Option<&DeqNode<T>> {
// This method takes care not to create mutable references to whole nodes,
// to maintain validity of aliasing pointers into `element`.
self.tail.as_ref().map(|node| unsafe { node.as_ref() })
}

Expand Down Expand Up @@ -221,6 +221,12 @@ impl<T> Deque<T> {
}
}

pub(crate) fn move_front_to_back(&mut self) {
if let Some(node) = self.head {
unsafe { self.move_to_back(node) };
}
}

/// Unlinks the specified node from the current list.
///
/// This method takes care not to create mutable references to `element`, to
Expand Down Expand Up @@ -335,8 +341,6 @@ impl<T> Deque<T> {

#[cfg(test)]
mod tests {
use std::ptr::NonNull;

use super::{CacheRegion::MainProbation, DeqNode, Deque};

#[test]
Expand Down Expand Up @@ -685,35 +689,35 @@ mod tests {
// -------------------------------------------------------
// First iteration.
// peek_front() -> node1
let node1a = deque.peek_front().unwrap();
assert_eq!(node1a.element, "a".to_string());
let node2a = node1a.next_node().unwrap();
assert_eq!(node2a.element, "b".to_string());
let node3a = node2a.next_node().unwrap();
assert_eq!(node3a.element, "c".to_string());
assert!(node3a.next_node().is_none());
let node1a = deque.peek_front_ptr().unwrap();
assert_eq!(unsafe { node1a.as_ref() }.element, "a".to_string());
let node2a = DeqNode::next_node_ptr(node1a).unwrap();
assert_eq!(unsafe { node2a.as_ref() }.element, "b".to_string());
let node3a = DeqNode::next_node_ptr(node2a).unwrap();
assert_eq!(unsafe { node3a.as_ref() }.element, "c".to_string());
assert!(DeqNode::next_node_ptr(node3a).is_none());

// -------------------------------------------------------
// Iterate after a move_to_back.
// Move "b" to the back. So now "a" -> "c" -> "b".
unsafe { deque.move_to_back(node2_ptr) };
let node1a = deque.peek_front().unwrap();
assert_eq!(node1a.element, "a".to_string());
let node3a = node1a.next_node().unwrap();
assert_eq!(node3a.element, "c".to_string());
let node2a = node3a.next_node().unwrap();
assert_eq!(node2a.element, "b".to_string());
assert!(node2a.next_node().is_none());
let node1a = deque.peek_front_ptr().unwrap();
assert_eq!(unsafe { node1a.as_ref() }.element, "a".to_string());
let node3a = DeqNode::next_node_ptr(node1a).unwrap();
assert_eq!(unsafe { node3a.as_ref() }.element, "c".to_string());
let node2a = DeqNode::next_node_ptr(node3a).unwrap();
assert_eq!(unsafe { node2a.as_ref() }.element, "b".to_string());
assert!(DeqNode::next_node_ptr(node2a).is_none());

// -------------------------------------------------------
// Iterate after an unlink.
// Unlink the second node "c". Now "a" -> "c".
unsafe { deque.unlink_and_drop(node3_ptr) };
let node1a = deque.peek_front().unwrap();
assert_eq!(node1a.element, "a".to_string());
let node2a = node1a.next_node().unwrap();
assert_eq!(node2a.element, "b".to_string());
assert!(node2a.next_node().is_none());
let node1a = deque.peek_front_ptr().unwrap();
assert_eq!(unsafe { node1a.as_ref() }.element, "a".to_string());
let node2a = DeqNode::next_node_ptr(node1a).unwrap();
assert_eq!(unsafe { node2a.as_ref() }.element, "b".to_string());
assert!(DeqNode::next_node_ptr(node2a).is_none());
}

#[test]
Expand All @@ -728,18 +732,24 @@ mod tests {
let _ = deque.push_back(Box::new(node3));
// "a" -> "b" -> "c"

let node1a = deque.peek_front().unwrap();
assert_eq!(node1a.element, "a".to_string());
unsafe { deque.move_to_back(NonNull::from(node1a)) };
let node1a = deque.peek_front_ptr().unwrap();
assert_eq!(unsafe { node1a.as_ref() }.element, "a".to_string());
unsafe { deque.move_to_back(node1a) };
// "b" -> "c" -> "a"

let node2a = deque.peek_front().unwrap();
assert_eq!(node2a.element, "b".to_string());
let node2a = deque.peek_front_ptr().unwrap();
assert_eq!(unsafe { node2a.as_ref() }.element, "b".to_string());

let node3a = DeqNode::next_node(node2a).unwrap();
assert_eq!(node3a.element, "c".to_string());
unsafe { deque.move_to_back(NonNull::from(node3a)) };
let node3a = DeqNode::next_node_ptr(node2a).unwrap();
assert_eq!(unsafe { node3a.as_ref() }.element, "c".to_string());
unsafe { deque.move_to_back(node3a) };
// "b" -> "a" -> "c"

deque.move_front_to_back();
// "a" -> "c" -> "b"

let node1b = deque.peek_front().unwrap();
assert_eq!(node1b.element, "a".to_string());
}

#[test]
Expand Down
14 changes: 7 additions & 7 deletions src/common/timer_wheel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,18 +311,20 @@ impl<K> TimerWheel<K> {
/// Returns a pointer to the timer event (cache entry) at the front of the queue.
/// Returns `None` if the front node is a sentinel.
fn pop_timer_node(&mut self, level: usize, index: usize) -> Option<Box<DeqNode<TimerNode<K>>>> {
if let Some(node) = self.wheels[level][index].peek_front() {
let deque = &mut self.wheels[level][index];
if let Some(node) = deque.peek_front() {
if node.element.is_sentinel() {
return None;
}
}

self.wheels[level][index].pop_front()
deque.pop_front()
}

/// Reset the positions of the nodes in the queue at the given level and index.
fn reset_timer_node_positions(&mut self, level: usize, index: usize) {
if let Some(node) = self.wheels[level][index].peek_back() {
let deque = &mut self.wheels[level][index];
if let Some(node) = deque.peek_back() {
if node.element.is_sentinel() {
// The sentinel is at the back of the queue. We are already set.
return;
Expand All @@ -338,11 +340,9 @@ impl<K> TimerWheel<K> {
// queue.
loop {
// Safe to unwrap because we already checked the queue is not empty.
let node = self.wheels[level][index].pop_front().unwrap();
let node = deque.peek_front().unwrap();
let is_sentinel = node.element.is_sentinel();

// Move the front node to the back.
self.wheels[level][index].push_back(node);
deque.move_front_to_back();

// If the node we just moved was the sentinel, we are done.
if is_sentinel {
Expand Down
32 changes: 7 additions & 25 deletions src/sync_base/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1682,10 +1682,6 @@ where

// Move the skipped nodes to the back of the deque. We do not unlink (drop)
// them because ValueEntries in the write op queue should be pointing them.
//
// TODO FIXME: This `move_to_back()` will be considered UB as violating the
// aliasing rule because these skipped nodes were acquired by `peek_front` or
// `next_node`. (They both return `&node` instead of `&mut node`).
for node in skipped_nodes {
unsafe { deqs.probation.move_to_back(node) };
}
Expand Down Expand Up @@ -1723,26 +1719,26 @@ where
let mut skipped_nodes = SmallVec::default();

// Get first potential victim at the LRU position.
let mut next_victim = deqs.probation.peek_front();
let mut next_victim = deqs.probation.peek_front_ptr();

// Aggregate potential victims.
while victims.policy_weight < candidate.policy_weight {
if candidate.freq < victims.freq {
break;
}
if let Some(victim) = next_victim.take() {
next_victim = victim.next_node();
let vic_elem = &victim.element;
next_victim = DeqNode::next_node_ptr(victim);
let vic_elem = &unsafe {victim.as_ref()}.element;

if let Some(vic_entry) = cache.get(vic_elem.hash(), |k| k == vic_elem.key()) {
victims.add_policy_weight(vic_entry.policy_weight());
victims.add_frequency(freq, vic_elem.hash());
victim_nodes.push(NonNull::from(victim));
victim_nodes.push(victim);
retries = 0;
} else {
// Could not get the victim from the cache (hash map). Skip this node
// as its ValueEntry might have been invalidated.
skipped_nodes.push(NonNull::from(victim));
skipped_nodes.push(victim);

retries += 1;
if retries > MAX_CONSECUTIVE_RETRIES {
Expand Down Expand Up @@ -2085,14 +2081,7 @@ where
// invalidated ValueEntry (which should be still in the write op queue)
// has a pointer to this node, move the node to the back of the deque
// instead of popping (dropping) it.
//
// TODO FIXME: This `peek_front()` and `move_to_back()` combo will be
// considered UB as violating the aliasing rule. (`peek_front` returns
// `&node` instead of `&mut node`).
if let Some(node) = deq.peek_front() {
let node = NonNull::from(node);
unsafe { deq.move_to_back(node) };
}
deq.move_front_to_back();
true
}
}
Expand Down Expand Up @@ -2157,14 +2146,7 @@ where
// invalidated ValueEntry (which should be still in the write op
// queue) has a pointer to this node, move the node to the back of
// the deque instead of popping (dropping) it.
//
// TODO FIXME: This `peek_front()` and `move_to_back()` combo will be
// considered UB as violating the aliasing rule (`peek_front` returns
// `&node` instead of `&mut node`).
if let Some(node) = deqs.write_order.peek_front() {
let node = NonNull::from(node);
unsafe { deqs.write_order.move_to_back(node) };
}
deqs.write_order.move_front_to_back();
}
}
}
Expand Down

0 comments on commit 57e13c7

Please sign in to comment.