Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: multi conns to send raft messages. #1921

Merged
merged 7 commits into from Jun 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions etc/config-template.toml
Expand Up @@ -25,6 +25,8 @@ messages-per-tick = 4096
# grpc-concurrency = 2
# The number of max concurrent streams/requests on a client connection.
# grpc-concurrent-stream = 1024
# The number of connections with each tikv server to send raft messages.
# grpc-raft-conn-num = 10
# Amount to read ahead on individual grpc streams.
# grpc-stream-initial-window-size = "2MB"

Expand Down
3 changes: 3 additions & 0 deletions src/bin/tikv-server.rs
Expand Up @@ -631,6 +631,9 @@ fn build_cfg(matches: &ArgMatches,
cfg_usize(&mut cfg.grpc_concurrent_stream,
config,
"server.grpc-concurrent-stream");
cfg_usize(&mut cfg.grpc_raft_conn_num,
config,
"server.grpc-raft-conn-num");
cfg_usize(&mut cfg.grpc_stream_initial_window_size,
config,
"server.grpc-stream-initial-window-size");
Expand Down
3 changes: 3 additions & 0 deletions src/server/config.rs
Expand Up @@ -25,6 +25,7 @@ const DEFAULT_NOTIFY_CAPACITY: usize = 40960;
const DEFAULT_END_POINT_CONCURRENCY: usize = 8;
const DEFAULT_GRPC_CONCURRENCY: usize = 2;
const DEFAULT_GRPC_CONCURRENT_STREAM: usize = 1024;
const DEFAULT_GRPC_RAFT_CONN_NUM: usize = 10;
const DEFAULT_GRPC_STREAM_INITIAL_WINDOW_SIZE: usize = 2 * 1024 * 1024;
const DEFAULT_END_POINT_TXN_CONCURRENCY_RATIO: f64 = 0.25;
const DEFAULT_END_POINT_SMALL_TXN_TASKS_LIMIT: usize = 2;
Expand All @@ -47,6 +48,7 @@ pub struct Config {
pub messages_per_tick: usize,
pub grpc_concurrency: usize,
pub grpc_concurrent_stream: usize,
pub grpc_raft_conn_num: usize,
pub grpc_stream_initial_window_size: usize,
pub storage: StorageConfig,
pub raft_store: RaftStoreConfig,
Expand All @@ -66,6 +68,7 @@ impl Default for Config {
messages_per_tick: DEFAULT_MESSAGES_PER_TICK,
grpc_concurrency: DEFAULT_GRPC_CONCURRENCY,
grpc_concurrent_stream: DEFAULT_GRPC_CONCURRENT_STREAM,
grpc_raft_conn_num: DEFAULT_GRPC_RAFT_CONN_NUM,
grpc_stream_initial_window_size: DEFAULT_GRPC_STREAM_INITIAL_WINDOW_SIZE,
end_point_concurrency: DEFAULT_END_POINT_CONCURRENCY,
end_point_txn_concurrency_on_busy: usize::default(),
Expand Down
11 changes: 6 additions & 5 deletions src/server/raft_client.rs
Expand Up @@ -64,7 +64,7 @@ impl Conn {
/// `RaftClient` is used for sending raft messages to other stores.
pub struct RaftClient {
env: Arc<Environment>,
conns: HashMap<SocketAddr, Conn>,
conns: HashMap<(SocketAddr, usize), Conn>,
cfg: Config,
}

Expand All @@ -77,24 +77,25 @@ impl RaftClient {
}
}

fn get_conn(&mut self, addr: SocketAddr) -> &Conn {
fn get_conn(&mut self, addr: SocketAddr, index: usize) -> &Conn {
let env = self.env.clone();
let cfg = self.cfg.clone();
self.conns
.entry(addr)
.entry((addr, index))
.or_insert_with(|| Conn::new(env, addr, &cfg))
}

pub fn send(&mut self, addr: SocketAddr, msg: RaftMessage) -> Result<()> {
let index = msg.get_region_id() as usize % self.cfg.grpc_raft_conn_num;
let res = {
let conn = self.get_conn(addr);
let conn = self.get_conn(addr, index);
UnboundedSender::send(&conn.stream, (msg, WriteFlags::default()))
};
if let Err(e) = res {
warn!("server: drop conn with tikv endpoint {} error: {:?}",
addr,
e);
self.conns.remove(&addr);
self.conns.remove(&(addr, index));
return Err(box_err!(e));
}
Ok(())
Expand Down