Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
name = "s3-active-storage"
version = "0.1.0"
edition = "2021"
# Due to AWS SDK.
rust-version = "1.62.1"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -13,6 +15,7 @@ aws-sdk-s3 = "0.24"
aws-types = "0.54"
axum = { version = "0.6", features = ["headers"] }
hyper = { version = "0.14", features = ["full"] }
maligned = "0.2.1"
mime = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "*"
Expand All @@ -21,7 +24,7 @@ strum_macros = "0.24"
thiserror = "1.0"
tokio = { version = "1.24", features = ["full"] }
tower = "0.4"
tower-http = { version = "0.3", features = ["trace", "validate-request"] }
tower-http = { version = "0.3", features = ["normalize-path", "trace", "validate-request"] }
url = { version = "2", features = ["serde"] }
validator = { version = "0.16", features = ["derive"] }

Expand Down
30 changes: 30 additions & 0 deletions scripts/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Make a request to the active storage server.

import json
import requests
import numpy as np
import sys

request_data = {
'source': 'http://localhost:9000',
'bucket': 'sample-data',
'object': 'data-uint32.dat',
'dtype': 'uint32',
# All other fields assume their default values
}

if len(sys.argv) > 1:
reducer = sys.argv[1]
else:
reducer = 'min'

response = requests.post(
f'http://localhost:8080/v1/{reducer}',
json=request_data,
auth=('minioadmin', 'minioadmin')
)
print(response.content)
sum_result = np.frombuffer(response.content, dtype=response.headers['x-activestorage-dtype'])
shape = json.loads(response.headers['x-activestorage-shape'])
sum_result = sum_result.reshape(shape)
print(sum_result)
3 changes: 1 addition & 2 deletions scripts/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
aioboto3
botocore
numpy
requests
s3fs
2 changes: 2 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use axum::{
};

use tower::ServiceBuilder;
use tower_http::normalize_path::NormalizePathLayer;
use tower_http::trace::TraceLayer;
use tower_http::validate_request::ValidateRequestHeaderLayer;

Expand Down Expand Up @@ -65,6 +66,7 @@ pub fn router() -> Router {
Router::new()
.route("/.well-known/s3-active-storage-schema", get(schema))
.nest("/v1", v1())
.layer(NormalizePathLayer::trim_trailing_slash())
}

async fn schema() -> &'static str {
Expand Down
21 changes: 19 additions & 2 deletions src/s3_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl S3Client {
key: &str,
range: Option<String>,
) -> Bytes {
// TODO: Provide a streaming response.
let response = self
.client
.get_object()
Expand All @@ -56,9 +57,25 @@ impl S3Client {
.send()
.await
.unwrap();
let content_length = response.content_length();
// The data returned by the S3 client does not have any alignment guarantees. In order to
// reinterpret the data as an array of numbers with a higher alignment than 1, we need to
// return the data in Bytes object in which the underlying data has a higher alignment.
// For now we're hard-coding an alignment of 8 bytes, although this should depend on the
// data type, and potentially whether there are any SIMD requirements.
// FIXME: The current method is rather inefficient, involving copying the data at least
// twice. This is functional, but should be revisited.

// Read all data into memory as an AggregatedBytes.
let data = response.body.collect().await;
// TODO: Provide a streaming response.
data.unwrap().into_bytes()
// Create an 8-byte aligned Vec<u8>.
let mut buf = maligned::align_first::<u8, maligned::A8>(content_length as usize);
// Copy the data into an unaligned Vec<u8>.
let mut vec = data.unwrap().to_vec();
// Copy the data into the aligned Vec<u8>.
buf.append(&mut vec);
// Return as Bytes.
buf.into()
}
}

Expand Down