From 4d39c5ecd7b3553de44c02c3f3b92ff714c2c37e Mon Sep 17 00:00:00 2001 From: qupeng Date: Wed, 13 May 2020 15:54:31 +0800 Subject: [PATCH] raft client: make grpc message size limit configurable (#7816) Signed-off-by: qupeng --- src/server/config.rs | 5 ++++ src/server/raft_client.rs | 32 ++++++++++++++-------- tests/integrations/config/mod.rs | 1 + tests/integrations/config/test-custom.toml | 1 + 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/server/config.rs b/src/server/config.rs index 93bbe9efd53..b8e7054bf5b 100644 --- a/src/server/config.rs +++ b/src/server/config.rs @@ -34,6 +34,8 @@ const DEFAULT_ENDPOINT_STREAM_BATCH_ROW_LIMIT: usize = 128; const DEFAULT_SNAP_MAX_BYTES_PER_SEC: u64 = 100 * 1024 * 1024; +const DEFAULT_MAX_GRPC_SEND_MSG_LEN: i32 = 10 * 1024 * 1024; + /// A clone of `grpc::CompressionAlgorithms` with serde supports. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] @@ -62,6 +64,8 @@ pub struct Config { pub status_addr: String, pub status_thread_pool_size: usize, + pub max_grpc_send_msg_len: i32, + // TODO: use CompressionAlgorithms instead once it supports traits like Clone etc. pub grpc_compression_type: GrpcCompressionType, pub grpc_concurrency: usize, @@ -122,6 +126,7 @@ impl Default for Config { advertise_addr: DEFAULT_ADVERTISE_LISTENING_ADDR.to_owned(), status_addr: DEFAULT_STATUS_ADDR.to_owned(), status_thread_pool_size: 1, + max_grpc_send_msg_len: DEFAULT_MAX_GRPC_SEND_MSG_LEN, grpc_compression_type: GrpcCompressionType::None, grpc_concurrency: DEFAULT_GRPC_CONCURRENCY, grpc_concurrent_stream: DEFAULT_GRPC_CONCURRENT_STREAM, diff --git a/src/server/raft_client.rs b/src/server/raft_client.rs index d7903c01dca..d3cbece0a66 100644 --- a/src/server/raft_client.rs +++ b/src/server/raft_client.rs @@ -24,8 +24,6 @@ use tikv_util::mpsc::batch::{self, BatchCollector, Sender as BatchSender}; use tikv_util::timer::GLOBAL_TIMER_HANDLE; use tokio_timer::timer::Handle; -const MAX_GRPC_RECV_MSG_LEN: i32 = 10 * 1024 * 1024; -const MAX_GRPC_SEND_MSG_LEN: i32 = 10 * 1024 * 1024; // When merge raft messages into a batch message, leave a buffer. const GRPC_SEND_MSG_BUF: usize = 64 * 1024; @@ -52,8 +50,7 @@ impl Conn { let cb = ChannelBuilder::new(env) .stream_initial_window_size(cfg.grpc_stream_initial_window_size.0 as i32) - .max_receive_message_len(MAX_GRPC_RECV_MSG_LEN) - .max_send_message_len(MAX_GRPC_SEND_MSG_LEN) + .max_send_message_len(cfg.max_grpc_send_msg_len) .keepalive_time(cfg.grpc_keepalive_time.0) .keepalive_timeout(cfg.grpc_keepalive_timeout.0) .default_compression_algorithm(cfg.grpc_compression_algorithm()) @@ -67,8 +64,12 @@ impl Conn { let client2 = client1.clone(); let (tx, rx) = batch::unbounded::(RAFT_MSG_NOTIFY_SIZE); - let rx = - batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, RaftMsgCollector(0)); + let rx = batch::BatchReceiver::new( + rx, + RAFT_MSG_MAX_BATCH_SIZE, + Vec::new, + RaftMsgCollector::new(cfg.max_grpc_send_msg_len as usize), + ); // Use a mutex to make compiler happy. let rx1 = Arc::new(Mutex::new(rx)); @@ -238,19 +239,28 @@ impl> RaftClient { } // Collect raft messages into a vector so that we can merge them into one message later. -// `MAX_GRPC_SEND_MSG_LEN` will be considered when collecting. -struct RaftMsgCollector(usize); +struct RaftMsgCollector { + size: usize, + limit: usize, +} + +impl RaftMsgCollector { + fn new(limit: usize) -> Self { + Self { size: 0, limit } + } +} + impl BatchCollector, RaftMessage> for RaftMsgCollector { fn collect(&mut self, v: &mut Vec, e: RaftMessage) -> Option { let mut msg_size = e.start_key.len() + e.end_key.len(); for entry in e.get_message().get_entries() { msg_size += entry.data.len(); } - if self.0 > 0 && self.0 + msg_size + GRPC_SEND_MSG_BUF >= MAX_GRPC_SEND_MSG_LEN as usize { - self.0 = 0; + if self.size > 0 && self.size + msg_size + GRPC_SEND_MSG_BUF >= self.limit as usize { + self.size = 0; return Some(e); } - self.0 += msg_size; + self.size += msg_size; v.push(e); None } diff --git a/tests/integrations/config/mod.rs b/tests/integrations/config/mod.rs index 93065953c9e..c25fe3204c5 100644 --- a/tests/integrations/config/mod.rs +++ b/tests/integrations/config/mod.rs @@ -63,6 +63,7 @@ fn test_serde_custom_tikv_config() { advertise_addr: "example.com:443".to_owned(), status_addr: "example.com:443".to_owned(), status_thread_pool_size: 1, + max_grpc_send_msg_len: 6 * (1 << 20), concurrent_send_snap_limit: 4, concurrent_recv_snap_limit: 4, grpc_compression_type: GrpcCompressionType::Gzip, diff --git a/tests/integrations/config/test-custom.toml b/tests/integrations/config/test-custom.toml index 4d46391f978..e04725d3b0c 100644 --- a/tests/integrations/config/test-custom.toml +++ b/tests/integrations/config/test-custom.toml @@ -36,6 +36,7 @@ addr = "example.com:443" advertise-addr = "example.com:443" status-addr = "example.com:443" status-thread-pool-size = 1 +max-grpc-send-msg-len = 6291456 grpc-compression-type = "gzip" grpc-concurrency = 123 grpc-concurrent-stream = 1234