diff --git a/Cargo.toml b/Cargo.toml index 6b93aab..11efabf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,11 @@ edition = "2021" [dependencies] async-trait = "0.1" -axum = "0.6" +aws-config = "0.54" +aws-credential-types = { version = "0.54", features = ["hardcoded-credentials"] } +aws-sdk-s3 = "0.24" +aws-types = "0.54" +axum = { version = "0.6", features = ["headers"] } hyper = { version = "0.14", features = ["full"] } mime = "0.3" serde = { version = "1.0", features = ["derive"] } diff --git a/src/app.rs b/src/app.rs index 60be4e5..10e48c3 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,14 +1,16 @@ use crate::models; +use crate::s3_client::S3Client; use crate::validated_json::ValidatedJson; use axum::{ body::Body, + headers::authorization::{Authorization, Basic}, http::header, http::Request, http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, - Router, + Router, TypedHeader, }; use tower::ServiceBuilder; @@ -70,53 +72,78 @@ async fn schema() -> &'static str { } async fn count( + TypedHeader(auth): TypedHeader>, ValidatedJson(request_data): ValidatedJson, ) -> models::Response { - models::Response::new( - request_data.source.to_string(), - models::DType::Int32, - vec![], - ) + let client = S3Client::new(&request_data.source, auth.username(), auth.password()).await; + let data = client + .download_object(&request_data.bucket, &request_data.object, None) + .await; + let message = format!("{:?}", data); + models::Response::new(message, models::DType::Int32, vec![]) } -async fn max(ValidatedJson(request_data): ValidatedJson) -> models::Response { - models::Response::new( - request_data.source.to_string(), - models::DType::Int32, - vec![], - ) +async fn max( + TypedHeader(auth): TypedHeader>, + ValidatedJson(request_data): ValidatedJson, +) -> models::Response { + let message = format!( + "url {} username {} password {}", + request_data.source, + auth.username(), + auth.password() + ); + models::Response::new(message, models::DType::Int32, vec![]) } -async fn mean(ValidatedJson(request_data): ValidatedJson) -> models::Response { - models::Response::new( - request_data.source.to_string(), - models::DType::Int32, - vec![], - ) +async fn mean( + TypedHeader(auth): TypedHeader>, + ValidatedJson(request_data): ValidatedJson, +) -> models::Response { + let message = format!( + "url {} username {} password {}", + request_data.source, + auth.username(), + auth.password() + ); + models::Response::new(message, models::DType::Int32, vec![]) } -async fn min(ValidatedJson(request_data): ValidatedJson) -> models::Response { - models::Response::new( - request_data.source.to_string(), - models::DType::Int32, - vec![], - ) +async fn min( + TypedHeader(auth): TypedHeader>, + ValidatedJson(request_data): ValidatedJson, +) -> models::Response { + let message = format!( + "url {} username {} password {}", + request_data.source, + auth.username(), + auth.password() + ); + models::Response::new(message, models::DType::Int32, vec![]) } async fn select( + TypedHeader(auth): TypedHeader>, ValidatedJson(request_data): ValidatedJson, ) -> models::Response { - models::Response::new( - request_data.source.to_string(), - models::DType::Int32, - vec![], - ) + let message = format!( + "url {} username {} password {}", + request_data.source, + auth.username(), + auth.password() + ); + models::Response::new(message, models::DType::Int32, vec![]) } -async fn sum(ValidatedJson(request_data): ValidatedJson) -> models::Response { - models::Response::new( - request_data.source.to_string(), - models::DType::Int32, - vec![], - ) +async fn sum( + TypedHeader(auth): TypedHeader>, + ValidatedJson(request_data): ValidatedJson, +) -> models::Response { + let message = format!( + "url {} username {} password {}", + request_data.source, + auth.username(), + auth.password() + ); + models::Response::new(message, models::DType::Int32, vec![]) } diff --git a/src/main.rs b/src/main.rs index f0a8b83..eb2dc3a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use tokio::signal; mod app; mod models; +mod s3_client; mod validated_json; #[tokio::main] diff --git a/src/s3_client.rs b/src/s3_client.rs new file mode 100644 index 0000000..578b945 --- /dev/null +++ b/src/s3_client.rs @@ -0,0 +1,75 @@ +/// This module provides a simplified S3 client that supports downloading objects. +/// It attempts to hide the complexities of working with the AWS SDK for S3. +use aws_config::{self, meta::region::RegionProviderChain}; +use aws_credential_types::Credentials; +use aws_sdk_s3::Client; +use aws_types::region::Region; +use axum::body::Bytes; +use url::Url; + +/// S3 client object. +pub struct S3Client { + /// Underlying AWS SDK S3 client object. + client: Client, +} + +impl S3Client { + /// Creates an S3Client object + /// + /// # Arguments + /// + /// * `url`: Object storage API URL + /// * `username`: Object storage account username + /// * `password`: Object storage account password + pub async fn new(url: &Url, username: &str, password: &str) -> Self { + let credentials = Credentials::from_keys(username, password, None); + let region = RegionProviderChain::default_provider().or_else(Region::new("us-east-1")); + let config = aws_config::from_env().region(region).load().await; + let s3_config = aws_sdk_s3::config::Builder::from(&config) + .credentials_provider(credentials) + .endpoint_url(url.to_string()) + .force_path_style(true) + .build(); + let client = Client::from_conf(s3_config); + Self { client } + } + + /// Downloads an object from object storage and returns the data as Bytes + /// + /// # Arguments + /// + /// * `bucket`: Name of the bucket + /// * `key`: Name of the object in the bucket + /// * `range`: Optional byte range + pub async fn download_object( + self: &S3Client, + bucket: &str, + key: &str, + range: Option, + ) -> Bytes { + let response = self + .client + .get_object() + .bucket(bucket) + .key(key) + .set_range(range) + .send() + .await + .unwrap(); + let data = response.body.collect().await; + // TODO: Provide a streaming response. + data.unwrap().into_bytes() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use url::Url; + + #[tokio::test] + async fn new() { + let url = Url::parse("http://example.com").unwrap(); + S3Client::new(&url, "user", "password").await; + } +}