Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding retryable to scan #456

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 108 additions & 52 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,33 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use core::ops::Range;
use std::future::Future;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;

use futures::StreamExt;
use log::debug;
use tokio::time::sleep;

use crate::backoff::DEFAULT_REGION_BACKOFF;
use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
use crate::common::Error;
use crate::config::Config;
use crate::pd::PdClient;
use crate::pd::PdRpcClient;
use crate::proto::metapb;
use crate::proto::kvrpcpb::{RawScanRequest, RawScanResponse};
use crate::proto::{errorpb, metapb};
use crate::raw::lowering::*;
use crate::request::Collect;
use crate::request::CollectSingle;
use crate::request::EncodeKeyspace;
use crate::request::KeyMode;
use crate::request::Keyspace;
use crate::request::Plan;
use crate::request::TruncateKeyspace;
use crate::request::{plan, Collect};
use crate::store::{HasRegionError, RegionStore};
use crate::Backoff;
use crate::BoundRange;
use crate::ColumnFamily;
use crate::Error::RegionError;
use crate::Key;
use crate::KvPair;
use crate::Result;
Expand Down Expand Up @@ -755,57 +759,40 @@ impl<PdC: PdClient> Client<PdC> {
max_limit: MAX_RAW_KV_SCAN_LIMIT,
});
}

let mut cur_range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let backoff = DEFAULT_STORE_BACKOFF;
let range = range.into().encode_keyspace(self.keyspace, KeyMode::Raw);
let mut result = Vec::new();
let mut scan_regions = self.rpc.clone().stores_for_range(cur_range.clone()).boxed();
let mut region_store =
scan_regions
.next()
.await
.ok_or(Error::RegionForRangeNotFound {
range: (cur_range.clone()),
})??;
let mut cur_limit = limit;

while cur_limit > 0 {
let request = new_raw_scan_request(
cur_range.clone(),
cur_limit,
let mut current_limit = limit;
let (start_key, end_key) = range.clone().into_keys();
let mut current_key: Key = start_key;

let region_error_handler =
|pd_rpc_client: Arc<PdC>, err: errorpb::Error, store: RegionStore| {
Box::pin(plan::handle_region_error(pd_rpc_client, err, store))
} as _;
while current_limit > 0 {
let scan_args = ScanInnerArgs {
start_key: current_key.clone(),
range: range.clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should scan from start_key. Otherwise if there is region merge during scan, we would get duplicated kv paires, and lose some others if limit is reached.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to trace through the logic and from what I understand, we will only be looping if the limit of the scan has not been exhausted. So regardless of a split or merge, won't we be resuming the next scan from the end_key returned by the previous scan call? So if the first scan result returns an end_key "foo", doesn't the system guarantee that if we start the next scan starting from "foo", we are guaranteed to return all results that are lexicographically larger than "foo" and smaller than whatever end_key has been provided. This is regardless of whether the underlying regions are undergoing any churn(due to any splits or merges). There may be gaps in my understanding so will be more than happy to get some more feedback here.

limit: current_limit,
key_only,
reverse,
self.cf.clone(),
);
let resp = crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, request)
.single_region_with_store(region_store.clone())
.await?
.plan()
.execute()
.await?;
let mut region_scan_res = resp
.kvs
.into_iter()
.map(Into::into)
.collect::<Vec<KvPair>>();
let res_len = region_scan_res.len();
result.append(&mut region_scan_res);

// if the number of results is less than cur_limit, it means this scan range contains more than one region, so we need to scan next region
if res_len < cur_limit as usize {
region_store = match scan_regions.next().await {
Some(Ok(rs)) => {
cur_range = BoundRange::new(
std::ops::Bound::Included(region_store.region_with_leader.range().1),
cur_range.to,
);
rs
}
Some(Err(e)) => return Err(e),
None => break,
};
cur_limit -= res_len as u32;
} else {
backoff: backoff.clone(),
};
let (res, next_key) = self.retryable_scan(scan_args, region_error_handler).await?;

let mut kvs = res
.map(|r| r.kvs.into_iter().map(Into::into).collect::<Vec<KvPair>>())
.unwrap_or(Vec::new());

if !kvs.is_empty() {
current_limit -= kvs.len() as u32;
result.append(&mut kvs);
}
if end_key.clone().is_some_and(|ek| ek <= next_key) {
break;
} else {
current_key = next_key;
}
}

Expand All @@ -818,6 +805,65 @@ impl<PdC: PdClient> Client<PdC> {
Ok(result)
}

async fn retryable_scan<'a, F>(
&self,
mut scan_args: ScanInnerArgs,
mut error_handler: F,
) -> Result<(Option<RawScanResponse>, Key)>
where
F: FnMut(
Arc<PdC>,
errorpb::Error,
RegionStore,
) -> Pin<Box<dyn Future<Output = Result<bool>>>>,
{
let start_key = scan_args.start_key;

let region = self.rpc.clone().region_for_key(&start_key).await?;
let store = self.rpc.clone().store_for_id(region.id()).await?;
let request = new_raw_scan_request(
scan_args.range.clone(),
scan_args.limit,
scan_args.key_only,
scan_args.reverse,
self.cf.clone(),
);
loop {
let resp = self.do_store_scan(store.clone(), request.clone()).await;
return match resp {
Ok(mut r) => {
if let Some(err) = r.region_error() {
let status =
error_handler(self.rpc.clone(), err.clone(), store.clone()).await?;
if status {
continue;
} else if let Some(duration) = scan_args.backoff.next_delay_duration() {
sleep(duration).await;
continue;
} else {
return Err(RegionError(Box::new(err)));
}
}
Ok((Some(r), region.end_key()))
}
Err(err) => Err(err),
};
}
}

async fn do_store_scan(
&self,
store: RegionStore,
scan_request: RawScanRequest,
) -> Result<RawScanResponse> {
crate::request::PlanBuilder::new(self.rpc.clone(), self.keyspace, scan_request)
.single_region_with_store(store.clone())
.await?
.plan()
.execute()
.await
}

async fn batch_scan_inner(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
Expand Down Expand Up @@ -864,6 +910,16 @@ impl<PdC: PdClient> Client<PdC> {
}
}

#[derive(Clone)]
struct ScanInnerArgs {
start_key: Key,
range: BoundRange,
limit: u32,
key_only: bool,
reverse: bool,
backoff: Backoff,
}

#[cfg(test)]
mod tests {
use std::any::Any;
Expand Down
Loading
Loading