Skip to content

Commit

Permalink
remove the poll argument from ProxyConfiguration methods
Browse files Browse the repository at this point in the history
ProxySession::close() and close_backend() now take a Registry as
argument
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 92ded4a commit 538741c
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 132 deletions.
38 changes: 19 additions & 19 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ impl Session {
}

impl ProxySession for Session {
fn close(&mut self, poll: &mut Poll) -> CloseResult {
fn close(&mut self, registry: &Registry) -> CloseResult {
self.metrics.service_stop();
self.cancel_timeouts();
if let Err(e) = self.front_socket().shutdown(Shutdown::Both) {
Expand All @@ -755,7 +755,7 @@ impl ProxySession for Session {
}
}

if let Err(e) = poll.registry().deregister(self.front_socket_mut()) {
if let Err(e) = registry.deregister(self.front_socket_mut()) {
error!(
"error deregistering front socket({:?}): {:?}",
self.front_socket(),
Expand All @@ -770,7 +770,7 @@ impl ProxySession for Session {
}

//FIXME: should we really pass a token here?
self.close_backend(Token(0), poll);
self.close_backend(Token(0), registry);

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
Expand Down Expand Up @@ -817,7 +817,7 @@ impl ProxySession for Session {
}

//FIXME: check the token passed as argument
fn close_backend(&mut self, _: Token, poll: &mut Poll) {
fn close_backend(&mut self, _: Token, registry: &Registry) {
self.remove_backend();

let back_connected = self.back_connected();
Expand All @@ -829,7 +829,7 @@ impl ProxySession for Session {
error!("error shutting down back socket({:?}): {:?}", sock, e);
}
}
if let Err(e) = poll.registry().deregister(sock) {
if let Err(e) = registry.deregister(sock) {
error!("error shutting down back socket({:?}): {:?}", sock, e);
}
}
Expand Down Expand Up @@ -983,7 +983,7 @@ impl Proxy {
) -> Option<Token> {
for listener in self.listeners.values_mut() {
if &listener.address == addr {
return listener.activate(&mut self.registry, tcp_listener);
return listener.activate(&self.registry, tcp_listener);
}
}
None
Expand Down Expand Up @@ -1196,7 +1196,7 @@ impl Listener {

pub fn activate(
&mut self,
registry: &mut Registry,
registry: &Registry,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
if self.active {
Expand Down Expand Up @@ -1295,7 +1295,6 @@ impl Listener {
impl ProxyConfiguration<Session> for Proxy {
fn connect_to_backend(
&mut self,
poll: &mut Poll,
session: &mut Session,
back_token: Token,
) -> Result<BackendConnectAction, ConnectionError> {
Expand Down Expand Up @@ -1338,7 +1337,7 @@ impl ProxyConfiguration<Session> for Proxy {
});
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = session.back_token() {
session.close_backend(token, poll);
session.close_backend(token, &self.registry);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand All @@ -1349,7 +1348,7 @@ impl ProxyConfiguration<Session> for Proxy {
//replacing with a connection to another cluster
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = session.back_token() {
session.close_backend(token, poll);
session.close_backend(token, &self.registry);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand Down Expand Up @@ -1391,7 +1390,7 @@ impl ProxyConfiguration<Session> for Proxy {
session.back_connected = BackendConnectionStatus::Connecting(Instant::now());
if let Some(back_token) = old_back_token {
session.set_back_token(back_token);
if let Err(e) = poll.registry().register(
if let Err(e) = self.registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
Expand All @@ -1404,7 +1403,7 @@ impl ProxyConfiguration<Session> for Proxy {
.map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::Replace)
} else {
if let Err(e) = poll.registry().register(
if let Err(e) = self.registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
Expand All @@ -1421,7 +1420,7 @@ impl ProxyConfiguration<Session> for Proxy {
}
}

fn notify(&mut self, event_loop: &mut Poll, message: ProxyRequest) -> ProxyResponse {
fn notify(&mut self, message: ProxyRequest) -> ProxyResponse {
// ToDo temporary
//trace!("{} notified", message);
match message.order {
Expand Down Expand Up @@ -1513,9 +1512,10 @@ impl ProxyConfiguration<Session> for Proxy {
}
ProxyRequestData::SoftStop => {
info!("{} processing soft shutdown", message.id);
for (_, l) in self.listeners.iter_mut() {
let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, l) in listeners.iter_mut() {
l.listener.take().map(|mut sock| {
if let Err(e) = event_loop.registry().deregister(&mut sock) {
if let Err(e) = self.registry.deregister(&mut sock) {
error!("error deregistering listen socket({:?}): {:?}", sock, e);
}
});
Expand All @@ -1528,9 +1528,10 @@ impl ProxyConfiguration<Session> for Proxy {
}
ProxyRequestData::HardStop => {
info!("{} hard shutdown", message.id);
for (_, mut l) in self.listeners.drain() {
let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, mut l) in listeners.drain() {
l.listener.take().map(|mut sock| {
if let Err(e) = event_loop.registry().deregister(&mut sock) {
if let Err(e) = self.registry.deregister(&mut sock) {
error!("error deregistering listen socket({:?}): {:?}", sock, e);
}
});
Expand Down Expand Up @@ -1586,7 +1587,6 @@ impl ProxyConfiguration<Session> for Proxy {
&mut self,
frontend_sock: TcpStream,
listen_token: ListenToken,
poll: &mut Poll,
session_token: Token,
wait_time: Duration,
) -> Result<(Rc<RefCell<Session>>, bool), AcceptError> {
Expand Down Expand Up @@ -1614,7 +1614,7 @@ impl ProxyConfiguration<Session> for Proxy {
Duration::seconds(listener.config.back_timeout as i64),
Duration::seconds(listener.config.request_timeout as i64),
) {
if let Err(e) = poll.registry().register(
if let Err(e) = self.registry.register(
c.front_socket_mut(),
session_token,
Interest::READABLE | Interest::WRITABLE,
Expand Down
38 changes: 19 additions & 19 deletions lib/src/https_openssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ impl Session {
}

impl ProxySession for Session {
fn close(&mut self, poll: &mut Poll) -> CloseResult {
fn close(&mut self, registry: &Registry) -> CloseResult {
//println!("TLS closing[{:?}] temp->front: {:?}, temp->back: {:?}", self.frontend_token, *self.temp.front_buf, *self.temp.back_buf);
self.http_mut().map(|http| http.close());
self.metrics.service_stop();
Expand All @@ -887,7 +887,7 @@ impl ProxySession for Session {
error!("error closing front socket({:?}): {:?}", front_socket, e);
}
}
if let Err(e) = poll.registry().deregister(front_socket) {
if let Err(e) = registry.deregister(front_socket) {
error!(
"error deregistering front socket({:?}): {:?}",
front_socket, e
Expand All @@ -901,7 +901,7 @@ impl ProxySession for Session {
}

//FIXME: should we really pass a token here?
self.close_backend(Token(0), poll);
self.close_backend(Token(0), registry);

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
Expand Down Expand Up @@ -960,7 +960,7 @@ impl ProxySession for Session {
}

//FIXME: check the token passed as argument
fn close_backend(&mut self, _: Token, poll: &mut Poll) {
fn close_backend(&mut self, _: Token, registry: &Registry) {
self.remove_backend();

let back_connected = self.back_connected();
Expand All @@ -972,7 +972,7 @@ impl ProxySession for Session {
error!("error closing back socket({:?}): {:?}", sock, e);
}
}
if let Err(e) = poll.registry().deregister(sock) {
if let Err(e) = registry.deregister(sock) {
error!("error deregistering back socket({:?}): {:?}", sock, e);
}
}
Expand Down Expand Up @@ -1155,7 +1155,7 @@ impl Listener {

pub fn activate(
&mut self,
registry: &mut Registry,
registry: &Registry,
tcp_listener: Option<TcpListener>,
) -> Option<Token> {
if self.active {
Expand Down Expand Up @@ -1636,7 +1636,7 @@ impl Proxy {
) -> Option<Token> {
for listener in self.listeners.values_mut() {
if &listener.address == addr {
return listener.activate(&mut self.registry, tcp_listener);
return listener.activate(&self.registry, tcp_listener);
}
}
None
Expand Down Expand Up @@ -1841,7 +1841,6 @@ impl ProxyConfiguration<Session> for Proxy {
&mut self,
mut frontend_sock: TcpStream,
token: ListenToken,
poll: &mut Poll,
session_token: Token,
wait_time: Duration,
) -> Result<(Rc<RefCell<Session>>, bool), AcceptError> {
Expand All @@ -1853,7 +1852,7 @@ impl ProxyConfiguration<Session> for Proxy {
);
}
if let Ok(ssl) = Ssl::new(&listener.default_context) {
if let Err(e) = poll.registry().register(
if let Err(e) = self.registry.register(
&mut frontend_sock,
session_token,
Interest::READABLE | Interest::WRITABLE,
Expand Down Expand Up @@ -1895,7 +1894,6 @@ impl ProxyConfiguration<Session> for Proxy {

fn connect_to_backend(
&mut self,
poll: &mut Poll,
session: &mut Session,
back_token: Token,
) -> Result<BackendConnectAction, ConnectionError> {
Expand Down Expand Up @@ -1939,7 +1937,7 @@ impl ProxyConfiguration<Session> for Proxy {
return Ok(BackendConnectAction::Reuse);
} else {
if let Some(token) = session.back_token() {
session.close_backend(token, poll);
session.close_backend(token, &self.registry);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand All @@ -1951,7 +1949,7 @@ impl ProxyConfiguration<Session> for Proxy {
//replacing with a connection to another application
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = session.back_token() {
session.close_backend(token, poll);
session.close_backend(token, &self.registry);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand Down Expand Up @@ -1993,7 +1991,7 @@ impl ProxyConfiguration<Session> for Proxy {
session.back_connected = BackendConnectionStatus::Connecting(Instant::now());
if let Some(back_token) = old_back_token {
session.set_back_token(back_token);
if let Err(e) = poll.registry().register(
if let Err(e) = self.registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
Expand All @@ -2007,7 +2005,7 @@ impl ProxyConfiguration<Session> for Proxy {
.map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::Replace)
} else {
if let Err(e) = poll.registry().register(
if let Err(e) = self.registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
Expand All @@ -2024,7 +2022,7 @@ impl ProxyConfiguration<Session> for Proxy {
}
}

fn notify(&mut self, event_loop: &mut Poll, message: ProxyRequest) -> ProxyResponse {
fn notify(&mut self, message: ProxyRequest) -> ProxyResponse {
//trace!("{} notified", message);
match message.order {
ProxyRequestData::AddCluster(cluster) => {
Expand Down Expand Up @@ -2160,9 +2158,10 @@ impl ProxyConfiguration<Session> for Proxy {
}
ProxyRequestData::SoftStop => {
info!("{} processing soft shutdown", message.id);
for (_, l) in self.listeners.iter_mut() {
let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, l) in listeners.iter_mut() {
l.listener.take().map(|mut sock| {
if let Err(e) = event_loop.registry().deregister(&mut sock) {
if let Err(e) = self.registry.deregister(&mut sock) {
error!("error deregistering listen socket({:?}): {:?}", sock, e);
}
});
Expand All @@ -2175,9 +2174,10 @@ impl ProxyConfiguration<Session> for Proxy {
}
ProxyRequestData::HardStop => {
info!("{} hard shutdown", message.id);
for (_, mut l) in self.listeners.drain() {
let mut listeners: HashMap<_, _> = self.listeners.drain().collect();
for (_, mut l) in listeners.drain() {
l.listener.take().map(|mut sock| {
if let Err(e) = event_loop.registry().deregister(&mut sock) {
if let Err(e) = self.registry.deregister(&mut sock) {
error!("error dereginstering listen socket({:?}): {:?}", sock, e);
}
});
Expand Down

0 comments on commit 538741c

Please sign in to comment.