Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
Signed-off-by: SpadeA-Tang <u6748471@anu.edu.au>
  • Loading branch information
SpadeA-Tang committed May 14, 2024
2 parents 0545556 + 86e0ec3 commit fae27c4
Show file tree
Hide file tree
Showing 11 changed files with 1,158 additions and 365 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions components/region_cache_memory_engine/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ make_auto_flush_static_metric! {
pub label_enum TickerEnum {
bytes_read,
iter_bytes_read,
number_db_seek,
number_db_seek_found,
number_db_next,
number_db_next_found,
number_db_prev,
number_db_prev_found,
}

pub struct GcFilteredCountVec: LocalIntCounter {
Expand Down Expand Up @@ -75,13 +81,21 @@ lazy_static! {
&["type"]
)
.unwrap();
pub static ref IN_MEMORY_ENGINE_LOCATE: IntCounterVec = register_int_counter_vec!(
"tikv_range_cache_memory_engine_locate",
"Number of calls to seek/next/prev",
&["type"]
)
.unwrap();
}

lazy_static! {
pub static ref GC_FILTERED_STATIC: GcFilteredCountVec =
auto_flush_from!(GC_FILTERED, GcFilteredCountVec);
pub static ref IN_MEMORY_ENGINE_FLOW_STATIC: InMemoryEngineTickerMetrics =
auto_flush_from!(IN_MEMORY_ENGINE_FLOW, InMemoryEngineTickerMetrics);
pub static ref IN_MEMORY_ENGINE_LOCATE_STATIC: InMemoryEngineTickerMetrics =
auto_flush_from!(IN_MEMORY_ENGINE_LOCATE, InMemoryEngineTickerMetrics);
}

pub fn flush_range_cache_engine_statistics(statistics: &Arc<RangeCacheMemoryEngineStatistics>) {
Expand All @@ -99,6 +113,30 @@ fn flush_engine_ticker_metrics(t: Tickers, value: u64) {
Tickers::IterBytesRead => {
IN_MEMORY_ENGINE_FLOW_STATIC.iter_bytes_read.inc_by(value);
}
Tickers::NumberDbSeek => {
IN_MEMORY_ENGINE_LOCATE_STATIC.number_db_seek.inc_by(value);
}
Tickers::NumberDbSeekFound => {
IN_MEMORY_ENGINE_LOCATE_STATIC
.number_db_seek_found
.inc_by(value);
}
Tickers::NumberDbNext => {
IN_MEMORY_ENGINE_LOCATE_STATIC.number_db_next.inc_by(value);
}
Tickers::NumberDbNextFound => {
IN_MEMORY_ENGINE_LOCATE_STATIC
.number_db_next_found
.inc_by(value);
}
Tickers::NumberDbPrev => {
IN_MEMORY_ENGINE_LOCATE_STATIC.number_db_prev.inc_by(value);
}
Tickers::NumberDbPrevFound => {
IN_MEMORY_ENGINE_LOCATE_STATIC
.number_db_prev_found
.inc_by(value);
}
_ => {
unreachable!()
}
Expand Down
123 changes: 100 additions & 23 deletions components/region_cache_memory_engine/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,25 @@ impl Drop for RangeCacheIterator {
fn drop(&mut self) {
self.statistics
.record_ticker(Tickers::IterBytesRead, self.local_stats.bytes_read);
self.statistics
.record_ticker(Tickers::NumberDbSeek, self.local_stats.number_db_seek);
self.statistics.record_ticker(
Tickers::NumberDbSeekFound,
self.local_stats.number_db_seek_found,
);
self.statistics
.record_ticker(Tickers::NumberDbNext, self.local_stats.number_db_next);
self.statistics.record_ticker(
Tickers::NumberDbNextFound,
self.local_stats.number_db_next_found,
);
self.statistics
.record_ticker(Tickers::NumberDbPrev, self.local_stats.number_db_prev);
self.statistics.record_ticker(
Tickers::NumberDbPrevFound,
self.local_stats.number_db_prev_found,
);
perf_counter_add!(iter_read_bytes, self.local_stats.bytes_read);
}
}

Expand Down Expand Up @@ -310,14 +329,18 @@ impl RangeCacheIterator {
fn seek_internal(&mut self, key: &InternalBytes) {
let guard = &epoch::pin();
self.iter.seek(key, guard);
self.local_stats.number_db_seek += 1;
if self.iter.valid() {
self.find_next_visible_key(false, guard);
} else {
self.valid = false;
}
}

fn seek_for_prev_internal(&mut self, key: &InternalBytes) {
let guard = &epoch::pin();
self.iter.seek_for_prev(key, guard);
self.local_stats.number_db_seek += 1;
self.prev_internal(guard);
}

Expand Down Expand Up @@ -408,16 +431,6 @@ impl RangeCacheIterator {
self.iter.prev(guard);
}
}

#[inline]
fn collects_stats(&mut self) {
if self.valid {
// Updating stats and perf context counters
let read_bytes = (self.key().len() + self.value().len()) as u64;
self.local_stats.bytes_read += read_bytes;
perf_counter_add!(iter_read_bytes, read_bytes);
}
}
}

impl Iterator for RangeCacheIterator {
Expand All @@ -440,14 +453,20 @@ impl Iterator for RangeCacheIterator {
assert!(self.direction == Direction::Forward);
let guard = &epoch::pin();
self.iter.next(guard);

perf_counter_add!(internal_key_skipped_count, 1);
self.local_stats.number_db_next += 1;

self.valid = self.iter.valid();
if self.valid {
// self.valid can be changed after this
self.find_next_visible_key(true, guard);
}

self.collects_stats();
if self.valid {
self.local_stats.number_db_next_found += 1;
self.local_stats.bytes_read += (self.key().len() + self.value().len()) as u64;
}

Ok(self.valid)
}
Expand All @@ -458,7 +477,11 @@ impl Iterator for RangeCacheIterator {
let guard = &epoch::pin();
self.prev_internal(guard);

self.collects_stats();
self.local_stats.number_db_prev += 1;
if self.valid {
self.local_stats.number_db_prev_found += 1;
self.local_stats.bytes_read += (self.key().len() + self.value().len()) as u64;
}

Ok(self.valid)
}
Expand All @@ -478,7 +501,10 @@ impl Iterator for RangeCacheIterator {

let seek_key = encode_seek_key(seek_key, self.sequence_number);
self.seek_internal(&seek_key);
self.collects_stats();
if self.valid {
self.local_stats.bytes_read += (self.key().len() + self.value().len()) as u64;
self.local_stats.number_db_seek_found += 1;
}

Ok(self.valid)
}
Expand All @@ -497,7 +523,10 @@ impl Iterator for RangeCacheIterator {
};

self.seek_for_prev_internal(&seek_key);
self.collects_stats();
if self.valid {
self.local_stats.bytes_read += (self.key().len() + self.value().len()) as u64;
self.local_stats.number_db_seek_found += 1;
}

Ok(self.valid)
}
Expand All @@ -508,7 +537,10 @@ impl Iterator for RangeCacheIterator {
let seek_key = encode_seek_key(&self.lower_bound, self.sequence_number);
self.seek_internal(&seek_key);

self.collects_stats();
if self.valid {
self.local_stats.bytes_read += (self.key().len() + self.value().len()) as u64;
self.local_stats.number_db_seek_found += 1;
}

Ok(self.valid)
}
Expand All @@ -523,7 +555,10 @@ impl Iterator for RangeCacheIterator {
return Ok(false);
}

self.collects_stats();
if self.valid {
self.local_stats.bytes_read += (self.key().len() + self.value().len()) as u64;
self.local_stats.number_db_seek_found += 1;
}

Ok(self.valid)
}
Expand Down Expand Up @@ -797,6 +832,44 @@ mod tests {
}
}

#[test]
fn test_seek() {
let engine = RangeCacheMemoryEngine::new(RangeCacheEngineContext::new(Arc::new(
VersionTrack::new(RangeCacheEngineConfig::config_for_test()),
)));
let range = CacheRange::new(b"".to_vec(), b"z".to_vec());
engine.new_range(range.clone());

{
let mut core = engine.core.write();
core.range_manager.set_safe_point(&range, 5);
let sl = core.engine.data[cf_to_id("write")].clone();

put_key_val(&sl, "b", "val", 10, 5);
put_key_val(&sl, "c", "vall", 10, 5);
}

let snapshot = engine.snapshot(range.clone(), u64::MAX, 100).unwrap();
let mut iter_opt = IterOptions::default();
iter_opt.set_upper_bound(&range.end, 0);
iter_opt.set_lower_bound(&range.start, 0);
let mut iter = snapshot.iterator_opt("write", iter_opt.clone()).unwrap();

let key = construct_mvcc_key("b", 10);
iter.seek(&key).unwrap();
assert_eq!(iter.value(), b"val");
let key = construct_mvcc_key("d", 10);
iter.seek(&key).unwrap();
assert!(!iter.valid().unwrap());

let key = construct_mvcc_key("b", 10);
iter.seek_for_prev(&key).unwrap();
assert_eq!(iter.value(), b"val");
let key = construct_mvcc_key("a", 10);
iter.seek_for_prev(&key).unwrap();
assert!(!iter.valid().unwrap());
}

#[test]
fn test_get_value() {
let engine = RangeCacheMemoryEngine::new(RangeCacheEngineContext::new(Arc::new(
Expand Down Expand Up @@ -1905,36 +1978,40 @@ mod tests {
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 0);
iter.seek_to_first().unwrap();
rocks_iter.seek_to_first().unwrap();
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 12);
let key = construct_mvcc_key("b", 10);
iter.seek(&key).unwrap();
rocks_iter.seek(&key).unwrap();
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 25);
iter.next().unwrap();
rocks_iter.next().unwrap();
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 39);
iter.next().unwrap();
rocks_iter.next().unwrap();
drop(iter);
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 54);
assert_eq!(2, statistics.get_ticker_count(Tickers::NumberDbSeek));
assert_eq!(2, statistics.get_ticker_count(Tickers::NumberDbSeekFound));
assert_eq!(2, statistics.get_ticker_count(Tickers::NumberDbNext));
assert_eq!(2, statistics.get_ticker_count(Tickers::NumberDbNextFound));

let mut iter = snapshot.iterator_opt("write", iter_opt.clone()).unwrap();
iter.seek_to_last().unwrap();
rocks_iter.seek_to_last().unwrap();
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 69);
iter.prev().unwrap();
rocks_iter.prev().unwrap();
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 83);
iter.prev().unwrap();
rocks_iter.prev().unwrap();
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 96);
iter.prev().unwrap();
rocks_iter.prev().unwrap();
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 108);
drop(rocks_iter);
drop(iter);
assert_eq!(statistics.get_ticker_count(Tickers::IterBytesRead), 108);
assert_eq!(
rocks_statistics.get_and_reset_ticker_count(DBStatisticsTickerType::IterBytesRead),
statistics.get_and_reset_ticker_count(Tickers::IterBytesRead)
);
assert_eq!(PERF_CONTEXT.with(|c| c.borrow().iter_read_bytes), 108);
assert_eq!(3, statistics.get_ticker_count(Tickers::NumberDbSeek));
assert_eq!(3, statistics.get_ticker_count(Tickers::NumberDbSeekFound));
assert_eq!(3, statistics.get_ticker_count(Tickers::NumberDbPrev));
assert_eq!(3, statistics.get_ticker_count(Tickers::NumberDbPrevFound));
}
}
29 changes: 28 additions & 1 deletion components/region_cache_memory_engine/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,28 @@ fn physical_core_id() -> i32 {
}
}

pub const ENGINE_TICKER_TYPES: &[Tickers] = &[Tickers::BytesRead, Tickers::IterBytesRead];
pub const ENGINE_TICKER_TYPES: &[Tickers] = &[
Tickers::BytesRead,
Tickers::IterBytesRead,
Tickers::NumberDbSeek,
Tickers::NumberDbSeekFound,
Tickers::NumberDbNext,
Tickers::NumberDbNextFound,
Tickers::NumberDbPrev,
Tickers::NumberDbPrevFound,
];

#[repr(u32)]
#[derive(Copy, Clone)]
pub enum Tickers {
BytesRead = 0,
IterBytesRead,
NumberDbSeek,
NumberDbSeekFound,
NumberDbNext,
NumberDbNextFound,
NumberDbPrev,
NumberDbPrevFound,
TickerEnumMax,
}

Expand Down Expand Up @@ -151,6 +166,18 @@ impl Statistics {
pub(crate) struct LocalStatistics {
// Map to Tickers::IterBytesRead
pub(crate) bytes_read: u64,
// Map to Tickers::NumberDbSeek
pub(crate) number_db_seek: u64,
// Map to Tickers::NumberDbSeekFound
pub(crate) number_db_seek_found: u64,
// Map to Tickers::NumberDbNext
pub(crate) number_db_next: u64,
// Map to Tickers::NumberDbNextFound
pub(crate) number_db_next_found: u64,
// Map to Tickers::NumberDbPrev
pub(crate) number_db_prev: u64,
// Map to Tickers::NumberDbPrevFound
pub(crate) number_db_prev_found: u64,
}

#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions components/tidb_query_expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ num-traits = "0.2"
openssl = { workspace = true }
protobuf = "2"
regex = "1.1"
ryu = "1.0"
safemem = { version = "0.3", default-features = false }
serde = "1.0"
serde_json = "1.0"
Expand Down
Loading

0 comments on commit fae27c4

Please sign in to comment.