Skip to content

Commit

Permalink
replace ProxySession::close() with close_inner(), remove close_session()
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent fc8daa4 commit 3151dea
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 312 deletions.
95 changes: 14 additions & 81 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use super::server::{
};
use super::socket::server_bind;
use super::{
AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, CloseResult, ClusterId,
AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, ClusterId,
ConnectionError, Protocol, ProxyConfiguration, ProxySession, Readiness, SessionMetrics,
SessionResult,
};
Expand Down Expand Up @@ -746,61 +746,6 @@ impl Session {
SessionResult::Continue
}

fn close_inner(&mut self) {
self.metrics.service_stop();
self.cancel_timeouts();
if let Err(e) = self.front_socket().shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!(
"error shutting down front socket({:?}): {:?}",
self.front_socket(),
e
);
}
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
if http.request != Some(RequestState::Initial) {
gauge_add!("http.active_requests", -1);

if let Some(b) = http.backend_data.as_mut() {
let mut backend = b.borrow_mut();
backend.active_requests = backend.active_requests.saturating_sub(1);
}
}
}

if let Some(State::WebSocket(_)) = self.protocol {
if let Some(b) = self.backend.as_mut() {
let mut backend = b.borrow_mut();
backend.active_requests = backend.active_requests.saturating_sub(1);
}
}

match self.protocol {
Some(State::Expect(_)) => gauge_add!("protocol.proxy.expect", -1),
Some(State::Http(_)) => gauge_add!("protocol.http", -1),
Some(State::WebSocket(_)) => gauge_add!("protocol.ws", -1),
None => {}
}

let fd = self.front_socket().as_raw_fd();
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

proxy
.sessions
.borrow_mut()
.slab
.try_remove(self.frontend_token.0);
}

//FIXME: check the token passed as argument
fn close_backend_inner(&mut self, _: Token) {
if let Some(token) = self.back_token() {
Expand Down Expand Up @@ -1190,7 +1135,7 @@ impl Session {
}

impl ProxySession for Session {
fn close(&mut self, registry: &Registry) -> CloseResult {
fn close(&mut self) {
self.metrics.service_stop();
self.cancel_timeouts();
if let Err(e) = self.front_socket().shutdown(Shutdown::Both) {
Expand All @@ -1203,22 +1148,8 @@ impl ProxySession for Session {
}
}

if let Err(e) = registry.deregister(self.front_socket_mut()) {
error!(
"3error deregistering front socket({:?}): {:?}",
self.front_socket(),
e
);
}

let mut result = CloseResult::default();

if let Some(tk) = self.back_token() {
result.tokens.push(tk)
}

//FIXME: should we really pass a token here?
self.close_backend(Token(0), registry);
self.close_backend_inner(Token(0));

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
Expand Down Expand Up @@ -1246,9 +1177,17 @@ impl ProxySession for Session {
None => {}
}

result.tokens.push(self.frontend_token);
let fd = self.front_socket().as_raw_fd();
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

result
proxy
.sessions
.borrow_mut()
.slab
.try_remove(self.frontend_token.0);
}

fn timeout(&mut self, token: Token) -> SessionResult {
Expand Down Expand Up @@ -1326,7 +1265,7 @@ impl ProxySession for Session {
let res = self.ready_inner();

if res == SessionResult::CloseSession {
let mut v = self.close_inner();
self.close();
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
Expand Down Expand Up @@ -1508,12 +1447,6 @@ impl Proxy {
l.answers.borrow_mut().remove_custom_answer(cluster_id);
}
}

pub fn close_session(&mut self, token: Token) {
self.sessions
.borrow_mut()
.close_session(SessionManager::to_session(token), &self.registry)
}
}

impl Listener {
Expand Down
86 changes: 10 additions & 76 deletions lib/src/https_openssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use crate::socket::server_bind;
use crate::timer::TimeoutContainer;
use crate::util::UnwrapLog;
use crate::{
AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, CloseResult, ClusterId,
AcceptError, Backend, BackendConnectAction, BackendConnectionStatus, ClusterId,
ConnectionError, Protocol, ProxyConfiguration, ProxySession, Readiness, SessionMetrics,
SessionResult,
};
Expand Down Expand Up @@ -879,58 +879,6 @@ impl Session {
SessionResult::Continue
}

fn close_inner(&mut self) {
//println!("TLS closing[{:?}] temp->front: {:?}, temp->back: {:?}", self.frontend_token, *self.temp.front_buf, *self.temp.back_buf);
self.http_mut().map(|http| http.close());
self.metrics.service_stop();
self.cancel_timeouts();
if let Some(front_socket) = self.front_socket_mut() {
if let Err(e) = front_socket.shutdown(Shutdown::Both) {
if e.kind() != ErrorKind::NotConnected {
error!("error closing front socket({:?}): {:?}", front_socket, e);
}
}
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
if http.request != Some(RequestState::Initial) {
gauge_add!("http.active_requests", -1);

if let Some(b) = http.backend_data.as_mut() {
let mut backend = b.borrow_mut();
backend.active_requests = backend.active_requests.saturating_sub(1);
}
}
}

if let Some(State::WebSocket(_)) = self.protocol {
if let Some(b) = self.backend.as_mut() {
let mut backend = b.borrow_mut();
backend.active_requests = backend.active_requests.saturating_sub(1);
}
}

match self.protocol {
Some(State::Expect(_, _)) => gauge_add!("protocol.proxy.expect", -1),
Some(State::Handshake(_)) => gauge_add!("protocol.tls.handshake", -1),
Some(State::Http(_)) => gauge_add!("protocol.https", -1),
Some(State::Http2(_)) => gauge_add!("protocol.http2s", -1),
Some(State::WebSocket(_)) => gauge_add!("protocol.wss", -1),
None => {}
}

if let Some(fd) = self.front_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}
}
}

fn close_backend_inner(&mut self, _: Token) {
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
Expand Down Expand Up @@ -1318,7 +1266,7 @@ impl Session {
}

impl ProxySession for Session {
fn close(&mut self, registry: &Registry) -> CloseResult {
fn close(&mut self) {
//println!("TLS closing[{:?}] temp->front: {:?}, temp->back: {:?}", self.frontend_token, *self.temp.front_buf, *self.temp.back_buf);
self.http_mut().map(|http| http.close());
self.metrics.service_stop();
Expand All @@ -1329,21 +1277,10 @@ impl ProxySession for Session {
error!("error closing front socket({:?}): {:?}", front_socket, e);
}
}
if let Err(e) = registry.deregister(front_socket) {
error!(
"error deregistering front socket({:?}): {:?}",
front_socket, e
);
}
}
let mut result = CloseResult::default();

if let Some(tk) = self.back_token() {
result.tokens.push(tk)
}

//FIXME: should we really pass a token here?
self.close_backend(Token(0), registry);
self.close_backend_inner(Token(0));

if let Some(State::Http(ref mut http)) = self.protocol {
//if the state was initial, the connection was already reset
Expand Down Expand Up @@ -1373,9 +1310,12 @@ impl ProxySession for Session {
None => {}
}

result.tokens.push(self.frontend_token);

result
if let Some(fd) = self.front_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}
}
}

fn timeout(&mut self, token: Token) -> SessionResult {
Expand Down Expand Up @@ -1463,7 +1403,7 @@ impl ProxySession for Session {
let res = self.ready_inner();

if res == SessionResult::CloseSession {
let mut v = self.close_inner();
self.close();
} else if let SessionResult::CloseBackend(_opt_back_token) = res {
//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
Expand Down Expand Up @@ -2157,12 +2097,6 @@ impl Proxy {
l.answers.borrow_mut().remove_custom_answer(cluster_id);
}
}

pub fn close_session(&mut self, token: Token) {
self.sessions
.borrow_mut()
.close_session(SessionManager::to_session(token), &self.registry)
}
}

impl ProxyConfiguration<Session> for Proxy {
Expand Down
6 changes: 0 additions & 6 deletions lib/src/https_rustls/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,6 @@ impl Proxy {
l.answers.borrow_mut().remove_custom_answer(cluster_id);
}
}

pub fn close_session(&mut self, token: Token) {
self.sessions
.borrow_mut()
.close_session(SessionManager::to_session(token), &self.registry)
}
}

impl ProxyConfiguration<Session> for Proxy {
Expand Down

0 comments on commit 3151dea

Please sign in to comment.