Skip to content

Commit

Permalink
*: remove lifetime of snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
BusyJay committed Jun 17, 2016
1 parent 88c9bff commit e547877
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 109 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ clippy = {version = "*", optional = true}
git = "https://github.com/rust-lang-nursery/getopts"

[dependencies.rocksdb]
git = "https://github.com/ngaut/rust-rocksdb.git"
git = "https://github.com/busyjay/rust-rocksdb.git"
branch = "busyjay/introduce-unsafe-snap"

[dependencies.protobuf]
git = "https://github.com/stepancheg/rust-protobuf.git"
Expand Down
8 changes: 4 additions & 4 deletions src/raftstore/coprocessor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ pub trait Coprocessor {
}

/// Context of request.
pub struct ObserverContext<'a> {
pub struct ObserverContext {
/// A snapshot of requested region.
pub snap: RegionSnapshot<'a>,
pub snap: RegionSnapshot,
/// Whether to bypass following observer hook.
pub bypass: bool,
}

impl<'a> ObserverContext<'a> {
pub fn new(peer: &'a PeerStorage) -> ObserverContext<'a> {
impl ObserverContext {
pub fn new(peer: &PeerStorage) -> ObserverContext {
ObserverContext {
snap: RegionSnapshot::new(peer),
bypass: false,
Expand Down
20 changes: 10 additions & 10 deletions src/raftstore/coprocessor/region_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use rocksdb::{DB, SeekKey, DBVector, DBIterator};
use rocksdb::rocksdb::Snapshot;
use kvproto::metapb::Region;

use raftstore::store::engine::Peekable;
use raftstore::store::engine::{Snapshot, Peekable, Iterable};
use raftstore::store::{keys, util, PeerStorage};
use raftstore::{Error, Result};

Expand All @@ -25,22 +25,22 @@ type Kv<'a> = (&'a [u8], &'a [u8]);
/// Snapshot of a region.
///
/// Only data within a region can be accessed.
pub struct RegionSnapshot<'a> {
snap: Snapshot<'a>,
pub struct RegionSnapshot {
snap: Snapshot,
region: Region,
}

impl<'a> RegionSnapshot<'a> {
pub fn new(ps: &'a PeerStorage) -> RegionSnapshot<'a> {
impl RegionSnapshot {
pub fn new(ps: &PeerStorage) -> RegionSnapshot {
RegionSnapshot {
snap: ps.raw_snapshot(),
region: ps.get_region().clone(),
}
}

pub fn from_raw(db: &'a DB, region: Region) -> RegionSnapshot<'a> {
pub fn from_raw(db: Arc<DB>, region: Region) -> RegionSnapshot {
RegionSnapshot {
snap: db.snapshot(),
snap: Snapshot::new(db),
region: region,
}
}
Expand All @@ -50,7 +50,7 @@ impl<'a> RegionSnapshot<'a> {
}

pub fn iter(&self) -> RegionIterator {
RegionIterator::new(self.snap.iter(), self.region.clone())
RegionIterator::new(self.snap.new_iterator(), self.region.clone())
}

// scan scans database using an iterator in range [start_key, end_key), calls function f for
Expand Down Expand Up @@ -86,7 +86,7 @@ impl<'a> RegionSnapshot<'a> {
}
}

impl<'a> Peekable for RegionSnapshot<'a> {
impl Peekable for RegionSnapshot {
fn get_value(&self, key: &[u8]) -> Result<Option<DBVector>> {
try!(util::check_key_in_region(key, &self.region));
let data_key = keys::data_key(key);
Expand Down
61 changes: 50 additions & 11 deletions src/raftstore/store/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,44 @@
// limitations under the License.

use std::option::Option;
use std::sync::Arc;

use rocksdb::{DB, Writable, DBIterator, DBVector, WriteBatch};
use rocksdb::rocksdb::Snapshot;
use rocksdb::{DB, Writable, DBIterator, DBVector, WriteBatch, ReadOptions};
use rocksdb::rocksdb::UnsafeSnap;
use protobuf;
use byteorder::{ByteOrder, BigEndian};

use raftstore::Result;


pub struct Snapshot {
db: Arc<DB>,
snap: UnsafeSnap,
}

/// Because snap will be valid whenever db is valid, so it's safe to send
/// it around.
unsafe impl Send for Snapshot {}

impl Snapshot {
pub fn new(db: Arc<DB>) -> Snapshot {
unsafe {
Snapshot {
snap: db.unsafe_snap(),
db: db,
}
}
}
}

impl Drop for Snapshot {
fn drop(&mut self) {
unsafe {
self.db.release_snap(&self.snap);
}
}
}

pub fn new_engine(path: &str) -> Result<DB> {
// TODO: set proper options here,
let db = try!(DB::open_default(path));
Expand Down Expand Up @@ -119,16 +149,24 @@ impl Iterable for DB {
}
}

impl<'a> Peekable for Snapshot<'a> {
impl Peekable for Snapshot {
fn get_value(&self, key: &[u8]) -> Result<Option<DBVector>> {
let v = try!(self.get(key));
let mut opt = ReadOptions::new();
unsafe {
opt.set_snapshot(&self.snap);
}
let v = try!(self.db.get_opt(key, &opt));
Ok(v)
}
}

impl<'a> Iterable for Snapshot<'a> {
impl Iterable for Snapshot {
fn new_iterator(&self) -> DBIterator {
self.iter()
let mut opt = ReadOptions::new();
unsafe {
opt.set_snapshot(&self.snap);
}
DBIterator::new(&self.db, &opt)
}
}

Expand Down Expand Up @@ -161,6 +199,7 @@ impl Mutable for WriteBatch {}

#[cfg(test)]
mod tests {
use std::sync::Arc;
use tempdir::TempDir;
use rocksdb::Writable;

Expand All @@ -170,15 +209,15 @@ mod tests {
#[test]
fn test_base() {
let path = TempDir::new("var").unwrap();
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
let engine = Arc::new(new_engine(path.path().to_str().unwrap()).unwrap());

let mut r = Region::new();
r.set_id(10);

let key = b"key";
engine.put_msg(key, &r).unwrap();

let snap = engine.snapshot();
let snap = Snapshot::new(engine.clone());

let mut r1: Region = engine.get_msg(key).unwrap().unwrap();
assert_eq!(r, r1);
Expand All @@ -199,7 +238,7 @@ mod tests {
assert_eq!(engine.get_i64(key).unwrap(), Some(-1));
assert!(engine.get_i64(b"missing_key").unwrap().is_none());

let snap = engine.snapshot();
let snap = Snapshot::new(engine.clone());
assert_eq!(snap.get_i64(key).unwrap(), Some(-1));
assert!(snap.get_i64(b"missing_key").unwrap().is_none());

Expand All @@ -211,7 +250,7 @@ mod tests {
#[test]
fn test_scan() {
let path = TempDir::new("var").unwrap();
let engine = new_engine(path.path().to_str().unwrap()).unwrap();
let engine = Arc::new(new_engine(path.path().to_str().unwrap()).unwrap());

engine.put(b"a1", b"v1").unwrap();
engine.put(b"a2", b"v2").unwrap();
Expand Down Expand Up @@ -243,7 +282,7 @@ mod tests {

assert_eq!(data.len(), 1);

let snap = engine.snapshot();
let snap = Snapshot::new(engine.clone());

engine.put(b"a3", b"v3").unwrap();
assert!(engine.seek(b"a3").unwrap().is_some());
Expand Down
7 changes: 3 additions & 4 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::vec::Vec;
use std::default::Default;

use rocksdb::{DB, WriteBatch, Writable};
use rocksdb::rocksdb::Snapshot;
use protobuf::{self, Message};
use uuid::Uuid;

Expand All @@ -40,7 +39,7 @@ use super::msg::Callback;
use super::cmd_resp;
use super::transport::Transport;
use super::keys;
use super::engine::{Peekable, Iterable, Mutable};
use super::engine::{Snapshot, Peekable, Iterable, Mutable};

pub struct PendingCmd {
pub uuid: Uuid,
Expand Down Expand Up @@ -731,7 +730,7 @@ impl Peer {
let (mut resp, exec_result) = {
let engine = self.engine.clone();
let ctx = ExecContext {
snap: engine.snapshot(),
snap: Snapshot::new(engine),
wb: &wb,
req: req,
};
Expand Down Expand Up @@ -804,7 +803,7 @@ fn get_change_peer_cmd(msg: &RaftCmdRequest) -> Option<&ChangePeerRequest> {
}

struct ExecContext<'a> {
pub snap: Snapshot<'a>,
pub snap: Snapshot,
pub wb: &'a WriteBatch,
pub req: &'a RaftCmdRequest,
}
Expand Down
14 changes: 6 additions & 8 deletions src/raftstore/store/peer_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::{error, mem};
use std::time::Instant;

use rocksdb::{DB, WriteBatch, Writable};
use rocksdb::rocksdb::Snapshot as RocksDbSnapshot;
use protobuf::Message;

use kvproto::metapb;
Expand All @@ -31,7 +30,7 @@ use util::codec::number::NumberDecoder;
use raft::{self, Storage, RaftState, StorageError, Error as RaftError, Ready};
use raftstore::{Result, Error};
use super::keys::{self, enc_start_key, enc_end_key};
use super::engine::{Peekable, Iterable, Mutable};
use super::engine::{Snapshot as DbSnapshot, Peekable, Iterable, Mutable};
use super::{SnapFile, SnapKey, SnapManager};

// When we create a region peer, we should initialize its log term/index > 0,
Expand Down Expand Up @@ -241,8 +240,8 @@ impl PeerStorage {
&self.region
}

pub fn raw_snapshot(&self) -> RocksDbSnapshot {
self.engine.snapshot()
pub fn raw_snapshot(&self) -> DbSnapshot {
DbSnapshot::new(self.engine.clone())
}

pub fn snapshot(&mut self) -> raft::Result<Snapshot> {
Expand Down Expand Up @@ -542,7 +541,7 @@ impl PeerStorage {
}

fn build_snap_file(f: &mut SnapFile,
snap: &RocksDbSnapshot,
snap: &DbSnapshot,
region_id: u64,
ranges: Ranges)
-> raft::Result<()> {
Expand Down Expand Up @@ -580,7 +579,7 @@ fn build_snap_file(f: &mut SnapFile,
}

pub fn do_snapshot(mgr: SnapManager,
snap: &RocksDbSnapshot,
snap: &DbSnapshot,
key: SnapKey,
ranges: Ranges)
-> raft::Result<Snapshot> {
Expand Down Expand Up @@ -836,8 +835,7 @@ mod test {
}

fn get_snap(s: &RaftStorage, mgr: SnapManager) -> Snapshot {
let engine = s.rl().get_engine();
let raw_snap = engine.snapshot();
let raw_snap = s.rl().raw_snapshot();
let key_ranges = s.rl().region_key_ranges();
let applied_id = s.rl().load_applied_index(&raw_snap).unwrap();
let term = s.rl().term(applied_id).unwrap();
Expand Down
3 changes: 1 addition & 2 deletions src/raftstore/store/worker/snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,13 @@ impl Runner {

fn generate_snap(&self, task: &Task) -> Result<(), Error> {
// do we need to check leader here?
let db = task.storage.rl().get_engine();
let raw_snap;
let ranges;
let key;

{
let storage = task.storage.rl();
raw_snap = db.snapshot();
raw_snap = storage.raw_snapshot();
ranges = storage.region_key_ranges();
let applied_idx = box_try!(storage.load_applied_index(&raw_snap));
let term = box_try!(storage.term(applied_idx));
Expand Down
Loading

0 comments on commit e547877

Please sign in to comment.