Skip to content

Commit

Permalink
storage/mvcc: return error when mvcc corruption (#4315)
Browse files Browse the repository at this point in the history
  • Loading branch information
Connor1996 authored and zhangjinpeng87 committed Mar 12, 2019
1 parent 5cb9ac9 commit 95a2f9a
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 43 deletions.
6 changes: 3 additions & 3 deletions src/bin/tikv-server.rs
Expand Up @@ -109,9 +109,9 @@ fn pre_start(cfg: &TiKvConfig) {
check_system_config(&cfg);
check_environment_variables();

if cfg.panic_when_key_exceed_bound {
info!("panic-when-key-exceed-bound is on");
tikv_util::set_panic_when_key_exceed_bound(true);
if cfg.panic_when_unexpected_key_or_data {
info!("panic-when-unexpected-key-or-data is on");
tikv_util::set_panic_when_unexpected_key_or_data(true);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Expand Up @@ -1141,7 +1141,7 @@ pub struct TiKvConfig {
pub log_level: slog::Level,
pub log_file: String,
pub log_rotation_timespan: ReadableDuration,
pub panic_when_key_exceed_bound: bool,
pub panic_when_unexpected_key_or_data: bool,
pub readpool: ReadPoolConfig,
pub server: ServerConfig,
pub storage: StorageConfig,
Expand All @@ -1162,7 +1162,7 @@ impl Default for TiKvConfig {
log_level: slog::Level::Info,
log_file: "".to_owned(),
log_rotation_timespan: ReadableDuration::hours(24),
panic_when_key_exceed_bound: false,
panic_when_unexpected_key_or_data: false,
readpool: ReadPoolConfig::default(),
server: ServerConfig::default(),
metric: MetricConfig::default(),
Expand Down
10 changes: 2 additions & 8 deletions src/raftstore/store/metrics.rs
Expand Up @@ -177,14 +177,14 @@ lazy_static! {
register_histogram!(
"tikv_snapshot_kv_count",
"Total number of kv in snapshot",
exponential_buckets(100.0, 2.0, 20).unwrap() //100,100*2^1,...100M
exponential_buckets(100.0, 2.0, 20).unwrap() //100,100*2^1,...100M
).unwrap();

pub static ref SNAPSHOT_SIZE_HISTOGRAM: Histogram =
register_histogram!(
"tikv_snapshot_size",
"Size of snapshot",
exponential_buckets(1024.0, 2.0, 22).unwrap() // 1024,1024*2^1,..,4G
exponential_buckets(1024.0, 2.0, 22).unwrap() // 1024,1024*2^1,..,4G
).unwrap();

pub static ref RAFT_ENTRY_FETCHES: IntCounterVec =
Expand Down Expand Up @@ -221,10 +221,4 @@ lazy_static! {
&["type"],
exponential_buckets(0.001, 1.59, 20).unwrap() // max 10s
).unwrap();

pub static ref KEY_NOT_IN_REGION: IntCounter = register_int_counter!(
"tikv_key_not_in_region",
"Counter of key not in region"
)
.unwrap();
}
11 changes: 6 additions & 5 deletions src/raftstore/store/region_snapshot.rs
Expand Up @@ -19,9 +19,8 @@ use std::sync::Arc;
use crate::raftstore::store::engine::{IterOption, Peekable, Snapshot, SyncSnapshot};
use crate::raftstore::store::{keys, util, PeerStorage};
use crate::raftstore::Result;
use crate::util::{panic_when_key_exceed_bound, set_panic_mark};

use super::metrics::*;
use crate::util::metrics::CRITICAL_ERROR;
use crate::util::{panic_when_unexpected_key_or_data, set_panic_mark};

/// Snapshot of a region.
///
Expand Down Expand Up @@ -310,8 +309,10 @@ impl RegionIterator {
#[inline]
pub fn should_seekable(&self, key: &[u8]) -> Result<()> {
if let Err(e) = util::check_key_in_region_inclusive(key, &self.region) {
KEY_NOT_IN_REGION.inc();
if panic_when_key_exceed_bound() {
CRITICAL_ERROR
.with_label_values(&["key not in region"])
.inc();
if panic_when_unexpected_key_or_data() {
set_panic_mark();
panic!("key exceed bound: {:?}", e);
} else {
Expand Down
35 changes: 35 additions & 0 deletions src/storage/mvcc/mod.rs
Expand Up @@ -22,7 +22,10 @@ pub use self::reader::MvccReader;
pub use self::reader::{Scanner, ScannerBuilder};
pub use self::txn::{MvccTxn, MAX_TXN_WRITE_SIZE};
pub use self::write::{Write, WriteType};

use crate::util::escape;
use crate::util::metrics::CRITICAL_ERROR;
use crate::util::{panic_when_unexpected_key_or_data, set_panic_mark};
use std::error;
use std::io;

Expand Down Expand Up @@ -71,6 +74,10 @@ quick_error! {
description("already exists")
display("key {:?} already exists", escape(key))
}
DefaultNotFound { key: Vec<u8>, write: Write } {
description("write cf corresponding value not found in default cf")
display("default not found: key:{:?}, write:{:?}, maybe read truncated/dropped table data?", escape(key), write)
}
KeyVersion {description("bad format key(version)")}
Other(err: Box<dyn error::Error + Sync + Send>) {
from()
Expand Down Expand Up @@ -122,6 +129,10 @@ impl Error {
primary: primary.to_owned(),
}),
Error::AlreadyExist { ref key } => Some(Error::AlreadyExist { key: key.clone() }),
Error::DefaultNotFound { ref key, ref write } => Some(Error::DefaultNotFound {
key: key.to_owned(),
write: write.clone(),
}),
Error::KeyVersion => Some(Error::KeyVersion),
Error::Committed { commit_ts } => Some(Error::Committed { commit_ts }),
Error::Io(_) | Error::Other(_) => None,
Expand All @@ -131,6 +142,30 @@ impl Error {

pub type Result<T> = std::result::Result<T, Error>;

/// Generates `DefaultNotFound` error or panic directly based on config.
pub fn default_not_found_error(key: Vec<u8>, write: Write, hint: &str) -> Error {
CRITICAL_ERROR
.with_label_values(&["default value not found"])
.inc();
if panic_when_unexpected_key_or_data() {
set_panic_mark();
panic!(
"default value not found for key {:?}, write: {:?} when {}",
hex::encode_upper(&key),
write,
hint,
);
} else {
error!(
"default value not found";
"key" => log_wrappers::Key(&key),
"write" => ?write,
"hint" => hint,
);
Error::DefaultNotFound { key, write }
}
}

#[cfg(test)]
pub mod tests {
use kvproto::kvrpcpb::{Context, IsolationLevel};
Expand Down
27 changes: 14 additions & 13 deletions src/storage/mvcc/reader/reader.rs
Expand Up @@ -13,13 +13,13 @@

use crate::raftstore::store::engine::IterOption;
use crate::storage::engine::{Cursor, ScanMode, Snapshot, Statistics};
use crate::storage::mvcc::default_not_found_error;
use crate::storage::mvcc::lock::{Lock, LockType};
use crate::storage::mvcc::write::{Write, WriteType};
use crate::storage::mvcc::{Error, Result};
use crate::storage::{Key, Value, CF_LOCK, CF_WRITE};
use crate::util::rocksdb_util::properties::MvccProperties;
use kvproto::kvrpcpb::IsolationLevel;
use std::u64;

const GC_MAX_ROW_VERSIONS_THRESHOLD: u64 = 100;

Expand Down Expand Up @@ -77,9 +77,9 @@ impl<S: Snapshot> MvccReader<S> {
self.key_only = key_only;
}

pub fn load_data(&mut self, key: &Key, ts: u64) -> Result<Value> {
pub fn load_data(&mut self, key: &Key, ts: u64) -> Result<Option<Value>> {
if self.key_only {
return Ok(vec![]);
return Ok(Some(vec![]));
}
if self.scan_mode.is_some() && self.data_cursor.is_none() {
let iter_opt = IterOption::new(None, None, self.fill_cache);
Expand All @@ -88,16 +88,12 @@ impl<S: Snapshot> MvccReader<S> {

let k = key.clone().append_ts(ts);
let res = if let Some(ref mut cursor) = self.data_cursor {
match cursor.get(&k, &mut self.statistics.data)? {
None => panic!("key {} not found, ts {}", key, ts),
Some(v) => v.to_vec(),
}
cursor
.get(&k, &mut self.statistics.data)?
.map(|v| v.to_vec())
} else {
self.statistics.data.get += 1;
match self.snapshot.get(&k)? {
None => panic!("key {} not found, ts: {}", key, ts),
Some(v) => v,
}
self.snapshot.get(&k)?
};

self.statistics.data.processed += 1;
Expand Down Expand Up @@ -205,7 +201,7 @@ impl<S: Snapshot> MvccReader<S> {
return Ok(ts);
}

if ts == u64::MAX && key.to_raw()? == lock.primary {
if ts == std::u64::MAX && key.to_raw()? == lock.primary {
// when ts==u64::MAX(which means to get latest committed version for
// primary key),and current key is the primary key, returns the latest
// commit version's value
Expand Down Expand Up @@ -234,7 +230,12 @@ impl<S: Snapshot> MvccReader<S> {
}
return Ok(write.short_value.take());
}
return self.load_data(key, write.start_ts).map(Some);
match self.load_data(key, write.start_ts)? {
None => {
return Err(default_not_found_error(key.to_raw()?, write, "get"));
}
Some(v) => return Ok(Some(v)),
}
}
Ok(None)
}
Expand Down
23 changes: 19 additions & 4 deletions src/storage/mvcc/reader/scanner/util.rs
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::storage::mvcc::default_not_found_error;
use crate::storage::mvcc::{Error, Result};
use crate::storage::mvcc::{Lock, LockType, Write};
use crate::storage::{Cursor, Iterator, Key, Statistics, Value};
Expand Down Expand Up @@ -81,8 +82,15 @@ where
assert!(write.short_value.is_none());
let seek_key = user_key.clone().append_ts(write.start_ts);
default_cursor.near_seek(&seek_key, &mut statistics.data)?;
assert!(default_cursor.valid());
assert!(default_cursor.key(&mut statistics.data) == seek_key.as_encoded().as_slice());
if !default_cursor.valid()
|| default_cursor.key(&mut statistics.data) != seek_key.as_encoded().as_slice()
{
return Err(default_not_found_error(
user_key.to_raw()?,
write,
"near_load_data_by_write",
));
}
statistics.data.processed += 1;
Ok(default_cursor.value(&mut statistics.data).to_vec())
}
Expand All @@ -101,8 +109,15 @@ where
assert!(write.short_value.is_none());
let seek_key = user_key.clone().append_ts(write.start_ts);
default_cursor.near_seek_for_prev(&seek_key, &mut statistics.data)?;
assert!(default_cursor.valid());
assert!(default_cursor.key(&mut statistics.data) == seek_key.as_encoded().as_slice());
if !default_cursor.valid()
|| default_cursor.key(&mut statistics.data) != seek_key.as_encoded().as_slice()
{
return Err(default_not_found_error(
user_key.to_raw()?,
write,
"near_reverse_load_data_by_write",
));
}
statistics.data.processed += 1;
Ok(default_cursor.value(&mut statistics.data).to_vec())
}
11 changes: 10 additions & 1 deletion src/util/metrics/mod.rs
Expand Up @@ -14,7 +14,7 @@
use std::thread;
use std::time::Duration;

use prometheus::{self, Encoder, TextEncoder};
use prometheus::*;

#[cfg(target_os = "linux")]
mod threads_linux;
Expand Down Expand Up @@ -71,3 +71,12 @@ pub fn dump() -> String {
}
String::from_utf8(buffer).unwrap()
}

lazy_static! {
pub static ref CRITICAL_ERROR: IntCounterVec = register_int_counter_vec!(
"tikv_critical_error_total",
"Counter of critical error.",
&["type"]
)
.unwrap();
}
10 changes: 5 additions & 5 deletions src/util/mod.rs
Expand Up @@ -44,14 +44,14 @@ pub mod time;
pub mod timer;
pub mod worker;

static PANIC_WHEN_KEY_EXCEED_BOUND: AtomicBool = AtomicBool::new(false);
static PANIC_WHEN_UNEXPECTED_KEY_OR_DATA: AtomicBool = AtomicBool::new(false);

pub fn panic_when_key_exceed_bound() -> bool {
PANIC_WHEN_KEY_EXCEED_BOUND.load(Ordering::SeqCst)
pub fn panic_when_unexpected_key_or_data() -> bool {
PANIC_WHEN_UNEXPECTED_KEY_OR_DATA.load(Ordering::SeqCst)
}

pub fn set_panic_when_key_exceed_bound(flag: bool) {
PANIC_WHEN_KEY_EXCEED_BOUND.store(flag, Ordering::SeqCst);
pub fn set_panic_when_unexpected_key_or_data(flag: bool) {
PANIC_WHEN_UNEXPECTED_KEY_OR_DATA.store(flag, Ordering::SeqCst);
}

static PANIC_MARK: AtomicBool = AtomicBool::new(false);
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/config/mod.rs
Expand Up @@ -497,7 +497,7 @@ fn test_serde_custom_tikv_config() {
stream_channel_window: 123,
max_open_engines: 2,
};
value.panic_when_key_exceed_bound = true;
value.panic_when_unexpected_key_or_data = true;

let custom = read_file_in_project_dir("tests/integrations/config/test-custom.toml");
let load = toml::from_str(&custom).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/config/test-custom.toml
@@ -1,7 +1,7 @@
log-level = "debug"
log-file = "foo"
log-rotation-timespan = "1d"
panic-when-key-exceed-bound = true
panic-when-unexpected-key-or-data = true

[readpool.storage]
high-concurrency = 1
Expand Down

0 comments on commit 95a2f9a

Please sign in to comment.