implement "reserve" and improve "Extend" impl #28

Merged
merged 12 commits
Jan 26, 2020
270 changes: 242 additions & 28 deletions src/map.rs
@@ -12,24 +12,23 @@ use std::sync::{
Once,
};

macro_rules! isize_bits {
() => {
std::mem::size_of::<isize>() * 8
};
}

/// The largest possible table capacity. This value must be
/// exactly 1<<30 to stay within Java array allocation and indexing
/// bounds for power of two table sizes, and is further required
/// because the top two bits of 32bit hash fields are used for
/// control purposes.
const MAXIMUM_CAPACITY: usize = 1 << 30;
const MAXIMUM_CAPACITY: usize = 1 << 30; // TODO: use isize_bits!()

/// The default initial table capacity. Must be a power of 2
/// (i.e., at least 1) and at most `MAXIMUM_CAPACITY`.
const DEFAULT_CAPACITY: usize = 16;

/// The load factor for this table. Overrides of this value in
/// constructors affect only the initial table capacity. The
/// actual floating point value isn't normally used -- it is
/// simpler to use expressions such as `n - (n >> 2)` for
/// the associated resizing threshold.
const LOAD_FACTOR: f64 = 0.75;

/// Minimum number of rebinnings per transfer step. Ranges are
/// subdivided to allow multiple resizer threads. This value
/// serves as a lower bound to avoid resizers encountering
@@ -39,18 +38,26 @@ const MIN_TRANSFER_STRIDE: isize = 16;

/// The number of bits used for generation stamp in `size_ctl`.
/// Must be at least 6 for 32bit arrays.
const RESIZE_STAMP_BITS: usize = 16;
const RESIZE_STAMP_BITS: usize = isize_bits!() / 2;

/// The maximum number of threads that can help resize.
/// Must fit in `32 - RESIZE_STAMP_BITS` bits.
const MAX_RESIZERS: isize = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/// Must fit in `32 - RESIZE_STAMP_BITS` bits on 32-bit architectures
/// and in `64 - RESIZE_STAMP_BITS` bits on 64-bit architectures.
const MAX_RESIZERS: isize = (1 << (isize_bits!() - RESIZE_STAMP_BITS)) - 1;
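// Illustrative values: on a 64-bit target this evaluates to (1 << 32) - 1 = 4_294_967_295,
// and on a 32-bit target to (1 << 16) - 1 = 65_535.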

/// The bit shift for recording size stamp in `size_ctl`.
const RESIZE_STAMP_SHIFT: usize = 32 - RESIZE_STAMP_BITS;
const RESIZE_STAMP_SHIFT: usize = isize_bits!() - RESIZE_STAMP_BITS;

static NCPU_INITIALIZER: Once = Once::new();
static NCPU: AtomicUsize = AtomicUsize::new(0);

macro_rules! load_factor {
($n: expr) => {
// ¾ n = n - n/4 = n - (n >> 2)
$n - ($n >> 2)
};
}
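// Worked example: load_factor!(16) = 16 - (16 >> 2) = 16 - 4 = 12, so a table with
// 16 bins is scheduled to resize once it holds 12 elements (75% full);
// likewise load_factor!(32) = 32 - 8 = 24.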

/// A concurrent hash table.
///
/// See the [crate-level documentation](index.html) for details.
@@ -105,7 +112,12 @@ where
}
}

impl<K, V, S: BuildHasher> HashMap<K, V, S> {
impl<K, V, S> HashMap<K, V, S>
where
K: Sync + Send + Clone + Hash + Eq,
V: Sync + Send,
S: BuildHasher,
{
/// Creates an empty map which will use `hash_builder` to hash keys.
///
/// The created map has the default initial capacity.
@@ -134,17 +146,17 @@ impl<K, V, S: BuildHasher> HashMap<K, V, S> {
/// Warning: `hash_builder` is normally randomly generated, and is designed to allow the map
/// to be resistant to attacks that cause many collisions and very poor performance.
/// Setting it manually using this function can expose a DoS attack vector.
pub fn with_capacity_and_hasher(hash_builder: S, n: usize) -> Self {
if n == 0 {
pub fn with_capacity_and_hasher(hash_builder: S, capacity: usize) -> Self {
if capacity == 0 {
return Self::with_hasher(hash_builder);
}

let mut m = Self::with_hasher(hash_builder);
let size = (1.0 + (n as f64) / LOAD_FACTOR) as usize;
// NOTE: tableSizeFor in Java
let cap = std::cmp::min(MAXIMUM_CAPACITY, size.next_power_of_two());
m.size_ctl = AtomicIsize::new(cap as isize);
m
let map = Self::with_hasher(hash_builder);

// safety: we are creating this map, so no other thread can access it
// while we are initializing it.
map.try_presize(capacity, unsafe { epoch::unprotected() });
map
}
}
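// Illustrative use of the constructor above (hypothetical caller code; any
// `BuildHasher` works, `RandomState` is only an example):
//
//     use std::collections::hash_map::RandomState;
//
//     let map = HashMap::<u32, &str, _>::with_capacity_and_hasher(RandomState::new(), 100);
//     // the backing table is presized via `try_presize`, so the first inserts
//     // do not have to grow the table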

@@ -318,8 +330,7 @@ where
let new_table = Owned::new(Table::new(n));
table = new_table.into_shared(guard);
self.table.store(table, Ordering::SeqCst);
// sc = ¾ n = n - n/4 = n - (n >> 2)
sc = n as isize - (n >> 2) as isize;
sc = load_factor!(n as isize);
}
self.size_ctl.store(sc, Ordering::SeqCst);
break table;
@@ -640,6 +651,7 @@ where
} else if self.size_ctl.compare_and_swap(sc, rs + 2, Ordering::SeqCst) == sc {
// a resize is needed, but has not yet started
// TODO: figure out why this is rs + 2, not just rs
// NOTE: this also applies to `try_presize`
self.transfer(table, Shared::null(), guard);
}

@@ -935,7 +947,126 @@ where
/// Returns the stamp bits for resizing a table of size n.
/// Must be negative when shifted left by RESIZE_STAMP_SHIFT.
fn resize_stamp(n: usize) -> isize {
n.leading_zeros() as isize | (1 << (RESIZE_STAMP_BITS - 1)) as isize
n.leading_zeros() as isize | (1_isize << (RESIZE_STAMP_BITS - 1))
}
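// Illustrative values on a 64-bit target (where RESIZE_STAMP_BITS = 32):
// resize_stamp(16) = 59 | (1 << 31) = 0x8000_003B, which is still positive as an
// isize, but shifting it left by RESIZE_STAMP_SHIFT (= 32) sets the sign bit, so
// the stamp stored in `size_ctl` during a resize is negative.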

/// Tries to presize the table to accommodate the given number of elements.
fn try_presize<'g>(&self, size: usize, guard: &'g Guard) {
let requested_capacity = if size >= MAXIMUM_CAPACITY / 2 {
MAXIMUM_CAPACITY
} else {
// round 1.5 * size + 1 up to the next power of two for the requested capacity
// TODO: find out if this is necessary
let size = size + (size >> 1) + 1;

std::cmp::min(MAXIMUM_CAPACITY, size.next_power_of_two())
} as isize;
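// e.g. a request for 100 elements: 100 + (100 >> 1) + 1 = 151, and the next
// power of two is 256, so we aim for a table with 256 bins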

loop {
let size_ctl = self.size_ctl.load(Ordering::SeqCst);
if size_ctl < 0 {
break;
}

let table = self.table.load(Ordering::SeqCst, &guard);

// The current capacity, i.e., the number of bins in the current table
let current_capacity = match table.is_null() {
true => 0,
false => unsafe { table.deref() }.len(),
};

if current_capacity == 0 {
// the table has not yet been initialized, so we can just create it
// with as many bins as were requested

// since the map is uninitialized, size_ctl describes the initial capacity
let initial_capacity = size_ctl;

// the new capacity is the larger of the requested capacity and the initial capacity (size_ctl)
let new_capacity = requested_capacity.max(initial_capacity) as usize;

// try to acquire the initialization "lock" to indicate that we are initializing the table.
if self
.size_ctl
.compare_and_swap(size_ctl, -1, Ordering::SeqCst)
!= size_ctl
{
// somebody else is already initializing the table (or has already finished).
continue;
}

// we got the initialization "lock"; make sure the table is still uninitialized
// (or is the same zero-bin table we read earlier, although that should not be the case)
if self.table.load(Ordering::SeqCst, guard) != table {
// NOTE: this could probably be `!self.table.load(...).is_null()`
// if we decide that tables can never have 0 bins.

// the table is already initialized; write the `size_ctl` value we read earlier back to
// the `size_ctl` field to release the initialization "lock"
self.size_ctl.store(size_ctl, Ordering::SeqCst);
continue;
}

// create a table with `new_capacity` empty bins
let new_table = Owned::new(Table::new(new_capacity)).into_shared(guard);

// store the new table to `self.table`
let old_table = self.table.swap(new_table, Ordering::SeqCst, &guard);

// old_table should be `null`, since we don't ever initialize a table with 0 bins
// and this branch only happens if the table has not yet been initialized or its length is 0.
assert!(old_table.is_null());

// TODO: if we allow tables with 0 bins, `defer_destroy` `old_table` if it's not `null`:
// if !old_table.is_null() {
// // TODO: safety argument, for why this is okay
// unsafe { guard.defer_destroy(old_table) }
// }

// resize the table once it is 75% full
let new_load_to_resize_at = load_factor!(new_capacity as isize);

// store the next load at which the table should resize to its `size_ctl` field
// and thus release the initialization "lock"
self.size_ctl.store(new_load_to_resize_at, Ordering::SeqCst);
} else if requested_capacity <= size_ctl || current_capacity >= MAXIMUM_CAPACITY {
// Either the `requested_capacity` was smaller than or equal to the load we would resize at (size_ctl)
// and we don't need to resize, since our load factor will still be acceptable if we don't

// Or the map is already at its `MAXIMUM_CAPACITY`, and we refuse
// to grow it beyond that bound
break;
} else if table == self.table.load(Ordering::SeqCst, &guard) {
// The table is initialized, try to resize it to the requested capacity

let rs: isize = Self::resize_stamp(current_capacity) << RESIZE_STAMP_SHIFT;
// TODO: see #29: `rs` is positive even though `resize_stamp` says:
// "Must be negative when shifted left by RESIZE_STAMP_SHIFT"
// and since our size_control field needs to be negative
// to indicate a resize this needs to be addressed

if self
.size_ctl
.compare_and_swap(size_ctl, rs + 2, Ordering::SeqCst)
== size_ctl
{
// we won the race to start the resize, so we perform the transfer ourselves
// TODO: if the CAS fails because someone else started resizing, can we `self.help_transfer`?
self.transfer(table, Shared::null(), &guard);
}
}
}
}

#[inline]
/// Tries to reserve capacity for at least `additional` more elements.
/// The collection may reserve more space to avoid frequent reallocations.
pub fn reserve(&self, additional: usize) {
let absolute = self.len() + additional;

let guard = epoch::pin();
self.try_presize(absolute, &guard);
}
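// Illustrative use (hypothetical caller code):
//
//     let map = HashMap::<u32, u32>::new();
//     let guard = epoch::pin();
//     map.insert(1, 1, &guard);
//     // presize so that roughly 1000 further inserts do not force intermediate resizes
//     map.reserve(1000);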

/// Removes the key (and its corresponding value) from this map.
@@ -1127,6 +1258,21 @@ where
self.count.load(Ordering::Relaxed)
}

#[inline]
#[cfg(test)]
/// Returns the capacity of the map.
fn capacity<'g>(&self, guard: &'g Guard) -> usize {
let table = self.table.load(Ordering::Relaxed, &guard);

if table.is_null() {
0
} else {
// Safety: we loaded `table` under the `guard`,
// so it must still be valid here
unsafe { table.deref() }.len()
}
}

#[inline]
/// Returns `true` if the map is empty. Otherwise returns `false`.
pub fn is_empty(&self) -> bool {
@@ -1202,11 +1348,21 @@ where
S: BuildHasher,
{
#[inline]
// TODO: Implement Java's `tryPresize` method to pre-allocate space for
// the incoming entries
// NOTE: `hashbrown::HashMap::extend` provides some good guidance on how
// to choose the presizing value based on the iterator lower bound.
fn extend<T: IntoIterator<Item = (K, V)>>(&mut self, iter: T) {
// from `hashbrown::HashMap::extend`:
// Keys may be already present or show multiple times in the iterator.
// Reserve the entire hint lower bound if the map is empty.
// Otherwise reserve half the hint (rounded up), so the map
// will only resize twice in the worst case.
let iter = iter.into_iter();
let reserve = if self.is_empty() {
iter.size_hint().0
} else {
(iter.size_hint().0 + 1) / 2
};
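// e.g. extending a non-empty map from an iterator whose size hint has a lower
// bound of 7 reserves (7 + 1) / 2 = 4 additional slots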

self.reserve(reserve);

let guard = crossbeam_epoch::pin();
(*self).put_all(iter, &guard);
}
@@ -1297,6 +1453,64 @@ fn num_cpus() -> usize {
NCPU.load(Ordering::Relaxed)
}

#[test]
fn capacity() {
let map = HashMap::<usize, usize>::new();
let guard = epoch::pin();

assert_eq!(map.capacity(&guard), 0);
// The table has not yet been allocated

map.insert(42, 0, &guard);

assert_eq!(map.capacity(&guard), 16);
// The table has been allocated and has default capacity

for i in 0..16 {
map.insert(i, 42, &guard);
}

assert_eq!(map.capacity(&guard), 32);
// The table has been resized once (and its capacity doubled),
// since we inserted more elements than it can hold
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn reserve() {
let map = HashMap::<usize, usize>::new();
let guard = epoch::pin();

map.insert(42, 0, &guard);

map.reserve(32);

let capacity = map.capacity(&guard);
assert!(capacity >= 16 + 32);
}

#[test]
fn reserve_uninit() {
let map = HashMap::<usize, usize>::new();
let guard = epoch::pin();

map.reserve(32);

let capacity = map.capacity(&guard);
assert!(capacity >= 32);
}

#[test]
fn resize_stamp_negative() {
let resize_stamp = HashMap::<usize, usize>::resize_stamp(1);
assert!(resize_stamp << RESIZE_STAMP_SHIFT < 0);

let resize_stamp = HashMap::<usize, usize>::resize_stamp(MAXIMUM_CAPACITY);
assert!(resize_stamp << RESIZE_STAMP_SHIFT < 0);
}
}

/// It's kind of stupid, but apparently there is no way to write a regular `#[test]` that is _not_
/// supposed to compile without pulling in `compiletest` as a dependency. See rust-lang/rust#12335.
/// But it _is_ possible to write `compile_test` tests as doctests, sooooo: