Wrap ShardedLmdb in a Resettable #5775

Merged
merged 1 commit on May 3, 2018
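For context on the change: `Resettable` is used here as a lazily-initialized cell that can be dropped and rebuilt, so the lmdb environments inside `ShardedLmdb` can be torn down before a fork and re-created on the next access. Below is a minimal sketch of that shape, assuming the factory-closure design implied by the `Resettable::new(Arc::new(move || ...))`, `.get()`, and `.reset()` calls in the diff; the real `resettable::Resettable` in the pants engine may differ in detail.

```rust
use std::sync::{Arc, RwLock};

// A minimal sketch of the Resettable shape assumed by this change: a lazily
// initialized value built by a factory closure, which can be dropped (`reset`)
// and rebuilt on the next `get`. The real `resettable::Resettable` may differ
// in detail (bounds, poisoning, how the value is shared).
pub struct Resettable<T: Clone> {
  factory: Arc<dyn Fn() -> T + Send + Sync>,
  value: RwLock<Option<T>>,
}

impl<T: Clone> Resettable<T> {
  pub fn new(factory: Arc<dyn Fn() -> T + Send + Sync>) -> Resettable<T> {
    Resettable {
      factory,
      value: RwLock::new(None),
    }
  }

  // Build the value on first access (or after a reset) and hand out a clone.
  pub fn get(&self) -> T {
    {
      let value = self.value.read().unwrap();
      if let Some(ref v) = *value {
        return v.clone();
      }
    }
    let mut value = self.value.write().unwrap();
    if value.is_none() {
      *value = Some((self.factory)());
    }
    (*value).as_ref().unwrap().clone()
  }

  // Drop the cached value so the next `get` re-runs the factory. Here this is
  // what closes the lmdb environments before a fork.
  pub fn reset(&self) {
    *self.value.write().unwrap() = None;
  }
}
```

With this wrapper in place, call sites change from `self.inner.file_dbs.get(&fingerprint)?` to `self.inner.file_dbs.get()?.get(&fingerprint)`: the outer `get()` lazily constructs the `ShardedLmdb` (propagating any construction error as a `String`), and the inner `get` then selects a shard infallibly.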
107 changes: 34 additions & 73 deletions src/rust/engine/fs/src/store.rs
@@ -88,7 +88,7 @@ impl Store {
/// ever acquire, so lmdb returns EAGAIN whenever a transaction is created in the second process.
///
pub fn reset_prefork(&self) {
self.local.reset_lmdb_connections();
self.local.reset_prefork();
if let Some(ref remote) = self.remote {
remote.reset_threadpool();
}
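The (partially visible) doc comment above explains the motivation: open lmdb state must not survive a fork, or transaction creation in the second process fails with EAGAIN. A sketch of the ordering `reset_prefork` is meant to enable, assuming a caller that forks directly via the `libc` crate (in pants the fork is actually driven from the Python side):

```rust
// Illustrative only: `Store` is the type defined in this file, and `libc::fork`
// stands in for however the surrounding process actually forks.
fn fork_with_clean_lmdb(store: &Store) {
  // 1. In the parent, drop every lmdb Environment (and the remote threadpool).
  store.reset_prefork();

  // 2. Fork. Neither process holds an open environment at this point, so the
  //    child does not inherit the state that made transactions fail with EAGAIN.
  match unsafe { libc::fork() } {
    -1 => panic!("fork failed"),
    0 => {
      // Child: the next Store operation re-runs the Resettable factories and
      // opens fresh ShardedLmdb environments under the same root path.
    }
    _child_pid => {
      // Parent: likewise re-initializes lazily on next use.
    }
  }
}
```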
@@ -491,11 +491,12 @@ mod local {
use lmdb::{self, Cursor, Database, DatabaseFlags, Environment, RwTransaction, Transaction,
WriteFlags, NO_OVERWRITE, NO_SYNC, NO_TLS};
use lmdb::Error::{KeyExist, NotFound};
use resettable::Resettable;
use sha2::Sha256;
use std::collections::{BinaryHeap, HashMap};
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::path::Path;
use std::sync::Arc;
use std::time;

use pool::ResettablePool;
@@ -511,35 +512,37 @@
// Store directories separately from files because:
// 1. They may have different lifetimes.
// 2. It's nice to know whether we should be able to parse something as a proto.
file_dbs: ShardedLmdb,
directory_dbs: ShardedLmdb,
file_dbs: Resettable<Result<Arc<ShardedLmdb>, String>>,
directory_dbs: Resettable<Result<Arc<ShardedLmdb>, String>>,
}

impl ByteStore {
pub fn new<P: AsRef<Path>>(path: P, pool: Arc<ResettablePool>) -> Result<ByteStore, String> {
let root = path.as_ref();
let files_root = root.join("files");
let directories_root = root.join("directories");
Ok(ByteStore {
inner: Arc::new(InnerStore {
pool: pool,
file_dbs: ShardedLmdb::new(root.join("files")),
directory_dbs: ShardedLmdb::new(root.join("directories")),
file_dbs: Resettable::new(Arc::new(move || {
ShardedLmdb::new(&files_root).map(|db| Arc::new(db))
})),
directory_dbs: Resettable::new(Arc::new(move || {
ShardedLmdb::new(&directories_root).map(|db| Arc::new(db))
})),
}),
})
}

pub fn reset_lmdb_connections(&self) {
{
let mut file_dbs = self.inner.file_dbs.lmdbs.write().unwrap();
*file_dbs = None;
}
let mut directory_dbs = self.inner.directory_dbs.lmdbs.write().unwrap();
*directory_dbs = None;
pub fn reset_prefork(&self) {
self.inner.file_dbs.reset();
self.inner.directory_dbs.reset();
}

// Note: This performs IO on the calling thread. Hopefully the IO is small enough not to matter.
pub fn entry_type(&self, fingerprint: &Fingerprint) -> Result<Option<EntryType>, String> {
{
let (env, directory_database, _) = self.inner.directory_dbs.get(fingerprint)?;
let (env, directory_database, _) = self.inner.directory_dbs.get()?.get(fingerprint);
let txn = env
.begin_ro_txn()
.map_err(|err| format!("Failed to begin read transaction: {:?}", err))?;
@@ -554,7 +557,7 @@
}
};
}
let (env, file_database, _) = self.inner.file_dbs.get(fingerprint)?;
let (env, file_database, _) = self.inner.file_dbs.get()?.get(fingerprint);
let txn = env
.begin_ro_txn()
.map_err(|err| format!("Failed to begin read transaction: {}", err))?;
@@ -577,7 +580,7 @@
) -> Result<(), String> {
let until = Self::default_lease_until_secs_since_epoch();
for digest in digests {
let (env, _, lease_database) = self.inner.file_dbs.get(&digest.0)?;
let (env, _, lease_database) = self.inner.file_dbs.get()?.get(&digest.0);
env
.begin_rw_txn()
.and_then(|mut txn| self.lease(&lease_database, &digest.0, until, &mut txn))
@@ -641,7 +644,7 @@
EntryType::File => self.inner.file_dbs.clone(),
EntryType::Directory => self.inner.directory_dbs.clone(),
};
let (env, database, lease_database) = lmdbs.get(&aged_fingerprint.fingerprint)?;
let (env, database, lease_database) = lmdbs.get()?.get(&aged_fingerprint.fingerprint);
{
env
.begin_rw_txn()
@@ -674,7 +677,7 @@
EntryType::Directory => self.inner.directory_dbs.clone(),
};

for &(ref env, ref database, ref lease_database) in database.all_lmdbs()?.iter() {
for &(ref env, ref database, ref lease_database) in database.get()?.all_lmdbs().iter() {
let txn = env
.begin_ro_txn()
.map_err(|err| format!("Error beginning transaction to garbage collect: {}", err))?;
@@ -738,7 +741,7 @@
Fingerprint::from_bytes_unsafe(hasher.fixed_result().as_slice())
};

let (env, content_database, lease_database) = dbs.get(&fingerprint)?;
let (env, content_database, lease_database) = dbs.get()?.get(&fingerprint);
let put_res = env.begin_rw_txn().and_then(|mut txn| {
txn.put(content_database, &fingerprint, &bytes, NO_OVERWRITE)?;
if initial_lease {
@@ -779,7 +782,7 @@
.inner
.pool
.spawn_fn(move || {
let (env, db, _) = dbs.get(&fingerprint)?;
let (env, db, _) = dbs.get()?.get(&fingerprint);
let ro_txn = env
.begin_ro_txn()
.map_err(|err| format!("Failed to begin read transaction: {}", err));
@@ -801,21 +804,13 @@
// fingerprint being stored, so that we can write to them in parallel.
#[derive(Clone)]
struct ShardedLmdb {
root_path: PathBuf,
// First Database is content, second is leases.
lmdbs: Arc<RwLock<Option<HashMap<u8, (Arc<Environment>, Database, Database)>>>>,
lmdbs: HashMap<u8, (Arc<Environment>, Database, Database)>,
}

impl ShardedLmdb {
pub fn new(root_path: PathBuf) -> ShardedLmdb {
ShardedLmdb {
root_path: root_path,
lmdbs: Arc::new(RwLock::new(None)),
}
}

fn make_lmdbs(&self) -> Result<HashMap<u8, (Arc<Environment>, Database, Database)>, String> {
debug!("Initializing ShardedLmdb at root {:?}", self.root_path);
pub fn new(root_path: &Path) -> Result<ShardedLmdb, String> {
debug!("Initializing ShardedLmdb at root {:?}", root_path);
let mut lmdbs = HashMap::new();

for b in 0x00..0x10 {
@@ -826,7 +821,7 @@
fmt::Write::write_fmt(&mut s, format_args!("{:x}", key)).unwrap();
s[0..1].to_owned()
};
let dir = self.root_path.join(dirname);
let dir = root_path.join(dirname);
super::super::safe_create_dir_all(&dir)
.map_err(|err| format!("Error making directory for store at {:?}: {:?}", dir, err))?;
debug!("Making ShardedLmdb env for {:?}", dir);
@@ -890,51 +885,17 @@

lmdbs.insert(key, (Arc::new(env), content_database, lease_database));
}
Ok(lmdbs)

Ok(ShardedLmdb { lmdbs })
}

// First Database is content, second is leases.
pub fn get(
&self,
fingerprint: &Fingerprint,
) -> Result<(Arc<Environment>, Database, Database), String> {
{
let maybe_lmdbs = self.lmdbs.read().unwrap();
match maybe_lmdbs.as_ref() {
Some(lmdbs) => return Ok(lmdbs.get(&(fingerprint.0[0] & 0xF0)).unwrap().clone()),
None => {}
}
}
{
let mut maybe_lmdbs = self.lmdbs.write().unwrap();
{
match maybe_lmdbs.as_ref() {
Some(_) => {}
None => {
*maybe_lmdbs = Some(self.make_lmdbs()?);
}
}
}
match maybe_lmdbs.as_ref() {
Some(lmdbs) => Ok(lmdbs.get(&(fingerprint.0[0] & 0xF0)).unwrap().clone()),
None => unreachable!(),
}
}
pub fn get(&self, fingerprint: &Fingerprint) -> (Arc<Environment>, Database, Database) {
self.lmdbs.get(&(fingerprint.0[0] & 0xF0)).unwrap().clone()
}

pub fn all_lmdbs(&self) -> Result<Vec<(Arc<Environment>, Database, Database)>, String> {
// TODO: Maybe do a read-locked check first (but this is only used when GCing so... Shrug).
let mut maybe_lmdbs = self.lmdbs.write().unwrap();
match maybe_lmdbs.as_ref() {
Some(_) => {}
None => {
*maybe_lmdbs = Some(self.make_lmdbs()?);
}
}
match maybe_lmdbs.as_ref() {
Some(lmdbs) => Ok(lmdbs.values().map(|v| v.clone()).collect()),
None => unreachable!(),
}
pub fn all_lmdbs(&self) -> Vec<(Arc<Environment>, Database, Database)> {
self.lmdbs.values().map(|v| v.clone()).collect()
}
}
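For reference, the sharding scheme the new `get` relies on: `ShardedLmdb::new` builds sixteen environments (the `for b in 0x00..0x10` loop above), and lookups mask out the high nibble of the first fingerprint byte with `fingerprint.0[0] & 0xF0`. A small illustration, assuming (as the lookup mask and the hex directory naming suggest) that shard keys are `0x00, 0x10, ..., 0xF0`:

```rust
// Hypothetical helper for illustration; the real code inlines the mask.
fn shard_key(first_fingerprint_byte: u8) -> u8 {
  first_fingerprint_byte & 0xF0
}

fn main() {
  // A fingerprint beginning 0xAB... lands in the shard keyed 0xA0, whose
  // on-disk directory is named after the leading hex digit, "a".
  let key = shard_key(0xAB);
  assert_eq!(key, 0xA0);
  assert_eq!(&format!("{:x}", key)[0..1], "a");
}
```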
