Skip to content

Commit

Permalink
raft client: make grpc message size limit configurable (tikv#7816) (t…
Browse files Browse the repository at this point in the history
…ikv#7823)

Signed-off-by: qupeng <qupeng@pingcap.com>

Co-authored-by: qupeng <qupeng@pingcap.com>
  • Loading branch information
sre-bot and hicqu committed May 29, 2020
1 parent 20f58dc commit 275904e
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 11 deletions.
5 changes: 5 additions & 0 deletions src/server/config.rs
Expand Up @@ -32,6 +32,8 @@ const DEFAULT_ENDPOINT_REQUEST_MAX_HANDLE_SECS: u64 = 60;
// Number of rows in each chunk for streaming coprocessor.
const DEFAULT_ENDPOINT_STREAM_BATCH_ROW_LIMIT: usize = 128;

const DEFAULT_MAX_GRPC_SEND_MSG_LEN: i32 = 10 * 1024 * 1024;

/// A clone of `grpc::CompressionAlgorithms` with serde supports.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -60,6 +62,8 @@ pub struct Config {
pub status_addr: String,
pub status_thread_pool_size: usize,

pub max_grpc_send_msg_len: i32,

// TODO: use CompressionAlgorithms instead once it supports traits like Clone etc.
pub grpc_compression_type: GrpcCompressionType,
pub grpc_concurrency: usize,
Expand Down Expand Up @@ -113,6 +117,7 @@ impl Default for Config {
advertise_addr: DEFAULT_ADVERTISE_LISTENING_ADDR.to_owned(),
status_addr: DEFAULT_STATUS_ADDR.to_owned(),
status_thread_pool_size: 1,
max_grpc_send_msg_len: DEFAULT_MAX_GRPC_SEND_MSG_LEN,
grpc_compression_type: GrpcCompressionType::None,
grpc_concurrency: DEFAULT_GRPC_CONCURRENCY,
grpc_concurrent_stream: DEFAULT_GRPC_CONCURRENT_STREAM,
Expand Down
32 changes: 21 additions & 11 deletions src/server/raft_client.rs
Expand Up @@ -25,8 +25,6 @@ use tikv_util::security::SecurityManager;
use tikv_util::timer::GLOBAL_TIMER_HANDLE;
use tokio_timer::timer::Handle;

const MAX_GRPC_RECV_MSG_LEN: i32 = 10 * 1024 * 1024;
const MAX_GRPC_SEND_MSG_LEN: i32 = 10 * 1024 * 1024;
// When merge raft messages into a batch message, leave a buffer.
const GRPC_SEND_MSG_BUF: usize = 64 * 1024;

Expand All @@ -53,8 +51,7 @@ impl Conn {

let cb = ChannelBuilder::new(env)
.stream_initial_window_size(cfg.grpc_stream_initial_window_size.0 as i32)
.max_receive_message_len(MAX_GRPC_RECV_MSG_LEN)
.max_send_message_len(MAX_GRPC_SEND_MSG_LEN)
.max_send_message_len(cfg.max_grpc_send_msg_len)
.keepalive_time(cfg.grpc_keepalive_time.0)
.keepalive_timeout(cfg.grpc_keepalive_timeout.0)
.default_compression_algorithm(cfg.grpc_compression_algorithm())
Expand All @@ -68,8 +65,12 @@ impl Conn {
let client2 = client1.clone();

let (tx, rx) = batch::unbounded::<RaftMessage>(RAFT_MSG_NOTIFY_SIZE);
let rx =
batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, RaftMsgCollector(0));
let rx = batch::BatchReceiver::new(
rx,
RAFT_MSG_MAX_BATCH_SIZE,
Vec::new,
RaftMsgCollector::new(cfg.max_grpc_send_msg_len as usize),
);

// Use a mutex to make compiler happy.
let rx1 = Arc::new(Mutex::new(rx));
Expand Down Expand Up @@ -239,19 +240,28 @@ impl<T: RaftStoreRouter> RaftClient<T> {
}

// Collect raft messages into a vector so that we can merge them into one message later.
// `MAX_GRPC_SEND_MSG_LEN` will be considered when collecting.
struct RaftMsgCollector(usize);
struct RaftMsgCollector {
size: usize,
limit: usize,
}

impl RaftMsgCollector {
fn new(limit: usize) -> Self {
Self { size: 0, limit }
}
}

impl BatchCollector<Vec<RaftMessage>, RaftMessage> for RaftMsgCollector {
fn collect(&mut self, v: &mut Vec<RaftMessage>, e: RaftMessage) -> Option<RaftMessage> {
let mut msg_size = e.start_key.len() + e.end_key.len();
for entry in e.get_message().get_entries() {
msg_size += entry.data.len();
}
if self.0 > 0 && self.0 + msg_size + GRPC_SEND_MSG_BUF >= MAX_GRPC_SEND_MSG_LEN as usize {
self.0 = 0;
if self.size > 0 && self.size + msg_size + GRPC_SEND_MSG_BUF >= self.limit as usize {
self.size = 0;
return Some(e);
}
self.0 += msg_size;
self.size += msg_size;
v.push(e);
None
}
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
Expand Up @@ -54,6 +54,7 @@ fn test_serde_custom_tikv_config() {
advertise_addr: "example.com:443".to_owned(),
status_addr: "example.com:443".to_owned(),
status_thread_pool_size: 1,
max_grpc_send_msg_len: 6 * (1 << 20),
concurrent_send_snap_limit: 4,
concurrent_recv_snap_limit: 4,
grpc_compression_type: GrpcCompressionType::Gzip,
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/test-custom.toml
Expand Up @@ -26,6 +26,7 @@ addr = "example.com:443"
advertise-addr = "example.com:443"
status-addr = "example.com:443"
status-thread-pool-size = 1
max-grpc-send-msg-len = 6291456
grpc-compression-type = "gzip"
grpc-concurrency = 123
grpc-concurrent-stream = 1234
Expand Down

0 comments on commit 275904e

Please sign in to comment.