Skip to content

Commit

Permalink
remove ProxySession::close_backend()
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 3151dea commit 6c2da60
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 167 deletions.
49 changes: 6 additions & 43 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ impl Session {
}

//FIXME: check the token passed as argument
fn close_backend_inner(&mut self, _: Token) {
fn close_backend(&mut self, _: Token) {
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
Expand Down Expand Up @@ -1034,7 +1034,7 @@ impl Session {
if has_backend && self.check_backend_connection() {
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = self.back_token() {
self.close_backend_inner(token);
self.close_backend(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand All @@ -1045,7 +1045,7 @@ impl Session {
//replacing with a connection to another cluster
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = self.back_token() {
self.close_backend_inner(token);
self.close_backend(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand Down Expand Up @@ -1149,7 +1149,7 @@ impl ProxySession for Session {
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
Expand Down Expand Up @@ -1203,43 +1203,6 @@ impl ProxySession for Session {
}
}

//FIXME: check the token passed as argument
fn close_backend(&mut self, _: Token, registry: &Registry) {
self.remove_backend();

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error shutting down back socket({:?}): {:?}", sock, e);
}
}
if let Err(e) = registry.deregister(sock) {
error!("error shutting down back socket({:?}): {:?}", sock, e);
}
}
}

if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}

self.set_back_connected(BackendConnectionStatus::NotConnected);

self.http_mut().map(|h| {
h.clear_back_token();
h.remove_backend();
});
}

fn protocol(&self) -> Protocol {
Protocol::HTTP
}
Expand Down Expand Up @@ -1268,7 +1231,7 @@ impl ProxySession for Session {
self.close();
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));
} else if res == SessionResult::ConnectBackend {
let res = self.connect_to_backend(session);
info!("HTTP::READY): connect_to_backend returned {:?}\n\n", res);
Expand All @@ -1283,7 +1246,7 @@ impl ProxySession for Session {
return SessionResult::Continue;
} else if let SessionResult::ReconnectBackend(_, opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

let res = self.connect_to_backend(session);
info!("TCP::READY): (re)connect_to_backend returned {:?}\n\n", res);
Expand Down
49 changes: 6 additions & 43 deletions lib/src/https_openssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ impl Session {
SessionResult::Continue
}

fn close_backend_inner(&mut self, _: Token) {
fn close_backend(&mut self, _: Token) {
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
Expand Down Expand Up @@ -1165,7 +1165,7 @@ impl Session {
if has_backend && self.check_backend_connection() {
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = self.back_token() {
self.close_backend_inner(token);
self.close_backend(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand All @@ -1176,7 +1176,7 @@ impl Session {
//replacing with a connection to another application
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = self.back_token() {
self.close_backend_inner(token);
self.close_backend(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand Down Expand Up @@ -1280,7 +1280,7 @@ impl ProxySession for Session {
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
Expand Down Expand Up @@ -1341,43 +1341,6 @@ impl ProxySession for Session {
}
}

//FIXME: check the token passed as argument
fn close_backend(&mut self, _: Token, registry: &Registry) {
self.remove_backend();

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error closing back socket({:?}): {:?}", sock, e);
}
}
if let Err(e) = registry.deregister(sock) {
error!("error deregistering back socket({:?}): {:?}", sock, e);
}
}
}

if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}

self.set_back_connected(BackendConnectionStatus::NotConnected);

self.http_mut().map(|h| {
h.clear_back_token();
h.remove_backend();
});
}

fn protocol(&self) -> Protocol {
Protocol::HTTPS
}
Expand Down Expand Up @@ -1406,7 +1369,7 @@ impl ProxySession for Session {
self.close();
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));
} else if res == SessionResult::ConnectBackend {
let res = self.connect_to_backend(session);
info!(
Expand All @@ -1424,7 +1387,7 @@ impl ProxySession for Session {
return SessionResult::Continue;
} else if let SessionResult::ReconnectBackend(_, opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

let res = self.connect_to_backend(session);
info!("TCP::READY): (re)connect_to_backend returned {:?}\n\n", res);
Expand Down
48 changes: 6 additions & 42 deletions lib/src/https_rustls/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ impl Session {
SessionResult::Continue
}

fn close_backend_inner(&mut self, _: Token) {
fn close_backend(&mut self, _: Token) {
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
Expand Down Expand Up @@ -1092,7 +1092,7 @@ impl Session {
if has_backend && self.check_backend_connection() {
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = self.back_token() {
self.close_backend_inner(token);
self.close_backend(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand All @@ -1102,7 +1102,7 @@ impl Session {

if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = self.back_token() {
self.close_backend_inner(token);
self.close_backend(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
Expand Down Expand Up @@ -1202,7 +1202,7 @@ impl ProxySession for Session {
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
Expand Down Expand Up @@ -1251,42 +1251,6 @@ impl ProxySession for Session {
}
}

fn close_backend(&mut self, _: Token, registry: &Registry) {
self.remove_backend();

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error shutting down backend socket: {:?}", e);
}
}

if let Err(e) = registry.deregister(sock) {
error!("error deregistering backend socket: {:?}", e);
}
}
}

if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}

self.set_back_connected(BackendConnectionStatus::NotConnected);
self.http_mut().map(|h| {
h.clear_back_token();
h.remove_backend();
});
}

fn protocol(&self) -> Protocol {
Protocol::HTTPS
}
Expand Down Expand Up @@ -1315,7 +1279,7 @@ impl ProxySession for Session {
self.close();
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));
} else if res == SessionResult::ConnectBackend {
let res = self.connect_to_backend(session);
info!(
Expand All @@ -1333,7 +1297,7 @@ impl ProxySession for Session {
return SessionResult::Continue;
} else if let SessionResult::ReconnectBackend(_, opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

let res = self.connect_to_backend(session);
info!("TCP::READY): (re)connect_to_backend returned {:?}\n\n", res);
Expand Down
2 changes: 0 additions & 2 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ pub trait ProxySession {
fn process_events(&mut self, token: Token, events: Ready);
/// closes a session
fn close(&mut self);
/// closes the backend socket of a session
fn close_backend(&mut self, token: Token, registry: &Registry);
/// if a timeout associated with the session triggers, the event loop will
/// call this method with the timeout's token
fn timeout(&mut self, t: Token) -> SessionResult;
Expand Down
2 changes: 0 additions & 2 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1783,8 +1783,6 @@ impl ProxySession for ListenSession {

fn close(&mut self) {}

fn close_backend(&mut self, _token: Token, _registry: &Registry) {}

fn timeout(&mut self, _token: Token) -> SessionResult {
error!(
"called ProxySession::timeout(token={:?}, time) on ListenSession {{ protocol: {:?} }}",
Expand Down
39 changes: 4 additions & 35 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ impl Session {
SessionResult::Continue
}

fn close_backend_inner(&mut self, _: Token) {
fn close_backend(&mut self, _: Token) {
if let (Some(token), Some(fd)) = (
self.back_token(),
self.back_socket_mut().map(|s| s.as_raw_fd()),
Expand Down Expand Up @@ -902,7 +902,7 @@ impl ProxySession for Session {
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

match self.protocol {
Some(State::Pipe(_)) => gauge_add!("protocol.tcp", -1),
Expand Down Expand Up @@ -937,37 +937,6 @@ impl ProxySession for Session {
}
}

fn close_backend(&mut self, _: Token, registry: &Registry) {
self.remove_backend();

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error closing back socket({:?}): {:?}", sock, e);
}
}
if let Err(e) = registry.deregister(sock) {
error!("error deregistering back socket({:?}): {:?}", sock, e);
}
}
}

if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}

self.set_back_connected(BackendConnectionStatus::NotConnected);
}

fn protocol(&self) -> Protocol {
Protocol::TCP
}
Expand Down Expand Up @@ -996,7 +965,7 @@ impl ProxySession for Session {
self.close();
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));
} else if res == SessionResult::ConnectBackend {
let res = self.connect_to_backend(session);
info!("TCP::READY): connect_to_backend returned {:?}\n\n", res);
Expand All @@ -1011,7 +980,7 @@ impl ProxySession for Session {
return SessionResult::Continue;
} else if let SessionResult::ReconnectBackend(_, opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
self.close_backend(Token(0));

let res = self.connect_to_backend(session);
info!("TCP::READY): (re)connect_to_backend returned {:?}\n\n", res);
Expand Down

0 comments on commit 6c2da60

Please sign in to comment.