Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: support batch grpc messages. #4043

Merged
merged 16 commits into from
Jan 12, 2019
Merged
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use prometheus_static_metric::*;

make_static_metric! {
pub label_enum GrpcTypeKind {
invalid,
kv_get,
kv_scan,
kv_prewrite,
Expand Down Expand Up @@ -75,6 +76,16 @@ lazy_static! {
"Total number of handle grpc message failure",
&["type"]
).unwrap();
pub static ref GRPC_REQ_BATCH_COMMANDS_SIZE: Histogram = register_histogram!(
"tikv_server_grpc_req_batch_size",
"grpc batch size of gRPC requests",
exponential_buckets(1f64, 2f64, 10).unwrap()
).unwrap();
pub static ref GRPC_RESP_BATCH_COMMANDS_SIZE: Histogram = register_histogram!(
"tikv_server_grpc_resp_batch_size",
"grpc batch size of gRPC responses",
exponential_buckets(1f64, 2f64, 10).unwrap()
).unwrap();
pub static ref RAFT_MESSAGE_RECV_COUNTER: IntCounter = register_int_counter!(
"tikv_server_raft_message_recv_total",
"Total number of raft messages received"
Expand Down
11 changes: 10 additions & 1 deletion src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,15 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
);
let snap_worker = Worker::new("snap-handler");

let kv_service = KvService::new(storage, cop, raft_router.clone(), snap_worker.scheduler());
let kv_service = KvService::new(
storage,
cop,
raft_router.clone(),
snap_worker.scheduler(),
Arc::clone(&stats_runtime),
Arc::clone(&thread_load),
);

let addr = SocketAddr::from_str(&cfg.addr)?;
info!("listening on {}", addr);
let ip = format!("{}", addr.ip());
Expand All @@ -107,6 +115,7 @@ impl<T: RaftStoreRouter, S: StoreAddrResolver + 'static> Server<T, S> {
.max_concurrent_stream(cfg.grpc_concurrent_stream)
.max_receive_message_len(MAX_GRPC_RECV_MSG_LEN)
.max_send_message_len(-1)
.http2_max_ping_strikes(i32::MAX) // For pings without data from clients.
.build_args();
let grpc_server = {
let mut sb = ServerBuilder::new(Arc::clone(&env))
Expand Down
Loading