refactor: use trait bound accept fn to reduce duplicate logic (#24)
jdockerty committed Dec 6, 2023
1 parent 1abbc55 commit 7686eaf
Showing 11 changed files with 267 additions and 205 deletions.
26 changes: 15 additions & 11 deletions README.md
@@ -34,7 +34,11 @@ The configuration is defined in YAML, using the `example-config.yaml` that is us

```yaml
# The interval, in seconds, to conduct HTTP/TCP health checks.
-health_check_interval: 5
+health_check_interval: 2
+
+# Run a graceful shutdown period of 30 seconds to terminate separate worker threads.
+# Defaults to true.
+graceful_shutdown: true

# Log level information, defaults to 'info'
logging: info
@@ -115,21 +119,21 @@ Using two [`simplebenchserver`](https://pkg.go.dev/github.com/codesenberg/bombar
```
bombardier http://127.0.0.1:8080 --latencies --fasthttp -H "Connection: close"
Bombarding http://127.0.0.1:8080 for 10s using 125 connection(s)
[=========================================================================================] 10s
[========================================================================================] 10s
Done!
Statistics        Avg      Stdev        Max
-  Reqs/sec     34000.50    1663.16   39891.49
-  Latency        3.67ms   205.64us    21.55ms
+  Reqs/sec     40954.38    2857.90   45273.78
+  Latency        3.05ms   485.51us    36.43ms
  Latency Distribution
-     50%     3.63ms
-     75%     3.76ms
-     90%     3.93ms
-     95%     4.07ms
-     99%     4.55ms
+     50%     2.97ms
+     75%     3.32ms
+     90%     3.75ms
+     95%     4.13ms
+     99%     5.28ms
  HTTP codes:
-    1xx - 0, 2xx - 339622, 3xx - 0, 4xx - 0, 5xx - 0
+    1xx - 0, 2xx - 408841, 3xx - 0, 4xx - 0, 5xx - 0
    others - 0
-  Throughput:    40.22MB/s
+  Throughput:    48.42MB/s
```

### nginx
25 changes: 25 additions & 0 deletions src/config.rs
@@ -12,6 +12,16 @@ pub enum Protocol {
Unsupported,
}

impl Display for Protocol {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Tcp => write!(f, "TCP"),
Self::Http => write!(f, "HTTP"),
Self::Unsupported => write!(f, "Unsupported"),
}
}
}
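
This `Display` implementation is what the reworked log lines in `src/http.rs` further down rely on when they interpolate `self.protocol_type()` into their `[{}]` prefix. A tiny illustration (not part of the commit, and assuming `use crate::config::Protocol;`):

```rust
// Display gives each protocol a stable, human-readable log tag.
assert_eq!(Protocol::Http.to_string(), "HTTP");
assert_eq!(Protocol::Tcp.to_string(), "TCP");
assert_eq!(Protocol::Unsupported.to_string(), "Unsupported");
```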

// Represents the load balancer configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
@@ -98,6 +108,21 @@ pub fn new(config_file: File) -> Result<Config> {
}

impl Config {
/// Used for ensuring whether a particular type of proxy is required or not.
/// This is helpful in initialising particular proxies dependent on a provided
/// configuration.
pub fn requires_proxy_type(&self, protocol: Protocol) -> bool {
if let Some(targets) = &self.targets {
for target in targets.values() {
if target.protocol_type() == protocol {
return true;
}
}
return false;
}
false
}

/// Retrieve a list of names given to targets.
pub fn target_names(&self) -> Option<Vec<String>> {
if let Some(targets) = &self.targets {
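The new `Config::requires_proxy_type` helper is presumably consumed at startup to decide which proxies to construct at all; its call sites live in files not expanded on this page. A minimal sketch of that wiring, assuming it runs inside the crate and that `config::new` is the loader named in the hunk header above:

```rust
use std::fs::File;

use anyhow::Result;

use crate::config::{self, Protocol};

// Hypothetical startup wiring, not taken from this commit.
fn build_proxies() -> Result<()> {
    let config = config::new(File::open("example-config.yaml")?)?;

    if config.requires_proxy_type(Protocol::Http) {
        // Only construct an HttpProxy when at least one target speaks HTTP.
    }
    if config.requires_proxy_type(Protocol::Tcp) {
        // Likewise for the TCP proxy.
    }
    Ok(())
}
```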
136 changes: 61 additions & 75 deletions src/http.rs
@@ -1,28 +1,38 @@
use crate::config::Backend;
use crate::config::Protocol;
use crate::proxy::Connection;
use crate::proxy::Proxy;
use anyhow::{Context, Result};
use async_trait::async_trait;
use dashmap::DashMap;
use reqwest::Response;
use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
use tracing::info;

/// `HttpProxy` is used as a concrete implementation of the `Proxy` trait for HTTP
/// connection proxying to configured targets.
#[derive(Debug)]
pub struct HttpProxy {}

impl HttpProxy {
/// Return a new instance of `HttpProxy`.
///
/// `HttpProxy` has a static lifetime as it exists the entire duration of the
/// application's active lifecycle.
pub fn new() -> &'static HttpProxy {
&Self {}
}

// TODO: Pass connection here for specific HTTP requirement for request_path and method,
// rather than doing it inline of the proxy func.
// async fn construct_request_path_and_method(&self, mut connection: Connection) -> (String, String) {}

/// Helper for creating the relevant HTTP response to write into a `TcpStream`.
async fn construct_response(response: Response) -> Result<String> {
async fn construct_response(&self, response: Response) -> Result<String> {
let http_version = response.version();
let status = response.status();
let headers = response.headers();
@@ -45,85 +55,56 @@ impl HttpProxy {

#[async_trait]
impl Proxy for HttpProxy {
async fn accept(
listeners: Vec<(String, TcpListener)>,
current_healthy_targets: Arc<DashMap<String, Vec<Backend>>>,
cancel: CancellationToken,
) -> Result<()> {
let idx: Arc<RwLock<usize>> = Arc::new(RwLock::new(0));
for (name, listener) in listeners {
let idx = idx.clone();
let client = Arc::new(reqwest::Client::new());
let current_healthy_targets = current_healthy_targets.clone();
let cancel = cancel.clone();
tokio::spawn(async move {
while let Ok((mut stream, address)) = listener.accept().await {
if cancel.is_cancelled() {
info!("[CANCEL] Received cancel, no longer receiving any HTTP requests.");
stream.shutdown().await.unwrap();
break;
}
let name = name.clone();
let idx = Arc::clone(&idx);
let current_healthy_targets = Arc::clone(&current_healthy_targets);
info!("Incoming HTTP request from {address}");
let buf = BufReader::new(&mut stream);
let mut lines = buf.lines();
let mut http_request: Vec<String> = vec![];

while let Some(line) = lines.next_line().await.unwrap() {
if line.is_empty() {
break;
}
http_request.push(line);
}

let info = http_request[0].clone();
let http_info = info
.split_whitespace()
.map(|s| s.to_string())
.collect::<Vec<_>>();

let method = http_info[0].clone();
let path = http_info[1].clone();
let client = client.clone();
tokio::spawn(async move {
debug!("{method} request at {path}");
let connection = Connection {
client: Some(client),
targets: current_healthy_targets,
target_name: name,
method: Some(method),
request_path: Some(path),
stream,
};
HttpProxy::proxy(connection, idx).await.unwrap();
});
}
});
}
Ok(())
fn protocol_type(&self) -> Protocol {
Protocol::Http
}

async fn proxy(mut connection: Connection, routing_idx: Arc<RwLock<usize>>) -> Result<()> {
/// Handles the proxying of HTTP connections to configured targets.
async fn proxy(
&'static self,
mut connection: Connection,
routing_idx: Arc<RwLock<usize>>,
) -> Result<()> {
if let Some(backends) = connection.targets.get(&connection.target_name) {
let backend_count = backends.len();
if backend_count == 0 {
info!(
"[HTTP] No routable backends for {}, nothing to do",
"[{}] No routable backends for {}, nothing to do",
self.protocol_type(),
&connection.target_name
);
return Ok(());
}
debug!("Backends configured {:?}", &backends);

let buf = BufReader::new(&mut connection.stream);
let mut lines = buf.lines();
let mut http_request: Vec<String> = vec![];

while let Some(line) = lines.next_line().await.unwrap() {
if line.is_empty() {
break;
}
http_request.push(line);
}

let info = http_request[0].clone();
let http_info = info
.split_whitespace()
.map(|s| s.to_string())
.collect::<Vec<_>>();

let method = http_info[0].clone();
let request_path = http_info[1].clone();

// Limit the scope of the index write lock.
let http_backend: String;
{
let mut idx = routing_idx.write().await;

debug!(
"[HTTP] {backend_count} backends configured for {}, current index {idx}",
"[{}] {backend_count} backends configured for {}, current index {idx}",
self.protocol_type(),
&connection.target_name
);

@@ -134,19 +115,20 @@

http_backend = format!(
"http://{}:{}{}",
backends[*idx].host,
backends[*idx].port,
connection.request_path.unwrap()
backends[*idx].host, backends[*idx].port, request_path
);

// Increment a shared index after we've constructed our current connection
// address.
*idx += 1;
}

info!("[HTTP] Attempting to connect to {}", &http_backend);
info!(
"[{}] Attempting to connect to {}",
self.protocol_type(),
&http_backend
);

let method = connection.method.unwrap();
match method.as_str() {
"GET" => {
let backend_response = connection
@@ -156,7 +138,7 @@
.send()
.await
.with_context(|| format!("unable to send response to {http_backend}"))?;
let response = HttpProxy::construct_response(backend_response).await?;
let response = self.construct_response(backend_response).await?;

connection.stream.write_all(response.as_bytes()).await?;
}
@@ -168,17 +150,21 @@
.send()
.await
.with_context(|| format!("unable to send response to {http_backend}"))?;
let response = HttpProxy::construct_response(backend_response).await?;
let response = self.construct_response(backend_response).await?;

connection.stream.write_all(response.as_bytes()).await?;
}
_ => {
error!("Unsupported: {method}")
}
}
info!("[HTTP] response sent to {}", &http_backend);
info!(
"[{}] response sent to {}",
self.protocol_type(),
&http_backend
);
} else {
info!("[HTTP] No backend configured");
info!("[{}] No backend configured", self.protocol_type());
};

Ok(())
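The "trait bound accept fn" from the commit title is the shared accept loop that replaces the per-protocol one deleted above; it lives in `src/proxy.rs`, which is not expanded on this page. The sketch below is therefore only an assumption about its shape, self-contained and using simplified stand-in types rather than the crate's real `Connection`/`Backend`:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;

#[async_trait]
pub trait Proxy: Send + Sync + 'static {
    /// Protocol name used to prefix log lines, e.g. "HTTP" or "TCP".
    fn protocol_type(&self) -> &'static str;

    /// Protocol-specific handling of a single accepted connection.
    async fn proxy(
        &'static self,
        stream: TcpStream,
        routing_idx: Arc<RwLock<usize>>,
    ) -> anyhow::Result<()>;

    /// Shared accept loop: identical for every protocol, so it is provided once
    /// here instead of being duplicated in HttpProxy and TcpProxy.
    async fn accept(
        &'static self,
        listener: TcpListener,
        cancel: CancellationToken,
    ) -> anyhow::Result<()> {
        let routing_idx = Arc::new(RwLock::new(0usize));
        while let Ok((stream, addr)) = listener.accept().await {
            if cancel.is_cancelled() {
                println!("[{}] cancelled, no longer accepting connections", self.protocol_type());
                break;
            }
            println!("[{}] incoming connection from {addr}", self.protocol_type());
            let idx = Arc::clone(&routing_idx);
            tokio::spawn(async move {
                if let Err(e) = self.proxy(stream, idx).await {
                    eprintln!("[{}] proxy error: {e}", self.protocol_type());
                }
            });
        }
        Ok(())
    }
}
```

With a provided method like this, `HttpProxy` only has to implement `protocol_type` and `proxy`, which matches the shape of the `impl Proxy for HttpProxy` shown above.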