diff --git a/Cargo.lock b/Cargo.lock
index 640c09427e9..ebaab86376b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -449,7 +449,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 [[package]]
 name = "librocksdb_sys"
 version = "0.1.0"
-source = "git+https://github.com/pingcap/rust-rocksdb.git#b95e696fd28b1c3e1df4ba8798a802e4256f3092"
+source = "git+https://github.com/pingcap/rust-rocksdb.git#4fd6391331c6f5197e7002d4f9c533eae1c59338"
 dependencies = [
  "gcc 0.3.51 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -760,7 +760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 [[package]]
 name = "rocksdb"
 version = "0.3.0"
-source = "git+https://github.com/pingcap/rust-rocksdb.git#b95e696fd28b1c3e1df4ba8798a802e4256f3092"
+source = "git+https://github.com/pingcap/rust-rocksdb.git#4fd6391331c6f5197e7002d4f9c533eae1c59338"
 dependencies = [
  "crc 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/circle.yml b/circle.yml
index 286d1327e00..809da062da7 100644
--- a/circle.yml
+++ b/circle.yml
@@ -63,14 +63,11 @@ dependencies:
   post:
     # check format first
     - make format && git diff-index --quiet HEAD -- || (git diff; echo please make format and run tests before creating a PR!; exit 1)
-    # cargo test also use debug
-    - cargo build:
-        timeout: 1800
-    - cargo test --features "default" --no-run:
+    - cargo test --features "dev" --no-run:
         timeout: 1800
 
 test:
   override:
-    - cargo test --features "default" -- --nocapture
-    - cargo test --features "default" --bench benches -- --nocapture
+    - cargo test --features "dev" -- --nocapture
+    - cargo test --features "dev" --bench benches -- --nocapture
diff --git a/src/config.rs b/src/config.rs
index d4d3aa26f13..870b9e178b7 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -26,8 +26,8 @@ use raftstore::store::keys::region_raft_prefix_len;
 use storage::{Config as StorageConfig, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, DEFAULT_DATA_DIR};
 use util::config::{self, compression_type_level_serde, ReadableDuration, ReadableSize, GB, KB, MB};
 use util::properties::{MvccPropertiesCollectorFactory, SizePropertiesCollectorFactory};
-use util::rocksdb::{CFOptions, FixedPrefixSliceTransform, FixedSuffixSliceTransform,
-                    NoopSliceTransform};
+use util::rocksdb::{CFOptions, EventListener, FixedPrefixSliceTransform,
+                    FixedSuffixSliceTransform, NoopSliceTransform};
 
 const LOCKCF_MIN_MEM: usize = 256 * MB as usize;
 const LOCKCF_MAX_MEM: usize = GB as usize;
@@ -398,6 +398,7 @@ impl DbConfig {
             self.use_direct_io_for_flush_and_compaction,
         );
         opts.enable_pipelined_write(self.enable_pipelined_write);
+        opts.add_event_listener(EventListener::default());
         opts
     }
 
diff --git a/src/coprocessor/codec/mysql/json/json_modify.rs b/src/coprocessor/codec/mysql/json/json_modify.rs
index 057a350ad65..e74a74eda0e 100644
--- a/src/coprocessor/codec/mysql/json/json_modify.rs
+++ b/src/coprocessor/codec/mysql/json/json_modify.rs
@@ -93,7 +93,7 @@ impl Json {
         if let Json::Object(ref mut map) = *self {
             if map.contains_key(key) {
                 // e.g. json_replace('{"a": 1}', '$.a', 2) => '{"a": 2}'
-                let mut v = map.get_mut(key).unwrap();
+                let v = map.get_mut(key).unwrap();
                 v.set_json(sub_path_legs, value, mt);
             } else if sub_path_legs.is_empty() && mt != ModifyType::Replace {
                 // e.g. json_insert('{"a": 1}', '$.b', 2) => '{"a": 1, "b": 2}'
diff --git a/src/coprocessor/dag/dag.rs b/src/coprocessor/dag/dag.rs
index 5d56b489a6b..c1a972a0100 100644
--- a/src/coprocessor/dag/dag.rs
+++ b/src/coprocessor/dag/dag.rs
@@ -71,7 +71,7 @@ impl<'s> DAGContext<'s> {
             match exec.next() {
                 Ok(Some(row)) => {
                     try!(check_if_outdated(self.deadline, REQ_TYPE_DAG));
-                    let mut chunk = get_chunk(&mut chunks);
+                    let chunk = get_chunk(&mut chunks);
                     let length = chunk.get_rows_data().len();
                     if self.has_aggr {
                         chunk.mut_rows_data().extend_from_slice(&row.data.value);
diff --git a/src/coprocessor/dag/executor/index_scan.rs b/src/coprocessor/dag/executor/index_scan.rs
index 94582f68b9c..d4c5e2f4ba2 100644
--- a/src/coprocessor/dag/executor/index_scan.rs
+++ b/src/coprocessor/dag/executor/index_scan.rs
@@ -45,7 +45,7 @@ impl<'a> IndexScanExecutor<'a> {
     ) -> IndexScanExecutor<'a> {
         let mut pk_col = None;
         let desc = meta.get_desc();
-        let mut cols = meta.mut_columns();
+        let cols = meta.mut_columns();
         if cols.last().map_or(false, |c| c.get_pk_handle()) {
             pk_col = Some(cols.pop().unwrap());
         }
diff --git a/src/coprocessor/dag/expr/column.rs b/src/coprocessor/dag/expr/column.rs
index cba2166b4b6..b8f9403c9ee 100644
--- a/src/coprocessor/dag/expr/column.rs
+++ b/src/coprocessor/dag/expr/column.rs
@@ -85,7 +85,7 @@
     fn test_column_eval() {
         let dec = "1.1".parse::<Decimal>().unwrap();
         let s = "你好".as_bytes().to_owned();
-        let dur = Duration::parse("01:00:00".as_bytes(), 0).unwrap();
+        let dur = Duration::parse(b"01:00:00", 0).unwrap();
 
         let row = vec![
             Datum::Null,
@@ -108,7 +108,7 @@
         ];
 
         let ctx = StatementContext::default();
-        for ii in 0..row.len() {
+        for (ii, exp) in expecteds.iter().enumerate().take(row.len()) {
             let c = col_expr(ii as i64);
             let e = Expression::build(c, row.len()).unwrap();
 
@@ -131,7 +131,7 @@
                 .map(|t| t.into_owned());
 
             let result = EvalResults(i, r, dec, s, t, dur, j);
-            assert_eq!(expecteds[ii], result);
+            assert_eq!(*exp, result);
         }
     }
 }
diff --git a/src/coprocessor/dag/expr/constant.rs b/src/coprocessor/dag/expr/constant.rs
index acc112fbc4c..6ba70ff56c9 100644
--- a/src/coprocessor/dag/expr/constant.rs
+++ b/src/coprocessor/dag/expr/constant.rs
@@ -143,7 +143,7 @@
     fn test_constant_eval() {
         let dec = "1.1".parse::<Decimal>().unwrap();
         let s = "你好".as_bytes().to_owned();
-        let dur = Duration::parse("01:00:00".as_bytes(), 0).unwrap();
+        let dur = Duration::parse(b"01:00:00", 0).unwrap();
 
         let tests = vec![
             datum_expr(Datum::Null),
diff --git a/src/coprocessor/dag/expr/mod.rs b/src/coprocessor/dag/expr/mod.rs
index e0b036e4100..174dbd48ecf 100644
--- a/src/coprocessor/dag/expr/mod.rs
+++ b/src/coprocessor/dag/expr/mod.rs
@@ -417,7 +417,7 @@
             ),
         ];
 
-        for tt in tests.into_iter() {
+        for tt in tests {
             let expr = Expression::build(tt.0, tt.1);
             assert_eq!(expr.is_ok(), tt.2);
         }
diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs
index dc29d4ba4ed..e34bb1c998f 100644
--- a/src/coprocessor/endpoint.rs
+++ b/src/coprocessor/endpoint.rs
@@ -315,7 +315,7 @@ impl BatchRunnable for Host {
                             ctx.get_peer().get_id(),
                         )
                     };
-                    let mut group = grouped_reqs.entry(key).or_insert_with(Vec::new);
+                    let group = grouped_reqs.entry(key).or_insert_with(Vec::new);
                     group.push(req);
                 }
                 Task::SnapRes(q_id, snap_res) => {
diff --git a/src/raftstore/store/snap.rs b/src/raftstore/store/snap.rs
index c06dd9e5b76..34384d7c59d 100644
--- a/src/raftstore/store/snap.rs
+++ b/src/raftstore/store/snap.rs
@@ -626,8 +626,8 @@ impl Snap {
     }
 
     fn add_kv(&mut self, k: &[u8], v: &[u8]) -> io::Result<()> {
-        let mut cf_file = &mut self.cf_files[self.cf_index];
-        let mut writer = cf_file.sst_writer.as_mut().unwrap();
+        let cf_file = &mut self.cf_files[self.cf_index];
+        let writer = cf_file.sst_writer.as_mut().unwrap();
         if let Err(e) = writer.add(k, v) {
             return Err(io::Error::new(ErrorKind::Other, e));
         }
@@ -1011,8 +1011,8 @@ impl Write for Snap {
                 continue;
             }
 
-            let mut file = cf_file.file.as_mut().unwrap();
-            let mut digest = cf_file.write_digest.as_mut().unwrap();
+            let file = cf_file.file.as_mut().unwrap();
+            let digest = cf_file.write_digest.as_mut().unwrap();
             if next_buf.len() > left {
                 try!(file.write_all(&next_buf[0..left]));
                 digest.write(&next_buf[0..left]);
diff --git a/src/raftstore/store/store.rs b/src/raftstore/store/store.rs
index 6b310e26bfe..bad0c83abf0 100644
--- a/src/raftstore/store/store.rs
+++ b/src/raftstore/store/store.rs
@@ -413,7 +413,7 @@ impl Store {
 
     fn report_snapshot_status(&mut self, region_id: u64, to_peer_id: u64, status: SnapshotStatus) {
         self.sent_snapshot_count -= 1;
-        if let Some(mut peer) = self.region_peers.get_mut(&region_id) {
+        if let Some(peer) = self.region_peers.get_mut(&region_id) {
             let to_peer = match peer.get_peer_from_cache(to_peer_id) {
                 Some(peer) => peer,
                 None => {
@@ -1189,7 +1189,7 @@ impl Store {
         first_index: u64,
         state: RaftTruncatedState,
     ) {
-        let mut peer = self.region_peers.get_mut(&region_id).unwrap();
+        let peer = self.region_peers.get_mut(&region_id).unwrap();
         let total_cnt = peer.last_applying_idx - first_index;
         // the size of current CompactLog command can be ignored.
         let remain_cnt = peer.last_applying_idx - state.get_index() - 1;
@@ -1433,7 +1433,7 @@ impl Store {
         let mut resp = RaftCmdResponse::new();
         let region_id = msg.get_header().get_region_id();
 
-        let mut peer = self.region_peers.get_mut(&region_id).unwrap();
+        let peer = self.region_peers.get_mut(&region_id).unwrap();
         let term = peer.term();
         bind_term(&mut resp, term);
         if peer.propose(cb, msg, resp, &mut self.raft_metrics.propose) {
@@ -1466,7 +1466,7 @@ impl Store {
             }
 
             let region_id = msg.get_header().get_region_id();
-            let mut peer = self.region_peers.get_mut(&region_id).unwrap();
+            let peer = self.region_peers.get_mut(&region_id).unwrap();
             ret.push(peer.propose_snapshot(msg, &mut self.raft_metrics.propose));
         }
         on_finished.call_box((ret,));
@@ -2017,7 +2017,7 @@ impl Store {
     }
 
     fn on_unreachable(&mut self, region_id: u64, to_peer_id: u64) {
-        if let Some(mut peer) = self.region_peers.get_mut(&region_id) {
+        if let Some(peer) = self.region_peers.get_mut(&region_id) {
            peer.raft_group.report_unreachable(to_peer_id);
         }
     }
diff --git a/src/server/raft_client.rs b/src/server/raft_client.rs
index 021ac79cd83..6e46989dfc0 100644
--- a/src/server/raft_client.rs
+++ b/src/server/raft_client.rs
@@ -122,7 +122,7 @@ impl RaftClient {
     }
 
     pub fn send(&mut self, store_id: u64, addr: SocketAddr, msg: RaftMessage) -> Result<()> {
-        let mut conn = self.get_conn(addr, msg.region_id, store_id);
+        let conn = self.get_conn(addr, msg.region_id, store_id);
         conn.buffer
             .as_mut()
             .unwrap()
diff --git a/src/storage/mvcc/reader.rs b/src/storage/mvcc/reader.rs
index 9d2e2fbfa86..fd2efc8dfc2 100644
--- a/src/storage/mvcc/reader.rs
+++ b/src/storage/mvcc/reader.rs
@@ -168,7 +168,7 @@ impl<'a> MvccReader<'a> {
             self.write_cursor = Some(iter);
         }
 
-        let mut cursor = self.write_cursor.as_mut().unwrap();
+        let cursor = self.write_cursor.as_mut().unwrap();
         let ok = if reverse {
             try!(cursor.near_seek_for_prev(&key.append_ts(ts), &mut self.statistics.write))
         } else {
@@ -297,7 +297,7 @@
         assert!(self.scan_mode.is_some());
         try!(self.create_write_cursor());
 
-        let mut cursor = self.write_cursor.as_mut().unwrap();
+        let cursor = self.write_cursor.as_mut().unwrap();
         let mut ok = cursor.seek_to_first(&mut self.statistics.write);
 
         while ok {
@@ -320,8 +320,8 @@
         loop {
             key = {
-                let mut w_cur = self.write_cursor.as_mut().unwrap();
-                let mut l_cur = self.lock_cursor.as_mut().unwrap();
+                let w_cur = self.write_cursor.as_mut().unwrap();
+                let l_cur = self.lock_cursor.as_mut().unwrap();
                 let (mut w_key, mut l_key) = (None, None);
                 if write_valid {
                     if try!(w_cur.near_seek(&key, &mut self.statistics.write)) {
@@ -366,8 +366,8 @@
         loop {
             key = {
-                let mut w_cur = self.write_cursor.as_mut().unwrap();
-                let mut l_cur = self.lock_cursor.as_mut().unwrap();
+                let w_cur = self.write_cursor.as_mut().unwrap();
+                let l_cur = self.lock_cursor.as_mut().unwrap();
                 let (mut w_key, mut l_key) = (None, None);
                 if write_valid {
                     if try!(w_cur.near_reverse_seek(&key, &mut self.statistics.write)) {
@@ -413,7 +413,7 @@
         F: Fn(&Lock) -> bool,
     {
         try!(self.create_lock_cursor());
-        let mut cursor = self.lock_cursor.as_mut().unwrap();
+        let cursor = self.lock_cursor.as_mut().unwrap();
         let ok = match start {
             Some(ref x) => try!(cursor.seek(x, &mut self.statistics.lock)),
             None => cursor.seek_to_first(&mut self.statistics.lock),
@@ -469,7 +469,7 @@ impl<'a> MvccReader<'a> {
     // Get all Value of the given key in CF_DEFAULT
     pub fn scan_values_in_default(&mut self, key: &Key) -> Result> {
         try!(self.create_data_cursor());
-        let mut cursor = self.data_cursor.as_mut().unwrap();
+        let cursor = self.data_cursor.as_mut().unwrap();
         let mut ok = try!(cursor.seek(key, &mut self.statistics.data));
         if !ok {
             return Ok(vec![]);
diff --git a/src/storage/txn/scheduler.rs b/src/storage/txn/scheduler.rs
index 63bf00d8eae..b89e1280719 100644
--- a/src/storage/txn/scheduler.rs
+++ b/src/storage/txn/scheduler.rs
@@ -1098,7 +1098,7 @@ impl Scheduler {
     ///
     /// Returns true if successful; returns false otherwise.
     fn acquire_lock(&mut self, cid: u64) -> bool {
-        let mut ctx = &mut self.cmd_ctxs.get_mut(&cid).unwrap();
+        let ctx = &mut self.cmd_ctxs.get_mut(&cid).unwrap();
         assert_eq!(ctx.cid, cid);
         let ok = self.latches.acquire(&mut ctx.lock, cid);
         if ok {
@@ -1341,7 +1341,7 @@ impl Scheduler {
     fn lock_and_register_get_snapshot(&mut self, cid: u64) {
         if self.acquire_lock(cid) {
             let ctx = self.extract_context(cid).clone();
-            let mut group = self.grouped_cmds
+            let group = self.grouped_cmds
                 .as_mut()
                 .unwrap()
                 .entry(HashableContext(ctx))
diff --git a/src/util/rocksdb/engine_metrics.rs b/src/util/rocksdb/engine_metrics.rs
index 265da451c82..b9d72a8c4fc 100644
--- a/src/util/rocksdb/engine_metrics.rs
+++ b/src/util/rocksdb/engine_metrics.rs
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use prometheus::{Gauge, GaugeVec};
+use prometheus::{exponential_buckets, CounterVec, Gauge, GaugeVec, HistogramVec};
 use rocksdb::{DBStatisticsHistogramType as HistType, DBStatisticsTickerType as TickerType,
               HistogramData, DB};
 use storage::ALL_CFS;
@@ -523,6 +523,21 @@ lazy_static!{
             &["type"]
         ).unwrap();
 
+    pub static ref STORE_ENGINE_COMPACTION_DURATIONS_VEC: HistogramVec =
+        register_histogram_vec!(
+            "tikv_engine_compaction_duration_seconds",
+            "Histogram of compaction duration seconds",
+            &["cf"],
+            exponential_buckets(0.005, 2.0, 20).unwrap()
+        ).unwrap();
+
+    pub static ref STORE_ENGINE_COMPACTION_NUM_CORRUPT_KEYS_VEC: CounterVec =
+        register_counter_vec!(
+            "tikv_engine_compaction_num_corrupt_keys",
+            "Number of corrupt keys during compaction",
+            &["cf"]
+        ).unwrap();
+
     pub static ref STORE_ENGINE_LOCATE_VEC: GaugeVec =
         register_gauge_vec!(
             "tikv_engine_locate",
@@ -556,4 +571,10 @@ lazy_static!{
             "Number of times WAL sync is done."
         ).unwrap();
 
+    pub static ref STORE_ENGINE_EVENT_COUNTER_VEC: CounterVec =
+        register_counter_vec!(
+            "tikv_engine_event_total",
+            "Number of engine events",
+            &["cf", "type"]
+        ).unwrap();
 }
diff --git a/src/util/rocksdb/event_listener.rs b/src/util/rocksdb/event_listener.rs
new file mode 100644
index 00000000000..52d3dc9c4b8
--- /dev/null
+++ b/src/util/rocksdb/event_listener.rs
@@ -0,0 +1,45 @@
+// Copyright 2017 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use rocksdb::{self, CompactionJobInfo, FlushJobInfo, IngestionInfo};
+use util::rocksdb::engine_metrics::*;
+
+#[derive(Default)]
+pub struct EventListener {}
+
+impl rocksdb::EventListener for EventListener {
+    fn on_flush_completed(&self, info: &FlushJobInfo) {
+        STORE_ENGINE_EVENT_COUNTER_VEC
+            .with_label_values(&[info.cf_name(), "flush"])
+            .inc();
+    }
+
+    fn on_compaction_completed(&self, info: &CompactionJobInfo) {
+        STORE_ENGINE_EVENT_COUNTER_VEC
+            .with_label_values(&[info.cf_name(), "compaction"])
+            .inc();
+        STORE_ENGINE_COMPACTION_DURATIONS_VEC
+            .with_label_values(&[info.cf_name()])
+            .observe(info.elapsed_micros() as f64 / 1_000_000.0);
+        STORE_ENGINE_COMPACTION_NUM_CORRUPT_KEYS_VEC
+            .with_label_values(&[info.cf_name()])
+            .inc_by(info.num_corrupt_keys() as f64)
+            .unwrap();
+    }
+
+    fn on_external_file_ingested(&self, info: &IngestionInfo) {
+        STORE_ENGINE_EVENT_COUNTER_VEC
+            .with_label_values(&[info.cf_name(), "ingestion"])
+            .inc();
+    }
+}
diff --git a/src/util/rocksdb/mod.rs b/src/util/rocksdb/mod.rs
index 8f74f7298e3..7e016352fc9 100644
--- a/src/util/rocksdb/mod.rs
+++ b/src/util/rocksdb/mod.rs
@@ -12,9 +12,11 @@
 // limitations under the License.
 
 pub mod properties;
+pub mod event_listener;
 pub mod engine_metrics;
 pub mod metrics_flusher;
 
+pub use self::event_listener::EventListener;
 pub use self::metrics_flusher::MetricsFlusher;
 
 use std::fs;
diff --git a/src/util/time.rs b/src/util/time.rs
index 6b9d39dd162..09d1295ff80 100644
--- a/src/util/time.rs
+++ b/src/util/time.rs
@@ -467,7 +467,7 @@ mod tests {
     #[test]
     fn test_coarse_instant_on_smp() {
         let zero = Duration::from_millis(0);
-        for i in 0..1000_000 {
+        for i in 0..1_000_000 {
             let now = Instant::now();
             let now_coarse = Instant::now_coarse();
             if i % 100 == 0 {