diff --git a/Cargo.toml b/Cargo.toml index 3cc576f..658da43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,9 @@ thiserror = "1.0" tokio = { version = "1.24", features = ["full"] } tower = "0.4" tower-http = { version = "0.3", features = ["normalize-path", "trace", "validate-request"] } +tokio-stream = "0.1" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = { version = "2", features = ["serde"] } validator = { version = "0.16", features = ["derive"] } zerocopy = { version = "0.6.1", features = ["alloc", "simd"] } diff --git a/scripts/upload_sample_data.py b/scripts/upload_sample_data.py index a693037..416e31e 100644 --- a/scripts/upload_sample_data.py +++ b/scripts/upload_sample_data.py @@ -3,6 +3,9 @@ import pathlib import s3fs +NUM_ITEMS = 10 +OBJECT_PREFIX = "data" + #Use enum which also subclasses string type so that # auto-generated OpenAPI schema can determine allowed dtypes class AllowedDatatypes(str, Enum): @@ -32,7 +35,7 @@ def n_bytes(self): # Create numpy arrays and upload to S3 as bytes for d in AllowedDatatypes.__members__.keys(): - with s3_fs.open(bucket / f'data-{d}.dat', 'wb') as s3_file: - s3_file.write(np.arange(10, dtype=d).tobytes()) + with s3_fs.open(bucket / f'{OBJECT_PREFIX}-{d}.dat', 'wb') as s3_file: + s3_file.write(np.arange(NUM_ITEMS, dtype=d).tobytes()) print("Data upload successful. \nBucket contents:\n", s3_fs.ls(bucket)) diff --git a/src/error.rs b/src/error.rs index f01b159..e1b89c1 100644 --- a/src/error.rs +++ b/src/error.rs @@ -177,7 +177,6 @@ impl From for ErrorResponse { | ActiveStorageError::S3ByteStream(_) => Self::internal_server_error(&error), ActiveStorageError::S3GetObject(sdk_error) => { - // FIXME: we lose "error retrieving object from S3 storage" // Tailor the response based on the specific SdkError variant. match &sdk_error { // These are generic SdkError variants. @@ -190,12 +189,6 @@ impl From for ErrorResponse { // This is a more specific ServiceError variant, with GetObjectError as the // inner error. SdkError::ServiceError(get_obj_error) => { - //let error = if let Some(get_obj_message) = get_obj_error.err().message() { - // // FIXME: use message() & code()? - // &get_obj_error.err() - //} else { - // &sdk_error - //}; let get_obj_error = get_obj_error.err(); match get_obj_error.kind { GetObjectErrorKind::InvalidObjectState(_) diff --git a/src/main.rs b/src/main.rs index 6660080..1529c46 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ use tokio::signal; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod app; mod array; @@ -11,6 +12,8 @@ mod validated_json; #[tokio::main] async fn main() { + init_tracing(); + let router = app::router(); // run it with hyper on localhost:8080 @@ -21,6 +24,16 @@ async fn main() { .unwrap(); } +fn init_tracing() { + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "s3_active_storage=debug,tower_http=debug".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); +} + async fn shutdown_signal() { let ctrl_c = async { signal::ctrl_c() diff --git a/src/s3_client.rs b/src/s3_client.rs index 86b77f4..52bb3c0 100644 --- a/src/s3_client.rs +++ b/src/s3_client.rs @@ -7,6 +7,7 @@ use aws_credential_types::Credentials; use aws_sdk_s3::Client; use aws_types::region::Region; use axum::body::Bytes; +use tokio_stream::StreamExt; use url::Url; /// S3 client object. @@ -49,8 +50,7 @@ impl S3Client { key: &str, range: Option, ) -> Result { - // TODO: Provide a streaming response. - let response = self + let mut response = self .client .get_object() .bucket(bucket) @@ -64,18 +64,13 @@ impl S3Client { // return the data in Bytes object in which the underlying data has a higher alignment. // For now we're hard-coding an alignment of 8 bytes, although this should depend on the // data type, and potentially whether there are any SIMD requirements. - // FIXME: The current method is rather inefficient, involving copying the data at least - // twice. This is functional, but should be revisited. - // Create an 8-byte aligned Vec. let mut buf = maligned::align_first::(content_length as usize); - // Read all data into memory as an AggregatedBytes. - let data = response.body.collect().await; - // Copy the data into an unaligned Vec. - let bytes = data.map_err(ActiveStorageError::S3ByteStream)?; - let mut vec = bytes.to_vec(); - // Copy the data into the aligned Vec. - buf.append(&mut vec); + + // Iterate over the streaming response, copying data into the aligned Vec. + while let Some(bytes) = response.body.try_next().await? { + buf.extend_from_slice(&bytes) + } // Return as Bytes. Ok(buf.into()) }