Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
storage: limit key size. (#2445)
  • Loading branch information
disksing authored and siddontang committed Nov 2, 2017
1 parent 1084f94 commit 5126bff
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/storage/config.rs
Expand Up @@ -20,6 +20,7 @@ use util::config::{self, ReadableSize};
pub const DEFAULT_DATA_DIR: &'static str = "";
pub const DEFAULT_ROCKSDB_SUB_DIR: &'static str = "db";
const DEFAULT_GC_RATIO_THRESHOLD: f64 = 1.1;
const DEFAULT_MAX_KEY_SIZE: usize = 4 * 1024;
const DEFAULT_SCHED_CAPACITY: usize = 10240;
const DEFAULT_SCHED_MSG_PER_TICK: usize = 1024;
const DEFAULT_SCHED_CONCURRENCY: usize = 102400;
Expand All @@ -36,6 +37,7 @@ const DEFAULT_SCHED_PENDING_WRITE_MB: u64 = 100;
pub struct Config {
pub data_dir: String,
pub gc_ratio_threshold: f64,
pub max_key_size: usize,
pub scheduler_notify_capacity: usize,
pub scheduler_messages_per_tick: usize,
pub scheduler_concurrency: usize,
Expand All @@ -49,6 +51,7 @@ impl Default for Config {
Config {
data_dir: DEFAULT_DATA_DIR.to_owned(),
gc_ratio_threshold: DEFAULT_GC_RATIO_THRESHOLD,
max_key_size: DEFAULT_MAX_KEY_SIZE,
scheduler_notify_capacity: DEFAULT_SCHED_CAPACITY,
scheduler_messages_per_tick: DEFAULT_SCHED_MSG_PER_TICK,
scheduler_concurrency: DEFAULT_SCHED_CONCURRENCY,
Expand Down
22 changes: 22 additions & 0 deletions src/storage/mod.rs
Expand Up @@ -493,6 +493,7 @@ pub struct Storage {

// Storage configurations.
gc_ratio_threshold: f64,
max_key_size: usize,
}

impl Storage {
Expand All @@ -509,6 +510,7 @@ impl Storage {
receiver: Some(rx),
})),
gc_ratio_threshold: config.gc_ratio_threshold,
max_key_size: config.max_key_size,
})
}

Expand Down Expand Up @@ -653,6 +655,13 @@ impl Storage {
options: Options,
callback: Callback<Vec<Result<()>>>,
) -> Result<()> {
for m in &mutations {
let size = m.key().encoded().len();
if size > self.max_key_size {
callback(Err(Error::KeyTooLarge(size, self.max_key_size)));
return Ok(());
}
}
let cmd = Command::Prewrite {
ctx: ctx,
mutations: mutations,
Expand Down Expand Up @@ -827,6 +836,10 @@ impl Storage {
value: Vec<u8>,
callback: Callback<()>,
) -> Result<()> {
if key.len() > self.max_key_size {
callback(Err(Error::KeyTooLarge(key.len(), self.max_key_size)));
return Ok(());
}
try!(self.engine
.async_write(&ctx,
vec![Modify::Put(CF_DEFAULT, Key::from_encoded(key), value)],
Expand All @@ -843,6 +856,10 @@ impl Storage {
key: Vec<u8>,
callback: Callback<()>,
) -> Result<()> {
if key.len() > self.max_key_size {
callback(Err(Error::KeyTooLarge(key.len(), self.max_key_size)));
return Ok(());
}
self.engine.async_write(
&ctx,
vec![Modify::Delete(CF_DEFAULT, Key::from_encoded(key))],
Expand Down Expand Up @@ -908,6 +925,7 @@ impl Clone for Storage {
sendch: self.sendch.clone(),
handle: self.handle.clone(),
gc_ratio_threshold: self.gc_ratio_threshold,
max_key_size: self.max_key_size,
}
}
}
Expand Down Expand Up @@ -946,6 +964,10 @@ quick_error! {
SchedTooBusy {
description("scheduler is too busy")
}
KeyTooLarge(size: usize, limit: usize) {
description("max key size exceeded")
display("max key size exceeded, size: {}, limit: {}", size, limit)
}
}
}

Expand Down
1 change: 1 addition & 0 deletions tests/config/mod.rs
Expand Up @@ -310,6 +310,7 @@ fn test_serde_custom_tikv_config() {
value.storage = StorageConfig {
data_dir: "/var".to_owned(),
gc_ratio_threshold: 1.2,
max_key_size: 8192,
scheduler_notify_capacity: 123,

scheduler_messages_per_tick: 123,
Expand Down
1 change: 1 addition & 0 deletions tests/config/test-custom.toml
Expand Up @@ -20,6 +20,7 @@ a = "b"
[storage]
data-dir = "/var"
gc-ratio-threshold = 1.2
max-key-size = 8192
scheduler-notify-capacity = 123
scheduler-messages-per-tick = 123
scheduler-concurrency = 123
Expand Down
20 changes: 18 additions & 2 deletions tests/storage/assert_storage.rs
Expand Up @@ -300,6 +300,12 @@ impl AssertionStorage {
.unwrap();
}

pub fn prewrite_err(&self, mutations: Vec<Mutation>, primary: &[u8], start_ts: u64) {
self.store
.prewrite(self.ctx.clone(), mutations, primary.to_vec(), start_ts)
.unwrap_err();
}

pub fn prewrite_locked(
&self,
mutations: Vec<Mutation>,
Expand Down Expand Up @@ -425,10 +431,20 @@ impl AssertionStorage {
self.store.raw_put(self.ctx.clone(), key, value).unwrap();
}

pub fn raw_put_err(&self, key: Vec<u8>, value: Vec<u8>) {
self.store
.raw_put(self.ctx.clone(), key, value)
.unwrap_err();
}

pub fn raw_delete_ok(&self, key: Vec<u8>) {
self.store.raw_delete(self.ctx.clone(), key).unwrap()
}

pub fn raw_delete_err(&self, key: Vec<u8>) {
self.store.raw_delete(self.ctx.clone(), key).unwrap_err();
}

pub fn raw_scan_ok(&self, start_key: Vec<u8>, limit: usize, expect: Vec<(&[u8], &[u8])>) {
let result: Vec<KvPair> = self.store
.raw_scan(self.ctx.clone(), start_key, limit)
Expand All @@ -454,7 +470,7 @@ impl AssertionStorage {

pub fn test_txn_store_gc3(&self, key_prefix: u8) {
let key_len = 10_000;
let key = vec![key_prefix; 10_000];
let key = vec![key_prefix; 1024];
for k in 1u64..(MAX_TXN_WRITE_SIZE / key_len * 2) as u64 {
self.put_ok(&key, b"", k * 10, k * 10 + 5);
}
Expand All @@ -470,7 +486,7 @@ impl AssertionStorage {
key_prefix: u8,
) {
let key_len = 10_000;
let key = vec![key_prefix; 10_000];
let key = vec![key_prefix; 1024];
for k in 1u64..(MAX_TXN_WRITE_SIZE / key_len * 2) as u64 {
self.put_ok_for_cluster(cluster, &key, b"", k * 10, k * 10 + 5);
}
Expand Down
26 changes: 23 additions & 3 deletions tests/storage/test_storage.rs
Expand Up @@ -404,8 +404,8 @@ fn test_txn_store_resolve_lock2() {
test_txn_store_resolve_lock_batch(1, i);
}

for &i in &[1, MAX_TXN_WRITE_SIZE / 2, MAX_TXN_WRITE_SIZE + 1] {
test_txn_store_resolve_lock_batch(i, 3);
for &i in &[1, 512, 1024] {
test_txn_store_resolve_lock_batch(i, 50);
}
}

Expand Down Expand Up @@ -506,7 +506,7 @@ fn test_txn_store_gc2_with_many_keys() {

#[test]
fn test_txn_store_gc2_with_long_key_prefix() {
test_txn_store_gc_multiple_keys(MAX_TXN_WRITE_SIZE + 1, 3);
test_txn_store_gc_multiple_keys(1024, MAX_TXN_WRITE_SIZE / 1024 * 3);
}

#[test]
Expand Down Expand Up @@ -545,6 +545,26 @@ fn test_txn_store_rawkv() {
store.raw_scan_ok(b"k5".to_vec(), 1, vec![]);
}

#[test]
fn test_txn_storage_keysize() {
let store = AssertionStorage::default();
let long_key = vec![b'x'; 10240];
store.raw_put_ok(b"short_key".to_vec(), b"v".to_vec());
store.raw_put_err(long_key.clone(), b"v".to_vec());
store.raw_delete_ok(b"short_key".to_vec());
store.raw_delete_err(long_key.clone());
store.prewrite_ok(
vec![Mutation::Put((make_key(b"short_key"), b"v".to_vec()))],
b"short_key",
1,
);
store.prewrite_err(
vec![Mutation::Put((make_key(&long_key), b"v".to_vec()))],
b"short_key",
1,
);
}

#[test]
fn test_txn_store_lock_primary() {
let store = AssertionStorage::default();
Expand Down

0 comments on commit 5126bff

Please sign in to comment.