Skip to content

Commit

Permalink
*: add columns support.
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Jun 15, 2016
1 parent 2fc3f29 commit 9c2c12d
Show file tree
Hide file tree
Showing 12 changed files with 912 additions and 1,065 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions benches/bin/mvcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use test::BenchSamples;
use tempdir::TempDir;

use test_util::*;
use tikv::storage::{self, Dsn, Mutation, Key};
use tikv::storage::{self, Dsn};
use tikv::storage::txn::TxnStore;
use tikv::storage::mvcc::TEST_TS_BASE;
use tikv::storage::mvcc::{self, TEST_TS_BASE};
use kvproto::kvrpcpb::Context;

use super::print_result;
Expand All @@ -36,24 +36,21 @@ fn bench_tombstone_scan(dsn: Dsn) -> BenchSamples {
for (k, v) in kvs.take(100000) {
let mut ts = ts_generator.next().unwrap();
store.prewrite(Context::new(),
vec![Mutation::Put((Key::from_raw(&k), v))],
vec![mvcc::default_put(&k, &v)],
k.clone(),
ts)
.expect("");
store.commit(Context::new(),
vec![Key::from_raw(&k)],
vec![k.clone()],
ts,
ts_generator.next().unwrap())
.expect("");

ts = ts_generator.next().unwrap();
store.prewrite(Context::new(),
vec![Mutation::Delete(Key::from_raw(&k))],
k.clone(),
ts)
store.prewrite(Context::new(), vec![mvcc::default_del(&k)], k.clone(), ts)
.expect("");
store.commit(Context::new(),
vec![Key::from_raw(&k)],
vec![k.clone()],
ts,
ts_generator.next().unwrap())
.expect("");
Expand All @@ -63,7 +60,7 @@ fn bench_tombstone_scan(dsn: Dsn) -> BenchSamples {
bench!{
let (k, _) = kvs.next().unwrap();
assert!(store.scan(Context::new(),
Key::from_raw(&k),
mvcc::default_row(&k),
1,
ts_generator.next().unwrap())
.unwrap()
Expand Down
43 changes: 23 additions & 20 deletions src/server/coprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ use byteorder::{BigEndian, ReadBytesExt};
use threadpool::ThreadPool;

use storage::{Engine, SnapshotStore, engine, txn, mvcc};
use kvproto::kvrpcpb::LockInfo;
use kvproto::kvpb::LockInfo;
use kvproto::msgpb::{MessageType, Message};
use kvproto::coprocessor::{Request, Response, KeyRange};
use kvproto::errorpb;
use storage::{Snapshot, Key};
use storage::Snapshot;
use util::codec::number::NumberDecoder;
use util::codec::datum::DatumDecoder;
use util::codec::{Datum, table, datum, mysql};
Expand Down Expand Up @@ -86,9 +86,9 @@ impl From<txn::Error> for Error {
match e {
txn::Error::Mvcc(mvcc::Error::KeyIsLocked { primary, ts, key }) => {
let mut info = LockInfo::new();
info.set_primary_lock(primary);
info.set_lock_version(ts);
info.set_key(key);
info.set_primary(primary);
info.set_ts(ts);
info.set_row(key);
Error::Locked(info)
}
_ => Error::Other(box e),
Expand Down Expand Up @@ -406,7 +406,8 @@ impl<'a> SelectContext<'a> {
return Ok(rows);
}
if is_point(&range) {
if let None = try!(self.snap.get(&Key::from_raw(range.get_start()))) {
let row_value = try!(self.snap.get(mvcc::default_row(range.get_start())));
if mvcc::row_value_empty(&row_value) {
return Ok(rows);
}
try!(self.load_row_with_key(range.get_start(), &mut rows));
Expand All @@ -418,13 +419,14 @@ impl<'a> SelectContext<'a> {
};
let mut scanner = try!(self.snap.scanner());
while limit > rows.len() {
let key = if desc {
try!(scanner.reverse_seek(Key::from_raw(&seek_key)))
let mut kv_row = mvcc::default_row(&seek_key);
let row_value = if desc {
try!(scanner.reverse_seek(&mut kv_row))
} else {
try!(scanner.seek(Key::from_raw(&seek_key)))
try!(scanner.seek(&mut kv_row))
};
let key = match key {
Some((key, _)) => box_try!(key.raw()),
let key = match row_value {
Some(x) => x.get_row_key().to_vec(),
None => break,
};
if range.get_start() > &key || range.get_end() <= &key {
Expand Down Expand Up @@ -460,8 +462,8 @@ impl<'a> SelectContext<'a> {
}
} else {
let key = table::encode_column_key(t_id, h, col_id);
let data = try!(self.snap.get(&Key::from_raw(&key)));
let value = match data {
let data = try!(self.snap.get(mvcc::default_row(&key)));
let value = match mvcc::default_row_value(&data) {
None if mysql::has_not_null_flag(col.get_flag() as u64) => {
return Err(box_err!("key {} not exists", escape(&key)));
}
Expand Down Expand Up @@ -499,8 +501,8 @@ impl<'a> SelectContext<'a> {
row.mut_data().extend(bytes);
} else {
let raw_key = table::encode_column_key(tid, h, col.get_column_id());
let key = Key::from_raw(&raw_key);
match try!(self.snap.get(&key)) {
let data = try!(self.snap.get(mvcc::default_row(&raw_key)));
match mvcc::default_row_value(&data) {
None if mysql::has_not_null_flag(col.get_flag() as u64) => {
return Err(box_err!("key {} not exists", escape(&raw_key)));
}
Expand Down Expand Up @@ -543,13 +545,14 @@ impl<'a> SelectContext<'a> {
};
let mut scanner = try!(self.snap.scanner());
while rows.len() < limit {
let nk = if desc {
try!(scanner.reverse_seek(Key::from_raw(&seek_key)))
let mut kv_row = mvcc::default_row(&seek_key);
let row_value = if desc {
try!(scanner.reverse_seek(&mut kv_row))
} else {
try!(scanner.seek(Key::from_raw(&seek_key)))
try!(scanner.seek(&mut kv_row))
};
let (key, val) = match nk {
Some((key, val)) => (box_try!(key.raw()), val),
let (key, val) = match row_value {
Some(x) => (x.get_row_key().to_vec(), mvcc::default_row_value(&x).unwrap()),
None => break,
};
if r.get_start() > &key || r.get_end() <= &key {
Expand Down

0 comments on commit 9c2c12d

Please sign in to comment.