Skip to content

Commit

Permalink
store a mio::Registry in ProxyConfiguration implementations
Browse files Browse the repository at this point in the history
this will avoid passing the Poll instance as argument everywhere
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 4e17f86 commit 92ded4a
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 80 deletions.
25 changes: 14 additions & 11 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -941,15 +941,21 @@ pub struct Proxy {
backends: Rc<RefCell<BackendMap>>,
clusters: HashMap<ClusterId, Cluster>,
pool: Rc<RefCell<Pool>>,
registry: Registry,
}

impl Proxy {
pub fn new(pool: Rc<RefCell<Pool>>, backends: Rc<RefCell<BackendMap>>) -> Proxy {
pub fn new(
registry: Registry,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> Proxy {
Proxy {
listeners: HashMap::new(),
clusters: HashMap::new(),
backends,
pool,
registry,
}
}

Expand All @@ -972,13 +978,12 @@ impl Proxy {

pub fn activate_listener(
&mut self,
event_loop: &mut Poll,
addr: &SocketAddr,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
for listener in self.listeners.values_mut() {
if &listener.address == addr {
return listener.activate(event_loop, tcp_listener);
return listener.activate(&mut self.registry, tcp_listener);
}
}
None
Expand Down Expand Up @@ -1191,7 +1196,7 @@ impl Listener {

pub fn activate(
&mut self,
event_loop: &mut Poll,
registry: &mut Registry,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
if self.active {
Expand All @@ -1210,10 +1215,7 @@ impl Listener {
});

if let Some(ref mut sock) = listener {
if let Err(e) = event_loop
.registry()
.register(sock, self.token, Interest::READABLE)
{
if let Err(e) = registry.register(sock, self.token, Interest::READABLE) {
error!("error registering listener socket({:?}): {:?}", sock, e);
}
} else {
Expand Down Expand Up @@ -1637,7 +1639,7 @@ impl ProxyConfiguration<Session> for Proxy {

pub fn start(config: HttpListener, channel: ProxyChannel, max_buffers: usize, buffer_size: usize) {
use super::server::{self, ProxySessionCast};
let mut event_loop = Poll::new().expect("could not create event loop");
let event_loop = Poll::new().expect("could not create event loop");

let pool = Rc::new(RefCell::new(Pool::with_capacity(
1,
Expand Down Expand Up @@ -1678,9 +1680,10 @@ pub fn start(config: HttpListener, channel: ProxyChannel, max_buffers: usize, bu
};

let address = config.address;
let mut proxy = Proxy::new(pool.clone(), backends.clone());
let registry = event_loop.registry().try_clone().unwrap();
let mut proxy = Proxy::new(registry, pool.clone(), backends.clone());
let _ = proxy.add_listener(config, token);
let _ = proxy.activate_listener(&mut event_loop, &address, None);
let _ = proxy.activate_listener(&address, None);
let (scm_server, scm_client) = UnixStream::pair().unwrap();
let scm = ScmSocket::new(scm_client.into_raw_fd());
if let Err(e) = scm.send_listeners(&Listeners {
Expand Down
28 changes: 14 additions & 14 deletions lib/src/https_openssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,7 +1155,7 @@ impl Listener {

pub fn activate(
&mut self,
event_loop: &mut Poll,
registry: &mut Registry,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
if self.active {
Expand All @@ -1174,10 +1174,7 @@ impl Listener {
});

if let Some(ref mut sock) = listener {
if let Err(e) = event_loop
.registry()
.register(sock, self.token, Interest::READABLE)
{
if let Err(e) = registry.register(sock, self.token, Interest::READABLE) {
error!("cannot register listen socket({:?}): {:?}", sock, e);
}
} else {
Expand Down Expand Up @@ -1597,15 +1594,21 @@ pub struct Proxy {
clusters: HashMap<ClusterId, Cluster>,
backends: Rc<RefCell<BackendMap>>,
pool: Rc<RefCell<Pool>>,
registry: Registry,
}

impl Proxy {
pub fn new(pool: Rc<RefCell<Pool>>, backends: Rc<RefCell<BackendMap>>) -> Proxy {
pub fn new(
registry: Registry,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> Proxy {
Proxy {
listeners: HashMap::new(),
clusters: HashMap::new(),
backends,
pool,
registry,
}
}

Expand All @@ -1628,13 +1631,12 @@ impl Proxy {

pub fn activate_listener(
&mut self,
event_loop: &mut Poll,
addr: &SocketAddr,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
for listener in self.listeners.values_mut() {
if &listener.address == addr {
return listener.activate(event_loop, tcp_listener);
return listener.activate(&mut self.registry, tcp_listener);
}
}
None
Expand Down Expand Up @@ -2329,7 +2331,7 @@ use crate::server::HttpsProvider;
pub fn start(config: HttpsListener, channel: ProxyChannel, max_buffers: usize, buffer_size: usize) {
use crate::server::{self, ProxySessionCast};

let mut event_loop = Poll::new().expect("could not create event loop");
let event_loop = Poll::new().expect("could not create event loop");

let pool = Rc::new(RefCell::new(Pool::with_capacity(
1,
Expand Down Expand Up @@ -2370,13 +2372,11 @@ pub fn start(config: HttpsListener, channel: ProxyChannel, max_buffers: usize, b
Token(key)
};

let mut configuration = Proxy::new(pool.clone(), backends.clone());
let registry = event_loop.registry().try_clone().unwrap();
let mut configuration = Proxy::new(registry, pool.clone(), backends.clone());
let address = config.address.clone();
if configuration.add_listener(config, token).is_some() {
if configuration
.activate_listener(&mut event_loop, &address, None)
.is_some()
{
if configuration.activate_listener(&address, None).is_some() {
let (scm_server, _scm_client) = UnixStream::pair().unwrap();
let mut server_config: server::ServerConfig = Default::default();
server_config.max_connections = max_buffers;
Expand Down
27 changes: 14 additions & 13 deletions lib/src/https_rustls/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl Listener {

pub fn activate(
&mut self,
event_loop: &mut Poll,
registry: &mut Registry,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
if self.active {
Expand All @@ -145,10 +145,7 @@ impl Listener {
});

if let Some(ref mut sock) = listener {
if let Err(e) = event_loop
.registry()
.register(sock, self.token, Interest::READABLE)
{
if let Err(e) = registry.register(sock, self.token, Interest::READABLE) {
error!("error registering listen socket: {:?}", e);
}
} else {
Expand Down Expand Up @@ -247,15 +244,21 @@ pub struct Proxy {
clusters: HashMap<ClusterId, Cluster>,
backends: Rc<RefCell<BackendMap>>,
pool: Rc<RefCell<Pool>>,
registry: Registry,
}

impl Proxy {
pub fn new(pool: Rc<RefCell<Pool>>, backends: Rc<RefCell<BackendMap>>) -> Proxy {
pub fn new(
registry: Registry,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> Proxy {
Proxy {
listeners: HashMap::new(),
clusters: HashMap::new(),
backends,
pool,
registry,
}
}

Expand All @@ -278,13 +281,12 @@ impl Proxy {

pub fn activate_listener(
&mut self,
event_loop: &mut Poll,
addr: &SocketAddr,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
for listener in self.listeners.values_mut() {
if &listener.address == addr {
return listener.activate(event_loop, tcp_listener);
return listener.activate(&mut self.registry, tcp_listener);
}
}
None
Expand Down Expand Up @@ -907,7 +909,7 @@ use crate::server::HttpsProvider;
pub fn start(config: HttpsListener, channel: ProxyChannel, max_buffers: usize, buffer_size: usize) {
use crate::server::{self, ProxySessionCast};

let mut event_loop = Poll::new().expect("could not create event loop");
let event_loop = Poll::new().expect("could not create event loop");

let pool = Rc::new(RefCell::new(Pool::with_capacity(
1,
Expand Down Expand Up @@ -949,11 +951,10 @@ pub fn start(config: HttpsListener, channel: ProxyChannel, max_buffers: usize, b
};

let address = config.address.clone();
let mut configuration = Proxy::new(pool.clone(), backends.clone());
let registry = event_loop.registry().try_clone().unwrap();
let mut configuration = Proxy::new(registry, pool.clone(), backends.clone());
if configuration.add_listener(config, token).is_some()
&& configuration
.activate_listener(&mut event_loop, &address, None)
.is_some()
&& configuration.activate_listener(&address, None).is_some()
{
let (scm_server, _scm_client) = UnixStream::pair().unwrap();
let mut server_config: server::ServerConfig = Default::default();
Expand Down

0 comments on commit 92ded4a

Please sign in to comment.