Skip to content

Commit

Permalink
remove connect_to_backend from ProxyConfiguration
Browse files Browse the repository at this point in the history
it is handled in sessions now
  • Loading branch information
Geal authored and FlorentinDUBOIS committed Jul 13, 2022
1 parent 6310f19 commit c6ce2fd
Show file tree
Hide file tree
Showing 6 changed files with 3 additions and 786 deletions.
215 changes: 0 additions & 215 deletions lib/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1552,95 +1552,6 @@ impl Proxy {
}
}

pub fn backend_from_request(
&mut self,
session: &mut Session,
cluster_id: &str,
front_should_stick: bool,
) -> Result<TcpStream, ConnectionError> {
let sticky_session = session
.http()
.and_then(|http| http.request.as_ref())
.and_then(|r| r.get_sticky_session());

let res = match (front_should_stick, sticky_session) {
(true, Some(sticky_session)) => self
.backends
.borrow_mut()
.backend_from_sticky_session(cluster_id, &sticky_session)
.map_err(|e| {
debug!(
"Couldn't find a backend corresponding to sticky_session {} for app {}",
sticky_session, cluster_id
);
e
}),
_ => self
.backends
.borrow_mut()
.backend_from_cluster_id(cluster_id),
};

match res {
Err(e) => {
session.set_answer(DefaultAnswerStatus::Answer503, None);
Err(e)
}
Ok((backend, conn)) => {
let sticky_name = &self.listeners[&session.listen_token].config.sticky_name;
session.set_backend(backend, front_should_stick, &sticky_name);

Ok(conn)
}
}
}

fn cluster_id_from_request(
&mut self,
session: &mut Session,
) -> Result<String, ConnectionError> {
let (host, uri, method) = match session.extract_route() {
Ok(t) => t,
Err(e) => {
session.set_answer(DefaultAnswerStatus::Answer400, None);
return Err(e);
}
};

let cluster_id = match self
.listeners
.get(&session.listen_token)
.as_ref()
.and_then(|l| l.frontend_from_request(&host, &uri, &method))
{
Some(Route::ClusterId(cluster_id)) => cluster_id,
Some(Route::Deny) => {
session.set_answer(DefaultAnswerStatus::Answer401, None);
return Err(ConnectionError::Unauthorized);
}
None => {
session.set_answer(DefaultAnswerStatus::Answer404, None);
return Err(ConnectionError::HostNotFound);
}
};

let front_should_redirect_https = self
.clusters
.get(&cluster_id)
.map(|ref app| app.https_redirect)
.unwrap_or(false);
if front_should_redirect_https {
let answer = format!("HTTP/1.1 301 Moved Permanently\r\nContent-Length: 0\r\nLocation: https://{}{}\r\n\r\n", host, uri);
session.set_answer(
DefaultAnswerStatus::Answer301,
Some(Rc::new(answer.into_bytes())),
);
return Err(ConnectionError::HttpsRedirect);
}

Ok(cluster_id)
}

pub fn close_session(&mut self, token: Token) {
self.sessions
.borrow_mut()
Expand Down Expand Up @@ -1763,132 +1674,6 @@ impl Listener {
}

impl ProxyConfiguration<Session> for Proxy {
fn connect_to_backend(
&mut self,
session_rc: Rc<RefCell<dyn ProxySessionCast>>,
) -> Result<BackendConnectAction, ConnectionError> {
let mut b = session_rc.borrow_mut();
let session = b.as_http();

let old_cluster_id = session.http().and_then(|ref http| http.cluster_id.clone());
let old_back_token = session.back_token();

session.check_circuit_breaker()?;

let cluster_id = self.cluster_id_from_request(session)?;

// check if we can reuse the backend connection
if (session.http().and_then(|h| h.cluster_id.as_ref()) == Some(&cluster_id))
&& session.back_connected == BackendConnectionStatus::Connected
{
let has_backend = session
.backend
.as_ref()
.map(|backend| {
let ref backend = *backend.borrow();
self.backends.borrow().has_backend(&cluster_id, backend)
})
.unwrap_or(false);

if has_backend && session.check_backend_connection() {
return Ok(BackendConnectAction::Reuse);
} else if let Some(token) = session.back_token() {
session.close_backend(token, &self.registry);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
session.set_back_token(token);
}
}

//replacing with a connection to another cluster
if old_cluster_id.is_some() && old_cluster_id.as_ref() != Some(&cluster_id) {
if let Some(token) = session.back_token() {
session.close_backend(token, &self.registry);

//reset the back token here so we can remove it
//from the slab after backend_from* fails
session.set_back_token(token);
}
}

session.cluster_id = Some(cluster_id.clone());
session.http_mut().map(|http| {
http.cluster_id = Some(cluster_id.clone());
});

let front_should_stick = self
.clusters
.get(&cluster_id)
.map(|ref app| app.sticky_session)
.unwrap_or(false);
let mut socket = self.backend_from_request(session, &cluster_id, front_should_stick)?;

if let Err(e) = socket.set_nodelay(true) {
error!(
"error setting nodelay on back socket({:?}): {:?}",
socket, e
);
}
session.back_readiness().map(|r| {
r.interest = Ready::writable() | Ready::hup() | Ready::error();
});

let connect_timeout = time::Duration::seconds(i64::from(
self.listeners
.get(&session.listen_token)
.as_ref()
.map(|l| l.config.connect_timeout)
.unwrap(),
));

session.back_connected = BackendConnectionStatus::Connecting(Instant::now());

if let Some(back_token) = old_back_token {
session.set_back_token(back_token);
if let Err(e) = self.registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
) {
error!("error registering back socket({:?}): {:?}", socket, e);
}
session.set_back_socket(socket);
session
.http_mut()
.map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::Replace)
} else {
if self.sessions.borrow().slab.len() >= self.sessions.borrow().slab_capacity() {
error!("not enough memory, cannot connect to backend");
return Err(ConnectionError::TooManyConnections);
}

let back_token = {
let mut s = self.sessions.borrow_mut();
let entry = s.slab.vacant_entry();
let back_token = Token(entry.key());
let _entry = entry.insert(session_rc.clone());
back_token
};

if let Err(e) = self.registry.register(
&mut socket,
back_token,
Interest::READABLE | Interest::WRITABLE,
) {
error!("error registering back socket({:?}): {:?}", socket, e);
}

session.set_back_socket(socket);
session.set_back_token(back_token);
session
.http_mut()
.map(|h| h.set_back_timeout(connect_timeout));
Ok(BackendConnectAction::New)
}
}

fn notify(&mut self, message: ProxyRequest) -> ProxyResponse {
// ToDo temporary
//trace!("{} notified", message);
Expand Down

0 comments on commit c6ce2fd

Please sign in to comment.