Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore-v2: implement local read for raftstore-v2 #13375

Merged
merged 48 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
2b16eef
ReadResponseTrait + LocalReaderTrait
SpadeA-Tang Aug 29, 2022
f326f8f
update
SpadeA-Tang Aug 30, 2022
faa5f05
remove trait and wrap LocalReader
SpadeA-Tang Aug 30, 2022
43fcacd
before remove readRespnoseTrait
SpadeA-Tang Aug 30, 2022
992a4d7
Remove ReadResponseTrait and cargo c pass
SpadeA-Tang Aug 30, 2022
c0d4c35
half way for v2's snapshot
SpadeA-Tang Aug 30, 2022
8648b31
update snapshot in v2
SpadeA-Tang Aug 30, 2022
16219d6
move router from LocalReaderCore to LocalReader
SpadeA-Tang Aug 31, 2022
f7f1cd8
clippy
SpadeA-Tang Sep 1, 2022
86af68b
format
SpadeA-Tang Sep 1, 2022
1fd8ac6
update
SpadeA-Tang Sep 5, 2022
c2df8d2
update
SpadeA-Tang Sep 5, 2022
c3db075
snapshot update
SpadeA-Tang Sep 8, 2022
bde4e3d
add test(partial)
SpadeA-Tang Sep 8, 2022
9d6a39f
refine test
SpadeA-Tang Sep 13, 2022
6f73441
update
SpadeA-Tang Sep 13, 2022
611748f
remove unused code
SpadeA-Tang Sep 13, 2022
7db5a4c
modify test
SpadeA-Tang Sep 13, 2022
b28b39a
resolve merge
SpadeA-Tang Sep 13, 2022
f88011d
update
SpadeA-Tang Sep 13, 2022
e792726
format
SpadeA-Tang Sep 14, 2022
ff15c7f
address comment
SpadeA-Tang Sep 14, 2022
4888e96
change the way to renew lease in advance
SpadeA-Tang Sep 14, 2022
1500953
remove unused code
SpadeA-Tang Sep 16, 2022
074b61c
address comment
SpadeA-Tang Sep 20, 2022
5e9f7cb
update
SpadeA-Tang Sep 21, 2022
cb08e71
replace generic type of ReadExecutor by associated type
SpadeA-Tang Sep 21, 2022
75a4061
resolve merge
SpadeA-Tang Sep 21, 2022
8862e7c
update
SpadeA-Tang Sep 22, 2022
d0d97ab
impl ServerRaftStoreRouter for v2
SpadeA-Tang Sep 22, 2022
32b525c
remove some generic types in LocalReader
SpadeA-Tang Sep 22, 2022
abbc7cf
add end to end test, and the some code in Cluster
SpadeA-Tang Sep 22, 2022
b6aa69e
merge conflict
SpadeA-Tang Sep 22, 2022
69952e6
address comment
SpadeA-Tang Sep 22, 2022
a8ffb1b
resolve deadlock
SpadeA-Tang Sep 22, 2022
34176cb
remove unused import
SpadeA-Tang Sep 22, 2022
2b24fa1
add error processing for renew lease
SpadeA-Tang Sep 23, 2022
d7ff6d9
address comment
SpadeA-Tang Sep 26, 2022
446f70d
update
SpadeA-Tang Sep 26, 2022
6fa2ea2
remove RefCell
SpadeA-Tang Sep 27, 2022
c6e979b
address comment
SpadeA-Tang Sep 27, 2022
57a3ecd
update
SpadeA-Tang Sep 27, 2022
62edf8f
merge conflict
SpadeA-Tang Sep 27, 2022
b5f7e88
get snap race condition
SpadeA-Tang Oct 8, 2022
521f976
merge conflict
SpadeA-Tang Oct 8, 2022
26bc405
update
SpadeA-Tang Oct 8, 2022
0ee256b
update
SpadeA-Tang Oct 11, 2022
5019283
Merge branch 'master' into localreader
ti-chi-bot Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/operation/query/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ mod tests {
use engine_traits::{OpenOptions, Peekable, SyncMutable, ALL_CFS, CF_DEFAULT};
use kvproto::{metapb::Region, raft_cmdpb::*};
use raftstore::store::{
util::Lease, Callback, CasualMessage, CasualRouter, LocalReader, ProposalRouter,
util::Lease, Callback, CasualMessage, CasualRouter, LocalReaderCore, ProposalRouter,
RaftCommand,
};
use tempfile::{Builder, TempDir};
Expand Down
16 changes: 11 additions & 5 deletions components/raftstore/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use crate::{
store::{
fsm::RaftRouter,
transport::{CasualRouter, ProposalRouter, SignificantRouter},
CachedReadDelegate, Callback, CasualMessage, LocalReader, PeerMsg, RaftCmdExtraOpts,
RaftCommand, SignificantMsg, StoreMetaDelegate, StoreMsg, StoreRouter,
CachedReadDelegate, Callback, CasualMessage, LocalReaderCore, LocalReader, PeerMsg,
RaftCmdExtraOpts, RaftCommand, SignificantMsg, StoreMetaDelegate, StoreMsg, StoreRouter,
},
DiscardReason, Error as RaftStoreError, Result as RaftStoreResult,
};
Expand Down Expand Up @@ -174,8 +174,9 @@ where
ER: RaftEngine,
{
router: RaftRouter<EK, ER>,
local_reader:
RefCell<LocalReader<RaftRouter<EK, ER>, EK, CachedReadDelegate<EK>, StoreMetaDelegate<EK>>>,
local_reader: RefCell<
LocalReader<RaftRouter<EK, ER>, EK, CachedReadDelegate<EK>, StoreMetaDelegate<EK>>,
>,
}

impl<EK, ER> Clone for ServerRaftStoreRouter<EK, ER>
Expand All @@ -195,7 +196,12 @@ impl<EK: KvEngine, ER: RaftEngine> ServerRaftStoreRouter<EK, ER> {
/// Creates a new router.
pub fn new(
router: RaftRouter<EK, ER>,
reader: LocalReader<RaftRouter<EK, ER>, EK, CachedReadDelegate<EK>, StoreMetaDelegate<EK>>,
reader: LocalReader<
RaftRouter<EK, ER>,
EK,
CachedReadDelegate<EK>,
StoreMetaDelegate<EK>,
>,
) -> ServerRaftStoreRouter<EK, ER> {
let local_reader = RefCell::new(reader);
ServerRaftStoreRouter {
Expand Down
8 changes: 4 additions & 4 deletions components/raftstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ pub use self::{
worker::{
AutoSplitController, Bucket, BucketRange, CachedReadDelegate, CheckLeaderRunner,
CheckLeaderTask, FetchedLogs, FlowStatistics, FlowStatsReporter, KeyEntry,
LocalReadContext, LocalReader, LogFetchedNotifier, PdTask, QueryStats, RaftlogFetchRunner,
RaftlogFetchTask, ReadDelegate, ReadExecutor, ReadExecutorProvider, ReadProgress,
ReadStats, RefreshConfigTask, RegionTask, SplitCheckRunner, SplitCheckTask, SplitConfig,
SplitConfigManager, StoreMetaDelegate, TrackVer, WriteStats,
LocalReadContext, LocalReaderCore, LocalReader, LogFetchedNotifier, PdTask, QueryStats,
RaftlogFetchRunner, RaftlogFetchTask, ReadDelegate, ReadExecutor, ReadExecutorProvider,
ReadProgress, ReadStats, RefreshConfigTask, RegionTask, SplitCheckRunner, SplitCheckTask,
SplitConfig, SplitConfigManager, StoreMetaDelegate, TrackVer, WriteStats,
},
};
86 changes: 82 additions & 4 deletions components/raftstore/src/store/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ use kvproto::{
metapb,
metapb::RegionEpoch,
pdpb::{self, CheckPolicy},
raft_cmdpb::{RaftCmdRequest, RaftCmdResponse},
raft_cmdpb::{RaftCmdRequest, RaftCmdResponse, Response},
raft_serverpb::RaftMessage,
replication_modepb::ReplicationStatus,
};
#[cfg(any(test, feature = "testexport"))]
use pd_client::BucketMeta;
use protobuf::RepeatedField;
use raft::SnapshotStatus;
use smallvec::{smallvec, SmallVec};
use tikv_util::{deadline::Deadline, escape, memory::HeapSize, time::Instant};
Expand All @@ -37,6 +38,70 @@ use crate::store::{
SnapKey,
};

pub trait ReadResponseTrait<S: Snapshot>: Default {
type Response;

fn set_term(&mut self, _term: u64) {
unimplemented!()
}

fn set_response(&mut self, response: Self::Response);

fn set_responses(&mut self, _response: RepeatedField<Response>) {
unimplemented!()
}

fn mut_snapshot(&mut self) -> Option<&mut RegionSnapshot<S>> {
unimplemented!()
}

fn set_snapshot(&mut self, _snapshot: RegionSnapshot<S>) {
unimplemented!()
}

fn set_txn_extra_op(&mut self, _txn_extra_op: TxnExtraOp) {
unimplemented!()
}

fn set_error(&mut self, error: RaftCmdResponse);
}

impl<S: Snapshot> ReadResponseTrait<S> for ReadResponse<S> {
type Response = RaftCmdResponse;

fn set_term(&mut self, term: u64) {
if term == 0 {
return;
}

self.response.mut_header().set_current_term(term);
}

fn set_response(&mut self, response: Self::Response) {
self.response = response;
}

fn set_responses(&mut self, response: RepeatedField<Response>) {
self.response.set_responses(response);
}

fn mut_snapshot(&mut self) -> Option<&mut RegionSnapshot<S>> {
self.snapshot.as_mut()
}

fn set_snapshot(&mut self, snapshot: RegionSnapshot<S>) {
self.snapshot = Some(snapshot);
}

fn set_txn_extra_op(&mut self, txn_extra_op: TxnExtraOp) {
self.txn_extra_op = txn_extra_op;
}

fn set_error(&mut self, error: RaftCmdResponse) {
self.response = error;
}
}

#[derive(Debug)]
pub struct ReadResponse<S: Snapshot> {
pub response: RaftCmdResponse,
Expand Down Expand Up @@ -73,6 +138,19 @@ where
}
}

impl<S> Default for ReadResponse<S>
where
S: Snapshot,
{
fn default() -> Self {
ReadResponse {
response: RaftCmdResponse::default(),
snapshot: None,
txn_extra_op: TxnExtraOp::Noop,
}
}
}

pub type BoxReadCallback<S> = Box<dyn FnOnce(ReadResponse<S>) + Send>;
pub type BoxWriteCallback = Box<dyn FnOnce(WriteResponse) + Send>;
pub type ExtCallback = Box<dyn FnOnce() + Send>;
Expand Down Expand Up @@ -201,8 +279,8 @@ where
}
}

pub trait ReadCallback: ErrorCallback {
type Response;
pub trait ReadCallback<S: Snapshot>: ErrorCallback {
type Response: ReadResponseTrait<S>;

fn set_result(self, result: Self::Response);
}
Expand All @@ -222,7 +300,7 @@ pub trait ErrorCallback: Send {
fn is_none(&self) -> bool;
}

impl<S: Snapshot> ReadCallback for Callback<S> {
impl<S: Snapshot> ReadCallback<S> for Callback<S> {
type Response = ReadResponse<S>;

#[inline]
Expand Down
2 changes: 2 additions & 0 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5476,6 +5476,8 @@ where
EK: KvEngine,
ER: RaftEngine,
{
type Response = ReadResponse<EK::Snapshot>;

fn get_tablet(&mut self) -> &EK {
&self.engines.kv
}
Expand Down
5 changes: 3 additions & 2 deletions components/raftstore/src/store/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ pub use self::{
},
raftlog_gc::{Runner as RaftlogGcRunner, Task as RaftlogGcTask},
read::{
CachedReadDelegate, LocalReadContext, LocalReader, Progress as ReadProgress, ReadDelegate,
ReadExecutor, ReadExecutorProvider, StoreMetaDelegate, TrackVer,
CachedReadDelegate, LocalReadContext, LocalReaderCore, LocalReader,
Progress as ReadProgress, ReadDelegate, ReadExecutor, ReadExecutorProvider,
StoreMetaDelegate, TrackVer,
},
refresh_config::{
BatchComponent as RaftStoreBatchComponent, Runner as RefreshConfigRunner,
Expand Down