Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup RegionIterator to use RocksDB features directly #6010

Merged
merged 10 commits into from Nov 25, 2019
8 changes: 8 additions & 0 deletions components/engine_traits/src/iterable.rs
Expand Up @@ -14,6 +14,14 @@ pub trait Iterator {
fn seek(&mut self, key: SeekKey) -> bool;
fn seek_for_prev(&mut self, key: SeekKey) -> bool;

/// Convenience wrapper: positions the iterator by calling `seek` with
/// `SeekKey::Start`.
///
/// Returns whatever `seek` returns (presumably whether the iterator now
/// points at a valid entry; confirm against the implementors).
fn seek_to_first(&mut self) -> bool {
self.seek(SeekKey::Start)
}

/// Convenience wrapper: positions the iterator by calling `seek` with
/// `SeekKey::End`.
///
/// Returns whatever `seek` returns (presumably whether the iterator now
/// points at a valid entry; confirm against the implementors).
fn seek_to_last(&mut self) -> bool {
self.seek(SeekKey::End)
}

fn prev(&mut self) -> bool;
fn next(&mut self) -> bool;

Expand Down
179 changes: 56 additions & 123 deletions src/raftstore/store/region_snapshot.rs
Expand Up @@ -3,9 +3,7 @@
use engine::rocks::{TablePropertiesCollection, DB};
use engine::{self, IterOption};
use engine_rocks::{RocksDBVector, RocksEngineIterator, RocksSnapshot, RocksSyncSnapshot};
use engine_traits::{
Peekable, ReadOptions, Result as EngineResult, SeekKey, Snapshot as SnapshotTrait,
};
use engine_traits::{Peekable, ReadOptions, Result as EngineResult, Snapshot as SnapshotTrait};
use kvproto::metapb::Region;
use std::sync::Arc;

Expand Down Expand Up @@ -154,26 +152,9 @@ impl Peekable for RegionSnapshot {
)
.map_err(|e| EngineError::Other(box_err!(e)))?;
let data_key = keys::data_key(key);
self.snap.get_value_opt(opts, &data_key).map_err(|e| {
CRITICAL_ERROR.with_label_values(&["rocksdb get"]).inc();
if panic_when_unexpected_key_or_data() {
set_panic_mark();
panic!(
"failed to get value of key {} in region {}: {:?}",
hex::encode_upper(&key),
self.region.get_id(),
e,
);
} else {
error!(
"failed to get value of key";
"key" => hex::encode_upper(&key),
"region" => self.region.get_id(),
"error" => ?e,
);
e
}
})
self.snap
.get_value_opt(opts, &data_key)
.map_err(|e| self.handle_get_value_error(e, "", key))
}

fn get_value_cf_opt(
Expand All @@ -192,27 +173,32 @@ impl Peekable for RegionSnapshot {
let data_key = keys::data_key(key);
self.snap
.get_value_cf_opt(opts, cf, &data_key)
.map_err(|e| {
CRITICAL_ERROR.with_label_values(&["rocksdb get"]).inc();
if panic_when_unexpected_key_or_data() {
set_panic_mark();
panic!(
"failed to get value of key {} in region {}: {:?}",
hex::encode_upper(&key),
self.region.get_id(),
e,
);
} else {
error!(
"failed to get value of key in cf";
"key" => hex::encode_upper(&key),
"region" => self.region.get_id(),
"cf" => cf,
"error" => ?e,
);
e
}
})
.map_err(|e| self.handle_get_value_error(e, cf, key))
}
}

impl RegionSnapshot {
    /// Shared slow path for a failed value lookup on this snapshot.
    ///
    /// Always bumps the `"rocksdb get"` label of `CRITICAL_ERROR`. After that:
    ///
    /// * if `panic_when_unexpected_key_or_data()` is true, sets the panic mark
    ///   and panics with the hex-encoded key, the region id and the error;
    /// * otherwise logs the failure (key, region id, `cf`, error) and hands
    ///   the error back unchanged so the caller can propagate it.
    ///
    /// Callers pass an empty `cf` when the lookup did not name a column family.
    ///
    /// Marked `#[inline(never)]` so this code is not inlined into the callers.
    #[inline(never)]
    fn handle_get_value_error(&self, e: EngineError, cf: &str, key: &[u8]) -> EngineError {
        CRITICAL_ERROR.with_label_values(&["rocksdb get"]).inc();

        let region_id = self.region.get_id();
        if panic_when_unexpected_key_or_data() {
            set_panic_mark();
            panic!(
                "failed to get value of key {} in region {}: {:?}",
                hex::encode_upper(key),
                region_id,
                e,
            );
        }

        // Non-panicking configuration: record the failure and let the caller
        // surface the original error.
        error!(
            "failed to get value of key in cf";
            "key" => hex::encode_upper(key),
            "region" => region_id,
            "cf" => cf,
            "error" => ?e,
        );
        e
    }
}

Expand All @@ -221,10 +207,7 @@ impl Peekable for RegionSnapshot {
/// db only contains one region.
pub struct RegionIterator {
iter: RocksEngineIterator,
valid: bool,
region: Arc<Region>,
start_key: Vec<u8>,
end_key: Vec<u8>,
}

fn update_lower_bound(iter_opt: &mut IterOption, region: &Region) {
Expand Down Expand Up @@ -260,18 +243,10 @@ impl RegionIterator {
) -> RegionIterator {
update_lower_bound(&mut iter_opt, &region);
update_upper_bound(&mut iter_opt, &region);
let start_key = iter_opt.lower_bound().unwrap().to_vec();
let end_key = iter_opt.upper_bound().unwrap().to_vec();
let iter = snap
.iterator_opt(iter_opt)
.expect("creating snapshot iterator"); // FIXME error handling
RegionIterator {
iter,
valid: false,
start_key,
end_key,
region,
}
RegionIterator { iter, region }
}

pub fn new_cf(
Expand All @@ -282,110 +257,62 @@ impl RegionIterator {
) -> RegionIterator {
update_lower_bound(&mut iter_opt, &region);
update_upper_bound(&mut iter_opt, &region);
let start_key = iter_opt.lower_bound().unwrap().to_vec();
let end_key = iter_opt.upper_bound().unwrap().to_vec();
let iter = snap
.iterator_cf_opt(cf, iter_opt)
.expect("creating snapshot iterator"); // FIXME error handling
RegionIterator {
iter,
valid: false,
start_key,
end_key,
region,
}
RegionIterator { iter, region }
}

pub fn seek_to_first(&mut self) -> bool {
self.valid = self.iter.seek(self.start_key.as_slice().into());

self.update_valid(true)
}

#[inline]
fn update_valid(&mut self, forward: bool) -> bool {
if self.valid {
let key = self.iter.key();
self.valid = if forward {
key < self.end_key.as_slice()
} else {
key >= self.start_key.as_slice()
};
}
self.valid
self.iter.seek_to_first()
}

pub fn seek_to_last(&mut self) -> bool {
if !self.iter.seek(self.end_key.as_slice().into()) && !self.iter.seek(SeekKey::End) {
self.valid = false;
return self.valid;
}

while self.iter.key() >= self.end_key.as_slice() && self.iter.prev() {}

self.valid = self.iter.valid();
self.update_valid(false)
self.iter.seek_to_last()
}

pub fn seek(&mut self, key: &[u8]) -> Result<bool> {
fail_point!("region_snapshot_seek", |_| {
return Err(box_err!("region seek error"));
});

self.should_seekable(key)?;
let key = keys::data_key(key);
if key == self.end_key {
self.valid = false;
} else {
self.valid = self.iter.seek(key.as_slice().into());
}

Ok(self.update_valid(true))
Ok(self.iter.seek(key.as_slice().into()))
}

pub fn seek_for_prev(&mut self, key: &[u8]) -> Result<bool> {
self.should_seekable(key)?;
let key = keys::data_key(key);
self.valid = self.iter.seek_for_prev(key.as_slice().into());
if self.valid && self.iter.key() == self.end_key.as_slice() {
self.valid = self.iter.prev();
}
Ok(self.update_valid(false))
Ok(self.iter.seek_for_prev(key.as_slice().into()))
}

pub fn prev(&mut self) -> bool {
if !self.valid {
if !self.valid() {
return false;
}
self.valid = self.iter.prev();

self.update_valid(false)
self.iter.prev()
}

pub fn next(&mut self) -> bool {
if !self.valid {
if !self.valid() {
return false;
}
self.valid = self.iter.next();

self.update_valid(true)
self.iter.next()
}

#[inline]
pub fn key(&self) -> &[u8] {
assert!(self.valid);
keys::origin_key(self.iter.key())
}

#[inline]
pub fn value(&self) -> &[u8] {
assert!(self.valid);
self.iter.value()
}

#[inline]
pub fn valid(&self) -> bool {
self.valid
self.iter.valid()
}

#[inline]
Expand All @@ -396,20 +323,26 @@ impl RegionIterator {
#[inline]
pub fn should_seekable(&self, key: &[u8]) -> Result<()> {
if let Err(e) = util::check_key_in_region_inclusive(key, &self.region) {
CRITICAL_ERROR
.with_label_values(&["key not in region"])
.inc();
if panic_when_unexpected_key_or_data() {
set_panic_mark();
panic!("key exceed bound: {:?}", e);
} else {
return Err(e);
}
return handle_check_key_in_region_error(e);
}
Ok(())
}
}

/// Slow path for a key that failed the region range check.
///
/// Bumps the `"key not in region"` label of `CRITICAL_ERROR`, then either
/// panics (after setting the panic mark) when
/// `panic_when_unexpected_key_or_data()` is true, or returns the error as
/// `Err(e)`. This function never returns `Ok`.
///
/// Marked `#[inline(never)]` to keep this code out of the hot path.
#[inline(never)]
fn handle_check_key_in_region_error(e: crate::raftstore::Error) -> Result<()> {
    CRITICAL_ERROR
        .with_label_values(&["key not in region"])
        .inc();

    if !panic_when_unexpected_key_or_data() {
        return Err(e);
    }

    set_panic_mark();
    panic!("key exceed bound: {:?}", e)
}

#[cfg(test)]
mod tests {
use std::path::Path;
Expand Down
46 changes: 27 additions & 19 deletions src/storage/kv/cursor.rs
Expand Up @@ -9,7 +9,9 @@ use tikv_util::keybuilder::KeyBuilder;
use tikv_util::metrics::CRITICAL_ERROR;
use tikv_util::{panic_when_unexpected_key_or_data, set_panic_mark};

use crate::storage::kv::{CFStatistics, Iterator, Key, Result, ScanMode, Snapshot, SEEK_BOUND};
use crate::storage::kv::{
CFStatistics, Error, Iterator, Key, Result, ScanMode, Snapshot, SEEK_BOUND,
};

pub struct Cursor<I: Iterator> {
iter: I,
Expand Down Expand Up @@ -346,30 +348,36 @@ impl<I: Iterator> Cursor<I> {
pub fn valid(&self) -> Result<bool> {
if !self.iter.valid() {
if let Err(e) = self.iter.status() {
CRITICAL_ERROR.with_label_values(&["rocksdb iter"]).inc();
if panic_when_unexpected_key_or_data() {
set_panic_mark();
panic!(
"failed to iterate: {:?}, min_key: {:?}, max_key: {:?}",
e,
self.min_key.as_ref().map(|k| hex::encode_upper(k)),
self.max_key.as_ref().map(|k| hex::encode_upper(k))
);
} else {
error!(
"failed to iterate";
"min_key" => ?self.min_key.as_ref().map(|k| hex::encode_upper(k)),
"max_key" => ?self.max_key.as_ref().map(|k| hex::encode_upper(k)),
"error" => ?e,
);
}
return Err(e);
self.handle_error_status(e)?;
}
Ok(false)
} else {
Ok(true)
}
}

/// Slow path for an iterator that reported an error status.
///
/// Bumps the `"rocksdb iter"` label of `CRITICAL_ERROR`. When
/// `panic_when_unexpected_key_or_data()` is true it sets the panic mark and
/// panics with the error and the hex-encoded `min_key` / `max_key`;
/// otherwise it logs the same information and returns `Err(e)`.
/// This method never returns `Ok`.
///
/// Marked `#[inline(never)]` to keep this code out of the hot path.
#[inline(never)]
fn handle_error_status(&self, e: Error) -> Result<()> {
    CRITICAL_ERROR.with_label_values(&["rocksdb iter"]).inc();

    // Both outcomes report the cursor bounds, so encode them once up front.
    let min_key_hex = self.min_key.as_ref().map(|k| hex::encode_upper(k));
    let max_key_hex = self.max_key.as_ref().map(|k| hex::encode_upper(k));

    if panic_when_unexpected_key_or_data() {
        set_panic_mark();
        panic!(
            "failed to iterate: {:?}, min_key: {:?}, max_key: {:?}",
            e, min_key_hex, max_key_hex,
        );
    }

    error!(
        "failed to iterate";
        "min_key" => ?min_key_hex,
        "max_key" => ?max_key_hex,
        "error" => ?e,
    );
    Err(e)
}
}

/// A handy utility to build a snapshot cursor according to various configurations.
Expand Down
1 change: 1 addition & 0 deletions src/storage/mvcc/mod.rs
Expand Up @@ -319,6 +319,7 @@ impl From<codec::Error> for ErrorInner {
pub type Result<T> = std::result::Result<T, Error>;

/// Generates `DefaultNotFound` error or panic directly based on config.
#[inline(never)]
pub fn default_not_found_error(key: Vec<u8>, hint: &str) -> Error {
CRITICAL_ERROR
.with_label_values(&["default value not found"])
Expand Down