Large diffs are not rendered by default.

@@ -0,0 +1,254 @@
use log::*;
use memmap2::MmapMut;
use rand::{thread_rng, Rng};
use std::fs::{remove_file, OpenOptions};
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

const DEFAULT_CAPACITY: u8 = 4;

#[repr(C)]
struct Header {
    /// Owner uid of this cell; 0 means unlocked/free.
    lock: AtomicU64,
}

impl Header {
    /// Try to claim this cell for `uid`; succeeds only if the cell is
    /// currently free (lock word == 0).
    ///
    /// NOTE(review): `Ordering::Relaxed` orders only the lock word itself and
    /// does not synchronize the cell's payload bytes — confirm callers do not
    /// rely on acquire/release semantics here.
    fn try_lock(&self, uid: u64) -> bool {
        self.lock
            .compare_exchange(0, uid, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }

    /// Release this cell; succeeds only if it is currently owned by `uid`.
    fn unlock(&self, uid: u64) -> bool {
        self.lock
            .compare_exchange(uid, 0, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }

    /// Current owner uid (0 if the cell is free).
    fn uid(&self) -> u64 {
        self.lock.load(Ordering::Relaxed)
    }
}

// A fixed-capacity, mmap-backed array of cells. Each cell is a Header
// (lock word) followed by a fixed-size payload; the whole bucket lives in
// one file that is deleted on Drop.
pub struct DataBucket {
// Candidate backing directories; one is picked at random per map file.
drives: Arc<Vec<PathBuf>>,
// Path of the current backing file (removed in Drop).
path: PathBuf,
// Memory mapping of the backing file.
mmap: MmapMut,
// Bytes per cell: payload bytes plus size_of::<Header>().
pub cell_size: u64,
//power of 2: the bucket holds 1 << capacity cells.
pub capacity: u8,
// Count of currently locked (allocated) cells.
pub used: AtomicU64,
}

#[derive(Debug)]
pub enum DataBucketError {
// `allocate` target cell is already owned by some uid.
AlreadyAllocated,
// `free` called with a uid that does not own the cell (or the cell is free).
InvalidFree,
}

impl Drop for DataBucket {
fn drop(&mut self) {
if let Err(e) = remove_file(&self.path) {
error!("failed to remove {:?}: {:?}", &self.path, e);
}
}
}

impl DataBucket {
/// Create a bucket holding `1 << capacity` cells, each sized for
/// `num_elems` elements of `elem_size` bytes plus a `Header`.
pub fn new_with_capacity(
    drives: Arc<Vec<PathBuf>>,
    num_elems: u64,
    elem_size: u64,
    capacity: u8,
) -> Self {
    // Every cell carries a Header in front of its payload.
    let cell_size = std::mem::size_of::<Header>() as u64 + num_elems * elem_size;
    let (mmap, path) = Self::new_map(&drives, cell_size as usize, capacity);
    Self {
        drives,
        path,
        mmap,
        cell_size,
        capacity,
        used: AtomicU64::new(0),
    }
}

// Construct a bucket with the default capacity (2^DEFAULT_CAPACITY cells);
// delegates to `new_with_capacity`.
pub fn new(drives: Arc<Vec<PathBuf>>, num_elems: u64, elem_size: u64) -> Self {
Self::new_with_capacity(drives, num_elems, elem_size, DEFAULT_CAPACITY)
}

/// Read the owner uid stored in cell `ix`'s header (0 = free).
///
/// Panics if `ix` is out of range.
pub fn uid(&self, ix: u64) -> u64 {
    assert!(ix < self.num_cells(), "bad index size");
    let start = (ix * self.cell_size) as usize;
    let hdr_slice = &self.mmap[start..start + std::mem::size_of::<Header>()];
    // SAFETY: the slice is exactly size_of::<Header>() bytes of the mapping;
    // assumes cell offsets keep the AtomicU64 8-byte aligned — TODO confirm
    // cell_size is always a multiple of 8.
    unsafe { (*(hdr_slice.as_ptr() as *const Header)).uid() }
}

/// Claim cell `ix` for `uid`.
///
/// Panics if `ix` is out of range or `uid` is 0; returns
/// `AlreadyAllocated` if the cell is owned by another uid.
pub fn allocate(&self, ix: u64, uid: u64) -> Result<(), DataBucketError> {
    assert!(ix < self.num_cells(), "allocate: bad index size");
    assert!(uid != 0, "allocate: bad uid");
    let start = (ix * self.cell_size) as usize;
    debug!("ALLOC {} {}", start, uid);
    let hdr_slice = &self.mmap[start..start + std::mem::size_of::<Header>()];
    // SAFETY: slice spans a full Header; assumes proper alignment of the
    // AtomicU64 at cell boundaries — TODO confirm.
    let locked = unsafe { (*(hdr_slice.as_ptr() as *const Header)).try_lock(uid) };
    if locked {
        self.used.fetch_add(1, Ordering::Relaxed);
        Ok(())
    } else {
        Err(DataBucketError::AlreadyAllocated)
    }
}

/// Release cell `ix`, which must currently be owned by `uid`.
///
/// Panics if `ix` is out of range or `uid` is 0; returns `InvalidFree`
/// when the cell is not owned by `uid`.
pub fn free(&self, ix: u64, uid: u64) -> Result<(), DataBucketError> {
    assert!(ix < self.num_cells(), "free: bad index size");
    assert!(uid != 0, "free: bad uid");
    let start = (ix * self.cell_size) as usize;
    debug!("FREE {} {}", start, uid);
    let hdr_slice = &self.mmap[start..start + std::mem::size_of::<Header>()];
    unsafe {
        // SAFETY: slice spans a full Header; alignment assumed as in uid().
        let hdr = &*(hdr_slice.as_ptr() as *const Header);
        debug!("FREE uid: {}", hdr.uid());
        if hdr.unlock(uid) {
            self.used.fetch_sub(1, Ordering::Relaxed);
            return Ok(());
        }
    }
    Err(DataBucketError::InvalidFree)
}

/// Borrow the payload of cell `ix` as a `&T` (payload starts right after
/// the cell's Header).
///
/// Panics if `ix` is out of range.
pub fn get<T: Sized>(&self, ix: u64) -> &T {
    assert!(ix < self.num_cells(), "bad index size");
    let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
    let bytes = &self.mmap[start..start + std::mem::size_of::<T>()];
    // SAFETY: `bytes` spans size_of::<T>() mapped bytes; assumes T is
    // plain-old-data and the offset is suitably aligned for T — TODO confirm
    // at call sites.
    unsafe { &*(bytes.as_ptr() as *const T) }
}

/// Borrow `len` consecutive `T`s starting at cell `ix`'s payload.
///
/// Panics if `ix` is out of range (the range check covers only the first
/// cell; the mmap slice indexing panics if `len` runs past the mapping).
pub fn get_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &[T] {
    assert!(ix < self.num_cells(), "bad index size");
    let start = (self.cell_size * ix) as usize + std::mem::size_of::<Header>();
    let end = start + std::mem::size_of::<T>() * len as usize;
    debug!("GET slice {} {}", start, end);
    let bytes = &self.mmap[start..end];
    // SAFETY: `bytes` spans len * size_of::<T>() mapped bytes; assumes T is
    // plain-old-data and the offset is aligned for T — TODO confirm.
    unsafe { std::slice::from_raw_parts(bytes.as_ptr() as *const T, len as usize) }
}

/// Mutable borrow of the payload of cell `ix` as `&mut T`.
///
/// NOTE(review): handing out `&mut T` from `&self` bypasses Rust's aliasing
/// guarantees; presumably callers serialize access via the cell's Header
/// lock — confirm, otherwise this is UB under concurrent use.
pub fn get_mut<T: Sized>(&self, ix: u64) -> &mut T {
    assert!(ix < self.num_cells(), "bad index size");
    let start = (ix * self.cell_size) as usize + std::mem::size_of::<Header>();
    let bytes = &self.mmap[start..start + std::mem::size_of::<T>()];
    // SAFETY: `bytes` spans size_of::<T>() mapped bytes; assumes T is
    // plain-old-data, the offset is aligned for T, and the caller holds
    // exclusive access to this cell — TODO confirm.
    unsafe { &mut *(bytes.as_ptr() as *mut T) }
}

/// Mutable borrow of `len` consecutive `T`s starting at cell `ix`'s payload.
///
/// NOTE(review): like `get_mut`, this yields `&mut` from `&self`; callers
/// must guarantee exclusive access to the cells — confirm.
pub fn get_mut_cell_slice<T: Sized>(&self, ix: u64, len: u64) -> &mut [T] {
    assert!(ix < self.num_cells(), "bad index size");
    let start = (self.cell_size * ix) as usize + std::mem::size_of::<Header>();
    let end = start + std::mem::size_of::<T>() * len as usize;
    debug!("GET mut slice {} {}", start, end);
    let bytes = &self.mmap[start..end];
    // SAFETY: `bytes` spans len * size_of::<T>() mapped bytes; assumes T is
    // plain-old-data, aligned at this offset, and exclusively accessed —
    // TODO confirm.
    unsafe { std::slice::from_raw_parts_mut(bytes.as_ptr() as *mut T, len as usize) }
}

/// Create and mmap a new backing file sized for `1 << capacity` cells of
/// `cell_size` bytes each, on a randomly chosen drive.
///
/// Panics if the file cannot be created, grown, or mapped.
fn new_map(drives: &[PathBuf], cell_size: usize, capacity: u8) -> (MmapMut, PathBuf) {
    let capacity = 1u64 << capacity;
    let r = thread_rng().gen_range(0, drives.len());
    let drive = &drives[r];
    // Random file name to avoid collisions between buckets sharing a drive.
    let pos = format!("{}", thread_rng().gen_range(0, u128::MAX));
    let file = drive.join(pos);
    let mut data = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .open(&file)
        .unwrap_or_else(|e| {
            panic!(
                "Unable to create data file {} in current dir({:?}): {:?}",
                file.display(),
                std::env::current_dir(),
                e
            )
        });

    // Theoretical performance optimization: write a zero to the end of
    // the file so that we won't have to resize it later, which may be
    // expensive.
    debug!("GROWING file {}", capacity * cell_size as u64);
    data.seek(SeekFrom::Start(capacity * cell_size as u64 - 1))
        .unwrap();
    data.write_all(&[0]).unwrap();
    data.seek(SeekFrom::Start(0)).unwrap();
    data.flush().unwrap();
    (unsafe { MmapMut::map_mut(&data).unwrap() }, file)
}

// Double the bucket: map a new file with capacity+1 (2x the cells), copy
// every old cell into the new mapping, swap the mapping in, and delete the
// old backing file.
pub fn grow(&mut self) {
let old_cap = self.num_cells();
let old_map = &self.mmap;
// Keep the old path; after the swap below, self.path refers to the new
// file, so the old one must be removed explicitly (Drop won't see it).
let old_file = self.path.clone();
let (new_map, new_file) =
Self::new_map(&self.drives, self.cell_size as usize, self.capacity + 1);
(0..old_cap as usize).into_iter().for_each(|i| {
let old_ix = i * self.cell_size as usize;
// NOTE(review): byte offset is doubled, so old cell i lands at new cell
// 2*i, leaving the odd-numbered cells empty — presumably the caller's
// index scheme depends on this spreading; confirm before changing.
let new_ix = old_ix * 2;
let dst_slice: &[u8] = &new_map[new_ix..new_ix + self.cell_size as usize];
let src_slice: &[u8] = &old_map[old_ix..old_ix + self.cell_size as usize];

// SAFETY-review: writes through a *mut derived from a shared &[u8] of
// new_map; sound only because new_map is exclusively owned here and the
// src/dst ranges cannot overlap (they are in different mappings).
unsafe {
let dst = dst_slice.as_ptr() as *mut u8;
let src = src_slice.as_ptr() as *const u8;
std::ptr::copy(src, dst, self.cell_size as usize);
};
});
self.mmap = new_map;
self.path = new_file;
self.capacity = self.capacity + 1;
remove_file(old_file).unwrap();
}
/// Total number of cells in this bucket: 2^capacity.
pub fn num_cells(&self) -> u64 {
    1u64 << u32::from(self.capacity)
}
}
@@ -0,0 +1,4 @@
#![cfg_attr(RUSTC_WITH_SPECIALIZATION, feature(min_specialization))]
#![allow(clippy::integer_arithmetic)]
pub mod bucket_map;
mod data_bucket;
@@ -0,0 +1,45 @@
use rayon::prelude::*;
use solana_bucket_map::bucket_map::BucketMap;
use solana_measure::measure::Measure;
use solana_sdk::pubkey::Pubkey;
use std::path::PathBuf;
use std::sync::Arc;

/// Multi-threaded stress test for `BucketMap`: hammers `update`/`read_value`
/// from many rayon tasks, then reports per-item latency and the on-disk
/// bytes-per-item overhead. `#[ignore]`d because it is a long-running,
/// machine-dependent benchmark.
#[test]
#[ignore]
fn bucket_map_test_mt() {
    let threads = 4096;
    let items = 4096;
    let tmpdir1 = std::env::temp_dir().join("bucket_map_test_mt");
    let tmpdir2 = PathBuf::from("/mnt/data/aeyakovenko").join("bucket_map_test_mt");
    // Keep only the directories we can actually create (the second path is
    // machine-specific and usually absent).
    let paths: Vec<PathBuf> = [tmpdir1, tmpdir2]
        .iter()
        .filter(|x| std::fs::create_dir_all(x).is_ok())
        .cloned()
        .collect();
    assert!(!paths.is_empty());
    let drives = Arc::new(paths);
    let index = BucketMap::new(12, drives.clone());
    // Warm-up pass: populate one unique key per task.
    // (`Range` is already `IntoParallelIterator`; the former `.into_iter()`
    // before `.into_par_iter()` was redundant.)
    (0..threads).into_par_iter().for_each(|_| {
        let key = Pubkey::new_unique();
        index.update(&key, |_| Some(vec![0u64]));
    });
    let mut timer = Measure::start("bucket_map_test_mt");
    (0..threads).into_par_iter().for_each(|_| {
        for _ in 0..items {
            let key = Pubkey::new_unique();
            let ix: u64 = index.bucket_ix(&key) as u64;
            index.update(&key, |_| Some(vec![ix]));
            assert_eq!(index.read_value(&key), Some(vec![ix]));
        }
    });
    timer.stop();
    println!("time: {}ns per item", timer.as_ns() / (threads * items));
    let mut total = 0;
    for tmpdir in drives.iter() {
        let folder_size = fs_extra::dir::get_size(tmpdir).unwrap();
        total += folder_size;
        std::fs::remove_dir_all(tmpdir).unwrap();
    }
    println!("overhead: {}bytes per item", total / (threads * items));
}
@@ -42,6 +42,7 @@ solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.8.0" }
solana-logger = { path = "../logger", version = "=1.8.0" }
solana-measure = { path = "../measure", version = "=1.8.0" }
solana-metrics = { path = "../metrics", version = "=1.8.0" }
solana-bucket-map = { path = "../bucket_map", version = "=1.8.0" }
solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.8.0" }
solana-sdk = { path = "../sdk", version = "=1.8.0" }
solana-stake-program = { path = "../programs/stake", version = "=1.8.0" }
@@ -4214,15 +4214,6 @@ impl AccountsDb {
) -> Result<(Hash, u64), BankHashVerificationError> {
use BankHashVerificationError::*;
let mut collect = Measure::start("collect");
let keys: Vec<_> = self
.accounts_index
.account_maps
.read()
.unwrap()
.keys()
.cloned()
.collect();
collect.stop();

let mut scan = Measure::start("scan");
let mismatch_found = AtomicU64::new(0);
@@ -4231,7 +4222,15 @@ impl AccountsDb {
let chunks = crate::accounts_hash::MERKLE_FANOUT.pow(4);
let total_lamports = Mutex::<u64>::new(0);
let get_hashes = || {
keys.par_chunks(chunks)
let mut hashes = vec![];
for ix in 0..self.accounts_index.account_maps.num_buckets() {
let keys = self.accounts_index.account_maps.keys(ix);
Copy link
Member Author

@aeyakovenko aeyakovenko Jun 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ryoqun @sakridge @jeffwashington

wdyt, would this work here? the initial design had a snapshot slot that would keep the data behind that slot from being cleaned up as the index is updated. So we shouldn't need a global lock on all the keys to do the snapshot.

Copy link
Contributor

@carllin carllin Jun 27, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brooksprumo is good to reference here, as he's been working on snapshotting related logic

Copy link
Contributor

@jeffwashington jeffwashington Jun 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this code path is already dead except for debug validation. We don’t want to use account index for hash calculation.
Even so, does bucket[x] have keys that are all < the keys in bucket[x+1]? This particular algorithm is written to assume the keys vector is in sorted order. The btree provides that currently. If not, we have to save key and hash from this scan, sort by key, then hash.

Copy link
Contributor

@brooksprumo brooksprumo Jun 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can confirm what @jeffwashington said that on the snapshot side this is never called/used. The snapshot code that calls AccountsDb::calculate_accounts_hash_without_index() is inside an if that is always false.

There are hooks for testing, but none of the tests actually exercise this code path. I assume in the past this was different, but as of now AccountsDb::calculate_accounts_hash_without_index() is never called in the snapshot code (nor anything that calls these snapshot functions).

Relevant snapshot functions:
snapshot_utils::snapshot_bank()
snapshot_utils::package_snapshot()

Copy link
Contributor

@brooksprumo brooksprumo Jun 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can confirm what @jeffwashington said that on the snapshot side this is never called/used. The snapshot code that calls AccountsDb::calculate_accounts_hash_without_index() is inside an if that is always false.

There are hooks for testing, but none of the tests actually exercise this code path. I assume in the past this was different, but as of now AccountsDb::calculate_accounts_hash_without_index() is never called in the snapshot code (nor anything that calls these snapshot functions).

Relevant snapshot functions:
snapshot_utils::snapshot_bank()
snapshot_utils::package_snapshot()

Copy link
Member Author

@aeyakovenko aeyakovenko Jun 28, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeffwashington

Even so, does bucket[x] have keys that are all < the keys in bucket[x+1]?

Yep. that is guaranteed. https://github.com/solana-labs/solana/pull/18179/files#diff-d90ce8cbcea9459dbca60678b48f6d6d2816b8d4de8b8ebfd5c434b819f153b3R97

At boot, the validator creates a fixed number of buckets. On lemond with 128 cores, I see performance improve up to 4096 buckets.

if keys.is_none() {
continue;
}
let mut keys = keys.unwrap();
keys.par_sort_unstable();
let new_hashes: Vec<_> = keys.par_chunks(chunks)
.map(|pubkeys| {
let mut sum = 0u128;
let result: Vec<Hash> = pubkeys
@@ -4289,7 +4288,10 @@ impl AccountsDb {
*total =
AccountsHash::checked_cast_for_capitalization(*total as u128 + sum);
result
}).collect()
}).collect();
hashes.extend(new_hashes.into_iter());
}
hashes
};

let hashes: Vec<Vec<Hash>> = if check_hash {
@@ -5548,13 +5550,15 @@ impl AccountsDb {
}

let mut stored_sizes_and_counts = HashMap::new();
for account_entry in self.accounts_index.account_maps.read().unwrap().values() {
for (_slot, account_entry) in account_entry.slot_list.read().unwrap().iter() {
let storage_entry_meta = stored_sizes_and_counts
.entry(account_entry.store_id)
.or_insert((0, 0));
storage_entry_meta.0 += account_entry.stored_size;
storage_entry_meta.1 += 1;
for i in 0..self.accounts_index.account_maps.num_buckets() {
for slot_list in self.accounts_index.account_maps.values(i).into_iter() {
for (_slot, account_entry) in slot_list.into_iter() {
let storage_entry_meta = stored_sizes_and_counts
.entry(account_entry.store_id)
.or_insert((0, 0));
storage_entry_meta.0 += account_entry.stored_size;
storage_entry_meta.1 += 1;
}
}
}
for slot_stores in self.storage.0.iter() {
@@ -8,16 +8,15 @@ use bv::BitVec;
use dashmap::DashSet;
use log::*;
use ouroboros::self_referencing;
use solana_bucket_map::bucket_map::BucketMap;
use solana_measure::measure::Measure;
use solana_sdk::{
clock::Slot,
pubkey::{Pubkey, PUBKEY_BYTES},
};
use std::{
collections::{
btree_map::{self, BTreeMap},
HashSet,
},
collections::BTreeMap,
collections::HashSet,
ops::{
Bound,
Bound::{Excluded, Included, Unbounded},
@@ -35,7 +34,7 @@ pub type SlotList<T> = Vec<(Slot, T)>;
pub type SlotSlice<'s, T> = &'s [(Slot, T)];

pub type RefCount = u64;
pub type AccountMap<K, V> = BTreeMap<K, V>;
pub type AccountMap<V> = BucketMap<V>;

type AccountMapEntry<T> = Arc<AccountMapEntryInner<T>>;

@@ -113,10 +112,10 @@ impl<T> AccountMapEntryInner<T> {
}
}

pub enum AccountIndexGetResult<'a, T: 'static> {
pub enum AccountIndexGetResult<T: 'static> {
Found(ReadAccountMapEntry<T>, usize),
NotFoundOnFork,
Missing(AccountMapsReadLock<'a, T>),
Missing(AccountMapsReadLock<T>),
}

#[self_referencing]
@@ -491,7 +490,7 @@ pub struct AccountsIndexRootsStats {
}

pub struct AccountsIndexIterator<'a, T> {
account_maps: &'a RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>,
account_maps: &'a AccountMap<AccountMapEntry<T>>,
start_bound: Bound<Pubkey>,
end_bound: Bound<Pubkey>,
is_finished: bool,
@@ -506,10 +505,7 @@ impl<'a, T> AccountsIndexIterator<'a, T> {
}
}

pub fn new<R>(
account_maps: &'a RwLock<AccountMap<Pubkey, AccountMapEntry<T>>>,
range: Option<R>,
) -> Self
pub fn new<R>(account_maps: &'a AccountMap<AccountMapEntry<T>>, range: Option<R>) -> Self
where
R: RangeBounds<Pubkey>,
{
@@ -558,13 +554,13 @@ pub trait ZeroLamport {
fn is_zero_lamport(&self) -> bool;
}

type MapType<T> = AccountMap<Pubkey, AccountMapEntry<T>>;
type AccountMapsWriteLock<'a, T> = RwLockWriteGuard<'a, AccountMap<Pubkey, AccountMapEntry<T>>>;
type AccountMapsReadLock<'a, T> = RwLockReadGuard<'a, AccountMap<Pubkey, AccountMapEntry<T>>>;
type MapType<T> = AccountMap<AccountMapEntry<T>>;
type AccountMapsWriteLock<T> = AccountMap<AccountMapEntry<T>>;
type AccountMapsReadLock<T> = AccountMap<AccountMapEntry<T>>;

#[derive(Debug)]
pub struct AccountsIndex<T> {
pub account_maps: RwLock<MapType<T>>,
pub account_maps: MapType<T>,
program_id_index: SecondaryIndex<DashMapSecondaryIndexEntry>,
spl_token_mint_index: SecondaryIndex<DashMapSecondaryIndexEntry>,
spl_token_owner_index: SecondaryIndex<RwLockSecondaryIndexEntry>,
@@ -576,7 +572,7 @@ pub struct AccountsIndex<T> {
impl<T> Default for AccountsIndex<T> {
fn default() -> Self {
Self {
account_maps: RwLock::<AccountMap<Pubkey, AccountMapEntry<T>>>::default(),
account_maps: AccountMap::<AccountMapEntry<T>>::default(),
program_id_index: SecondaryIndex::<DashMapSecondaryIndexEntry>::new(
"program_id_index_stats",
),
@@ -896,7 +892,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
pub fn get_account_read_entry_with_lock(
&self,
pubkey: &Pubkey,
lock: &AccountMapsReadLock<'_, T>,
lock: &AccountMapsReadLock<T>,
) -> Option<ReadAccountMapEntry<T>> {
lock.get(pubkey)
.cloned()
@@ -970,17 +966,20 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
) {
if !dead_keys.is_empty() {
for key in dead_keys.iter() {
let mut w_index = self.get_account_maps_write_lock();
if let btree_map::Entry::Occupied(index_entry) = w_index.entry(**key) {
if index_entry.get().slot_list.read().unwrap().is_empty() {
index_entry.remove();

// Note it's only safe to remove all the entries for this key
// because we have the lock for this key's entry in the AccountsIndex,
// so no other thread is also updating the index
self.purge_secondary_indexes_by_inner_key(key, account_indexes);
let w_index = self.get_account_maps_write_lock();
w_index.update(**key, |val| {
if let Some(slot_list) = val {
if slot_list.is_empty() {
// Note it's only safe to remove all the entries for this key
// because we have the lock for this key's entry in the AccountsIndex,
// so no other thread is also updating the index
self.purge_secondary_indexes_by_inner_key(key, account_indexes);
return None;
}
} else {
return Some(val);
}
}
});
}
}
}
@@ -1142,7 +1141,7 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
pubkey: &Pubkey,
ancestors: Option<&Ancestors>,
max_root: Option<Slot>,
) -> AccountIndexGetResult<'_, T> {
) -> AccountIndexGetResult<T> {
let read_lock = self.account_maps.read().unwrap();
let account = read_lock
.get(pubkey)
@@ -1237,12 +1236,12 @@ impl<T: 'static + Clone + IsCached + ZeroLamport> AccountsIndex<T> {
}
}

fn get_account_maps_write_lock(&self) -> AccountMapsWriteLock<T> {
self.account_maps.write().unwrap()
fn get_account_maps_write_lock(&self) -> &AccountMapsWriteLock<T> {
&self.account_maps
}

pub(crate) fn get_account_maps_read_lock(&self) -> AccountMapsReadLock<T> {
self.account_maps.read().unwrap()
pub(crate) fn get_account_maps_read_lock(&self) -> &AccountMapsReadLock<T> {
&self.account_maps
}

// Same functionally to upsert, but:
@@ -1630,7 +1629,7 @@ pub mod tests {
}
}

impl<'a, T: 'static> AccountIndexGetResult<'a, T> {
impl<T: 'static> AccountIndexGetResult<T> {
pub fn unwrap(self) -> (ReadAccountMapEntry<T>, usize) {
match self {
AccountIndexGetResult::Found(lock, size) => (lock, size),
@@ -6825,8 +6825,8 @@ pub(crate) mod tests {
}
}

fn map_to_test_bad_range() -> AccountMap<Pubkey, i8> {
let mut map: AccountMap<Pubkey, i8> = AccountMap::new();
fn map_to_test_bad_range() -> AccountMap<i8> {
let mut map: AccountMap<i8> = AccountMap::new();
// when empty, AccountMap (= std::collections::BTreeMap) doesn't sanitize given range...
map.insert(solana_sdk::pubkey::new_rand(), 1);
map
@@ -74,7 +74,6 @@ fn main() {
("commit", git_commit_hash.trim().to_string(), String)
);
*/

}
let last_median = get_last_metrics(&"median".to_string(), &db, &name, &branch)
.unwrap_or_default();