Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,14 @@ fn router() -> Router {
.nest("/v1", v1())
}

/// Returns a [tower_service::Service] for the Active Storage server API
/// S3 Active Storage Server Service type alias
///
/// This type implements [tower_service::Service].
// FIXME: The Service type should be some form of tower_service::Service, but couldn't find the
// necessary trait bounds.
pub type Service = tower_http::normalize_path::NormalizePath<Router>;

/// Returns a [crate::app::Service] for the Active Storage server API
///
/// The service is populated with all routes as well as the following middleware:
///
Expand All @@ -95,10 +102,7 @@ fn router() -> Router {
/// headers
/// * a [tower_http::normalize_path::NormalizePathLayer] for trimming trailing slashes from
/// requests
pub fn service() -> tower_http::normalize_path::NormalizePath<Router> {
// FIXME: The return type should be some form of tower_service::Service, but couldn't find the
// necessary trait bounds.

pub fn service() -> Service {
// Note that any middleware that should affect routing must wrap the router.
// See
// https://docs.rs/axum/0.6.18/axum/middleware/index.html#rewriting-request-uri-in-middleware.
Expand Down
39 changes: 39 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Command Line Interface (CLI) arguments.

use clap::Parser;

/// S3 Active Storage Proxy command line interface
#[derive(Debug, Parser)]
pub struct CommandLineArgs {
/// The IP address on which the proxy should listen
#[arg(long, default_value = "0.0.0.0", env = "S3_ACTIVE_STORAGE_HOST")]
pub host: String,
/// The port to which the proxy should bind
#[arg(long, default_value_t = 8080, env = "S3_ACTIVE_STORAGE_PORT")]
pub port: u16,
/// Flag indicating whether HTTPS should be used
#[arg(long, default_value_t = false, env = "S3_ACTIVE_STORAGE_HTTPS")]
pub https: bool,
/// Path to the certificate file to be used for HTTPS encryption
#[arg(
long,
default_value = "~/.config/s3-active-storage/certs/cert.pem",
env = "S3_ACTIVE_STORAGE_CERT_FILE"
)]
pub cert_file: String,
/// Path to the key file to be used for HTTPS encryption
#[arg(
long,
default_value = "~/.config/s3-active-storage/certs/key.pem",
env = "S3_ACTIVE_STORAGE_KEY_FILE"
)]
pub key_file: String,
/// Maximum time in seconds to wait for operations to complete upon receiving `ctrl+c` signal.
#[arg(long, default_value_t = 60, env = "S3_ACTIVE_STORAGE_SHUTDOWN_TIMEOUT")]
pub graceful_shutdown_timeout: u64,
}

/// Returns parsed command line arguments.
pub fn parse() -> CommandLineArgs {
CommandLineArgs::parse()
}
148 changes: 6 additions & 142 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,159 +22,23 @@
//! * [ndarray] provides [NumPy](https://numpy.orgq)-like n-dimensional arrays used in numerical
//! computation.

use std::{net::SocketAddr, process::exit, str::FromStr, time::Duration};

use axum::ServiceExt;
use axum_server::{tls_rustls::RustlsConfig, Handle};
use clap::Parser;
use expanduser::expanduser;
use tokio::signal;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

mod app;
mod array;
mod cli;
mod error;
mod models;
mod operation;
mod operations;
mod s3_client;
mod server;
mod tracing;
mod validated_json;

/// S3 Active Storage Proxy command line interface
#[derive(Debug, Parser)]
struct CommandLineArgs {
/// The IP address on which the proxy should listen
#[arg(long, default_value = "0.0.0.0", env = "S3_ACTIVE_STORAGE_HOST")]
host: String,
/// The port to which the proxy should bind
#[arg(long, default_value_t = 8080, env = "S3_ACTIVE_STORAGE_PORT")]
port: u16,
/// Flag indicating whether HTTPS should be used
#[arg(long, default_value_t = false, env = "S3_ACTIVE_STORAGE_HTTPS")]
https: bool,
/// Path to the certificate file to be used for HTTPS encryption
#[arg(
long,
default_value = "~/.config/s3-active-storage/certs/cert.pem",
env = "S3_ACTIVE_STORAGE_CERT_FILE"
)]
cert_file: String,
/// Path to the key file to be used for HTTPS encryption
#[arg(
long,
default_value = "~/.config/s3-active-storage/certs/key.pem",
env = "S3_ACTIVE_STORAGE_KEY_FILE"
)]
key_file: String,
/// Maximum time in seconds to wait for operations to complete upon receiving `ctrl+c` signal.
#[arg(long, default_value_t = 60, env = "S3_ACTIVE_STORAGE_SHUTDOWN_TIMEOUT")]
graceful_shutdown_timeout: u64,
}

/// Application entry point
#[tokio::main]
async fn main() {
let args = CommandLineArgs::parse();

init_tracing();

let args = cli::parse();
tracing::init_tracing();
let service = app::service();
let addr = SocketAddr::from_str(&format!("{}:{}", args.host, args.port))
.expect("invalid host name, IP address or port number");

// Catch ctrl+c and try to shutdown gracefully
let handle = Handle::new();
tokio::spawn(shutdown_signal(
handle.clone(),
args.graceful_shutdown_timeout,
));

if args.https {
// Expand files
let abs_cert_file = expanduser(args.cert_file)
.expect("Failed to expand ~ to user name. Please provide an absolute path instead.")
.canonicalize()
.expect("failed to determine absolute path to TLS cerficate file");
let abs_key_file = expanduser(args.key_file)
.expect("Failed to expand ~ to user name. Please provide an absolute path instead.")
.canonicalize()
.expect("failed to determine absolute path to TLS key file");
// Check files exist
if !abs_cert_file.exists() {
println!(
"TLS certificate file expected at '{}' but not found.",
abs_cert_file.display()
);
exit(1)
}
if !abs_key_file.exists() {
println!(
"TLS key file expected at '{}' but not found.",
abs_key_file.display()
);
exit(1)
}
// Set up TLS config
let tls_config = RustlsConfig::from_pem_file(abs_cert_file, abs_key_file)
.await
.expect("Failed to load TLS certificate files");
// run HTTPS server with hyper
axum_server::bind_rustls(addr, tls_config)
.handle(handle)
.serve(service.into_make_service())
.await
.unwrap();
} else {
// run HTTP server with hyper
axum_server::bind(addr)
.handle(handle)
.serve(service.into_make_service())
.await
.unwrap();
}
}

/// Initlialise tracing (logging)
///
/// Applies a filter based on the `RUST_LOG` environment variable, falling back to enable debug
/// logging for this crate and tower_http if not set.
fn init_tracing() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "s3_active_storage=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
}

/// Graceful shutdown handler
///
/// Installs signal handlers to catch Ctrl-C or SIGTERM and trigger a graceful shutdown.
async fn shutdown_signal(handle: Handle, timeout: u64) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}

println!("signal received, starting graceful shutdown");
// Force shutdown if graceful shutdown takes longer than 10s
handle.graceful_shutdown(Some(Duration::from_secs(timeout)));
server::serve(&args, service).await;
}
103 changes: 103 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//! Web server

use crate::cli;

use std::{net::SocketAddr, process::exit, str::FromStr, time::Duration};

use axum::ServiceExt;
use axum_server::{tls_rustls::RustlsConfig, Handle};
use expanduser::expanduser;
use tokio::signal;

/// Serve the S3 Active Storage service
///
/// # Arguments
///
/// * `args`: Command line arguments
/// * `service`: The [crate::app::Service] to serve
pub async fn serve(args: &cli::CommandLineArgs, service: crate::app::Service) {
let addr = SocketAddr::from_str(&format!("{}:{}", args.host, args.port))
.expect("invalid host name, IP address or port number");

// Catch ctrl+c and try to shutdown gracefully
let handle = Handle::new();
tokio::spawn(shutdown_signal(
handle.clone(),
args.graceful_shutdown_timeout,
));

if args.https {
// Expand files
let abs_cert_file = expanduser(&args.cert_file)
.expect("Failed to expand ~ to user name. Please provide an absolute path instead.")
.canonicalize()
.expect("failed to determine absolute path to TLS cerficate file");
let abs_key_file = expanduser(&args.key_file)
.expect("Failed to expand ~ to user name. Please provide an absolute path instead.")
.canonicalize()
.expect("failed to determine absolute path to TLS key file");
// Check files exist
if !abs_cert_file.exists() {
println!(
"TLS certificate file expected at '{}' but not found.",
abs_cert_file.display()
);
exit(1)
}
if !abs_key_file.exists() {
println!(
"TLS key file expected at '{}' but not found.",
abs_key_file.display()
);
exit(1)
}
// Set up TLS config
let tls_config = RustlsConfig::from_pem_file(abs_cert_file, abs_key_file)
.await
.expect("Failed to load TLS certificate files");
// run HTTPS server with hyper
axum_server::bind_rustls(addr, tls_config)
.handle(handle)
.serve(service.into_make_service())
.await
.unwrap();
} else {
// run HTTP server with hyper
axum_server::bind(addr)
.handle(handle)
.serve(service.into_make_service())
.await
.unwrap();
}
}

/// Graceful shutdown handler
///
/// Installs signal handlers to catch Ctrl-C or SIGTERM and trigger a graceful shutdown.
async fn shutdown_signal(handle: Handle, timeout: u64) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}

println!("signal received, starting graceful shutdown");
// Force shutdown if graceful shutdown takes longer than 10s
handle.graceful_shutdown(Some(Duration::from_secs(timeout)));
}
17 changes: 17 additions & 0 deletions src/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! Tracing (logging)

use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

/// Initlialise tracing (logging)
///
/// Applies a filter based on the `RUST_LOG` environment variable, falling back to enable debug
/// logging for this crate and tower_http if not set.
pub fn init_tracing() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "s3_active_storage=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
}