Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft client: make grpc message size limit configurable (#7816) #7824

Merged
merged 2 commits on May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/server/config.rs
Expand Up @@ -34,6 +34,8 @@ const DEFAULT_ENDPOINT_STREAM_BATCH_ROW_LIMIT: usize = 128;

const DEFAULT_SNAP_MAX_BYTES_PER_SEC: u64 = 100 * 1024 * 1024;

/// Default upper bound on the size of a single outgoing gRPC message: 10 MiB.
const DEFAULT_MAX_GRPC_SEND_MSG_LEN: i32 = 10 * 1024 * 1024;

/// A clone of `grpc::CompressionAlgorithms` with serde supports.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -62,6 +64,8 @@ pub struct Config {
pub status_addr: String,
pub status_thread_pool_size: usize,

pub max_grpc_send_msg_len: i32,

// TODO: use CompressionAlgorithms instead once it supports traits like Clone etc.
pub grpc_compression_type: GrpcCompressionType,
pub grpc_concurrency: usize,
Expand Down Expand Up @@ -122,6 +126,7 @@ impl Default for Config {
advertise_addr: DEFAULT_ADVERTISE_LISTENING_ADDR.to_owned(),
status_addr: DEFAULT_STATUS_ADDR.to_owned(),
status_thread_pool_size: 1,
max_grpc_send_msg_len: DEFAULT_MAX_GRPC_SEND_MSG_LEN,
grpc_compression_type: GrpcCompressionType::None,
grpc_concurrency: DEFAULT_GRPC_CONCURRENCY,
grpc_concurrent_stream: DEFAULT_GRPC_CONCURRENT_STREAM,
Expand Down
32 changes: 21 additions & 11 deletions src/server/raft_client.rs
Expand Up @@ -23,8 +23,6 @@ use tikv_util::mpsc::batch::{self, BatchCollector, Sender as BatchSender};
use tikv_util::timer::GLOBAL_TIMER_HANDLE;
use tokio_timer::timer::Handle;

const MAX_GRPC_RECV_MSG_LEN: i32 = 10 * 1024 * 1024;
const MAX_GRPC_SEND_MSG_LEN: i32 = 10 * 1024 * 1024;
// When merge raft messages into a batch message, leave a buffer.
const GRPC_SEND_MSG_BUF: usize = 64 * 1024;

Expand All @@ -51,8 +49,7 @@ impl Conn {

let cb = ChannelBuilder::new(env)
.stream_initial_window_size(cfg.grpc_stream_initial_window_size.0 as i32)
.max_receive_message_len(MAX_GRPC_RECV_MSG_LEN)
.max_send_message_len(MAX_GRPC_SEND_MSG_LEN)
.max_send_message_len(cfg.max_grpc_send_msg_len)
.keepalive_time(cfg.grpc_keepalive_time.0)
.keepalive_timeout(cfg.grpc_keepalive_timeout.0)
.default_compression_algorithm(cfg.grpc_compression_algorithm())
Expand All @@ -66,8 +63,12 @@ impl Conn {
let client2 = client1.clone();

let (tx, rx) = batch::unbounded::<RaftMessage>(RAFT_MSG_NOTIFY_SIZE);
let rx =
batch::BatchReceiver::new(rx, RAFT_MSG_MAX_BATCH_SIZE, Vec::new, RaftMsgCollector(0));
let rx = batch::BatchReceiver::new(
rx,
RAFT_MSG_MAX_BATCH_SIZE,
Vec::new,
RaftMsgCollector::new(cfg.max_grpc_send_msg_len as usize),
);

// Use a mutex to make compiler happy.
let rx1 = Arc::new(Mutex::new(rx));
Expand Down Expand Up @@ -237,19 +238,28 @@ impl<T: RaftStoreRouter> RaftClient<T> {
}

// Collect raft messages into a vector so that we can merge them into one message later.
// `MAX_GRPC_SEND_MSG_LEN` will be considered when collecting.
struct RaftMsgCollector(usize);
// Tracks the running payload size of a raft-message batch so the batch can
// be cut off before it grows past the configured byte limit.
struct RaftMsgCollector {
    size: usize,
    limit: usize,
}

impl RaftMsgCollector {
    // Builds a collector whose batches are capped at `limit` bytes.
    fn new(limit: usize) -> Self {
        RaftMsgCollector { size: 0, limit }
    }
}

// Merges raft messages into batches for a single gRPC send, cutting a batch
// off before its accumulated size would reach the configured limit.
impl BatchCollector<Vec<RaftMessage>, RaftMessage> for RaftMsgCollector {
    /// Tries to add `e` to the current batch `v`.
    ///
    /// Returns `None` when `e` was appended to `v`; returns `Some(e)`
    /// (message NOT pushed, accumulated size reset to 0) when accepting `e`
    /// would bring the batch over the limit — presumably the caller then
    /// flushes `v` and re-offers the message to a fresh batch.
    fn collect(&mut self, v: &mut Vec<RaftMessage>, e: RaftMessage) -> Option<RaftMessage> {
        // Treat key lengths plus raft-entry payload lengths as the
        // message's size.
        let mut msg_size = e.start_key.len() + e.end_key.len();
        for entry in e.get_message().get_entries() {
            msg_size += entry.data.len();
        }
        // Keep `GRPC_SEND_MSG_BUF` of headroom below the limit for
        // per-message overhead not counted above.
        // NOTE(review): `self.limit` is already `usize`, so the `as usize`
        // cast is redundant — likely a leftover from the removed
        // `MAX_GRPC_SEND_MSG_LEN` constant; consider dropping it.
        if self.size > 0 && self.size + msg_size + GRPC_SEND_MSG_BUF >= self.limit as usize {
            self.size = 0;
            return Some(e);
        }
        self.size += msg_size;
        v.push(e);
        None
    }
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/mod.rs
Expand Up @@ -63,6 +63,7 @@ fn test_serde_custom_tikv_config() {
advertise_addr: "example.com:443".to_owned(),
status_addr: "example.com:443".to_owned(),
status_thread_pool_size: 1,
max_grpc_send_msg_len: 6 * (1 << 20),
concurrent_send_snap_limit: 4,
concurrent_recv_snap_limit: 4,
grpc_compression_type: GrpcCompressionType::Gzip,
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/config/test-custom.toml
Expand Up @@ -36,6 +36,7 @@ addr = "example.com:443"
advertise-addr = "example.com:443"
status-addr = "example.com:443"
status-thread-pool-size = 1
max-grpc-send-msg-len = 6291456
grpc-compression-type = "gzip"
grpc-concurrency = 123
grpc-concurrent-stream = 1234
Expand Down