Skip to content

Commit

Permalink
closes #130
Browse files Browse the repository at this point in the history
Signed-off-by: Valerian Saliou <valerian@valeriansaliou.name>
  • Loading branch information
valeriansaliou committed Apr 21, 2019
1 parent f742b4a commit 4e26de0
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 24 deletions.
1 change: 1 addition & 0 deletions CONFIGURATION.md
Expand Up @@ -42,6 +42,7 @@ Sonic Configuration
* `max_compactions` (type: _integer_, allowed: numbers, default: `1`) — Limit on the number of concurrent database compaction jobs
* `max_flushes` (type: _integer_, allowed: numbers, default: `1`) — Limit on the number of concurrent database flush jobs
* `write_buffer` (type: _integer_, allowed: numbers, default: `16384`) — Maximum size in KB of the database write buffer, after which data gets flushed to disk (ie. `16384` is `16MB`; the size should be a multiple of `1024`, eg. `128 * 1024 = 131072` for `128MB`)
* `write_ahead_log` (type: _boolean_, allowed: `true`, `false`, default: `true`) — Whether to enable Write-Ahead Log or not (it avoids losing non-flushed data in case of server crash)

**[store.fst]**

Expand Down
32 changes: 16 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -32,7 +32,7 @@ graceful = "0.1"
rand = "0.6"
unicode-segmentation = "1.2"
radix = "0.4"
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb", features = ["lz4"] }
rocksdb = { git = "https://github.com/crisp-dev/rust-rocksdb", features = ["lz4"] }
fst = "0.3"
fst-levenshtein = "0.2"
fst-regex = "0.2"
Expand Down
1 change: 1 addition & 0 deletions config.cfg
Expand Up @@ -46,6 +46,7 @@ max_files = 100
max_compactions = 1
max_flushes = 1
write_buffer = 16384
write_ahead_log = true

[store.fst]

Expand Down
2 changes: 1 addition & 1 deletion src/channel/listen.rs
Expand Up @@ -5,8 +5,8 @@
// License: Mozilla Public License v2.0 (MPL v2.0)

use std::net::TcpListener;
use std::sync::RwLock;
use std::process;
use std::sync::RwLock;
use std::thread;

use super::handle::ChannelHandle;
Expand Down
3 changes: 3 additions & 0 deletions src/config/config.rs
Expand Up @@ -94,6 +94,9 @@ pub struct ConfigStoreKVDatabase {

#[serde(default = "defaults::store_kv_database_write_buffer")]
pub write_buffer: usize,

#[serde(default = "defaults::store_kv_database_write_ahead_log")]
pub write_ahead_log: bool,
}

#[derive(Deserialize)]
Expand Down
4 changes: 4 additions & 0 deletions src/config/defaults.rs
Expand Up @@ -71,6 +71,10 @@ pub fn store_kv_database_write_buffer() -> usize {
16384
}

pub fn store_kv_database_write_ahead_log() -> bool {
true
}

pub fn store_fst_path() -> PathBuf {
PathBuf::from("./data/store/fst/")
}
Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Expand Up @@ -51,12 +51,13 @@ use clap::{App, Arg};
use graceful::SignalGuard;
use log::LevelFilter;

use channel::listen::{ChannelListenBuilder, ChannelListen};
use channel::listen::{ChannelListen, ChannelListenBuilder};
use channel::statistics::ensure_states as ensure_states_channel_statistics;
use config::config::Config;
use config::logger::ConfigLogger;
use config::reader::ConfigReader;
use store::fst::StoreFSTPool;
use store::kv::StoreKVPool;
use tasker::runtime::TaskerBuilder;

struct AppArgs {
Expand Down Expand Up @@ -170,6 +171,9 @@ fn main() {
// Teardown Sonic Channel
ChannelListen::teardown();

// Perform a KV flush (ensures all in-memory changes are synced on-disk before shutdown)
StoreKVPool::flush();

// Perform a FST consolidation (ensures all in-memory items are synced on-disk before \
// shutdown; otherwise we would lose all non-consolidated FST changes)
StoreFSTPool::consolidate(true);
Expand Down
63 changes: 58 additions & 5 deletions src/store/kv.rs
Expand Up @@ -13,7 +13,7 @@ use rocksdb::backup::{
};
use rocksdb::{
DBCompactionStyle, DBCompressionType, DBVector, Error as DBError, Options as DBOptions,
WriteBatch, DB,
WriteBatch, WriteOptions, FlushOptions, DB,
};
use std::fmt;
use std::fs;
Expand Down Expand Up @@ -152,6 +152,32 @@ impl StoreKVPool {
)
}

pub fn flush() {
debug!("flushing changes on kv store pool items to disk");

// Generate shared flush options
let mut flush_options = FlushOptions::default();

flush_options.set_wait(true);

// Acquire access lock (in blocking write mode), and reference it in context
// Notice: this prevents store to be acquired from any context
{
let _access = STORE_ACCESS_LOCK.write().unwrap();

for (collection_bucket, store) in STORE_POOL.read().unwrap().iter() {
// Perform flush (in blocking mode)
if store.database.flush_opt(&flush_options).is_ok() {
debug!("flushed kv store pool item to disk: {}", collection_bucket);
} else {
error!("failed flushing kv store pool item to disk: {}", collection_bucket);
}
}
}

info!("done flushing changes on kv store pool items to disk");
}

fn dump_action(
action: &str,
read_path: &Path,
Expand Down Expand Up @@ -377,11 +403,38 @@ impl StoreKV {
}

pub fn put(&self, key: &[u8], data: &[u8]) -> Result<(), DBError> {
self.database.put(key, data)
let mut batch = WriteBatch::default();

batch.put(key, data)?;

self.do_write(batch)
}

pub fn delete(&self, key: &[u8]) -> Result<(), DBError> {
self.database.delete(key)
let mut batch = WriteBatch::default();

batch.delete(key)?;

self.do_write(batch)
}

fn do_write(&self, batch: WriteBatch) -> Result<(), DBError> {
// Configure this write
let mut write_options = WriteOptions::default();

// WAL disabled?
if !APP_CONF.store.kv.database.write_ahead_log {
debug!("ignoring wal for kv write");

write_options.disable_wal(true);
} else {
debug!("using wal for kv write");

write_options.disable_wal(false);
}

// Commit this write
self.database.write_opt(batch, &write_options)
}
}

Expand Down Expand Up @@ -947,7 +1000,7 @@ impl<'a> StoreKVAction<'a> {
.is_ok()
{
// Commit operation to database
if let Err(err) = store.database.write(batch) {
if let Err(err) = store.do_write(batch) {
error!(
"failed in store batch erase bucket: {} with error: {}",
self.bucket.as_str(),
Expand All @@ -957,7 +1010,7 @@ impl<'a> StoreKVAction<'a> {
// Ensure last key is deleted (as RocksDB end key is exclusive; while \
// start key is inclusive, we need to ensure the end-of-range key is \
// deleted)
store.database.delete(&key_prefix_end).ok();
store.delete(&key_prefix_end).ok();

debug!(
"succeeded in store batch erase bucket: {}",
Expand Down

0 comments on commit 4e26de0

Please sign in to comment.