
thread-local arena: grow buf if there is not enough space #116

Merged
merged 13 commits on Jun 7, 2022
125 changes: 85 additions & 40 deletions skiplist/src/arena.rs
@@ -1,76 +1,121 @@
use std::{
cell::Cell,
mem, ptr,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
sync::atomic::{AtomicUsize, Ordering},
};

struct ArenaCore {
len: AtomicU32,
cap: usize,
ptr: *mut u8,
const ADDR_ALIGN_MASK: usize = 7;

// Thread-local Arena
pub struct Arena {
len: AtomicUsize,
cap: Cell<usize>,
ptr: Cell<*mut u8>,
}

impl Drop for ArenaCore {
impl Drop for Arena {
fn drop(&mut self) {
let ptr = self.ptr as *mut u64;
let cap = self.cap / 8;
let ptr = self.ptr.get() as *mut u64;
let cap = self.cap.get() / 8;
unsafe {
Vec::from_raw_parts(ptr, 0, cap);
}
}
}

pub struct Arena {
core: Arc<ArenaCore>,
}

impl Arena {
pub fn with_capacity(cap: u32) -> Arena {
let mut buf: Vec<u64> = Vec::with_capacity(cap as usize / 8);
pub fn with_capacity(cap: usize) -> Arena {
let mut buf: Vec<u64> = Vec::with_capacity(cap / 8);
let ptr = buf.as_mut_ptr() as *mut u8;
let cap = buf.capacity() * 8;
mem::forget(buf);
Arena {
core: Arc::new(ArenaCore {
len: AtomicU32::new(1),
cap,
ptr,
}),
// Offset 0 is an invalid value for `offset` and `get_mut`, so initialize `len`
// to 8 to guarantee that allocated memory addresses are always 8-byte aligned.
len: AtomicUsize::new(8),
cap: Cell::new(cap),
ptr: Cell::new(ptr),
}
}

pub fn len(&self) -> u32 {
self.core.len.load(Ordering::SeqCst)
pub fn len(&self) -> usize {
self.len.load(Ordering::SeqCst)
}

pub fn alloc(&self, align: usize, size: usize) -> u32 {
let align_mask = align - 1;
// Leave enough padding for align.
let size = size + align_mask;
let offset = self.core.len.fetch_add(size as u32, Ordering::SeqCst);
// Calculate the correct align point, it equals to
// (offset + align_mask) / align * align.
let ptr_offset = (offset as usize + align_mask) & !align_mask;
assert!(offset as usize + size <= self.core.cap);
ptr_offset as u32
/// Allocate `size` bytes of 8-byte aligned memory and return the offset.
pub fn alloc(&self, size: usize) -> usize {
// Round the requested size up to a multiple of 8 bytes.
let size = (size + ADDR_ALIGN_MASK) & !ADDR_ALIGN_MASK;
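// e.g. a 13-byte request is rounded up to 16 bytes; a 16-byte request stays 16.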
let offset = self.len.fetch_add(size, Ordering::SeqCst);

// Grow the arena if there is not enough space.
if offset + size > self.cap.get() {
// Allocate a new buf and copy the existing data into it.
let mut grow_by = self.cap.get();
if grow_by > 1 << 30 {
grow_by = 1 << 30;
}
if grow_by < size {
grow_by = size;
}
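// Net effect: the capacity roughly doubles on each growth, with a single
// step capped at 1 GiB unless one allocation alone needs more than that.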
let mut new_buf: Vec<u64> = Vec::with_capacity((self.cap.get() + grow_by) / 8);
let new_ptr = new_buf.as_mut_ptr() as *mut u8;
unsafe {
// Copy the old contents (src) into the new buf (dst).
ptr::copy_nonoverlapping(self.ptr.get(), new_ptr, self.cap.get());
}

// Release old buf
let old_ptr = self.ptr.get() as *mut u64;
unsafe {
Vec::from_raw_parts(old_ptr, 0, self.cap.get() / 8);
}

// Use new buf
self.ptr.set(new_ptr);
self.cap.set(new_buf.capacity() * 8);
mem::forget(new_buf);
}
offset
}

pub unsafe fn get_mut<N>(&self, offset: u32) -> *mut N {
pub unsafe fn get_mut<N>(&self, offset: usize) -> *mut N {
if offset == 0 {
return ptr::null_mut();
}
self.core.ptr.add(offset as usize) as _

self.ptr.get().add(offset) as _
}

pub fn offset<N>(&self, ptr: *const N) -> u32 {
pub fn offset<N>(&self, ptr: *const N) -> usize {
let ptr_addr = ptr as usize;
let self_addr = self.core.ptr as usize;
if ptr_addr > self_addr && ptr_addr < self_addr + self.core.cap {
(ptr_addr - self_addr) as u32
let self_addr = self.ptr.get() as usize;
if ptr_addr > self_addr && ptr_addr < self_addr + self.cap.get() {
ptr_addr - self_addr
} else {
0
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_arena() {
// There is enough space
let arena = Arena::with_capacity(128);
let offset = arena.alloc(8);
assert_eq!(offset, 8);
assert_eq!(arena.len(), 16);
unsafe {
let ptr = arena.get_mut::<u64>(offset);
let offset = arena.offset::<u64>(ptr);
assert_eq!(offset, 8);
}

Member:
Allocate a more aligned ptr on the same arena: alloc(16, 16)

Member Author:
good catch

Member Author:
Currently we simply align allocations to 8 bytes.

Member:
Actually, some libraries and compilers expect 16-byte alignment; see jemalloc/jemalloc#1533. Perhaps we should add a static assert that every type we want to store can be aligned to 8 bytes.
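A minimal sketch of such a compile-time check, assuming we only ever store 8-byte-aligned types in the arena (the concrete types checked below are illustrative, not taken from this PR):

    use std::mem::align_of;
    use std::sync::atomic::AtomicUsize;

    // Const assertions: the build fails if any stored type requires alignment
    // stricter than the 8 bytes `Arena::alloc` guarantees.
    const _: () = assert!(align_of::<u64>() <= 8);
    const _: () = assert!(align_of::<AtomicUsize>() <= 8);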

// There is not enough space; grow the buf and then return the offset.
let offset = arena.alloc(256);
assert_eq!(offset, 16);
}
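// Hypothetical extra test (not part of this PR): when a single request exceeds
// the whole current capacity, `grow_by` is bumped up to `size`, the buf is
// reallocated, and the returned offset is still 8-byte aligned.
#[test]
fn test_arena_grow_large_alloc() {
    let arena = Arena::with_capacity(64);
    let offset = arena.alloc(1024);
    assert_eq!(offset % 8, 0);
    assert!(arena.len() >= offset + 1024);
}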
}
17 changes: 8 additions & 9 deletions skiplist/src/list.rs
@@ -2,7 +2,7 @@ use std::{
mem, ptr,
ptr::NonNull,
sync::{
atomic::{AtomicU32, AtomicUsize, Ordering},
atomic::{AtomicUsize, Ordering},
Arc,
},
u32,
@@ -22,16 +22,15 @@ pub struct Node {
key: Bytes,
value: Bytes,
height: usize,
tower: [AtomicU32; MAX_HEIGHT as usize],
tower: [AtomicUsize; MAX_HEIGHT],
}

impl Node {
fn alloc(arena: &Arena, key: Bytes, value: Bytes, height: usize) -> u32 {
let align = mem::align_of::<Node>();
fn alloc(arena: &Arena, key: Bytes, value: Bytes, height: usize) -> usize {
let size = mem::size_of::<Node>();
// Not all values in Node::tower will be utilized.
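// A node of height h links levels 0..=h, so the trailing MAX_HEIGHT - h - 1
// tower slots can be trimmed from the allocation.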
let not_used = (MAX_HEIGHT as usize - height as usize - 1) * mem::size_of::<AtomicU32>();
let node_offset = arena.alloc(align, size - not_used);
let not_used = (MAX_HEIGHT as usize - height as usize - 1) * mem::size_of::<AtomicUsize>();
let node_offset = arena.alloc(size - not_used);
unsafe {
let node_ptr: *mut Node = arena.get_mut(node_offset);
let node = &mut *node_ptr;
Expand All @@ -43,7 +42,7 @@ impl Node {
node_offset
}

fn next_offset(&self, height: usize) -> u32 {
fn next_offset(&self, height: usize) -> usize {
self.tower[height].load(Ordering::SeqCst)
}
}
Expand All @@ -61,7 +60,7 @@ pub struct Skiplist<C> {
}

impl<C> Skiplist<C> {
pub fn with_capacity(c: C, arena_size: u32) -> Skiplist<C> {
pub fn with_capacity(c: C, arena_size: usize) -> Skiplist<C> {
let arena = Arena::with_capacity(arena_size);
let head_offset = Node::alloc(&arena, Bytes::new(), Bytes::new(), MAX_HEIGHT - 1);
let head = unsafe { NonNull::new_unchecked(arena.get_mut(head_offset)) };
@@ -339,7 +338,7 @@ impl<C: KeyComparator> Skiplist<C> {
}
}

pub fn mem_size(&self) -> u32 {
pub fn mem_size(&self) -> usize {
self.core.arena.len()
}
}
4 changes: 2 additions & 2 deletions skiplist/tests/tests.rs
@@ -9,7 +9,7 @@ use bytes::*;
use skiplist::*;
use yatp::task::callback::Handle;

const ARENA_SIZE: u32 = 1 << 20;
const ARENA_SIZE: usize = 1 << 20;

fn new_value(v: usize) -> Bytes {
Bytes::from(format!("{:05}", v))
@@ -63,7 +63,7 @@ fn test_basic() {
}
}

fn test_concurrent_basic(n: usize, cap: u32, value_len: usize) {
fn test_concurrent_basic(n: usize, cap: usize, value_len: usize) {
let pool = yatp::Builder::new("concurrent_basic").build_callback_pool();
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, cap);
2 changes: 1 addition & 1 deletion src/db.rs
@@ -62,7 +62,7 @@ impl Core {
let path = Self::memtable_file_path(opts, file_id);
let c = make_comparator();
// TODO: refactor skiplist to use `u64`
let skl = Skiplist::with_capacity(c, opts.arena_size() as u32);
let skl = Skiplist::with_capacity(c, opts.arena_size() as usize);

// We don't need to create the WAL for the skiplist in in-memory mode so return the memtable.
if opts.in_memory {