Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing doodles #50

Closed
wants to merge 10 commits into from
Closed
20 changes: 17 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,24 @@ val catsEffectVersion = "3.2.9"
val circeVersion = "0.14.1"
val fs2Version = "3.2.2"
val http4sVersion = "0.23.6"
val natchezVersion = "0.1.5"

lazy val root =
project
.in(file("."))
.aggregate(
core.js,
core.jvm,
lambdaCloudFormationCustomResource.js,
lambdaCloudFormationCustomResource.jvm,
lambda.js,
lambda.jvm,
lambdaEvents.js,
lambdaEvents.jvm,
lambdaNatchez.js,
lambdaNatchez.jvm,
lambdaApiGatewayProxyHttp4s.js,
lambdaApiGatewayProxyHttp4s.jvm
lambdaApiGatewayProxyHttp4s.jvm,
lambdaCloudFormationCustomResource.js,
lambdaCloudFormationCustomResource.jvm
)
.enablePlugins(NoPublishPlugin)

Expand Down Expand Up @@ -98,6 +101,17 @@ lazy val lambdaEvents = crossProject(JSPlatform, JVMPlatform)
)
)

lazy val lambdaNatchez = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
.in(file("lambda-natchez"))
.settings(
name := "feral-lambda-natchez",
libraryDependencies ++= Seq(
"org.tpolecat" %%% "natchez-core" % natchezVersion
)
)
.dependsOn(lambda, lambdaEvents)

lazy val lambdaApiGatewayProxyHttp4s = crossProject(JSPlatform, JVMPlatform)
.crossType(CrossType.Pure)
.in(file("lambda-api-gateway-proxy-http4s"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package feral.lambda

import cats.effect.IO
import cats.effect.Resource
import cats.effect.SyncIO
import cats.effect.Sync
import cats.effect.syntax.all._
import cats.syntax.all._
import feral.lambda.events.ApiGatewayProxyEventV2
import feral.lambda.events.ApiGatewayProxyStructuredResultV2
import fs2.Stream
Expand All @@ -33,56 +34,52 @@ import org.http4s.Uri
import org.typelevel.vault.Key
import org.typelevel.vault.Vault

abstract class ApiGatewayProxyHttp4sLambda
extends IOLambda[ApiGatewayProxyEventV2, ApiGatewayProxyStructuredResultV2] {
object ApiGatewayProxyHttp4sLambda {

val ContextKey = Key.newKey[SyncIO, Context].unsafeRunSync()
val EventKey = Key.newKey[SyncIO, ApiGatewayProxyEventV2].unsafeRunSync()

def routes: Resource[IO, HttpRoutes[IO]]

protected type Setup = HttpRoutes[IO]
protected override final val setup: Resource[IO, HttpRoutes[IO]] = routes

override final def apply(
event: ApiGatewayProxyEventV2,
context: Context,
routes: HttpRoutes[IO]): IO[Some[ApiGatewayProxyStructuredResultV2]] =
def apply[F[_]: Sync](
f: (Key[Context[F]], Key[ApiGatewayProxyEventV2]) => Resource[F, HttpRoutes[F]])
: Resource[F, Lambda[F, ApiGatewayProxyEventV2, ApiGatewayProxyStructuredResultV2]] =
for {
method <- IO.fromEither(Method.fromString(event.requestContext.http.method))
uri <- IO.fromEither(Uri.fromString(event.rawPath))
headers = Headers(event.headers.toList)
requestBody =
if (event.isBase64Encoded)
Stream.fromOption[IO](event.body).through(fs2.text.base64.decode)
else
Stream.fromOption[IO](event.body).through(fs2.text.utf8.encode)
request = Request(
method,
uri,
headers = headers,
body = requestBody,
attributes = Vault.empty.insert(ContextKey, context).insert(EventKey, event))
response <- routes(request).getOrElse(Response.notFound[IO])
isBase64Encoded = !response.charset.contains(Charset.`UTF-8`)
responseBody <- (if (isBase64Encoded)
response.body.through(fs2.text.base64.encode)
else
response.body.through(fs2.text.utf8.decode)).compile.foldMonoid
} yield Some(
ApiGatewayProxyStructuredResultV2(
response.status.code,
response
.headers
.headers
.map {
case Header.Raw(name, value) =>
name.toString -> value
}
.toMap,
responseBody,
isBase64Encoded
contextKey <- Key.newKey[F, Context[F]].toResource
eventKey <- Key.newKey[F, ApiGatewayProxyEventV2].toResource
routes <- f(contextKey, eventKey)
} yield { (event: ApiGatewayProxyEventV2, context: Context[F]) =>
for {
method <- Method.fromString(event.requestContext.http.method).liftTo[F]
uri <- Uri.fromString(event.rawPath).liftTo[F]
headers = Headers(event.headers.toList)
requestBody =
if (event.isBase64Encoded)
Stream.fromOption[F](event.body).through(fs2.text.base64.decode)
else
Stream.fromOption[F](event.body).through(fs2.text.utf8.encode)
request = Request(
method,
uri,
headers = headers,
body = requestBody,
attributes = Vault.empty.insert(contextKey, context).insert(eventKey, event))
response <- routes(request).getOrElse(Response.notFound[F])
isBase64Encoded = !response.charset.contains(Charset.`UTF-8`)
responseBody <- (if (isBase64Encoded)
response.body.through(fs2.text.base64.encode)
else
response.body.through(fs2.text.utf8.decode)).compile.foldMonoid
} yield Some(
ApiGatewayProxyStructuredResultV2(
response.status.code,
response
.headers
.headers
.map {
case Header.Raw(name, value) =>
name.toString -> value
}
.toMap,
responseBody,
isBase64Encoded
)
)
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,67 +14,61 @@
* limitations under the License.
*/

package feral.lambda.cloudformation
package feral.lambda
package cloudformation

import cats.effect._
import cats.ApplicativeThrow
import cats.MonadThrow
import cats.effect.kernel.Resource
import cats.syntax.all._
import feral.lambda.cloudformation.CloudFormationCustomResourceHandler.stackTraceLines
import feral.lambda.cloudformation.CloudFormationRequestType._
import feral.lambda.{Context, IOLambda}
import io.circe._
import io.circe.syntax._
import org.http4s.Method.POST
import org.http4s.client.Client
import org.http4s.circe._
import org.http4s.client.dsl.io._
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.client.Client
import org.http4s.client.dsl.Http4sClientDsl

import java.io.{PrintWriter, StringWriter}
import java.io.PrintWriter
import java.io.StringWriter

trait CloudFormationCustomResource[F[_], Input, Output] {
def createResource(
event: CloudFormationCustomResourceRequest[Input],
context: Context): F[HandlerResponse[Output]]
context: Context[F]): F[HandlerResponse[Output]]
def updateResource(
event: CloudFormationCustomResourceRequest[Input],
context: Context): F[HandlerResponse[Output]]
context: Context[F]): F[HandlerResponse[Output]]
def deleteResource(
event: CloudFormationCustomResourceRequest[Input],
context: Context): F[HandlerResponse[Output]]
context: Context[F]): F[HandlerResponse[Output]]
}

abstract class CloudFormationCustomResourceHandler[Input: Decoder, Output: Encoder]
extends IOLambda[CloudFormationCustomResourceRequest[Input], Unit] {
type Setup = (Client[IO], CloudFormationCustomResource[IO, Input, Output])

override final def setup: Resource[IO, Setup] =
client.mproduct(handler)

protected def client: Resource[IO, Client[IO]] =
EmberClientBuilder.default[IO].build
object CloudFormationCustomResource {

def handler(client: Client[IO]): Resource[IO, CloudFormationCustomResource[IO, Input, Output]]
def apply[F[_]: MonadThrow, Input, Output: Encoder](client: Client[F])(
handler: Resource[F, CloudFormationCustomResource[F, Input, Output]])
: Resource[F, Lambda[F, CloudFormationCustomResourceRequest[Input], Unit]] =
handler.map { handler => (event, context) =>
val http4sClientDsl = new Http4sClientDsl[F] {}
import http4sClientDsl._

override def apply(
event: CloudFormationCustomResourceRequest[Input],
context: Context,
setup: Setup): IO[Option[Unit]] =
(event.RequestType match {
case CreateRequest => setup._2.createResource(event, context)
case UpdateRequest => setup._2.updateResource(event, context)
case DeleteRequest => setup._2.deleteResource(event, context)
case OtherRequestType(other) => illegalRequestType(other)
}).attempt
.map(_.fold(exceptionResponse(event)(_), successResponse(event)(_)))
.flatMap { resp => setup._1.successful(POST(resp.asJson, event.ResponseURL)) }
.as(None)
(event.RequestType match {
case CreateRequest => handler.createResource(event, context)
case UpdateRequest => handler.updateResource(event, context)
case DeleteRequest => handler.deleteResource(event, context)
case OtherRequestType(other) => illegalRequestType(other)
}).attempt
.map(_.fold(exceptionResponse(event)(_), successResponse(event)(_)))
.flatMap { resp => client.successful(POST(resp.asJson, event.ResponseURL)) }
.as(None)
}

private def illegalRequestType[A](other: String): IO[A] =
private def illegalRequestType[F[_]: ApplicativeThrow, A](other: String): F[A] =
(new IllegalArgumentException(
s"unexpected CloudFormation request type `$other``"): Throwable).raiseError[IO, A]
s"unexpected CloudFormation request type `$other``"): Throwable).raiseError[F, A]

private def exceptionResponse(req: CloudFormationCustomResourceRequest[Input])(
private def exceptionResponse[Input](req: CloudFormationCustomResourceRequest[Input])(
ex: Throwable): CloudFormationCustomResourceResponse =
CloudFormationCustomResourceResponse(
Status = RequestResponseStatus.Failed,
Expand All @@ -87,7 +81,8 @@ abstract class CloudFormationCustomResourceHandler[Input: Decoder, Output: Encod
"StackTrace" -> Json.arr(stackTraceLines(ex).map(Json.fromString): _*)).asJson
)

private def successResponse(req: CloudFormationCustomResourceRequest[Input])(
private def successResponse[Input, Output: Encoder](
req: CloudFormationCustomResourceRequest[Input])(
res: HandlerResponse[Output]): CloudFormationCustomResourceResponse =
CloudFormationCustomResourceResponse(
Status = RequestResponseStatus.Success,
Expand All @@ -99,12 +94,10 @@ abstract class CloudFormationCustomResourceHandler[Input: Decoder, Output: Encod
Data = res.data.asJson
)

}

object CloudFormationCustomResourceHandler {
def stackTraceLines(throwable: Throwable): List[String] = {
val writer = new StringWriter()
throwable.printStackTrace(new PrintWriter(writer))
writer.toString.linesIterator.toList
}

}
35 changes: 35 additions & 0 deletions lambda-natchez/src/main/scala/feral/lambda/natchez/HasKernel.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral.lambda.natchez

import feral.lambda.events.ApiGatewayProxyEventV2
import natchez.Kernel

trait HasKernel[Event] {
def extract(event: Event): Kernel
}

object HasKernel extends HasKernelLowPriority {
@inline def apply[E](implicit hk: HasKernel[E]): hk.type = hk

implicit val apiGateProxyEventV2Kernel: HasKernel[ApiGatewayProxyEventV2] =
e => Kernel(e.headers)
}
Comment on lines +26 to +31
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the nice things about having all the events together in the same module is it makes it easy to provide these implicits. Although I'm doubtful if any will have a special implementation except this one.


private[natchez] sealed class HasKernelLowPriority {
implicit def emptyKernel[E]: HasKernel[E] = _ => Kernel(Map.empty)
}
27 changes: 27 additions & 0 deletions lambda-natchez/src/main/scala/feral/lambda/natchez/LiftTrace.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral.lambda.natchez

import cats.~>
import natchez.Span
import natchez.Trace

// https://github.com/tpolecat/natchez/pull/448
trait LiftTrace[F[_], G[_]] {
def run[A](span: Span[F])(f: Trace[G] => G[A]): F[A]
def liftK: F ~> G
}
25 changes: 25 additions & 0 deletions lambda-natchez/src/main/scala/feral/lambda/natchez/Tags.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package feral.lambda.natchez

import natchez.TraceValue

object Tags {
private[this] val prefix = "aws"
def arn(s: String): (String, TraceValue) = s"$prefix.arn" -> s
def requestId(s: String): (String, TraceValue) = s"$prefix.requestId" -> s
}
Loading