Skip to content

Commit

Permalink
Merge branch 'master' into feature/addcurlcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
mikechengwei committed Nov 11, 2022
2 parents a4a133f + 9c4f0d0 commit e1e6659
Show file tree
Hide file tree
Showing 29 changed files with 983 additions and 161 deletions.
7 changes: 5 additions & 2 deletions components/cdc/src/endpoint.rs
Expand Up @@ -1399,7 +1399,10 @@ mod tests {

#[test]
fn test_api_version_check() {
let cfg = CdcConfig::default();
let mut cfg = CdcConfig::default();
// To make the case more stable.
cfg.min_ts_interval = ReadableDuration(Duration::from_secs(1));

let mut suite = mock_endpoint(&cfg, None, ApiVersion::V1);
suite.add_region(1, 100);
let quota = crate::channel::MemoryQuota::new(usize::MAX);
Expand Down Expand Up @@ -1539,7 +1542,7 @@ mod tests {
}
let diff = cfg.diff(&updated_cfg);
ep.run(Task::ChangeConfig(diff));
assert_eq!(ep.config.min_ts_interval, ReadableDuration::secs(1));
assert_eq!(ep.config.min_ts_interval, ReadableDuration::millis(200));
assert_eq!(ep.config.hibernate_regions_compatible, true);

{
Expand Down
14 changes: 9 additions & 5 deletions components/raftstore-v2/src/batch/store.rs
Expand Up @@ -2,6 +2,7 @@

use std::{
ops::{Deref, DerefMut},
path::Path,
sync::{Arc, Mutex},
time::Duration,
};
Expand All @@ -21,7 +22,7 @@ use kvproto::{
use raft::INVALID_ID;
use raftstore::store::{
fsm::store::PeerTickBatch, local_metrics::RaftMetrics, Config, ReadRunner, ReadTask,
StoreWriters, Transport, WriteSenders,
StoreWriters, TabletSnapManager, Transport, WriteSenders,
};
use slog::Logger;
use tikv_util::{
Expand Down Expand Up @@ -365,6 +366,7 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
trans: T,
router: &StoreRouter<EK, ER>,
store_meta: Arc<Mutex<StoreMeta<EK>>>,
snap_mgr: TabletSnapManager,
) -> Result<()>
where
T: Transport + 'static,
Expand All @@ -373,10 +375,12 @@ impl<EK: KvEngine, ER: RaftEngine> StoreSystem<EK, ER> {
workers
.store_writers
.spawn(store_id, raft_engine.clone(), None, router, &trans, &cfg)?;
let read_scheduler = workers.async_read_worker.start(
"async-read-worker",
ReadRunner::new(router.clone(), raft_engine.clone()),
);

let mut read_runner = ReadRunner::new(router.clone(), raft_engine.clone());
read_runner.set_snap_mgr(snap_mgr);
let read_scheduler = workers
.async_read_worker
.start("async-read-worker", read_runner);

let mut builder = StorePollerBuilder::new(
cfg.clone(),
Expand Down
2 changes: 1 addition & 1 deletion components/raftstore-v2/src/fsm/peer.rs
Expand Up @@ -241,6 +241,6 @@ impl<'a, EK: KvEngine, ER: RaftEngine, T: Transport> PeerFsmDelegate<'a, EK, ER,
}
}
// TODO: instead of propose pending commands immediately, we should use timeout.
self.fsm.peer.propose_pending_command(self.store_ctx);
self.fsm.peer.propose_pending_writes(self.store_ctx);
}
}
29 changes: 19 additions & 10 deletions components/raftstore-v2/src/operation/command/admin/mod.rs
Expand Up @@ -68,9 +68,13 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
ch.report_error(resp);
return;
}
// To maintain propose order, we need to make pending proposal first.
self.propose_pending_command(ctx);
let cmd_type = req.get_admin_request().get_cmd_type();
if let Some(conflict) = self.proposal_control_mut().check_conflict(Some(cmd_type)) {
conflict.delay_channel(ch);
return;
}
// To maintain propose order, we need to make pending proposal first.
self.propose_pending_writes(ctx);
let res = if apply::is_conf_change_cmd(&req) {
self.propose_conf_change(ctx, req)
} else {
Expand All @@ -83,14 +87,19 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
_ => unimplemented!(),
}
};
if let Err(e) = &res {
info!(
self.logger,
"failed to propose admin command";
"cmd_type" => ?cmd_type,
"error" => ?e,
);
match &res {
Ok(index) => self
.proposal_control_mut()
.record_proposed_admin(cmd_type, *index),
Err(e) => {
info!(
self.logger,
"failed to propose admin command";
"cmd_type" => ?cmd_type,
"error" => ?e,
);
}
}
self.post_propose_write(ctx, res, vec![ch]);
self.post_propose_command(ctx, res, vec![ch], true);
}
}
8 changes: 3 additions & 5 deletions components/raftstore-v2/src/operation/command/admin/split.rs
Expand Up @@ -71,12 +71,10 @@ impl<EK: KvEngine, ER: RaftEngine> Peer<EK, ER> {
mut req: RaftCmdRequest,
) -> Result<u64> {
validate_batch_split(req.mut_admin_request(), self.region())?;
let mut proposal_ctx = ProposalContext::empty();
proposal_ctx.insert(ProposalContext::SYNC_LOG);
proposal_ctx.insert(ProposalContext::SPLIT);

// We rely on ConflictChecker to detect conflicts, so no need to set proposal
// context.
let data = req.write_to_bytes().unwrap();
self.propose_with_ctx(store_ctx, data, proposal_ctx.to_vec())
self.propose(store_ctx, data)
}
}

Expand Down

0 comments on commit e1e6659

Please sign in to comment.