Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: support reverse raw_scan and raw_batch_scan #3724

Merged
merged 27 commits into from Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b3966ce
cargo: update kvproto
kamijin-fanta Oct 30, 2018
b1de02a
service/kv,storage: support reverse request in raw_scan
kamijin-fanta Oct 30, 2018
333e0b6
service/kv,storage: support reverse request in raw_batch_scan
kamijin-fanta Oct 30, 2018
a99f94f
storage: format code
kamijin-fanta Oct 30, 2018
794e64b
Merge branch 'master' into master
kamijin-fanta Oct 30, 2018
1f6dcc0
storage: fix check_key_ranges conditions
kamijin-fanta Oct 30, 2018
7d432e9
storage: add raw_scan test case
kamijin-fanta Oct 30, 2018
0e05db3
storage: add comment
kamijin-fanta Nov 1, 2018
8a9d284
storage: change the type of seek in raw_scan
kamijin-fanta Nov 1, 2018
29f7bf6
Merge branch 'master' into master
kamijin-fanta Nov 1, 2018
cc2069c
storage: comments changed to rustdoc
kamijin-fanta Nov 2, 2018
9db0e9f
Merge branch 'master' into master
kamijin-fanta Nov 2, 2018
acb02c2
Merge branch 'master' into master
kamijin-fanta Nov 4, 2018
050708b
run ci
kamijin-fanta Nov 5, 2018
92085c9
storage: split functions raw_scan and reverse_raw_scan
kamijin-fanta Nov 7, 2018
0d3ffe1
Merge remote-tracking branch 'upstream/master'
kamijin-fanta Nov 7, 2018
8083033
Merge branch 'master' of github.com:kamijin-fanta/tikv
kamijin-fanta Nov 7, 2018
6ce3a6b
storage: update kvproto
kamijin-fanta Nov 7, 2018
1ff5ac4
test-storage: add reverse_raw_scan to sync_storage
kamijin-fanta Nov 7, 2018
ede459d
storage: add test cases
kamijin-fanta Nov 7, 2018
3abc519
test_storage: fix sync_storage
kamijin-fanta Nov 7, 2018
2f3712b
Merge branch 'master' into master
kamijin-fanta Nov 7, 2018
4943047
Merge branch 'master' into master
kamijin-fanta Nov 8, 2018
47de650
Merge branch 'master' into master
breezewish Nov 13, 2018
b274a35
Merge branch 'master' into master
Hijiao Nov 13, 2018
a5e9407
Merge branch 'master' into master
MyonKeminta Nov 15, 2018
0f5b9aa
Merge branch 'master' into master
MyonKeminta Nov 15, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion components/test_storage/src/assert_storage.rs
Expand Up @@ -578,7 +578,7 @@ impl<E: Engine> AssertionStorage<E> {
) {
let result: Vec<KvPair> = self
.store
.raw_scan(self.ctx.clone(), cf, start_key, limit)
.raw_scan(self.ctx.clone(), cf, start_key, limit, false)
.unwrap()
.into_iter()
.map(|x| x.unwrap())
Expand Down
3 changes: 2 additions & 1 deletion components/test_storage/src/sync_storage.rs
Expand Up @@ -205,9 +205,10 @@ impl<E: Engine> SyncStorage<E> {
cf: String,
start_key: Vec<u8>,
limit: usize,
reverse: bool,
) -> Result<Vec<Result<KvPair>>> {
self.store
.async_raw_scan(ctx, cf, start_key, limit, false)
.async_raw_scan(ctx, cf, start_key, limit, false, reverse)
kamijin-fanta marked this conversation as resolved.
Show resolved Hide resolved
.wait()
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/server/service/kv.rs
Expand Up @@ -584,6 +584,7 @@ impl<T: RaftStoreRouter + 'static, E: Engine> tikvpb_grpc::Tikv for Service<T, E
req.take_start_key(),
req.get_limit() as usize,
req.get_key_only(),
req.get_reverse(),
)
.then(|v| {
let mut resp = RawScanResponse::new();
Expand Down Expand Up @@ -619,6 +620,7 @@ impl<T: RaftStoreRouter + 'static, E: Engine> tikvpb_grpc::Tikv for Service<T, E
req.take_ranges().into_vec(),
req.get_each_limit() as usize,
req.get_key_only(),
req.get_reverse(),
)
.then(|v| {
let mut resp = RawBatchScanResponse::new();
Expand Down
157 changes: 138 additions & 19 deletions src/storage/mod.rs
Expand Up @@ -1134,14 +1134,28 @@ impl<E: Engine> Storage<E> {
limit: usize,
statistics: &mut Statistics,
key_only: bool,
reverse: bool,
) -> Result<Vec<Result<KvPair>>> {
let mut option = IterOption::default();
if let Some(end) = end_key {
option.set_upper_bound(end.into_encoded());
if reverse {
kamijin-fanta marked this conversation as resolved.
Show resolved Hide resolved
option.set_lower_bound(end.into_encoded());
} else {
option.set_upper_bound(end.into_encoded());
}
}
let mut cursor = snapshot.iter_cf(Self::rawkv_cf(cf)?, option, ScanMode::Forward)?;
let scan_mode = if reverse {
ScanMode::Backward
} else {
ScanMode::Forward
};
let mut cursor = snapshot.iter_cf(Self::rawkv_cf(cf)?, option, scan_mode)?;
let statistics = statistics.mut_cf_statistics(cf);
if !cursor.seek(start_key, statistics)? {
if reverse {
if !cursor.seek_for_prev(start_key, statistics)? {
kamijin-fanta marked this conversation as resolved.
Show resolved Hide resolved
return Ok(vec![]);
}
} else if !cursor.seek(start_key, statistics)? {
return Ok(vec![]);
}
let mut pairs = vec![];
Expand All @@ -1154,7 +1168,11 @@ impl<E: Engine> Storage<E> {
cursor.value(statistics).to_owned()
},
)));
cursor.next(statistics);
if reverse {
cursor.prev(statistics);
} else {
cursor.next(statistics);
}
}
Ok(pairs)
}
Expand All @@ -1166,6 +1184,7 @@ impl<E: Engine> Storage<E> {
key: Vec<u8>,
limit: usize,
key_only: bool,
reverse: bool,
) -> impl Future<Item = Vec<Result<KvPair>>, Error = Error> {
const CMD: &str = "raw_scan";
let engine = self.get_engine();
Expand All @@ -1192,6 +1211,7 @@ impl<E: Engine> Storage<E> {
limit,
&mut statistics,
key_only,
reverse,
).map_err(Error::from);

thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics);
Expand Down Expand Up @@ -1223,15 +1243,17 @@ impl<E: Engine> Storage<E> {
Err(Error::InvalidCf(cf.to_owned()))
}

fn check_key_ranges(ranges: &[KeyRange]) -> bool {
fn check_key_ranges(ranges: &[KeyRange], reverse: bool) -> bool {
let ranges_len = ranges.len();
for i in 0..ranges_len {
let start_key = ranges[i].get_start_key();
let mut end_key = ranges[i].get_end_key();
if end_key.is_empty() && i + 1 != ranges_len {
end_key = ranges[i + 1].get_start_key();
}
if !end_key.is_empty() && start_key >= end_key {
if !end_key.is_empty()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In reverse scan mode, when there are multiple ranges, should they in descending order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When conscious of time series metrics, I feel it naturally. However, I follow the policy of the project.

&& (!reverse && start_key >= end_key || reverse && start_key <= end_key)
{
return false;
}
}
Expand All @@ -1245,6 +1267,7 @@ impl<E: Engine> Storage<E> {
mut ranges: Vec<KeyRange>,
each_limit: usize,
key_only: bool,
reverse: bool,
) -> impl Future<Item = Vec<Result<KvPair>>, Error = Error> {
const CMD: &str = "raw_batch_scan";
let engine = self.get_engine();
Expand All @@ -1263,7 +1286,7 @@ impl<E: Engine> Storage<E> {
let _t_process = thread_ctx.start_processing_read_duration_timer(CMD);

let mut statistics = Statistics::default();
if !Self::check_key_ranges(&ranges) {
if !Self::check_key_ranges(&ranges, reverse) {
return Err(box_err!("Invalid KeyRanges"));
};
let mut result = Vec::new();
Expand All @@ -1288,6 +1311,7 @@ impl<E: Engine> Storage<E> {
each_limit,
&mut statistics,
key_only,
reverse,
)?;
result.extend(pairs.into_iter());
}
Expand Down Expand Up @@ -2491,29 +2515,62 @@ mod tests {
expect_multi_values(
results.clone(),
storage
.async_raw_scan(Context::new(), "".to_string(), vec![], 20, true)
.async_raw_scan(Context::new(), "".to_string(), vec![], 20, true, false)
.wait(),
);
results = results.split_off(10);
expect_multi_values(
results,
storage
.async_raw_scan(Context::new(), "".to_string(), b"c2".to_vec(), 20, true)
.async_raw_scan(
Context::new(),
"".to_string(),
b"c2".to_vec(),
20,
true,
false,
)
.wait(),
);
let mut results: Vec<Option<KvPair>> =
test_data.into_iter().map(|(k, v)| Some((k, v))).collect();
let mut results: Vec<Option<KvPair>> = test_data
.clone()
.into_iter()
.map(|(k, v)| Some((k, v)))
.collect();
expect_multi_values(
results.clone(),
storage
.async_raw_scan(Context::new(), "".to_string(), vec![], 20, false)
.async_raw_scan(Context::new(), "".to_string(), vec![], 20, false, false)
.wait(),
);
results = results.split_off(10);
expect_multi_values(
results,
storage
.async_raw_scan(Context::new(), "".to_string(), b"c2".to_vec(), 20, false)
.async_raw_scan(
Context::new(),
"".to_string(),
b"c2".to_vec(),
20,
false,
false,
)
.wait(),
);
let mut results: Vec<Option<KvPair>> =
test_data.into_iter().map(|(k, v)| Some((k, v))).collect();
results.reverse();
kamijin-fanta marked this conversation as resolved.
Show resolved Hide resolved
expect_multi_values(
results,
storage
.async_raw_scan(
Context::new(),
"".to_string(),
b"z".to_vec(),
20,
false,
true,
)
kamijin-fanta marked this conversation as resolved.
Show resolved Hide resolved
.wait(),
);
}
Expand Down Expand Up @@ -2596,7 +2653,14 @@ mod tests {
expect_multi_values(
results,
storage
.async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 5, false)
.async_raw_batch_scan(
Context::new(),
"".to_string(),
ranges.clone(),
5,
false,
false,
)
.wait(),
);

Expand All @@ -2618,7 +2682,14 @@ mod tests {
expect_multi_values(
results,
storage
.async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 5, true)
.async_raw_batch_scan(
Context::new(),
"".to_string(),
ranges.clone(),
5,
true,
false,
)
.wait(),
);

Expand All @@ -2636,7 +2707,14 @@ mod tests {
expect_multi_values(
results,
storage
.async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 3, false)
.async_raw_batch_scan(
Context::new(),
"".to_string(),
ranges.clone(),
3,
false,
false,
)
.wait(),
);

Expand All @@ -2654,7 +2732,7 @@ mod tests {
expect_multi_values(
results,
storage
.async_raw_batch_scan(Context::new(), "".to_string(), ranges, 3, true)
.async_raw_batch_scan(Context::new(), "".to_string(), ranges, 3, true, false)
.wait(),
);

Expand Down Expand Up @@ -2684,7 +2762,14 @@ mod tests {
expect_multi_values(
results,
storage
.async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 5, false)
.async_raw_batch_scan(
Context::new(),
"".to_string(),
ranges.clone(),
5,
false,
false,
)
.wait(),
);

Expand All @@ -2702,7 +2787,41 @@ mod tests {
expect_multi_values(
results,
storage
.async_raw_batch_scan(Context::new(), "".to_string(), ranges, 5, true)
.async_raw_batch_scan(Context::new(), "".to_string(), ranges, 5, true, false)
.wait(),
);

// backward test
let results = vec![
Some((b"a3".to_vec(), vec![])),
Some((b"a2".to_vec(), vec![])),
Some((b"a1".to_vec(), vec![])),
Some((b"a".to_vec(), vec![])),
Some((b"b3".to_vec(), vec![])),
Some((b"b2".to_vec(), vec![])),
Some((b"b1".to_vec(), vec![])),
Some((b"b".to_vec(), vec![])),
Some((b"c3".to_vec(), vec![])),
Some((b"c2".to_vec(), vec![])),
Some((b"c1".to_vec(), vec![])),
Some((b"c".to_vec(), vec![])),
];
let ranges: Vec<KeyRange> = vec![
(b"a3".to_vec(), b"a".to_vec()),
(b"b3".to_vec(), b"b".to_vec()),
(b"c3".to_vec(), b"c".to_vec()),
].into_iter()
.map(|(s, e)| {
let mut range = KeyRange::new();
range.set_start_key(s);
range.set_end_key(e);
range
})
.collect();
expect_multi_values(
results,
storage
.async_raw_batch_scan(Context::new(), "".to_string(), ranges, 5, true, true)
kamijin-fanta marked this conversation as resolved.
Show resolved Hide resolved
.wait(),
);
}
Expand Down