address comments
zhangjinpeng87 committed Sep 7, 2016
1 parent 8910f7e commit b40d24d
Showing 2 changed files with 49 additions and 42 deletions.
81 changes: 44 additions & 37 deletions src/raftstore/store/peer.rs
@@ -446,7 +446,7 @@ impl Peer {
ready.hs.is_some());

if is_applying {
// remove hard state so raft won't change the apply index.
// Remove the hard state so Raft won't change the apply index.
ready.hs.take();
} else if apply_index != 0 {
if let Some(ref mut hs) = ready.hs {
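
A minimal sketch of the guard in this hunk, using hypothetical, simplified stand-ins for the raft-rs `Ready` and `HardState` types (the `apply_index != 0` branch is truncated by the hunk, so its body is not reproduced):

    // Hypothetical, simplified stand-ins for the raft-rs types.
    struct HardState {
        commit: u64,
    }
    struct Ready {
        hs: Option<HardState>,
    }

    // While a snapshot is still being applied, drop the hard state from
    // the ready so that persisting it cannot move the apply index
    // underneath the in-flight snapshot.
    fn guard_hard_state(ready: &mut Ready, is_applying: bool) {
        if is_applying {
            ready.hs.take();
        }
        // (The `apply_index != 0` branch shown above instead adjusts the
        // commit index in the hard state; its body is elided here.)
    }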
@@ -481,14 +481,14 @@ impl Peer {

let local_read = self.is_local_read(&req);
if local_read {
// for read-only, if we don't care stale read, we can
// execute these commands immediately in leader.
// For the read-only commands, if we don't care about stale read, we can
// execute these commands directly in the leader.
let engine = self.engine.clone();
let mut apply_wb = WriteBatch::new();
let mut wb = WriteBatch::new();
let mut ctx = ExecContext {
snap: Snapshot::new(engine),
apply_state: self.get_store().apply_state.clone(),
wb: &mut apply_wb,
wb: &mut wb,
req: &req,
};
let (mut resp, _) = self.exec_raft_cmd(&mut ctx).unwrap_or_else(|e| {
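
For orientation, a hedged sketch of the conditions a check like `is_local_read` has to establish before taking this fast path. The real implementation lives outside this hunk, so every condition below is an assumption based on the surrounding code:

    // Hypothetical sketch; treat each condition as an assumption, not
    // as the actual method body.
    fn is_local_read(&self, req: &RaftCmdRequest) -> bool {
        // 1. The caller did not explicitly request a quorum read.
        if req.get_header().get_read_quorum() {
            return false;
        }
        // 2. Every sub-request is a read (Get or Snap); any write must
        //    go through a Raft proposal.
        if !req.get_requests()
               .iter()
               .all(|r| r.get_cmd_type() == CmdType::Get || r.get_cmd_type() == CmdType::Snap) {
            return false;
        }
        // 3. This peer is the leader and has applied an entry from its
        //    own term, so its state machine is at least as up to date
        //    as anything a previous leader committed.
        self.is_leader() && self.get_store().applied_index_term == self.term()
    }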
@@ -759,33 +759,42 @@ impl Peer {

fn handle_raft_commit_entries(&mut self,
committed_entries: &[eraftpb::Entry],
mut apply_wb: &mut WriteBatch)
wb: &mut WriteBatch)
-> Result<(u64, Vec<ExecResult>)> {
// If we send multiple ConfChange commands, only first one will be proposed correctly,
// others will be saved as a normal entry with no data, so we must re-propose these
// the others will be saved as a normal entry with no data, so we must re-propose these
// commands again.
let t = SlowTimer::new();
let mut results = vec![];
let committed_count = committed_entries.len();
let mut apply_index: u64 = 0;
let mut term: u64 = 0;
let mut apply_index = 0u64;
let mut term = 0u64;
for entry in committed_entries {
// Entries with different term need apply in different on_raft_ready round.
// We call callback before write apply_wb to engine for performance reason,
// think about the situation as follows:
// raft group A consists of node 1, 2, 3 and we assume node 1 is leader at
// beginning, when node 1 applied a put command on key "a", if we response to
// client(call cb), it means that if client raises a get "a" command after the
// response of put command, we must return the value we just put, if there
// is no leader transfer happened, we can guarantee this, because the get
// command is run after store.on_raft_ready finished regardless of if it is
// local read or not, but if there is a leader transfer happened, we will resend
// a get command to the new leader(assume node 2), the get command will use raft
// read if the applied index's term is not equal to the read command's term,
// and the front put command and the get command may be applied in the same
// on_raft_ready round in node 2, and the get command may get nothing because
// we have not written the value to engine, so we need to apply the put command
// and get command in different on_raft_ready round.
// The callback is called before writing apply_wb to the engine to improve the
// performance. Entries with different terms need to be applied in different
// on_raft_ready rounds. For more details, see the following example:
//
// The raft group A consists of three nodes: Node 1, Node 2, and Node 3.
// Node 1 is the leader at the beginning. If there is no leader transfer,
// the process is:
// 1. Node 1 applies the put command on the "a" key.
// 2. Node 1 sends a response to a client (call cb).
// 3. The client raises a get "a" command.
// 4. Node 1 returns the value ("a") from the put command to the client.
// The client can get the value from the put command because the get command
// is run after store.on_raft_ready is finished.
//
// However, if the leader is transferred from Node 1 to Node 2, the
// process is:
// 1. Node 1 applies the put command on the "a" key.
// 2. Node 1 sends a response to a client (call cb).
// 3. The client sends a get command to the new leader (Node 2).
// 4. The get command uses Raft read because of the leader transfer.
// In this case, the put command and the get command from the client may be applied
// in the same on_raft_ready round on Node 2. Because the "a" value from the put
// command is not written to the engine, the get command cannot get the value ("a").
// So the put command and get command must be applied in different on_raft_ready
// rounds.
if term == 0 {
term = entry.get_term();
} else if entry.get_term() != term {
@@ -794,11 +803,9 @@ impl Peer {
apply_index = entry.get_index();

let res = try!(match entry.get_entry_type() {
eraftpb::EntryType::EntryNormal => {
self.handle_raft_entry_normal(entry, &mut apply_wb)
}
eraftpb::EntryType::EntryNormal => self.handle_raft_entry_normal(entry, wb),
eraftpb::EntryType::EntryConfChange => {
self.handle_raft_entry_conf_change(entry, &mut apply_wb)
self.handle_raft_entry_conf_change(entry, wb)
}
});
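
The long comment above boils down to the helper sketched below. The body of the `entry.get_term() != term` branch is truncated by the hunk, so cutting the batch at the term boundary and leaving the rest for the next round is an assumption, but it is the behavior the comment calls for:

    // Hypothetical helper: apply at most one term's worth of entries
    // per on_raft_ready round (assumed handling; the real branch body
    // is truncated in the hunk above).
    fn entries_for_this_round(committed_entries: &[eraftpb::Entry]) -> &[eraftpb::Entry] {
        let mut term = 0u64;
        for (i, entry) in committed_entries.iter().enumerate() {
            if term == 0 {
                term = entry.get_term();
            } else if entry.get_term() != term {
                // A new term starts here. Callbacks for the earlier
                // entries fire before the shared write batch reaches
                // the engine, so applying new-term entries in the same
                // round could let a re-sent read miss those writes.
                // Leave the remainder for the next round.
                return &committed_entries[..i];
            }
        }
        committed_entries
    }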

@@ -816,27 +823,27 @@ impl Peer {

fn handle_raft_entry_normal(&mut self,
entry: &eraftpb::Entry,
mut apply_wb: &mut WriteBatch)
wb: &mut WriteBatch)
-> Result<Option<ExecResult>> {
let index = entry.get_index();
let term = entry.get_term();
let data = entry.get_data();

if data.is_empty() {
// when a peer become leader, it will send an empty entry.
// When a peer becomes the leader, it will send an empty entry.
let mut state = self.get_store().apply_state.clone();
state.set_applied_index(index);
let engine = self.engine.clone();
let raft_cf = try!(rocksdb::get_cf_handle(engine.as_ref(), CF_RAFT));
try!(apply_wb.put_msg_cf(*raft_cf, &keys::apply_state_key(self.region_id), &state));
try!(wb.put_msg_cf(*raft_cf, &keys::apply_state_key(self.region_id), &state));
self.mut_store().apply_state = state;
self.mut_store().applied_index_term = term;
return Ok(None);
}

let cmd = try!(protobuf::parse_from_bytes::<RaftCmdRequest>(data));
// no need to return error here.
self.process_raft_cmd(index, term, cmd, &mut apply_wb).or_else(|e| {
self.process_raft_cmd(index, term, cmd, wb).or_else(|e| {
error!("{} process raft command at index {} err: {:?}",
self.tag,
index,
@@ -900,15 +907,15 @@ impl Peer {
index: u64,
term: u64,
cmd: RaftCmdRequest,
mut apply_wb: &mut WriteBatch)
wb: &mut WriteBatch)
-> Result<Option<ExecResult>> {
if index == 0 {
return Err(box_err!("processing raft command needs a non-zero index"));
}

let uuid = util::get_uuid_from_req(&cmd).unwrap();
let cb = self.find_cb(uuid, term, &cmd);
let (mut resp, exec_result) = self.apply_raft_cmd(index, term, &cmd, &mut apply_wb)
let (mut resp, exec_result) = self.apply_raft_cmd(index, term, &cmd, wb)
.unwrap_or_else(|e| {
error!("{} apply raft command err {:?}", self.tag, e);
(cmd_resp::new_error(e), None)
@@ -945,7 +952,7 @@ impl Peer {
index: u64,
term: u64,
req: &RaftCmdRequest,
mut apply_wb: &mut WriteBatch)
wb: &mut WriteBatch)
-> Result<(RaftCmdResponse, Option<ExecResult>)> {
if self.pending_remove {
let region_not_found = Error::RegionNotFound(self.region_id);
Expand All @@ -967,7 +974,7 @@ impl Peer {
let mut ctx = ExecContext {
snap: Snapshot::new(engine),
apply_state: self.get_store().apply_state.clone(),
wb: &mut apply_wb,
wb: wb,
req: req,
};
let (resp, exec_result) = self.exec_raft_cmd(&mut ctx).unwrap_or_else(|e| {
10 changes: 5 additions & 5 deletions src/raftstore/store/store.rs
@@ -564,11 +564,11 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let ids: Vec<u64> = self.pending_raft_groups.drain().collect();
let pending_count = ids.len();

let mut apply_wb = WriteBatch::new();
let mut wb = WriteBatch::new();
let mut results: Vec<(u64, Option<ReadyResult>)> = Vec::with_capacity(ids.len());
for region_id in ids {
if let Some(peer) = self.region_peers.get_mut(&region_id) {
match peer.handle_raft_ready(&self.trans, &mut apply_wb) {
match peer.handle_raft_ready(&self.trans, &mut wb) {
Err(e) => {
panic!("{} handle raft ready err: {:?}", peer.tag, e);
}
@@ -579,9 +579,9 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}
}

// write apply write batch, write must success
if !apply_wb.is_empty() {
if let Err(e) = self.engine.write(apply_wb) {
// Batch write to engine; the write must succeed, or we panic.
if !wb.is_empty() {
if let Err(e) = self.engine.write(wb) {
panic!("write apply write batch failed, err {:?}", e);
}
}
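
Put together, the change gives the one-batch-per-round pattern sketched below (types simplified, and the per-peer `ReadyResult` bookkeeping elided; panicking on failure mirrors the code above, since losing an apply write would silently diverge the state machine):

    // Minimal sketch of the shared write batch across all ready peers.
    fn on_raft_ready(&mut self) {
        let ids: Vec<u64> = self.pending_raft_groups.drain().collect();
        let mut wb = WriteBatch::new();
        for region_id in ids {
            if let Some(peer) = self.region_peers.get_mut(&region_id) {
                // Each peer appends its apply-side writes to the shared
                // batch instead of writing to the engine itself.
                if let Err(e) = peer.handle_raft_ready(&self.trans, &mut wb) {
                    panic!("{} handle raft ready err: {:?}", peer.tag, e);
                }
            }
        }
        // One atomic engine write per round: cheaper than one write per
        // peer, and all-or-nothing for this round's apply progress.
        if !wb.is_empty() {
            if let Err(e) = self.engine.write(wb) {
                panic!("write apply write batch failed, err {:?}", e);
            }
        }
    }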
