-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Custom Lambda Runtime #276
base: main
Are you sure you want to change the base?
Custom Lambda Runtime #276
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for getting this up, this is looking great! Took a pass through with some pointers. Obviously I'm still figuring out some stuff as well :)
Regarding error handling, I guess there are two phases:
-
The setup phase. It looks like we can use the
/runtime/init/error
for this. -
The per-request phase, for which we can use
/runtime/invocation/AwsRequestId/response
. I think what we want to do is run these on a separate fiber, and wait on the outcome (either success, error, or cancellation), and report it. We can map cancellation to aCancellationException
.Of course, there's still the chance that something goes wrong after initialization but before we obtain the
AwsRequestId
so we cannot use either error reporting mechanism. I think in this instance we have no choice but to crash. AFAICT we cannot proceed to consume the next event since we cannot notify the lambda we are done consuming the current event without theAwsRequestId
.
object LambdaReservedEnvVars { | ||
val HANDLER = "_HANDLER" | ||
val AWS_REGION = "AWS_REGION" | ||
val AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV" | ||
val AWS_LAMBDA_FUNCTION_NAME = "AWS_LAMBDA_FUNCTION_NAME" | ||
val AWS_LAMBDA_FUNCTION_MEMORY_SIZE = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" | ||
val AWS_LAMBDA_FUNCTION_VERSION = "AWS_LAMBDA_FUNCTION_VERSION" | ||
val AWS_LAMBDA_LOG_GROUP_NAME = "AWS_LAMBDA_LOG_GROUP_NAME" | ||
val AWS_LAMBDA_LOG_STREAM_NAME = "AWS_LAMBDA_LOG_STREAM_NAME" | ||
val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" | ||
val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" | ||
val AWS_LAMBDA_RUNTIME_API = "AWS_LAMBDA_RUNTIME_API" | ||
val LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" | ||
val LAMBDA_RUNTIME_DIR = "LAMBDA_RUNTIME_DIR" | ||
val TZ = "TZ" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should expose this as e.g.
trait LambdaRuntimeEnv[F] {
def lambdaRuntimeDir: F[String] = ???
// ...
}
object LambdaRuntimeEnv {
def apply[F[_]](env: Env[F]): LambdaRuntimeEnv[F] = ???
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, I'm not sure I 100% understand this part. Isn't the Env[F]
from cats.effect.std
not included in the current version of cats effect? Or do you mean that I should create a custom Env[F]
for the moment until it becomes available?
Also, I'm not sure how I would implement the methods within the LambdaRuntimeEnv[F]
trait. I would have assumed that the implementations of each of the methods would be given here
object LambdaRuntimeEnv {
def apply[F[_]](env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] {
def lambdaRuntimeDir: F[String] = ???
// ...
}
}
instead of giving implementations directly within the trait.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't the
Env[F]
fromcats.effect.std
not included in the current version of cats effect?
It is now available in v3.4.0-RC1, you can update to that and use it :)
Also, I'm not sure how I would implement the methods within the
LambdaRuntimeEnv[F]
trait.
Sorry, I typed that out too hasty :) you are right, the implementations should go in the apply
method as you wrote. Also I think I meant to write apply[F[_]](implicit env: Env[F]) ...
_ <- client.successful(POST(result, invocationUrl)) | ||
} yield ()).foreverM | ||
} | ||
}.as(()) // how to handle Outcome error and cancellation? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
}.as(()) // how to handle Outcome error and cancellation? | |
}.void |
import cats.effect.kernel.Sync | ||
|
||
object LambdaReservedEnvVars { | ||
val HANDLER = "_HANDLER" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val HANDLER = "_HANDLER" | |
final val HANDLER = "_HANDLER" |
import org.http4s.client.dsl.Http4sClientDsl | ||
import io.circe._ | ||
|
||
import java.time.Instant //safe to use in native? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logStreamName, | ||
None, //need | ||
None, //need | ||
F.delay(FiniteDuration(request.deadlineTimeInMs.toEpochMilli - Instant.now.toEpochMilli, TimeUnit.MILLISECONDS)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of Instant.now
you can use F.realTime
. This is better for tests/mocking as well.
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): Resource[F, Unit] = { | ||
F.background { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at it again, I think I made a mistake, sorry! We don't want .background
here, we should just return the result of foreverM
maybe 🤔 leave it up to the caller to background or not. This way we can properly raise an error (with .background
any error would be swallowed).
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): Resource[F, Unit] = { | |
F.background { | |
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): F[Nothing] = { |
None, //need | ||
None, //need |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some memory these can just be parsed from JSON.
// Still need to decide how to handle failed request or invalid header values | ||
def fromResponse[F[_]](response: Response[F])(implicit F: ApplicativeError[F, Throwable]): F[LambdaRequest] = { | ||
F.pure(LambdaRequest(???)) | ||
//Unsure on best way to unpack the response headers into LambdaRequest case class |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Right:tm: way to do this is to define our own header models for each of the AWS headers. Take your pick from examples :)
https://github.com/http4s/http4s/tree/v0.23.16/core/shared/src/main/scala/org/http4s/headers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had a go at creating header models using a similar way how the headers defined in the headers
package are defined, but they use functions that are private to the http4s
package. Is it possible to create header models not using these private functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which private methods specifically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main one was the ParseResult.fromParser
method which seems needed to create an instance of Header
. An example of its use is in https://github.com/http4s/http4s/blob/v0.23.16/core/shared/src/main/scala/org/http4s/headers/Accept-Post.scala.
edit: maybe I didn't have a close enough look at other files, there seems to be other ways of defining a ParseResult
so I'll have another go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just inline that from here, it's pretty straightforward.
https://github.com/http4s/http4s/blob/14afe7a41ee66ef81af4861269b3f574cd9aea8c/core/shared/src/main/scala/org/http4s/MessageFailure.scala#L84-L88
I suspect the try
/catch
is no longer necessary, it is an artifact of old code 😂
case class LambdaRequest( | ||
deadlineTimeInMs: Instant, | ||
id: String, | ||
invokedFunctionArn: String, | ||
body: Json | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should prefer class
instead of case class
since that is better for binary-compatibility, unfortunately. Indeed, I'm in the process of fixing this across the entire project 😕
// it should retrieve incoming events and handle them with the handler | ||
// it will run on a background fiber, whose lifecycle is controlled by the resource | ||
??? | ||
val LAMBDA_VERSION_DATE = "2018-06-01" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val LAMBDA_VERSION_DATE = "2018-06-01" | |
final val ApiVersion = "2018-06-01" |
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): F[Unit] = { | ||
implicit val lambdaEnv: LambdaRuntimeEnv[F] = LambdaRuntimeEnv(Env.make) // maybe better way |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just directly request an implicit env: LambdaRuntimeEnv[F]
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): F[Unit] = { | ||
implicit val lambdaEnv: LambdaRuntimeEnv[F] = LambdaRuntimeEnv(Env.make) // maybe better way | ||
implicit val jsonEncoder: EntityEncoder[F, Json] = jsonEncoderWithPrinter[F](Printer.noSpaces.copy(dropNullValues = true)) | ||
val http4sClientDsl = new Http4sClientDsl[F] {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit superstitious, but would you mind if we avoid using the DSL here and just use the Request(...)
constructor? Since our use is so small and straightforward, and DSLs always risk to be more unstable than the core APIs (indeed, there was some discussion of moving the routing DSL to a separate repository).
final val HANDLER = "_HANDLER" | ||
final val AWS_REGION = "AWS_REGION" | ||
final val AWS_EXECUTION_ENV = "AWS_EXECUTION_ENV" | ||
final val AWS_LAMBDA_FUNCTION_NAME = "AWS_LAMBDA_FUNCTION_NAME" | ||
final val AWS_LAMBDA_FUNCTION_MEMORY_SIZE = "AWS_LAMBDA_FUNCTION_MEMORY_SIZE" | ||
final val AWS_LAMBDA_FUNCTION_VERSION = "AWS_LAMBDA_FUNCTION_VERSION" | ||
final val AWS_LAMBDA_LOG_GROUP_NAME = "AWS_LAMBDA_LOG_GROUP_NAME" | ||
final val AWS_LAMBDA_LOG_STREAM_NAME = "AWS_LAMBDA_LOG_STREAM_NAME" | ||
final val AWS_ACCESS_KEY_ID = "AWS_ACCESS_KEY_ID" | ||
final val AWS_SECRET_ACCESS_KEY = "AWS_SECRET_ACCESS_KEY" | ||
final val AWS_LAMBDA_RUNTIME_API = "AWS_LAMBDA_RUNTIME_API" | ||
final val LAMBDA_TASK_ROOT = "LAMBDA_TASK_ROOT" | ||
final val LAMBDA_RUNTIME_DIR = "LAMBDA_RUNTIME_DIR" | ||
final val TZ = "TZ" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These can all be private[this]
final val TZ = "TZ" | ||
|
||
def apply[F[_]: Functor](env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] { // should be error effect? | ||
override def handler: F[String] = env.get(HANDLER).map(_.get) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should avoid .get
, instead something like this with MonadThrow[F]
.
override def handler: F[String] = env.get(HANDLER).map(_.get) | |
def handler: F[String] = env.get(HANDLER).flatMap(_.liftTo(new NoSuchElementException(HANDLER)) |
final val LAMBDA_RUNTIME_DIR = "LAMBDA_RUNTIME_DIR" | ||
final val TZ = "TZ" | ||
|
||
def apply[F[_]: Functor](env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] { // should be error effect? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, on second thought, I think we want:
def apply[F[_]: Functor](env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] { // should be error effect? | |
def apply[F[_]](implicit lre: LambdaRuntimeEnv[F]): LambdaRuntimeEnv[F] = lre | |
implicit def forEnv[F[_]: MonadThrow](implicit env: Env[F]): LambdaRuntimeEnv[F] = new LambdaRuntimeEnv[F] { |
context <- createContext(request) | ||
result <- handler(request.body, context) | ||
invocationUrl = getInvocationUrl(LambdaRuntimeEnv.AWS_LAMBDA_RUNTIME_API, request.id) | ||
_ <- client.successful(POST(result, invocationUrl)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
successful
returns a Boolean
, which is currently being ignored. I wonder if expect[Unit]
will work, so it will raise an exception on failure.
F.delay(FiniteDuration(request.deadlineTimeInMs.toEpochMilli - Instant.now.toEpochMilli, TimeUnit.MILLISECONDS)) | ||
None, | ||
None, | ||
F.realTimeInstant.map(curTime => FiniteDuration(request.deadlineTimeInMs.toEpochMilli - curTime.toEpochMilli, TimeUnit.MILLISECONDS))/// how to provide test version?, maybe separate Clock parameter? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of converting to millis and back to duration, you can use F.realTime
and do that math directly with FiniteDuration
.
private def getRuntimeUrl(api: String) = Uri.unsafeFromString(s"http://$api/$LAMBDA_VERSION_DATE/runtime/invocation/next") //need to be unsafe? | ||
private def getRuntimeUrl(api: String) = uri"http://$api/$ApiVersion/runtime/invocation/next" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
huh, does using uri"..."
here actually work? That seems like a bug 😅 since it has no way of proving at compile-time that api
is valid.
build.sbt
Outdated
.in(file("feral-lambda-runtime")) | ||
.settings( | ||
name := "feral-lambda-kernel", | ||
name := "feral-lambda-runtime", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Woops :) thanks! can also rename the file to just file("lambda-runtime")
…handling to handler invocation
|
||
override def timezone: F[String] = getOrThrow(TZ) | ||
|
||
private[this] def getOrThrow(envName: String): F[String] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private[this] def getOrThrow(envName: String): F[String] = | |
private[this] def getOrRaise(envName: String): F[String] = |
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F]): F[Unit] = { | ||
implicit val lambdaEnv: LambdaRuntimeEnv[F] = LambdaRuntimeEnv(Env.make) // maybe better way | ||
// TODO find out where to use initErrorUrl | ||
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F], env: LambdaRuntimeEnv[F]): F[Unit] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can make this Temporal
.
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Async[F], env: LambdaRuntimeEnv[F]): F[Unit] = { | |
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Temporal[F], env: LambdaRuntimeEnv[F]): F[Unit] = { |
private def getRuntimeUrl(api: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/invocation/next") | ||
|
||
private def getInvocationUrl(api: String, id: String) = uri"http://$api/$ApiVersion/runtime/invocation/$id/response" | ||
private def getInvocationUrl(api: String, id: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/invocation/$id/response") | ||
|
||
private def getInitErrorUrl(api: String) = uri"http://$api/$ApiVersion/runtime/init/error" | ||
private def getInitErrorUrl(api: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/init/error") | ||
|
||
private def getInvocationErrorUrl(api: String, errorType: String) = uri"http://$api/$ApiVersion/runtime/invocation/$errorType/error" | ||
private def getInvocationErrorUrl(api: String, requestId: String) = Uri.unsafeFromString("http://$api/$ApiVersion/runtime/invocation/$requestId/error") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should avoid the unsafe
here, and use liftTo[F]
instead.
client.expect[Unit](Request(POST, invocationErrorUrl).withEntity(error.asJson)) >> F.raiseError(e) | ||
case Canceled() => | ||
val error = LambdaErrorRequest("cancelled", "cancellation", List()) // TODO need to think about better messages | ||
client.expect[Unit](Request(POST, invocationErrorUrl).withEntity(error.asJson)) >> F.raiseError(new CancellationException) // is this correct behaviour |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we should not raise a CancellationException
. That will stop the entire Lambda, just because of processing of one event was self-canceled :)
The idea behind the running the handler on a separate fiber, and then joining that fiber to check the outcome, is so that if the handler self-cancels for a particular event, the Lambda can continue processing additional events.
functionMemorySize <- lambdaEnv.lambdaFunctionMemorySize | ||
logGroupName <- lambdaEnv.lambdaLogGroupName | ||
logStreamName <- lambdaEnv.lambdaLogStreamName | ||
private def createContext[F[_]](request: LambdaRequest)(implicit F: Sync[F], env: LambdaRuntimeEnv[F]): F[Context[F]] = for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of Sync
I think we can ask for FlatMap
and Clock
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Eh, since it's a private method you can just re-use the Temporal
here.
deadlineTimeInMs <- F.fromOption(response.headers.get(CIString(deadlineTimeHeader)), new NoSuchElementException(deadlineTimeHeader)).map(_.head.value.toLong) | ||
identity <- F.pure(response.headers.get(CIString(cognitoIdentityHeader)).flatMap(_.head.value.asJson.as[CognitoIdentity].toOption)) | ||
clientContext <- F.pure(response.headers.get(CIString(clientContextHeader)).flatMap(_.head.value.asJson.as[ClientContext].toOption)) | ||
body <- F.rethrow(jsonDecoder.decode(response, strict = false).value) // basically a reimplementation of response.as[Json] from dsl |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
response.as
is not DSL, it's a core method :)
By DSL, I very specifically meant things you need to import from org.http4s.client.dsl._
final val ApiVersion = "2018-06-01" | ||
|
||
// TODO refactor into setup and per-request phases, need to find boundary between two phases | ||
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Temporal[F], env: LambdaRuntimeEnv[F]): F[Unit] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, thanks for raising the issue about loading the handler function. After thinking about it, you are right that we need a small adjustment here actually.
def apply[F[_]](client: Client[F])(handler: (Json, Context[F]) => F[Json])(implicit F: Temporal[F], env: LambdaRuntimeEnv[F]): F[Unit] = { | |
def apply[F[_]](client: Client[F])(handler: Resource[F, (Json, Context[F]) => F[Json]])(implicit F: Temporal[F], env: LambdaRuntimeEnv[F]): F[Unit] = { |
Instead of requesting the handler, we should request a Resource
that is used to acquired the handler. This way, if there are any errors during that process, we can log them correctly. It also leaves the door open for someone to implement dynamic loading, but we shouldn't worry about that here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am still a bit confused on where initialization errors should be handled. It seems like using handleErrorWith
on the result of handler.use(...)
would pick up errors that could have occurred at any point, including per-request errors. I checked the C++ and Rust runtimes and from what I can see they don't seem to use the initialization error URL at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I think the initializer errors should be caught and handled right before the use
. If you've made it to the use
, I think it's safe to say that you've initialized successfully :)
I checked the C++ and Rust runtimes and from what I can see they don't seem to use the initialization error URL at all.
Interesting. What about the JVM or JS runtimes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe the JVM runtime uses reflection based loading of the handler class and the JS runtime uses dynamic loading. Errors can occur during these steps so that's where the initialization error URL is used.
private def getNextInvocationUrl[F[_]: MonadThrow](api: String): F[Uri] = Uri.fromString(s"/$api/$ApiVersion/runtime/invocation/next").liftTo[F] | ||
|
||
private def getInvocationResponseUrl[F[_]: MonadThrow](api: String, id: String): F[Uri] = Uri.fromString(s"/$api/$ApiVersion/runtime/invocation/$id/response").liftTo[F] | ||
|
||
private def getInitErrorUrl[F[_]: MonadThrow](api: String): F[Uri] = Uri.fromString(s"/$api/$ApiVersion/runtime/init/error").liftTo[F] | ||
|
||
private def getInvocationErrorUrl[F[_]: MonadThrow](api: String, requestId: String): F[Uri] = Uri.fromString(s"/$api/$ApiVersion/runtime/invocation/$requestId/error").liftTo[F] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can do Uri.fromString(s"/$api/$ApiVersion/runtime/")
once, and then safely build all the other URIs using that.
result <- handlerFiber.join.flatMap { | ||
case Succeeded(result: F[Json]) => result | ||
case Errored(e: Throwable) => F.raiseError[Json](e) | ||
case Canceled() => F.raiseError[Json](new CancellationException) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to raiseError
s here, since that will crash the Lambda.
When we join the fiber, if there is an error or cancellation we should send it to the invocation error URL and suppress it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking was that these potential errors would get caught in the handleErrorWith
method below and instead of crashing the lambda, it would send a invocation error to the invocation error URL. Although I haven't actually tested that this works yet and will need to add another test to see if it can process multiple requests with failures.
My reasoning in changing the existing code (using Option[Json]
as the type for result
) to instead raising errors, was so that failures in places such as creating the context could be caught properly and handled as an invocation error. Previously, this wasn't the case and context errors would crash the lambda.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, sorry, you are right, I read that much too hastily :) indeed, the handleErrorWith
will catch these errors. It looks like it will also catch an error POSTing a response.
I guess there could be a distinction between handler errors (since these occur in userland) and errors that occur outside/after the handler, since I assume those would be our fault (or an issue in the runtime)? I actually don't know what the response URL returns if the user handler returns a Json
that is invalid for this particular Lambda.
trait LambdaRuntimeEnv[F[_]] { | ||
def lambdaFunctionName: F[String] | ||
def lambdaFunctionMemorySize: F[Int] | ||
def lambdaFunctionVersion: F[String] | ||
def lambdaLogGroupName: F[String] | ||
def lambdaLogStreamName: F[String] | ||
def lambdaRuntimeApi: F[Uri] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know if these values ever change while the process is running? If not maybe we should load them from the Env
once, and keep them cached.
I cannot imagine why they'd change, but I know Lambda does rely on other envs changing at runtime, so it wouldn't be surprising.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah this might have been my mistake. The custom lambda runtime docs mention that reading environment variables is an initialization task while the custom runtime is currently reading them for every invocation. So no, I don't think they will change so I think it is safe to read them once during initialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! No not at all, I think it was my bad suggestion in #276 (comment) 😅 sorry!
So then let's refactor this so they are read once during initialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I can do that if you haven't already started?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, please go ahead!
implicit F: Temporal[F], | ||
env: LambdaRuntimeEnv[F]): F[Nothing] = | ||
handlerResource.attempt.use[Nothing] { handlerOrError => | ||
env.lambdaRuntimeApi.flatMap[Nothing] { api => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I ask why Nothing
is used instead of Unit
in places such as this? With this code, the compiler complains with
type mismatch;
found : Either[Throwable,feral.lambda.runtime.LambdaSettings] => F[Nothing]
required: Either[Throwable,feral.lambda.runtime.LambdaSettings] => F[B]
but runs fine with Unit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. Technically we should return Nothing
here because these methods never return—they start a process that runs forever, and will never produce an output. But perhaps type inference is too uncooperative for this to be practical 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you @scott-thomson239 should be able to use Nothing
(and should use it, because, as @armanbilge says, it's the right thing to do).
But you probably need to tweak the variance of F[_]
. My hunch is that it should be declared as F[+_]
, so
private[this] def handleInitError[F[+_]](
and similarly for others.
Good luck 🤞
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I also suspect that F[+_]
would help things. However type signatures of this sort are sort of an anti-pattern in the Typelevel ecosystem (I'm not aware of any other major projects using them) partly because they prevent the use of datatypes that are not declared covariant at the type-level. To be fair, this might not matter in practice as F[_]
is almost always going to be IO
... but I'd rather not constraint the API here when a few explicit type parameters can do the trick, albeit annoying.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
anti-pattern
Is it really, though? AFAIK, maybe contrary to popular opinion, variance is fairly widely used in the TypeLevel world, isn't it? For example IO in Cats Effect is most definitely declared as IO[+A]
, as you say, and there are surely many other examples.
But fair enough.
But if you @scott-thomson239 don't want to dive into variance, you shouldn't fix it by using Unit
. The proper way then is to declare additional type variable which can then by inferred to anything that's necessary, e.g.
private[this] def handleInitError[F[_], X](runtimeUri: Uri, client: Client[F], ex: Throwable)(
implicit F: Temporal[F]): F[X] = {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To clarify: I'm talking about prescribing variance in the effect type parameter i.e. F[+_]
declarations. That is extremely unusual.
Indeed, there are many covariant datatypes such as IO
or Stream
. But there are also non-covariant datatypes, such as Resource
or Kleisli
.
It should be fine to keep using Nothing
here instead of adding a type parameter. Type inference may need a helping hand here and there, but that's all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, as you wish 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay no problem, I will try and get Nothing
working instead of Unit
. I was just using Unit
temporarily so that I could fix the failing tests so I will revert that.
This is a draft PR of my current progress on issue #134 so people can have a look and give feedback/suggestions. I think the main parts that still need work are handling the errors for a possible failure during handler invocation and figuring out how to extract the headers and body from the
Response
gained from theGET
request sent to the client into aLambdaRequest
. I also need to test the runtime, which I have left for just now in case something needs changed.