-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Custom Lambda Runtime #276
base: main
Are you sure you want to change the base?
Changes from 1 commit
7ea939b
26b1622
95efe18
be96317
ebf5e97
6f990ad
f6a904b
defb679
c95cecb
2de1748
b7e6054
b07406c
0eb162e
24c4261
602460b
25403d0
a65d045
206ec6a
aa06238
39562d6
108c2cb
016df57
7ce4f85
fe73a65
ed8cb7d
f2b7151
0cf6033
e3f8553
7c69d9e
cfd8f0a
accb4b6
6acf432
2449d3e
9d79651
3c4974c
2548296
35d797f
93d55aa
af3b527
f3022e4
d630597
366248a
ab6fa05
54c3032
b5e3019
d6de562
13aa8e9
3517cb7
ffa7bb3
f1f7a2c
f83a9de
c700129
9a49c7d
d53f8be
f284899
3da24a9
8cdfc9b
541704e
89d1c69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package feral.lambda.runtime | ||
|
||
import io.circe.Encoder | ||
|
||
final class LambdaErrorRequest(val errorMessage: String, val errorType: String, val stackTrace: List[String]) | ||
|
||
object LambdaErrorRequest { | ||
def apply(errorMessage: String, errorType: String, stackTrace: List[String]) = | ||
new LambdaErrorRequest(errorMessage, errorType, stackTrace) | ||
|
||
implicit val encoder: Encoder[LambdaErrorRequest] = | ||
Encoder.forProduct3("errorMessage", "errorType", "stackTrace")(e => | ||
(e.errorMessage, e.errorType, e.stackTrace)) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,61 @@ | ||
package feral.lambda.runtime | ||
|
||
import cats.ApplicativeError | ||
import cats.effect.kernel.Concurrent | ||
import io.circe._ | ||
import cats.syntax.all._ | ||
import feral.lambda.{ClientContext, ClientContextClient, ClientContextEnv, CognitoIdentity} | ||
import io.circe.syntax.EncoderOps | ||
import org.http4s.circe._ | ||
import org.http4s.{EntityDecoder, Headers, Response, headers} | ||
import org.typelevel.ci.CIString | ||
import java.util.concurrent.TimeUnit | ||
import scala.concurrent.duration.FiniteDuration | ||
|
||
import java.time.Instant | ||
import org.http4s.Response | ||
|
||
case class LambdaRequest( | ||
deadlineTimeInMs: Instant, | ||
id: String, | ||
invokedFunctionArn: String, | ||
body: Json | ||
final class LambdaRequest( | ||
val deadlineTimeInMs: FiniteDuration, | ||
val id: String, | ||
val invokedFunctionArn: String, | ||
val identity: Option[CognitoIdentity], | ||
val clientContext: Option[ClientContext], | ||
val body: Json | ||
) | ||
|
||
object LambdaRequest { | ||
def fromResponse[F[_]](response: Response[F])(implicit F: ApplicativeError[F, Throwable]): F[LambdaRequest] = { | ||
??? | ||
private[this] final val requestIdHeader = "Lambda-Runtime-Aws-Request-id" | ||
private[this] final val invokedFunctionArnHeader = "Lambda-Runtime-Invoked-Function-Arn" | ||
private[this] final val deadlineTimeHeader = "Lambda-Runtime-Deadline-Ms" | ||
private[this] final val cognitoIdentityHeader = "Lambda-Runtime-Cognito-Identity" | ||
private[this] final val clientContextHeader = "Lambda-Runtime-Client-Context" | ||
def fromResponse[F[_]](response: Response[F])(implicit F: Concurrent[F]): F[LambdaRequest] = { | ||
implicit val jsonDecoder: EntityDecoder[F, Json] = jsonDecoderIncremental | ||
for { // TODO use custom header model for aws headers instead | ||
id <- F.fromOption(response.headers.get(CIString(requestIdHeader)), new NoSuchElementException(requestIdHeader)).map(_.head.value) | ||
invokedFunctionArn <- F.fromOption(response.headers.get(CIString(invokedFunctionArnHeader)), new NoSuchElementException(invokedFunctionArnHeader)).map(_.head.value) | ||
deadlineTimeInMs <- F.fromOption(response.headers.get(CIString(deadlineTimeHeader)), new NoSuchElementException(deadlineTimeHeader)).map(_.head.value.toLong) | ||
identity <- F.pure(response.headers.get(CIString(cognitoIdentityHeader)).flatMap(_.head.value.asJson.as[CognitoIdentity].toOption)) | ||
clientContext <- F.pure(response.headers.get(CIString(clientContextHeader)).flatMap(_.head.value.asJson.as[ClientContext].toOption)) | ||
body <- F.rethrow(jsonDecoder.decode(response, strict = false).value) // basically a reimplementation of response.as[Json] from dsl | ||
} yield { | ||
new LambdaRequest(FiniteDuration.apply(deadlineTimeInMs, TimeUnit.MILLISECONDS), id, invokedFunctionArn, identity, clientContext, body) | ||
} | ||
} | ||
|
||
implicit val cognitoIdentityDecoder: Decoder[CognitoIdentity] = | ||
Decoder.forProduct2("identity_id", "identity_pool_id")((identityId: String, poolId: String) => // field names are just a guess for just now until I find the schema | ||
new CognitoIdentity(identityId, poolId)) | ||
|
||
implicit val clientContextDecoder: Decoder[ClientContext] = Decoder.forProduct4("client", "custom", "env", "services")( | ||
(client: ClientContextClient, custom: JsonObject, env: ClientContextEnv, _: JsonObject) => | ||
new ClientContext(client, env, custom) | ||
) | ||
|
||
implicit val clientContextClientDecoder: Decoder[ClientContextClient] = Decoder.forProduct5("client_id", "app_title", "app_version_name", "app_version_code", "app_package_name")( | ||
(clientId: String, appTitle: String, appVersionName: String, appVersionCode: String, appPackageName: String) => | ||
new ClientContextClient(clientId, appTitle, appVersionName, appVersionCode, appPackageName) | ||
) | ||
|
||
implicit val clientContextEnvDecoder: Decoder[ClientContextEnv] = Decoder.forProduct5("platform", "model", "make", "platform_version", "locale")( | ||
(platform: String, model: String, make: String, platformVersion: String, locale: String) => | ||
new ClientContextEnv(platformVersion, platform, make, model, locale) | ||
) | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,50 +18,58 @@ package feral.lambda | |||||
package runtime | ||||||
|
||||||
import cats.syntax.all._ | ||||||
import cats.effect.kernel.Sync | ||||||
import cats.effect.syntax.all._ | ||||||
import cats.effect.kernel.{Async, Concurrent, Sync} | ||||||
import io.circe.Json | ||||||
import org.http4s.Method.POST | ||||||
import org.http4s.client.Client | ||||||
import org.http4s.circe.jsonEncoderWithPrinter | ||||||
import org.http4s.circe._ | ||||||
|
||||||
import scala.concurrent.duration.FiniteDuration | ||||||
import java.util.concurrent.TimeUnit | ||||||
import org.http4s.{EntityEncoder, Uri} | ||||||
import org.http4s.{EntityEncoder, Request, Uri} | ||||||
import org.http4s.client.dsl.Http4sClientDsl | ||||||
import io.circe._ | ||||||
import cats.effect.kernel.Async | ||||||
import cats.effect.kernel.Outcome._ | ||||||
import cats.effect.std.Env | ||||||
import io.circe.syntax.EncoderOps | ||||||
import org.http4s.implicits.http4sLiteralsSyntax | ||||||
|
||||||
// TODO apply function error handling | ||||||
// How to run global or static code from handler? | ||||||
// TODO Custom AWS header models | ||||||
// TODO CognitoIdentity/ClientContext JSON encoding | ||||||
import scala.concurrent.CancellationException | ||||||
|
||||||
object FeralLambdaRuntime { | ||||||
|
||||||
final val ApiVersion = "2018-06-01" | ||||||
|
||||||
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): F[Unit] = { | ||||||
implicit val lambdaEnv: LambdaRuntimeEnv[F] = LambdaRuntimeEnv(Env.make) // maybe better way | ||||||
// TODO find out where to use initErrorUrl | ||||||
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F], env: LambdaRuntimeEnv[F]): F[Unit] = { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can make this
Suggested change
|
||||||
implicit val jsonEncoder: EntityEncoder[F, Json] = jsonEncoderWithPrinter[F](Printer.noSpaces.copy(dropNullValues = true)) | ||||||
val http4sClientDsl = new Http4sClientDsl[F] {} | ||||||
import http4sClientDsl._ | ||||||
(for { | ||||||
runtimeApi <- lambdaEnv.lambdaRuntimeApi | ||||||
runtimeApi <- env.lambdaRuntimeApi | ||||||
request <- client.get(getRuntimeUrl(runtimeApi))(LambdaRequest.fromResponse) | ||||||
context <- createContext(request) | ||||||
result <- handler(request.body, context) | ||||||
invocationUrl = getInvocationUrl(LambdaRuntimeEnv.AWS_LAMBDA_RUNTIME_API, request.id) | ||||||
_ <- client.successful(POST(result, invocationUrl)) | ||||||
handlerFiber <- handler(request.body, context).start | ||||||
outcome <- handlerFiber.join | ||||||
invocationErrorUrl = getInvocationErrorUrl(runtimeApi, request.id) | ||||||
result <- outcome match { | ||||||
case Succeeded(result: F[Json]) => result | ||||||
case Errored(e: Throwable) => | ||||||
val error = LambdaErrorRequest(e.getMessage, "exception", List()) | ||||||
client.expect[Unit](Request(POST, invocationErrorUrl).withEntity(error.asJson)) >> F.raiseError(e) | ||||||
case Canceled() => | ||||||
val error = LambdaErrorRequest("cancelled", "cancellation", List()) // TODO need to think about better messages | ||||||
client.expect[Unit](Request(POST, invocationErrorUrl).withEntity(error.asJson)) >> F.raiseError(new CancellationException) // is this correct behaviour | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, we should not raise a The idea behind the running the handler on a separate fiber, and then joining that fiber to check the outcome, is so that if the handler self-cancels for a particular event, the Lambda can continue processing additional events. |
||||||
} | ||||||
invocationUrl = getInvocationUrl(runtimeApi, request.id) | ||||||
_ <- client.expect[Unit](Request(POST, invocationUrl).withEntity(result)) | ||||||
} yield ()).foreverM | ||||||
}.void | ||||||
} | ||||||
|
||||||
private def createContext[F[_]](request: LambdaRequest)(implicit F: Sync[F], lambdaEnv: LambdaRuntimeEnv[F]): F[Context[F]] = for { | ||||||
functionName <- lambdaEnv.lambdaFunctionName | ||||||
functionVersion <- lambdaEnv.lambdaFunctionVersion | ||||||
functionMemorySize <- lambdaEnv.lambdaFunctionMemorySize | ||||||
logGroupName <- lambdaEnv.lambdaLogGroupName | ||||||
logStreamName <- lambdaEnv.lambdaLogStreamName | ||||||
private def createContext[F[_]](request: LambdaRequest)(implicit F: Sync[F], env: LambdaRuntimeEnv[F]): F[Context[F]] = for { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Eh, since it's a private method you can just re-use the |
||||||
functionName <- env.lambdaFunctionName | ||||||
functionVersion <- env.lambdaFunctionVersion | ||||||
functionMemorySize <- env.lambdaFunctionMemorySize | ||||||
logGroupName <- env.lambdaLogGroupName | ||||||
logStreamName <- env.lambdaLogStreamName | ||||||
} yield { | ||||||
new Context[F]( | ||||||
functionName, | ||||||
|
@@ -71,18 +79,18 @@ object FeralLambdaRuntime { | |||||
request.id, | ||||||
logGroupName, | ||||||
logStreamName, | ||||||
None, | ||||||
None, | ||||||
F.realTimeInstant.map(curTime => FiniteDuration(request.deadlineTimeInMs.toEpochMilli - curTime.toEpochMilli, TimeUnit.MILLISECONDS))/// how to provide test version?, maybe separate Clock parameter? | ||||||
request.identity, | ||||||
request.clientContext, | ||||||
F.realTime.map(curTime => request.deadlineTimeInMs - curTime) | ||||||
) | ||||||
} | ||||||
|
||||||
private def getRuntimeUrl(api: String) = uri"http://$api/$ApiVersion/runtime/invocation/next" | ||||||
private def getRuntimeUrl(api: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/invocation/next") | ||||||
|
||||||
private def getInvocationUrl(api: String, id: String) = uri"http://$api/$ApiVersion/runtime/invocation/$id/response" | ||||||
private def getInvocationUrl(api: String, id: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/invocation/$id/response") | ||||||
|
||||||
private def getInitErrorUrl(api: String) = uri"http://$api/$ApiVersion/runtime/init/error" | ||||||
private def getInitErrorUrl(api: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/init/error") | ||||||
|
||||||
private def getInvocationErrorUrl(api: String, errorType: String) = uri"http://$api/$ApiVersion/runtime/invocation/$errorType/error" | ||||||
private def getInvocationErrorUrl(api: String, requestId: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/invocation/$requestId/error") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should avoid the |
||||||
|
||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,6 +1,6 @@ | ||||||
package feral.lambda.runtime | ||||||
|
||||||
import cats.{ApplicativeError, Functor} | ||||||
import cats.{ApplicativeError, Functor, MonadThrow} | ||||||
import cats.effect.kernel.Sync | ||||||
import cats.effect.std.Env | ||||||
import cats.syntax.all._ | ||||||
|
@@ -23,48 +23,53 @@ trait LambdaRuntimeEnv[F[_]] { | |||||
} | ||||||
|
||||||
object LambdaRuntimeEnv { | ||||||
final val HANDLER = "_HANDLER" | ||||||
final val AWS_REGION = "AWS_REGION" | ||||||
final val AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV" | ||||||
final val AWS_LAMBDA_FUNCTION_NAME = "AWS_LAMBDA_FUNCTION_NAME" | ||||||
final val AWS_LAMBDA_FUNCTION_MEMORY_SIZE = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" | ||||||
final val AWS_LAMBDA_FUNCTION_VERSION = "AWS_LAMBDA_FUNCTION_VERSION" | ||||||
final val AWS_LAMBDA_LOG_GROUP_NAME = "AWS_LAMBDA_LOG_GROUP_NAME" | ||||||
final val AWS_LAMBDA_LOG_STREAM_NAME = "AWS_LAMBDA_LOG_STREAM_NAME" | ||||||
final val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" | ||||||
final val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" | ||||||
final val AWS_LAMBDA_RUNTIME_API = "AWS_LAMBDA_RUNTIME_API" | ||||||
final val LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" | ||||||
final val LAMBDA_RUNTIME_DIR = "LAMBDA_RUNTIME_DIR" | ||||||
final val TZ = "TZ" | ||||||
private[this] final val HANDLER = "_HANDLER" | ||||||
private[this] final val AWS_REGION = "AWS_REGION" | ||||||
private[this] final val AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV" | ||||||
private[this] final val AWS_LAMBDA_FUNCTION_NAME = "AWS_LAMBDA_FUNCTION_NAME" | ||||||
private[this] final val AWS_LAMBDA_FUNCTION_MEMORY_SIZE = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" | ||||||
private[this] final val AWS_LAMBDA_FUNCTION_VERSION = "AWS_LAMBDA_FUNCTION_VERSION" | ||||||
private[this] final val AWS_LAMBDA_LOG_GROUP_NAME = "AWS_LAMBDA_LOG_GROUP_NAME" | ||||||
private[this] final val AWS_LAMBDA_LOG_STREAM_NAME = "AWS_LAMBDA_LOG_STREAM_NAME" | ||||||
private[this] final val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" | ||||||
private[this] final val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" | ||||||
private[this] final val AWS_LAMBDA_RUNTIME_API = "AWS_LAMBDA_RUNTIME_API" | ||||||
private[this] final val LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" | ||||||
private[this] final val LAMBDA_RUNTIME_DIR = "LAMBDA_RUNTIME_DIR" | ||||||
private[this] final val TZ = "TZ" | ||||||
|
||||||
def apply[F[_]: Functor](env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] { // should be error effect? | ||||||
override def handler: F[String] = env.get(HANDLER).map(_.get) | ||||||
def apply[F[_]](implicit lre: LambdaRuntimeEnv[F]): LambdaRuntimeEnv[F] = lre | ||||||
|
||||||
override def region: F[String] = env.get(AWS_REGION).map(_.get) | ||||||
implicit def forEnv[F[_]: MonadThrow](implicit env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] { | ||||||
override def handler: F[String] = getOrThrow(HANDLER) | ||||||
|
||||||
override def executionEnv: F[String] = env.get(AWS_EXECUTION_ENV).map(_.get) | ||||||
override def region: F[String] = getOrThrow(AWS_REGION) | ||||||
|
||||||
override def lambdaFunctionName: F[String] = env.get(AWS_LAMBDA_FUNCTION_NAME).map(_.get) | ||||||
override def executionEnv: F[String] = getOrThrow(AWS_EXECUTION_ENV) | ||||||
|
||||||
override def lambdaFunctionMemorySize: F[Int] = env.get(AWS_LAMBDA_FUNCTION_MEMORY_SIZE).map(_.get) | ||||||
override def lambdaFunctionName: F[String] = getOrThrow(AWS_LAMBDA_FUNCTION_NAME) | ||||||
|
||||||
override def lambdaFunctionVersion: F[String] = env.get(AWS_LAMBDA_FUNCTION_VERSION).map(_.get) | ||||||
override def lambdaFunctionMemorySize: F[Int] = getOrThrow(AWS_LAMBDA_FUNCTION_MEMORY_SIZE).map(_.toInt) // TODO better way? | ||||||
|
||||||
override def lambdaLogGroupName: F[String] = env.get(AWS_LAMBDA_LOG_GROUP_NAME).map(_.get) | ||||||
override def lambdaFunctionVersion: F[String] = getOrThrow(AWS_LAMBDA_FUNCTION_VERSION) | ||||||
|
||||||
override def lambdaLogStreamName: F[String] = env.get(AWS_LAMBDA_LOG_STREAM_NAME).map(_.get) | ||||||
override def lambdaLogGroupName: F[String] = getOrThrow(AWS_LAMBDA_LOG_GROUP_NAME) | ||||||
|
||||||
override def accessKeyId: F[String] = env.get(AWS_ACCESS_KEY_ID).map(_.get) | ||||||
override def lambdaLogStreamName: F[String] = getOrThrow(AWS_LAMBDA_LOG_STREAM_NAME) | ||||||
|
||||||
override def secretAccessKey: F[String] = env.get(AWS_SECRET_ACCESS_KEY).map(_.get) | ||||||
override def accessKeyId: F[String] = getOrThrow(AWS_ACCESS_KEY_ID) | ||||||
|
||||||
override def lambdaRuntimeApi: F[String] = env.get(AWS_LAMBDA_RUNTIME_API).map(_.get) | ||||||
override def secretAccessKey: F[String] = getOrThrow(AWS_SECRET_ACCESS_KEY) | ||||||
|
||||||
override def lambdaTaskRoot: F[String] = env.get(LAMBDA_TASK_ROOT).map(_.get) | ||||||
override def lambdaRuntimeApi: F[String] = getOrThrow(AWS_LAMBDA_RUNTIME_API) | ||||||
|
||||||
override def lambdaRuntimeDir: F[String] = env.get(LAMBDA_RUNTIME_DIR).map(_.get) | ||||||
override def lambdaTaskRoot: F[String] = getOrThrow(LAMBDA_TASK_ROOT) | ||||||
|
||||||
override def timezone: F[String] = env.get(TZ).map(_.get) | ||||||
override def lambdaRuntimeDir: F[String] = getOrThrow(LAMBDA_RUNTIME_DIR) | ||||||
|
||||||
override def timezone: F[String] = getOrThrow(TZ) | ||||||
|
||||||
private[this] def getOrThrow(envName: String): F[String] = | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
env.get(envName).flatMap(_.liftTo(new NoSuchElementException(envName))) | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
response.as
is not DSL, it's a core method :)By DSL, I very specifically meant things you need to import from
org.http4s.client.dsl._