Restructure thread local data storage (improves performance)
terrarier2111 committed Feb 21, 2024
1 parent cd022c3 commit 7ce2a12
Showing 3 changed files with 74 additions and 141 deletions.
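The diff for the third changed file (presumably src/thread_id.rs, which backs thread_id::get()) is not rendered in this view, but the call sites in src/lib.rs below show the shape of the restructuring: the per-thread lookup data and the pointer to the thread's free list are now returned as separate tuple elements, rather than the free list being a field of the returned thread value. Below is a minimal sketch of that inferred interface change; every type and field name in it is an assumption for illustration, not code from the crate.

// Hypothetical sketch only: reconstructed from the call sites in src/lib.rs,
// not from the unrendered thread_id diff. All names are placeholders.
use core::ptr;

struct Thread {
    bucket: usize, // which bucket holds this thread's slot
    index: usize,  // slot index within that bucket
    // Before this commit, call sites read a `free_list` pointer from here.
}

struct FreeList; // placeholder for the per-thread free-list type

// After this commit, the free-list pointer travels next to `Thread` instead
// of inside it, so hot paths that only need bucket/index can take `.0`.
fn get() -> (Thread, *const FreeList) {
    // Placeholder body; the real implementation reads thread-local state.
    (Thread { bucket: 0, index: 0 }, ptr::null())
}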
4 changes: 2 additions & 2 deletions benches/thread_local.rs
@@ -7,7 +7,7 @@ fn main() {
 
     c.bench_function("get", |b| {
         let local: ThreadLocal<Box<i32>, ()> = ThreadLocal::new();
-        local.get_or(|_| Box::new(0), |_| {});
+        local.get_or(|_| Box::new(6), |_| {});
         b.iter(|| {
             black_box(local.get());
         });
@@ -17,7 +17,7 @@ fn main() {
         b.iter_batched_ref(
             ThreadLocal::<i32, ()>::new,
             |local| {
-                black_box(local.get_or(|_| 0, |_| {}));
+                black_box(local.get_or(|_| 7, |_| {}));
             },
             BatchSize::SmallInput,
         )
91 changes: 13 additions & 78 deletions src/lib.rs
@@ -70,6 +70,8 @@
 #![allow(clippy::mutex_atomic)]
 #![feature(thread_local)]
 #![feature(core_intrinsics)]
+#![feature(const_collections_with_hasher)]
+#![feature(const_binary_heap_constructor)]
 
 mod mutex;
 mod thread_id;
@@ -508,7 +510,7 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
         MF: FnOnce(EntryToken<RefAccess, T, M, AUTO_FREE_IDS>),
     {
         // detect the amount of capacity we need.
-        let thread = thread_id::get();
+        let (thread, free_list) = thread_id::get();
         let capacity = thread.bucket_size();
 
         let mut ret = Self::with_capacity(capacity);
@@ -533,7 +535,7 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
             ));
         }
         let cleanup_fn = Entry::<T, M, AUTO_FREE_IDS>::cleanup;
-        unsafe { thread.free_list.as_ref().unwrap_unchecked() }
+        unsafe { free_list.as_ref().unwrap_unchecked() }
             .free_list
             .lock()
             .insert(
@@ -545,7 +547,7 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
 
         let entry = unsafe { &mut *ptr.add(thread.index) };
 
-        *entry.free_list.get_mut() = thread.free_list.cast_mut();
+        *entry.free_list.get_mut() = free_list.cast_mut();
         *entry.guard.get_mut() = GUARD_READY;
 
         (
@@ -577,7 +579,7 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
 
     /// Returns the element for the current thread, if it exists.
     pub fn get(&self) -> Option<EntryToken<RefAccess, T, M, AUTO_FREE_IDS>> {
-        self.get_inner(thread_id::get())
+        self.get_inner(thread_id::get().0)
     }
 
     /// Returns the element for the current thread, or creates it if it doesn't
@@ -591,7 +593,7 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
         F: FnOnce(UnsafeToken<T, M, AUTO_FREE_IDS>) -> T,
         MF: FnOnce(EntryToken<RefAccess, T, M, AUTO_FREE_IDS>),
     {
-        let thread = thread_id::get();
+        let (thread, _) = thread_id::get();
         if let Some(val) = self.get_inner(thread) {
             return val;
         }
@@ -615,7 +617,7 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
 
         Some(EntryToken(
             unsafe { NonNull::new_unchecked(entry_ptr) },
-            Default::default(),
+            PhantomData,
         ))
     }
 
@@ -629,7 +631,7 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
         f: F,
         fm: FM,
     ) -> EntryToken<RefAccess, T, M, AUTO_FREE_IDS> {
-        let thread = thread_id::get();
+        let (thread, free_list) = thread_id::get();
         let bucket_atomic_ptr = unsafe { self.buckets.get_unchecked(thread.bucket) };
         let bucket_ptr: *const _ = bucket_atomic_ptr.load(Ordering::Acquire);
 
@@ -673,11 +675,11 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
                 unsafe {
                     NonNull::new_unchecked((entry as *const Entry<T, M, AUTO_FREE_IDS>).cast_mut())
                 },
-                Default::default(),
+                PhantomData,
             ));
         }
         let cleanup_fn = Entry::<T, M, AUTO_FREE_IDS>::cleanup;
-        unsafe { thread.free_list.as_ref().unwrap_unchecked() }
+        unsafe { free_list.as_ref().unwrap_unchecked() }
             .free_list
             .lock()
             .insert(
@@ -688,14 +690,14 @@ impl<T: Send, M: Send + Sync + Default, const AUTO_FREE_IDS: bool>
             );
         entry
             .free_list
-            .store(thread.free_list.cast_mut(), Ordering::Release);
+            .store(free_list.cast_mut(), Ordering::Release);
         entry.guard.store(GUARD_READY, Ordering::Release);
 
         EntryToken(
             unsafe {
                 NonNull::new_unchecked((entry as *const Entry<T, M, AUTO_FREE_IDS>).cast_mut())
             },
-            Default::default(),
+            PhantomData,
         )
     }
 
@@ -1079,73 +1081,6 @@ unsafe fn deallocate_bucket<T, M: Send + Sync + Default, const AUTO_FREE_IDS: bo
     let _ = Box::from_raw(std::slice::from_raw_parts_mut(bucket.as_ptr(), size));
 }
 
-struct SizedBox<T> {
-    alloc_ptr: NonNull<T>,
-}
-
-impl<T> SizedBox<T> {
-    const LAYOUT: Layout = { Layout::new::<T>() };
-
-    fn new(val: T) -> Self {
-        // SAFETY: The layout we provided was checked at compiletime, so it has to be initialized correctly
-        let alloc = unsafe { alloc(Self::LAYOUT) }.cast::<T>();
-        if alloc.is_null() {
-            panic!("Out of memory!");
-        }
-        // FIXME: add safety comment
-        unsafe {
-            alloc.write(val);
-        }
-        Self {
-            alloc_ptr: unsafe { NonNull::new_unchecked(alloc) },
-        }
-    }
-
-    #[inline]
-    fn as_ref(&self) -> &T {
-        // SAFETY: This is safe because we know that alloc_ptr can't be zero
-        // and because we know that alloc_ptr has to point to a valid
-        // instance of T in memory
-        unsafe { self.alloc_ptr.as_ref() }
-    }
-
-    #[inline]
-    fn as_mut(&mut self) -> &mut T {
-        // SAFETY: This is safe because we know that alloc_ptr can't be zero
-        // and because we know that alloc_ptr has to point to a valid
-        // instance of T in memory
-        unsafe { self.alloc_ptr.as_mut() }
-    }
-
-    #[inline]
-    fn into_ptr(self) -> NonNull<T> {
-        let ret = self.alloc_ptr;
-        mem::forget(self);
-        ret
-    }
-
-    #[inline]
-    unsafe fn from_ptr(ptr: NonNull<T>) -> Self {
-        Self { alloc_ptr: ptr }
-    }
-}
-
-impl<T> Drop for SizedBox<T> {
-    fn drop(&mut self) {
-        // SAFETY: This is safe to call because SizedBox can only be dropped once
-        unsafe {
-            ptr::drop_in_place(self.alloc_ptr.as_ptr());
-        }
-        // FIXME: add safety comment
-        unsafe {
-            dealloc(self.alloc_ptr.as_ptr().cast::<u8>(), SizedBox::<T>::LAYOUT);
-        }
-    }
-}
-
-unsafe impl<T: Send> Send for SizedBox<T> {}
-unsafe impl<T: Sync> Sync for SizedBox<T> {}
-
 #[cfg(test)]
 mod tests {
     use super::ThreadLocal;
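The large deletion at the end of src/lib.rs removes SizedBox<T>, a hand-rolled owned-allocation wrapper. Its operations map one-to-one onto what the standard library's Box already provides; the sketch below only illustrates that mapping, it is not code from this commit, and whichever allocations still need this behaviour live in parts of the crate not shown in this diff.

use std::ptr::NonNull;

// Roughly what `SizedBox::new(val).into_ptr()` did: move a value onto the
// heap and hand back ownership as a raw, non-null pointer.
fn into_ptr<T>(val: T) -> NonNull<T> {
    // Box::into_raw never returns null for an allocated value.
    unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(val))) }
}

// Roughly what `SizedBox::from_ptr(ptr)` followed by `Drop` did: take the
// pointer back, run the destructor and free the allocation.
//
// SAFETY: `ptr` must come from `into_ptr` and must not have been freed yet.
unsafe fn drop_ptr<T>(ptr: NonNull<T>) {
    unsafe { drop(Box::from_raw(ptr.as_ptr())) };
}

fn main() {
    let p = into_ptr(String::from("thread local"));
    // SAFETY: `p` is valid and uniquely owned at this point.
    println!("{}", unsafe { p.as_ref() });
    // SAFETY: `p` was produced by `into_ptr` and is freed exactly once.
    unsafe { drop_ptr(p) };
}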
