Skip to content

Commit

Permalink
create BackendConnectionError and RetrieveClusterError
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Jul 31, 2023
1 parent e1e7ce2 commit bdde240
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 70 deletions.
35 changes: 34 additions & 1 deletion lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ use std::{
str,
};

use backends::BackendError;
use mio::{net::TcpStream, Interest, Token};
use protocol::http::parser::Method;
use router::RouterError;
Expand Down Expand Up @@ -642,7 +643,38 @@ pub enum BackendConnectAction {
Replace,
}

// Used in sessions
#[derive(thiserror::Error, Debug)]
pub enum BackendConnectionError {
#[error("Found no TCP cluster")]
FoundNoTcpCluster,
#[error("Too many connections on cluster {0}")]
TooManyConnections(String),
#[error("the sessions slab has reached maximum capacity")]
SessionsMemoryAtCapacity,
#[error("error from the backend: {0}")]
BackendError(BackendError),
#[error("Reached maximum attempts to connect to this backend")]
TooManyAttempts,
#[error("failed to retrieve the cluster: {0}")]
RetrieveClusterError(RetrieveClusterError),
}

/// used in kawa_h1 module for the Http session state
#[derive(thiserror::Error, Debug)]
pub enum RetrieveClusterError {
#[error("No method given")]
NoMethod,
#[error("No host given")]
NoHost,
#[error("No path given")]
NoPath,
#[error("unauthorized route")]
UnauthorizedRoute,
#[error("failed to retrieve the frontend for the request: {0}")]
FrontendFromRequestError(FrontendFromRequestError),
}

/// Used in sessions
#[derive(Debug, PartialEq, Eq)]
pub enum AcceptError {
IoError,
Expand All @@ -653,6 +685,7 @@ pub enum AcceptError {
BufferCapacityReached,
}

/// returned by the HTTP, HTTPS and TCP listeners
#[derive(thiserror::Error, Debug)]
pub enum ListenerError {
#[error("failed to acquire the lock, {0}")]
Expand Down
101 changes: 48 additions & 53 deletions lib/src/protocol/kawa_h1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ use std::{
rc::{Rc, Weak},
};

use anyhow::{bail, Context};
use kawa;
use mio::{net::TcpStream, *};
use rusty_ulid::Ulid;
use sozu_command::proto::command::{Event, EventKind, ListenerType};
use time::{Duration, Instant};

use crate::{
backends::Backend,
backends::{Backend, BackendError},
logs::{Endpoint, LogContext, RequestRecord},
pool::{Checkout, Pool},
protocol::{
Expand All @@ -30,9 +29,9 @@ use crate::{
socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
sozu_command::ready::Ready,
timer::TimeoutContainer,
AcceptError, BackendConnectAction, BackendConnectionStatus, L7ListenerHandler, L7Proxy,
ListenerHandler, Protocol, ProxySession, Readiness, SessionIsToBeClosed, SessionMetrics,
SessionResult, StateResult,
AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, Readiness,
RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult,
};

/// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer
Expand Down Expand Up @@ -108,6 +107,7 @@ pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
configured_connect_timeout: Duration,
configured_frontend_timeout: Duration,
pub cluster_id: Option<String>,
/// attempts to connect to the backends during the session
connection_attempts: u8,
pub frontend_readiness: Readiness,
pub frontend_socket: Front,
Expand Down Expand Up @@ -973,11 +973,11 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}

/// Check the number of connection attempts against authorized connection retries
fn check_circuit_breaker(&mut self) -> anyhow::Result<()> {
fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
if self.connection_attempts >= CONN_RETRIES {
error!("{} max connection attempt reached", self.log_context());
self.set_answer(DefaultAnswerStatus::Answer503, None);
bail!("Maximum connection attempt reached");
return Err(BackendConnectionError::TooManyAttempts);
}
Ok(())
}
Expand All @@ -1000,55 +1000,58 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
}

// -> host, path, method
pub fn extract_route(&self) -> anyhow::Result<(&str, &str, &Method)> {
pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
let given_method = self
.context
.method
.as_ref()
.with_context(|| "No method given")?;
.ok_or(RetrieveClusterError::NoMethod)?;
let given_authority = self
.context
.authority
.as_deref()
.with_context(|| "No host given")?;
.ok_or(RetrieveClusterError::NoHost)?;
let given_path = self
.context
.path
.as_deref()
.with_context(|| "No path given")?;
.ok_or(RetrieveClusterError::NoPath)?;

Ok((given_authority, given_path, given_method))
}

fn cluster_id_from_request(
&mut self,
proxy: Rc<RefCell<dyn L7Proxy>>,
) -> anyhow::Result<String> {
) -> Result<String, RetrieveClusterError> {
let (host, uri, method) = match self.extract_route() {
Ok(tuple) => tuple,
Err(e) => {
Err(cluster_error) => {
self.set_answer(DefaultAnswerStatus::Answer400, None);
return Err(e).with_context(|| "Could not extract route from request");
return Err(cluster_error);
}
};

let cluster_id_res = self
let route_result = self
.listener
.borrow()
.frontend_from_request(host, uri, method);

let cluster_id = match cluster_id_res {
Ok(route) => match route {
Route::ClusterId(cluster_id) => cluster_id,
Route::Deny => {
self.set_answer(DefaultAnswerStatus::Answer401, None);
bail!("Unauthorized route");
}
},
Err(e) => {
let no_host_error = format!("Host not found: {host}: {e:#}");
let route = match route_result {
Ok(route) => route,
Err(frontend_error) => {
self.set_answer(DefaultAnswerStatus::Answer404, None);
bail!(no_host_error);
return Err(RetrieveClusterError::FrontendFromRequestError(
frontend_error,
));
}
};

let cluster_id = match route {
Route::ClusterId(cluster_id) => cluster_id,
Route::Deny => {
self.set_answer(DefaultAnswerStatus::Answer401, None);
return Err(RetrieveClusterError::UnauthorizedRoute);
}
};

Expand All @@ -1066,7 +1069,7 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
DefaultAnswerStatus::Answer301,
Some(Rc::new(answer.into_bytes())),
);
bail!("Route is unauthorized");
return Err(RetrieveClusterError::UnauthorizedRoute);
}

Ok(cluster_id)
Expand All @@ -1078,20 +1081,18 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
frontend_should_stick: bool,
proxy: Rc<RefCell<dyn L7Proxy>>,
metrics: &mut SessionMetrics,
) -> anyhow::Result<TcpStream> {
let (backend, conn) = match self.get_backend_for_sticky_session(
frontend_should_stick,
self.context.sticky_session_found.as_deref(),
cluster_id,
proxy,
) {
Ok(tuple) => tuple,
Err(e) => {
) -> Result<TcpStream, BackendConnectionError> {
let (backend, conn) = self
.get_backend_for_sticky_session(
frontend_should_stick,
self.context.sticky_session_found.as_deref(),
cluster_id,
proxy,
)
.map_err(|backend_error| {
self.set_answer(DefaultAnswerStatus::Answer503, None);
return Err(e)
.with_context(|| format!("Could not find a backend for cluster {cluster_id}"));
}
};
BackendConnectionError::BackendError(backend_error)
})?;

if frontend_should_stick {
// update sticky name in case it changed I guess?
Expand Down Expand Up @@ -1120,23 +1121,18 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
sticky_session: Option<&str>,
cluster_id: &str,
proxy: Rc<RefCell<dyn L7Proxy>>,
) -> anyhow::Result<(Rc<RefCell<Backend>>, TcpStream)> {
) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
match (frontend_should_stick, sticky_session) {
(true, Some(sticky_session)) => proxy
.borrow()
.backends()
.borrow_mut()
.backend_from_sticky_session(cluster_id, sticky_session)
.with_context(|| {
format!(
"Couldn't find a backend corresponding to sticky_session {sticky_session} for cluster {cluster_id}"
)
}),
_ => Ok(proxy
.backend_from_sticky_session(cluster_id, sticky_session),
_ => proxy
.borrow()
.backends()
.borrow_mut()
.backend_from_cluster_id(cluster_id)?),
.backend_from_cluster_id(cluster_id),
}
}

Expand All @@ -1145,16 +1141,15 @@ impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L
session_rc: Rc<RefCell<dyn ProxySession>>,
proxy: Rc<RefCell<dyn L7Proxy>>,
metrics: &mut SessionMetrics,
) -> anyhow::Result<BackendConnectAction> {
) -> Result<BackendConnectAction, BackendConnectionError> {
let old_cluster_id = self.cluster_id.clone();
let old_backend_token = self.backend_token;

self.check_circuit_breaker()
.with_context(|| "Circuit broke")?;
self.check_circuit_breaker()?;

let cluster_id = self
.cluster_id_from_request(proxy.clone())
.with_context(|| "Could not get cluster id from request")?;
.map_err(|cluster_error| BackendConnectionError::RetrieveClusterError(cluster_error))?;

trace!(
"connect_to_backend: {:?} {:?} {:?}",
Expand Down
31 changes: 15 additions & 16 deletions lib/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ use crate::{
state::ClusterId,
},
timer::TimeoutContainer,
AcceptError, BackendConnectAction, BackendConnectionStatus, CachedTags, ListenerError,
ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession, Readiness,
SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder, StateResult,
AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus, CachedTags,
ListenerError, ListenerHandler, Protocol, ProxyConfiguration, ProxyError, ProxySession,
Readiness, SessionIsToBeClosed, SessionMetrics, SessionResult, StateMachineBuilder,
StateResult,
};

StateMachineBuilder! {
Expand Down Expand Up @@ -819,24 +820,23 @@ impl TcpSession {
fn connect_to_backend(
&mut self,
session_rc: Rc<RefCell<dyn ProxySession>>,
) -> anyhow::Result<BackendConnectAction> {
let cluster_id = match self.listener.borrow().cluster_id.clone() {
Some(cluster_id) => cluster_id,
None => {
error!("no TCP cluster corresponds to that front address");
bail!("no TCP cluster found.")
}
};
) -> Result<BackendConnectAction, BackendConnectionError> {
let cluster_id = self
.listener
.borrow()
.cluster_id
.clone()
.ok_or(BackendConnectionError::FoundNoTcpCluster)?;

self.cluster_id = Some(cluster_id.clone());

if self.connection_attempt >= CONN_RETRIES {
error!("{} max connection attempt reached", self.log_context());
bail!(format!("Too many connections on cluster {cluster_id}"));
return Err(BackendConnectionError::TooManyConnections(cluster_id));
}

if self.proxy.borrow().sessions.borrow().at_capacity() {
bail!("not enough memory, cannot connect to backend");
return Err(BackendConnectionError::SessionsMemoryAtCapacity);
}

let (backend, mut stream) = self
Expand All @@ -845,9 +845,8 @@ impl TcpSession {
.backends
.borrow_mut()
.backend_from_cluster_id(&cluster_id)
.with_context(|| {
format!("Could not get backend and TCP stream from cluster id {cluster_id}")
})?;
.map_err(|backend_error| BackendConnectionError::BackendError(backend_error))?;

/*
this was the old error matching for backend_from_cluster_id.
panic! is called in case of mio::net::MioTcpStream::connect() error
Expand Down

0 comments on commit bdde240

Please sign in to comment.