Skip to content

Commit

Permalink
Refactored stores to remove superfluous STORE_WRITE_LOCK and GRAPH_WR…
Browse files Browse the repository at this point in the history
…ITE_LOCK

Mutexes are bad.

Signed-off-by: Valerian Saliou <valerian@valeriansaliou.name>
  • Loading branch information
valeriansaliou committed Mar 27, 2019
1 parent 1d9ba90 commit 60566d2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 50 deletions.
32 changes: 8 additions & 24 deletions src/store/fst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ static LOOKUP_REGEX_RANGE_LATIN: &'static str = "[\\x{0000}-\\x{024F}]";

lazy_static! {
pub static ref GRAPH_ACCESS_LOCK: Arc<RwLock<bool>> = Arc::new(RwLock::new(false));
static ref GRAPH_WRITE_LOCK: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
static ref GRAPH_REBUILD_LOCK: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
static ref GRAPH_POOL: Arc<RwLock<HashMap<StoreFSTKey, StoreFSTBox>>> =
Arc::new(RwLock::new(HashMap::new()));
Expand All @@ -100,11 +99,6 @@ impl StoreFSTPool {

let pool_key = StoreFSTKey::from_str(collection_str, bucket_str);

// Acquire general lock, and reference it in context
// Notice: this prevents graph to be opened while also erased; or 2 graphs on the \
// same collection to be opened at the same time.
let _write = GRAPH_WRITE_LOCK.lock().unwrap();

// Acquire a thread-safe store pool reference in read mode
let graph_pool_read = GRAPH_POOL.read().unwrap();

Expand All @@ -130,7 +124,6 @@ impl StoreFSTPool {
&*GRAPH_POOL,
APP_CONF.store.fst.pool.inactive_after,
&*GRAPH_ACCESS_LOCK,
&*GRAPH_WRITE_LOCK,
)
}

Expand All @@ -142,14 +135,9 @@ impl StoreFSTPool {
// certain heavy tasks, which is better to spread out consolidation steps over time over \
// a large number of very active buckets.

// Acquire rebuild + access locks, and reference them in context
// Notice: access lock prevents the consolidate process from using the graph if it is \
// ongoing erasure, while the rebuild lock prevents two consolidate operations to be \
// executed at the same time.
let (_access, _rebuild) = (
GRAPH_ACCESS_LOCK.read().unwrap(),
GRAPH_REBUILD_LOCK.lock().unwrap(),
);
// Acquire rebuild lock, and reference it in context
// Notice: this prevents two consolidate operations to be executed at the same time.
let _rebuild = GRAPH_REBUILD_LOCK.lock().unwrap();

// Exit trap: Register is empty? Abort there.
if GRAPH_CONSOLIDATE.read().unwrap().is_empty() == true {
Expand Down Expand Up @@ -218,8 +206,10 @@ impl StoreFSTPool {
for key in &keys_consolidate {
{
// As we may be renaming the FST file, ensure no consumer out of this is \
// trying to open the FST file as it gets processed.
let _write = GRAPH_WRITE_LOCK.lock().unwrap();
// trying to access the FST file as it gets processed. This also waits for \
// current consumers to finish reading the FST, and prevents any new \
// consumer from opening it while we are not done there.
let _access = GRAPH_ACCESS_LOCK.write().unwrap();

let do_close = if let Some(store) = GRAPH_POOL.read().unwrap().get(key) {
debug!("fst key: {} consolidate started", key);
Expand Down Expand Up @@ -593,13 +583,7 @@ impl StoreFSTActionBuilder {
}

pub fn erase<'a, T: Into<&'a str>>(collection: T, bucket: Option<T>) -> Result<u32, ()> {
Self::dispatch_erase(
"fst",
collection,
bucket,
&*GRAPH_ACCESS_LOCK,
&*GRAPH_WRITE_LOCK,
)
Self::dispatch_erase("fst", collection, bucket, &*GRAPH_ACCESS_LOCK)
}

fn build(store: StoreFSTBox) -> StoreFSTAction {
Expand Down
18 changes: 7 additions & 11 deletions src/store/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use core::hash::Hash;
use hashbrown::HashMap;
use std::fmt::Display;
use std::mem;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use std::time::SystemTime;

pub trait StoreGenericKey {}
Expand Down Expand Up @@ -83,14 +83,12 @@ pub trait StoreGenericPool<
pool: &Arc<RwLock<HashMap<K, Arc<S>>>>,
inactive_after: u64,
access_lock: &Arc<RwLock<bool>>,
write_lock: &Arc<Mutex<bool>>,
) {
debug!("scanning for {} store pool items to janitor", kind);

// Acquire write + access locks, and reference it in context
// Notice: write lock prevents store to be acquired from any context; while access lock \
// lets the erasure process wait that any thread using the store is done with work.
let (_access, _write) = (access_lock.write().unwrap(), write_lock.lock().unwrap());
// Acquire access lock (in blocking write mode), and reference it in context
// Notice: this prevents store to be acquired from any context
let _access = access_lock.write().unwrap();

let mut removal_register: Vec<K> = Vec::new();

Expand Down Expand Up @@ -151,16 +149,14 @@ pub trait StoreGenericActionBuilder {
collection: T,
bucket: Option<T>,
access_lock: &Arc<RwLock<bool>>,
write_lock: &Arc<Mutex<bool>>,
) -> Result<u32, ()> {
let collection_str = collection.into();

info!("{} erase requested on collection: {}", kind, collection_str);

// Acquire write + access locks, and reference it in context
// Notice: write lock prevents store to be acquired from any context; while access lock \
// lets the erasure process wait that any thread using the store is done with work.
let (_access, _write) = (access_lock.write().unwrap(), write_lock.lock().unwrap());
// Acquire access lock (in blocking write mode), and reference it in context
// Notice: this prevents store to be acquired from any context
let _access = access_lock.write().unwrap();

if let Some(bucket) = bucket {
Self::proceed_erase_bucket(collection_str, bucket.into())
Expand Down
17 changes: 2 additions & 15 deletions src/store/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::fmt;
use std::fs;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
use std::vec::Drain;

Expand Down Expand Up @@ -57,7 +57,6 @@ type StoreKVBox = Arc<StoreKV>;

lazy_static! {
pub static ref STORE_ACCESS_LOCK: Arc<RwLock<bool>> = Arc::new(RwLock::new(false));
static ref STORE_WRITE_LOCK: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
static ref STORE_POOL: Arc<RwLock<HashMap<StoreKVKey, StoreKVBox>>> =
Arc::new(RwLock::new(HashMap::new()));
}
Expand All @@ -70,11 +69,6 @@ impl StoreKVPool {
let collection_str = collection.into();
let pool_key = StoreKVKey::from_str(collection_str);

// Acquire general lock, and reference it in context
// Notice: this prevents database to be opened while also erased; or 2 databases on the \
// same collection to be opened at the same time.
let _write = STORE_WRITE_LOCK.lock().unwrap();

// Acquire a thread-safe store pool reference in read mode
let store_pool_read = STORE_POOL.read().unwrap();

Expand Down Expand Up @@ -116,7 +110,6 @@ impl StoreKVPool {
&*STORE_POOL,
APP_CONF.store.kv.pool.inactive_after,
&*STORE_ACCESS_LOCK,
&*STORE_WRITE_LOCK,
)
}
}
Expand Down Expand Up @@ -213,13 +206,7 @@ impl StoreKVActionBuilder {
}

pub fn erase<'a, T: Into<&'a str>>(collection: T, bucket: Option<T>) -> Result<u32, ()> {
Self::dispatch_erase(
"kv",
collection,
bucket,
&*STORE_ACCESS_LOCK,
&*STORE_WRITE_LOCK,
)
Self::dispatch_erase("kv", collection, bucket, &*STORE_ACCESS_LOCK)
}

fn build<'a>(bucket: StoreItemPart<'a>, store: Option<StoreKVBox>) -> StoreKVAction<'a> {
Expand Down

0 comments on commit 60566d2

Please sign in to comment.