
thread-local arena: grow buf if there is not enough space #116

Merged · 13 commits · Jun 7, 2022
48 changes: 38 additions & 10 deletions skiplist/src/arena.rs
@@ -6,6 +6,8 @@ use std::{
},
};

+const ADDR_ALIGN_MASK: usize = 7;

struct ArenaCore {
len: AtomicU32,
cap: usize,
@@ -34,7 +36,9 @@ impl Arena {
mem::forget(buf);
Arena {
core: Arc::new(ArenaCore {
-len: AtomicU32::new(1),
+// Reserve 8 bytes at the beginning: `fn offset` returns 0 to signal an invalid
+// value, and `fn get_mut` likewise returns an invalid ptr when the offset is 0.
+len: AtomicU32::new(8),
cap,
ptr,
}),
@@ -45,16 +49,17 @@
self.core.len.load(Ordering::SeqCst)
}

-pub fn alloc(&self, align: usize, size: usize) -> u32 {
-let align_mask = align - 1;
-// Leave enough padding for align.
-let size = size + align_mask;
+pub fn alloc(&self, size: usize) -> Option<u32> {
+// Leave enough padding for alignment.
+let size = (size + ADDR_ALIGN_MASK) & !ADDR_ALIGN_MASK;
let offset = self.core.len.fetch_add(size as u32, Ordering::SeqCst);
-// Calculate the correct align point, it equals to
-// (offset + align_mask) / align * align.
-let ptr_offset = (offset as usize + align_mask) & !align_mask;
-assert!(offset as usize + size <= self.core.cap);
-ptr_offset as u32
+
+// Return None if there is not enough space to allocate.
+if offset as usize + size > self.core.cap {
+let _ = self.core.len.fetch_sub(size as u32, Ordering::SeqCst);
+return None;
+}
+Some(offset as u32)
}

pub unsafe fn get_mut<N>(&self, offset: u32) -> *mut N {
@@ -74,3 +79,26 @@ impl Arena {
}
}
}

+#[cfg(test)]
+mod tests {
+use super::*;
+
+#[test]
+fn test_arena() {
+// There is enough space
+let arena = Arena::with_capacity(128);
+let offset = arena.alloc(8);
+assert_eq!(offset, Some(8));
+assert_eq!(arena.len(), 16);
+unsafe {
+let ptr = arena.get_mut::<u64>(offset.unwrap());
+let offset = arena.offset::<u64>(ptr);
+assert_eq!(offset, 8);
+}

Review thread:

Member: Allocate a more aligned ptr on the same arena: alloc(16,16).

Member Author: Good catch.

Member Author: Currently we simply align all allocations to 8.

Member: Actually, some libraries and compilers expect 16-byte alignment; check jemalloc/jemalloc#1533. Perhaps we should add a static assert that all types we want to store can be aligned to 8.
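A guard along the lines of that last suggestion could be a const assertion. A minimal sketch, not part of this PR; the checked types here are assumptions standing in for whatever the arena actually stores:

use std::mem::align_of;
use std::sync::atomic::AtomicU32;

// Hypothetical compile-time guard: the build fails if a stored type
// ever needs stricter alignment than the arena's 8-byte guarantee.
const _: () = assert!(align_of::<u64>() <= 8);
const _: () = assert!(align_of::<AtomicU32>() <= 8);

fn main() {}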

+// There is not enough space, return None
+let offset = arena.alloc(256);
+assert_eq!(offset, None);
+}
+}
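As an aside, the mask arithmetic in the new `alloc` rounds every request up to a multiple of 8. A standalone sketch of the same computation (illustration only, not code from this PR):

const ADDR_ALIGN_MASK: usize = 7;

// Round a requested size up to the next multiple of 8, exactly as the
// new `alloc` does before bumping `len`.
fn aligned_size(size: usize) -> usize {
    (size + ADDR_ALIGN_MASK) & !ADDR_ALIGN_MASK
}

fn main() {
    assert_eq!(aligned_size(1), 8);   // a 1-byte request still consumes an 8-byte slot
    assert_eq!(aligned_size(8), 8);   // already-aligned sizes are unchanged
    assert_eq!(aligned_size(13), 16); // 13 rounds up to the next multiple of 8
}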
56 changes: 36 additions & 20 deletions skiplist/src/list.rs
@@ -15,6 +15,11 @@ use super::{arena::Arena, KeyComparator, MAX_HEIGHT};

const HEIGHT_INCREASE: u32 = u32::MAX / 3;

+#[derive(Debug)]
+pub enum ListErr {
+NoEnoughMem,
+}

// Uses C layout to make sure tower is at the bottom
#[derive(Debug)]
#[repr(C)]
@@ -26,21 +31,23 @@ pub struct Node {
}

impl Node {
-fn alloc(arena: &Arena, key: Bytes, value: Bytes, height: usize) -> u32 {
-let align = mem::align_of::<Node>();
+fn alloc(arena: &Arena, key: Bytes, value: Bytes, height: usize) -> Option<u32> {
let size = mem::size_of::<Node>();
// Not all values in Node::tower will be utilized.
let not_used = (MAX_HEIGHT as usize - height as usize - 1) * mem::size_of::<AtomicU32>();
-let node_offset = arena.alloc(align, size - not_used);
-unsafe {
-let node_ptr: *mut Node = arena.get_mut(node_offset);
-let node = &mut *node_ptr;
-ptr::write(&mut node.key, key);
-ptr::write(&mut node.value, value);
-node.height = height;
-ptr::write_bytes(node.tower.as_mut_ptr(), 0, height + 1);
+if let Some(offset) = arena.alloc(size - not_used) {
+unsafe {
+let node_ptr: *mut Node = arena.get_mut(offset);
+let node = &mut *node_ptr;
+ptr::write(&mut node.key, key);
+ptr::write(&mut node.value, value);
+node.height = height;
+ptr::write_bytes(node.tower.as_mut_ptr(), 0, height + 1);
+}
+Some(offset)
+} else {
+None
+}
-node_offset
}

fn next_offset(&self, height: usize) -> u32 {
@@ -63,7 +70,8 @@ pub struct Skiplist<C> {
impl<C> Skiplist<C> {
pub fn with_capacity(c: C, arena_size: u32) -> Skiplist<C> {
let arena = Arena::with_capacity(arena_size);
-let head_offset = Node::alloc(&arena, Bytes::new(), Bytes::new(), MAX_HEIGHT - 1);
+// The arena must have enough space for the head node, so we can unwrap safely.
+let head_offset = Node::alloc(&arena, Bytes::new(), Bytes::new(), MAX_HEIGHT - 1).unwrap();
let head = unsafe { NonNull::new_unchecked(arena.get_mut(head_offset)) };
Skiplist {
core: Arc::new(SkiplistCore {
@@ -169,7 +177,11 @@ impl<C: KeyComparator> Skiplist<C> {
}
}

-pub fn put(&self, key: impl Into<Bytes>, value: impl Into<Bytes>) -> Option<(Bytes, Bytes)> {
+pub fn put(
+&self,
+key: impl Into<Bytes>,
+value: impl Into<Bytes>,
+) -> Result<Option<(Bytes, Bytes)>, ListErr> {
let (key, value) = (key.into(), value.into());
let mut list_height = self.height();
let mut prev = [ptr::null_mut(); MAX_HEIGHT + 1];
@@ -183,15 +195,19 @@
if p == n {
unsafe {
if (*p).value != value {
-return Some((key, value));
+return Ok(Some((key, value)));
}
}
-return None;
+return Ok(None);
}
}

let height = self.random_height();
-let node_offset = Node::alloc(&self.core.arena, key, value, height);
+let node_offset = if let Some(offset) = Node::alloc(&self.core.arena, key, value, height) {
+offset
+} else {
+return Err(ListErr::NoEnoughMem);
+};
while height > list_height {
match self.core.height.compare_exchange_weak(
list_height,
@@ -230,20 +246,20 @@
if unsafe { &*p }.value != x.value {
let key = mem::replace(&mut x.key, Bytes::new());
let value = mem::replace(&mut x.value, Bytes::new());
-return Some((key, value));
+return Ok(Some((key, value)));
}
unsafe {
ptr::drop_in_place(x);
}
-return None;
+return Ok(None);
}
prev[i] = p;
next[i] = n;
}
}
}
}
-None
+Ok(None)
}

pub fn is_empty(&self) -> bool {
@@ -428,7 +444,7 @@ mod tests {
for i in 0..1000 {
let key = Bytes::from(format!("{:05}{:08}", i * 10 + 5, 0));
let value = Bytes::from(format!("{:05}", i));
-list.put(key, value);
+list.put(key, value).unwrap();
}
let mut cases = vec![
("00001", false, false, Some("00005")),
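With the new signature, callers must handle both the handed-back pair and arena exhaustion, which is why the test-suite changes below add `.unwrap()` everywhere. A hedged sketch of a non-panicking caller (the `try_put` helper and the import paths are illustrative assumptions, not code from this PR):

use bytes::Bytes;
use skiplist::{KeyComparator, ListErr, Skiplist}; // import paths assumed

// Sketch: surface arena exhaustion to the caller instead of panicking.
fn try_put<C: KeyComparator>(
    list: &Skiplist<C>,
    key: Bytes,
    value: Bytes,
) -> Result<(), ListErr> {
    match list.put(key, value)? {
        // An equal key with a different value already exists; the rejected
        // (key, value) pair is handed back untouched.
        Some((k, v)) => {
            eprintln!("not inserted: {:?} => {:?}", k, v);
            Ok(())
        }
        // Inserted, or an identical entry was already present.
        None => Ok(()),
    }
}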
14 changes: 7 additions & 7 deletions skiplist/tests/tests.rs
@@ -51,7 +51,7 @@ fn test_basic() {
];

for (key, value) in &table {
-list.put(key_with_ts(*key, 0), value.clone());
+list.put(key_with_ts(*key, 0), value.clone()).unwrap();
}

assert_eq!(list.get(&key_with_ts("key", 0)), None);
@@ -80,7 +80,7 @@ fn test_concurrent_basic(n: usize, cap: u32, value_len: usize) {
let tx = tx.clone();
let list = list.clone();
pool.spawn(move |_: &mut Handle<'_>| {
-list.put(k, v);
+list.put(k, v).unwrap();
tx.send(()).unwrap();
})
}
@@ -121,14 +121,14 @@ fn test_one_key() {
let list = Skiplist::with_capacity(comp, ARENA_SIZE);
let key = key_with_ts("thekey", 0);
let (tx, rx) = mpsc::channel();
-list.put(key.clone(), new_value(0));
+list.put(key.clone(), new_value(0)).unwrap();
for i in 0..n {
let tx = tx.clone();
let list = list.clone();
let key = key.clone();
let value = new_value(i);
write_pool.spawn(move |_: &mut Handle<'_>| {
-list.put(key, value);
+list.put(key, value).unwrap();
tx.send("w").unwrap();
yield_now();
})
@@ -177,7 +177,7 @@ fn test_iterator_next() {
assert!(!iter_ref.valid());
for i in (0..n).rev() {
let key = key_with_ts(format!("{:05}", i).as_str(), 0);
-list.put(key, new_value(i));
+list.put(key, new_value(i)).unwrap();
}
iter_ref.seek_to_first();
for i in 0..n {
@@ -200,7 +200,7 @@ fn test_iterator_prev() {
assert!(!iter_ref.valid());
for i in (0..n).rev() {
let key = key_with_ts(format!("{:05}", i).as_str(), 0);
-list.put(key, new_value(i));
+list.put(key, new_value(i)).unwrap();
}
iter_ref.seek_to_last();
for i in (0..n).rev() {
@@ -224,7 +224,7 @@ fn test_iterator_seek() {
for i in (0..n).rev() {
let v = i * 10 + 1000;
let key = key_with_ts(format!("{:05}", v).as_str(), 0);
-list.put(key, new_value(v));
+list.put(key, new_value(v)).unwrap();
}
iter_ref.seek_to_first();
assert!(iter_ref.valid());