Skip to content

Commit

Permalink
Supports reuseaddr, reuseport, and configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
rmqtt committed Aug 1, 2023
1 parent 2ffad69 commit 69f4cb5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
14 changes: 12 additions & 2 deletions rmqtt-plugins/rmqtt-http-api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use salvo::affix;
use salvo::http::header::{HeaderValue, CONTENT_TYPE};
use salvo::http::mime;
use salvo::hyper::server::conn::AddrIncoming;
use salvo::prelude::*;

use rmqtt::{
Expand Down Expand Up @@ -85,8 +86,17 @@ pub(crate) async fn listen_and_serve(
cfg: PluginConfigType,
rx: oneshot::Receiver<()>,
) -> Result<()> {
log::info!("HTTP API Listening on {}", laddr);
Server::new(TcpListener::bind(laddr))
let (reuseaddr, reuseport) = {
let cfg = cfg.read();
(cfg.http_reuseaddr, cfg.http_reuseport)
};
log::info!("HTTP API Listening on {}, reuseaddr: {}, reuseport: {}", laddr, reuseaddr, reuseport);

let listen = rmqtt::tokio::net::TcpListener::from_std(rmqtt::grpc::server::Server::bind(
laddr, 128, reuseaddr, reuseport,
)?)?;
let incoming = AddrIncoming::from_listener(listen).map_err(anyhow::Error::new)?;
Server::new(TcpListener::bind(incoming))
.try_serve_with_graceful_shutdown(route(cfg), async {
rx.await.ok();
})
Expand Down
14 changes: 14 additions & 0 deletions rmqtt-plugins/rmqtt-http-api/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ pub struct PluginConfig {
#[serde(default = "PluginConfig::message_type_default")]
pub message_type: MessageType,

#[serde(default = "PluginConfig::http_reuseaddr_default")]
pub http_reuseaddr: bool,

#[serde(default = "PluginConfig::http_reuseport_default")]
pub http_reuseport: bool,

#[serde(default = "PluginConfig::http_request_log_default")]
pub http_request_log: bool,
}
Expand All @@ -53,6 +59,14 @@ impl PluginConfig {
99
}

fn http_reuseaddr_default() -> bool {
true
}

fn http_reuseport_default() -> bool {
false
}

fn http_request_log_default() -> bool {
false
}
Expand Down
15 changes: 7 additions & 8 deletions rmqtt/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl Server {
//NodeServiceServer::with_interceptor(RmqttNodeService::default(), Self::check_auth)

log::info!(
"grpc server is listening on tcp://{:?}, reuseaddr: {}, reuseport: {}",
"gRPC server is listening on tcp://{:?}, reuseaddr: {}, reuseport: {}",
rpccfg.server_addr,
rpccfg.reuseaddr,
rpccfg.reuseport
Expand All @@ -35,7 +35,9 @@ impl Server {
transport::Server::builder().add_service(NodeServiceServer::new(NodeGrpcService::default()));

if rpccfg.reuseaddr || rpccfg.reuseport {
let listener = Self::bind(rpccfg.server_addr, 1024, rpccfg.reuseaddr, rpccfg.reuseport)?;
let listener = tokio_stream::wrappers::TcpListenerStream::new(tokio::net::TcpListener::from_std(
Self::bind(rpccfg.server_addr, 1024, rpccfg.reuseaddr, rpccfg.reuseport)?,
)?);
server.serve_with_incoming(listener).await?;
} else {
server.serve(rpccfg.server_addr).await?;
Expand All @@ -55,12 +57,12 @@ impl Server {
// }

#[inline]
pub(crate) fn bind(
pub fn bind(
laddr: std::net::SocketAddr,
backlog: i32,
_reuseaddr: bool,
_reuseport: bool,
) -> Result<tokio_stream::wrappers::TcpListenerStream> {
) -> Result<std::net::TcpListener> {
use socket2::{Domain, SockAddr, Socket, Type};
let builder = Socket::new(Domain::for_address(laddr), Type::STREAM, None)?;
builder.set_nonblocking(true)?;
Expand All @@ -70,10 +72,7 @@ impl Server {
builder.set_reuse_port(_reuseport)?;
builder.bind(&SockAddr::from(laddr))?;
builder.listen(backlog)?;
let listener = tokio_stream::wrappers::TcpListenerStream::new(tokio::net::TcpListener::from_std(
std::net::TcpListener::from(builder),
)?);
Ok(listener)
Ok(std::net::TcpListener::from(builder))
}
}

Expand Down

0 comments on commit 69f4cb5

Please sign in to comment.