Skip to content

Commit

Permalink
move connect_to_backend to the session, call it from ready()
Browse files Browse the repository at this point in the history
with this step we can now also move backend reconnection in ready(), and
remove all the backend management from the event loop
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 1685d8f commit 6b7681b
Show file tree
Hide file tree
Showing 5 changed files with 915 additions and 15 deletions.
290 changes: 284 additions & 6 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,266 @@ impl Session {
});
self.backend = Some(backend);
}

fn cluster_id_from_request(&mut self) -> Result<String, ConnectionError> {
let (host, uri, method) = match self.extract_route() {
Ok(t) => t,
Err(e) => {
self.set_answer(DefaultAnswerStatus::Answer400, None);
return Err(e);
}
};

let cluster_id_res = self
.proxy
.borrow()
.listeners
.get(&self.listen_token)
.as_ref()
.and_then(|l| l.frontend_from_request(&host, &uri, &method));
let cluster_id = match cluster_id_res {
Some(Route::ClusterId(cluster_id)) => cluster_id,
Some(Route::Deny) => {
self.set_answer(DefaultAnswerStatus::Answer401, None);
return Err(ConnectionError::Unauthorized);
}
None => {
self.set_answer(DefaultAnswerStatus::Answer404, None);
return Err(ConnectionError::HostNotFound);
}
};

let front_should_redirect_https = self
.proxy
.borrow()
.clusters
.get(&cluster_id)
.map(|ref app| app.https_redirect)
.unwrap_or(false);
if front_should_redirect_https {
let answer = format!("HTTP/1.1 301 Moved Permanently\r\nContent-Length: 0\r\nLocation: https://{}{}\r\n\r\n", host, uri);
self.set_answer(
DefaultAnswerStatus::Answer301,
Some(Rc::new(answer.into_bytes())),
);
return Err(ConnectionError::HttpsRedirect);
}

Ok(cluster_id)
}

pub fn backend_from_request(
&mut self,
cluster_id: &str,
front_should_stick: bool,
) -> Result<TcpStream, ConnectionError> {
let sticky_session = self
.http()
.and_then(|http| http.request.as_ref())
.and_then(|r| r.get_sticky_session());

let res = match (front_should_stick, sticky_session) {
(true, Some(sticky_session)) => self
.proxy
.borrow_mut()
.backends
.borrow_mut()
.backend_from_sticky_session(cluster_id, &sticky_session)
.map_err(|e| {
debug!(
"Couldn't find a backend corresponding to sticky_session {} for app {}",
sticky_session, cluster_id
);
e
}),
_ => self
.proxy
.borrow_mut()
.backends
.borrow_mut()
.backend_from_cluster_id(cluster_id),
};

match res {
Err(e) => {
self.set_answer(DefaultAnswerStatus::Answer503, None);
Err(e)
}
Ok((backend, conn)) => {
let listen_token = self.listen_token;
if front_should_stick {
let sticky_name = &self.proxy.borrow().listeners[&listen_token]
.config
.sticky_name
.to_string();
self.http_mut().map(|http| {
http.sticky_session = Some(StickySession::new(
backend
.borrow()
.sticky_id
.clone()
.unwrap_or_else(|| backend.borrow().backend_id.clone()),
));
http.sticky_name = sticky_name.to_string();
});
}
self.metrics.backend_id = Some(backend.borrow().backend_id.clone());
self.metrics.backend_start();
self.http_mut().map(|http| {
http.set_backend_id(backend.borrow().backend_id.clone());
});
self.backend = Some(backend);

Ok(conn)
}
}
}

fn connect_to_backend(
&mut self,
session_rc: Rc<RefCell<dyn ProxySessionCast>>,
) -> Result<BackendConnectAction, ConnectionError> {
let old_cluster_id = self.http().and_then(|ref http| http.cluster_id.clone());
let old_back_token = self.back_token();

self.check_circuit_breaker()?;

let cluster_id = self.cluster_id_from_request()?;

// check if we can reuse the backend connection
if (self.http().and_then(|h| h.cluster_id.as_ref()) == Some(&cluster_id))
&& self.back_connected == BackendConnectionStatus::Connected
{
let has_backend = self
.backend
.as_ref()
.map(|backend| {
let ref backend = *backend.borrow();
self.proxy
.borrow()
.backends
.borrow()
.has_backend(&cluster_id, backend)
})
.unwrap_or(false);

if has_backend && self.check_backend_connection() {
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("1error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
self.close_backend_inner(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
self.set_back_token(token);
}
}

//replacing with a connection to another cluster
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = self.back_token() {
if let Some(fd) = self.back_socket_mut().map(|s| s.as_raw_fd()) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("2error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}
self.close_backend_inner(token);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
self.set_back_token(token);
}
}

self.cluster_id = Some(cluster_id.clone());
self.http_mut().map(|http| {
http.cluster_id = Some(cluster_id.clone());
});

let front_should_stick = self
.proxy
.borrow()
.clusters
.get(&cluster_id)
.map(|ref app| app.sticky_session)
.unwrap_or(false);
let mut socket = self.backend_from_request(&cluster_id, front_should_stick)?;

if let Err(e) = socket.set_nodelay(true) {
error!(
"error setting nodelay on back socket({:?}): {:?}",
socket, e
);
}
self.back_readiness().map(|r| {
r.interest = Ready::writable() | Ready::hup() | Ready::error();
});

let connect_timeout = time::Duration::seconds(i64::from(
self.proxy
.borrow()
.listeners
.get(&self.listen_token)
.as_ref()
.map(|l| l.config.connect_timeout)
.unwrap(),
));

self.back_connected = BackendConnectionStatus::Connecting(Instant::now());

if let Some(back_token) = old_back_token {
self.set_back_token(back_token);
if let Err(e) = self.proxy.borrow().registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
) {
error!("error registering back socket({:?}): {:?}", socket, e);
}
self.set_back_socket(socket);
self.http_mut().map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::Replace)
} else {
if self.proxy.borrow().sessions.borrow().slab.len()
>= self.proxy.borrow().sessions.borrow().slab_capacity()
{
error!("not enough memory, cannot connect to backend");
return Err(ConnectionError::TooManyConnections);
}

let back_token = {
let p = self.proxy.borrow();
let mut s = p.sessions.borrow_mut();
let entry = s.slab.vacant_entry();
let back_token = Token(entry.key());
let _entry = entry.insert(session_rc);
back_token
};

if let Err(e) = self.proxy.borrow().registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
) {
error!("error registering back socket({:?}): {:?}", socket, e);
}

self.set_back_socket(socket);
self.set_back_token(back_token);
self.http_mut().map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::New)
}
}
}

impl ProxySession for Session {
Expand All @@ -952,7 +1212,7 @@ impl ProxySession for Session {

if let Err(e) = registry.deregister(self.front_socket_mut()) {
error!(
"error deregistering front socket({:?}): {:?}",
"3error deregistering front socket({:?}): {:?}",
self.front_socket(),
e
);
Expand Down Expand Up @@ -1078,7 +1338,10 @@ impl ProxySession for Session {
let proxy = self.proxy.borrow_mut();
for (token, fd) in v.drain(..) {
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("error deregistering socket({:?}): {:?}", fd, e);
error!(
"4error deregistering socket({:?} token = {:?}): {:?}",
fd, token, e
);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
Expand All @@ -1090,14 +1353,26 @@ impl ProxySession for Session {
) {
let proxy = self.proxy.borrow_mut();
if let Err(e) = proxy.registry.deregister(&mut SourceFd(&fd)) {
error!("error deregistering socket({:?}): {:?}", fd, e);
error!("5error deregistering socket({:?}): {:?}", fd, e);
}

proxy.sessions.borrow_mut().slab.try_remove(token.0);
}

//FIXME: should we really pass a token here?
self.close_backend_inner(Token(0));
} else if res == SessionResult::ConnectBackend {
let res = self.connect_to_backend(session);
info!("HTTP::READY): connect_to_backend returned {:?}\n\n", res);

//FIXME: we might need to loop here betwen ready and connet_to_backend because a call to ready() might go through connect_to_backend again or it could return CloseBackend and other results. Ideally, connect_to_backend should be called from ready_inner instead
if res == Ok(BackendConnectAction::Reuse) || res.is_err() {
let res = self.ready_inner();
self.metrics().service_stop();
return res;
}
self.metrics().service_stop();
return SessionResult::Continue;
}
self.metrics().service_stop();
res
Expand Down Expand Up @@ -2027,6 +2302,7 @@ mod tests {
Channel::generate(1000, 10000).expect("should create a channel");

let _jg = thread::spawn(move || {
setup_test_logger!();
start(config, channel, 10, 16384);
});

Expand Down Expand Up @@ -2086,6 +2362,7 @@ mod tests {
}
}
}

println!(
"Response: {}",
str::from_utf8(&buffer[..index]).expect("could not make string from buffer")
Expand Down Expand Up @@ -2216,13 +2493,14 @@ mod tests {

fn start_server(port: u16, barrier: Arc<Barrier>) {
thread::spawn(move || {
setup_test_logger!();
let server =
Server::http(&format!("127.0.0.1:{}", port)).expect("could not create server");
info!("starting web server in port {}", port);
barrier.wait();

for request in server.incoming_requests() {
println!(
info!(
"backend web server got request -> method: {:?}, url: {:?}, headers: {:?}",
request.method(),
request.url(),
Expand All @@ -2231,9 +2509,9 @@ mod tests {

let response = Response::from_string("hello world");
request.respond(response).unwrap();
println!("backend web server sent response");
info!("backend web server sent response");
barrier.wait();
println!("server session stopped");
info!("server session stopped");
}

println!("server on port {} closed", port);
Expand Down

0 comments on commit 6b7681b

Please sign in to comment.