Skip to content

Commit

Permalink
check stall readonly query
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangjinpeng87 committed Nov 9, 2016
1 parent 87c0a3d commit 2046ab4
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 7 deletions.
6 changes: 6 additions & 0 deletions src/raftstore/store/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const STORE_CAPACITY: u64 = u64::MAX;
const DEFAULT_NOTIFY_CAPACITY: usize = 4096;
const DEFAULT_MGR_GC_TICK_INTERVAL_MS: u64 = 60000;
const DEFAULT_SNAP_GC_TIMEOUT_SECS: u64 = 60 * 10;
// Default for `Config::check_stall_readonly_query_interval`: the delay used when
// registering the `Tick::CheckStallRoQuery` timer (milliseconds, per the `_MS` suffix).
const DEFAULT_CHECK_STALL_RO_QUERY_INTERVAL_MS: u64 = 5 * 1000; // 5 secs
// Default for `Config::readonly_query_timeout_ms`: the store converts it with
// `Duration::from_millis` and hands it to `Peer::check_stall_readonly_query` as the
// maximum time a pending follower readonly query may wait before it is failed.
const DEFAULT_READONLY_QUERY_TIMEOUT_MS: u64 = 10 * 1000; // 10 secs
const DEFAULT_MESSAGES_PER_TICK: usize = 256;
const DEFAULT_MAX_PEER_DOWN_SECS: u64 = 300;
const DEFAULT_LOCK_CF_COMPACT_INTERVAL_SECS: u64 = 60 * 10; // 10 min
Expand Down Expand Up @@ -85,6 +87,8 @@ pub struct Config {
pub snap_mgr_gc_tick_interval: u64,
pub snap_gc_timeout: u64,
pub lock_cf_compact_interval_secs: u64,
pub check_stall_readonly_query_interval: u64,
pub readonly_query_timeout_ms: u64,

pub notify_capacity: usize,
pub messages_per_tick: usize,
Expand Down Expand Up @@ -129,6 +133,8 @@ impl Default for Config {
max_leader_missing_duration: Duration::from_secs(DEFAULT_MAX_LEADER_MISSING_SECS),
snap_apply_batch_size: DEFAULT_SNAPSHOT_APPLY_BATCH_SIZE,
lock_cf_compact_interval_secs: DEFAULT_LOCK_CF_COMPACT_INTERVAL_SECS,
check_stall_readonly_query_interval: DEFAULT_CHECK_STALL_RO_QUERY_INTERVAL_MS,
readonly_query_timeout_ms: DEFAULT_READONLY_QUERY_TIMEOUT_MS,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/raftstore/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub enum Tick {
PdStoreHeartbeat,
SnapGc,
CompactLockCf,
CheckStallRoQuery,
}

pub enum Msg {
Expand Down
29 changes: 25 additions & 4 deletions src/raftstore/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet, VecDeque};
use std::vec::Vec;
use std::default::Default;
use std::time::{Instant, Duration};
use time::{self, Timespec};

use rocksdb::{DB, WriteBatch, Writable};
use protobuf::{self, Message, RepeatedField};
Expand Down Expand Up @@ -191,7 +190,7 @@ pub struct Peer {
pub last_compacted_idx: u64,

// follower readonly query
pending_readonly_queries: HashMap<Uuid, (PendingCmd, RaftCmdRequest, Timespec)>,
pending_readonly_queries: HashMap<Uuid, (PendingCmd, RaftCmdRequest, Instant)>,
ready_read_states: VecDeque<ReadState>,
}

Expand Down Expand Up @@ -498,6 +497,28 @@ impl Peer {
StaleState::Valid
}

/// Fails every pending follower readonly query that has waited longer than `timeout`.
///
/// `now` is the reference instant against which the start time stored with each
/// entry of `pending_readonly_queries` is measured. An entry expires when the
/// elapsed time is strictly greater than `timeout`. Every expired entry is removed
/// from the map and its callback is invoked with a "follower read timeout" error
/// response bound to the command's uuid and this peer's current term.
pub fn check_stall_readonly_query(&mut self, now: Instant, timeout: Duration) {
    // Collect the expired ids up front: the map cannot be mutated while it is
    // still borrowed for iteration. Each value is (cmd, req, start_time).
    let stalled: Vec<_> = self.pending_readonly_queries
        .iter()
        .filter(|&(_, entry)| now.duration_since(entry.2) > timeout)
        .map(|(id, _)| *id)
        .collect();

    for id in stalled {
        warn!("follower read timeout for command {}", id);
        // The id was taken from the map just above, so the entry must still exist.
        let (mut pending, _, _) = self.pending_readonly_queries.remove(&id).unwrap();
        let mut timeout_resp = cmd_resp::new_error(box_err!("follower read timeout"));
        cmd_resp::bind_uuid(&mut timeout_resp, pending.uuid);
        cmd_resp::bind_term(&mut timeout_resp, self.term());
        pending.call(timeout_resp);
    }
}

pub fn handle_raft_ready<T: Transport>(&mut self,
trans: &T,
metrics: &mut RaftMetrics)
Expand Down Expand Up @@ -660,9 +681,8 @@ impl Peer {
return false;
} else if readonly_query && self.raft_group.raft.state == StateRole::Follower {
// follower readonly query
let now = time::get_time();
let uuid = cmd.uuid;
self.pending_readonly_queries.insert(uuid, (cmd, req, now));
self.pending_readonly_queries.insert(uuid, (cmd, req, Instant::now()));
if let Err(e) = self.send_read_index(uuid.as_bytes()) {
error!("{} send read index failed, err: {}", self.tag, e);
let (mut cmd, _, _) = self.pending_readonly_queries.remove(&uuid).unwrap();
Expand All @@ -672,6 +692,7 @@ impl Peer {
cmd.call(resp);
return false;
}
PEER_PROPOSAL_COUNTER_VEC.with_label_values(&["follower_read"]).inc();
return true;
} else if get_transfer_leader_cmd(&req).is_some() {
let transfer_leader = get_transfer_leader_cmd(&req).unwrap();
Expand Down
25 changes: 22 additions & 3 deletions src/raftstore/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use kvproto::raft_serverpb::{RaftMessage, RaftSnapshotData, RaftTruncatedState,
use kvproto::eraftpb::{ConfChangeType, Snapshot, MessageType};
use kvproto::pdpb::StoreStats;
use util::{HandyRwLock, SlowTimer, duration_to_nanos};
use pd::PdClient;
use pd::{PdClient, INVALID_ID};
use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest, StatusCmdType, StatusResponse,
RaftCmdRequest, RaftCmdResponse};
use protobuf::Message;
Expand All @@ -50,7 +50,7 @@ use super::{util, Msg, Tick, SnapManager};
use super::keys::{self, enc_start_key, enc_end_key, data_end_key, data_key};
use super::engine::{Iterable, Peekable, delete_all_in_range};
use super::config::Config;
use super::peer::{Peer, PendingCmd, ReadyResult, ExecResult, StaleState};
use super::peer::{Peer, PendingCmd, ReadyResult, ExecResult, StaleState, readonly_query};
use super::peer_storage::{ApplySnapResult, SnapState};
use super::msg::Callback;
use super::cmd_resp::{bind_uuid, bind_term, bind_error};
Expand Down Expand Up @@ -369,6 +369,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
self.register_pd_store_heartbeat_tick(event_loop);
self.register_snap_mgr_gc_tick(event_loop);
self.register_compact_lock_cf_tick(event_loop);
self.register_check_stall_readonly_query_tick(event_loop);

let split_check_runner = SplitCheckRunner::new(self.sendch.clone(),
self.cfg.region_max_size,
Expand Down Expand Up @@ -1093,7 +1094,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
Some(peer) => peer,
None => return Err(Error::RegionNotFound(region_id)),
};
if !peer.is_leader() {
if !peer.is_leader() && (!readonly_query(msg) || peer.leader_id() == INVALID_ID) {
return Err(Error::NotLeader(region_id, peer.get_peer_from_cache(peer.leader_id())));
}
if peer.peer_id() != peer_id {
Expand Down Expand Up @@ -1247,6 +1248,23 @@ impl<T: Transport, C: PdClient> Store<T, C> {
self.register_compact_check_tick(event_loop);
}

/// Timer handler for `Tick::CheckStallRoQuery`.
///
/// Takes a single `Instant::now()` snapshot, asks every peer in `region_peers`
/// to fail the readonly queries that have been pending longer than
/// `cfg.readonly_query_timeout_ms`, and then re-arms the timer on `event_loop`.
fn on_check_stall_readonly_query(&mut self, event_loop: &mut EventLoop<Self>) {
    // One shared timestamp so all peers are judged against the same instant.
    let checked_at = Instant::now();
    let max_wait = Duration::from_millis(self.cfg.readonly_query_timeout_ms);

    for (_, region_peer) in self.region_peers.iter_mut() {
        region_peer.check_stall_readonly_query(checked_at, max_wait);
    }

    // Schedule the next round of checks.
    self.register_check_stall_readonly_query_tick(event_loop);
}

fn register_check_stall_readonly_query_tick(&self, event_loop: &mut EventLoop<Self>) {
if let Err(e) = register_timer(event_loop,
Tick::CheckStallRoQuery,
self.cfg.check_stall_readonly_query_interval) {
error!("{} register pd heartbeat tick err: {:?}", self.tag, e);
};
}

fn on_split_check_result(&mut self,
region_id: u64,
epoch: metapb::RegionEpoch,
Expand Down Expand Up @@ -1639,6 +1657,7 @@ impl<T: Transport, C: PdClient> mio::Handler for Store<T, C> {
Tick::PdStoreHeartbeat => self.on_pd_store_heartbeat_tick(event_loop),
Tick::SnapGc => self.on_snap_mgr_gc(event_loop),
Tick::CompactLockCf => self.on_compact_lock_cf(event_loop),
Tick::CheckStallRoQuery => self.on_check_stall_readonly_query(event_loop),
}
slow_log!(t, "{} handle timeout {:?}", self.tag, timeout);
}
Expand Down

0 comments on commit 2046ab4

Please sign in to comment.