Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: refactor, make 'mvcc' an extend Trait for 'Engine' #23

Merged
merged 3 commits into from
Jan 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ extern crate protobuf;
extern crate bytes;
extern crate byteorder;

pub use storage::{Storage, Dsn};

pub mod util;
pub mod raft;
pub mod proto;
mod storage;
pub mod storage;
24 changes: 12 additions & 12 deletions src/storage/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,34 +77,34 @@ mod tests {
#[test]
fn memory() {
let mut e = super::new_engine(Dsn::Memory).unwrap();
get_put(&mut *e);
batch(&mut *e);
seek(&mut *e);
get_put(e.as_mut());
batch(e.as_mut());
seek(e.as_mut());
}

#[test]
fn rocksdb() {
let mut e = super::new_engine(Dsn::RocksDBPath("/tmp/rocks")).unwrap();
get_put(&mut *e);
batch(&mut *e);
seek(&mut *e);
get_put(e.as_mut());
batch(e.as_mut());
seek(e.as_mut());
}

fn assert_has(engine: &Engine, key: &[u8], value: &[u8]) {
fn assert_has<T: Engine + ?Sized>(engine: &T, key: &[u8], value: &[u8]) {
assert_eq!(engine.get(key).unwrap().unwrap(), value);
}

fn assert_none(engine: &Engine, key: &[u8]) {
fn assert_none<T: Engine + ?Sized>(engine: &T, key: &[u8]) {
assert_eq!(engine.get(key).unwrap(), None);
}

fn assert_seek(engine: &Engine, key: &[u8], pair: (&[u8], &[u8])) {
fn assert_seek<T: Engine + ?Sized>(engine: &T, key: &[u8], pair: (&[u8], &[u8])) {
let (k, v) = engine.seek(key).unwrap().unwrap();
assert_eq!(k, pair.0);
assert_eq!(v, pair.1);
}

fn get_put(engine: &mut Engine) {
fn get_put<T: Engine + ?Sized>(engine: &mut T) {
assert_none(engine, b"x");
engine.put(b"x", b"1").unwrap();
assert_has(engine, b"x", b"1");
Expand All @@ -114,7 +114,7 @@ mod tests {
assert_none(engine, b"x");
}

fn batch(engine: &mut Engine) {
fn batch<T: Engine + ?Sized>(engine: &mut T) {
engine.write(vec![Modify::Put((b"x", b"1")), Modify::Put((b"y", b"2"))]).unwrap();
assert_has(engine, b"x", b"1");
assert_has(engine, b"y", b"2");
Expand All @@ -124,7 +124,7 @@ mod tests {
assert_none(engine, b"y");
}

fn seek(engine: &mut Engine) {
fn seek<T: Engine + ?Sized>(engine: &mut T) {
engine.put(b"x", b"1").unwrap();
assert_seek(engine, b"x", (b"x", b"1"));
assert_seek(engine, b"a", (b"x", b"1"));
Expand Down
17 changes: 9 additions & 8 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use self::engine::Engine;
pub use self::engine::Dsn;
use self::mvcc::Result;

mod engine;
mod mvcc;

pub use self::engine::Dsn;

use self::engine::Engine;
use self::mvcc::{MvccEngine, Result};

pub struct Storage {
engine: Box<Engine>,
}
Expand All @@ -17,17 +18,17 @@ impl Storage {

pub fn get(&self, key: &[u8], version: u64) -> Result<Option<Vec<u8>>> {
trace!("storage: get {:?}@{}", key, version);
mvcc::get(&*self.engine, key, version)
self.engine.as_ref().mvcc_get(key, version)
}

pub fn put(&mut self, key: &[u8], value: &[u8], version: u64) -> Result<()> {
trace!("storage: put {:?}@{}", key, version);
mvcc::put(&mut *self.engine, key, value, version)
self.engine.as_mut().mvcc_put(key, value, version)
}

pub fn delete(&mut self, key: &[u8], version: u64) -> Result<()> {
trace!("storage: delete {:?}@{}", key, version);
mvcc::delete(&mut *self.engine, key, version)
self.engine.as_mut().mvcc_delete(key, version)
}

pub fn scan(&self,
Expand All @@ -36,6 +37,6 @@ impl Storage {
version: u64)
-> Result<Vec<(Vec<u8>, Vec<u8>)>> {
trace!("storage: scan {:?}({})@{}", start_key, limit, version);
mvcc::scan(&*self.engine, start_key, limit, version)
self.engine.as_ref().mvcc_scan(start_key, limit, version)
}
}
182 changes: 93 additions & 89 deletions src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,92 +6,96 @@ use self::codec::{encode_key, decode_key};
mod meta;
mod codec;

pub fn get(eng: &Engine, key: &[u8], version: u64) -> Result<Option<Vec<u8>>> {
let mkey = encode_key(key, 0u64);
let mval = match try!(eng.get(&mkey)) {
Some(x) => x,
None => return Ok(None),
};
let meta = try!(Meta::parse(&mval));
let ver = match meta.latest(version) {
Some(x) => x,
None => return Ok(None),
};
let dkey = encode_key(key, ver);
match try!(eng.get(&dkey)) {
Some(x) => Ok(Some(x)),
None => MvccErrorKind::DataMissing.as_result(),
}
}

pub fn put(eng: &mut Engine, key: &[u8], value: &[u8], version: u64) -> Result<()> {
let mkey = encode_key(key, 0u64);
let dkey = encode_key(key, version);
let mval = try!(eng.get(&mkey));
let mut meta = match mval {
Some(x) => try!(Meta::parse(&x)),
None => Meta::new(),
};
meta.add(version);
let mval = meta.into_bytes();
let batch = vec![Modify::Put((&mkey, &mval)), Modify::Put((&dkey, value))];
eng.write(batch).map_err(|e| Error::from(e))
}

pub fn delete(eng: &mut Engine, key: &[u8], version: u64) -> Result<()> {
let mkey = encode_key(key, 0u64);
let dkey = encode_key(key, version);
let mval = try!(eng.get(&mkey));
let mut meta = match mval {
Some(x) => try!(Meta::parse(&x)),
None => Meta::new(),
};
let has_old_ver = meta.has_version(version);
meta.delete(version);
let mval = meta.into_bytes();
let mut batch = vec![Modify::Put((&mkey, &mval))];
if has_old_ver {
batch.push(Modify::Delete(&dkey));
}
eng.write(batch).map_err(|e| Error::from(e))
}

pub fn scan(eng: &Engine,
start_key: &[u8],
limit: usize,
version: u64)
-> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let mut pairs = vec![];
let mut seek_key = encode_key(start_key, 0u64);
loop {
if pairs.len() >= limit {
break;
}
let (mkey, mval) = match try!(eng.seek(&seek_key)) {
pub trait MvccEngine : Engine {
fn mvcc_get(&self, key: &[u8], version: u64) -> Result<Option<Vec<u8>>> {
let mkey = encode_key(key, 0u64);
let mval = match try!(self.get(&mkey)) {
Some(x) => x,
None => break,
None => return Ok(None),
};
let (mut key, _) = try!(decode_key(&mkey));
let meta = try!(Meta::parse(&mval));
let ver = match meta.latest(version) {
Some(x) => x,
None => {
key.push(0);
seek_key = encode_key(&key, 0u64);
continue;
}
None => return Ok(None),
};
let dkey = encode_key(&key, ver);
match try!(eng.get(&dkey)) {
Some(x) => pairs.push((key.clone(), x)),
None => return MvccErrorKind::DataMissing.as_result(),
let dkey = encode_key(key, ver);
match try!(self.get(&dkey)) {
Some(x) => Ok(Some(x)),
None => MvccErrorKind::DataMissing.as_result(),
}
key.push(0);
seek_key = encode_key(&key, 0u64);
}
Ok(pairs)

fn mvcc_put(&mut self, key: &[u8], value: &[u8], version: u64) -> Result<()> {
let mkey = encode_key(key, 0u64);
let dkey = encode_key(key, version);
let mval = try!(self.get(&mkey));
let mut meta = match mval {
Some(x) => try!(Meta::parse(&x)),
None => Meta::new(),
};
meta.add(version);
let mval = meta.into_bytes();
let batch = vec![Modify::Put((&mkey, &mval)), Modify::Put((&dkey, value))];
self.write(batch).map_err(|e| Error::from(e))
}

fn mvcc_delete(&mut self, key: &[u8], version: u64) -> Result<()> {
let mkey = encode_key(key, 0u64);
let dkey = encode_key(key, version);
let mval = try!(self.get(&mkey));
let mut meta = match mval {
Some(x) => try!(Meta::parse(&x)),
None => Meta::new(),
};
let has_old_ver = meta.has_version(version);
meta.delete(version);
let mval = meta.into_bytes();
let mut batch = vec![Modify::Put((&mkey, &mval))];
if has_old_ver {
batch.push(Modify::Delete(&dkey));
}
self.write(batch).map_err(|e| Error::from(e))
}

fn mvcc_scan(&self,
start_key: &[u8],
limit: usize,
version: u64)
-> Result<Vec<(Vec<u8>, Vec<u8>)>> {
let mut pairs = vec![];
let mut seek_key = encode_key(start_key, 0u64);
loop {
if pairs.len() >= limit {
break;
}
let (mkey, mval) = match try!(self.seek(&seek_key)) {
Some(x) => x,
None => break,
};
let (mut key, _) = try!(decode_key(&mkey));
let meta = try!(Meta::parse(&mval));
let ver = match meta.latest(version) {
Some(x) => x,
None => {
key.push(0);
seek_key = encode_key(&key, 0u64);
continue;
}
};
let dkey = encode_key(&key, ver);
match try!(self.get(&dkey)) {
Some(x) => pairs.push((key.clone(), x)),
None => return MvccErrorKind::DataMissing.as_result(),
}
key.push(0);
seek_key = encode_key(&key, 0u64);
}
Ok(pairs)
}
}

impl<T: Engine + ?Sized> MvccEngine for T {}

#[derive(Debug)]
pub enum Error {
Engine(engine::Error),
Expand Down Expand Up @@ -161,29 +165,29 @@ pub type Result<T> = result::Result<T, Error>;
#[cfg(test)]
mod tests {
use storage::engine::{self, Dsn};
use super::{get, put, delete, scan};
use super::MvccEngine;

#[test]
fn test_mvcc() {
let mut eng = engine::new_engine(Dsn::Memory).unwrap();
assert_eq!(get(eng.as_ref(), b"x", 1).unwrap(), None);
put(&mut *eng, b"x", b"x10", 10).unwrap();
assert_eq!(get(eng.as_ref(), b"x", 10).unwrap().unwrap(), b"x10");
assert_eq!(get(eng.as_ref(), b"x", 11).unwrap().unwrap(), b"x10");
delete(&mut *eng, b"x", 20).unwrap();
assert_eq!(get(eng.as_ref(), b"x", 15).unwrap().unwrap(), b"x10");
assert_eq!(get(eng.as_ref(), b"x", 20).unwrap(), None);
assert_eq!(get(eng.as_ref(), b"x", 22).unwrap(), None);
assert_eq!(eng.mvcc_get(b"x", 1).unwrap(), None);
eng.mvcc_put(b"x", b"x10", 10).unwrap();
assert_eq!(eng.mvcc_get(b"x", 10).unwrap().unwrap(), b"x10");
assert_eq!(eng.mvcc_get(b"x", 11).unwrap().unwrap(), b"x10");
eng.mvcc_delete(b"x", 20).unwrap();
assert_eq!(eng.mvcc_get(b"x", 15).unwrap().unwrap(), b"x10");
assert_eq!(eng.mvcc_get(b"x", 20).unwrap(), None);
assert_eq!(eng.mvcc_get(b"x", 22).unwrap(), None);
}

#[test]
fn test_scan() {
let mut eng = engine::new_engine(Dsn::Memory).unwrap();
put(&mut *eng, b"aa", b"11", 10).unwrap();
put(&mut *eng, b"bb", b"22", 20).unwrap();
put(&mut *eng, b"cc", b"33", 15).unwrap();
eng.mvcc_put(b"aa", b"11", 10).unwrap();
eng.mvcc_put(b"bb", b"22", 20).unwrap();
eng.mvcc_put(b"cc", b"33", 15).unwrap();

let vec = scan(eng.as_ref(), b"a", 4, 19).unwrap();
let vec = eng.mvcc_scan(b"a", 4, 19).unwrap();
assert_eq!(vec.len(), 2);
assert_eq!(vec[0].0, b"aa");
assert_eq!(vec[0].1, b"11");
Expand Down