Skip to content

Commit

Permalink
storage/engine: replace ThreadPool with Worker, replace close() with …
Browse files Browse the repository at this point in the history
…Drop.
  • Loading branch information
disksing committed Jul 14, 2016
1 parent 72643e3 commit efb2caf
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
2 changes: 0 additions & 2 deletions src/storage/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ pub enum Modify {
pub trait Engine: Send + Sync + Debug {
fn async_write(&self, ctx: &Context, batch: Vec<Modify>, callback: Callback<()>) -> Result<()>;
fn async_snapshot(&self, ctx: &Context, callback: Callback<Box<Snapshot>>) -> Result<()>;
// maybe mut is better.
fn close(&self) {}

fn write(&self, ctx: &Context, batch: Vec<Modify>) -> Result<()> {
let finished = Event::new();
Expand Down
4 changes: 3 additions & 1 deletion src/storage/engine/raftkv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,10 @@ impl<C: PdClient> Engine for RaftKv<C> {
}));
Ok(())
}
}

fn close(&self) {
impl<C: PdClient> Drop for RaftKv<C> {
fn drop(&mut self) {
self.node.lock().unwrap().stop();
}
}
Expand Down
61 changes: 47 additions & 14 deletions src/storage/engine/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,53 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{self, Formatter, Debug};
use std::fmt::{self, Formatter, Debug, Display};
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
use rocksdb::{DB, Writable, SeekKey, WriteBatch, DBIterator};
use kvproto::kvrpcpb::Context;
use storage::{Key, Value, CfName};
use raftstore::store::engine::{Snapshot as RocksSnapshot, Peekable, Iterable};
use util::escape;
use util::rocksdb;
use util::worker::{Runnable, Worker};
use super::{Engine, Snapshot, Modify, Cursor, Callback, TEMP_DIR, Result, Error, DEFAULT_CFNAME};
use tempdir::TempDir;

enum Task {
Write(Vec<Modify>, Callback<()>),
Snapshot(Callback<Box<Snapshot>>),
}

impl Display for Task {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match *self {
Task::Write(..) => write!(f, "write task"),
Task::Snapshot(_) => write!(f, "snapshot task"),
}
}
}

struct Runner(Arc<DB>);

impl Runnable<Task> for Runner {
fn run(&mut self, t: Task) {
match t {
Task::Write(modifies, cb) => cb(write_modifies(&self.0, modifies)),
Task::Snapshot(cb) => cb(Ok(box RocksSnapshot::new(self.0.clone()))),
}
}
}

impl Display for Runner {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "engine rocksdb runner")
}
}

pub struct EngineRocksdb {
db: Arc<DB>,
// only use for memory mode
temp_dir: Option<TempDir>,
// ThreadPool is handy for executing task asynchronously, each Engine will
// contain only 1 thread.
pool: Mutex<ThreadPool>,
worker: Mutex<Worker<Task>>,
}

impl EngineRocksdb {
Expand All @@ -43,10 +70,12 @@ impl EngineRocksdb {
}
_ => (path.to_owned(), None),
};
let mut worker = Worker::new("engine-rocksdb");
let db = try!(rocksdb::new_engine(&path, cfs));
box_try!(worker.start(Runner(Arc::new(db))));
Ok(EngineRocksdb {
db: Arc::new(try!(rocksdb::new_engine(&path, cfs))),
temp_dir: temp_dir,
pool: Mutex::new(ThreadPool::new_with_name(thd_name!("engine-rocksdb"), 1)),
worker: Mutex::new(worker),
})
}
}
Expand Down Expand Up @@ -94,20 +123,24 @@ fn write_modifies(db: &DB, modifies: Vec<Modify>) -> Result<()> {

impl Engine for EngineRocksdb {
fn async_write(&self, _: &Context, modifies: Vec<Modify>, cb: Callback<()>) -> Result<()> {
let db = self.db.clone();
self.pool.lock().unwrap().execute(move || cb(write_modifies(&db, modifies)));
box_try!(self.worker.lock().unwrap().schedule(Task::Write(modifies, cb)));
Ok(())
}

fn async_snapshot(&self, _: &Context, cb: Callback<Box<Snapshot>>) -> Result<()> {
let db = self.db.clone();
self.pool.lock().unwrap().execute(move || {
cb(Ok(box RocksSnapshot::new(db)));
});
box_try!(self.worker.lock().unwrap().schedule(Task::Snapshot(cb)));
Ok(())
}
}

impl Drop for EngineRocksdb {
fn drop(&mut self) {
if let Some(h) = self.worker.lock().unwrap().stop() {
h.join().unwrap();
}
}
}

impl Snapshot for RocksSnapshot {
fn get(&self, key: &Key) -> Result<Option<Value>> {
trace!("RocksSnapshot: get {}", key);
Expand Down
1 change: 0 additions & 1 deletion src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ impl Storage {
}
self.sched.take();
info!("storage {:?} closed.", self.engine);
self.engine.close();
Ok(())
}

Expand Down

0 comments on commit efb2caf

Please sign in to comment.