diff --git a/src/storage/mod.rs b/src/storage/mod.rs index ef9bbd47eb7..72110aff01c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -689,7 +689,7 @@ impl Storage { let priority = readpool::Priority::from(ctx.get_priority()); let res = self.read_pool.future_execute(priority, move |ctxd| { - let mut _timer = { + let timer = { let ctxd = ctxd.clone(); let mut thread_ctx = ctxd.current_thread_context_mut(); thread_ctx.start_command_duration_timer(CMD, priority) @@ -722,7 +722,7 @@ impl Storage { result }) .then(move |r| { - _timer.observe_duration(); + timer.observe_duration(); r }) }); @@ -745,7 +745,7 @@ impl Storage { let priority = readpool::Priority::from(ctx.get_priority()); let res = self.read_pool.future_execute(priority, move |ctxd| { - let mut _timer = { + let timer = { let ctxd = ctxd.clone(); let mut thread_ctx = ctxd.current_thread_context_mut(); thread_ctx.start_command_duration_timer(CMD, priority) @@ -782,7 +782,7 @@ impl Storage { Ok(kv_pairs) }) .then(move |r| { - _timer.observe_duration(); + timer.observe_duration(); r }) }); @@ -809,7 +809,7 @@ impl Storage { let priority = readpool::Priority::from(ctx.get_priority()); let res = self.read_pool.future_execute(priority, move |ctxd| { - let mut _timer = { + let timer = { let ctxd = ctxd.clone(); let mut thread_ctx = ctxd.current_thread_context_mut(); thread_ctx.start_command_duration_timer(CMD, priority) @@ -854,7 +854,7 @@ impl Storage { }) }) .then(move |r| { - _timer.observe_duration(); + timer.observe_duration(); r }) }); @@ -1107,44 +1107,44 @@ impl Storage { let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); - let res = self.read_pool.future_execute(priority, move |ctxd| { - let mut _timer = { - let ctxd = ctxd.clone(); - let mut thread_ctx = ctxd.current_thread_context_mut(); - thread_ctx.start_command_duration_timer(CMD, priority) - }; - - Self::async_snapshot(engine, &ctx) - .and_then(move |snapshot: E::Snap| { - let mut thread_ctx = ctxd.current_thread_context_mut(); - let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); - let cf = Self::rawkv_cf(&cf)?; - // no scan_count for this kind of op. + let timer = SCHED_HISTOGRAM_VEC + .with_label_values(&[CMD]) + .start_coarse_timer(); - let key_len = key.len(); - snapshot.get_cf(cf, &Key::from_encoded(key)) - // map storage::engine::Error -> storage::Error - .map_err(Error::from) - .map(|r| { - if let Some(ref value) = r { - let mut stats = Statistics::default(); - stats.data.flow_stats.read_keys = 1; - stats.data.flow_stats.read_bytes = key_len + value.len(); - thread_ctx.collect_read_flow(ctx.get_region_id(), &stats); - thread_ctx.collect_key_reads(CMD, 1); - } - r - }) - }) - .then(move |r| { - _timer.observe_duration(); - r - }) - }); + let readpool = self.read_pool.clone(); - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() + Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { + let res = readpool.future_execute(priority, move |ctxd| { + let mut thread_ctx = ctxd.current_thread_context_mut(); + let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); + let cf = match Self::rawkv_cf(&cf) { + Ok(x) => x, + Err(e) => return future::err(e), + }; + // no scan_count for this kind of op. + + let key_len = key.len(); + let result = snapshot.get_cf(cf, &Key::from_encoded(key)) + // map storage::engine::Error -> storage::Error + .map_err(Error::from) + .map(|r| { + if let Some(ref value) = r { + let mut stats = Statistics::default(); + stats.data.flow_stats.read_keys = 1; + stats.data.flow_stats.read_bytes = key_len + value.len(); + thread_ctx.collect_read_flow(ctx.get_region_id(), &stats); + thread_ctx.collect_key_reads(CMD, 1); + } + r + }); + + timer.observe_duration(); + future::result(result) + }); + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() + }) } /// Get the values of some raw keys in a batch. @@ -1157,53 +1157,50 @@ impl Storage { const CMD: &str = "raw_batch_get"; let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); + let timer = SCHED_HISTOGRAM_VEC + .with_label_values(&[CMD]) + .start_coarse_timer(); - let keys: Vec = keys.into_iter().map(Key::from_encoded).collect(); + let readpool = self.read_pool.clone(); - let res = self.read_pool.future_execute(priority, move |ctxd| { - let mut _timer = { - let ctxd = ctxd.clone(); + Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { + let res = readpool.future_execute(priority, move |ctxd| { + let keys: Vec = keys.into_iter().map(Key::from_encoded).collect(); let mut thread_ctx = ctxd.current_thread_context_mut(); - thread_ctx.start_command_duration_timer(CMD, priority) - }; - - Self::async_snapshot(engine, &ctx) - .and_then(move |snapshot: E::Snap| { - let mut thread_ctx = ctxd.current_thread_context_mut(); - let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); - let cf = Self::rawkv_cf(&cf)?; - // no scan_count for this kind of op. - let mut stats = Statistics::default(); - let result: Vec> = keys - .into_iter() - .map(|k| { - let v = snapshot.get_cf(cf, &k); - (k, v) - }) - .filter(|&(_, ref v)| !(v.is_ok() && v.as_ref().unwrap().is_none())) - .map(|(k, v)| match v { - Ok(Some(v)) => { - stats.data.flow_stats.read_keys += 1; - stats.data.flow_stats.read_bytes += k.as_encoded().len() + v.len(); - Ok((k.into_encoded(), v)) - } - Err(e) => Err(Error::from(e)), - _ => unreachable!(), - }) - .collect(); - thread_ctx.collect_key_reads(CMD, stats.data.flow_stats.read_keys as u64); - thread_ctx.collect_read_flow(ctx.get_region_id(), &stats); - Ok(result) - }) - .then(move |r| { - _timer.observe_duration(); - r - }) - }); - - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() + let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); + let cf = match Self::rawkv_cf(&cf) { + Ok(x) => x, + Err(e) => return future::err(e), + }; + // no scan_count for this kind of op. + let mut stats = Statistics::default(); + let result: Vec> = keys + .into_iter() + .map(|k| { + let v = snapshot.get_cf(cf, &k); + (k, v) + }) + .filter(|&(_, ref v)| !(v.is_ok() && v.as_ref().unwrap().is_none())) + .map(|(k, v)| match v { + Ok(Some(v)) => { + stats.data.flow_stats.read_keys += 1; + stats.data.flow_stats.read_bytes += k.as_encoded().len() + v.len(); + Ok((k.into_encoded(), v)) + } + Err(e) => Err(Error::from(e)), + _ => unreachable!(), + }) + .collect(); + thread_ctx.collect_key_reads(CMD, stats.data.flow_stats.read_keys as u64); + thread_ctx.collect_read_flow(ctx.get_region_id(), &stats); + + timer.observe_duration(); + future::ok(result) + }); + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() + }) } /// Write a raw key to the storage. @@ -1445,60 +1442,55 @@ impl Storage { let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); - let res = self.read_pool.future_execute(priority, move |ctxd| { - let mut _timer = { - let ctxd = ctxd.clone(); - let mut thread_ctx = ctxd.current_thread_context_mut(); - thread_ctx.start_command_duration_timer(CMD, priority) - }; + let timer = SCHED_HISTOGRAM_VEC + .with_label_values(&[CMD]) + .start_coarse_timer(); - Self::async_snapshot(engine, &ctx) - .and_then(move |snapshot: E::Snap| { - let mut thread_ctx = ctxd.current_thread_context_mut(); - let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); - - let end_key = end_key.map(Key::from_encoded); - - let mut statistics = Statistics::default(); - let result = if reverse { - Self::reverse_raw_scan( - &snapshot, - &cf, - &Key::from_encoded(key), - end_key, - limit, - &mut statistics, - key_only, - ) - .map_err(Error::from) - } else { - Self::raw_scan( - &snapshot, - &cf, - &Key::from_encoded(key), - end_key, - limit, - &mut statistics, - key_only, - ) - .map_err(Error::from) - }; + let readpool = self.read_pool.clone(); - thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics); - thread_ctx.collect_key_reads(CMD, statistics.write.flow_stats.read_keys as u64); - thread_ctx.collect_scan_count(CMD, &statistics); + Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { + let res = readpool.future_execute(priority, move |ctxd| { + let mut thread_ctx = ctxd.current_thread_context_mut(); + let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); - result - }) - .then(move |r| { - _timer.observe_duration(); - r - }) - }); + let end_key = end_key.map(Key::from_encoded); - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() + let mut statistics = Statistics::default(); + let result = if reverse { + Self::reverse_raw_scan( + &snapshot, + &cf, + &Key::from_encoded(key), + end_key, + limit, + &mut statistics, + key_only, + ) + .map_err(Error::from) + } else { + Self::raw_scan( + &snapshot, + &cf, + &Key::from_encoded(key), + end_key, + limit, + &mut statistics, + key_only, + ) + .map_err(Error::from) + }; + + thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics); + thread_ctx.collect_key_reads(CMD, statistics.write.flow_stats.read_keys as u64); + thread_ctx.collect_scan_count(CMD, &statistics); + + timer.observe_duration(); + future::result(result) + }); + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() + }) } /// Check the given raw kv CF name. Return the CF name, or `Err` if given CF name is invalid. @@ -1551,75 +1543,76 @@ impl Storage { let engine = self.get_engine(); let priority = readpool::Priority::from(ctx.get_priority()); - let res = self.read_pool.future_execute(priority, move |ctxd| { - let mut _timer = { - let ctxd = ctxd.clone(); - let mut thread_ctx = ctxd.current_thread_context_mut(); - thread_ctx.start_command_duration_timer(CMD, priority) - }; + let timer = SCHED_HISTOGRAM_VEC + .with_label_values(&[CMD]) + .start_coarse_timer(); - Self::async_snapshot(engine, &ctx) - .and_then(move |snapshot: E::Snap| { - let mut thread_ctx = ctxd.current_thread_context_mut(); - let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); + let readpool = self.read_pool.clone(); - let mut statistics = Statistics::default(); - if !Self::check_key_ranges(&ranges, reverse) { - return Err(box_err!("Invalid KeyRanges")); - }; - let mut result = Vec::new(); - let ranges_len = ranges.len(); - for i in 0..ranges_len { - let start_key = Key::from_encoded(ranges[i].take_start_key()); - let end_key = ranges[i].take_end_key(); - let end_key = if end_key.is_empty() { - if i + 1 == ranges_len { - None - } else { - Some(Key::from_encoded_slice(ranges[i + 1].get_start_key())) - } - } else { - Some(Key::from_encoded(end_key)) - }; - let pairs = if reverse { - Self::reverse_raw_scan( - &snapshot, - &cf, - &start_key, - end_key, - each_limit, - &mut statistics, - key_only, - )? + Self::async_snapshot(engine, &ctx).and_then(move |snapshot: E::Snap| { + let res = readpool.future_execute(priority, move |ctxd| { + let mut thread_ctx = ctxd.current_thread_context_mut(); + let _t_process = thread_ctx.start_processing_read_duration_timer(CMD); + + let mut statistics = Statistics::default(); + if !Self::check_key_ranges(&ranges, reverse) { + return future::result(Err(box_err!("Invalid KeyRanges"))); + }; + let mut result = Vec::new(); + let ranges_len = ranges.len(); + for i in 0..ranges_len { + let start_key = Key::from_encoded(ranges[i].take_start_key()); + let end_key = ranges[i].take_end_key(); + let end_key = if end_key.is_empty() { + if i + 1 == ranges_len { + None } else { - Self::raw_scan( - &snapshot, - &cf, - &start_key, - end_key, - each_limit, - &mut statistics, - key_only, - )? - }; - result.extend(pairs.into_iter()); - } - - thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics); - thread_ctx.collect_key_reads(CMD, statistics.write.flow_stats.read_keys as u64); - thread_ctx.collect_scan_count(CMD, &statistics); + Some(Key::from_encoded_slice(ranges[i + 1].get_start_key())) + } + } else { + Some(Key::from_encoded(end_key)) + }; + let pairs = if reverse { + match Self::reverse_raw_scan( + &snapshot, + &cf, + &start_key, + end_key, + each_limit, + &mut statistics, + key_only, + ) { + Ok(x) => x, + Err(e) => return future::err(e), + } + } else { + match Self::raw_scan( + &snapshot, + &cf, + &start_key, + end_key, + each_limit, + &mut statistics, + key_only, + ) { + Ok(x) => x, + Err(e) => return future::err(e), + } + }; + result.extend(pairs.into_iter()); + } - Ok(result) - }) - .then(move |r| { - _timer.observe_duration(); - r - }) - }); + thread_ctx.collect_read_flow(ctx.get_region_id(), &statistics); + thread_ctx.collect_key_reads(CMD, statistics.write.flow_stats.read_keys as u64); + thread_ctx.collect_scan_count(CMD, &statistics); - future::result(res) - .map_err(|_| Error::SchedTooBusy) - .flatten() + timer.observe_duration(); + future::ok(result) + }); + future::result(res) + .map_err(|_| Error::SchedTooBusy) + .flatten() + }) } /// Get MVCC info of a transactional key.