Skip to content

Commit

Permalink
Move ShardedLmdb into separate crate
Browse files Browse the repository at this point in the history
  • Loading branch information
illicitonion committed Jun 20, 2019
1 parent 210c3cd commit bfc7751
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 154 deletions.
12 changes: 12 additions & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/rust/engine/Cargo.toml
Expand Up @@ -43,6 +43,7 @@ members = [
"resettable",
"rule_graph",
"serverset",
"sharded_lmdb",
"tar_api",
"testutil",
"testutil/mock",
Expand Down Expand Up @@ -74,6 +75,7 @@ default-members = [
"resettable",
"rule_graph",
"serverset",
"sharded_lmdb",
"tar_api",
"testutil",
"testutil/mock",
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/fs/store/Cargo.toml
Expand Up @@ -24,6 +24,7 @@ serverset = { path = "../../serverset" }
sha2 = "0.8"
serde = "1.0"
serde_derive = "1.0"
sharded_lmdb = { path = "../../sharded_lmdb" }
tempfile = "3"
tokio-threadpool = "0.1.12"
uuid = { version = "0.7.1", features = ["v4"] }
Expand Down
162 changes: 8 additions & 154 deletions src/rust/engine/fs/store/src/lib.rs
Expand Up @@ -867,19 +867,15 @@ mod local {
use futures::future::{self, Future};
use hashing::{Digest, Fingerprint, EMPTY_DIGEST};
use lmdb::Error::{KeyExist, NotFound};
use lmdb::{
self, Cursor, Database, DatabaseFlags, Environment, EnvironmentCopyFlags, EnvironmentFlags,
RwTransaction, Transaction, WriteFlags,
};
use log::{error, trace};
use lmdb::{self, Cursor, Database, RwTransaction, Transaction, WriteFlags};
use log::error;
use sha2::Sha256;
use sharded_lmdb::ShardedLmdb;
use std;
use std::collections::{BinaryHeap, HashMap};
use std::fmt;
use std::path::{Path, PathBuf};
use std::collections::BinaryHeap;
use std::path::Path;
use std::sync::Arc;
use std::time;
use tempfile::TempDir;

use super::MAX_LOCAL_STORE_SIZE_BYTES;

Expand All @@ -903,8 +899,9 @@ mod local {
let directories_root = root.join("directories");
Ok(ByteStore {
inner: Arc::new(InnerStore {
file_dbs: ShardedLmdb::new(files_root.clone()).map(Arc::new),
directory_dbs: ShardedLmdb::new(directories_root.clone()).map(Arc::new),
file_dbs: ShardedLmdb::new(files_root.clone(), MAX_LOCAL_STORE_SIZE_BYTES).map(Arc::new),
directory_dbs: ShardedLmdb::new(directories_root.clone(), MAX_LOCAL_STORE_SIZE_BYTES)
.map(Arc::new),
}),
})
}
Expand Down Expand Up @@ -1211,149 +1208,6 @@ mod local {
}
}

// Each LMDB directory can have at most one concurrent writer.
// We use this type to shard storage into 16 LMDB directories, based on the first 4 bits of the
// fingerprint being stored, so that we can write to them in parallel.
//
// Cloning copies the map; the environments themselves are shared via `Arc`.
#[derive(Clone)]
struct ShardedLmdb {
// Maps a shard key to that shard's environment and databases. The key is the first byte of a
// fingerprint with its low 4 bits cleared (0x00, 0x10, ..., 0xF0).
// First Database is content, second is leases.
lmdbs: HashMap<u8, (Arc<Environment>, Database, Database)>,
// Directory under which the per-shard LMDB directories are created.
root_path: PathBuf,
}

impl ShardedLmdb {
  /// Opens (creating where necessary) one LMDB environment per shard under `root_path`, each
  /// holding a "content" database and a "leases" database.
  ///
  /// # Errors
  ///
  /// Returns a description of the failure if a shard directory cannot be created, an environment
  /// cannot be opened, or either database cannot be created/opened.
  pub fn new(root_path: PathBuf) -> Result<ShardedLmdb, String> {
    trace!("Initializing ShardedLmdb at root {:?}", root_path);
    let mut lmdbs = HashMap::new();

    #[allow(clippy::identity_conversion)]
    // False positive: https://github.com/rust-lang/rust-clippy/issues/3913
    for (env, dir, fingerprint_prefix) in ShardedLmdb::envs(&root_path)? {
      trace!("Making ShardedLmdb content database for {:?}", dir);
      let content_database = env
        .create_db(Some("content"), DatabaseFlags::empty())
        .map_err(|e| {
          format!(
            "Error creating/opening content database at {:?}: {}",
            dir, e
          )
        })?;

      trace!("Making ShardedLmdb lease database for {:?}", dir);
      let lease_database = env
        .create_db(Some("leases"), DatabaseFlags::empty())
        .map_err(|e| {
          // This message previously said "content database", which misattributed lease failures.
          format!(
            "Error creating/opening lease database at {:?}: {}",
            dir, e
          )
        })?;

      lmdbs.insert(
        fingerprint_prefix,
        (Arc::new(env), content_database, lease_database),
      );
    }

    Ok(ShardedLmdb { lmdbs, root_path })
  }

  /// Creates the 16 shard directories under `root_path` and opens an environment in each.
  ///
  /// Each returned tuple is `(environment, shard directory, shard key)`, where the shard key is
  /// the shard index shifted into the high 4 bits of a byte (0x00, 0x10, ..., 0xF0) and the
  /// directory is named after the single hex digit of the shard index ("0" through "f").
  fn envs(root_path: &Path) -> Result<Vec<(Environment, PathBuf, u8)>, String> {
    let mut envs = Vec::with_capacity(0x10);
    for b in 0x00_u8..0x10 {
      let fingerprint_prefix = b << 4;
      // The directory name is the first hex digit of the prefix, which is just the shard index.
      let dir = root_path.join(format!("{:x}", b));
      fs::safe_create_dir_all(&dir)
        .map_err(|err| format!("Error making directory for store at {:?}: {:?}", dir, err))?;
      envs.push((ShardedLmdb::make_env(&dir)?, dir, fingerprint_prefix));
    }
    Ok(envs)
  }

  /// Opens the LMDB environment stored in `dir`, configured for two named databases and a map
  /// size of `MAX_LOCAL_STORE_SIZE_BYTES`.
  fn make_env(dir: &Path) -> Result<Environment, String> {
    Environment::new()
      // NO_SYNC
      // =======
      //
      // Don't force fsync on every lmdb write transaction
      //
      // This significantly improves performance on slow or contended disks.
      //
      // On filesystems which preserve order of writes, on system crash this may lead to some
      // transactions being rolled back. This is fine because this is just a write-once
      // content-addressed cache. There is no risk of corruption, just compromised durability.
      //
      // On filesystems which don't preserve the order of writes, this may lead to lmdb
      // corruption on system crash (but in no other circumstances, such as process crash).
      //
      // ------------------------------------------------------------------------------------
      //
      // NO_TLS
      // ======
      //
      // Without this flag, each time a read transaction is started, it eats into our
      // transaction limit (default: 126) until that thread dies.
      //
      // This flag makes transactions be removed from that limit when they are dropped, rather
      // than when their thread dies. This is important, because we perform reads from a
      // thread pool, so our threads never die. Without this flag, all read requests will fail
      // after the first 126.
      //
      // The only down-side is that you need to make sure that any individual OS thread must
      // not try to perform multiple write transactions concurrently. Fortunately, this
      // property holds for us.
      .set_flags(EnvironmentFlags::NO_SYNC | EnvironmentFlags::NO_TLS)
      // 2 DBs; one for file contents, one for leases.
      .set_max_dbs(2)
      .set_map_size(MAX_LOCAL_STORE_SIZE_BYTES)
      .open(dir)
      .map_err(|e| format!("Error making env for store at {:?}: {}", dir, e))
  }

  /// Returns the shard responsible for `fingerprint`, selected by the high 4 bits of the
  /// fingerprint's first byte.
  ///
  /// First Database is content, second is leases.
  pub fn get(&self, fingerprint: &Fingerprint) -> (Arc<Environment>, Database, Database) {
    // `new` inserts an entry for every possible high nibble, so this index cannot miss.
    self.lmdbs[&(fingerprint.0[0] & 0xF0)].clone()
  }

  /// Returns every shard, in unspecified (HashMap iteration) order.
  pub fn all_lmdbs(&self) -> Vec<(Arc<Environment>, Database, Database)> {
    self.lmdbs.values().cloned().collect()
  }

  /// Rewrites each shard as a compacted copy and swaps the copy into the shard's place.
  ///
  /// # Errors
  ///
  /// Returns a description of the failure if a temporary directory cannot be created, or if
  /// copying, removing the old shard, or renaming the copy into place fails. Shards processed
  /// before the failure stay compacted.
  ///
  // NOTE(review): this opens fresh environments for directories that `self.lmdbs` already has
  // open and then replaces those directories on disk; confirm that callers do not keep using
  // this instance (or its clones) afterwards.
  #[allow(clippy::identity_conversion)] // False positive: https://github.com/rust-lang/rust-clippy/issues/3913
  pub fn compact(&self) -> Result<(), String> {
    for (env, old_dir, _) in ShardedLmdb::envs(&self.root_path)? {
      // Shard directories are direct children of `root_path`, so creating the copy there keeps
      // the later rename within a single parent directory.
      let new_dir = TempDir::new_in(&self.root_path).map_err(|e| {
        format!(
          "Error creating temporary directory in {:?} for compacting {:?}: {}",
          self.root_path, old_dir, e
        )
      })?;
      env
        .copy(new_dir.path(), EnvironmentCopyFlags::COMPACT)
        .map_err(|e| {
          format!(
            "Error copying store from {:?} to {:?}: {}",
            old_dir,
            new_dir.path(),
            e
          )
        })?;
      std::fs::remove_dir_all(&old_dir)
        .map_err(|e| format!("Error removing old store at {:?}: {}", old_dir, e))?;
      std::fs::rename(&new_dir.path(), &old_dir).map_err(|e| {
        format!(
          "Error replacing {:?} with {:?}: {}",
          old_dir,
          new_dir.path(),
          e
        )
      })?;

      // The compacted copy now lives at `old_dir`, so the TempDir handle no longer refers to an
      // existing directory and releasing it here leaves the compacted store untouched.
      std::mem::drop(new_dir);
    }
    Ok(())
  }
}

#[derive(Eq, PartialEq, Ord, PartialOrd)]
struct AgedFingerprint {
// expired_seconds_ago must be the first field for the Ord implementation.
Expand Down
12 changes: 12 additions & 0 deletions src/rust/engine/sharded_lmdb/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "sharded_lmdb"
version = "0.0.1"
authors = [ "Pants Build <pantsbuild@gmail.com>" ]
edition = "2018"

[dependencies]
fs = { path = "../fs" }
hashing = { path = "../hashing" }
lmdb = { git = "https://github.com/pantsbuild/lmdb-rs.git", rev = "06bdfbfc6348f6804127176e561843f214fc17f8" }
log = "0.4"
tempfile = "3"

0 comments on commit bfc7751

Please sign in to comment.