Skip to content

Commit

Permalink
Supports reuseaddr, reuseport, and configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 1, 2023
1 parent e9b6b22 commit db895e5
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 8 deletions.
8 changes: 8 additions & 0 deletions rmqtt-bin/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ async fn listen(name: String, listen_cfg: &Listener) -> Result<()> {
.workers(listen_cfg.workers)
.maxconn(listen_cfg.max_connections / listen_cfg.workers)
.backlog(listen_cfg.backlog)
.reuseaddr(listen_cfg.reuseaddr)
.reuseport(listen_cfg.reuseport)
.run()
.await?;
Ok(())
Expand Down Expand Up @@ -300,6 +302,8 @@ async fn listen_tls(name: String, listen_cfg: &Listener) -> Result<()> {
.workers(listen_cfg.workers)
.maxconn(listen_cfg.max_connections / listen_cfg.workers)
.backlog(listen_cfg.backlog)
.reuseaddr(listen_cfg.reuseaddr)
.reuseport(listen_cfg.reuseport)
.run()
.await?;
Ok(())
Expand Down Expand Up @@ -403,6 +407,8 @@ async fn listen_ws(name: String, listen_cfg: &Listener) -> Result<()> {
.workers(listen_cfg.workers)
.maxconn(listen_cfg.max_connections / listen_cfg.workers)
.backlog(listen_cfg.backlog)
.reuseaddr(listen_cfg.reuseaddr)
.reuseport(listen_cfg.reuseport)
.run()
.await?;
Ok(())
Expand Down Expand Up @@ -516,6 +522,8 @@ async fn listen_wss(name: String, listen_cfg: &Listener) -> Result<()> {
.workers(listen_cfg.workers)
.maxconn(listen_cfg.max_connections / listen_cfg.workers)
.backlog(listen_cfg.backlog)
.reuseaddr(listen_cfg.reuseaddr)
.reuseport(listen_cfg.reuseport)
.run()
.await?;
Ok(())
Expand Down
2 changes: 2 additions & 0 deletions rmqtt-plugins/rmqtt-cluster-raft.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ try_lock_timeout = "10s"
task_exec_queue_workers = 500
task_exec_queue_max = 100_000

raft.grpc_reuseaddr = true
raft.grpc_reuseport = true
raft.grpc_timeout = "6s"
raft.grpc_concurrency_limit = 200
raft.grpc_breaker_threshold = 5
Expand Down
5 changes: 4 additions & 1 deletion rmqtt-plugins/rmqtt-cluster-raft/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ impl PluginConfig {

#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct RaftConfig {
pub grpc_reuseaddr: bool,
pub grpc_reuseport: bool,
#[serde(default, deserialize_with = "deserialize_duration_option")]
pub grpc_timeout: Option<Duration>,
pub grpc_concurrency_limit: Option<usize>,
Expand Down Expand Up @@ -162,7 +164,8 @@ pub struct RaftConfig {
impl RaftConfig {
pub(crate) fn to_raft_config(&self) -> rmqtt_raft::Config {
let mut cfg = rmqtt_raft::Config { ..Default::default() };

cfg.reuseaddr = self.grpc_reuseaddr;
cfg.reuseport = self.grpc_reuseport;
if let Some(grpc_timeout) = self.grpc_timeout {
cfg.grpc_timeout = grpc_timeout;
}
Expand Down
46 changes: 39 additions & 7 deletions rmqtt/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,26 @@ impl Server {

pub(crate) async fn listen_and_serve(&self) -> Result<()> {
//start grpc server
let addr = Runtime::instance().settings.rpc.server_addr;

let rpccfg = Runtime::instance().settings.rpc.clone();

//NodeServiceServer::with_interceptor(RmqttNodeService::default(), Self::check_auth)

log::info!("grpc server is listening on tcp://{:?}", addr);
transport::Server::builder()
.add_service(NodeServiceServer::new(NodeGrpcService::default()))
.serve(addr)
.await
.map_err(anyhow::Error::new)?;
log::info!(
"grpc server is listening on tcp://{:?}, reuseaddr: {}, reuseport: {}",
rpccfg.server_addr,
rpccfg.reuseaddr,
rpccfg.reuseport
);
let server =
transport::Server::builder().add_service(NodeServiceServer::new(NodeGrpcService::default()));

if rpccfg.reuseaddr || rpccfg.reuseport {
let listener = Self::bind(rpccfg.server_addr, 1024, rpccfg.reuseaddr, rpccfg.reuseport)?;
server.serve_with_incoming(listener).await?;
} else {
server.serve(rpccfg.server_addr).await?;
}
Ok(())
}

Expand All @@ -43,6 +53,28 @@ impl Server {
// _ => Err(Status::unauthenticated("No valid auth token")),
// }
// }

#[inline]
pub(crate) fn bind(
laddr: std::net::SocketAddr,
backlog: i32,
_reuseaddr: bool,
_reuseport: bool,
) -> Result<tokio_stream::wrappers::TcpListenerStream> {
use socket2::{Domain, SockAddr, Socket, Type};
let builder = Socket::new(Domain::for_address(laddr), Type::STREAM, None)?;
builder.set_nonblocking(true)?;
#[cfg(unix)]
builder.set_reuse_address(_reuseaddr)?;
#[cfg(unix)]
builder.set_reuse_port(_reuseport)?;
builder.bind(&SockAddr::from(laddr))?;
builder.listen(backlog)?;
let listener = tokio_stream::wrappers::TcpListenerStream::new(tokio::net::TcpListener::from_std(
std::net::TcpListener::from(builder),
)?);
Ok(listener)
}
}

#[derive(Debug, Default)]
Expand Down
14 changes: 14 additions & 0 deletions rmqtt/src/settings/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ pub struct ListenerInner {
pub max_packet_size: Bytesize,
#[serde(default = "ListenerInner::backlog_default")]
pub backlog: i32,
#[serde(default = "ListenerInner::reuseaddr_default")]
pub reuseaddr: Option<bool>,
#[serde(default = "ListenerInner::reuseport_default")]
pub reuseport: Option<bool>,
#[serde(default = "ListenerInner::idle_timeout_default", deserialize_with = "deserialize_duration")]
pub idle_timeout: Duration,
#[serde(default = "ListenerInner::allow_anonymous_default")]
Expand Down Expand Up @@ -236,6 +240,8 @@ impl Default for ListenerInner {
max_connections: ListenerInner::max_connections_default(),
max_handshaking_limit: ListenerInner::max_handshaking_limit_default(),
max_packet_size: ListenerInner::max_packet_size_default(),
reuseaddr: ListenerInner::reuseaddr_default(),
reuseport: ListenerInner::reuseport_default(),
backlog: ListenerInner::backlog_default(),
idle_timeout: ListenerInner::idle_timeout_default(),
allow_anonymous: ListenerInner::allow_anonymous_default(),
Expand Down Expand Up @@ -287,6 +293,14 @@ impl ListenerInner {
Bytesize(1024 * 1024)
}
#[inline]
fn reuseaddr_default() -> Option<bool> {
Some(true)
}
#[inline]
fn reuseport_default() -> Option<bool> {
None
}
#[inline]
fn backlog_default() -> i32 {
1024
}
Expand Down
8 changes: 8 additions & 0 deletions rmqtt/src/settings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ pub struct Rpc {
#[serde(default = "Rpc::server_addr_default", deserialize_with = "deserialize_addr")]
pub server_addr: SocketAddr,

#[serde(default)]
pub reuseaddr: bool,

#[serde(default)]
pub reuseport: bool,

#[serde(default = "Rpc::server_workers_default")]
pub server_workers: usize,

Expand All @@ -164,6 +170,8 @@ impl Default for Rpc {
#[inline]
fn default() -> Self {
Self {
reuseaddr: false,
reuseport: false,
batch_size: Self::batch_size_default(),
server_addr: Self::server_addr_default(),
server_workers: Self::server_workers_default(),
Expand Down

0 comments on commit db895e5

Please sign in to comment.