diff --git a/src/raftstore/store/config.rs b/src/raftstore/store/config.rs index b0a4d2282d0..484b0869fd1 100644 --- a/src/raftstore/store/config.rs +++ b/src/raftstore/store/config.rs @@ -36,6 +36,8 @@ const STORE_CAPACITY: u64 = u64::MAX; const DEFAULT_NOTIFY_CAPACITY: usize = 4096; const DEFAULT_MGR_GC_TICK_INTERVAL_MS: u64 = 60000; const DEFAULT_SNAP_GC_TIMEOUT_SECS: u64 = 60 * 10; +const DEFAULT_CHECK_STALL_RO_QUERY_INTERVAL_MS: u64 = 5 * 1000; // 5 secs +const DEFAULT_READONLY_QUERY_TIMEOUT_MS: u64 = 10 * 1000; // 10 secs const DEFAULT_MESSAGES_PER_TICK: usize = 256; const DEFAULT_MAX_PEER_DOWN_SECS: u64 = 300; const DEFAULT_LOCK_CF_COMPACT_INTERVAL_SECS: u64 = 60 * 10; // 10 min @@ -85,6 +87,8 @@ pub struct Config { pub snap_mgr_gc_tick_interval: u64, pub snap_gc_timeout: u64, pub lock_cf_compact_interval_secs: u64, + pub check_stall_readonly_query_interval: u64, + pub readonly_query_timeout_ms: u64, pub notify_capacity: usize, pub messages_per_tick: usize, @@ -129,6 +133,8 @@ impl Default for Config { max_leader_missing_duration: Duration::from_secs(DEFAULT_MAX_LEADER_MISSING_SECS), snap_apply_batch_size: DEFAULT_SNAPSHOT_APPLY_BATCH_SIZE, lock_cf_compact_interval_secs: DEFAULT_LOCK_CF_COMPACT_INTERVAL_SECS, + check_stall_readonly_query_interval: DEFAULT_CHECK_STALL_RO_QUERY_INTERVAL_MS, + readonly_query_timeout_ms: DEFAULT_READONLY_QUERY_TIMEOUT_MS, } } } diff --git a/src/raftstore/store/msg.rs b/src/raftstore/store/msg.rs index 1028d72950d..e03c6eacf2f 100644 --- a/src/raftstore/store/msg.rs +++ b/src/raftstore/store/msg.rs @@ -32,6 +32,7 @@ pub enum Tick { PdStoreHeartbeat, SnapGc, CompactLockCf, + CheckStallRoQuery, } pub enum Msg { diff --git a/src/raftstore/store/peer.rs b/src/raftstore/store/peer.rs index 10b97a36fed..79f3d46038b 100644 --- a/src/raftstore/store/peer.rs +++ b/src/raftstore/store/peer.rs @@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::vec::Vec; use std::default::Default; use std::time::{Instant, 
Duration}; -use time::{self, Timespec}; use rocksdb::{DB, WriteBatch, Writable}; use protobuf::{self, Message, RepeatedField}; @@ -191,7 +190,7 @@ pub struct Peer { pub last_compacted_idx: u64, // follower readonly query - pending_readonly_queries: HashMap, + pending_readonly_queries: HashMap, ready_read_states: VecDeque, } @@ -498,6 +497,28 @@ impl Peer { StaleState::Valid } + pub fn check_stall_readonly_query(&mut self, now: Instant, timeout: Duration) { + let mut expired = vec![]; + { + for (k, v) in &self.pending_readonly_queries { + // v = (cmd, req, start_time) + let duration = now.duration_since(v.2); + if duration > timeout { + expired.push(*k); + } + } + } + + for uuid in expired { + warn!("follower read timeout for command {}", uuid); + let (mut cmd, _, _) = self.pending_readonly_queries.remove(&uuid).unwrap(); + let mut resp = cmd_resp::new_error(box_err!("follower read timeout")); + cmd_resp::bind_uuid(&mut resp, cmd.uuid); + cmd_resp::bind_term(&mut resp, self.term()); + cmd.call(resp); + } + } + pub fn handle_raft_ready(&mut self, trans: &T, metrics: &mut RaftMetrics) @@ -660,9 +681,8 @@ impl Peer { return false; } else if readonly_query && self.raft_group.raft.state == StateRole::Follower { // follower readonly query - let now = time::get_time(); let uuid = cmd.uuid; - self.pending_readonly_queries.insert(uuid, (cmd, req, now)); + self.pending_readonly_queries.insert(uuid, (cmd, req, Instant::now())); if let Err(e) = self.send_read_index(uuid.as_bytes()) { error!("{} send read index failed, err: {}", self.tag, e); let (mut cmd, _, _) = self.pending_readonly_queries.remove(&uuid).unwrap(); @@ -672,6 +692,7 @@ impl Peer { cmd.call(resp); return false; } + PEER_PROPOSAL_COUNTER_VEC.with_label_values(&["follower_read"]).inc(); return true; } else if get_transfer_leader_cmd(&req).is_some() { let transfer_leader = get_transfer_leader_cmd(&req).unwrap(); diff --git a/src/raftstore/store/store.rs b/src/raftstore/store/store.rs index 57a3f57786b..c37bba9e508 
100644 --- a/src/raftstore/store/store.rs +++ b/src/raftstore/store/store.rs @@ -32,7 +32,7 @@ use kvproto::raft_serverpb::{RaftMessage, RaftSnapshotData, RaftTruncatedState, use kvproto::eraftpb::{ConfChangeType, Snapshot, MessageType}; use kvproto::pdpb::StoreStats; use util::{HandyRwLock, SlowTimer, duration_to_nanos}; -use pd::PdClient; +use pd::{PdClient, INVALID_ID}; use kvproto::raft_cmdpb::{AdminCmdType, AdminRequest, StatusCmdType, StatusResponse, RaftCmdRequest, RaftCmdResponse}; use protobuf::Message; @@ -50,7 +50,7 @@ use super::{util, Msg, Tick, SnapManager}; use super::keys::{self, enc_start_key, enc_end_key, data_end_key, data_key}; use super::engine::{Iterable, Peekable, delete_all_in_range}; use super::config::Config; -use super::peer::{Peer, PendingCmd, ReadyResult, ExecResult, StaleState}; +use super::peer::{Peer, PendingCmd, ReadyResult, ExecResult, StaleState, readonly_query}; use super::peer_storage::{ApplySnapResult, SnapState}; use super::msg::Callback; use super::cmd_resp::{bind_uuid, bind_term, bind_error}; @@ -369,6 +369,7 @@ impl Store { self.register_pd_store_heartbeat_tick(event_loop); self.register_snap_mgr_gc_tick(event_loop); self.register_compact_lock_cf_tick(event_loop); + self.register_check_stall_readonly_query_tick(event_loop); let split_check_runner = SplitCheckRunner::new(self.sendch.clone(), self.cfg.region_max_size, @@ -1093,7 +1094,7 @@ impl Store { Some(peer) => peer, None => return Err(Error::RegionNotFound(region_id)), }; - if !peer.is_leader() { + if !peer.is_leader() && (!readonly_query(msg) || peer.leader_id() == INVALID_ID) { return Err(Error::NotLeader(region_id, peer.get_peer_from_cache(peer.leader_id()))); } if peer.peer_id() != peer_id { @@ -1247,6 +1248,23 @@ impl Store { self.register_compact_check_tick(event_loop); } + fn on_check_stall_readonly_query(&mut self, event_loop: &mut EventLoop) { + let now = Instant::now(); + let timeout = Duration::from_millis(self.cfg.readonly_query_timeout_ms); + for (_, peer) 
in &mut self.region_peers { + peer.check_stall_readonly_query(now, timeout); + } + self.register_check_stall_readonly_query_tick(event_loop); + } + + fn register_check_stall_readonly_query_tick(&self, event_loop: &mut EventLoop) { + if let Err(e) = register_timer(event_loop, + Tick::CheckStallRoQuery, + self.cfg.check_stall_readonly_query_interval) { + error!("{} register check stall readonly query tick err: {:?}", self.tag, e); + }; + } + fn on_split_check_result(&mut self, region_id: u64, epoch: metapb::RegionEpoch, @@ -1639,6 +1657,7 @@ impl mio::Handler for Store { Tick::PdStoreHeartbeat => self.on_pd_store_heartbeat_tick(event_loop), Tick::SnapGc => self.on_snap_mgr_gc(event_loop), Tick::CompactLockCf => self.on_compact_lock_cf(event_loop), + Tick::CheckStallRoQuery => self.on_check_stall_readonly_query(event_loop), } slow_log!(t, "{} handle timeout {:?}", self.tag, timeout); }