Skip to content

Commit

Permalink
service/kv: Add end_key limit to kv_scan interface (#3720)
Browse files Browse the repository at this point in the history
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Nov 7, 2018
1 parent ff59513 commit 14530e5
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benches/storage/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn bench_tombstone_scan(b: &mut Bencher) {
.scan(
Context::new(),
Key::from_raw(&k),
None,
1,
false,
ts_generator.next().unwrap()
Expand Down
6 changes: 3 additions & 3 deletions components/test_storage/assert_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ impl<E: Engine> AssertionStorage<E> {
let key_address = Key::from_raw(start_key);
let result = self
.store
.scan(self.ctx.clone(), key_address, limit, false, ts)
.scan(self.ctx.clone(), key_address, None, limit, false, ts)
.unwrap();
let result: Vec<Option<KvPair>> = result.into_iter().map(Result::ok).collect();
let expect: Vec<Option<KvPair>> = expect
Expand All @@ -340,7 +340,7 @@ impl<E: Engine> AssertionStorage<E> {
let key_address = Key::from_raw(start_key);
let result = self
.store
.reverse_scan(self.ctx.clone(), key_address, limit, false, ts)
.reverse_scan(self.ctx.clone(), key_address, None, limit, false, ts)
.unwrap();
let result: Vec<Option<KvPair>> = result.into_iter().map(Result::ok).collect();
let expect: Vec<Option<KvPair>> = expect
Expand All @@ -360,7 +360,7 @@ impl<E: Engine> AssertionStorage<E> {
let key_address = Key::from_raw(start_key);
let result = self
.store
.scan(self.ctx.clone(), key_address, limit, true, ts)
.scan(self.ctx.clone(), key_address, None, limit, true, ts)
.unwrap();
let result: Vec<Option<KvPair>> = result.into_iter().map(Result::ok).collect();
let expect: Vec<Option<KvPair>> = expect
Expand Down
18 changes: 14 additions & 4 deletions components/test_storage/sync_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,38 @@ impl<E: Engine> SyncStorage<E> {
pub fn scan(
&self,
ctx: Context,
key: Key,
start_key: Key,
end_key: Option<Key>,
limit: usize,
key_only: bool,
start_ts: u64,
) -> Result<Vec<Result<KvPair>>> {
self.store
.async_scan(ctx, key, limit, start_ts, Options::new(0, false, key_only))
.async_scan(
ctx,
start_key,
end_key,
limit,
start_ts,
Options::new(0, false, key_only),
)
.wait()
}

pub fn reverse_scan(
&self,
ctx: Context,
key: Key,
start_key: Key,
end_key: Option<Key>,
limit: usize,
key_only: bool,
start_ts: u64,
) -> Result<Vec<Result<KvPair>>> {
self.store
.async_scan(
ctx,
key,
start_key,
end_key,
limit,
start_ts,
Options::new(0, false, key_only).reverse_scan(),
Expand Down
7 changes: 7 additions & 0 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,18 @@ impl<T: RaftStoreRouter + 'static, E: Engine> tikvpb_grpc::Tikv for Service<T, E
options.key_only = req.get_key_only();
options.reverse_scan = req.get_reverse();

let end_key = if req.get_end_key().is_empty() {
None
} else {
Some(Key::from_raw(req.get_end_key()))
};

let future = self
.storage
.async_scan(
req.take_context(),
Key::from_raw(req.get_start_key()),
end_key,
req.get_limit() as usize,
req.get_version(),
options,
Expand Down
1 change: 1 addition & 0 deletions src/storage/gc_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ mod tests {
.async_scan(
Context::default(),
Key::from_encoded_slice(b""),
None,
expected_data.len() + 1,
1,
Options::default(),
Expand Down
167 changes: 165 additions & 2 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ impl<E: Engine> Storage<E> {
&self,
ctx: Context,
start_key: Key,
end_key: Option<Key>,
limit: usize,
start_ts: u64,
options: Options,
Expand Down Expand Up @@ -665,13 +666,13 @@ impl<E: Engine> Storage<E> {
ScanMode::Forward,
options.key_only,
Some(start_key),
None,
end_key,
)?;
} else {
scanner = snap_store.scanner(
ScanMode::Backward,
options.key_only,
None,
end_key,
Some(start_key),
)?;
};
Expand Down Expand Up @@ -1630,6 +1631,7 @@ mod tests {
.async_scan(
Context::new(),
Key::from_raw(b"x"),
None,
1000,
1,
Options::default(),
Expand Down Expand Up @@ -1671,18 +1673,91 @@ mod tests {
)
.unwrap();
rx.recv().unwrap();
// Forward
expect_multi_values(
vec![None, None, None],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\x00"),
None,
1000,
5,
Options::default(),
)
.wait(),
);
// Backward
expect_multi_values(
vec![None, None, None],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\xff"),
None,
1000,
5,
Options::default().reverse_scan(),
)
.wait(),
);
// Forward with bound
expect_multi_values(
vec![None, None],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\x00"),
Some(Key::from_raw(b"c")),
1000,
5,
Options::default(),
)
.wait(),
);
// Backward with bound
expect_multi_values(
vec![None, None],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\xff"),
Some(Key::from_raw(b"b")),
1000,
5,
Options::default().reverse_scan(),
)
.wait(),
);
// Forward with limit
expect_multi_values(
vec![None, None],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\x00"),
None,
2,
5,
Options::default(),
)
.wait(),
);
// Backward with limit
expect_multi_values(
vec![None, None],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\xff"),
None,
2,
5,
Options::default().reverse_scan(),
)
.wait(),
);

storage
.async_commit(
Context::new(),
Expand All @@ -1697,6 +1772,7 @@ mod tests {
)
.unwrap();
rx.recv().unwrap();
// Forward
expect_multi_values(
vec![
Some((b"a".to_vec(), b"aa".to_vec())),
Expand All @@ -1707,13 +1783,100 @@ mod tests {
.async_scan(
Context::new(),
Key::from_raw(b"\x00"),
None,
1000,
5,
Options::default(),
)
.wait(),
);
// Backward
expect_multi_values(
vec![
Some((b"c".to_vec(), b"cc".to_vec())),
Some((b"b".to_vec(), b"bb".to_vec())),
Some((b"a".to_vec(), b"aa".to_vec())),
],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\xff"),
None,
1000,
5,
Options::default().reverse_scan(),
)
.wait(),
);
// Forward with bound
expect_multi_values(
vec![
Some((b"a".to_vec(), b"aa".to_vec())),
Some((b"b".to_vec(), b"bb".to_vec())),
],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\x00"),
Some(Key::from_raw(b"c")),
1000,
5,
Options::default(),
)
.wait(),
);
// Backward with bound
expect_multi_values(
vec![
Some((b"c".to_vec(), b"cc".to_vec())),
Some((b"b".to_vec(), b"bb".to_vec())),
],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\xff"),
Some(Key::from_raw(b"b")),
1000,
5,
Options::default().reverse_scan(),
)
.wait(),
);
storage.stop().unwrap();
// Forward with limit
expect_multi_values(
vec![
Some((b"a".to_vec(), b"aa".to_vec())),
Some((b"b".to_vec(), b"bb".to_vec())),
],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\x00"),
None,
2,
5,
Options::default(),
)
.wait(),
);
// Backward with limit
expect_multi_values(
vec![
Some((b"c".to_vec(), b"cc".to_vec())),
Some((b"b".to_vec(), b"bb".to_vec())),
],
storage
.async_scan(
Context::new(),
Key::from_raw(b"\xff"),
None,
2,
5,
Options::default().reverse_scan(),
)
.wait(),
);
}

#[test]
Expand Down
4 changes: 2 additions & 2 deletions tests/storage_cases/test_raft_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ fn test_raft_storage() {
assert!(storage.batch_get(ctx.clone(), &[key.clone()], 20).is_err());
assert!(
storage
.scan(ctx.clone(), key.clone(), 1, false, 20)
.scan(ctx.clone(), key.clone(), None, 1, false, 20)
.is_err()
);
assert!(
Expand Down Expand Up @@ -158,7 +158,7 @@ fn test_raft_storage_store_not_match() {
assert!(storage.batch_get(ctx.clone(), &[key.clone()], 20).is_err());
assert!(
storage
.scan(ctx.clone(), key.clone(), 1, false, 20)
.scan(ctx.clone(), key.clone(), None, 1, false, 20)
.is_err()
);
assert!(
Expand Down

0 comments on commit 14530e5

Please sign in to comment.