diff --git a/examples/README.md b/examples/README.md index 2529666..44d9f51 100644 --- a/examples/README.md +++ b/examples/README.md @@ -9,7 +9,8 @@ Examples of using the [waSCC](https://wascc.dev/) runtime for AWS Lambda. ## Examples -* [`custom`](custom/README.md) A simple example that processes a custom lambda event +* [`custom`](custom/README.md) A simple example that processes a custom Lambda event +* [`apigw`](apigw/README.md) An example that receives an HTTP request via API Gateway * [`sqs`](sqs/README.md) Receive an SQS message and publish a reply ## Notes diff --git a/examples/apigw/.gitignore b/examples/apigw/.gitignore new file mode 100644 index 0000000..d7c2077 --- /dev/null +++ b/examples/apigw/.gitignore @@ -0,0 +1,7 @@ +.terraform/ + +terraform.tfstate +terraform.tfstate.backup + +app.zip +output.json diff --git a/examples/apigw/Makefile b/examples/apigw/Makefile new file mode 100644 index 0000000..03e0afe --- /dev/null +++ b/examples/apigw/Makefile @@ -0,0 +1,9 @@ +.PHONY: all apply + +all: apply + +apply: app.zip + terraform12 apply + +app.zip: manifest.yaml actor/target/wasm32-unknown-unknown/release/uppercase.wasm + zip -j $@ $^ diff --git a/examples/apigw/README.md b/examples/apigw/README.md new file mode 100644 index 0000000..e233e21 --- /dev/null +++ b/examples/apigw/README.md @@ -0,0 +1,42 @@ +# API Gateway HTTP Request Invocation + +This actor is identical to the [Krustlet Uppercase](https://github.com/deislabs/krustlet/tree/master/demos/wascc/uppercase) demo, +the only change being that the actor is signed with the `awslambda:runtime` capability instead of the standard HTTP server capability. + +### Build + +Build the [sample waSCC actor](actor/README.md). + +```console +$ cd actor +$ make release +$ cd .. +``` + +### Deploy + +This examples uses the `wascc-slim` Lambda layer. +See [`layers`](../../layers/README/md) for instructions on building the waSCC runtime Lambda layers. + +```console +$ terraform init +``` + +Set AWS environment variables for your authenticated session. + +```console +$ make +``` + +### Test + +```console +$ curl https://v3390lt0j2.execute-api.us-west-2.amazonaws.com/?today=tuesday +{"original":"today=tuesday","uppercased":"TODAY=TUESDAY"} +``` + +### Known Issues + +It works on my machine! + +The public key of the actor in `manifest.yaml` is the value I use and will have to be changed when you generate your own keys. diff --git a/examples/apigw/actor/.gitignore b/examples/apigw/actor/.gitignore new file mode 100644 index 0000000..4a96773 --- /dev/null +++ b/examples/apigw/actor/.gitignore @@ -0,0 +1 @@ +.keys/ diff --git a/examples/apigw/actor/Cargo.toml b/examples/apigw/actor/Cargo.toml new file mode 100644 index 0000000..0302e6e --- /dev/null +++ b/examples/apigw/actor/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "uppercase" +version = "0.6.1" +authors = ["Kit Ewbank "] +edition = "2018" +license = "Apache-2.0" +readme = "README.md" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +wascc-actor = "0.6.0" +log = '0.4.8' +serde = { version = "1.0.104", features = ["derive"]} +wascc-codec = "0.6.0" + +[profile.release] +# Optimize for small code size +opt-level = "s" + +[workspace] diff --git a/examples/apigw/actor/Makefile b/examples/apigw/actor/Makefile new file mode 100644 index 0000000..45c97ad --- /dev/null +++ b/examples/apigw/actor/Makefile @@ -0,0 +1,49 @@ +COLOR ?= always # Valid COLOR options: {always, auto, never} +CARGO = cargo --color $(COLOR) +TARGET = target/wasm32-unknown-unknown +DEBUG = $(TARGET)/debug +RELEASE = $(TARGET)/release +KEYDIR ?= .keys + +.PHONY: all bench build check clean doc test update keys keys-account keys-module + +all: build + +bench: + @$(CARGO) bench + +build: + @$(CARGO) build --target wasm32-unknown-unknown + wascap sign $(DEBUG)/uppercase.wasm $(DEBUG)/uppercase_signed.wasm --issuer $(KEYDIR)/account.nk --subject $(KEYDIR)/module.nk --cap awslambda:runtime --cap wascc:logging --name uppercase + +check: + @$(CARGO) check + +clean: + @$(CARGO) clean + +doc: + @$(CARGO) doc + +test: build + @$(CARGO) test + +update: + @$(CARGO) update + +release: + @$(CARGO) build --release --target wasm32-unknown-unknown + wascap sign $(RELEASE)/uppercase.wasm $(RELEASE)/uppercase.wasm --issuer $(KEYDIR)/account.nk --subject $(KEYDIR)/module.nk --cap awslambda:runtime --cap wascc:logging --name uppercase + +keys: keys-account +keys: keys-module + +keys-account: + @mkdir -p $(KEYDIR) + nk gen account > $(KEYDIR)/account.txt + awk '/Seed/{ print $$2 }' $(KEYDIR)/account.txt > $(KEYDIR)/account.nk + +keys-module: + @mkdir -p $(KEYDIR) + nk gen module > $(KEYDIR)/module.txt + awk '/Seed/{ print $$2 }' $(KEYDIR)/module.txt > $(KEYDIR)/module.nk diff --git a/examples/apigw/actor/README.md b/examples/apigw/actor/README.md new file mode 100644 index 0000000..c82988e --- /dev/null +++ b/examples/apigw/actor/README.md @@ -0,0 +1,38 @@ +# Sample waSCC Actor + +A sample [waSCC](https://wascc.dev/) actor that uses the AWS Lambda runtime capability provider. + +This actor is identical to the [Krustlet Uppercase](https://github.com/deislabs/krustlet/tree/master/demos/wascc/uppercase) demo, +the only change being that the actor is signed with the `awslambda:runtime` capability instead of the standard HTTP server capability. + +## Build + +#### Install [NKeys](https://github.com/encabulators/nkeys) + +```console +cargo install nkeys --features "cli" +``` + +#### Generate Keys + +```console +make keys +``` + +#### Add `wasm32-unknown-unknown` Compilation Target + +```console +rustup target add wasm32-unknown-unknown +``` + +#### Install [WASCAP](https://github.com/wascc/wascap) + +```console +cargo install wascap --features "cli" +``` + +#### Build Actor + +```console +make release +``` diff --git a/examples/apigw/actor/src/lib.rs b/examples/apigw/actor/src/lib.rs new file mode 100644 index 0000000..4d8f29f --- /dev/null +++ b/examples/apigw/actor/src/lib.rs @@ -0,0 +1,35 @@ +extern crate wascc_actor as actor; + +#[macro_use] +extern crate log; +extern crate serde; +extern crate wascc_codec; + +use actor::prelude::*; +use serde::Serialize; +use wascc_codec::serialize; + +actor_handlers! { + codec::http::OP_HANDLE_REQUEST => uppercase, + codec::core::OP_HEALTH_REQUEST => health +} + +fn uppercase(r: codec::http::Request) -> CallResult { + info!("Query String: {}", r.query_string); + let upper = UppercaseResponse { + original: r.query_string.to_string(), + uppercased: r.query_string.to_ascii_uppercase(), + }; + + Ok(serialize(codec::http::Response::json(upper, 200, "OK"))?) +} + +fn health(_req: codec::core::HealthRequest) -> ReceiveResult { + Ok(vec![]) +} + +#[derive(Serialize)] +struct UppercaseResponse { + original: String, + uppercased: String, +} diff --git a/examples/apigw/main.tf b/examples/apigw/main.tf new file mode 100644 index 0000000..99f6377 --- /dev/null +++ b/examples/apigw/main.tf @@ -0,0 +1,135 @@ +// +// waSCC runtime for AWS Lambda example configuration. +// + +terraform { + required_version = ">= 0.12.19" +} + + +# Build from https://github.com/terraform-providers/terraform-provider-aws/commit/df71a4fd95c0e5a9afe5b08c43a951d3a7fda0ed. +# Will be released in v2.59.0. +# provider "aws" { +# version = ">= 2.58.0" +# } + +// +// Data sources for current AWS account ID, partition and region. +// + +data "aws_caller_identity" "current" {} + +data "aws_partition" "current" {} + +data "aws_region" "current" {} + +// +// API Gateway resources. +// + +resource "aws_apigatewayv2_api" "example" { + name = "waSCC-example-apigw" + protocol_type = "HTTP" + target = aws_lambda_function.example.arn +} + +// +// Lambda resources. +// + +data "aws_lambda_layer_version" "slim" { + layer_name = "wascc-slim" +} + +resource "aws_lambda_function" "example" { + filename = "${path.module}/app.zip" + source_code_hash = filebase64sha256("${path.module}/app.zip") + function_name = "waSCC-example-apigw" + role = aws_iam_role.example.arn + handler = "doesnt.matter" + runtime = "provided" + memory_size = 256 + timeout = 90 + + layers = [data.aws_lambda_layer_version.slim.arn] + + environment { + variables = { + RUST_BACKTRACE = "1" + RUST_LOG = "info,cranelift_wasm=warn,cranelift_codegen=info" + } + } +} + +// See https://docs.aws.amazon.com/lambda/latest/dg/services-apigateway.html#apigateway-permissions. +resource "aws_lambda_permission" "example" { + action = "lambda:InvokeFunction" + function_name = aws_lambda_function.example.function_name + principal = "apigateway.amazonaws.com" + source_arn = "arn:${data.aws_partition.current.partition}:execute-api:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:${aws_apigatewayv2_api.example.id}/*/$default" +} + +// +// IAM resources. +// + +resource "aws_iam_role" "example" { + name = "waSCC-example-apigw-Lambda-role" + + assume_role_policy = < { + /// The request type. + type T: Serialize; + + /// The response type. + type U: Deserialize<'de>; + + /// The operation this dispatcher dispatches. + const OP: &'static str; + + /// Dispatches a request to the specified actor using our dispatcher. + fn dispatch_request(&self, actor: &str, request: Self::T) -> anyhow::Result { + let input = serialize(request).map_err(|e| DispatcherError::RequestSerialization { + source: anyhow!("{}", e), + })?; + + let handler_resp = { + let dispatcher = self.dispatcher(); + let lock = dispatcher.read().unwrap(); + lock.dispatch(actor, Self::OP, &input) + }; + let output = handler_resp.map_err(|e| DispatcherError::NotDispatched { + actor: actor.into(), + op: Self::OP.into(), + source: anyhow!("{}", e), + })?; + + let response = deserialize::(output.as_slice()).map_err(|e| { + DispatcherError::ResponseDeserialization { + source: anyhow!("{}", e), + } + })?; + + Ok(response) + } + + /// Returns a dispatcher. + fn dispatcher(&self) -> Arc>>; + + /// Attempts to dispatch a Lambda invocation event, returning an invocation response. + /// The bodies of the invocation event and response are passed and returned. + fn dispatch_invocation_event(&self, actor: &str, event: &[u8]) -> anyhow::Result>; +} + +/// The invocation request is not an HTTP request. +#[derive(thiserror::Error, Debug)] +#[error("Not an HTTP request")] +pub(crate) struct NotHttpRequestError; + +/// Dispatches HTTP requests. +pub(crate) struct HttpDispatcher { + dispatcher: Arc>>, +} + +impl HttpDispatcher { + /// Returns a new `HttpDispatcher`. + pub fn new(dispatcher: Arc>>) -> Self { + HttpDispatcher { dispatcher } + } + + /// Dispatches an ALB target group request. + fn dispatch_alb_request( + &self, + actor: &str, + request: AlbTargetGroupRequestWrapper, + ) -> anyhow::Result { + info!("HttpDispatcher dispatch ALB target group request"); + Ok(self + .dispatch_request(actor, request.try_into()?)? + .try_into()?) + } + + /// Dispatches an API Gateway proxy request. + fn dispatch_apigw_request( + &self, + actor: &str, + request: ApiGatewayProxyRequestWrapper, + ) -> anyhow::Result { + info!("HttpDispatcher dispatch API Gateway proxy request"); + Ok(self + .dispatch_request(actor, request.try_into()?)? + .try_into()?) + } + + /// Dispatches an API Gateway v2 proxy request. + fn dispatch_apigwv2_request( + &self, + actor: &str, + request: ApiGatewayV2ProxyRequestWrapper, + ) -> anyhow::Result { + info!("HttpDispatcher dispatch API Gateway v2 proxy request"); + Ok(self + .dispatch_request(actor, request.try_into()?)? + .try_into()?) + } +} + +impl Dispatcher<'_> for HttpDispatcher { + type T = wascc_codec::http::Request; + type U = wascc_codec::http::Response; + + const OP: &'static str = wascc_codec::http::OP_HANDLE_REQUEST; + + fn dispatcher(&self) -> Arc>> { + Arc::clone(&self.dispatcher) + } + + /// Attempts to dispatch a Lambda invocation event, returning an invocation response. + /// The bodies of the invocation event and response are passed and returned. + fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { + let body = std::str::from_utf8(body).map_err(|e| { + debug!("{}", e); + NotHttpRequestError {} + })?; + + debug!("Lambda invocation event body:\n{}", body); + + match serde_json::from_str(body) { + Ok(request @ alb::AlbTargetGroupRequest { .. }) => { + let response: alb::AlbTargetGroupResponse = + self.dispatch_alb_request(actor, request.into())?.into(); + return serde_json::to_vec(&response).map_err(|e| e.into()); + } + _ => debug!("Not an ALB request"), + }; + match serde_json::from_str(body) { + Ok(request @ apigw::ApiGatewayProxyRequest { .. }) => { + let response: apigw::ApiGatewayProxyResponse = + self.dispatch_apigw_request(actor, request.into())?.into(); + return serde_json::to_vec(&response).map_err(|e| e.into()); + } + _ => debug!("Not an API Gateway proxy request"), + }; + match serde_json::from_str(body) { + Ok(request @ apigw::ApiGatewayV2httpRequest { .. }) => { + let response: apigw::ApiGatewayV2httpResponse = + self.dispatch_apigwv2_request(actor, request.into())?.into(); + return serde_json::to_vec(&response).map_err(|e| e.into()); + } + _ => debug!("Not an API Gateway v2 proxy request"), + }; + + Err(NotHttpRequestError {}.into()) + } +} + +/// Dispatches Lambda raw events. +pub(crate) struct RawEventDispatcher { + dispatcher: Arc>>, +} + +impl RawEventDispatcher { + /// Returns a new `RawEventDispatcher`. + pub fn new(dispatcher: Arc>>) -> Self { + RawEventDispatcher { dispatcher } + } +} + +impl Dispatcher<'_> for RawEventDispatcher { + type T = codec::Event; + type U = codec::Response; + + const OP: &'static str = codec::OP_HANDLE_EVENT; + + fn dispatcher(&self) -> Arc>> { + Arc::clone(&self.dispatcher) + } + + /// Attempts to dispatch a Lambda invocation event, returning an invocation response. + /// The bodies of the invocation event and response are passed and returned. + fn dispatch_invocation_event(&self, actor: &str, body: &[u8]) -> anyhow::Result> { + let raw_event = codec::Event { + body: body.to_vec(), + }; + + Ok(self.dispatch_request(actor, raw_event)?.body) + } +} diff --git a/provider/src/http.rs b/provider/src/http.rs new file mode 100644 index 0000000..1cc371a --- /dev/null +++ b/provider/src/http.rs @@ -0,0 +1,260 @@ +// Copyright 2015-2020 Capital One Services, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +// waSCC AWS Lambda Runtime Provider +// + +use aws_lambda_events::event::{alb, apigw}; + +use std::collections::HashMap; +use std::convert::TryFrom; + +pub(crate) struct AlbTargetGroupRequestWrapper(alb::AlbTargetGroupRequest); + +impl From for AlbTargetGroupRequestWrapper { + /// Converts an ALB target group request to an instance of the wrapper type. + fn from(request: alb::AlbTargetGroupRequest) -> Self { + AlbTargetGroupRequestWrapper(request) + } +} + +impl TryFrom for wascc_codec::http::Request { + type Error = anyhow::Error; + + /// Attempts conversion of an ALB target group request to an actor's HTTP request. + fn try_from(request: AlbTargetGroupRequestWrapper) -> anyhow::Result { + let query_string = query_string(request.0.query_string_parameters); + + Ok(wascc_codec::http::Request { + method: request + .0 + .http_method + .ok_or(anyhow!("Missing method in ALB target group request"))?, + path: request + .0 + .path + .ok_or(anyhow!("Missing path in ALB target group request"))?, + query_string, + header: request.0.headers, + body: match request.0.body { + Some(s) if request.0.is_base64_encoded => base64::decode(s)?, + Some(s) => s.into_bytes(), + None => vec![], + }, + }) + } +} + +pub(crate) struct AlbTargetGroupResponseWrapper(alb::AlbTargetGroupResponse); + +impl From for alb::AlbTargetGroupResponse { + /// Converts instance of the wrapper type to an ALB response. + fn from(response: AlbTargetGroupResponseWrapper) -> Self { + response.0 + } +} + +impl From for AlbTargetGroupResponseWrapper { + /// Converts instance of an ALB response to the wrapper type. + fn from(response: alb::AlbTargetGroupResponse) -> Self { + AlbTargetGroupResponseWrapper(response) + } +} + +impl TryFrom for AlbTargetGroupResponseWrapper { + type Error = anyhow::Error; + + /// Attempts conversion of an actor's HTTP response to an ALB response. + fn try_from(response: wascc_codec::http::Response) -> anyhow::Result { + let (body, is_base64_encoded) = body_string(response.body); + + Ok(alb::AlbTargetGroupResponse { + status_code: response.status_code as i64, + status_description: Some(response.status), + headers: response.header, + multi_value_headers: HashMap::new(), + body, + is_base64_encoded, + } + .into()) + } +} + +pub(crate) struct ApiGatewayProxyRequestWrapper(apigw::ApiGatewayProxyRequest); + +impl From for ApiGatewayProxyRequestWrapper { + /// Converts an API Gateway proxy request to an instance of the wrapper type. + fn from(request: apigw::ApiGatewayProxyRequest) -> Self { + ApiGatewayProxyRequestWrapper(request) + } +} + +impl TryFrom for wascc_codec::http::Request { + type Error = anyhow::Error; + + /// Attempts conversion of an API Gateway proxy request to an actor's HTTP request. + fn try_from(request: ApiGatewayProxyRequestWrapper) -> anyhow::Result { + let query_string = query_string(request.0.query_string_parameters); + + Ok(wascc_codec::http::Request { + method: request + .0 + .http_method + .ok_or(anyhow!("Missing method in API Gateway proxy request"))?, + path: request + .0 + .path + .ok_or(anyhow!("Missing path in API Gateway proxy request"))?, + query_string, + header: request.0.headers, + body: match request.0.body { + Some(s) if request.0.is_base64_encoded.is_some() => base64::decode(s)?, + Some(s) => s.into_bytes(), + None => vec![], + }, + }) + } +} + +pub(crate) struct ApiGatewayProxyResponseWrapper(apigw::ApiGatewayProxyResponse); + +impl From for apigw::ApiGatewayProxyResponse { + /// Converts instance of the wrapper type to an API Gateway proxy response. + fn from(response: ApiGatewayProxyResponseWrapper) -> Self { + response.0 + } +} + +impl From for ApiGatewayProxyResponseWrapper { + /// Converts instance of an API Gateway proxy response to the wrapper type. + fn from(response: apigw::ApiGatewayProxyResponse) -> Self { + ApiGatewayProxyResponseWrapper(response) + } +} + +impl TryFrom for ApiGatewayProxyResponseWrapper { + type Error = anyhow::Error; + + /// Attempts conversion of an actor's HTTP response to an API Gateway proxy response. + fn try_from(response: wascc_codec::http::Response) -> anyhow::Result { + let (body, is_base64_encoded) = body_string(response.body); + + Ok(apigw::ApiGatewayProxyResponse { + status_code: response.status_code as i64, + headers: response.header, + multi_value_headers: HashMap::new(), + body, + is_base64_encoded: Some(is_base64_encoded), + } + .into()) + } +} + +pub(crate) struct ApiGatewayV2ProxyRequestWrapper(apigw::ApiGatewayV2httpRequest); + +impl From for ApiGatewayV2ProxyRequestWrapper { + /// Converts an API Gateway v2 proxy request to an instance of the wrapper type. + fn from(request: apigw::ApiGatewayV2httpRequest) -> Self { + ApiGatewayV2ProxyRequestWrapper(request) + } +} + +impl TryFrom for wascc_codec::http::Request { + type Error = anyhow::Error; + + /// Attempts conversion of an API Gateway v2 proxy request to an actor's HTTP request. + fn try_from(request: ApiGatewayV2ProxyRequestWrapper) -> anyhow::Result { + let query_string = query_string(request.0.query_string_parameters); + + Ok(wascc_codec::http::Request { + method: request + .0 + .request_context + .http + .method + .ok_or(anyhow!("Missing method in API Gateway v2 proxy request"))?, + path: request + .0 + .request_context + .http + .path + .ok_or(anyhow!("Missing path in API Gateway v2 proxy request"))?, + query_string, + header: request.0.headers, + body: match request.0.body { + Some(s) if request.0.is_base64_encoded => base64::decode(s)?, + Some(s) => s.into_bytes(), + None => vec![], + }, + }) + } +} + +pub(crate) struct ApiGatewayV2ProxyResponseWrapper(apigw::ApiGatewayV2httpResponse); + +impl From for apigw::ApiGatewayV2httpResponse { + /// Converts instance of the wrapper type to an API Gateway v2 proxy response. + fn from(response: ApiGatewayV2ProxyResponseWrapper) -> Self { + response.0 + } +} + +impl From for ApiGatewayV2ProxyResponseWrapper { + /// Converts instance of an API Gateway v2 proxy response to the wrapper type. + fn from(response: apigw::ApiGatewayV2httpResponse) -> Self { + ApiGatewayV2ProxyResponseWrapper(response) + } +} + +impl TryFrom for ApiGatewayV2ProxyResponseWrapper { + type Error = anyhow::Error; + + /// Attempts conversion of an actor's HTTP response to an API Gateway v2 proxy response. + fn try_from(response: wascc_codec::http::Response) -> anyhow::Result { + let (body, is_base64_encoded) = body_string(response.body); + + Ok(apigw::ApiGatewayV2httpResponse { + status_code: response.status_code as i64, + headers: response.header, + multi_value_headers: HashMap::new(), + body, + is_base64_encoded: Some(is_base64_encoded), + cookies: Vec::new(), + } + .into()) + } +} + +/// Returns a string representation of the specified bytes and +/// a flag indicating whether or not the string is base64 encoded. +fn body_string(bytes: Vec) -> (Option, bool) { + if bytes.is_empty() { + return (None, false); + } + + match std::str::from_utf8(&bytes) { + Ok(s) => (Some(s.into()), false), + Err(_) => (Some(base64::encode(bytes)), true), + } +} + +/// Returns a string representation of the specified query string parameters. +fn query_string(qs: HashMap) -> String { + url::form_urlencoded::Serializer::new(String::new()) + .extend_pairs(qs.iter()) + .finish() +} + +// TODO Handle multi_value_query_string_parameters. diff --git a/provider/src/lambda.rs b/provider/src/lambda.rs index 3eb9c91..38f7f4c 100644 --- a/provider/src/lambda.rs +++ b/provider/src/lambda.rs @@ -18,7 +18,6 @@ use reqwest::header::USER_AGENT; use serde_json; -use std::error::Error; /// Represents an invocation event. pub(crate) struct InvocationEvent { @@ -35,7 +34,7 @@ pub(crate) struct InvocationResponse { /// Represents an invocation error. pub(crate) struct InvocationError { - error: Box, + error: anyhow::Error, request_id: String, } @@ -181,7 +180,7 @@ impl InvocationResponse { impl InvocationError { /// Creates a new `InvocationError` with the specified error and request ID. - pub fn new(error: Box, request_id: &str) -> Self { + pub fn new(error: anyhow::Error, request_id: &str) -> Self { InvocationError { error: error, request_id: request_id.into(), diff --git a/provider/src/lib.rs b/provider/src/lib.rs index 28e189d..3b94f8e 100644 --- a/provider/src/lib.rs +++ b/provider/src/lib.rs @@ -21,16 +21,22 @@ extern crate anyhow; #[macro_use] extern crate log; -use std::collections::HashMap; -use std::env; -use wascc_codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher}; +use wascc_codec::capabilities::CapabilityProvider; use wascc_codec::core::{CapabilityConfiguration, OP_BIND_ACTOR, OP_REMOVE_ACTOR}; -use wascc_codec::{deserialize, serialize}; +use wascc_codec::deserialize; +use std::collections::HashMap; +use std::env; use std::error::Error; use std::sync::{Arc, RwLock}; use std::thread; +use crate::dispatch::{ + Dispatcher, DispatcherError, HttpDispatcher, NotHttpRequestError, RawEventDispatcher, +}; + +mod dispatch; +mod http; mod lambda; const CAPABILITY_ID: &str = "awslambda:runtime"; @@ -39,14 +45,14 @@ const CAPABILITY_ID: &str = "awslambda:runtime"; /// Represents a waSCC AWS Lambda runtime provider. pub struct AwsLambdaRuntimeProvider { - dispatcher: Arc>>, + dispatcher: Arc>>, shutdown: Arc>>, } /// Polls the Lambda event machinery. struct Poller { client: lambda::Client, - dispatcher: Arc>>, + dispatcher: Arc>>, module_id: String, shutdown: Arc>>, } @@ -55,7 +61,9 @@ impl Default for AwsLambdaRuntimeProvider { // Returns the default value for `AwsLambdaRuntimeProvider`. fn default() -> Self { AwsLambdaRuntimeProvider { - dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))), + dispatcher: Arc::new(RwLock::new(Box::new( + wascc_codec::capabilities::NullDispatcher::new(), + ))), shutdown: Arc::new(RwLock::new(HashMap::new())), } } @@ -127,7 +135,10 @@ impl CapabilityProvider for AwsLambdaRuntimeProvider { } /// Called when the host runtime is ready and has configured a dispatcher. - fn configure_dispatch(&self, dispatcher: Box) -> Result<(), Box> { + fn configure_dispatch( + &self, + dispatcher: Box, + ) -> Result<(), Box> { debug!("awslambda:runtime configure_dispatch"); let mut lock = self.dispatcher.write().unwrap(); @@ -160,7 +171,7 @@ impl Poller { fn new( module_id: &str, endpoint: &str, - dispatcher: Arc>>, + dispatcher: Arc>>, shutdown: Arc>>, ) -> Self { Poller { @@ -173,6 +184,9 @@ impl Poller { /// Runs the poller until shutdown. fn run(&self) { + let http_dispatcher = HttpDispatcher::new(Arc::clone(&self.dispatcher)); + let raw_event_dispatcher = RawEventDispatcher::new(Arc::clone(&self.dispatcher)); + loop { if self.shutdown() { break; @@ -181,18 +195,21 @@ impl Poller { // Get next event. debug!("Poller get next event"); let event = match self.client.next_invocation_event() { - Err(err) => { - error!("{}", err); + Err(e) => { + error!("{}", e); continue; } Ok(evt) => match evt { - None => continue, + None => { + warn!("No event"); + continue; + } Some(event) => event, }, }; let request_id = match event.request_id() { None => { - warn!("Missing request ID"); + warn!("No request ID"); continue; } Some(request_id) => request_id, @@ -203,41 +220,75 @@ impl Poller { env::set_var("_X_AMZN_TRACE_ID", trace_id); } - // Call handler. - debug!("Poller call handler"); - let handler_resp = { - let event = codec::Event { - body: event.body().to_vec(), - }; - let buf = serialize(event).unwrap(); - let lock = self.dispatcher.read().unwrap(); - lock.dispatch(&self.module_id, codec::OP_HANDLE_EVENT, &buf) - }; - // Handle response or error. - match handler_resp { - Ok(r) => { - let r = deserialize::(r.as_slice()).unwrap(); - let resp = lambda::InvocationResponse::new(r.body, request_id); - debug!("Poller send response"); - match self.client.send_invocation_response(resp) { - Ok(_) => {} - Err(e) => error!("Unable to send invocation response: {}", e), + // Try first to dispatch as an HTTP request. + match http_dispatcher.dispatch_invocation_event(&self.module_id, event.body()) { + // The invocation event could be converted to an HTTP request and was dispatched succesfully. + Ok(body) => { + self.send_invocation_response(body, request_id); + continue; + } + // The event couldn't be converted to an HTTP request. + // Dispatch as a Lambda raw event. + Err(e) if e.is::() => info!("{}", e), + Err(e) if e.is::() => { + match e.downcast_ref::().unwrap() { + // The event could be converted to an HTTP request but couldn't be serialized. + // Dispatch as a Lambda raw event. + e @ DispatcherError::RequestSerialization { .. } => warn!("{}", e), + // The event could be converted to an HTTP request but wasn't dispatched to the actor. + // Dispatch as a Lambda raw event. + e @ DispatcherError::NotDispatched { .. } => warn!("{}", e), + // The event could be converted to an HTTP request and was + // dispatched succesfully but there was an error after dispatch, + // Fail the invocation. + _ => { + error!("{}", e); + self.send_invocation_error(e, request_id); + continue; + } } } + // Some other error. + // Fail the invocation. Err(e) => { - error!("Guest failed to handle Lambda event: {}", e); - let err = lambda::InvocationError::new(e, request_id); - debug!("Poller send error"); - match self.client.send_invocation_error(err) { - Ok(_) => {} - Err(e) => error!("Unable to send invocation error: {}", e), - } + error!("{}", e); + self.send_invocation_error(e, request_id); + continue; + } + }; + + // Dispatch as a Lambda raw event. + match raw_event_dispatcher.dispatch_invocation_event(&self.module_id, event.body()) { + Ok(body) => self.send_invocation_response(body, request_id), + Err(e) => { + error!("{}", e); + self.send_invocation_error(e, request_id) } } } } - // Returns whether the shutdown flag is set. + /// Sends an invocation error. + fn send_invocation_error(&self, e: anyhow::Error, request_id: &str) { + let err = lambda::InvocationError::new(e, request_id); + debug!("Poller send error"); + match self.client.send_invocation_error(err) { + Ok(_) => {} + Err(e) => error!("Unable to send invocation error: {}", e), + } + } + + /// Sends an invocation response. + fn send_invocation_response(&self, body: Vec, request_id: &str) { + let resp = lambda::InvocationResponse::new(body, request_id); + debug!("Poller send response"); + match self.client.send_invocation_response(resp) { + Ok(_) => {} + Err(e) => error!("Unable to send invocation response: {}", e), + } + } + + /// Returns whether the shutdown flag is set. fn shutdown(&self) -> bool { *self.shutdown.read().unwrap().get(&self.module_id).unwrap() } diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 66e8c50..0fd74bb 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -48,7 +48,7 @@ fn main() -> anyhow::Result<()> { info!("Logger already intialized"); } - info!("aws-lambda-wascc-runtime starting"); + info!("aws-lambda-wascc-runtime {} starting", env!("CARGO_PKG_VERSION")); let host = WasccHost::new();