Skip to content

Commit

Permalink
resource_control: do not force set override priority at handle gRPC r…
Browse files Browse the repository at this point in the history
…equest (#16003)

close #15994

Signed-off-by: glorv <glorvs@163.com>
  • Loading branch information
glorv committed Nov 16, 2023
1 parent 6a61880 commit a0dbe2d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 39 deletions.
21 changes: 20 additions & 1 deletion components/resource_control/src/resource_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ impl TaskPriority {
impl From<u32> for TaskPriority {
fn from(value: u32) -> Self {
// map the resource group priority value (1,8,16) to (Low,Medium,High)
if value < 6 {
// 0 means the priority is not set, so map it to medium by default.
if value == 0 {
Self::Medium
} else if value < 6 {
Self::Low
} else if value < 11 {
Self::Medium
Expand Down Expand Up @@ -1430,4 +1433,20 @@ pub(crate) mod tests {
&mgr.priority_limiters[1]
));
}

#[test]
fn test_task_priority() {
use TaskPriority::*;
let cases = [
(0, Medium),
(1, Low),
(7, Medium),
(8, Medium),
(15, High),
(16, High),
];
for (value, priority) in cases {
assert_eq!(TaskPriority::from(value), priority);
}
}
}
62 changes: 24 additions & 38 deletions src/server/service/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ macro_rules! handle_request {
handle_request!($fn_name, $future_name, $req_ty, $resp_ty, no_time_detail);
};
($fn_name: ident, $future_name: ident, $req_ty: ident, $resp_ty: ident, $time_detail: tt) => {
fn $fn_name(&mut self, ctx: RpcContext<'_>, mut req: $req_ty, sink: UnarySink<$resp_ty>) {
fn $fn_name(&mut self, ctx: RpcContext<'_>, req: $req_ty, sink: UnarySink<$resp_ty>) {
forward_unary!(self.proxy, $fn_name, ctx, req, sink);
let begin_instant = Instant::now();

let source = req.get_context().get_request_source().to_owned();
let resource_control_ctx = req.mut_context().mut_resource_control_context();
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = &self.resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand Down Expand Up @@ -229,20 +229,6 @@ macro_rules! handle_request {
}
}

// consume resource group penalty and set explicit group priority
// We override the override_priority here to make handling tasks easier.
fn consume_penalty_and_set_priority(
resource_manager: &Arc<ResourceGroupManager>,
resource_control_ctx: &mut ResourceControlContext,
) {
resource_manager.consume_penalty(resource_control_ctx);
if resource_control_ctx.get_override_priority() == 0 {
let prioirty = resource_manager
.get_resource_group_priority(resource_control_ctx.get_resource_group_name());
resource_control_ctx.override_priority = prioirty as u64;
}
}

macro_rules! set_total_time {
($resp:ident, $duration:expr,no_time_detail) => {};
($resp:ident, $duration:expr,has_time_detail) => {
Expand Down Expand Up @@ -490,12 +476,12 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
ctx.spawn(task);
}

fn coprocessor(&mut self, ctx: RpcContext<'_>, mut req: Request, sink: UnarySink<Response>) {
fn coprocessor(&mut self, ctx: RpcContext<'_>, req: Request, sink: UnarySink<Response>) {
forward_unary!(self.proxy, coprocessor, ctx, req, sink);
let source = req.get_context().get_request_source().to_owned();
let resource_control_ctx = req.mut_context().mut_resource_control_context();
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = &self.resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand Down Expand Up @@ -527,13 +513,13 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
fn raw_coprocessor(
&mut self,
ctx: RpcContext<'_>,
mut req: RawCoprocessorRequest,
req: RawCoprocessorRequest,
sink: UnarySink<RawCoprocessorResponse>,
) {
let source = req.get_context().get_request_source().to_owned();
let resource_control_ctx = req.mut_context().mut_resource_control_context();
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = &self.resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand Down Expand Up @@ -616,13 +602,13 @@ impl<E: Engine, L: LockManager, F: KvFormat> Tikv for Service<E, L, F> {
fn coprocessor_stream(
&mut self,
ctx: RpcContext<'_>,
mut req: Request,
req: Request,
mut sink: ServerStreamingSink<Response>,
) {
let begin_instant = Instant::now();
let resource_control_ctx = req.mut_context().mut_resource_control_context();
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = &self.resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand Down Expand Up @@ -1162,10 +1148,10 @@ fn handle_batch_commands_request<E: Engine, L: LockManager, F: KvFormat>(
let resp = future::ok(batch_commands_response::Response::default());
response_batch_commands_request(id, resp, tx.clone(), begin_instant, GrpcTypeKind::invalid, String::default());
},
Some(batch_commands_request::request::Cmd::Get(mut req)) => {
let resource_control_ctx = req.mut_context().mut_resource_control_context();
Some(batch_commands_request::request::Cmd::Get(req)) => {
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand All @@ -1183,10 +1169,10 @@ fn handle_batch_commands_request<E: Engine, L: LockManager, F: KvFormat>(
response_batch_commands_request(id, resp, tx.clone(), begin_instant, GrpcTypeKind::kv_get, source);
}
},
Some(batch_commands_request::request::Cmd::RawGet(mut req)) => {
let resource_control_ctx = req.mut_context().mut_resource_control_context();
Some(batch_commands_request::request::Cmd::RawGet(req)) => {
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand All @@ -1204,10 +1190,10 @@ fn handle_batch_commands_request<E: Engine, L: LockManager, F: KvFormat>(
response_batch_commands_request(id, resp, tx.clone(), begin_instant, GrpcTypeKind::raw_get, source);
}
},
Some(batch_commands_request::request::Cmd::Coprocessor(mut req)) => {
let resource_control_ctx = req.mut_context().mut_resource_control_context();
Some(batch_commands_request::request::Cmd::Coprocessor(req)) => {
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand Down Expand Up @@ -1238,10 +1224,10 @@ fn handle_batch_commands_request<E: Engine, L: LockManager, F: KvFormat>(
String::default(),
);
}
$(Some(batch_commands_request::request::Cmd::$cmd(mut req)) => {
let resource_control_ctx = req.mut_context().mut_resource_control_context();
$(Some(batch_commands_request::request::Cmd::$cmd(req)) => {
let resource_control_ctx = req.get_context().get_resource_control_context();
if let Some(resource_manager) = resource_manager {
consume_penalty_and_set_priority(resource_manager, resource_control_ctx);
resource_manager.consume_penalty(resource_control_ctx);
}
GRPC_RESOURCE_GROUP_COUNTER_VEC
.with_label_values(&[resource_control_ctx.get_resource_group_name()])
Expand Down

0 comments on commit a0dbe2d

Please sign in to comment.