Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/SwiftAwsLambda/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ extension Lambda {

// for testing
internal static func run<In: Decodable, Out: Encodable>(configuration: Configuration = .init(), closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}
}

Expand Down Expand Up @@ -64,7 +64,7 @@ public class LambdaCodableCodec<In: Decodable, Out: Encodable> {

/// Default implementation of `Encodable` -> `[UInt8]` encoding and `[UInt8]` -> `Decodable' decoding
public extension LambdaCodableHandler {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping (LambdaResult) -> Void) {
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback) {
switch self.codec.decode(payload) {
case .failure(let error):
return callback(.failure(Errors.requestDecoding(error)))
Expand Down
2 changes: 1 addition & 1 deletion Sources/SwiftAwsLambda/Lambda+String.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extension Lambda {

// for testing
internal static func run(configuration: Configuration = .init(), _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}
}

Expand Down
130 changes: 87 additions & 43 deletions Sources/SwiftAwsLambda/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,69 @@ public enum Lambda {
self.run(handler: handler)
}

/// Run a Lambda defined by implementing the `LambdaHandler` protocol via a `LambdaHandlerFactory`.
///
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
@inlinable
public static func run(_ factory: @escaping LambdaHandlerFactory) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we use a typealias LambdaHandlerFactory

I would guess we will also offer just a handler closure option. Do we want to name those run() {} and the factory closures functions like this one createHandlerAndRun() {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personally like the shorter name but can consider makeAndRun in a different PR?

self.run(factory: factory)
}

/// Run a Lambda defined by implementing the `LambdaHandler` protocol via a factory.
///
/// - note: This is a blocking operation that will run forever, as it's lifecycle is managed by the AWS Lambda Runtime Engine.
@inlinable
public static func run(_ factory: @escaping (EventLoop) throws -> LambdaHandler) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we don't use a typealias.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's because its used in much less place and not user facing, typically you would pass in a ctor here

Copy link
Contributor Author

@tomerd tomerd Mar 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally like the shorter name but can consider makeAndRun in a different PR? wrong comment

self.run(factory: factory)
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(configuration: Configuration = .init(), closure: @escaping LambdaClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
return self.run(configuration: configuration, handler: LambdaClosureWrapper(closure))
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(configuration: Configuration = .init(), handler: LambdaHandler) -> LambdaLifecycleResult {
return self.run(configuration: configuration, factory: { _, callback in callback(.success(handler)) })
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(configuration: Configuration = .init(), factory: @escaping (EventLoop) throws -> LambdaHandler) -> LambdaLifecycleResult {
return self.run(configuration: configuration, factory: { (eventloop: EventLoop, callback: (Result<LambdaHandler, Error>) -> Void) -> Void in
do {
let handler = try factory(eventloop)
callback(.success(handler))
} catch {
callback(.failure(error))
}
})
}

// for testing and internal use
@usableFromInline
@discardableResult
internal static func run(handler: LambdaHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
internal static func run(configuration: Configuration = .init(), factory: @escaping LambdaHandlerFactory) -> LambdaLifecycleResult {
do {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) // only need one thread, will improve performance
defer { try! eventLoopGroup.syncShutdownGracefully() }
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, handler: handler, configuration: configuration).wait()
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, configuration: configuration, factory: factory).wait()
return .success(result)
} catch {
return .failure(error)
}
}

internal static func runAsync(eventLoopGroup: EventLoopGroup, handler: LambdaHandler, configuration: Configuration) -> EventLoopFuture<Int> {
internal static func runAsync(eventLoopGroup: EventLoopGroup, configuration: Configuration, factory: @escaping LambdaHandlerFactory) -> EventLoopFuture<Int> {
Backtrace.install()
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, handler: handler)
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, factory: factory)
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
logger.info("intercepted signal: \(signal)")
lifecycle.stop()
Expand Down Expand Up @@ -132,31 +169,33 @@ public enum Lambda {
private let eventLoop: EventLoop
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should move this Lambda.Lifecycle into another file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do in another PR

private let logger: Logger
private let configuration: Configuration
private let handler: LambdaHandler
private let factory: LambdaHandlerFactory

private var _state = LifecycleState.idle
private var _state = State.idle
private let stateLock = Lock()

init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, handler: LambdaHandler) {
init(eventLoop: EventLoop, logger: Logger, configuration: Configuration, factory: @escaping LambdaHandlerFactory) {
self.eventLoop = eventLoop
self.logger = logger
self.configuration = configuration
self.handler = handler
self.factory = factory
}

deinit {
precondition(self.state == .shutdown, "invalid state \(self.state)")
guard case .shutdown = self.state else {
preconditionFailure("invalid state \(self.state)")
}
}

private var state: LifecycleState {
private var state: State {
get {
return self.stateLock.withLock {
self._state
}
}
set {
self.stateLock.withLockVoid {
precondition(newValue.rawValue > _state.rawValue, "invalid state \(newValue) after \(self._state)")
precondition(newValue.order > _state.order, "invalid state \(newValue) after \(self._state)")
self._state = newValue
}
}
Expand All @@ -167,10 +206,10 @@ public enum Lambda {
self.state = .initializing
var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration, lambdaHandler: self.handler)
return runner.initialize(logger: logger).flatMap { _ in
self.state = .active
return self.run(runner: runner)
let runner = LambdaRunner(eventLoop: self.eventLoop, configuration: self.configuration)
return runner.initialize(logger: logger, factory: self.factory).flatMap { handler in
self.state = .active(runner, handler)
return self.run()
}
}

Expand All @@ -185,18 +224,18 @@ public enum Lambda {
}

@inline(__always)
private func run(runner: LambdaRunner) -> EventLoopFuture<Int> {
private func run() -> EventLoopFuture<Int> {
let promise = self.eventLoop.makePromise(of: Int.self)

func _run(_ count: Int) {
switch self.state {
case .active:
case .active(let runner, let handler):
if self.configuration.lifecycle.maxTimes > 0, count >= self.configuration.lifecycle.maxTimes {
return promise.succeed(count)
}
var logger = self.logger
logger[metadataKey: "lifecycleIteration"] = "\(count)"
runner.run(logger: logger).whenComplete { result in
runner.run(logger: logger, handler: handler).whenComplete { result in
switch result {
case .success:
// recursive! per aws lambda runtime spec the polling requests are to be done one at a time
Expand All @@ -216,6 +255,29 @@ public enum Lambda {

return promise.futureResult
}

private enum State {
case idle
case initializing
case active(LambdaRunner, LambdaHandler)
case stopping
case shutdown

internal var order: Int {
switch self {
case .idle:
return 0
case .initializing:
return 1
case .active:
return 2
case .stopping:
return 3
case .shutdown:
return 4
}
}
}
}

@usableFromInline
Expand Down Expand Up @@ -293,44 +355,26 @@ public enum Lambda {
return "\(Configuration.self)\n \(self.general))\n \(self.lifecycle)\n \(self.runtimeEngine)"
}
}

private enum LifecycleState: Int {
case idle
case initializing
case active
case stopping
case shutdown
}
}

/// A result type for a Lambda that returns a `[UInt8]`.
public typealias LambdaResult = Result<[UInt8], Error>

public typealias LambdaCallback = (LambdaResult) -> Void

/// A processing closure for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
/// A processing closure for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously via`LambdaCallback` .
public typealias LambdaClosure = (Lambda.Context, [UInt8], LambdaCallback) -> Void

/// A result type for a Lambda initialization.
public typealias LambdaInitResult = Result<Void, Error>

/// A callback to provide the result of Lambda initialization.
public typealias LambdaInitCallBack = (LambdaInitResult) -> Void
public typealias LambdaInitCallBack = (Result<LambdaHandler, Error>) -> Void

public typealias LambdaHandlerFactory = (EventLoop, LambdaInitCallBack) -> Void

/// A processing protocol for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously.
/// A processing protocol for a Lambda that takes a `[UInt8]` and returns a `LambdaResult` result type asynchronously via `LambdaCallback`.
public protocol LambdaHandler {
/// Initializes the `LambdaHandler`.
func initialize(callback: @escaping LambdaInitCallBack)
/// Handles the Lambda request.
func handle(context: Lambda.Context, payload: [UInt8], callback: @escaping LambdaCallback)
}

extension LambdaHandler {
@inlinable
public func initialize(callback: @escaping LambdaInitCallBack) {
callback(.success(()))
}
}

@usableFromInline
internal typealias LambdaLifecycleResult = Result<Int, Error>

Expand Down
52 changes: 25 additions & 27 deletions Sources/SwiftAwsLambda/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,26 @@ import NIO
/// LambdaRunner manages the Lambda runtime workflow, or business logic.
internal struct LambdaRunner {
private let runtimeClient: LambdaRuntimeClient
private let lambdaHandler: LambdaHandler
private let eventLoop: EventLoop
private let lifecycleId: String
private let offload: Bool

init(eventLoop: EventLoop, configuration: Lambda.Configuration, lambdaHandler: LambdaHandler) {
init(eventLoop: EventLoop, configuration: Lambda.Configuration) {
self.eventLoop = eventLoop
self.runtimeClient = LambdaRuntimeClient(eventLoop: self.eventLoop, configuration: configuration.runtimeEngine)
self.lambdaHandler = lambdaHandler
self.lifecycleId = configuration.lifecycle.id
self.offload = configuration.runtimeEngine.offload
}

/// Run the user provided initializer. This *must* only be called once.
///
/// - Returns: An `EventLoopFuture<Void>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger) -> EventLoopFuture<Void> {
/// - Returns: An `EventLoopFuture<LambdaHandler>` fulfilled with the outcome of the initialization.
func initialize(logger: Logger, factory: @escaping LambdaHandlerFactory) -> EventLoopFuture<LambdaHandler> {
logger.debug("initializing lambda")
// We need to use `flatMap` instead of `whenFailure` to ensure we complete reporting the result before stopping.
return self.lambdaHandler.initialize(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload).peekError { error in
// 1. create the handler from the factory
let future = bootstrap(eventLoop: self.eventLoop, lifecycleId: self.lifecycleId, offload: self.offload, factory: factory)
// 2. report initialization error if one occured
return future.peekError { error in
self.runtimeClient.reportInitializationError(logger: logger, error: error).peekError { reportingError in
// We're going to bail out because the init failed, so there's not a lot we can do other than log
// that we couldn't report this error back to the runtime.
Expand All @@ -49,24 +47,24 @@ internal struct LambdaRunner {
}
}

func run(logger: Logger) -> EventLoopFuture<Void> {
func run(logger: Logger, handler: LambdaHandler) -> EventLoopFuture<Void> {
logger.debug("lambda invocation sequence starting")
// 1. request work from lambda runtime engine
return self.runtimeClient.requestWork(logger: logger).peekError { error in
logger.error("could not fetch work from lambda runtime engine: \(error)")
}.flatMap { invocation, payload in
// 2. send work to handler
let context = Lambda.Context(logger: logger, eventLoop: self.eventLoop, invocation: invocation)
logger.debug("sending work to lambda handler \(self.lambdaHandler)")
logger.debug("sending work to lambda handler \(handler)")

// TODO: This is just for now, so that we can work with ByteBuffers only
// in the LambdaRuntimeClient
let bytes = [UInt8](payload.readableBytesView)
return self.lambdaHandler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: bytes)
return handler.handle(eventLoop: self.eventLoop,
lifecycleId: self.lifecycleId,
offload: self.offload,
context: context,
payload: bytes)
.map {
// TODO: This mapping shall be removed as soon as the LambdaHandler protocol
// works with ByteBuffer? instead of [UInt8]
Expand All @@ -93,24 +91,24 @@ internal struct LambdaRunner {
}
}

private extension LambdaHandler {
func initialize(eventLoop: EventLoop, lifecycleId: String, offload: Bool) -> EventLoopFuture<Void> {
private func bootstrap(eventLoop: EventLoop, lifecycleId: String, offload: Bool, factory: @escaping LambdaHandlerFactory) -> EventLoopFuture<LambdaHandler> {
let promise = eventLoop.makePromise(of: LambdaHandler.self)
if offload {
// offloading so user code never blocks the eventloop
let promise = eventLoop.makePromise(of: Void.self)
if offload {
DispatchQueue(label: "lambda-\(lifecycleId)").async {
self.initialize { promise.completeWith($0) }
}
} else {
self.initialize { promise.completeWith($0) }
DispatchQueue(label: "lambda-\(lifecycleId)").async {
factory(eventLoop, promise.completeWith)
}
return promise.futureResult
} else {
factory(eventLoop, promise.completeWith)
}
return promise.futureResult
}

private extension LambdaHandler {
func handle(eventLoop: EventLoop, lifecycleId: String, offload: Bool, context: Lambda.Context, payload: [UInt8]) -> EventLoopFuture<LambdaResult> {
// offloading so user code never blocks the eventloop
let promise = eventLoop.makePromise(of: LambdaResult.self)
if offload {
// offloading so user code never blocks the eventloop
DispatchQueue(label: "lambda-\(lifecycleId)").async {
self.handle(context: context, payload: payload) { result in
promise.succeed(result)
Expand Down
1 change: 1 addition & 0 deletions Tests/SwiftAwsLambdaTests/Lambda+CodeableTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ extension CodableLambdaTest {
("testFailure", testFailure),
("testClosureSuccess", testClosureSuccess),
("testClosureFailure", testClosureFailure),
("testBootstrapFailure", testBootstrapFailure),
]
}
}
Loading