Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ thiserror = "1.0"
tokio = { version = "1.24", features = ["full"] }
tower = "0.4"
tower-http = { version = "0.3", features = ["normalize-path", "trace", "validate-request"] }
tokio-stream = "0.1"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = { version = "2", features = ["serde"] }
validator = { version = "0.16", features = ["derive"] }
zerocopy = { version = "0.6.1", features = ["alloc", "simd"] }
Expand Down
7 changes: 5 additions & 2 deletions scripts/upload_sample_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import pathlib
import s3fs

NUM_ITEMS = 10
OBJECT_PREFIX = "data"

#Use enum which also subclasses string type so that
# auto-generated OpenAPI schema can determine allowed dtypes
class AllowedDatatypes(str, Enum):
Expand Down Expand Up @@ -32,7 +35,7 @@ def n_bytes(self):

# Create numpy arrays and upload to S3 as bytes
for d in AllowedDatatypes.__members__.keys():
with s3_fs.open(bucket / f'data-{d}.dat', 'wb') as s3_file:
s3_file.write(np.arange(10, dtype=d).tobytes())
with s3_fs.open(bucket / f'{OBJECT_PREFIX}-{d}.dat', 'wb') as s3_file:
s3_file.write(np.arange(NUM_ITEMS, dtype=d).tobytes())

print("Data upload successful. \nBucket contents:\n", s3_fs.ls(bucket))
7 changes: 0 additions & 7 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ impl From<ActiveStorageError> for ErrorResponse {
| ActiveStorageError::S3ByteStream(_) => Self::internal_server_error(&error),

ActiveStorageError::S3GetObject(sdk_error) => {
// FIXME: we lose "error retrieving object from S3 storage"
// Tailor the response based on the specific SdkError variant.
match &sdk_error {
// These are generic SdkError variants.
Expand All @@ -190,12 +189,6 @@ impl From<ActiveStorageError> for ErrorResponse {
// This is a more specific ServiceError variant, with GetObjectError as the
// inner error.
SdkError::ServiceError(get_obj_error) => {
//let error = if let Some(get_obj_message) = get_obj_error.err().message() {
// // FIXME: use message() & code()?
// &get_obj_error.err()
//} else {
// &sdk_error
//};
let get_obj_error = get_obj_error.err();
match get_obj_error.kind {
GetObjectErrorKind::InvalidObjectState(_)
Expand Down
13 changes: 13 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use tokio::signal;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

mod app;
mod array;
Expand All @@ -11,6 +12,8 @@ mod validated_json;

#[tokio::main]
async fn main() {
init_tracing();

let router = app::router();

// run it with hyper on localhost:8080
Expand All @@ -21,6 +24,16 @@ async fn main() {
.unwrap();
}

fn init_tracing() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "s3_active_storage=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
}

async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c()
Expand Down
19 changes: 7 additions & 12 deletions src/s3_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aws_credential_types::Credentials;
use aws_sdk_s3::Client;
use aws_types::region::Region;
use axum::body::Bytes;
use tokio_stream::StreamExt;
use url::Url;

/// S3 client object.
Expand Down Expand Up @@ -49,8 +50,7 @@ impl S3Client {
key: &str,
range: Option<String>,
) -> Result<Bytes, ActiveStorageError> {
// TODO: Provide a streaming response.
let response = self
let mut response = self
.client
.get_object()
.bucket(bucket)
Expand All @@ -64,18 +64,13 @@ impl S3Client {
// return the data in Bytes object in which the underlying data has a higher alignment.
// For now we're hard-coding an alignment of 8 bytes, although this should depend on the
// data type, and potentially whether there are any SIMD requirements.
// FIXME: The current method is rather inefficient, involving copying the data at least
// twice. This is functional, but should be revisited.

// Create an 8-byte aligned Vec<u8>.
let mut buf = maligned::align_first::<u8, maligned::A8>(content_length as usize);
// Read all data into memory as an AggregatedBytes.
let data = response.body.collect().await;
// Copy the data into an unaligned Vec<u8>.
let bytes = data.map_err(ActiveStorageError::S3ByteStream)?;
let mut vec = bytes.to_vec();
// Copy the data into the aligned Vec<u8>.
buf.append(&mut vec);

// Iterate over the streaming response, copying data into the aligned Vec<u8>.
while let Some(bytes) = response.body.try_next().await? {
buf.extend_from_slice(&bytes)
}
// Return as Bytes.
Ok(buf.into())
}
Expand Down