Skip to content

Commit

Permalink
feat: basic tls support (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdockerty committed Dec 11, 2023
1 parent b411e18 commit 15ed5cb
Show file tree
Hide file tree
Showing 14 changed files with 567 additions and 79 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ serde_json = "1.0.107"
serde_yaml = "0.9.25"
thiserror = "1.0.49"
tokio = { version = "1.34.0", features = ["full"] }
tokio-native-tls = "0.3.1"
tokio-util = "0.7.10"
tracing = "0.1.37"
tracing-log = { version = "0.1.3", features = ["env_logger"] }
Expand Down
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
deps:
go install filippo.io/mkcert@latest

gen_tls:
mkcert -cert-file tests/fixtures/tls/server.pem -key-file tests/fixtures/tls/server.key localhost 127.0.0.1 ::1

# Trust local CA for testing purposes.
local_cert_trust:
mkcert -install
@echo "Ensure you run 'make local_cert_untrust' once you are done testing."

# Untrust localhost from cert store once testing is finished.
local_cert_untrust:
mkcert -uninstall
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ gruglb --config path/to/config.yml

## Features

- Round-robin load balancing of HTTP/TCP connections.
- Health checks for HTTP/TCP targets.
- Round-robin load balancing of HTTP/HTTPS/TCP connections.
- Health checks for HTTP/HTTPS/TCP targets.
- Graceful termination.
- TLS via termination, backends are still expected to be accessible over HTTP.

## How does it work?

Expand Down Expand Up @@ -129,18 +130,17 @@ Bombarding http://127.0.0.1:8080 for 10s using 125 connection(s)
[========================================================================================] 10s
Done!
Statistics Avg Stdev Max
Reqs/sec 40954.38 2857.90 45273.78
Latency 3.05ms 485.51us 36.43ms
Reqs/sec 42558.30 3130.17 47446.16
Latency 2.93ms 427.72us 29.29ms
Latency Distribution
50% 2.97ms
75% 3.32ms
90% 3.75ms
95% 4.13ms
99% 5.28ms
50% 2.85ms
75% 3.17ms
90% 3.61ms
95% 4.01ms
99% 5.22ms
HTTP codes:
1xx - 0, 2xx - 408841, 3xx - 0, 4xx - 0, 5xx - 0
1xx - 0, 2xx - 425267, 3xx - 0, 4xx - 0, 5xx - 0
others - 0
Throughput: 48.42MB/s
```
</details>

Expand Down
28 changes: 24 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use anyhow::Result;
use serde::{Deserialize, Serialize};

use std::{collections::HashMap, fmt::Display, fs::File, time::Duration};
use std::{collections::HashMap, fmt::Display, fs::File, path::PathBuf, time::Duration};
use tracing_subscriber::filter::LevelFilter;

/// Protocol to use against a configured target.
#[derive(Debug, PartialEq)]
pub enum Protocol {
Tcp,
Http,
Https,
Unsupported,
}

Expand All @@ -17,6 +18,7 @@ impl Display for Protocol {
match self {
Self::Tcp => write!(f, "TCP"),
Self::Http => write!(f, "HTTP"),
Self::Https => write!(f, "HTTPS"),
Self::Unsupported => write!(f, "Unsupported"),
}
}
Expand Down Expand Up @@ -50,23 +52,40 @@ pub struct Config {
// traffic to configured backend servers.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Target {
// Protocol to use for the target's backends.
// Protocol to use for the target's backend servers.
pub protocol: String,

// Listener to use with TCP.
// Listener port to bind for this target.
//
// Incoming traffic on this port will have traffic routed between the configured
// backends server.
pub listener: Option<u16>,

/// Backends to route traffic to.
/// Backend servers to route traffic to.
pub backends: Option<Vec<Backend>>,

/// TLS configuration.
///
/// Specifying this means that TLS is terminated at the load balancer for the
/// backend servers defined underneath this target.
pub tls: Option<TLSConfig>,
// TODO:
// routing_algorithm: RoutingAlgorithm,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct TLSConfig {
pub cert_file: PathBuf,
pub cert_key: PathBuf,
}

impl Target {
/// Retrieve the type of protocol configured for the target.
pub fn protocol_type(&self) -> Protocol {
match self.protocol.as_str() {
"tcp" => Protocol::Tcp,
"http" => Protocol::Http,
"https" => Protocol::Https,
_ => Protocol::Unsupported,
}
}
Expand Down Expand Up @@ -198,6 +217,7 @@ mod tests {
protocol: "invalid_protocol".to_string(),
listener: None,
backends: None,
tls: None,
};

assert_eq!(http_target.protocol_type(), Protocol::Http);
Expand Down
184 changes: 184 additions & 0 deletions src/https.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
use crate::config::Protocol;
use crate::proxy::Connection;
use crate::proxy::Proxy;
use anyhow::{Context, Result};
use async_trait::async_trait;
use reqwest::Response;
use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::error;
use tracing::info;

/// `HttpsProxy` is used as a concrete implementation of the `Proxy` trait for HTTP
/// connection proxying to configured targets.
#[derive(Debug)]
pub struct HttpsProxy {}

impl HttpsProxy {
/// Return a new instance of `HttpsProxy`.
///
/// `HttpsProxy` has a static lifetime as it exists the entire duration of the
/// application's active lifecycle.
pub fn new() -> &'static HttpsProxy {
&Self {}
}

/// Helper for creating the relevant HTTP response to write into a `TcpStream`.
async fn construct_response(&self, response: Response) -> Result<String> {
let http_version = response.version();
let status = response.status();
let headers = response.headers();
let mut header = String::default();

// Construct headers that were in the response from the backend.
// The `\r\n` at the beginning is important to separate header key-value
// items from each other.
for header_key in headers.keys() {
if let Some(value) = headers.get(header_key) {
header.push_str(&format!("\r\n{}: {}", header_key, value.to_str().unwrap()));
}
}
let response_body = response.text().await?;
let status_line = format!("{:?} {}", http_version, status);
let response = format!("{status_line}{header}\r\n\r\n{response_body}");
Ok(response)
}
}

#[async_trait]
impl Proxy for HttpsProxy {
fn protocol_type(&self) -> Protocol {
Protocol::Https
}

/// Handles the proxying of HTTP connections to configured targets.
async fn proxy(
&'static self,
connection: Connection,
routing_idx: Arc<RwLock<usize>>,
) -> Result<()> {
if let Some(backends) = connection.targets.get(&connection.target_name) {
let backend_count = backends.len();
if backend_count == 0 {
info!(
"[{}] No routable backends for {}, nothing to do",
self.protocol_type(),
&connection.target_name
);
return Ok(());
}

// TODO: All of this is shared with HTTP outside of these 2 lines.
// Can we do something nice here to handle that?
let tls_acceptor = connection.tls.clone().unwrap().clone();

match tls_acceptor.accept(connection.stream).await {
Ok(mut tls_stream) => {
debug!("Backends configured {:?}", &backends);
let buf = BufReader::new(&mut tls_stream);

let mut lines = buf.lines();
let mut http_request: Vec<String> = vec![];

while let Some(line) = lines.next_line().await.unwrap() {
if line.is_empty() {
break;
}
http_request.push(line);
}

let info = http_request[0].clone();
let http_info = info
.split_whitespace()
.map(|s| s.to_string())
.collect::<Vec<_>>();

let method = http_info[0].clone();
let request_path = http_info[1].clone();

// Limit the scope of the index write lock.
let http_backend: String;
{
let mut idx = routing_idx.write().await;

debug!(
"[{}] {backend_count} backends configured for {}, current index {idx}",
self.protocol_type(),
&connection.target_name
);

// Reset index when out of bounds to route back to the first server.
if *idx >= backend_count {
*idx = 0;
}

http_backend = format!(
"http://{}:{}{}",
backends[*idx].host, backends[*idx].port, request_path
);

// Increment a shared index after we've constructed our current connection
// address.
*idx += 1;
}

info!(
"[{}] Attempting to connect to {}",
self.protocol_type(),
&http_backend
);

match method.as_str() {
"GET" => {
let backend_response = connection
.client
.as_ref()
.unwrap()
.get(&http_backend)
.send()
.await
.with_context(|| {
format!("unable to send response to {http_backend}")
})?;
let response = self.construct_response(backend_response).await?;

tls_stream.write_all(response.as_bytes()).await?;
}
"POST" => {
let backend_response = connection
.client
.as_ref()
.unwrap()
.post(&http_backend)
.send()
.await
.with_context(|| {
format!("unable to send response to {http_backend}")
})?;
let response = self.construct_response(backend_response).await?;

tls_stream.write_all(response.as_bytes()).await?;
}
_ => {
error!("Unsupported: {method}")
}
}
info!(
"[{}] response sent to {}",
self.protocol_type(),
&http_backend
);
}
Err(e) => error!("unable to accept TLS stream: {e}"),
}
} else {
info!("[{}] No backend configured", self.protocol_type());
};

Ok(())
}
}
Loading

0 comments on commit 15ed5cb

Please sign in to comment.