Skip to content

Commit

Permalink
scheduler, mvcc: limit writebatch size for ResolveLock and GC command. (
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Nov 15, 2016
2 parents d64ed12 + c86056d commit 978f3bb
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 39 deletions.
2 changes: 1 addition & 1 deletion src/storage/mvcc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod write;
mod metrics;

use std::io;
pub use self::txn::MvccTxn;
pub use self::txn::{MvccTxn, MAX_TXN_WRITE_SIZE};
pub use self::reader::MvccReader;
pub use self::lock::{Lock, LockType};
pub use self::write::{Write, WriteType};
Expand Down
85 changes: 72 additions & 13 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ use super::write::{WriteType, Write};
use super::{Error, Result};
use super::metrics::*;

pub const MAX_TXN_WRITE_SIZE: usize = 512 * 1024;

pub struct MvccTxn<'a> {
reader: MvccReader<'a>,
start_ts: u64,
writes: Vec<Modify>,
write_size: usize,
}

impl<'a> fmt::Debug for MvccTxn<'a> {
Expand All @@ -39,22 +42,53 @@ impl<'a> MvccTxn<'a> {
reader: MvccReader::new(snapshot, mode, true /* fill_cache */),
start_ts: start_ts,
writes: vec![],
write_size: 0,
}
}

pub fn modifies(self) -> Vec<Modify> {
self.writes
}

pub fn write_size(&self) -> usize {
self.write_size
}

fn lock_key(&mut self, key: Key, lock_type: LockType, primary: Vec<u8>) {
let lock = Lock::new(lock_type, primary, self.start_ts);
self.writes.push(Modify::Put(CF_LOCK, key, lock.to_bytes()));
let lock = Lock::new(lock_type, primary, self.start_ts).to_bytes();
self.write_size += CF_LOCK.len() + key.encoded().len() + lock.len();
self.writes.push(Modify::Put(CF_LOCK, key, lock));
}

fn unlock_key(&mut self, key: Key) {
self.write_size += CF_LOCK.len() + key.encoded().len();
self.writes.push(Modify::Delete(CF_LOCK, key));
}

fn put_value(&mut self, key: &Key, ts: u64, value: Value) {
let key = key.append_ts(ts);
self.write_size += key.encoded().len() + value.len();
self.writes.push(Modify::Put(CF_DEFAULT, key, value));
}

fn delete_value(&mut self, key: &Key, ts: u64) {
let key = key.append_ts(ts);
self.write_size += key.encoded().len();
self.writes.push(Modify::Delete(CF_DEFAULT, key));
}

fn put_write(&mut self, key: &Key, ts: u64, value: Value) {
let key = key.append_ts(ts);
self.write_size += CF_WRITE.len() + key.encoded().len() + value.len();
self.writes.push(Modify::Put(CF_WRITE, key, value));
}

fn delete_write(&mut self, key: &Key, ts: u64) {
let key = key.append_ts(ts);
self.write_size += CF_WRITE.len() + key.encoded().len();
self.writes.push(Modify::Delete(CF_WRITE, key));
}

pub fn get(&mut self, key: &Key) -> Result<Option<Value>> {
self.reader.get(key, self.start_ts)
}
Expand Down Expand Up @@ -82,8 +116,8 @@ impl<'a> MvccTxn<'a> {
primary.to_vec());

if let Mutation::Put((_, ref value)) = mutation {
let value_key = key.append_ts(self.start_ts);
self.writes.push(Modify::Put(CF_DEFAULT, value_key, value.clone()));
let ts = self.start_ts;
self.put_value(key, ts, value.clone());
}
Ok(())
}
Expand All @@ -107,16 +141,15 @@ impl<'a> MvccTxn<'a> {
}
};
let write = Write::new(WriteType::from_lock_type(lock_type), self.start_ts);
self.writes.push(Modify::Put(CF_WRITE, key.append_ts(commit_ts), write.to_bytes()));
self.put_write(key, commit_ts, write.to_bytes());
self.unlock_key(key.clone());
Ok(())
}

pub fn rollback(&mut self, key: &Key) -> Result<()> {
match try!(self.reader.load_lock(key)) {
Some(ref lock) if lock.ts == self.start_ts => {
let data_key = key.append_ts(lock.ts);
self.writes.push(Modify::Delete(CF_DEFAULT, data_key));
self.delete_value(key, lock.ts);
}
_ => {
return match try!(self.reader.get_txn_commit_ts(key, self.start_ts)) {
Expand All @@ -133,9 +166,9 @@ impl<'a> MvccTxn<'a> {
};
}
}
self.writes.push(Modify::Put(CF_WRITE,
key.append_ts(self.start_ts),
Write::new(WriteType::Rollback, self.start_ts).to_bytes()));
let write = Write::new(WriteType::Rollback, self.start_ts);
let ts = self.start_ts;
self.put_write(key, ts, write.to_bytes());
self.unlock_key(key.clone());
Ok(())
}
Expand All @@ -146,6 +179,9 @@ impl<'a> MvccTxn<'a> {
let mut versions = 0;
let mut delete_versions = 0;
while let Some((commit, write)) = try!(self.reader.seek_write(key, ts)) {
if self.write_size >= MAX_TXN_WRITE_SIZE {
break;
}
if !remove_older {
if commit <= safe_point {
// Set `remove_older` after we find the latest value.
Expand All @@ -160,16 +196,16 @@ impl<'a> MvccTxn<'a> {
// Rollback or Lock.
match write.write_type {
WriteType::Delete | WriteType::Rollback | WriteType::Lock => {
self.writes.push(Modify::Delete(CF_WRITE, key.append_ts(commit)));
self.delete_write(key, commit);
delete_versions += 1;
}
WriteType::Put => {}
}
}
} else {
self.writes.push(Modify::Delete(CF_WRITE, key.append_ts(commit)));
self.delete_write(key, commit);
if write.write_type == WriteType::Put {
self.writes.push(Modify::Delete(CF_DEFAULT, key.append_ts(write.start_ts)));
self.delete_value(key, write.start_ts);
}
delete_versions += 1;
}
Expand Down Expand Up @@ -429,6 +465,29 @@ mod tests {
must_scan_keys(engine.as_ref(), Some(b"c"), 1, vec![b"c"], Some(b"c"));
}

#[test]
fn test_write_size() {
let engine = engine::new_local_engine(TEMP_DIR, ALL_CFS).unwrap();
let ctx = Context::new();
let snapshot = engine.snapshot(&ctx).unwrap();
let mut txn = MvccTxn::new(snapshot.as_ref(), 10, None);
let key = make_key(b"key");
assert_eq!(txn.write_size, 0);

assert!(txn.get(&key).unwrap().is_none());
assert_eq!(txn.write_size, 0);

txn.prewrite(Mutation::Put((key.clone(), b"value".to_vec())), b"pk").unwrap();
assert!(txn.write_size() > 0);
engine.write(&ctx, txn.modifies()).unwrap();

let snapshot = engine.snapshot(&ctx).unwrap();
let mut txn = MvccTxn::new(snapshot.as_ref(), 10, None);
txn.commit(&key, 15).unwrap();
assert!(txn.write_size() > 0);
engine.write(&ctx, txn.modifies()).unwrap();
}

fn must_get(engine: &Engine, key: &[u8], ts: u64, expect: &[u8]) {
let ctx = Context::new();
let snapshot = engine.snapshot(&ctx).unwrap();
Expand Down
24 changes: 14 additions & 10 deletions src/storage/txn/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use prometheus::HistogramTimer;
use storage::{Engine, Command, Snapshot, StorageCb, Result as StorageResult,
Error as StorageError, ScanMode};
use kvproto::kvrpcpb::{Context, LockInfo};
use storage::mvcc::{MvccTxn, MvccReader, Error as MvccError};
use storage::mvcc::{MvccTxn, MvccReader, Error as MvccError, MAX_TXN_WRITE_SIZE};
use storage::{Key, Value, KvPair};
use std::collections::HashMap;
use mio::{self, EventLoop};
Expand Down Expand Up @@ -421,17 +421,16 @@ fn process_write_impl(cid: u64,
(pr, txn.modifies())
}
Command::ResolveLock { ref ctx, start_ts, commit_ts, ref mut scan_key, ref keys } => {
let mut scan_key = scan_key.take();
let mut txn = MvccTxn::new(snapshot, start_ts, None);
match commit_ts {
Some(ts) => {
for k in keys {
try!(txn.commit(&k, ts));
}
for k in keys {
match commit_ts {
Some(ts) => try!(txn.commit(&k, ts)),
None => try!(txn.rollback(&k)),
}
None => {
for k in keys {
try!(txn.rollback(&k));
}
if txn.write_size() >= MAX_TXN_WRITE_SIZE {
scan_key = Some(k.to_owned());
break;
}
}
if scan_key.is_none() {
Expand All @@ -450,9 +449,14 @@ fn process_write_impl(cid: u64,
}
}
Command::Gc { ref ctx, safe_point, ref mut scan_key, ref keys } => {
let mut scan_key = scan_key.take();
let mut txn = MvccTxn::new(snapshot, 0, Some(ScanMode::Mixed));
for k in keys {
try!(txn.gc(k, safe_point));
if txn.write_size() >= MAX_TXN_WRITE_SIZE {
scan_key = Some(k.to_owned());
break;
}
}
if scan_key.is_none() {
(ProcessResult::Res, txn.modifies())
Expand Down
46 changes: 31 additions & 15 deletions tests/storage/test_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::sync_storage::SyncStorage;
use kvproto::kvrpcpb::{Context, LockInfo};
use tikv::storage::{Mutation, Key, KvPair, make_key};
use tikv::storage::txn::{GC_BATCH_SIZE, RESOLVE_LOCK_BATCH_SIZE};
use tikv::storage::mvcc::MAX_TXN_WRITE_SIZE;

#[derive(Clone)]
struct AssertionStorage(SyncStorage);
Expand Down Expand Up @@ -467,19 +468,20 @@ fn test_txn_store_resolve_lock() {
store.scan_lock_ok(30, vec![]);
}

fn test_txn_store_resolve_lock_batch(n: usize) {
fn test_txn_store_resolve_lock_batch(key_prefix_len: usize, n: usize) {
let prefix = String::from_utf8(vec![b'k'; key_prefix_len]).unwrap();
let keys: Vec<String> = (0..n).map(|i| format!("{}{}", prefix, i)).collect();

let store = new_assertion_storage();
for i in 0..n {
let key = format!("k{}", i);
store.prewrite_ok(vec![Mutation::Put((make_key(&key.as_bytes()), b"v".to_vec()))],
for k in &keys {
store.prewrite_ok(vec![Mutation::Put((make_key(&k.as_bytes()), b"v".to_vec()))],
b"k1",
5);
}
store.resolve_lock_ok(5, Some(10));
for i in 0..n {
let key = format!("k{}", i);
store.get_ok(key.as_bytes(), 30, b"v");
store.get_none(key.as_bytes(), 8);
for k in &keys {
store.get_ok(k.as_bytes(), 30, b"v");
store.get_none(k.as_bytes(), 8);
}
}

Expand All @@ -491,7 +493,11 @@ fn test_txn_store_resolve_lock2() {
RESOLVE_LOCK_BATCH_SIZE,
RESOLVE_LOCK_BATCH_SIZE + 1,
RESOLVE_LOCK_BATCH_SIZE * 2] {
test_txn_store_resolve_lock_batch(i);
test_txn_store_resolve_lock_batch(1, i);
}

for &i in &[1, MAX_TXN_WRITE_SIZE / 2, MAX_TXN_WRITE_SIZE + 1] {
test_txn_store_resolve_lock_batch(i, 3);
}
}

Expand All @@ -505,19 +511,29 @@ fn test_txn_store_gc() {
store.get_ok(b"k", 25, b"v2");
}

fn test_txn_store_gc_multiple_keys(n: usize) {
fn test_txn_store_gc_multiple_keys(key_prefix_len: usize, n: usize) {
let prefix = String::from_utf8(vec![b'k'; key_prefix_len]).unwrap();
let keys: Vec<String> = (0..n).map(|i| format!("{}{}", prefix, i)).collect();

let store = new_assertion_storage();
for i in 0..n {
let key = format!("k{}", i);
store.put_ok(key.as_bytes(), b"value", 5, 10);
for k in &keys {
store.put_ok(k.as_bytes(), b"v1", 5, 10);
store.put_ok(k.as_bytes(), b"v2", 15, 20);
}
store.gc_ok(30);
for k in &keys {
store.get_none(k.as_bytes(), 15);
}
store.gc_ok(20);
}

#[test]
fn test_txn_store_gc2() {
for &i in &[0, 1, GC_BATCH_SIZE - 1, GC_BATCH_SIZE, GC_BATCH_SIZE + 1, GC_BATCH_SIZE * 2] {
test_txn_store_gc_multiple_keys(i);
test_txn_store_gc_multiple_keys(1, i);
}

for &i in &[1, MAX_TXN_WRITE_SIZE / 2, MAX_TXN_WRITE_SIZE + 1] {
test_txn_store_gc_multiple_keys(i, 3);
}
}

Expand Down

0 comments on commit 978f3bb

Please sign in to comment.