From 7d78d1b948311ba99095cad4de272bf788aa5881 Mon Sep 17 00:00:00 2001 From: Mark Goddard Date: Wed, 15 Mar 2023 11:27:18 +0000 Subject: [PATCH 1/4] Explicitly set MSRV in Cargo.toml --- Cargo.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 11efabf..be98691 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,8 @@ name = "s3-active-storage" version = "0.1.0" edition = "2021" +# Due to AWS SDK. +rust-version = "1.62.1" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html From dce40a1bbe1061bcbf9004c48aae667b5802423d Mon Sep 17 00:00:00 2001 From: Mark Goddard Date: Wed, 15 Mar 2023 11:28:29 +0000 Subject: [PATCH 2/4] api: Normalise paths to remove trailing slashes --- Cargo.toml | 2 +- src/app.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index be98691..33417f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ strum_macros = "0.24" thiserror = "1.0" tokio = { version = "1.24", features = ["full"] } tower = "0.4" -tower-http = { version = "0.3", features = ["trace", "validate-request"] } +tower-http = { version = "0.3", features = ["normalize-path", "trace", "validate-request"] } url = { version = "2", features = ["serde"] } validator = { version = "0.16", features = ["derive"] } diff --git a/src/app.rs b/src/app.rs index 10e48c3..b38b79b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -14,6 +14,7 @@ use axum::{ }; use tower::ServiceBuilder; +use tower_http::normalize_path::NormalizePathLayer; use tower_http::trace::TraceLayer; use tower_http::validate_request::ValidateRequestHeaderLayer; @@ -65,6 +66,7 @@ pub fn router() -> Router { Router::new() .route("/.well-known/s3-active-storage-schema", get(schema)) .nest("/v1", v1()) + .layer(NormalizePathLayer::trim_trailing_slash()) } async fn schema() -> &'static str { From d4fab371fd08ffe99feeb5ab8fdbf4148ecbe6ff Mon Sep 17 00:00:00 2001 From: Mark Goddard Date: Wed, 15 Mar 2023 16:18:11 +0000 Subject: [PATCH 3/4] s3_client: Return data in an 8-byte aligned slice The data returned by the AWS S3 client does not have any alignment guarantees. In order to reinterpret the data as an array of numbers with a higher alignment than 1, we need to return the data in Bytes object in which the underlying data has a higher alignment. For now we're hard-coding an alignment of 8 bytes, although this should depend on the data type, and potentially whether there are any SIMD requirements. The current method is rather inefficient, involving copying the data at least twice. This is functional, but should be revisited. --- Cargo.toml | 1 + src/s3_client.rs | 21 +++++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 33417f4..1aaa490 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ aws-sdk-s3 = "0.24" aws-types = "0.54" axum = { version = "0.6", features = ["headers"] } hyper = { version = "0.14", features = ["full"] } +maligned = "0.2.1" mime = "0.3" serde = { version = "1.0", features = ["derive"] } serde_json = "*" diff --git a/src/s3_client.rs b/src/s3_client.rs index 578b945..1943cc2 100644 --- a/src/s3_client.rs +++ b/src/s3_client.rs @@ -47,6 +47,7 @@ impl S3Client { key: &str, range: Option, ) -> Bytes { + // TODO: Provide a streaming response. let response = self .client .get_object() @@ -56,9 +57,25 @@ impl S3Client { .send() .await .unwrap(); + let content_length = response.content_length(); + // The data returned by the S3 client does not have any alignment guarantees. In order to + // reinterpret the data as an array of numbers with a higher alignment than 1, we need to + // return the data in Bytes object in which the underlying data has a higher alignment. + // For now we're hard-coding an alignment of 8 bytes, although this should depend on the + // data type, and potentially whether there are any SIMD requirements. + // FIXME: The current method is rather inefficient, involving copying the data at least + // twice. This is functional, but should be revisited. + + // Read all data into memory as an AggregatedBytes. let data = response.body.collect().await; - // TODO: Provide a streaming response. - data.unwrap().into_bytes() + // Create an 8-byte aligned Vec. + let mut buf = maligned::align_first::(content_length as usize); + // Copy the data into an unaligned Vec. + let mut vec = data.unwrap().to_vec(); + // Copy the data into the aligned Vec. + buf.append(&mut vec); + // Return as Bytes. + buf.into() } } From bc76f97582e82bbe90055af8ba7b15dfe9249fff Mon Sep 17 00:00:00 2001 From: Mark Goddard Date: Thu, 16 Mar 2023 10:12:25 +0000 Subject: [PATCH 4/4] Add basic Python client for testing --- scripts/client.py | 30 ++++++++++++++++++++++++++++++ scripts/requirements.txt | 3 +-- 2 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 scripts/client.py diff --git a/scripts/client.py b/scripts/client.py new file mode 100644 index 0000000..8905473 --- /dev/null +++ b/scripts/client.py @@ -0,0 +1,30 @@ +# Make a request to the active storage server. + +import json +import requests +import numpy as np +import sys + +request_data = { + 'source': 'http://localhost:9000', + 'bucket': 'sample-data', + 'object': 'data-uint32.dat', + 'dtype': 'uint32', + # All other fields assume their default values +} + +if len(sys.argv) > 1: + reducer = sys.argv[1] +else: + reducer = 'min' + +response = requests.post( + f'http://localhost:8080/v1/{reducer}', + json=request_data, + auth=('minioadmin', 'minioadmin') +) +print(response.content) +sum_result = np.frombuffer(response.content, dtype=response.headers['x-activestorage-dtype']) +shape = json.loads(response.headers['x-activestorage-shape']) +sum_result = sum_result.reshape(shape) +print(sum_result) diff --git a/scripts/requirements.txt b/scripts/requirements.txt index e872ce2..efc3ea4 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -1,4 +1,3 @@ -aioboto3 -botocore numpy +requests s3fs