Skip to content

Commit

Permalink
deregister the back socket inside close_backend_inner()
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 59c0548 commit 8a801cf
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 254 deletions.
128 changes: 45 additions & 83 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,13 +761,6 @@ impl Session {

let mut result = Vec::new();

if let (Some(tk), Some(fd)) = (
self.back_token(),
self.back_socket_mut().map(|s| s.as_raw_fd()),
) {
result.push((tk, fd));
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));

Expand Down Expand Up @@ -797,43 +790,64 @@ impl Session {
None => {}
}

result.push((self.frontend_token, self.front_socket().as_raw_fd()));
let fd = self.front_socket().as_raw_fd();
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

proxy
.sessions
.borrow_mut()
.slab
.try_remove(self.frontend_token.0);

result
}

//FIXME: check the token passed as argument
fn close_backend_inner(&mut self, _: Token) {
self.remove_backend();
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error shutting down back socket({:?}): {:?}", sock, e);
proxy.sessions.borrow_mut().slab.try_remove(token.0);
}

self.remove_backend();

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error shutting down back socket({:?}): {:?}", sock, e);
}
}
}
}
}

if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}
if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}

self.set_back_connected(BackendConnectionStatus::NotConnected);
self.set_back_connected(BackendConnectionStatus::NotConnected);

self.http_mut().map(|h| {
h.clear_back_token();
h.remove_backend();
});
self.http_mut().map(|h| {
h.clear_back_token();
h.remove_backend();
});
}
}

fn check_circuit_breaker(&mut self) -> Result<(), ConnectionError> {
Expand Down Expand Up @@ -1079,14 +1093,6 @@ impl Session {
if has_backend && self.check_backend_connection() {
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
self.close_backend_inner(token);

//reset the back token here so we can remove it
Expand All @@ -1098,14 +1104,6 @@ impl Session {
//replacing with a connection to another cluster
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("2error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
self.close_backend_inner(token);

//reset the back token here so we can remove it
Expand Down Expand Up @@ -1333,31 +1331,7 @@ impl ProxySession for Session {

if res == SessionResult::CloseSession {
let mut v = self.close_inner();

let proxy = self.proxy.borrow_mut();
for (token, fd) in v.drain(..) {
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!(
"4error deregistering socket({:?} token = {:?}): {:?}",
fd, token, e
);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
if let (Some(token), Some(fd)) = (
self.back_token(),
self.back_socket_mut().map(|s| s.as_raw_fd()),
) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("5error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
} else if res == SessionResult::ConnectBackend {
Expand All @@ -1373,18 +1347,6 @@ impl ProxySession for Session {
self.metrics().service_stop();
return SessionResult::Continue;
} else if let SessionResult::ReconnectBackend(_, opt_back_token) = res {
if let (Some(token), Some(fd)) = (
opt_back_token,
self.back_socket_mut().map(|s| s.as_raw_fd()),
) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));

Expand Down
104 changes: 34 additions & 70 deletions lib/src/https_openssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -893,13 +893,6 @@ impl Session {
}
let mut result = Vec::new();

if let (Some(tk), Some(fd)) = (
self.back_token(),
self.back_socket_mut().map(|s| s.as_raw_fd()),
) {
result.push((tk, fd));
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));

Expand Down Expand Up @@ -940,36 +933,47 @@ impl Session {
}

fn close_backend_inner(&mut self, _: Token) {
self.remove_backend();
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error closing back socket({:?}): {:?}", sock, e);
proxy.sessions.borrow_mut().slab.try_remove(token.0);
}

self.remove_backend();

let back_connected = self.back_connected();
if back_connected != BackendConnectionStatus::NotConnected {
self.back_readiness().map(|r| r.event = Ready::empty());
if let Some(sock) = self.back_socket_mut() {
if let Err(e) = sock.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error closing back socket({:?}): {:?}", sock, e);
}
}
}
}
}

if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}
if back_connected == BackendConnectionStatus::Connected {
gauge_add!("backend.connections", -1);
gauge_add!(
"connections_per_backend",
-1,
self.cluster_id.as_ref().map(|s| s.as_str()),
self.metrics.backend_id.as_ref().map(|s| s.as_str())
);
}

self.set_back_connected(BackendConnectionStatus::NotConnected);
self.set_back_connected(BackendConnectionStatus::NotConnected);

self.http_mut().map(|h| {
h.clear_back_token();
h.remove_backend();
});
self.http_mut().map(|h| {
h.clear_back_token();
h.remove_backend();
});
}
}

fn check_circuit_breaker(&mut self) -> Result<(), ConnectionError> {
Expand Down Expand Up @@ -1214,14 +1218,6 @@ impl Session {
if has_backend && self.check_backend_connection() {
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
self.close_backend_inner(token);

//reset the back token here so we can remove it
Expand All @@ -1233,14 +1229,6 @@ impl Session {
//replacing with a connection to another application
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
self.close_backend_inner(token);

//reset the back token here so we can remove it
Expand Down Expand Up @@ -1487,18 +1475,6 @@ impl ProxySession for Session {
proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
if let (Some(token), Some(fd)) = (
self.back_token(),
self.back_socket_mut().map(|s| s.as_raw_fd()),
) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
} else if res == SessionResult::ConnectBackend {
Expand All @@ -1517,18 +1493,6 @@ impl ProxySession for Session {
self.metrics().service_stop();
return SessionResult::Continue;
} else if let SessionResult::ReconnectBackend(_, opt_back_token) = res {
if let (Some(token), Some(fd)) = (
opt_back_token,
self.back_socket_mut().map(|s| s.as_raw_fd()),
) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));

Expand Down

0 comments on commit 8a801cf

Please sign in to comment.