Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 88 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ clap = { version = "4.2", features = ["derive", "env"] }
expanduser = "1.2.2"
http = "*"
hyper = { version = "0.14", features = ["full"] }
lazy_static = "1.4"
maligned = "0.2.1"
mime = "0.3"
ndarray = "0.15"
ndarray-stats = "0.5"
num-traits = "0.2.15"
prometheus = { version = "0.13", features = ["process"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "*"
strum_macros = "0.24"
Expand Down
22 changes: 22 additions & 0 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# NOTE: This should only be used for explicitly testing the prometheus metrics
# since it builds the Rust project from scratch within the container each time
# so is very slow and inefficient for regular development work.
services:
active-storage-proxy:
build: ../.
ports:
- "8080:8080"
minio:
image: minio/minio
command: ["server", "data", "--console-address", ":9001"]
ports:
- "9000:9000"
- "9001:9001"
prometheus:
image: prom/prometheus
volumes:
- type: bind
source: ./prometheus.yml
target: /etc/prometheus/prometheus.yml
ports:
- "9090:9090"
7 changes: 7 additions & 0 deletions scripts/prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
global:
scrape_interval: 5s
scrape_configs:
- job_name: active-storage-proxy
static_configs:
- targets:
- "active-storage-proxy:8080"
4 changes: 4 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
//! Active Storage server API

use crate::error::ActiveStorageError;
use crate::metrics::{metrics_handler, track_metrics};
use crate::models;
use crate::operation;
use crate::operations;
use crate::s3_client;
use crate::validated_json::ValidatedJson;

use axum::middleware;
use axum::{
body::{Body, Bytes},
extract::Path,
Expand Down Expand Up @@ -86,7 +88,9 @@ fn router() -> Router {

Router::new()
.route("/.well-known/s3-active-storage-schema", get(schema))
.route("/metrics", get(metrics_handler))
.nest("/v1", v1())
.route_layer(middleware::from_fn(track_metrics))
}

/// S3 Active Storage Server Service type alias
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod app;
pub mod array;
pub mod cli;
pub mod error;
pub mod metrics;
pub mod models;
pub mod operation;
pub mod operations;
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use s3_active_storage::app;
use s3_active_storage::cli;
use s3_active_storage::metrics;
use s3_active_storage::server;
use s3_active_storage::tracing;

Expand All @@ -10,6 +11,7 @@ use s3_active_storage::tracing;
async fn main() {
let args = cli::parse();
tracing::init_tracing();
metrics::register_metrics();
let service = app::service();
server::serve(&args, service).await;
}
83 changes: 83 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use std::time::Instant;

use axum::{http::Request, middleware::Next, response::IntoResponse};
use lazy_static::lazy_static;
use prometheus::{self, Encoder, HistogramOpts, HistogramVec, IntCounterVec, Opts};

lazy_static! {
// Simple request counter
pub static ref INCOMING_REQUESTS: IntCounterVec = IntCounterVec::new(
Opts::new("incoming_requests", "The number of HTTP requests received"),
&["http_method", "path"]
).expect("Prometheus metric initialization failed");
// Request counter by status code
pub static ref RESPONSE_CODE_COLLECTOR: IntCounterVec = IntCounterVec::new(
Opts::new("outgoing_response", "The number of responses sent"),
&["status_code", "http_method", "path"]
).expect("Prometheus metric initialization failed");
// Response histogram by response time
pub static ref RESPONSE_TIME_COLLECTOR: HistogramVec = HistogramVec::new(
HistogramOpts{
common_opts: Opts::new("response_time", "The time taken to respond to each request"),
buckets: prometheus::DEFAULT_BUCKETS.to_vec(), // Change buckets here if desired
},
&["status_code", "http_method", "path"],
).expect("Prometheus metric initialization failed");
}

/// Registers various prometheus metrics with the global registry
pub fn register_metrics() {
let registry = prometheus::default_registry();
registry
.register(Box::new(INCOMING_REQUESTS.clone()))
.expect("registering prometheus metrics during initialization failed");
registry
.register(Box::new(RESPONSE_CODE_COLLECTOR.clone()))
.expect("registering prometheus metrics during initialization failed");
registry
.register(Box::new(RESPONSE_TIME_COLLECTOR.clone()))
.expect("registering prometheus metrics during initialization failed");
}

/// Returns currently gathered prometheus metrics
pub async fn metrics_handler() -> String {
let encoder = prometheus::TextEncoder::new();
let mut buffer = Vec::new();

encoder
.encode(&prometheus::gather(), &mut buffer)
.expect("could not encode gathered metrics into temporary buffer");

String::from_utf8(buffer).expect("could not convert metrics buffer into string")
}

pub async fn track_metrics<B>(request: Request<B>, next: Next<B>) -> impl IntoResponse {
// Extract some useful quantities
let timer = Instant::now();
let http_method = &request.method().to_string().to_ascii_uppercase();
let request_path = request.uri().path().to_string();

// Increment request counter
INCOMING_REQUESTS
.with_label_values(&[http_method, &request_path])
.inc();

// Pass request onto next layer
let response = next.run(request).await;
let status_code = response.status();
// Due to 'concentric shell model' for axum layers,
// latency is time taken to traverse all inner
// layers (including primary reduction operation)
// and then back up the layer stack.
let latency = timer.elapsed().as_secs_f64();

// Record response metrics
RESPONSE_CODE_COLLECTOR
.with_label_values(&[status_code.as_str(), http_method, &request_path])
.inc();
RESPONSE_TIME_COLLECTOR
.with_label_values(&[status_code.as_str(), http_method, &request_path])
.observe(latency);

response
}