diff --git a/components/test_storage/src/sync_storage.rs b/components/test_storage/src/sync_storage.rs index f6560a12145..16ad6167781 100644 --- a/components/test_storage/src/sync_storage.rs +++ b/components/test_storage/src/sync_storage.rs @@ -219,7 +219,18 @@ impl SyncTestStorage { limit: usize, ) -> Result>> { self.store - .async_raw_scan(ctx, cf, start_key, limit, false) + .async_raw_scan(ctx, cf, start_key, limit, false, false) + .wait() + } + pub fn reverse_raw_scan( + &self, + ctx: Context, + cf: String, + start_key: Vec, + limit: usize, + ) -> Result>> { + self.store + .async_raw_scan(ctx, cf, start_key, limit, false, true) .wait() } } diff --git a/src/server/service/kv.rs b/src/server/service/kv.rs index f07ebfe5f0d..d1c1d9a501a 100644 --- a/src/server/service/kv.rs +++ b/src/server/service/kv.rs @@ -591,6 +591,7 @@ impl tikvpb_grpc::Tikv for Service tikvpb_grpc::Tikv for Service Storage { } Ok(pairs) } + fn reverse_raw_scan( + snapshot: &E::Snap, + cf: &str, + start_key: &Key, + end_key: Option, + limit: usize, + statistics: &mut Statistics, + key_only: bool, + ) -> Result>> { + let mut option = IterOption::default(); + if let Some(end) = end_key { + option.set_lower_bound(end.into_encoded()); + } + let mut cursor = snapshot.iter_cf(Self::rawkv_cf(cf)?, option, ScanMode::Backward)?; + let statistics = statistics.mut_cf_statistics(cf); + if !cursor.reverse_seek(start_key, statistics)? { + return Ok(vec![]); + } + let mut pairs = vec![]; + while cursor.valid() && pairs.len() < limit { + pairs.push(Ok(( + cursor.key(statistics).to_owned(), + if key_only { + vec![] + } else { + cursor.value(statistics).to_owned() + }, + ))); + cursor.prev(statistics); + } + Ok(pairs) + } pub fn async_raw_scan( &self, @@ -1265,6 +1297,7 @@ impl Storage { key: Vec, limit: usize, key_only: bool, + reverse: bool, ) -> impl Future>, Error = Error> { const CMD: &str = "raw_scan"; let engine = self.get_engine(); @@ -1283,15 +1316,27 @@ impl Storage { let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); let mut statistics = Statistics::default(); - let result = Self::raw_scan( - &snapshot, - &cf, - &Key::from_encoded(key), - None, - limit, - &mut statistics, - key_only, - ).map_err(Error::from); + let result = if reverse { + Self::reverse_raw_scan( + &snapshot, + &cf, + &Key::from_encoded(key), + None, + limit, + &mut statistics, + key_only, + ).map_err(Error::from) + } else { + Self::raw_scan( + &snapshot, + &cf, + &Key::from_encoded(key), + None, + limit, + &mut statistics, + key_only, + ).map_err(Error::from) + }; thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics); thread_ctx.collect_key_reads(CMD, statistics.write.flow_stats.read_keys as u64); @@ -1322,7 +1367,11 @@ impl Storage { Err(Error::InvalidCf(cf.to_owned())) } - fn check_key_ranges(ranges: &[KeyRange]) -> bool { + /// Check if key range is valid + /// + /// - if reverse, endKey is less than startKey. endKey is lowerBound. + /// - if not reverse, endKey is greater than startKey. endKey is upperBound. + fn check_key_ranges(ranges: &[KeyRange], reverse: bool) -> bool { let ranges_len = ranges.len(); for i in 0..ranges_len { let start_key = ranges[i].get_start_key(); @@ -1330,7 +1379,9 @@ impl Storage { if end_key.is_empty() && i + 1 != ranges_len { end_key = ranges[i + 1].get_start_key(); } - if !end_key.is_empty() && start_key >= end_key { + if !end_key.is_empty() + && (!reverse && start_key >= end_key || reverse && start_key <= end_key) + { return false; } } @@ -1344,6 +1395,7 @@ impl Storage { mut ranges: Vec, each_limit: usize, key_only: bool, + reverse: bool, ) -> impl Future>, Error = Error> { const CMD: &str = "raw_batch_scan"; let engine = self.get_engine(); @@ -1362,7 +1414,7 @@ impl Storage { let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); let mut statistics = Statistics::default(); - if !Self::check_key_ranges(&ranges) { + if !Self::check_key_ranges(&ranges, reverse) { return Err(box_err!("Invalid KeyRanges")); }; let mut result = Vec::new(); @@ -1379,15 +1431,27 @@ impl Storage { } else { Some(Key::from_encoded(end_key)) }; - let pairs = Self::raw_scan( - &snapshot, - &cf, - &start_key, - end_key, - each_limit, - &mut statistics, - key_only, - )?; + let pairs = if reverse { + Self::reverse_raw_scan( + &snapshot, + &cf, + &start_key, + end_key, + each_limit, + &mut statistics, + key_only, + )? + } else { + Self::raw_scan( + &snapshot, + &cf, + &start_key, + end_key, + each_limit, + &mut statistics, + key_only, + )? + }; result.extend(pairs.into_iter()); } @@ -2691,31 +2755,230 @@ mod tests { expect_multi_values( results.clone(), storage - .async_raw_scan(Context::new(), "".to_string(), vec![], 20, true) + .async_raw_scan(Context::new(), "".to_string(), vec![], 20, true, false) .wait(), ); results = results.split_off(10); expect_multi_values( results, storage - .async_raw_scan(Context::new(), "".to_string(), b"c2".to_vec(), 20, true) + .async_raw_scan( + Context::new(), + "".to_string(), + b"c2".to_vec(), + 20, + true, + false, + ) .wait(), ); - let mut results: Vec> = - test_data.into_iter().map(|(k, v)| Some((k, v))).collect(); + let mut results: Vec> = test_data + .clone() + .into_iter() + .map(|(k, v)| Some((k, v))) + .collect(); expect_multi_values( results.clone(), storage - .async_raw_scan(Context::new(), "".to_string(), vec![], 20, false) + .async_raw_scan(Context::new(), "".to_string(), vec![], 20, false, false) .wait(), ); results = results.split_off(10); expect_multi_values( results, storage - .async_raw_scan(Context::new(), "".to_string(), b"c2".to_vec(), 20, false) + .async_raw_scan( + Context::new(), + "".to_string(), + b"c2".to_vec(), + 20, + false, + false, + ) .wait(), ); + let results: Vec> = test_data + .clone() + .into_iter() + .map(|(k, v)| Some((k, v))) + .rev() + .collect(); + expect_multi_values( + results, + storage + .async_raw_scan( + Context::new(), + "".to_string(), + b"z".to_vec(), + 20, + false, + true, + ) + .wait(), + ); + + let results: Vec> = test_data + .clone() + .into_iter() + .map(|(k, v)| Some((k, v))) + .rev() + .take(5) + .collect(); + expect_multi_values( + results, + storage + .async_raw_scan( + Context::new(), + "".to_string(), + b"z".to_vec(), + 5, + false, + true, + ) + .wait(), + ); + + // End key tests. Confirm that lower/upper bound works correctly. + let ctx = Context::new(); + let results = vec![ + (b"c1".to_vec(), b"cc11".to_vec()), + (b"c2".to_vec(), b"cc22".to_vec()), + (b"c3".to_vec(), b"cc33".to_vec()), + (b"d".to_vec(), b"dd".to_vec()), + (b"d1".to_vec(), b"dd11".to_vec()), + (b"d2".to_vec(), b"dd22".to_vec()), + ].into_iter() + .map(|(k, v)| Some((k, v))); + expect_multi_values( + results.clone().collect(), + >::async_snapshot(storage.get_engine(), &ctx) + .and_then(move |snapshot| { + >::raw_scan( + &snapshot, + &"".to_string(), + &Key::from_encoded(b"c1".to_vec()), + Some(Key::from_encoded(b"d3".to_vec())), + 20, + &mut Statistics::default(), + false, + ) + }) + .wait(), + ); + expect_multi_values( + results.rev().collect(), + >::async_snapshot(storage.get_engine(), &ctx) + .and_then(move |snapshot| { + >::reverse_raw_scan( + &snapshot, + &"".to_string(), + &Key::from_encoded(b"d3".to_vec()), + Some(Key::from_encoded(b"c1".to_vec())), + 20, + &mut Statistics::default(), + false, + ) + }) + .wait(), + ); + } + + #[test] + fn test_check_key_ranges() { + fn make_ranges(ranges: Vec<(Vec, Vec)>) -> Vec { + ranges + .into_iter() + .map(|(s, e)| { + let mut range = KeyRange::new(); + range.set_start_key(s); + if !e.is_empty() { + range.set_end_key(e); + } + range + }) + .collect() + } + + let ranges = make_ranges(vec![ + (b"a".to_vec(), b"a3".to_vec()), + (b"b".to_vec(), b"b3".to_vec()), + (b"c".to_vec(), b"c3".to_vec()), + ]); + assert_eq!( + >::check_key_ranges(&ranges, false), + true + ); + + let ranges = make_ranges(vec![ + (b"a".to_vec(), vec![]), + (b"b".to_vec(), vec![]), + (b"c".to_vec(), vec![]), + ]); + assert_eq!( + >::check_key_ranges(&ranges, false), + true + ); + + let ranges = make_ranges(vec![ + (b"a3".to_vec(), b"a".to_vec()), + (b"b3".to_vec(), b"b".to_vec()), + (b"c3".to_vec(), b"c".to_vec()), + ]); + assert_eq!( + >::check_key_ranges(&ranges, false), + false + ); + + // if end_key is omitted, the next start_key is used instead. so, false is returned. + let ranges = make_ranges(vec![ + (b"c".to_vec(), vec![]), + (b"b".to_vec(), vec![]), + (b"a".to_vec(), vec![]), + ]); + assert_eq!( + >::check_key_ranges(&ranges, false), + false + ); + + let ranges = make_ranges(vec![ + (b"a3".to_vec(), b"a".to_vec()), + (b"b3".to_vec(), b"b".to_vec()), + (b"c3".to_vec(), b"c".to_vec()), + ]); + assert_eq!( + >::check_key_ranges(&ranges, true), + true + ); + + let ranges = make_ranges(vec![ + (b"c3".to_vec(), vec![]), + (b"b3".to_vec(), vec![]), + (b"a3".to_vec(), vec![]), + ]); + assert_eq!( + >::check_key_ranges(&ranges, true), + true + ); + + let ranges = make_ranges(vec![ + (b"a".to_vec(), b"a3".to_vec()), + (b"b".to_vec(), b"b3".to_vec()), + (b"c".to_vec(), b"c3".to_vec()), + ]); + assert_eq!( + >::check_key_ranges(&ranges, true), + false + ); + + let ranges = make_ranges(vec![ + (b"a3".to_vec(), vec![]), + (b"b3".to_vec(), vec![]), + (b"c3".to_vec(), vec![]), + ]); + assert_eq!( + >::check_key_ranges(&ranges, true), + false + ); } #[test] @@ -2793,7 +3056,14 @@ mod tests { expect_multi_values( results, storage - .async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 5, false) + .async_raw_batch_scan( + Context::new(), + "".to_string(), + ranges.clone(), + 5, + false, + false, + ) .wait(), ); @@ -2815,7 +3085,14 @@ mod tests { expect_multi_values( results, storage - .async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 5, true) + .async_raw_batch_scan( + Context::new(), + "".to_string(), + ranges.clone(), + 5, + true, + false, + ) .wait(), ); @@ -2833,7 +3110,14 @@ mod tests { expect_multi_values( results, storage - .async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 3, false) + .async_raw_batch_scan( + Context::new(), + "".to_string(), + ranges.clone(), + 3, + false, + false, + ) .wait(), ); @@ -2851,25 +3135,25 @@ mod tests { expect_multi_values( results, storage - .async_raw_batch_scan(Context::new(), "".to_string(), ranges, 3, true) + .async_raw_batch_scan(Context::new(), "".to_string(), ranges, 3, true, false) .wait(), ); let results = vec![ - Some((b"a".to_vec(), b"aa".to_vec())), - Some((b"a1".to_vec(), b"aa11".to_vec())), Some((b"a2".to_vec(), b"aa22".to_vec())), - Some((b"b".to_vec(), b"bb".to_vec())), - Some((b"b1".to_vec(), b"bb11".to_vec())), + Some((b"a1".to_vec(), b"aa11".to_vec())), + Some((b"a".to_vec(), b"aa".to_vec())), Some((b"b2".to_vec(), b"bb22".to_vec())), - Some((b"c".to_vec(), b"cc".to_vec())), - Some((b"c1".to_vec(), b"cc11".to_vec())), + Some((b"b1".to_vec(), b"bb11".to_vec())), + Some((b"b".to_vec(), b"bb".to_vec())), Some((b"c2".to_vec(), b"cc22".to_vec())), + Some((b"c1".to_vec(), b"cc11".to_vec())), + Some((b"c".to_vec(), b"cc".to_vec())), ]; let ranges: Vec = vec![ - (b"a".to_vec(), b"a3".to_vec()), - (b"b".to_vec(), b"b3".to_vec()), - (b"c".to_vec(), b"c3".to_vec()), + (b"a3".to_vec(), b"a".to_vec()), + (b"b3".to_vec(), b"b".to_vec()), + (b"c3".to_vec(), b"c".to_vec()), ].into_iter() .map(|(s, e)| { let mut range = KeyRange::new(); @@ -2881,25 +3165,60 @@ mod tests { expect_multi_values( results, storage - .async_raw_batch_scan(Context::new(), "".to_string(), ranges.clone(), 5, false) + .async_raw_batch_scan(Context::new(), "".to_string(), ranges, 5, false, true) + .wait(), + ); + + let results = vec![ + Some((b"c2".to_vec(), b"cc22".to_vec())), + Some((b"c1".to_vec(), b"cc11".to_vec())), + Some((b"b2".to_vec(), b"bb22".to_vec())), + Some((b"b1".to_vec(), b"bb11".to_vec())), + Some((b"a2".to_vec(), b"aa22".to_vec())), + Some((b"a1".to_vec(), b"aa11".to_vec())), + ]; + let ranges: Vec = vec![b"c3".to_vec(), b"b3".to_vec(), b"a3".to_vec()] + .into_iter() + .map(|s| { + let mut range = KeyRange::new(); + range.set_start_key(s); + range + }) + .collect(); + expect_multi_values( + results, + storage + .async_raw_batch_scan(Context::new(), "".to_string(), ranges, 2, false, true) .wait(), ); let results = vec![ - Some((b"a".to_vec(), vec![])), - Some((b"a1".to_vec(), vec![])), Some((b"a2".to_vec(), vec![])), - Some((b"b".to_vec(), vec![])), - Some((b"b1".to_vec(), vec![])), + Some((b"a1".to_vec(), vec![])), + Some((b"a".to_vec(), vec![])), Some((b"b2".to_vec(), vec![])), - Some((b"c".to_vec(), vec![])), - Some((b"c1".to_vec(), vec![])), + Some((b"b1".to_vec(), vec![])), + Some((b"b".to_vec(), vec![])), Some((b"c2".to_vec(), vec![])), + Some((b"c1".to_vec(), vec![])), + Some((b"c".to_vec(), vec![])), ]; + let ranges: Vec = vec![ + (b"a3".to_vec(), b"a".to_vec()), + (b"b3".to_vec(), b"b".to_vec()), + (b"c3".to_vec(), b"c".to_vec()), + ].into_iter() + .map(|(s, e)| { + let mut range = KeyRange::new(); + range.set_start_key(s); + range.set_end_key(e); + range + }) + .collect(); expect_multi_values( results, storage - .async_raw_batch_scan(Context::new(), "".to_string(), ranges, 5, true) + .async_raw_batch_scan(Context::new(), "".to_string(), ranges, 5, true, true) .wait(), ); }