Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic effects, take 2, and more? #45

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ lazy val lambda = crossProject(JSPlatform, JVMPlatform)
.settings(
name := "feral-lambda",
libraryDependencies ++= Seq(
"io.circe" %%% "circe-core" % circeVersion
"io.circe" %%% "circe-core" % circeVersion,
"org.tpolecat" %%% "natchez-core" % "0.1.5+47-0f7e4bf4-SNAPSHOT",
"org.tpolecat" %%% "natchez-xray" % "0.1.5+47-0f7e4bf4-SNAPSHOT",
"org.tpolecat" %%% "natchez-noop" % "0.1.5+47-0f7e4bf4-SNAPSHOT",
)
)
.jsSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package feral.lambda

import cats.effect.IO
import cats.effect.Resource
import cats.effect.SyncIO
import cats.effect.Sync
import cats.effect.syntax.all._
import cats.syntax.all._
import feral.lambda.events.ApiGatewayProxyEventV2
import feral.lambda.events.ApiGatewayProxyStructuredResultV2
import fs2.Stream
Expand All @@ -33,56 +34,52 @@ import org.http4s.Uri
import org.typelevel.vault.Key
import org.typelevel.vault.Vault

abstract class ApiGatewayProxyHttp4sLambda
extends IOLambda[ApiGatewayProxyEventV2, ApiGatewayProxyStructuredResultV2] {
object ApiGatewayProxyHttp4sLambda {

val ContextKey = Key.newKey[SyncIO, Context].unsafeRunSync()
val EventKey = Key.newKey[SyncIO, ApiGatewayProxyEventV2].unsafeRunSync()

def routes: Resource[IO, HttpRoutes[IO]]

protected type Setup = HttpRoutes[IO]
protected override final val setup: Resource[IO, HttpRoutes[IO]] = routes

override final def apply(
event: ApiGatewayProxyEventV2,
context: Context,
routes: HttpRoutes[IO]): IO[Some[ApiGatewayProxyStructuredResultV2]] =
def apply[F[_]: Sync](
f: (Key[Context[F]], Key[ApiGatewayProxyEventV2]) => Resource[F, HttpRoutes[F]])
: Resource[F, Lambda[F, ApiGatewayProxyEventV2, ApiGatewayProxyStructuredResultV2]] =
for {
method <- IO.fromEither(Method.fromString(event.requestContext.http.method))
uri <- IO.fromEither(Uri.fromString(event.rawPath))
headers = Headers(event.headers.toList)
requestBody =
if (event.isBase64Encoded)
Stream.fromOption[IO](event.body).through(fs2.text.base64.decode)
else
Stream.fromOption[IO](event.body).through(fs2.text.utf8.encode)
request = Request(
method,
uri,
headers = headers,
body = requestBody,
attributes = Vault.empty.insert(ContextKey, context).insert(EventKey, event))
response <- routes(request).getOrElse(Response.notFound[IO])
isBase64Encoded = !response.charset.contains(Charset.`UTF-8`)
responseBody <- (if (isBase64Encoded)
response.body.through(fs2.text.base64.encode)
else
response.body.through(fs2.text.utf8.decode)).compile.foldMonoid
} yield Some(
ApiGatewayProxyStructuredResultV2(
response.status.code,
response
.headers
.headers
.map {
case Header.Raw(name, value) =>
name.toString -> value
}
.toMap,
responseBody,
isBase64Encoded
contextKey <- Key.newKey[F, Context[F]].toResource
eventKey <- Key.newKey[F, ApiGatewayProxyEventV2].toResource
routes <- f(contextKey, eventKey)
} yield { (event: ApiGatewayProxyEventV2, context: Context[F]) =>
for {
method <- Method.fromString(event.requestContext.http.method).liftTo[F]
uri <- Uri.fromString(event.rawPath).liftTo[F]
headers = Headers(event.headers.toList)
requestBody =
if (event.isBase64Encoded)
Stream.fromOption[F](event.body).through(fs2.text.base64.decode)
else
Stream.fromOption[F](event.body).through(fs2.text.utf8.encode)
request = Request(
method,
uri,
headers = headers,
body = requestBody,
attributes = Vault.empty.insert(contextKey, context).insert(eventKey, event))
response <- routes(request).getOrElse(Response.notFound[F])
isBase64Encoded = !response.charset.contains(Charset.`UTF-8`)
responseBody <- (if (isBase64Encoded)
response.body.through(fs2.text.base64.encode)
else
response.body.through(fs2.text.utf8.decode)).compile.foldMonoid
} yield Some(
ApiGatewayProxyStructuredResultV2(
response.status.code,
response
.headers
.headers
.map {
case Header.Raw(name, value) =>
name.toString -> value
}
.toMap,
responseBody,
isBase64Encoded
)
)
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,67 +14,61 @@
* limitations under the License.
*/

package feral.lambda.cloudformation
package feral.lambda
package cloudformation

import cats.effect._
import cats.ApplicativeThrow
import cats.MonadThrow
import cats.effect.kernel.Resource
import cats.syntax.all._
import feral.lambda.cloudformation.CloudFormationCustomResourceHandler.stackTraceLines
import feral.lambda.cloudformation.CloudFormationRequestType._
import feral.lambda.{Context, IOLambda}
import io.circe._
import io.circe.syntax._
import org.http4s.Method.POST
import org.http4s.client.Client
import org.http4s.circe._
import org.http4s.client.dsl.io._
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.client.Client
import org.http4s.client.dsl.Http4sClientDsl

import java.io.{PrintWriter, StringWriter}
import java.io.PrintWriter
import java.io.StringWriter

trait CloudFormationCustomResource[F[_], Input, Output] {
def createResource(
bpholt marked this conversation as resolved.
Show resolved Hide resolved
event: CloudFormationCustomResourceRequest[Input],
context: Context): F[HandlerResponse[Output]]
context: Context[F]): F[HandlerResponse[Output]]
def updateResource(
event: CloudFormationCustomResourceRequest[Input],
context: Context): F[HandlerResponse[Output]]
context: Context[F]): F[HandlerResponse[Output]]
def deleteResource(
event: CloudFormationCustomResourceRequest[Input],
context: Context): F[HandlerResponse[Output]]
context: Context[F]): F[HandlerResponse[Output]]
}

abstract class CloudFormationCustomResourceHandler[Input: Decoder, Output: Encoder]
extends IOLambda[CloudFormationCustomResourceRequest[Input], Unit] {
type Setup = (Client[IO], CloudFormationCustomResource[IO, Input, Output])

override final def setup: Resource[IO, Setup] =
client.mproduct(handler)

protected def client: Resource[IO, Client[IO]] =
EmberClientBuilder.default[IO].build
object CloudFormationCustomResource {

def handler(client: Client[IO]): Resource[IO, CloudFormationCustomResource[IO, Input, Output]]
def apply[F[_]: MonadThrow, Input, Output: Encoder](client: Client[F])(
handler: Resource[F, CloudFormationCustomResource[F, Input, Output]])
: Resource[F, Lambda[F, CloudFormationCustomResourceRequest[Input], Unit]] =
handler.map { handler => (event, context) =>
val http4sClientDsl = new Http4sClientDsl[F] {}
import http4sClientDsl._

override def apply(
event: CloudFormationCustomResourceRequest[Input],
context: Context,
setup: Setup): IO[Option[Unit]] =
(event.RequestType match {
case CreateRequest => setup._2.createResource(event, context)
case UpdateRequest => setup._2.updateResource(event, context)
case DeleteRequest => setup._2.deleteResource(event, context)
case OtherRequestType(other) => illegalRequestType(other)
}).attempt
.map(_.fold(exceptionResponse(event)(_), successResponse(event)(_)))
.flatMap { resp => setup._1.successful(POST(resp.asJson, event.ResponseURL)) }
.as(None)
(event.RequestType match {
case CreateRequest => handler.createResource(event, context)
case UpdateRequest => handler.updateResource(event, context)
case DeleteRequest => handler.deleteResource(event, context)
case OtherRequestType(other) => illegalRequestType(other)
}).attempt
.map(_.fold(exceptionResponse(event)(_), successResponse(event)(_)))
.flatMap { resp => client.successful(POST(resp.asJson, event.ResponseURL)) }
.as(None)
}

private def illegalRequestType[A](other: String): IO[A] =
private def illegalRequestType[F[_]: ApplicativeThrow, A](other: String): F[A] =
(new IllegalArgumentException(
s"unexpected CloudFormation request type `$other``"): Throwable).raiseError[IO, A]
s"unexpected CloudFormation request type `$other``"): Throwable).raiseError[F, A]

private def exceptionResponse(req: CloudFormationCustomResourceRequest[Input])(
private def exceptionResponse[Input](req: CloudFormationCustomResourceRequest[Input])(
ex: Throwable): CloudFormationCustomResourceResponse =
CloudFormationCustomResourceResponse(
Status = RequestResponseStatus.Failed,
Expand All @@ -87,7 +81,8 @@ abstract class CloudFormationCustomResourceHandler[Input: Decoder, Output: Encod
"StackTrace" -> Json.arr(stackTraceLines(ex).map(Json.fromString): _*)).asJson
)

private def successResponse(req: CloudFormationCustomResourceRequest[Input])(
private def successResponse[Input, Output: Encoder](
req: CloudFormationCustomResourceRequest[Input])(
res: HandlerResponse[Output]): CloudFormationCustomResourceResponse =
CloudFormationCustomResourceResponse(
Status = RequestResponseStatus.Success,
Expand All @@ -99,12 +94,10 @@ abstract class CloudFormationCustomResourceHandler[Input: Decoder, Output: Encod
Data = res.data.asJson
)

}

object CloudFormationCustomResourceHandler {
def stackTraceLines(throwable: Throwable): List[String] = {
val writer = new StringWriter()
throwable.printStackTrace(new PrintWriter(writer))
writer.toString.linesIterator.toList
}

}
6 changes: 3 additions & 3 deletions lambda/js/src/main/scala/feral/lambda/ContextPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package feral.lambda

import cats.effect.IO
import cats.effect.Sync

import scala.concurrent.duration._

private[lambda] trait ContextCompanionPlatform {

private[lambda] def fromJS(context: facade.Context): Context =
private[lambda] def fromJS[F[_]: Sync](context: facade.Context): Context[F] =
Context(
context.functionName,
context.functionVersion,
Expand Down Expand Up @@ -55,6 +55,6 @@ private[lambda] trait ContextCompanionPlatform {
)
)
},
IO(context.getRemainingTimeInMillis().millis)
Sync[F].delay(context.getRemainingTimeInMillis().millis)
)
}
4 changes: 2 additions & 2 deletions lambda/js/src/main/scala/feral/lambda/IOLambdaPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ private[lambda] trait IOLambdaPlatform[Event, Result] {
// @JSExportTopLevel("handler") // TODO
final def handler(event: js.Any, context: facade.Context): js.Promise[js.Any | Unit] =
(for {
setup <- setupMemo
lambda <- setupMemo
event <- IO.fromEither(decodeJs[Event](event))
result <- apply(event, Context.fromJS(context), setup)
result <- lambda(event, Context.fromJS(context))
} yield result.map(_.asJsAny).orUndefined).unsafeToPromise()(runtime)
}
6 changes: 3 additions & 3 deletions lambda/jvm/src/main/scala/feral/lambda/ContextPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package feral.lambda

import cats.effect.IO
import cats.effect.Sync
import com.amazonaws.services.lambda.runtime

import scala.concurrent.duration._

private[lambda] trait ContextCompanionPlatform {

private[lambda] def fromJava(context: runtime.Context): Context =
private[lambda] def fromJava[F[_]: Sync](context: runtime.Context): Context[F] =
Context(
context.getFunctionName(),
context.getFunctionVersion(),
Expand Down Expand Up @@ -53,7 +53,7 @@ private[lambda] trait ContextCompanionPlatform {
)
)
},
IO(context.getRemainingTimeInMillis().millis)
Sync[F].delay(context.getRemainingTimeInMillis().millis)
)

}
6 changes: 3 additions & 3 deletions lambda/jvm/src/main/scala/feral/lambda/IOLambdaPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result]
Resource
.eval {
for {
setup <- setupMemo
lambda <- setupMemo
event <- fs2
.io
.readInputStream(IO.pure(input), 8192, closeAfterUse = false)
Expand All @@ -45,8 +45,8 @@ private[lambda] abstract class IOLambdaPlatform[Event, Result]
.head
.compile
.lastOrError
context <- IO(Context.fromJava(context))
_ <- OptionT(apply(event, context, setup)).foldF(IO.unit) { result =>
context <- IO(Context.fromJava[IO](context))
_ <- OptionT(lambda(event, context)).foldF(IO.unit) { result =>
// TODO can circe write directly to output?
IO(output.write(encoder(result).noSpaces.getBytes(StandardCharsets.UTF_8)))
}
Expand Down
11 changes: 7 additions & 4 deletions lambda/shared/src/main/scala/feral/lambda/Context.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package feral.lambda

import cats.effect.IO
import cats.~>

import scala.concurrent.duration.FiniteDuration

final case class Context(
final case class Context[F[_]](
functionName: String,
functionVersion: String,
invokedFunctionArn: String,
Expand All @@ -30,8 +30,11 @@ final case class Context(
logStreamName: String,
identity: Option[CognitoIdentity],
clientContext: Option[ClientContext],
remainingTime: IO[FiniteDuration]
)
remainingTime: F[FiniteDuration]
) {
def mapK[G[_]](fk: F ~> G): Context[G] =
this.copy(remainingTime = fk(this.remainingTime))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably isn't the best approach for bincompat but it's enough for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mapK? I think that's fine. Anything case classes however is dubious 馃槅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I meant the case class copy. We'll probably want to make Context itself not a case class, right?

Copy link
Member Author

@armanbilge armanbilge Nov 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah absolutely, Context we should (eventually) make an ordinary class. But the bigger issue for bincompat is our use of case classes for the events/responses for circe derivation ...

}

object Context extends ContextCompanionPlatform

Expand Down
8 changes: 7 additions & 1 deletion lambda/shared/src/main/scala/feral/lambda/IOLambda.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ package lambda
import cats.effect.IO
import io.circe.Decoder
import io.circe.Encoder
import cats.effect.kernel.Resource

abstract class IOLambda[Event, Result](
implicit private[lambda] val decoder: Decoder[Event],
private[lambda] val encoder: Encoder[Result]
) extends IOLambdaPlatform[Event, Result]
with IOSetup {

def apply(event: Event, context: Context, setup: Setup): IO[Option[Result]]
final type Setup = Lambda[IO, Event, Result]

final override protected def setup: Resource[IO, Setup] = run

def run: Resource[IO, Lambda[IO, Event, Result]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the spirit of bikeshedding, I like install for this better than run or handle. I think the latter two imply this would run for every invocation of the Lambda, but if I understand the lifecycle correctly, it will really only run during initialization, and subsequent requests will basically go straight to the Lambda[IO, Event, Result].

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

馃憤 that was exactly my thinking as well, if we think of it as a verb/action ("install the lambda"). If we think of it as a thing/noun then it is the "handler".


}
21 changes: 21 additions & 0 deletions lambda/shared/src/main/scala/feral/lambda/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral

package object lambda {
type Lambda[F[_], Event, Result] = (Event, Context[F]) => F[Option[Result]]
}
Loading