Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@

# Dev TLS certs
.certs/

# Others
scripts/venv
.vscode
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ with a JSON payload of the form:

The currently supported reducers are `max`, `min`, `mean`, `sum`, `select` and `count`. All reducers return the result using the same datatype as specified in the request except for `count` which always returns the result as `int64`.

The proxy adds two custom headers `x-activestorage-dtype` and `x-activestrorage-shape` to the HTTP response to allow the numeric result to be reconstructed from the binary content of the response.
The proxy adds two custom headers `x-activestorage-dtype` and `x-activestrorage-shape` to the HTTP response to allow the numeric result to be reconstructed from the binary content of the response. An additional `x-activestorage-count` header is also returned which contains the number of array elements operated on while performing the requested reduction. This header is useful, for example, to calculate the mean over multiple requests where the number of items operated on may differ between chunks.

[//]: <> (TODO: No OpenAPI support yet).
[//]: <> (For a running instance of the proxy server, the full OpenAPI specification is browsable as a web page at the `{proxy-address}/redoc/` endpoint or in raw JSON form at `{proxy-address}/openapi.json`.)
Expand Down
11 changes: 8 additions & 3 deletions scripts/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def get_args() -> argparse.Namespace:
parser.add_argument("--shape", type=str)
parser.add_argument("--order", default="C") #, choices=["C", "F"]) allow invalid for testing
parser.add_argument("--selection", type=str)
parser.add_argument("--show-response-headers", action=argparse.BooleanOptionalAction)
return parser.parse_args()


Expand Down Expand Up @@ -65,13 +66,17 @@ def request(url: str, username: str, password: str, request_data: dict):
return response


def display(response):
def display(response, show_headers=False):
#print(response.content)
dtype = response.headers['x-activestorage-dtype']
shape = json.loads(response.headers['x-activestorage-shape'])
result = np.frombuffer(response.content, dtype=dtype)
result = result.reshape(shape)
print(result)
if show_headers:
print("\nResponse headers:", response.headers)
print("\nResult:", result)
else:
print(result)


def display_error(response):
Expand All @@ -88,7 +93,7 @@ def main():
url = f'{args.server}/v1/{args.operation}/'
response = request(url, args.username, args.password, request_data)
if response.ok:
display(response)
display(response, show_headers=args.show_response_headers)
else:
display_error(response)
sys.exit(1)
Expand Down
3 changes: 3 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use tower_http::validate_request::ValidateRequestHeaderLayer;
static HEADER_DTYPE: header::HeaderName = header::HeaderName::from_static("x-activestorage-dtype");
/// `x-activestorage-shape` header definition
static HEADER_SHAPE: header::HeaderName = header::HeaderName::from_static("x-activestorage-shape");
/// `x-activestorage-count` header definition
static HEADER_COUNT: header::HeaderName = header::HeaderName::from_static("x-activestorage-count");

impl IntoResponse for models::Response {
/// Convert a [crate::models::Response] into a [axum::response::Response].
Expand All @@ -41,6 +43,7 @@ impl IntoResponse for models::Response {
),
(&HEADER_DTYPE, self.dtype.to_string().to_lowercase()),
(&HEADER_SHAPE, serde_json::to_string(&self.shape).unwrap()),
(&HEADER_COUNT, serde_json::to_string(&self.count).unwrap()),
],
self.body,
)
Expand Down
11 changes: 9 additions & 2 deletions src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,19 @@ pub struct Response {
pub dtype: DType,
/// Shape of the response
pub shape: Vec<usize>,
/// Number of non-missing elements operated on to generate response
pub count: i64,
}

impl Response {
/// Return a Response object
pub fn new(body: Bytes, dtype: DType, shape: Vec<usize>) -> Response {
Response { body, dtype, shape }
pub fn new(body: Bytes, dtype: DType, shape: Vec<usize>, count: i64) -> Response {
Response {
body,
dtype,
shape,
count,
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ mod tests {
data.clone(),
request_data.dtype,
vec![3],
3,
))
}
}
Expand All @@ -125,6 +126,7 @@ mod tests {
assert_eq!(&[1, 2, 3, 4][..], response.body);
assert_eq!(models::DType::Uint32, response.dtype);
assert_eq!(vec![3], response.shape);
assert_eq!(3, response.count);
}

struct TestNumOp {}
Expand All @@ -140,6 +142,7 @@ mod tests {
body.into(),
request_data.dtype,
vec![1, 2],
2,
))
}
}
Expand All @@ -163,5 +166,6 @@ mod tests {
assert_eq!("i64", response.body);
assert_eq!(models::DType::Int64, response.dtype);
assert_eq!(vec![1, 2], response.shape);
assert_eq!(2, response.count);
}
}
72 changes: 63 additions & 9 deletions src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ impl NumOperation for Count {
let body = len.to_le_bytes();
// Need to copy to provide ownership to caller.
let body = Bytes::copy_from_slice(&body);
Ok(models::Response::new(body, models::DType::Int64, vec![]))
Ok(models::Response::new(
body,
models::DType::Int64,
vec![],
len,
))
}
}

Expand All @@ -44,6 +49,8 @@ impl NumOperation for Max {
let array = array::build_array::<T>(request_data, data)?;
let slice_info = array::build_slice_info::<T>(&request_data.selection, array.shape());
let sliced = array.slice(slice_info);
// FIXME: Account for missing data?
let count = i64::try_from(sliced.len())?;
// FIXME: endianness?
let body = sliced
.max()
Expand All @@ -54,7 +61,12 @@ impl NumOperation for Max {
.as_bytes();
// Need to copy to provide ownership to caller.
let body = Bytes::copy_from_slice(body);
Ok(models::Response::new(body, request_data.dtype, vec![]))
Ok(models::Response::new(
body,
request_data.dtype,
vec![],
count,
))
}
}

Expand All @@ -69,14 +81,21 @@ impl NumOperation for Mean {
let array = array::build_array::<T>(request_data, data)?;
let slice_info = array::build_slice_info::<T>(&request_data.selection, array.shape());
let sliced = array.slice(slice_info);
// FIXME: Account for missing data?
let count = i64::try_from(sliced.len())?;
// FIXME: endianness?
let body = sliced
.mean()
.ok_or(ActiveStorageError::EmptyArray { operation: "mean" })?;
let body = body.as_bytes();
// Need to copy to provide ownership to caller.
let body = Bytes::copy_from_slice(body);
Ok(models::Response::new(body, request_data.dtype, vec![]))
Ok(models::Response::new(
body,
request_data.dtype,
vec![],
count,
))
}
}

Expand All @@ -91,6 +110,8 @@ impl NumOperation for Min {
let array = array::build_array::<T>(request_data, data)?;
let slice_info = array::build_slice_info::<T>(&request_data.selection, array.shape());
let sliced = array.slice(slice_info);
// FIXME: Account for missing data?
let count = i64::try_from(sliced.len())?;
// FIXME: endianness?
let body = sliced
.min()
Expand All @@ -101,7 +122,12 @@ impl NumOperation for Min {
.as_bytes();
// Need to copy to provide ownership to caller.
let body = Bytes::copy_from_slice(body);
Ok(models::Response::new(body, request_data.dtype, vec![]))
Ok(models::Response::new(
body,
request_data.dtype,
vec![],
count,
))
}
}

Expand All @@ -116,6 +142,8 @@ impl NumOperation for Select {
let array = array::build_array::<T>(request_data, data)?;
let slice_info = array::build_slice_info::<T>(&request_data.selection, array.shape());
let sliced = array.slice(slice_info);
// FIXME: Account for missing data?
let count = i64::try_from(sliced.len())?;
let shape = sliced.shape().to_vec();
// Transpose Fortran ordered arrays before iterating.
let body = if !array.is_standard_layout() {
Expand All @@ -129,7 +157,12 @@ impl NumOperation for Select {
let body = body.as_bytes();
// Need to copy to provide ownership to caller.
let body = Bytes::copy_from_slice(body);
Ok(models::Response::new(body, request_data.dtype, shape))
Ok(models::Response::new(
body,
request_data.dtype,
shape,
count,
))
}
}

Expand All @@ -144,12 +177,19 @@ impl NumOperation for Sum {
let array = array::build_array::<T>(request_data, data)?;
let slice_info = array::build_slice_info::<T>(&request_data.selection, array.shape());
let sliced = array.slice(slice_info);
// FIXME: Account for missing data?
let count = i64::try_from(sliced.len())?;
// FIXME: endianness?
let body = sliced.sum();
let body = body.as_bytes();
// Need to copy to provide ownership to caller.
let body = Bytes::copy_from_slice(body);
Ok(models::Response::new(body, request_data.dtype, vec![]))
Ok(models::Response::new(
body,
request_data.dtype,
vec![],
count,
))
}
}

Expand All @@ -174,15 +214,17 @@ mod tests {
order: None,
selection: None,
};
let data = [1, 2, 3, 4, 5, 6, 7, 8];
let data: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
let bytes = Bytes::copy_from_slice(&data);
let response = Count::execute(&request_data, &bytes).unwrap();
// A u8 slice of 8 elements == a u32 slice with 2 elements
// Count is always i64.
let expected: i64 = 2;
assert_eq!(expected.as_bytes(), response.body);
assert_eq!(8, response.body.len());
assert_eq!(8, response.body.len()); // Assert that count value is 8 bytes (i.e. i64)
assert_eq!(models::DType::Int64, response.dtype);
assert_eq!(vec![0; 0], response.shape);
assert_eq!(expected, response.count);
}

#[test]
Expand All @@ -198,14 +240,20 @@ mod tests {
order: None,
selection: None,
};
let data = [1, 2, 3, 4, 5, 6, 7, 8];
// data:
// A u8 slice of 8 elements == a single i64 value
// where each slice element is 2 hexadecimal digits
// and the order is reversed on little-endian systems
// so [1, 2, 3] is 0x030201 as an i64 in hexadecimal
let data: [u8; 8] = [1, 2, 3, 4, 5, 6, 7, 8];
let bytes = Bytes::copy_from_slice(&data);
let response = Max::execute(&request_data, &bytes).unwrap();
let expected: i64 = 0x0807060504030201;
assert_eq!(expected.as_bytes(), response.body);
assert_eq!(8, response.body.len());
assert_eq!(models::DType::Int64, response.dtype);
assert_eq!(vec![0; 0], response.shape);
assert_eq!(1, response.count);
}

#[test]
Expand All @@ -229,6 +277,7 @@ mod tests {
assert_eq!(4, response.body.len());
assert_eq!(models::DType::Uint32, response.dtype);
assert_eq!(vec![0; 0], response.shape);
assert_eq!(2, response.count);
}

#[test]
Expand All @@ -252,6 +301,7 @@ mod tests {
assert_eq!(8, response.body.len());
assert_eq!(models::DType::Uint64, response.dtype);
assert_eq!(vec![0; 0], response.shape);
assert_eq!(1, response.count);
}

#[test]
Expand All @@ -275,6 +325,7 @@ mod tests {
assert_eq!(8, response.body.len());
assert_eq!(models::DType::Float32, response.dtype);
assert_eq!(vec![2], response.shape);
assert_eq!(2, response.count);
}

#[test]
Expand All @@ -298,6 +349,7 @@ mod tests {
assert_eq!(16, response.body.len());
assert_eq!(models::DType::Float64, response.dtype);
assert_eq!(vec![2, 1], response.shape);
assert_eq!(2, response.count);
}

#[test]
Expand Down Expand Up @@ -327,6 +379,7 @@ mod tests {
assert_eq!(8, response.body.len());
assert_eq!(models::DType::Float32, response.dtype);
assert_eq!(vec![2, 1], response.shape);
assert_eq!(2, response.count);
}

#[test]
Expand All @@ -350,5 +403,6 @@ mod tests {
assert_eq!(4, response.body.len());
assert_eq!(models::DType::Uint32, response.dtype);
assert_eq!(vec![0; 0], response.shape);
assert_eq!(2, response.count);
}
}