Merged
17 changes: 15 additions & 2 deletions README.md
@@ -113,13 +113,13 @@ cd s3-active-storage-rs
Next, use Cargo to build the package:

```sh
cargo build
cargo build --release
```

The active storage server may be run using Cargo:

```sh
cargo run
cargo run --release
```

Or installed to the system:
@@ -203,6 +203,19 @@ The proxy adds two custom headers `x-activestorage-dtype` and `x-activestrorage-

---

## Documentation

The source code is documented using [rustdoc](https://doc.rust-lang.org/rustdoc/what-is-rustdoc.html).
Currently the `s3-active-storage` crate is not uploaded to https://crates.io, so we do not benefit from hosted documentation on https://docs.rs.
It is however possible to build the documentation locally:

```sh
cargo doc
```

Cargo builds documentation for the `s3-active-storage` crate and all of its dependencies.
The resulting documentation is available under `target/doc`, and may be viewed in a web browser using file:///path/to/s3-active-storage/target/doc/s3_active_storage/index.html.

## Contributing

See [CONTRIBUTING.md](CONTRIBUTING.md) for information about contributing to S3 active storage.
40 changes: 38 additions & 2 deletions src/app.rs
@@ -1,3 +1,5 @@
//! Active Storage server API

use crate::error::ActiveStorageError;
use crate::models;
use crate::operation;
@@ -22,10 +24,13 @@ use tower_http::normalize_path::NormalizePathLayer;
use tower_http::trace::TraceLayer;
use tower_http::validate_request::ValidateRequestHeaderLayer;

/// `x-activestorage-dtype` header definition
static HEADER_DTYPE: header::HeaderName = header::HeaderName::from_static("x-activestorage-dtype");
/// `x-activestorage-shape` header definition
static HEADER_SHAPE: header::HeaderName = header::HeaderName::from_static("x-activestorage-shape");

impl IntoResponse for models::Response {
/// Convert a [crate::models::Response] into an [axum::response::Response].
fn into_response(self) -> Response {
(
[
@@ -42,6 +47,15 @@ impl IntoResponse for models::Response {
}
}

/// Returns an [axum::Router] for the Active Storage server API
///
/// The router is populated with all routes as well as the following middleware:
///
/// * a [tower_http::trace::TraceLayer] for tracing requests and responses
/// * a [tower_http::validate_request::ValidateRequestHeaderLayer] for validating authorisation
/// headers
/// * a [tower_http::normalize_path::NormalizePathLayer] for trimming trailing slashes from
/// requests
pub fn router() -> Router {
fn v1() -> Router {
Router::new()
@@ -74,10 +88,19 @@ pub fn router() -> Router {
.layer(NormalizePathLayer::trim_trailing_slash())
}

/// TODO: Return an OpenAPI schema
async fn schema() -> &'static str {
"Hello, world!"
}

/// Download an object from S3
///
/// Requests a byte range if `offset` or `size` is specified in the request.
///
/// # Arguments
///
/// * `auth`: Basic authentication credentials
/// * `request_data`: RequestData object for the request
async fn download_object(
auth: &Authorization<Basic>,
request_data: &models::RequestData,
@@ -89,9 +112,15 @@ async fn download_object(
.await
}
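
The doc comment on `download_object` says a byte range is requested when `offset` or `size` is given. The following is a minimal sketch of how such a range could be expressed as an HTTP `Range` header value. The helper name `byte_range` is hypothetical and not part of this PR; the real request is built by the crate's S3 client, whose code is not shown here.

```rust
/// Hypothetical helper: build an HTTP `Range` header value from an optional
/// offset and size, both in bytes. Returns `None` when neither is given,
/// meaning the whole object would be requested.
///
/// Assumes `size` is greater than 0, which the request validation enforces.
fn byte_range(offset: Option<usize>, size: Option<usize>) -> Option<String> {
    match (offset, size) {
        (None, None) => None,
        // Open-ended range: from `offset` to the end of the object.
        (Some(offset), None) => Some(format!("bytes={}-", offset)),
        // Bounded range: HTTP byte ranges are inclusive, hence the `- 1`.
        (offset, Some(size)) => {
            let start = offset.unwrap_or(0);
            Some(format!("bytes={}-{}", start, start + size - 1))
        }
    }
}
```

For example, `byte_range(Some(16), Some(8))` describes the eight bytes starting at byte 16 of the object.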

/// Handler for operations
/// Handler for Active Storage operations
///
/// Downloads object data from S3 storage and executes the requested reduction operation.
///
/// Returns a `Result` with `models::Response` on success and `ActiveStorageError` on failure.
/// This function is generic over any type implementing the [crate::operation::Operation] trait,
/// allowing it to handle any operation conforming to that interface.
///
/// Returns a `Result` with [crate::models::Response] on success and
/// [crate::error::ActiveStorageError] on failure.
///
/// # Arguments
///
@@ -105,6 +134,13 @@ async fn operation_handler<T: operation::Operation>(
T::execute(&request_data, &data)
}
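
The handler is generic over `T: operation::Operation` and calls `T::execute`, so the operation is chosen at compile time. The pattern can be sketched with a simplified stand-in trait. The trait signature below and the `Sum` and `Max` types are assumptions made for illustration; the crate's real trait takes request data and bytes and returns `models::Response`.

```rust
/// Simplified stand-in for the crate's `Operation` trait (hypothetical signature).
trait Operation {
    fn execute(data: &[i64]) -> Result<i64, String>;
}

/// Hypothetical operation types, used only for this sketch.
struct Sum;
struct Max;

impl Operation for Sum {
    fn execute(data: &[i64]) -> Result<i64, String> {
        Ok(data.iter().sum())
    }
}

impl Operation for Max {
    fn execute(data: &[i64]) -> Result<i64, String> {
        // An empty input has no maximum, so report an error instead.
        data.iter()
            .copied()
            .max()
            .ok_or_else(|| "empty input".to_string())
    }
}

/// Generic handler: `T` selects the operation through static dispatch.
fn handle<T: Operation>(data: &[i64]) -> Result<i64, String> {
    T::execute(data)
}
```

A route for each operation can then reuse the same handler body, for example `handle::<Sum>(&values)` and `handle::<Max>(&values)`.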

/// Handler for unknown operations
///
/// Returns an [crate::error::ActiveStorageError].
///
/// # Arguments
///
/// * `operation`: the unknown operation from the URL path
async fn unknown_operation_handler(Path(operation): Path<String>) -> ActiveStorageError {
ActiveStorageError::UnsupportedOperation { operation }
}
2 changes: 1 addition & 1 deletion src/array.rs
@@ -1,4 +1,4 @@
//! This module provides functions and utilities for working with ndarray objects.
//! Functions and utilities for working with [ndarray] objects.

use crate::error::ActiveStorageError;
use crate::models;
2 changes: 1 addition & 1 deletion src/error.rs
@@ -56,8 +56,8 @@ pub enum ActiveStorageError {
UnsupportedOperation { operation: String },
}

// Tell axum how to convert `ActiveStorageError` into a response.
impl IntoResponse for ActiveStorageError {
/// Convert from an `ActiveStorageError` into an [axum::response::Response].
fn into_response(self) -> Response {
ErrorResponse::from(self).into_response()
}
32 changes: 32 additions & 0 deletions src/main.rs
@@ -1,3 +1,27 @@
//! This crate provides an Active Storage Server. It implements simple reductions on S3 objects
//! containing numeric binary data. By implementing these reductions in the storage system the
//! volume of data that needs to be transferred to the end user is vastly reduced, leading to
//! faster computations.
//!
//! The work is funded by the
//! [ExCALIBUR project](https://www.metoffice.gov.uk/research/approach/collaboration/spf/excalibur)
//! and is done in collaboration with the
//! [University of Reading](http://www.reading.ac.uk/).
//!
//! This is a performant implementation of the Active Storage Server.
//! The original Python functional prototype is available
//! [here](https://github.com/stackhpc/s3-active-storage).
//!
//! The Active Storage Server is built on top of a number of open source components.
//!
//! * [Tokio](tokio), the most popular asynchronous Rust runtime.
//! * [Axum](axum) web framework, built by the Tokio team. Axum performs well in [various](https://github.com/programatik29/rust-web-benchmarks/blob/master/result/hello-world.md) [benchmarks](https://web-frameworks-benchmark.netlify.app/result?l=rust)
//! and is built on top of various popular components, including the [hyper] HTTP library.
//! * [Serde](serde) performs (de)serialisation of JSON request and response data.
//! * [AWS SDK for S3](aws-sdk-s3) is used to interact with S3-compatible object stores.
//! * [ndarray] provides [NumPy](https://numpy.org)-like n-dimensional arrays used in numerical
//! computation.

use tokio::signal;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

@@ -10,6 +34,7 @@ mod operations;
mod s3_client;
mod validated_json;

/// Application entry point
#[tokio::main]
async fn main() {
init_tracing();
@@ -24,6 +49,10 @@ async fn main() {
.unwrap();
}

/// Initialise tracing (logging)
///
/// Applies a filter based on the `RUST_LOG` environment variable, falling back to enable debug
/// logging for this crate and tower_http if not set.
fn init_tracing() {
tracing_subscriber::registry()
.with(
@@ -34,6 +63,9 @@ fn init_tracing() {
.init();
}
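
The fallback described in the `init_tracing` doc comment can be illustrated without the tracing crates. This is only a sketch of the selection logic: the helper name `filter_directive` is hypothetical, and the body of `init_tracing` is elided in this diff, so how it builds its filter is not shown.

```rust
/// Hypothetical helper: use the value of `RUST_LOG` when it is set,
/// otherwise fall back to a default filter directive.
fn filter_directive(rust_log: Option<String>, default: &str) -> String {
    rust_log.unwrap_or_else(|| default.to_string())
}
```

A caller might pass `std::env::var("RUST_LOG").ok()` as the first argument and a directive that enables debug logging for the crate and `tower_http` as the second. The exact default directive string used by the server is not visible in this diff.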

/// Graceful shutdown handler
///
/// Installs signal handlers to catch Ctrl-C or SIGTERM and trigger a graceful shutdown.
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
36 changes: 35 additions & 1 deletion src/models.rs
@@ -1,17 +1,26 @@
//! Data types and associated functions and methods

use axum::body::Bytes;
use serde::{Deserialize, Serialize};
use strum_macros::Display;
use url::Url;
use validator::{Validate, ValidationError};

/// Supported numerical data types
#[derive(Clone, Copy, Debug, Deserialize, Display, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum DType {
/// [i32]
Int32,
/// [i64]
Int64,
/// [u32]
Uint32,
/// [u64]
Uint64,
/// [f32]
Float32,
/// [f64]
Float64,
}

@@ -29,20 +38,29 @@ impl DType {
}
}
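
`validate_request_data` later rejects a `size` that is not a multiple of `dtype.size_of()`. The relationship between a `DType` and its element size can be sketched as follows. The enum here is a reduced stand-in for the crate's type, and `element_count` is a hypothetical helper, not a function from this PR.

```rust
/// Stand-in for the crate's `DType` enum, reduced to what this sketch needs.
#[derive(Clone, Copy, Debug, PartialEq)]
enum DType {
    Int32,
    Int64,
    Uint32,
    Uint64,
    Float32,
    Float64,
}

impl DType {
    /// Size in bytes of a single element of this type.
    fn size_of(self) -> usize {
        match self {
            DType::Int32 => std::mem::size_of::<i32>(),
            DType::Int64 => std::mem::size_of::<i64>(),
            DType::Uint32 => std::mem::size_of::<u32>(),
            DType::Uint64 => std::mem::size_of::<u64>(),
            DType::Float32 => std::mem::size_of::<f32>(),
            DType::Float64 => std::mem::size_of::<f64>(),
        }
    }
}

/// Hypothetical helper: return the number of elements held in `size` bytes,
/// or an error if `size` is not a whole number of elements.
fn element_count(size: usize, dtype: DType) -> Result<usize, String> {
    let dtype_size = dtype.size_of();
    if size % dtype_size != 0 {
        return Err(format!(
            "size {} is not a multiple of dtype size {}",
            size, dtype_size
        ));
    }
    Ok(size / dtype_size)
}
```

With these definitions, 16 bytes of `Int64` data hold two elements, while 10 bytes of `Uint32` data are rejected.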

/// Array ordering
///
/// Defines an ordering for multi-dimensional arrays.
#[derive(Debug, Deserialize, PartialEq)]
pub enum Order {
/// Row-major (C) ordering
C,
/// Column-major (Fortran) ordering
F,
}
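
The difference between the two orderings is which dimension varies fastest in memory. A small sketch, assuming a hypothetical helper `flat_offset` that is not part of this PR, computes the flat element offset of a multi-dimensional index under each ordering:

```rust
/// Stand-in for the crate's `Order` enum.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Order {
    C,
    F,
}

/// Hypothetical helper: flat element offset of `index` in an array of the
/// given `shape`. Panics if `index` and `shape` differ in length.
fn flat_offset(index: &[usize], shape: &[usize], order: Order) -> usize {
    assert_eq!(index.len(), shape.len());
    // Visit dimensions from fastest-varying to slowest-varying:
    // the last dimension first for C order, the first dimension first for F order.
    let dims: Vec<usize> = match order {
        Order::C => (0..shape.len()).rev().collect(),
        Order::F => (0..shape.len()).collect(),
    };
    let mut offset = 0;
    let mut stride = 1;
    for d in dims {
        offset += index[d] * stride;
        stride *= shape[d];
    }
    offset
}
```

For a 2 by 3 array, index `[1, 0]` is at offset 3 in C order (one full row of three elements is skipped) and at offset 1 in Fortran order (only one element of the first column is skipped).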

/// A slice of a single dimension of an array
// NOTE: In serde, structs can be deserialised from sequences or maps. This allows us to support
// the [<start>, <end>, <stride>] API, with the convenience of named fields.
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize, Validate)]
#[serde(deny_unknown_fields)]
#[validate(schema(function = "validate_slice"))]
pub struct Slice {
/// Start of the slice
pub start: usize,
/// End of the slice
pub end: usize,
/// Stride size
#[validate(range(min = 1, message = "stride must be greater than 0"))]
pub stride: usize,
}
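
The validation rules attached to `Slice` (stride of at least 1, end greater than start) can be sketched with the standard library alone. The struct below is a stand-in without the serde and validator derives, and the `indices` method is hypothetical. It assumes the slice is a half-open range `[start, end)`, which the rule rejecting `end <= start` suggests but this diff does not state.

```rust
/// Stand-in for the crate's `Slice` struct, without serde or validator derives.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Slice {
    start: usize,
    end: usize,
    stride: usize,
}

impl Slice {
    /// Apply the same two rules as the validation attributes in the diff.
    fn validate(&self) -> Result<(), String> {
        if self.stride < 1 {
            return Err("stride must be greater than 0".to_string());
        }
        if self.end <= self.start {
            return Err("Selection end must be greater than start".to_string());
        }
        Ok(())
    }

    /// Hypothetical helper: indices selected by the slice, assuming a
    /// half-open range. Call `validate` first, since a stride of 0 would panic.
    fn indices(&self) -> Vec<usize> {
        (self.start..self.end).step_by(self.stride).collect()
    }
}
```

Under that assumption, a slice with start 1, end 8 and stride 3 selects indices 1, 4 and 7.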
@@ -55,38 +73,50 @@ impl Slice {
}
}

/// Request data for operations
#[derive(Debug, Deserialize, PartialEq, Validate)]
#[serde(deny_unknown_fields)]
#[validate(schema(function = "validate_request_data"))]
pub struct RequestData {
/// URL of the S3-compatible object store
// TODO: Investigate using lifetimes to enable zero-copy: https://serde.rs/lifetimes.html
pub source: Url,
/// S3 bucket containing the object
#[validate(length(min = 1, message = "bucket must not be empty"))]
pub bucket: String,
/// S3 object containing the data
#[validate(length(min = 1, message = "object must not be empty"))]
pub object: String,
/// Data type
pub dtype: DType,
/// Offset in bytes of the numerical data within the object
pub offset: Option<usize>,
/// Size in bytes of the numerical data from the offset
#[validate(range(min = 1, message = "size must be greater than 0"))]
pub size: Option<usize>,
/// Shape of the multi-dimensional array
#[validate(
length(min = 1, message = "shape length must be greater than 0"),
custom = "validate_shape"
)]
pub shape: Option<Vec<usize>>,
/// Order of the multi-dimensional array
pub order: Option<Order>,
/// Subset of the data to operate on
#[validate]
#[validate(length(min = 1, message = "selection length must be greater than 0"))]
pub selection: Option<Vec<Slice>>,
}

/// Validate an array shape
fn validate_shape(shape: &[usize]) -> Result<(), ValidationError> {
if shape.iter().any(|index| *index == 0) {
return Err(ValidationError::new("shape indices must be greater than 0"));
}
Ok(())
}

/// Validate an array slice
fn validate_slice(slice: &Slice) -> Result<(), ValidationError> {
if slice.end <= slice.start {
let mut error = ValidationError::new("Selection end must be greater than start");
@@ -121,10 +151,10 @@ fn validate_shape_selection(
Ok(())
}

/// Validate request data
fn validate_request_data(request_data: &RequestData) -> Result<(), ValidationError> {
// Validation of multiple fields in RequestData.
// TODO: More validation of shape & selection vs. size
// TODO: More validation that selection fits in shape
if let Some(size) = &request_data.size {
let dtype_size = request_data.dtype.size_of();
if size % dtype_size != 0 {
@@ -150,12 +180,16 @@ fn validate_request_data(request_data: &RequestData) -> Result<(), ValidationErr

/// Response containing the result of a computation and associated metadata.
pub struct Response {
/// Response data. May be a scalar or multi-dimensional array.
pub body: Bytes,
/// Data type of the response
pub dtype: DType,
/// Shape of the response
pub shape: Vec<usize>,
}

impl Response {
/// Return a Response object
pub fn new(body: Bytes, dtype: DType, shape: Vec<usize>) -> Response {
Response { body, dtype, shape }
}
2 changes: 2 additions & 0 deletions src/operation.rs
@@ -1,3 +1,5 @@
//! Interface for Active Storage operations

use crate::error::ActiveStorageError;
use crate::models;

2 changes: 1 addition & 1 deletion src/s3_client.rs
@@ -1,4 +1,4 @@
//! This module provides a simplified S3 client that supports downloading objects.
//! A simplified S3 client that supports downloading objects.
//! It attempts to hide the complexities of working with the AWS SDK for S3.

use crate::error::ActiveStorageError;
2 changes: 2 additions & 0 deletions src/validated_json.rs
@@ -1,3 +1,5 @@
//! Axum extractor that deserialises and validates JSON

use crate::error::ActiveStorageError;

use async_trait::async_trait;