Skip to content

Commit

Permalink
move close_session() to the session manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 56e6579 commit a152063
Showing 1 changed file with 49 additions and 34 deletions.
83 changes: 49 additions & 34 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,43 @@ impl SessionManager {
true
}

pub fn to_session(&self, token: Token) -> SessionToken {
SessionToken(token.0)
}

pub fn incr(&mut self) {
self.nb_connections += 1;
assert!(self.nb_connections <= self.max_connections);
gauge!("client.connections", self.nb_connections);
}

pub fn close_session(&mut self, token: SessionToken, registry: &Registry) {
if self.slab.contains(token.0) {
let session = self.slab.remove(token.0);
let CloseResult { tokens } = session.borrow_mut().close(registry);

for tk in tokens.into_iter() {
let cl = self.to_session(tk);
if self.slab.contains(cl.0) {
self.slab.remove(cl.0);
}
}

assert!(self.nb_connections != 0);
self.nb_connections -= 1;
gauge!("client.connections", self.nb_connections);
}

// do not be ready to accept right away, wait until we get back to 10% capacity
if !self.can_accept && self.nb_connections < self.max_connections * 90 / 100 {
debug!(
"nb_connections = {}, max_connections = {}, starting to accept again",
self.nb_connections, self.max_connections
);
gauge!("accept_queue.backpressure", 0);
self.can_accept = true;
}
}
}

/// `Server` handles the event loop, the listeners, the sessions and
Expand Down Expand Up @@ -633,7 +665,9 @@ impl Server {

for tk in frontend_tokens.iter() {
let cl = self.to_session(*tk);
self.close_session(cl);
self.sessions
.borrow_mut()
.close_session(cl, self.poll.registry());
}

if count > 0 {
Expand Down Expand Up @@ -682,7 +716,9 @@ impl Server {

for tk in closing_tokens.iter() {
let cl = self.to_session(*tk);
self.close_session(cl);
self.sessions
.borrow_mut()
.close_session(cl, self.poll.registry());
}

if !closing_tokens.is_empty() {
Expand Down Expand Up @@ -1419,35 +1455,6 @@ impl Server {
Token(token.0)
}

pub fn close_session(&mut self, token: SessionToken) {
let mut sessions = self.sessions.borrow_mut();
if sessions.slab.contains(token.0) {
let session = sessions.slab.remove(token.0);
let CloseResult { tokens } = session.borrow_mut().close(self.poll.registry());

for tk in tokens.into_iter() {
let cl = self.to_session(tk);
if sessions.slab.contains(cl.0) {
sessions.slab.remove(cl.0);
}
}

assert!(sessions.nb_connections != 0);
sessions.nb_connections -= 1;
gauge!("client.connections", sessions.nb_connections);
}

// do not be ready to accept right away, wait until we get back to 10% capacity
if !sessions.can_accept && sessions.nb_connections < sessions.max_connections * 90 / 100 {
debug!(
"nb_connections = {}, max_connections = {}, starting to accept again",
sessions.nb_connections, sessions.max_connections
);
gauge!("accept_queue.backpressure", 0);
sessions.can_accept = true;
}
}

pub fn create_session_tcp(
&mut self,
token: ListenToken,
Expand Down Expand Up @@ -1676,17 +1683,25 @@ impl Server {
| Err(ConnectionError::InvalidHost)
| Err(ConnectionError::Unauthorized) => {
if protocol == Protocol::TCP {
self.close_session(token);
self.sessions
.borrow_mut()
.close_session(token, self.poll.registry());
}
}
_ => self.close_session(token),
_ => self
.sessions
.borrow_mut()
.close_session(token, self.poll.registry()),
}
}

pub fn interpret_session_order(&mut self, token: SessionToken, order: SessionResult) {
//trace!("INTERPRET ORDER: {:?}", order);
match order {
SessionResult::CloseSession => self.close_session(token),
SessionResult::CloseSession => self
.sessions
.borrow_mut()
.close_session(token, self.poll.registry()),
SessionResult::CloseBackend(opt) => {
if let Some(token) = opt {
let cl = self.to_session(token);
Expand Down

0 comments on commit a152063

Please sign in to comment.