Skip to content

Commit

Permalink
Merge branch 'release-3.0' into sync_ingest_tikv3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
yiwu-arbug committed Jun 20, 2019
2 parents 178d1f0 + ad2a2f4 commit b38a308
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
37 changes: 34 additions & 3 deletions src/storage/gc_worker.rs
Expand Up @@ -49,6 +49,9 @@ const POLL_SAFE_POINT_INTERVAL_SECS: u64 = 60;

const BEGIN_KEY: &[u8] = b"";

const PROCESS_TYPE_GC: &str = "gc";
const PROCESS_TYPE_SCAN: &str = "scan";

/// Provides safe point.
/// TODO: Give it a better name?
pub trait GCSafePointProvider: Send + 'static {
Expand Down Expand Up @@ -729,6 +732,13 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
self.initialize()?;

loop {
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.set(0);
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_SCAN])
.set(0);

set_status_metrics(GCManagerState::Idle);
self.wait_for_next_safe_point()?;

Expand Down Expand Up @@ -791,6 +801,7 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
Ordering::Greater => {
debug!("gc_worker: update safe point"; "safe_point" => safe_point);
self.safe_point = safe_point;
AUTO_GC_SAFE_POINT_GAUGE.set(safe_point as i64);
true
}
}
Expand Down Expand Up @@ -868,7 +879,15 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
info!(
"gc_worker: auto gc rewinds"; "processed_regions" => processed_regions
);

processed_regions = 0;
// Set the metric to zero to show that rewinding has happened.
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.set(0);
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_SCAN])
.set(0);
}
} else {
// We are not going to rewind, So we will stop if `progress` reaches `end`.
Expand Down Expand Up @@ -964,6 +983,9 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
);
}
*processed_regions += 1;
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_GC])
.inc();

Ok(next_key)
}
Expand All @@ -979,15 +1001,17 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
let res = self.cfg.region_info_provider.seek_region(
key.as_encoded(),
Box::new(move |iter| {
let mut scanned_regions = 0;
for info in iter {
scanned_regions += 1;
if info.role == StateRole::Leader {
if find_peer(&info.region, store_id).is_some() {
let _ = tx.send(Some(info.region.clone()));
let _ = tx.send((Some(info.region.clone()), scanned_regions));
return;
}
}
}
let _ = tx.send(None);
let _ = tx.send((None, scanned_regions));
}),
);

Expand All @@ -998,7 +1022,14 @@ impl<S: GCSafePointProvider, R: RegionInfoProvider> GCManager<S, R> {
return (None, None);
};

match rx.recv() {
let seek_region_res = rx.recv().map(|(region, scanned_regions)| {
AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC
.with_label_values(&[PROCESS_TYPE_SCAN])
.add(scanned_regions);
region
});

match seek_region_res {
Ok(Some(mut region)) => {
let peer = find_peer(&region, store_id).unwrap().clone();
let end_key = region.take_end_key();
Expand Down
11 changes: 11 additions & 0 deletions src/storage/metrics.rs
Expand Up @@ -173,6 +173,17 @@ lazy_static! {
&["state"]
)
.unwrap();
pub static ref AUTO_GC_SAFE_POINT_GAUGE: IntGauge = register_int_gauge!(
"tikv_gcworker_autogc_safe_point",
"Safe point used for auto gc"
)
.unwrap();
pub static ref AUTO_GC_PROCESSED_REGIONS_GAUGE_VEC: IntGaugeVec = register_int_gauge_vec!(
"tikv_gcworker_autogc_processed_regions",
"Processed regions by auto gc",
&["type"]
)
.unwrap();
pub static ref REQUEST_EXCEED_BOUND: IntCounter = register_int_counter!(
"tikv_request_exceed_bound",
"Counter of request exceed bound"
Expand Down

0 comments on commit b38a308

Please sign in to comment.