Skip to content

Commit

Permalink
move slab capacity check in connect_to_backend
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 7eef4f0 commit ad26bb8
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 23 deletions.
20 changes: 19 additions & 1 deletion lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -945,12 +945,14 @@ pub struct Proxy {
pool: Rc<RefCell<Pool>>,
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
}

impl Proxy {
pub fn new(
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> Proxy {
Expand All @@ -961,6 +963,7 @@ impl Proxy {
pool,
registry,
sessions,
max_connections,
}
}

Expand Down Expand Up @@ -1181,6 +1184,10 @@ impl Proxy {
Ok(())
}
}

fn slab_capacity(&self) -> usize {
10 + 2 * self.max_connections
}
}

impl Listener {
Expand Down Expand Up @@ -1411,6 +1418,11 @@ impl ProxyConfiguration<Session> for Proxy {
.map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::Replace)
} else {
if self.sessions.borrow().len() >= self.slab_capacity() {
error!("not enough memory, cannot connect to backend");
return Err(ConnectionError::TooManyConnections);
}

let back_token = {
let mut s = self.sessions.borrow_mut();
let entry = s.vacant_entry();
Expand Down Expand Up @@ -1703,7 +1715,13 @@ pub fn start(config: HttpListener, channel: ProxyChannel, max_buffers: usize, bu
let address = config.address;
let sessions = Rc::new(RefCell::new(sessions));
let registry = event_loop.registry().try_clone().unwrap();
let mut proxy = Proxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
let mut proxy = Proxy::new(
registry,
sessions.clone(),
max_buffers,
pool.clone(),
backends.clone(),
);
let _ = proxy.add_listener(config, token);
let _ = proxy.activate_listener(&address, None);
let (scm_server, scm_client) = UnixStream::pair().unwrap();
Expand Down
20 changes: 19 additions & 1 deletion lib/src/https_openssl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1597,12 +1597,14 @@ pub struct Proxy {
pool: Rc<RefCell<Pool>>,
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
}

impl Proxy {
pub fn new(
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> Proxy {
Expand All @@ -1613,6 +1615,7 @@ impl Proxy {
pool,
registry,
sessions,
max_connections,
}
}

Expand Down Expand Up @@ -1834,6 +1837,10 @@ impl Proxy {
Ok(())
}
}

fn slab_capacity(&self) -> usize {
10 + 2 * self.max_connections
}
}

impl ProxyConfiguration<Session> for Proxy {
Expand Down Expand Up @@ -2016,6 +2023,11 @@ impl ProxyConfiguration<Session> for Proxy {
.map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::Replace)
} else {
if self.sessions.borrow().len() >= self.slab_capacity() {
error!("not enough memory, cannot connect to backend");
return Err(ConnectionError::TooManyConnections);
}

let back_token = {
let mut s = self.sessions.borrow_mut();
let entry = s.vacant_entry();
Expand Down Expand Up @@ -2393,7 +2405,13 @@ pub fn start(config: HttpsListener, channel: ProxyChannel, max_buffers: usize, b

let sessions = Rc::new(RefCell::new(sessions));
let registry = event_loop.registry().try_clone().unwrap();
let mut configuration = Proxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
let mut configuration = Proxy::new(
registry,
sessions.clone(),
max_buffers,
pool.clone(),
backends.clone(),
);
let address = config.address.clone();
if configuration.add_listener(config, token).is_some() {
if configuration.activate_listener(&address, None).is_some() {
Expand Down
21 changes: 19 additions & 2 deletions lib/src/https_rustls/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,14 @@ pub struct Proxy {
pool: Rc<RefCell<Pool>>,
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
}

impl Proxy {
pub fn new(
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> Proxy {
Expand All @@ -264,6 +266,7 @@ impl Proxy {
pool,
registry,
sessions,
max_connections,
}
}

Expand Down Expand Up @@ -484,6 +487,10 @@ impl Proxy {
Ok(())
}
}

fn slab_capacity(&self) -> usize {
10 + 2 * self.max_connections
}
}

impl ProxyConfiguration<Session> for Proxy {
Expand Down Expand Up @@ -551,7 +558,6 @@ impl ProxyConfiguration<Session> for Proxy {

fn connect_to_backend(
&mut self,

session_rc: Rc<RefCell<dyn ProxySessionCast>>,
) -> Result<BackendConnectAction, ConnectionError> {
let mut b = session_rc.borrow_mut();
Expand Down Expand Up @@ -660,6 +666,11 @@ impl ProxyConfiguration<Session> for Proxy {
.map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::Replace)
} else {
if self.sessions.borrow().len() >= self.slab_capacity() {
error!("not enough memory, cannot connect to backend");
return Err(ConnectionError::TooManyConnections);
}

let back_token = {
let mut s = self.sessions.borrow_mut();
let entry = s.vacant_entry();
Expand Down Expand Up @@ -974,7 +985,13 @@ pub fn start(config: HttpsListener, channel: ProxyChannel, max_buffers: usize, b
let address = config.address.clone();
let sessions = Rc::new(RefCell::new(sessions));
let registry = event_loop.registry().try_clone().unwrap();
let mut configuration = Proxy::new(registry, sessions.clone(), pool.clone(), backends.clone());
let mut configuration = Proxy::new(
registry,
sessions.clone(),
max_buffers,
pool.clone(),
backends.clone(),
);
if configuration.add_listener(config, token).is_some()
&& configuration.activate_listener(&address, None).is_some()
{
Expand Down
1 change: 1 addition & 0 deletions lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ pub enum ConnectionError {
ToBeDefined,
HttpsRedirect,
Unauthorized,
TooManyConnections,
}

#[derive(Debug, PartialEq, Eq)]
Expand Down
56 changes: 39 additions & 17 deletions lib/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl Server {
use_openssl,
registry,
sessions.clone(),
server_config.max_connections,
pool.clone(),
backends.clone(),
);
Expand Down Expand Up @@ -285,7 +286,13 @@ impl Server {
.registry()
.try_clone()
.expect("could not clone the mio Registry");
http::Proxy::new(registry, sessions.clone(), pool.clone(), backends.clone())
http::Proxy::new(
registry,
sessions.clone(),
server_config.max_connections,
pool.clone(),
backends.clone(),
)
});
let https = https.unwrap_or_else(|| {
let registry = poll
Expand All @@ -296,6 +303,7 @@ impl Server {
false,
registry,
sessions.clone(),
server_config.max_connections,
pool.clone(),
backends.clone(),
)
Expand All @@ -305,7 +313,12 @@ impl Server {
.registry()
.try_clone()
.expect("could not clone the mio Registry");
tcp::Proxy::new(registry, sessions.clone(), backends.clone())
tcp::Proxy::new(
registry,
sessions.clone(),
server_config.max_connections,
backends.clone(),
)
});

let mut server = Server {
Expand Down Expand Up @@ -1668,23 +1681,17 @@ impl Server {
}

let (protocol, res) = {
let cl = self.sessions.borrow()[token.0].clone();
let cl2: Rc<RefCell<dyn ProxySessionCast>> = cl.clone();
let protocol = { cl.borrow().protocol() };

if self.sessions.borrow().len() >= self.slab_capacity() {
error!("not enough memory, cannot connect to backend");
return;
}
let session = self.sessions.borrow()[token.0].clone();
let protocol = { session.borrow().protocol() };

let (protocol, res) = match protocol {
Protocol::TCP => {
let r = self.tcp.connect_to_backend(cl2);
let r = self.tcp.connect_to_backend(session);

(Protocol::TCP, r)
}
Protocol::HTTP => (Protocol::HTTP, { self.http.connect_to_backend(cl2) }),
Protocol::HTTPS => (Protocol::HTTPS, { self.https.connect_to_backend(cl2) }),
Protocol::HTTP => (Protocol::HTTP, { self.http.connect_to_backend(session) }),
Protocol::HTTPS => (Protocol::HTTPS, { self.https.connect_to_backend(session) }),
_ => {
panic!("should not call connect_to_backend on listeners");
}
Expand Down Expand Up @@ -1935,16 +1942,25 @@ impl HttpsProvider {
use_openssl: bool,
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> HttpsProvider {
if use_openssl {
HttpsProvider::Openssl(https_openssl::Proxy::new(
registry, sessions, pool, backends,
registry,
sessions,
max_connections,
pool,
backends,
))
} else {
HttpsProvider::Rustls(https_rustls::configuration::Proxy::new(
registry, sessions, pool, backends,
registry,
sessions,
max_connections,
pool,
backends,
))
}
}
Expand Down Expand Up @@ -2035,15 +2051,21 @@ impl HttpsProvider {
use_openssl: bool,
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
pool: Rc<RefCell<Pool>>,
backends: Rc<RefCell<BackendMap>>,
) -> HttpsProvider {
if use_openssl {
error!("the openssl provider is not compiled, continuing with the rustls provider");
}

let configuration =
https_rustls::configuration::Proxy::new(registry, sessions, pool, backends);
let configuration = https_rustls::configuration::Proxy::new(
registry,
sessions,
max_connections,
pool,
backends,
);
HttpsProvider::Rustls(configuration)
}

Expand Down
17 changes: 15 additions & 2 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,12 +979,14 @@ pub struct Proxy {
configs: HashMap<ClusterId, ClusterConfiguration>,
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
}

impl Proxy {
pub fn new(
registry: Registry,
sessions: Rc<RefCell<Slab<Rc<RefCell<dyn ProxySessionCast>>>>>,
max_connections: usize,
backends: Rc<RefCell<BackendMap>>,
) -> Proxy {
Proxy {
Expand All @@ -994,6 +996,7 @@ impl Proxy {
fronts: HashMap::new(),
registry,
sessions,
max_connections,
}
}

Expand Down Expand Up @@ -1068,6 +1071,10 @@ impl Proxy {
false
}
}

fn slab_capacity(&self) -> usize {
10 + 2 * self.max_connections
}
}

impl ProxyConfiguration<Session> for Proxy {
Expand All @@ -1092,6 +1099,11 @@ impl ProxyConfiguration<Session> for Proxy {
return Err(ConnectionError::NoBackendAvailable);
}

if self.sessions.borrow().len() >= self.slab_capacity() {
error!("not enough memory, cannot connect to backend");
return Err(ConnectionError::TooManyConnections);
}

let conn = self
.backends
.borrow_mut()
Expand Down Expand Up @@ -1416,7 +1428,7 @@ pub fn start(
let sessions = Rc::new(RefCell::new(sessions));
let address = config.address;
let registry = poll.registry().try_clone().unwrap();
let mut configuration = Proxy::new(registry, sessions.clone(), backends.clone());
let mut configuration = Proxy::new(registry, sessions.clone(), max_buffers, backends.clone());
let _ = configuration.add_listener(config, pool.clone(), token);
let _ = configuration.activate_listener(&address, None);
let (scm_server, _scm_client) = UnixStream::pair().unwrap();
Expand Down Expand Up @@ -1615,7 +1627,8 @@ mod tests {

let sessions = Rc::new(RefCell::new(sessions));
let registry = poll.registry().try_clone().unwrap();
let mut configuration = Proxy::new(registry, sessions.clone(), backends.clone());
let mut configuration =
Proxy::new(registry, sessions.clone(), max_buffers, backends.clone());
let listener_config = TcpListenerConfig {
address: "127.0.0.1:1234".parse().unwrap(),
public_address: None,
Expand Down

0 comments on commit ad26bb8

Please sign in to comment.