-
Notifications
You must be signed in to change notification settings - Fork 41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Custom Lambda Runtime #276
base: main
Are you sure you want to change the base?
Changes from 6 commits
7ea939b
26b1622
95efe18
be96317
ebf5e97
6f990ad
f6a904b
defb679
c95cecb
2de1748
b7e6054
b07406c
0eb162e
24c4261
602460b
25403d0
a65d045
206ec6a
aa06238
39562d6
108c2cb
016df57
7ce4f85
fe73a65
ed8cb7d
f2b7151
0cf6033
e3f8553
7c69d9e
cfd8f0a
accb4b6
6acf432
2449d3e
9d79651
3c4974c
2548296
35d797f
93d55aa
af3b527
f3022e4
d630597
366248a
ab6fa05
54c3032
b5e3019
d6de562
13aa8e9
3517cb7
ffa7bb3
f1f7a2c
f83a9de
c700129
9a49c7d
d53f8be
f284899
3da24a9
8cdfc9b
541704e
89d1c69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
package feral.lambda.runtime | ||
|
||
import cats.ApplicativeError | ||
import io.circe._ | ||
|
||
import java.time.Instant | ||
import org.http4s.Response | ||
|
||
case class LambdaRequest( | ||
deadlineTimeInMs: Instant, | ||
id: String, | ||
invokedFunctionArn: String, | ||
body: Json | ||
) | ||
|
||
object LambdaRequest { | ||
// Still need to decide how to handle failed request or invalid header values | ||
def fromResponse[F[_]](response: Response[F])(implicit F: ApplicativeError[F, Throwable]): F[LambdaRequest] = { | ||
F.pure(LambdaRequest(???)) | ||
//Unsure on best way to unpack the response headers into LambdaRequest case class | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Right:tm: way to do this is to define our own header models for each of the AWS headers. Take your pick from examples :) https://github.com/http4s/http4s/tree/v0.23.16/core/shared/src/main/scala/org/http4s/headers There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had a go at creating header models using a similar way how the headers defined in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which private methods specifically? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The main one was the edit: maybe I didn't have a close enough look at other files, there seems to be other ways of defining a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can just inline that from here, it's pretty straightforward. I suspect the |
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,20 @@ | ||||||
package feral.lambda.runtime | ||||||
|
||||||
import cats.effect.kernel.Sync | ||||||
|
||||||
object LambdaReservedEnvVars { | ||||||
val HANDLER = "_HANDLER" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
val AWS_REGION = "AWS_REGION" | ||||||
val AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV" | ||||||
val AWS_LAMBDA_FUNCTION_NAME = "AWS_LAMBDA_FUNCTION_NAME" | ||||||
val AWS_LAMBDA_FUNCTION_MEMORY_SIZE = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" | ||||||
val AWS_LAMBDA_FUNCTION_VERSION = "AWS_LAMBDA_FUNCTION_VERSION" | ||||||
val AWS_LAMBDA_LOG_GROUP_NAME = "AWS_LAMBDA_LOG_GROUP_NAME" | ||||||
val AWS_LAMBDA_LOG_STREAM_NAME = "AWS_LAMBDA_LOG_STREAM_NAME" | ||||||
val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" | ||||||
val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" | ||||||
val AWS_LAMBDA_RUNTIME_API = "AWS_LAMBDA_RUNTIME_API" | ||||||
val LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" | ||||||
val LAMBDA_RUNTIME_DIR = "LAMBDA_RUNTIME_DIR" | ||||||
val TZ = "TZ" | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if we should expose this as e.g. trait LambdaRuntimeEnv[F] {
def lambdaRuntimeDir: F[String] = ???
// ...
}
object LambdaRuntimeEnv {
def apply[F[_]](env: Env[F]): LambdaRuntimeEnv[F] = ???
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, I'm not sure I 100% understand this part. Isn't the Also, I'm not sure how I would implement the methods within the object LambdaRuntimeEnv {
def apply[F[_]](env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] {
def lambdaRuntimeDir: F[String] = ???
// ...
}
} instead of giving implementations directly within the trait. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It is now available in v3.4.0-RC1, you can update to that and use it :)
Sorry, I typed that out too hasty :) you are right, the implementations should go in the |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,95 @@ | ||||||||
/* | ||||||||
* Copyright 2021 Typelevel | ||||||||
* | ||||||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||
* you may not use this file except in compliance with the License. | ||||||||
* You may obtain a copy of the License at | ||||||||
* | ||||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||||
* | ||||||||
* Unless required by applicable law or agreed to in writing, software | ||||||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||
* See the License for the specific language governing permissions and | ||||||||
* limitations under the License. | ||||||||
*/ | ||||||||
|
||||||||
// TODO implement a runtime here | ||||||||
// it should retrieve incoming events and handle them with the handler | ||||||||
// it will run on a background fiber, whose lifecycle is controlled by the resource | ||||||||
|
||||||||
package feral.lambda | ||||||||
package runtime | ||||||||
|
||||||||
import cats.Applicative | ||||||||
import cats.syntax.all._ | ||||||||
import cats.effect.kernel.{Resource, Sync} | ||||||||
import io.circe.Json | ||||||||
import org.http4s.Method.POST | ||||||||
import org.http4s.client.Client | ||||||||
import org.http4s.circe.jsonEncoderWithPrinter | ||||||||
|
||||||||
import scala.concurrent.duration.FiniteDuration | ||||||||
import java.util.concurrent.TimeUnit | ||||||||
import org.http4s.{EntityEncoder, Uri} | ||||||||
import org.http4s.client.dsl.Http4sClientDsl | ||||||||
import io.circe._ | ||||||||
|
||||||||
import java.time.Instant //safe to use in native? | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||
import cats.effect.kernel.Async | ||||||||
|
||||||||
object FeralLambdaRuntime { | ||||||||
|
||||||||
val LAMBDA_VERSION_DATE = "2018-06-01" | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): Resource[F, Unit] = { | ||||||||
F.background { | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at it again, I think I made a mistake, sorry! We don't want
Suggested change
|
||||||||
val runtimeUrl = getRuntimeUrl(LambdaReservedEnvVars.AWS_LAMBDA_RUNTIME_API) | ||||||||
implicit val jsonEncoder: EntityEncoder[F, Json] = jsonEncoderWithPrinter[F](Printer.noSpaces.copy(dropNullValues = true)) | ||||||||
val http4sClientDsl = new Http4sClientDsl[F] {} | ||||||||
import http4sClientDsl._ | ||||||||
(for { | ||||||||
request <- client.get(runtimeUrl)(LambdaRequest.fromResponse) // unsure how to deal with bad response | ||||||||
context <- createContext(request) | ||||||||
result <- handler(request.body, context) | ||||||||
invocationUrl = getInvocationUrl(LambdaReservedEnvVars.AWS_LAMBDA_RUNTIME_API, request.id) | ||||||||
_ <- client.successful(POST(result, invocationUrl)) | ||||||||
} yield ()).foreverM | ||||||||
} | ||||||||
}.as(()) // how to handle Outcome error and cancellation? | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
|
||||||||
private def createContext[F[_]](request: LambdaRequest)(implicit F: Sync[F]): F[Context[F]] = for { | ||||||||
functionName <- envVar(LambdaReservedEnvVars.AWS_LAMBDA_FUNCTION_NAME) | ||||||||
functionVersion <- envVar(LambdaReservedEnvVars.AWS_LAMBDA_FUNCTION_VERSION) | ||||||||
functionMemorySize <- envVar(LambdaReservedEnvVars.AWS_LAMBDA_FUNCTION_MEMORY_SIZE).map(_.toInt) | ||||||||
logGroupName <- envVar(LambdaReservedEnvVars.AWS_LAMBDA_LOG_GROUP_NAME) | ||||||||
logStreamName <- envVar(LambdaReservedEnvVars.AWS_LAMBDA_LOG_STREAM_NAME) | ||||||||
} yield { | ||||||||
new Context[F]( | ||||||||
functionName, | ||||||||
functionVersion, | ||||||||
request.invokedFunctionArn, | ||||||||
functionMemorySize, | ||||||||
request.id, | ||||||||
logGroupName, | ||||||||
logStreamName, | ||||||||
None, //need | ||||||||
None, //need | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have some memory these can just be parsed from JSON. |
||||||||
F.delay(FiniteDuration(request.deadlineTimeInMs.toEpochMilli - Instant.now.toEpochMilli, TimeUnit.MILLISECONDS)) | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of |
||||||||
) | ||||||||
} | ||||||||
|
||||||||
private def getRuntimeUrl(api: String) = Uri.unsafeFromString(s"http://$api/$LAMBDA_VERSION_DATE/runtime/invocation/next") //need to be unsafe? | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No need to call unsafe. If we error on these fundamentals it's a fatal error and we should just kill the lambda without reporting to the error url. |
||||||||
|
||||||||
private def getInvocationUrl(api: String, id: String) = Uri.unsafeFromString(s"http://$api/$LAMBDA_VERSION_DATE/runtime/invocation/$id/response") | ||||||||
|
||||||||
// Called if initialization of handler function fails, seems impossible here since handler function is provided as constructor? | ||||||||
private def getInitErrorUrl(api: String) = Uri.unsafeFromString(s"http://$api/$LAMBDA_VERSION_DATE/runtime/init/error") | ||||||||
|
||||||||
// from docs, called "if the function returns an error or the runtime encounters an error", will be used after error handling implemented | ||||||||
private def getInvocationErrorUrl(api: String, errorType: String) = Uri.unsafeFromString(s"http://$api/$LAMBDA_VERSION_DATE/runtime/invocation/$errorType/error") | ||||||||
|
||||||||
private def envVar[F[_]](envVar: String)(implicit F: Sync[F]): F[String] = { | ||||||||
F.delay(sys.env(envVar)) | ||||||||
} | ||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should prefer
class
instead ofcase class
since that is better for binary-compatibility, unfortunately. Indeed, I'm in the process of fixing this across the entire project 😕