Skip to content

Commit

Permalink
Add a KV flush tasker
Browse files Browse the repository at this point in the history
Signed-off-by: Valerian Saliou <valerian@valeriansaliou.name>
  • Loading branch information
valeriansaliou committed Apr 21, 2019
1 parent 866761e commit 6713488
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 34 deletions.
1 change: 1 addition & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Sonic Configuration

**[store.kv.database]**

* `flush_after` (type: _integer_, allowed: seconds, default: `900`) — Time after which pending database updates should be flushed from memory to disk (increase this delay if you encounter high CPU usage when a flush task kicks in; this value must be strictly lower than `store.kv.pool.inactive_after`)
* `compress` (type: _boolean_, allowed: `true`, `false`, default: `true`) — Whether to compress database or not (uses LZ4)
* `parallelism` (type: _integer_, allowed: numbers, default: `2`) — Limit on the number of compaction and flush threads that can run at the same time
* `max_files` (type: _integer_, allowed: numbers, no default) — Maximum number of database files kept open at the same time per-database (if any; otherwise there are no limits)
Expand Down
2 changes: 2 additions & 0 deletions config.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ inactive_after = 1800

[store.kv.database]

flush_after = 900

compress = true
parallelism = 2
max_files = 100
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ pub struct ConfigStoreKVPool {

#[derive(Deserialize)]
pub struct ConfigStoreKVDatabase {
#[serde(default = "defaults::store_kv_database_flush_after")]
pub flush_after: u64,

#[serde(default = "defaults::store_kv_database_compress")]
pub compress: bool,

Expand Down
4 changes: 4 additions & 0 deletions src/config/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub fn store_kv_pool_inactive_after() -> u64 {
1800
}

/// Default for `store.kv.database.flush_after`, in seconds (15 minutes).
pub fn store_kv_database_flush_after() -> u64 {
    15 * 60
}

/// Default for `store.kv.database.compress`: database compression is enabled.
pub fn store_kv_database_compress() -> bool {
true
}
Expand Down
5 changes: 5 additions & 0 deletions src/config/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ impl ConfigReader {
panic!("write_buffer for kv must not be zero");
}

// Check 'flush_after' for KV
if config.store.kv.database.flush_after >= config.store.kv.pool.inactive_after {
panic!("flush_after for kv must be strictly lower than inactive_after");
}

// Check 'consolidate_after' for FST
if config.store.fst.graph.consolidate_after >= config.store.fst.pool.inactive_after {
panic!("consolidate_after for fst must be strictly lower than inactive_after");
Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ fn main() {
ChannelListen::teardown();

// Perform a KV flush (ensures all in-memory changes are synced on-disk before shutdown)
StoreKVPool::flush();
StoreKVPool::flush(true);

// Perform a FST consolidation (ensures all in-memory items are synced on-disk before \
// shutdown; otherwise we would lose all non-consolidated FST changes)
Expand Down
5 changes: 0 additions & 5 deletions src/store/fst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,11 +859,6 @@ impl StoreGeneric for StoreFST {
fn ref_last_used<'a>(&'a self) -> &'a RwLock<SystemTime> {
&self.last_used
}

fn hook_pre_janitor(&self) -> Result<(), ()> {
// Nothing done there.
Ok(())
}
}

impl StoreFSTActionBuilder {
Expand Down
9 changes: 0 additions & 9 deletions src/store/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ pub trait StoreGenericKey {}

pub trait StoreGeneric {
fn ref_last_used<'a>(&'a self) -> &'a RwLock<SystemTime>;
fn hook_pre_janitor(&self) -> Result<(), ()>;
}

pub trait StoreGenericPool<
Expand Down Expand Up @@ -107,14 +106,6 @@ pub trait StoreGenericPool<
kind, collection_bucket, last_used_elapsed
);

// Trigger pre-janitor hook for this store
if store.hook_pre_janitor().is_err() {
error!(
"pre-janitor hook failed for {} store pool item: {}",
kind, collection_bucket
);
}

// Notice: the bucket value needs to be cloned, as we cannot hold a reference that \
// would outlive the referenced value once we remove it from its owner set.
removal_register.push(*collection_bucket);
Expand Down
103 changes: 84 additions & 19 deletions src/store/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::fs;
use std::io::{self, Cursor};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::SystemTime;
use std::vec::Drain;

Expand All @@ -37,6 +38,7 @@ pub struct StoreKVBuilder;
/// A single opened KV store, as held in the KV store pool.
pub struct StoreKV {
/// Underlying database handle, as returned by `StoreKVBuilder::open`.
database: DB,
/// Time exposed through `StoreGeneric::ref_last_used`; initialized to the creation time.
last_used: Arc<RwLock<SystemTime>>,
/// Time of the last flush attempt; initialized to the creation time. `StoreKVPool::flush`
/// reads it to decide whether a flush is due, and rewrites it after each flush attempt.
last_flushed: Arc<RwLock<SystemTime>>,
/// Per-store lock, created unlocked with a `false` payload.
/// NOTE(review): its users are not visible in this excerpt; confirm what it guards.
pub lock: RwLock<bool>,
}

Expand Down Expand Up @@ -66,6 +68,7 @@ const ATOM_HASH_RADIX: usize = 16;
// Process-wide synchronization primitives and the pool of opened KV stores.
// NOTE(review): the boolean payloads of the locks are never read in the code visible here;
// they look like placeholders, confirm against the rest of the file.
lazy_static! {
// Taken in write mode by `StoreKVPool::flush` while it scans the pool and while it flushes
// each store, which prevents stores from being acquired in the meantime.
pub static ref STORE_ACCESS_LOCK: Arc<RwLock<bool>> = Arc::new(RwLock::new(false));
// NOTE(review): presumably serializes store acquisition; its use is not visible here.
static ref STORE_ACQUIRE_LOCK: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
// Held for the whole duration of `StoreKVPool::flush`, so that two flush operations never
// run at the same time.
static ref STORE_FLUSH_LOCK: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
// Opened KV stores, indexed by their pool key.
static ref STORE_POOL: Arc<RwLock<HashMap<StoreKVKey, StoreKVBox>>> =
Arc::new(RwLock::new(HashMap::new()));
}
Expand Down Expand Up @@ -152,28 +155,90 @@ impl StoreKVPool {
)
}

pub fn flush() {
debug!("flushing changes on kv store pool items to disk");
/// Flushes KV store pool items to disk.
///
/// When `force` is `true`, every store currently in the pool is selected (this is what the
/// shutdown path uses). Otherwise, only stores whose `last_flushed` time is at least
/// `APP_CONF.store.kv.database.flush_after` seconds old are selected (this is what the
/// periodic tasker uses).
///
/// The work is done in two steps: the keys to flush are first listed under the access lock,
/// then each selected store is flushed under its own acquisition of the access lock, with a
/// thread yield between stores. The flush lock is held for the whole call.
///
/// # Panics
///
/// Panics if one of the locks is poisoned, or if `SystemTime::elapsed` fails on a store's
/// `last_flushed` time.
pub fn flush(force: bool) {
debug!("scanning for kv store pool items to flush to disk");

// Acquire flush lock, and reference it in context
// Notice: this prevents two flush operations from being executed at the same time.
let _flush = STORE_FLUSH_LOCK.lock().unwrap();

// Step 1: List keys to be flushed
let mut keys_flush: Vec<StoreKVKey> = Vec::new();

{
// Acquire access lock (in blocking write mode), and reference it in context
// Notice: this prevents the store from being acquired from any context
let _access = STORE_ACCESS_LOCK.write().unwrap();

for (collection_bucket, store) in STORE_POOL.read().unwrap().iter() {
// Perform flush (in blocking mode)
if store.flush().is_ok() {
debug!("flushed kv store pool item to disk: {}", collection_bucket);
let store_pool_read = STORE_POOL.read().unwrap();

for (key, store) in &*store_pool_read {
// Number of whole seconds elapsed since the last flush attempt on this store
// NOTE(review): `SystemTime::elapsed` returns an error when the stored time is later \
// than the current system time (eg. the clock was moved backwards), in which case \
// the second `unwrap` below panics; confirm this is acceptable.
let not_flushed_for = store
.last_flushed
.read()
.unwrap()
.elapsed()
.unwrap()
.as_secs();

// A forced flush selects every store, regardless of its last flush time
if force || not_flushed_for >= APP_CONF.store.kv.database.flush_after {
info!(
"kv key: {} not flushed for: {} seconds, may flush",
key, not_flushed_for
);

keys_flush.push(*key);
} else {
error!(
"failed flushing kv store pool item to disk: {}",
collection_bucket
debug!(
"kv key: {} not flushed for: {} seconds, no flush",
key, not_flushed_for
);
}
}
}

info!("done flushing changes on kv store pool items to disk");
// Exit trap: Nothing to flush yet? Abort there.
if keys_flush.is_empty() {
info!("no kv store pool items need to be flushed at the moment");

return;
}

// Step 2: Flush KVs, one-by-one (sequential locking; this avoids global locks)
let mut count_flushed = 0;

{
for key in &keys_flush {
{
// Acquire access lock (in blocking write mode), and reference it in context
// Notice: this prevents the store from being acquired from any context
let _access = STORE_ACCESS_LOCK.write().unwrap();

// The store is looked up again by key, as the access lock was released since step 1; \
// a key that is no longer in the pool is silently skipped.
if let Some(store) = STORE_POOL.read().unwrap().get(key) {
debug!("kv key: {} flush started", key);

if let Err(err) = store.flush() {
error!("kv key: {} flush failed: {}", key, err);
} else {
count_flushed += 1;

debug!("kv key: {} flush complete", key);
}

// Bump 'last flushed' time
// NOTE(review): this runs whether the flush succeeded or failed, so a failed flush \
// is not selected again by a non-forced call until `flush_after` seconds have \
// elapsed; confirm this is intended.
*store.last_flushed.write().unwrap() = SystemTime::now();
}
}

// Give a bit of time to other threads before continuing
thread::yield_now();
}
}

info!(
"done scanning for kv store pool items to flush to disk (flushed: {})",
count_flushed
);
}

fn dump_action(
Expand Down Expand Up @@ -382,10 +447,15 @@ impl StoreKVBuilder {
impl StoreGenericBuilder<StoreKVKey, StoreKV> for StoreKVBuilder {
fn new(pool_key: StoreKVKey) -> Result<StoreKV, ()> {
Self::open(pool_key.collection_hash)
.map(|db| StoreKV {
database: db,
last_used: Arc::new(RwLock::new(SystemTime::now())),
lock: RwLock::new(false),
.map(|db| {
let now = SystemTime::now();

StoreKV {
database: db,
last_used: Arc::new(RwLock::new(now)),
last_flushed: Arc::new(RwLock::new(now)),
lock: RwLock::new(false),
}
})
.or_else(|err| {
error!("failed opening kv: {}", err);
Expand Down Expand Up @@ -450,11 +520,6 @@ impl StoreGeneric for StoreKV {
fn ref_last_used<'a>(&'a self) -> &'a RwLock<SystemTime> {
&self.last_used
}

fn hook_pre_janitor(&self) -> Result<(), ()> {
// Flush all in-memory data to on-disk database (before it is closed)
self.flush().or(Err(()))
}
}

impl StoreKVActionBuilder {
Expand Down
1 change: 1 addition & 0 deletions src/tasker/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ impl Tasker {
StoreFSTPool::janitor();

// #2: Others
StoreKVPool::flush(false);
StoreFSTPool::consolidate(false);
}
}

0 comments on commit 6713488

Please sign in to comment.