Skip to content

Commit

Permalink
support follower read in peer
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjinpeng87 committed Nov 9, 2016
1 parent 3469aca commit 87c0a3d
Showing 1 changed file with 95 additions and 10 deletions.
105 changes: 95 additions & 10 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::vec::Vec;
use std::default::Default;
use std::time::{Instant, Duration};
use time::{self, Timespec};

use rocksdb::{DB, WriteBatch, Writable};
use protobuf::{self, Message};
use protobuf::{self, Message, RepeatedField};
use uuid::Uuid;

use kvproto::metapb;
use kvproto::eraftpb::{self, ConfChangeType, MessageType};
use kvproto::eraftpb::{self, ConfChangeType, MessageType, EntryType};
use kvproto::raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, ChangePeerRequest, CmdType,
AdminCmdType, Request, Response, AdminRequest, AdminResponse,
TransferLeaderRequest, TransferLeaderResponse};
use kvproto::raft_serverpb::{RaftMessage, RaftApplyState, RaftTruncatedState, PeerState};
use kvproto::pdpb::PeerStats;
use raft::{self, RawNode, StateRole, SnapshotStatus, Ready, ProgressState, INVALID_INDEX};
use raft::{self, RawNode, StateRole, SnapshotStatus, Ready, ProgressState, INVALID_INDEX,
ReadState};
use raftstore::{Result, Error};
use raftstore::coprocessor::CoprocessorHost;
use raftstore::coprocessor::split_observer::SplitObserver;
Expand Down Expand Up @@ -187,6 +189,10 @@ pub struct Peer {
pub tag: String,

pub last_compacted_idx: u64,

// follower readonly query
pending_readonly_queries: HashMap<Uuid, (PendingCmd, RaftCmdRequest, Timespec)>,
ready_read_states: VecDeque<ReadState>,
}

impl Peer {
Expand Down Expand Up @@ -273,6 +279,8 @@ impl Peer {
leader_missing_time: Some(Instant::now()),
tag: tag,
last_compacted_idx: 0,
pending_readonly_queries: Default::default(),
ready_read_states: Default::default(),
};

peer.load_all_coprocessors();
Expand Down Expand Up @@ -415,6 +423,17 @@ impl Peer {
Ok(())
}

fn send_read_index(&mut self, ctx: &[u8]) -> Result<()> {
let mut m = eraftpb::Message::new();
m.set_msg_type(MessageType::MsgReadIndex);
let mut e = eraftpb::Entry::new();
e.set_entry_type(EntryType::EntryNormal);
e.set_data(ctx.to_vec());
m.set_entries(RepeatedField::from_vec(vec![e]));
try!(self.step(m));
Ok(())
}

pub fn check_peers(&mut self) {
if !self.is_leader() {
self.peer_heartbeats.clear();
Expand Down Expand Up @@ -546,6 +565,9 @@ impl Peer {
try!(self.handle_raft_commit_entries(&ready.committed_entries))
};

// handle follower readonly query
self.handle_readonly_query(&ready.read_states);

slow_log!(t,
"{} handle ready, entries {}, committed entries {}, messages \
{}, snapshot {}, hard state changed {}",
Expand All @@ -563,6 +585,45 @@ impl Peer {
}))
}

fn handle_readonly_query(&mut self, read_states: &[ReadState]) {
for rs in read_states {
self.ready_read_states.push_back(rs.clone());
}
if self.ready_read_states.is_empty() {
return;
}

let mut pos = 0;
let applied_idx = self.get_store().apply_state.get_applied_index();
for rs in &self.ready_read_states {
if rs.index <= applied_idx {
pos += 1;
} else {
break;
}
}
for _ in 0..pos {
let rs = self.ready_read_states.pop_front().unwrap();
let uuid = Uuid::from_bytes(&rs.request_ctx).unwrap();
if let Some((mut cmd, req, _)) = self.pending_readonly_queries
.remove(&uuid) {
let mut ctx = ExecContext::new(self, 0, 0, &req);
let (mut resp, _) = self.exec_raft_cmd(&mut ctx).unwrap_or_else(|e| {
error!("{} execute raft command err: {:?}", self.tag, e);
(cmd_resp::new_error(e), None)
});

cmd_resp::bind_uuid(&mut resp, cmd.uuid);
cmd_resp::bind_term(&mut resp, self.term());
cmd.call(resp);
} else {
// command maybe timeout and has removed
info!("command [{}] receive read state but command not exist",
&uuid)
}
}
}

/// Propose a request.
///
/// Return true means the request has been proposed successfully.
Expand All @@ -581,6 +642,7 @@ impl Peer {
PEER_PROPOSAL_COUNTER_VEC.with_label_values(&["all"]).inc();

let local_read = self.is_local_read(&req);
let readonly_query = readonly_query(&req);
if local_read {
PEER_PROPOSAL_COUNTER_VEC.with_label_values(&["local_read"]).inc();

Expand All @@ -596,6 +658,21 @@ impl Peer {
cmd_resp::bind_term(&mut resp, self.term());
cmd.call(resp);
return false;
} else if readonly_query && self.raft_group.raft.state == StateRole::Follower {
// follower readonly query
let now = time::get_time();
let uuid = cmd.uuid;
self.pending_readonly_queries.insert(uuid, (cmd, req, now));
if let Err(e) = self.send_read_index(uuid.as_bytes()) {
error!("{} send read index failed, err: {}", self.tag, e);
let (mut cmd, _, _) = self.pending_readonly_queries.remove(&uuid).unwrap();
let mut resp = cmd_resp::new_error(e);
cmd_resp::bind_uuid(&mut resp, cmd.uuid);
cmd_resp::bind_term(&mut resp, self.term());
cmd.call(resp);
return false;
}
return true;
} else if get_transfer_leader_cmd(&req).is_some() {
let transfer_leader = get_transfer_leader_cmd(&req).unwrap();
let peer = transfer_leader.get_peer();
Expand Down Expand Up @@ -659,13 +736,7 @@ impl Peer {
return false;
}

for cmd_req in req.get_requests() {
if cmd_req.get_cmd_type() != CmdType::Snap && cmd_req.get_cmd_type() != CmdType::Get {
return false;
}
}

true
readonly_query(req)
}

/// Call the callback of `cmd` that leadership may have been changed.
Expand Down Expand Up @@ -1099,6 +1170,20 @@ impl Peer {
}
}

pub fn readonly_query(req: &RaftCmdRequest) -> bool {
if req.get_requests().len() == 0 {
return false;
}

for cmd_req in req.get_requests() {
if cmd_req.get_cmd_type() != CmdType::Snap && cmd_req.get_cmd_type() != CmdType::Get {
return false;
}
}

true
}

fn get_transfer_leader_cmd(msg: &RaftCmdRequest) -> Option<&TransferLeaderRequest> {
if !msg.has_admin_request() {
return None;
Expand Down

0 comments on commit 87c0a3d

Please sign in to comment.