Implement IngestSST API #2712
src/raftstore/store/store.rs
Outdated
if let Some(region) = self.region_peers.get_mut(&sst.get_region_id()) {
    region.size_diff_hint += sst.get_length();
}
let _ = self.importer.delete(sst);
Please error!() if it meets an error.
Errors are already logged in the importer.
Does it block the raftstore? If yes, it may need to move to another place.
I'm planning to add an import worker for the related jobs.
src/raftstore/store/worker/apply.rs
Outdated
    || epoch.get_version() != region_epoch.get_version()
{
    return Err(box_err!(
        "region epoch does not match: {:?} != {:?}",
How about Error::StaleEpoch(_)?
src/raftstore/store/worker/apply.rs
Outdated
let region_id = sst.get_region_id();
if region_id != region.get_id() {
    return Err(box_err!(
        "region id does not match: {} != {}",
How about Error::RegionNotFound(_)?
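The two suggestions above (typed errors instead of box_err! strings) could look roughly like the sketch below. It is only an illustration: RegionEpoch, Error, and check_sst_for_ingestion are hypothetical stand-ins defined here, not the real kvproto or raftstore types, whose variants carry different payloads.

```rust
// Hypothetical stand-in for the region epoch carried in the SST metadata.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RegionEpoch {
    pub conf_ver: u64,
    pub version: u64,
}

// Hypothetical error type; the real raftstore Error variants differ.
#[derive(Debug, PartialEq)]
pub enum Error {
    // The region id in the SST metadata does not match the applying region.
    RegionNotFound(u64),
    // The epoch in the SST metadata differs from the region's current epoch.
    StaleEpoch { expected: RegionEpoch, found: RegionEpoch },
}

// Checks the region id first, then both epoch fields, returning a typed error.
pub fn check_sst_for_ingestion(
    sst_region_id: u64,
    sst_epoch: RegionEpoch,
    region_id: u64,
    region_epoch: RegionEpoch,
) -> Result<(), Error> {
    if sst_region_id != region_id {
        return Err(Error::RegionNotFound(sst_region_id));
    }
    if sst_epoch.conf_ver != region_epoch.conf_ver
        || sst_epoch.version != region_epoch.version
    {
        return Err(Error::StaleEpoch {
            expected: region_epoch,
            found: sst_epoch,
        });
    }
    Ok(())
}
```

With typed variants, a caller can match on the error and decide whether to retry with a refreshed region instead of parsing a message string.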
src/storage/engine/mod.rs
Outdated
@@ -71,6 +72,11 @@ pub enum Modify {

pub trait Engine: Send + Debug {
    fn async_write(&self, ctx: &Context, batch: Vec<Modify>, callback: Callback<()>) -> Result<()>;

    fn async_ingest(&self, _: &Context, _: SSTMeta, _: Callback<()>) -> Result<()> {
I don't think it is proper to define async_ingest in the Engine.
I suggest constructing a RaftCmd in the ImportSSTService and sending it to the raftstore.
Will try.
src/import/sst_importer.rs
Outdated
};

// A valid file name should be in the format:
// "{uuid}_{region_id}_{region_poch.conf_ver}_{region_epoch.version}.sst"
s/region_poch/region_epoch
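For reference, a minimal sketch of parsing a file name in the format described by that comment, "{uuid}_{region_id}_{region_epoch.conf_ver}_{region_epoch.version}.sst". The names SstFileName and parse_sst_file_name are hypothetical, and the sketch assumes the uuid part contains no underscore; it is not the importer's actual code.

```rust
// Hypothetical parsed form of an SST file name.
#[derive(Debug, PartialEq)]
pub struct SstFileName {
    pub uuid: String,
    pub region_id: u64,
    pub conf_ver: u64,
    pub version: u64,
}

// Returns None unless the name has the ".sst" suffix and exactly four
// underscore-separated parts, the last three of which parse as u64.
pub fn parse_sst_file_name(name: &str) -> Option<SstFileName> {
    let stem = name.strip_suffix(".sst")?;
    let parts: Vec<&str> = stem.split('_').collect();
    if parts.len() != 4 || parts[0].is_empty() {
        return None;
    }
    Some(SstFileName {
        uuid: parts[0].to_owned(),
        region_id: parts[1].parse::<u64>().ok()?,
        conf_ver: parts[2].parse::<u64>().ok()?,
        version: parts[3].parse::<u64>().ok()?,
    })
}
```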
    let _ = ctx.importer.delete(sst);
    return Err(e);
}
It's better to add some metrics to indicate how many files we have ingested.
We can get that from the gRPC and RocksDB metrics.
    return Err(e);
}

ctx.importer.ingest(sst, &self.engine).unwrap_or_else(|e| {
It seems we don't verify the checksum for this SST file?
The checksum is verified inside importer.ingest().
}

fn handle_delete_sst(&self, sst: SSTMeta) {
    let _ = self.importer.delete(&sst);
Is there any mechanism to clean up SSTs when a delete fails?
Yes, there is a mechanism to check and clean ssts periodically in the following PR.
LGTM
src/import/service.rs
Outdated

use super::Error;

pub fn make_cb<T: Debug + Send + 'static>() -> (Box<FnBox(T) + Send>, oneshot::Receiver<T>) {
It is the same as service::kv::make_callback.
Yes, but I don't want to rely on a private function there, and it is just a simple function, so I made it self-contained here.
Now we have a helper function.
https://github.com/pingcap/tikv/blob/0a991685770ed84f1e3db9b0f9870e5a77bf2778/src/util/future.rs#L19
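The callback-plus-receiver pattern discussed in this thread can be sketched with only the standard library. This is an illustration, not the helper linked above: it swaps in std::sync::mpsc for the futures oneshot channel and Box<dyn FnOnce> for FnBox, so the receiving side blocks instead of being polled as a future.

```rust
use std::fmt::Debug;
use std::sync::mpsc::{channel, Receiver};

// Returns a boxed one-shot callback and the receiver that observes its argument.
// Assumption: std's mpsc channel stands in for futures::sync::oneshot here.
pub fn make_cb<T: Debug + Send + 'static>() -> (Box<dyn FnOnce(T) + Send>, Receiver<T>) {
    let (tx, rx) = channel();
    let cb: Box<dyn FnOnce(T) + Send> = Box::new(move |value: T| {
        // The receiver may already have been dropped; ignore the send error then.
        let _ = tx.send(value);
    });
    (cb, rx)
}
```

The caller hands the callback to whatever completes the request and waits on the receiver, for example with rx.recv().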
src/import/errors.rs
Outdated
@@ -52,6 +54,16 @@ quick_error! {
from()
display("RocksDB {}", msg)
}
RaftStore(err: RaftStoreError) {
from()
cause(err)
Please keep the indentation.
use util::worker::Runnable;

pub enum Task {
    DeleteSST { sst: SSTMeta },
Can it be handled by the apply worker?
We need to delete the SST after it is successfully applied, so according to the current applying flow, it is more intuitive to handle it when we receive the ExecResult.
@@ -43,22 +45,24 @@ fn new_cluster() -> (Cluster<ServerCluster>, Context) {
    (cluster, ctx)
}

fn new_cluster_and_import_client() -> (Cluster<ServerCluster>, ImportSstClient) {
fn new_cluster_and_tikv_import_client(
) -> (Cluster<ServerCluster>, Context, TikvClient, ImportSstClient) {
Weird format.
It is formatted by rustfmt :)
@overvenus PTAL
Rest LGTM
Cargo.lock
Outdated
@@ -422,7 +422,7 @@ dependencies = [
[[package]]
name = "kvproto"
version = "0.0.1"
source = "git+https://github.com/pingcap/kvproto.git#123051ec2973aa59a4d6d5b2c5da00071c0ddd24"
source = "git+https://github.com/huachaohuang/kvproto.git?branch=ingest-sst#e194fb28415314427ad6a2497b65315db92745ee"
Why is it still in your branch?
The client must guarantee that the SST file has been uploaded to the servers where the region's peers are located before issuing an ingest request to the region's leader. The SST metadata provided in the ingest request is first replicated through Raft. Then, when the log entry is applied, the corresponding uploaded SST file is ingested into the local engine. To make sure that the region's range and peers do not change during this process, the region's epoch is checked before ingestion.
/run-integration-tests
LGTM
* Implement ImportSST::Ingest API

The client must guarantee that the SST file has been uploaded to the servers where the region's peers are located before issuing an ingest request to the region's leader. The SST metadata provided in the ingest request is first replicated through Raft. Then, when the log entry is applied, the corresponding uploaded SST file is ingested into the local engine. To make sure that the region's range and peers do not change during this process, the region's epoch is checked before ingestion.
An uploaded SST file can be ingested into a region by sending an IngestSST request. The client must guarantee that the SST file has been uploaded to each peer of the region before issuing the IngestSST request. Upon ingestion, the metadata of the SST file is replicated to the raft group first. Then, before the IngestSST command can be applied, the range and the epoch in the metadata must be checked, and the length and checksum of the corresponding SST file must match the metadata too. Finally, the SST file is deleted after it has been applied.
The corresponding kvproto PR: pingcap/kvproto#224
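The length and checksum check mentioned in the description can be sketched as follows. This is a simplified illustration under stated assumptions: SstMeta here is a hypothetical two-field struct, the checksum is assumed to be CRC-32 (IEEE), and the file contents are passed as an in-memory slice, whereas the real importer works on files and on the kvproto metadata type.

```rust
// Hypothetical, simplified stand-in for the SST metadata replicated through Raft.
pub struct SstMeta {
    pub length: u64,
    pub crc32: u32,
}

// Bitwise CRC-32 (IEEE, reflected polynomial 0xEDB88320); slow but dependency-free.
pub fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            crc = if crc & 1 == 1 {
                (crc >> 1) ^ 0xEDB8_8320
            } else {
                crc >> 1
            };
        }
    }
    !crc
}

// Rejects the file unless both its length and its checksum match the metadata.
pub fn verify_sst(meta: &SstMeta, contents: &[u8]) -> Result<(), String> {
    let length = contents.len() as u64;
    if length != meta.length {
        return Err(format!("length {} != {}", length, meta.length));
    }
    let checksum = crc32(contents);
    if checksum != meta.crc32 {
        return Err(format!("crc32 {:#x} != {:#x}", checksum, meta.crc32));
    }
    Ok(())
}
```

Checking the length before the checksum lets a truncated upload fail fast without hashing the whole file.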