Skip to content

Commit

Permalink
Merge branch 'master' into qupeng/expr-math
Browse files Browse the repository at this point in the history
  • Loading branch information
hicqu committed Aug 18, 2017
2 parents ac1261d + d82059f commit 74ea718
Show file tree
Hide file tree
Showing 19 changed files with 107 additions and 41 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 3 additions & 6 deletions circle.yml
Expand Up @@ -63,14 +63,11 @@ dependencies:
post:
# check format first
- make format && git diff-index --quiet HEAD -- || (git diff; echo please make format and run tests before creating a PR!; exit 1)
# cargo test also use debug
- cargo build:
timeout: 1800
- cargo test --features "default" --no-run:
- cargo test --features "dev" --no-run:
timeout: 1800


test:
override:
- cargo test --features "default" -- --nocapture
- cargo test --features "default" --bench benches -- --nocapture
- cargo test --features "dev" -- --nocapture
- cargo test --features "dev" --bench benches -- --nocapture
5 changes: 3 additions & 2 deletions src/config.rs
Expand Up @@ -26,8 +26,8 @@ use raftstore::store::keys::region_raft_prefix_len;
use storage::{Config as StorageConfig, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, DEFAULT_DATA_DIR};
use util::config::{self, compression_type_level_serde, ReadableDuration, ReadableSize, GB, KB, MB};
use util::properties::{MvccPropertiesCollectorFactory, SizePropertiesCollectorFactory};
use util::rocksdb::{CFOptions, FixedPrefixSliceTransform, FixedSuffixSliceTransform,
NoopSliceTransform};
use util::rocksdb::{CFOptions, EventListener, FixedPrefixSliceTransform,
FixedSuffixSliceTransform, NoopSliceTransform};

const LOCKCF_MIN_MEM: usize = 256 * MB as usize;
const LOCKCF_MAX_MEM: usize = GB as usize;
Expand Down Expand Up @@ -398,6 +398,7 @@ impl DbConfig {
self.use_direct_io_for_flush_and_compaction,
);
opts.enable_pipelined_write(self.enable_pipelined_write);
opts.add_event_listener(EventListener::default());
opts
}

Expand Down
2 changes: 1 addition & 1 deletion src/coprocessor/codec/mysql/json/json_modify.rs
Expand Up @@ -93,7 +93,7 @@ impl Json {
if let Json::Object(ref mut map) = *self {
if map.contains_key(key) {
// e.g. json_replace('{"a": 1}', '$.a', 2) => '{"a": 2}'
let mut v = map.get_mut(key).unwrap();
let v = map.get_mut(key).unwrap();
v.set_json(sub_path_legs, value, mt);
} else if sub_path_legs.is_empty() && mt != ModifyType::Replace {
// e.g. json_insert('{"a": 1}', '$.b', 2) => '{"a": 1, "b": 2}'
Expand Down
2 changes: 1 addition & 1 deletion src/coprocessor/dag/dag.rs
Expand Up @@ -71,7 +71,7 @@ impl<'s> DAGContext<'s> {
match exec.next() {
Ok(Some(row)) => {
try!(check_if_outdated(self.deadline, REQ_TYPE_DAG));
let mut chunk = get_chunk(&mut chunks);
let chunk = get_chunk(&mut chunks);
let length = chunk.get_rows_data().len();
if self.has_aggr {
chunk.mut_rows_data().extend_from_slice(&row.data.value);
Expand Down
2 changes: 1 addition & 1 deletion src/coprocessor/dag/executor/index_scan.rs
Expand Up @@ -45,7 +45,7 @@ impl<'a> IndexScanExecutor<'a> {
) -> IndexScanExecutor<'a> {
let mut pk_col = None;
let desc = meta.get_desc();
let mut cols = meta.mut_columns();
let cols = meta.mut_columns();
if cols.last().map_or(false, |c| c.get_pk_handle()) {
pk_col = Some(cols.pop().unwrap());
}
Expand Down
6 changes: 3 additions & 3 deletions src/coprocessor/dag/expr/column.rs
Expand Up @@ -85,7 +85,7 @@ mod test {
fn test_column_eval() {
let dec = "1.1".parse::<Decimal>().unwrap();
let s = "你好".as_bytes().to_owned();
let dur = Duration::parse("01:00:00".as_bytes(), 0).unwrap();
let dur = Duration::parse(b"01:00:00", 0).unwrap();

let row = vec![
Datum::Null,
Expand All @@ -108,7 +108,7 @@ mod test {
];

let ctx = StatementContext::default();
for ii in 0..row.len() {
for (ii, exp) in expecteds.iter().enumerate().take(row.len()) {
let c = col_expr(ii as i64);
let e = Expression::build(c, row.len()).unwrap();

Expand All @@ -131,7 +131,7 @@ mod test {
.map(|t| t.into_owned());

let result = EvalResults(i, r, dec, s, t, dur, j);
assert_eq!(expecteds[ii], result);
assert_eq!(*exp, result);
}
}
}
2 changes: 1 addition & 1 deletion src/coprocessor/dag/expr/constant.rs
Expand Up @@ -143,7 +143,7 @@ mod test {
fn test_constant_eval() {
let dec = "1.1".parse::<Decimal>().unwrap();
let s = "你好".as_bytes().to_owned();
let dur = Duration::parse("01:00:00".as_bytes(), 0).unwrap();
let dur = Duration::parse(b"01:00:00", 0).unwrap();

let tests = vec![
datum_expr(Datum::Null),
Expand Down
2 changes: 1 addition & 1 deletion src/coprocessor/dag/expr/mod.rs
Expand Up @@ -417,7 +417,7 @@ mod test {
),
];

for tt in tests.into_iter() {
for tt in tests {
let expr = Expression::build(tt.0, tt.1);
assert_eq!(expr.is_ok(), tt.2);
}
Expand Down
2 changes: 1 addition & 1 deletion src/coprocessor/endpoint.rs
Expand Up @@ -315,7 +315,7 @@ impl BatchRunnable<Task> for Host {
ctx.get_peer().get_id(),
)
};
let mut group = grouped_reqs.entry(key).or_insert_with(Vec::new);
let group = grouped_reqs.entry(key).or_insert_with(Vec::new);
group.push(req);
}
Task::SnapRes(q_id, snap_res) => {
Expand Down
8 changes: 4 additions & 4 deletions src/raftstore/store/snap.rs
Expand Up @@ -626,8 +626,8 @@ impl Snap {
}

fn add_kv(&mut self, k: &[u8], v: &[u8]) -> io::Result<()> {
let mut cf_file = &mut self.cf_files[self.cf_index];
let mut writer = cf_file.sst_writer.as_mut().unwrap();
let cf_file = &mut self.cf_files[self.cf_index];
let writer = cf_file.sst_writer.as_mut().unwrap();
if let Err(e) = writer.add(k, v) {
return Err(io::Error::new(ErrorKind::Other, e));
}
Expand Down Expand Up @@ -1011,8 +1011,8 @@ impl Write for Snap {
continue;
}

let mut file = cf_file.file.as_mut().unwrap();
let mut digest = cf_file.write_digest.as_mut().unwrap();
let file = cf_file.file.as_mut().unwrap();
let digest = cf_file.write_digest.as_mut().unwrap();
if next_buf.len() > left {
try!(file.write_all(&next_buf[0..left]));
digest.write(&next_buf[0..left]);
Expand Down
10 changes: 5 additions & 5 deletions src/raftstore/store/store.rs
Expand Up @@ -413,7 +413,7 @@ impl<T, C> Store<T, C> {

fn report_snapshot_status(&mut self, region_id: u64, to_peer_id: u64, status: SnapshotStatus) {
self.sent_snapshot_count -= 1;
if let Some(mut peer) = self.region_peers.get_mut(&region_id) {
if let Some(peer) = self.region_peers.get_mut(&region_id) {
let to_peer = match peer.get_peer_from_cache(to_peer_id) {
Some(peer) => peer,
None => {
Expand Down Expand Up @@ -1189,7 +1189,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
first_index: u64,
state: RaftTruncatedState,
) {
let mut peer = self.region_peers.get_mut(&region_id).unwrap();
let peer = self.region_peers.get_mut(&region_id).unwrap();
let total_cnt = peer.last_applying_idx - first_index;
// the size of current CompactLog command can be ignored.
let remain_cnt = peer.last_applying_idx - state.get_index() - 1;
Expand Down Expand Up @@ -1433,7 +1433,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {

let mut resp = RaftCmdResponse::new();
let region_id = msg.get_header().get_region_id();
let mut peer = self.region_peers.get_mut(&region_id).unwrap();
let peer = self.region_peers.get_mut(&region_id).unwrap();
let term = peer.term();
bind_term(&mut resp, term);
if peer.propose(cb, msg, resp, &mut self.raft_metrics.propose) {
Expand Down Expand Up @@ -1466,7 +1466,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}

let region_id = msg.get_header().get_region_id();
let mut peer = self.region_peers.get_mut(&region_id).unwrap();
let peer = self.region_peers.get_mut(&region_id).unwrap();
ret.push(peer.propose_snapshot(msg, &mut self.raft_metrics.propose));
}
on_finished.call_box((ret,));
Expand Down Expand Up @@ -2017,7 +2017,7 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}

fn on_unreachable(&mut self, region_id: u64, to_peer_id: u64) {
if let Some(mut peer) = self.region_peers.get_mut(&region_id) {
if let Some(peer) = self.region_peers.get_mut(&region_id) {
peer.raft_group.report_unreachable(to_peer_id);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/raft_client.rs
Expand Up @@ -122,7 +122,7 @@ impl RaftClient {
}

pub fn send(&mut self, store_id: u64, addr: SocketAddr, msg: RaftMessage) -> Result<()> {
let mut conn = self.get_conn(addr, msg.region_id, store_id);
let conn = self.get_conn(addr, msg.region_id, store_id);
conn.buffer
.as_mut()
.unwrap()
Expand Down
16 changes: 8 additions & 8 deletions src/storage/mvcc/reader.rs
Expand Up @@ -168,7 +168,7 @@ impl<'a> MvccReader<'a> {
self.write_cursor = Some(iter);
}

let mut cursor = self.write_cursor.as_mut().unwrap();
let cursor = self.write_cursor.as_mut().unwrap();
let ok = if reverse {
try!(cursor.near_seek_for_prev(&key.append_ts(ts), &mut self.statistics.write))
} else {
Expand Down Expand Up @@ -297,7 +297,7 @@ impl<'a> MvccReader<'a> {
assert!(self.scan_mode.is_some());
try!(self.create_write_cursor());

let mut cursor = self.write_cursor.as_mut().unwrap();
let cursor = self.write_cursor.as_mut().unwrap();
let mut ok = cursor.seek_to_first(&mut self.statistics.write);

while ok {
Expand All @@ -320,8 +320,8 @@ impl<'a> MvccReader<'a> {

loop {
key = {
let mut w_cur = self.write_cursor.as_mut().unwrap();
let mut l_cur = self.lock_cursor.as_mut().unwrap();
let w_cur = self.write_cursor.as_mut().unwrap();
let l_cur = self.lock_cursor.as_mut().unwrap();
let (mut w_key, mut l_key) = (None, None);
if write_valid {
if try!(w_cur.near_seek(&key, &mut self.statistics.write)) {
Expand Down Expand Up @@ -366,8 +366,8 @@ impl<'a> MvccReader<'a> {

loop {
key = {
let mut w_cur = self.write_cursor.as_mut().unwrap();
let mut l_cur = self.lock_cursor.as_mut().unwrap();
let w_cur = self.write_cursor.as_mut().unwrap();
let l_cur = self.lock_cursor.as_mut().unwrap();
let (mut w_key, mut l_key) = (None, None);
if write_valid {
if try!(w_cur.near_reverse_seek(&key, &mut self.statistics.write)) {
Expand Down Expand Up @@ -413,7 +413,7 @@ impl<'a> MvccReader<'a> {
F: Fn(&Lock) -> bool,
{
try!(self.create_lock_cursor());
let mut cursor = self.lock_cursor.as_mut().unwrap();
let cursor = self.lock_cursor.as_mut().unwrap();
let ok = match start {
Some(ref x) => try!(cursor.seek(x, &mut self.statistics.lock)),
None => cursor.seek_to_first(&mut self.statistics.lock),
Expand Down Expand Up @@ -469,7 +469,7 @@ impl<'a> MvccReader<'a> {
// Get all Value of the given key in CF_DEFAULT
pub fn scan_values_in_default(&mut self, key: &Key) -> Result<Vec<(u64, Value)>> {
try!(self.create_data_cursor());
let mut cursor = self.data_cursor.as_mut().unwrap();
let cursor = self.data_cursor.as_mut().unwrap();
let mut ok = try!(cursor.seek(key, &mut self.statistics.data));
if !ok {
return Ok(vec![]);
Expand Down
4 changes: 2 additions & 2 deletions src/storage/txn/scheduler.rs
Expand Up @@ -1098,7 +1098,7 @@ impl Scheduler {
///
/// Returns true if successful; returns false otherwise.
fn acquire_lock(&mut self, cid: u64) -> bool {
let mut ctx = &mut self.cmd_ctxs.get_mut(&cid).unwrap();
let ctx = &mut self.cmd_ctxs.get_mut(&cid).unwrap();
assert_eq!(ctx.cid, cid);
let ok = self.latches.acquire(&mut ctx.lock, cid);
if ok {
Expand Down Expand Up @@ -1341,7 +1341,7 @@ impl Scheduler {
fn lock_and_register_get_snapshot(&mut self, cid: u64) {
if self.acquire_lock(cid) {
let ctx = self.extract_context(cid).clone();
let mut group = self.grouped_cmds
let group = self.grouped_cmds
.as_mut()
.unwrap()
.entry(HashableContext(ctx))
Expand Down
23 changes: 22 additions & 1 deletion src/util/rocksdb/engine_metrics.rs
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use prometheus::{Gauge, GaugeVec};
use prometheus::{exponential_buckets, CounterVec, Gauge, GaugeVec, HistogramVec};
use rocksdb::{DBStatisticsHistogramType as HistType, DBStatisticsTickerType as TickerType,
HistogramData, DB};
use storage::ALL_CFS;
Expand Down Expand Up @@ -523,6 +523,21 @@ lazy_static!{
&["type"]
).unwrap();

pub static ref STORE_ENGINE_COMPACTION_DURATIONS_VEC: HistogramVec =
register_histogram_vec!(
"tikv_engine_compaction_duration_seconds",
"Histogram of compaction duration seconds",
&["cf"],
exponential_buckets(0.005, 2.0, 20).unwrap()
).unwrap();

pub static ref STORE_ENGINE_COMPACTION_NUM_CORRUPT_KEYS_VEC: CounterVec =
register_counter_vec!(
"tikv_engine_compaction_num_corrupt_keys",
"Number of corrupt keys during compaction",
&["cf"]
).unwrap();

pub static ref STORE_ENGINE_LOCATE_VEC: GaugeVec =
register_gauge_vec!(
"tikv_engine_locate",
Expand Down Expand Up @@ -556,4 +571,10 @@ lazy_static!{
"Number of times WAL sync is done."
).unwrap();

pub static ref STORE_ENGINE_EVENT_COUNTER_VEC: CounterVec =
register_counter_vec!(
"tikv_engine_event_total",
"Number of engine events",
&["cf", "type"]
).unwrap();
}
45 changes: 45 additions & 0 deletions src/util/rocksdb/event_listener.rs
@@ -0,0 +1,45 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

use rocksdb::{self, CompactionJobInfo, FlushJobInfo, IngestionInfo};
use util::rocksdb::engine_metrics::*;

#[derive(Default)]
pub struct EventListener {}

impl rocksdb::EventListener for EventListener {
fn on_flush_completed(&self, info: &FlushJobInfo) {
STORE_ENGINE_EVENT_COUNTER_VEC
.with_label_values(&[info.cf_name(), "flush"])
.inc();
}

fn on_compaction_completed(&self, info: &CompactionJobInfo) {
STORE_ENGINE_EVENT_COUNTER_VEC
.with_label_values(&[info.cf_name(), "compaction"])
.inc();
STORE_ENGINE_COMPACTION_DURATIONS_VEC
.with_label_values(&[info.cf_name()])
.observe(info.elapsed_micros() as f64 / 1_000_000.0);
STORE_ENGINE_COMPACTION_NUM_CORRUPT_KEYS_VEC
.with_label_values(&[info.cf_name()])
.inc_by(info.num_corrupt_keys() as f64)
.unwrap();
}

fn on_external_file_ingested(&self, info: &IngestionInfo) {
STORE_ENGINE_EVENT_COUNTER_VEC
.with_label_values(&[info.cf_name(), "ingestion"])
.inc();
}
}
2 changes: 2 additions & 0 deletions src/util/rocksdb/mod.rs
Expand Up @@ -12,9 +12,11 @@
// limitations under the License.

pub mod properties;
pub mod event_listener;
pub mod engine_metrics;
pub mod metrics_flusher;

pub use self::event_listener::EventListener;
pub use self::metrics_flusher::MetricsFlusher;

use std::fs;
Expand Down
2 changes: 1 addition & 1 deletion src/util/time.rs
Expand Up @@ -467,7 +467,7 @@ mod tests {
#[test]
fn test_coarse_instant_on_smp() {
let zero = Duration::from_millis(0);
for i in 0..1000_000 {
for i in 0..1_000_000 {
let now = Instant::now();
let now_coarse = Instant::now_coarse();
if i % 100 == 0 {
Expand Down

0 comments on commit 74ea718

Please sign in to comment.