Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ axum = { version = "0.6", features = ["headers"] }
axum-server = { version = "0.4.7", features = ["tls-rustls"] }
clap = { version = "4.2", features = ["derive", "env"] }
expanduser = "1.2.2"
flate2 = "1.0"
http = "*"
hyper = { version = "0.14", features = ["full"] }
lazy_static = "1.4"
Expand Down
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ with a JSON payload of the form:
"selection": [
[0, 19, 2],
[1, 3, 1]
]
],

// Algorithm used to compress the data
// - optional, defaults to no compression
"compression": "gzip|zlib"
}
```

Expand All @@ -92,7 +96,7 @@ In particular, the following are known limitations which we intend to address:

* Error handling and reporting is minimal
* No support for missing data
* No support for compressed or encrypted objects
* No support for encrypted objects

## Running

Expand Down
2 changes: 2 additions & 0 deletions scripts/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def get_args() -> argparse.Namespace:
parser.add_argument("--shape", type=str)
parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing
parser.add_argument("--selection", type=str)
parser.add_argument("--compression", type=str)
parser.add_argument("--show-response-headers", action=argparse.BooleanOptionalAction)
return parser.parse_args()

Expand All @@ -49,6 +50,7 @@ def build_request_data(args: argparse.Namespace) -> dict:
'offset': args.offset,
'size': args.size,
'order': args.order,
'compression': args.compression,
}
if args.shape:
request_data["shape"] = json.loads(args.shape)
Expand Down
17 changes: 14 additions & 3 deletions scripts/upload_sample_data.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from enum import Enum
import gzip
import numpy as np
import pathlib
import s3fs
import zlib

NUM_ITEMS = 10
OBJECT_PREFIX = "data"
COMPRESSION_ALGS = [None, "gzip", "zlib"]

# Use an enum which also subclasses the string type so that the
# auto-generated OpenAPI schema can determine the allowed dtypes.
Expand Down Expand Up @@ -33,8 +36,16 @@ def n_bytes(self):
pass

# Create numpy arrays and upload to S3 as bytes
for d in AllowedDatatypes.__members__.keys():
with s3_fs.open(bucket / f'{OBJECT_PREFIX}-{d}.dat', 'wb') as s3_file:
s3_file.write(np.arange(NUM_ITEMS, dtype=d).tobytes())
# Upload one object per (compression algorithm, dtype) combination so that the
# server can be exercised against uncompressed, gzip- and zlib-compressed data.
for compression in COMPRESSION_ALGS:
    # Uncompressed objects (compression is None) keep the plain
    # "<prefix>-<dtype>.dat" name; compressed ones get a "-gzip"/"-zlib" suffix.
    compression_suffix = f"-{compression}" if compression else ""
    for d in AllowedDatatypes.__members__.keys():
        obj_name = f'{OBJECT_PREFIX}-{d}{compression_suffix}.dat'
        with s3_fs.open(bucket / obj_name, 'wb') as s3_file:
            # NUM_ITEMS consecutive integers in the object's dtype, as raw bytes.
            data = np.arange(NUM_ITEMS, dtype=d).tobytes()
            # Compress the payload when an algorithm is selected for this pass.
            if compression == "gzip":
                data = gzip.compress(data)
            elif compression == "zlib":
                data = zlib.compress(data)
            s3_file.write(data)

print("Data upload successful. \nBucket contents:\n", s3_fs.ls(bucket))
6 changes: 6 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Active Storage server API

use crate::error::ActiveStorageError;
use crate::filter_pipeline;
use crate::metrics::{metrics_handler, track_metrics};
use crate::models;
use crate::operation;
Expand Down Expand Up @@ -159,6 +160,11 @@ async fn operation_handler<T: operation::Operation>(
ValidatedJson(request_data): ValidatedJson<models::RequestData>,
) -> Result<models::Response, ActiveStorageError> {
let data = download_object(&auth, &request_data).await?;
let data = filter_pipeline::filter_pipeline(&request_data, &data)?;
if request_data.compression.is_some() || request_data.size.is_none() {
// Validate the raw uncompressed data size now that we know it.
models::validate_raw_size(data.len(), request_data.dtype, &request_data.shape)?;
}
T::execute(&request_data, &data)
}

Expand Down
5 changes: 5 additions & 0 deletions src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ mod tests {
shape: None,
order: None,
selection: None,
compression: None,
},
);
assert_eq!([42], shape.raw_dim().as_array_view().as_slice().unwrap());
Expand All @@ -255,6 +256,7 @@ mod tests {
shape: Some(vec![1, 2, 3]),
order: None,
selection: None,
compression: None,
},
);
assert_eq!(
Expand Down Expand Up @@ -458,6 +460,7 @@ mod tests {
shape: None,
order: None,
selection: None,
compression: None,
};
let bytes = Bytes::copy_from_slice(&data);
let array = build_array::<u32>(&request_data, &bytes).unwrap();
Expand All @@ -477,6 +480,7 @@ mod tests {
shape: Some(vec![2, 1]),
order: None,
selection: None,
compression: None,
};
let bytes = Bytes::copy_from_slice(&data);
let array = build_array::<i64>(&request_data, &bytes).unwrap();
Expand All @@ -496,6 +500,7 @@ mod tests {
shape: None,
order: None,
selection: None,
compression: None,
};
let bytes = Bytes::copy_from_slice(&data);
let array = build_array::<u32>(&request_data, &bytes).unwrap();
Expand Down
104 changes: 104 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! (De)compression support.

use crate::error::ActiveStorageError;
use crate::models;

use axum::body::Bytes;
use flate2::read::{GzDecoder, ZlibDecoder};
use std::io::Read;

/// Decompresses some Bytes and returns the uncompressed data.
///
/// # Arguments
///
/// * `compression`: Compression algorithm
/// * `data`: Compressed data [Bytes](axum::body::Bytes)
///
/// # Errors
///
/// Returns [ActiveStorageError::Decompression](crate::error::ActiveStorageError) if the data is
/// not a valid stream for the requested compression algorithm.
pub fn decompress(
    compression: models::Compression,
    data: &Bytes,
) -> Result<Bytes, ActiveStorageError> {
    let mut decoder: Box<dyn Read> = match compression {
        models::Compression::Gzip => Box::new(GzDecoder::<&[u8]>::new(data)),
        models::Compression::Zlib => Box::new(ZlibDecoder::<&[u8]>::new(data)),
    };
    // Decompress into a plain Vec first. The uncompressed length is not known up front, so this
    // buffer may grow and be reallocated freely without any alignment concerns.
    let mut decompressed = Vec::<u8>::new();
    decoder.read_to_end(&mut decompressed)?;
    // The data returned by the S3 client does not have any alignment guarantees. In order to
    // reinterpret the data as an array of numbers with a higher alignment than 1, we need to
    // return the data in a Bytes object in which the underlying data has a higher alignment.
    // For now we're hard-coding an alignment of 8 bytes, although this should depend on the
    // data type, and potentially whether there are any SIMD requirements.
    // Allocate an 8-byte aligned buffer with exactly the required capacity. Copying into it can
    // never trigger a reallocation, so the alignment is preserved. (The previous implementation
    // sized this buffer from the *compressed* length, so `read_to_end` could reallocate it —
    // losing the alignment — and the subsequent `shrink_to` could reallocate it again.)
    let mut buf = maligned::align_first::<u8, maligned::A8>(decompressed.len());
    buf.extend_from_slice(&decompressed);
    Ok(buf.into())
}

#[cfg(test)]
mod tests {
    use super::*;
    use flate2::read::{GzEncoder, ZlibEncoder};
    use flate2::Compression;

    /// Payload shared by all test cases.
    const PLAINTEXT: &[u8] = b"hello world";

    /// Drains any reader to completion, panicking on error.
    fn read_all(mut reader: impl Read) -> Vec<u8> {
        // Adapted from the flate2 documentation.
        let mut out = Vec::<u8>::new();
        reader.read_to_end(&mut out).unwrap();
        out
    }

    /// Asserts that decompressing garbage with `compression` yields a
    /// Decompression error carrying `expected_message`.
    fn check_invalid(compression: models::Compression, expected_message: &str) {
        let err = decompress(compression, &b"invalid format".as_ref().into()).unwrap_err();
        match err {
            ActiveStorageError::Decompression(io_err) => {
                assert_eq!(io_err.kind(), std::io::ErrorKind::InvalidInput);
                assert_eq!(io_err.to_string(), expected_message);
            }
            err => panic!("unexpected error {}", err),
        }
    }

    #[test]
    fn test_decompress_gzip() {
        let compressed = read_all(GzEncoder::new(PLAINTEXT, Compression::fast()));
        let result = decompress(models::Compression::Gzip, &compressed.into()).unwrap();
        assert_eq!(result, PLAINTEXT);
        // The decompressed buffer must be 8-byte aligned.
        assert_eq!(result.as_ptr().align_offset(8), 0);
    }

    #[test]
    fn test_decompress_zlib() {
        let compressed = read_all(ZlibEncoder::new(PLAINTEXT, Compression::fast()));
        let result = decompress(models::Compression::Zlib, &compressed.into()).unwrap();
        assert_eq!(result, PLAINTEXT);
        // The decompressed buffer must be 8-byte aligned.
        assert_eq!(result.as_ptr().align_offset(8), 0);
    }

    #[test]
    fn test_decompress_invalid_gzip() {
        check_invalid(models::Compression::Gzip, "invalid gzip header");
    }

    #[test]
    fn test_decompress_invalid_zlib() {
        check_invalid(models::Compression::Zlib, "corrupt deflate stream");
    }
}
32 changes: 30 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ use tracing::{event, Level};
/// Each variant may result in a different API error response.
#[derive(Debug, Error)]
pub enum ActiveStorageError {
/// Error decompressing data
#[error("failed to decompress data")]
Decompression(#[from] std::io::Error),

/// Attempt to perform an invalid operation on an empty array or selection
#[error("cannot perform {operation} on empty array or selection")]
EmptyArray { operation: &'static str },
Expand All @@ -34,7 +38,11 @@ pub enum ActiveStorageError {
#[error("request data is not valid")]
RequestDataJsonRejection(#[from] JsonRejection),

/// Error validating RequestData
/// Error validating RequestData (single error)
#[error("request data is not valid")]
RequestDataValidationSingle(#[from] validator::ValidationError),

/// Error validating RequestData (multiple errors)
#[error("request data is not valid")]
RequestDataValidation(#[from] validator::ValidationErrors),

Expand Down Expand Up @@ -174,8 +182,10 @@ impl From<ActiveStorageError> for ErrorResponse {
fn from(error: ActiveStorageError) -> Self {
let response = match &error {
// Bad request
ActiveStorageError::EmptyArray { operation: _ }
ActiveStorageError::Decompression(_)
| ActiveStorageError::EmptyArray { operation: _ }
| ActiveStorageError::RequestDataJsonRejection(_)
| ActiveStorageError::RequestDataValidationSingle(_)
| ActiveStorageError::RequestDataValidation(_)
| ActiveStorageError::ShapeInvalid(_) => Self::bad_request(&error),

Expand Down Expand Up @@ -309,6 +319,15 @@ mod tests {
assert_eq!(caused_by, error_response.error.caused_by);
}

#[tokio::test]
async fn decompression_error() {
let io_error = std::io::Error::new(std::io::ErrorKind::InvalidInput, "decompression error");
let error = ActiveStorageError::Decompression(io_error);
let message = "failed to decompress data";
let caused_by = Some(vec!["decompression error"]);
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}

#[tokio::test]
async fn empty_array_op_error() {
let error = ActiveStorageError::EmptyArray { operation: "foo" };
Expand All @@ -326,6 +345,15 @@ mod tests {
.await;
}

#[tokio::test]
async fn request_data_validation_single() {
let validation_error = validator::ValidationError::new("foo");
let error = ActiveStorageError::RequestDataValidationSingle(validation_error);
let message = "request data is not valid";
let caused_by = Some(vec!["Validation error: foo [{}]"]);
test_active_storage_error(error, StatusCode::BAD_REQUEST, message, caused_by).await;
}

#[tokio::test]
async fn request_data_validation() {
let mut validation_errors = validator::ValidationErrors::new();
Expand Down
Loading