Skip to content

Commit

Permalink
refactor session management
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 0716c42 commit f5b8290
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 76 deletions.
6 changes: 4 additions & 2 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1651,8 +1651,10 @@ impl ProxyConfiguration<Session> for Proxy {
);
}

let s = Rc::new(RefCell::new(c));
entry.insert(s);
let session = Rc::new(RefCell::new(c));
entry.insert(session);

s.incr();
Ok((session_token, false))
} else {
Err(AcceptError::TooManySessions)
Expand Down
6 changes: 4 additions & 2 deletions lib/src/https_openssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1889,8 +1889,10 @@ impl ProxyConfiguration<Session> for Proxy {
Duration::seconds(listener.config.request_timeout as i64),
);

let s = Rc::new(RefCell::new(c));
entry.insert(s);
let session = Rc::new(RefCell::new(c));
entry.insert(session);

s.incr();
Ok((session_token, false))
} else {
error!("could not create ssl context");
Expand Down
6 changes: 4 additions & 2 deletions lib/src/https_rustls/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,10 @@ impl ProxyConfiguration<Session> for Proxy {
Duration::seconds(listener.config.request_timeout as i64),
);

let s = Rc::new(RefCell::new(c));
entry.insert(s);
let session = Rc::new(RefCell::new(c));
entry.insert(session);

s.incr();
Ok((session_token, false))
} else {
//FIXME
Expand Down
104 changes: 36 additions & 68 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,39 @@ impl SessionManager {
pub fn slab_capacity(&self) -> usize {
10 + 2 * self.max_connections
}

pub fn check_limits(&mut self) -> bool {
if self.nb_connections == self.max_connections {
error!("max number of session connection reached, flushing the accept queue");
gauge!("accept_queue.backpressure", 1);
self.can_accept = false;
return false;
}

if self.slab.len() >= self.slab_capacity() {
error!("not enough memory to accept another session, flushing the accept queue");
error!(
"nb_connections: {}, max_connections: {}",
self.nb_connections,
self.max_connections
);
gauge!("accept_queue.backpressure", 1);
self.can_accept = false;

return false;
}

true
}

pub fn incr(&mut self) {
self.nb_connections += 1;
assert!(
self.nb_connections
<= self.max_connections
);
gauge!("client.connections", self.nb_connections);
}
}

/// `Server` handles the event loop, the listeners, the sessions and
Expand Down Expand Up @@ -1425,37 +1458,14 @@ impl Server {
socket: TcpStream,
wait_time: Duration,
) -> bool {
if self.sessions.borrow().nb_connections == self.sessions.borrow().max_connections {
error!("max number of session connection reached, flushing the accept queue");
gauge!("accept_queue.backpressure", 1);
self.sessions.borrow_mut().can_accept = false;
return false;
}

if self.sessions.borrow().slab.len() >= self.sessions.borrow().slab_capacity() {
error!("not enough memory to accept another session, flushing the accept queue");
error!(
"nb_connections: {}, max_connections: {}",
self.sessions.borrow().nb_connections,
self.sessions.borrow().max_connections
);
gauge!("accept_queue.backpressure", 1);
self.sessions.borrow_mut().can_accept = false;

if !self.sessions.borrow_mut().check_limits() {
return false;
}

//FIXME: we must handle separately the session limit since the sessions slab also has entries for listeners and backends
let index = {
match self.tcp.create_session(socket, token, wait_time) {
Ok((session_token, should_connect)) => {
self.sessions.borrow_mut().nb_connections += 1;
assert!(
self.sessions.borrow().nb_connections
<= self.sessions.borrow().max_connections
);
gauge!("client.connections", self.sessions.borrow().nb_connections);

if should_connect {
session_token.0
} else {
Expand Down Expand Up @@ -1489,33 +1499,12 @@ impl Server {
socket: TcpStream,
wait_time: Duration,
) -> bool {
if self.sessions.borrow().nb_connections == self.sessions.borrow().max_connections {
error!("max number of session connection reached, flushing the accept queue");
gauge!("accept_queue.backpressure", 1);
self.sessions.borrow_mut().can_accept = false;
return false;
}

//FIXME: we must handle separately the session limit since the sessions slab also has entries for listeners and backends
if self.sessions.borrow().slab.len() >= self.sessions.borrow().slab_capacity() {
error!("not enough memory to accept another session, flushing the accept queue");
error!(
"nb_connections: {}, max_connections: {}",
self.sessions.borrow().nb_connections,
self.sessions.borrow().max_connections
);
gauge!("accept_queue.backpressure", 1);
self.sessions.borrow_mut().can_accept = false;
if !self.sessions.borrow_mut().check_limits() {
return false;
}

match self.http.create_session(socket, token, wait_time) {
Ok(_) => {
self.sessions.borrow_mut().nb_connections += 1;
assert!(
self.sessions.borrow().nb_connections <= self.sessions.borrow().max_connections
);
gauge!("client.connections", self.sessions.borrow().nb_connections);
true
}
Err(AcceptError::IoError) => {
Expand All @@ -1541,33 +1530,12 @@ impl Server {
socket: TcpStream,
wait_time: Duration,
) -> bool {
if self.sessions.borrow().nb_connections == self.sessions.borrow().max_connections {
error!("max number of session connection reached, flushing the accept queue");
gauge!("accept_queue.backpressure", 1);
self.sessions.borrow_mut().can_accept = false;
return false;
}

//FIXME: we must handle separately the session limit since the sessions slab also has entries for listeners and backends
if self.sessions.borrow().slab.len() >= self.sessions.borrow().slab_capacity() {
error!("not enough memory to accept another session, flushing the accept queue");
error!(
"nb_connections: {}, max_connections: {}",
self.sessions.borrow().nb_connections,
self.sessions.borrow().max_connections
);
gauge!("accept_queue.backpressure", 1);
self.sessions.borrow_mut().can_accept = false;
if !self.sessions.borrow_mut().check_limits() {
return false;
}

match self.https.create_session(socket, token, wait_time) {
Ok(_) => {
self.sessions.borrow_mut().nb_connections += 1;
assert!(
self.sessions.borrow().nb_connections <= self.sessions.borrow().max_connections
);
gauge!("client.connections", self.sessions.borrow().nb_connections);
true
}
Err(AcceptError::IoError) => {
Expand Down
6 changes: 4 additions & 2 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1357,8 +1357,10 @@ impl ProxyConfiguration<Session> for Proxy {
let should_connect_backend =
proxy_protocol != Some(ProxyProtocolConfig::ExpectHeader);

let s = Rc::new(RefCell::new(c));
entry.insert(s);
let session = Rc::new(RefCell::new(c));
entry.insert(session);

s.incr();
Ok((session_token, should_connect_backend))
} else {
error!("could not get buffers from pool");
Expand Down

0 comments on commit f5b8290

Please sign in to comment.