Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2020d6b
return HTTP accepted on error
sebsto Aug 1, 2025
6e01c6e
force exit() when we loose connection to Lambda service
sebsto Aug 1, 2025
166cd46
propagate the connection closed info through a Future
sebsto Aug 3, 2025
a69ed54
fix typos
sebsto Aug 3, 2025
04d9fc7
fix unit tests
sebsto Aug 3, 2025
092da82
Merge branch 'main' into sebsto/shutdown_on_lost_connection
sebsto Aug 3, 2025
5efb706
Merge branch 'main' into sebsto/shutdown_on_lost_connection
sebsto Aug 4, 2025
1d98a7c
Merge branch 'main' into sebsto/shutdown_on_lost_connection
sebsto Aug 5, 2025
4b23d4f
Merge branch 'main' into sebsto/shutdown_on_lost_connection
sebsto Aug 5, 2025
5822e0a
Merge branch 'main' into sebsto/shutdown_on_lost_connection
sebsto Aug 5, 2025
025a0e5
simplify by checking connection state in the `nextInvocation()` call
sebsto Aug 7, 2025
ce8b567
introducing a new connection state "lostConnection"
sebsto Aug 7, 2025
be4cb20
add state change
sebsto Aug 7, 2025
b37ea0e
fix lost continuation
sebsto Aug 7, 2025
cd00948
fix compilation error
sebsto Aug 7, 2025
008c542
DRY: move the error handling to the _run() function
sebsto Aug 7, 2025
9dcb4b3
fix a case where continuation was resumed twice
sebsto Aug 7, 2025
f2d94a2
fix unit test
sebsto Aug 7, 2025
852391e
swift format
sebsto Aug 7, 2025
b13bf5c
remove comment on max payload size
sebsto Aug 7, 2025
1aa07b1
Merge branch 'main' into sebsto/shutdown_on_lost_connection
sebsto Aug 24, 2025
9c283c3
further simplify by removing the new state `lostConnection`
sebsto Aug 24, 2025
a620a2f
remove unecessary code
sebsto Aug 24, 2025
f671228
restrict access to channel handler state variable
sebsto Aug 25, 2025
b0234f5
add a unit test to verify that an error is thrown when the connection…
sebsto Aug 27, 2025
482e09e
swift-format
sebsto Aug 27, 2025
156e6c7
make sure connectionToControlPlaneLost error is triggered by the test
sebsto Aug 27, 2025
a0b1d57
give more time to the server to close the connection
sebsto Aug 27, 2025
54459e0
add catch for IOError
sebsto Aug 27, 2025
89cd259
swift-format
sebsto Aug 27, 2025
6b07955
remove compilation warning
sebsto Aug 27, 2025
86ccbf4
improve test with a timeout
sebsto Aug 28, 2025
cbf9c5e
remove debugging print statements
sebsto Aug 28, 2025
9a70915
add logger trace
sebsto Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ internal struct LambdaHTTPServer {
await self.responsePool.push(
LocalServerResponse(
id: requestId,
status: .ok,
status: .accepted,
// the local server has no mechanism to collect headers set by the lambda function
headers: HTTPHeaders(),
body: body,
Expand Down
6 changes: 6 additions & 0 deletions Sources/AWSLambdaRuntime/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public enum Lambda {
var logger = logger
do {
while !Task.isCancelled {

logger.trace("Waiting for next invocation")
let (invocation, writer) = try await runtimeClient.nextInvocation()
logger[metadataKey: "aws-request-id"] = "\(invocation.metadata.requestID)"

Expand Down Expand Up @@ -76,14 +78,18 @@ public enum Lambda {
logger: logger
)
)
logger.trace("Handler finished processing invocation")
} catch {
logger.trace("Handler failed processing invocation", metadata: ["Handler error": "\(error)"])
try await writer.reportError(error)
continue
}
logger.handler.metadata.removeValue(forKey: "aws-request-id")
}
} catch is CancellationError {
// don't allow cancellation error to propagate further
}

}

/// The default EventLoop the Lambda is scheduled on.
Expand Down
31 changes: 22 additions & 9 deletions Sources/AWSLambdaRuntime/LambdaRuntime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,29 @@ public final class LambdaRuntime<Handler>: Sendable where Handler: StreamingLamb
let ip = String(ipAndPort[0])
guard let port = Int(ipAndPort[1]) else { throw LambdaRuntimeError(code: .invalidPort) }

try await LambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
do {
try await LambdaRuntimeClient.withRuntimeClient(
configuration: .init(ip: ip, port: port),
eventLoop: self.eventLoop,
logger: self.logger
)
) { runtimeClient in
try await Lambda.runLoop(
runtimeClient: runtimeClient,
handler: handler,
logger: self.logger
)
}
} catch {
// catch top level errors that have not been handled until now
// this avoids the runtime to crash and generate a backtrace
self.logger.error("LambdaRuntime.run() failed with error", metadata: ["error": "\(error)"])
if let error = error as? LambdaRuntimeError,
error.code != .connectionToControlPlaneLost
{
// if the error is a LambdaRuntimeError but not a connection error,
// we rethrow it to preserve existing behaviour
throw error
}
}

} else {
Expand Down
29 changes: 22 additions & 7 deletions Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
private let configuration: Configuration

private var connectionState: ConnectionState = .disconnected

private var lambdaState: LambdaState = .idle(previousRequestID: nil)
private var closingState: ClosingState = .notClosing

Expand All @@ -118,10 +119,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
} catch {
result = .failure(error)
}

await runtime.close()

//try? await runtime.close()
return try result.get()
}

Expand Down Expand Up @@ -163,12 +161,16 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {

@usableFromInline
func nextInvocation() async throws -> (Invocation, Writer) {
try await withTaskCancellationHandler {

try Task.checkCancellation()

return try await withTaskCancellationHandler {
switch self.lambdaState {
case .idle:
self.lambdaState = .waitingForNextInvocation
let handler = try await self.makeOrGetConnection()
let invocation = try await handler.nextInvocation()

guard case .waitingForNextInvocation = self.lambdaState else {
fatalError("Invalid state: \(self.lambdaState)")
}
Expand Down Expand Up @@ -283,7 +285,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
case (.connecting(let array), .notClosing):
self.connectionState = .disconnected
for continuation in array {
continuation.resume(throwing: LambdaRuntimeError(code: .lostConnectionToControlPlane))
continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost))
}

case (.connecting(let array), .closing(let continuation)):
Expand Down Expand Up @@ -363,7 +365,19 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
)
channel.closeFuture.whenComplete { result in
self.assumeIsolated { runtimeClient in

// resume any pending continuation on the handler
if case .connected(_, let handler) = runtimeClient.connectionState {
if case .connected(_, let lambdaState) = handler.state {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be necessary, as LambdaChannelHandler.channelInactive will have resumed the continuation (with error ioOnClosedChannel). None of the tests trigger this code either.

Copy link
Contributor Author

@sebsto sebsto Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LambdaChannelHandler.channelInactive is indeed called when the server closes a connection, but not in the waitingForNextInvocation state.

When the server closes the connection, the state in the LambdaChannelHandler is
connected and LambdaState.idle

To reproduce what I observe:

  1. MAX_INVOCATIONS=3 MODE=json PORT=7777 LOG_LEVEL=trace swift run MockServer
  2. from another terminal, on the sebsto/shutdown_on_lost_connection branch : cd Examples/HelloJSON ; LAMBDA_USE_LOCAL_DEPS=../.. LOG_LEVEL=trace AWS_LAMBDA_RUNTIME_API=127.0.0.1:7777 swift run

The MockServer will start and will shutdown after three invocations.
The runtime will pull and process three events from the MockServer, and then will receive a connection closed event when trying to fetch the next event.

You will see that sometimes the runtime catches the closed connection and gracefully shuts down with Connection refused (errno: 61), and sometimes it hangs. In both cases, LambdaChannelHandler.channelInactive() is called with self.state == .connected(_, .idle) and lastError == nil.

Copy link
Contributor Author

@sebsto sebsto Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change to .idle happens after the response has been sent on this line

It looks like we have two behaviors, depending on when the client detects that the connection is closed.

Either the response has been sent, LambdaChannelHandler is in the .idle state, and the runtime detects that the connection was closed before nextInvocation() has a chance to change the state and send the next request. In that case, nextInvocation() correctly reports Connection refused (errno: 61) (we can trap the error by adding a do/catch block here).

Or nextInvocation() has already changed the state: it created a new promise and switched the state again to .waitingForNextInvocation (on this line).

This is where we have a problem, because this new promise is never fulfilled. We are past the call to LambdaChannelHandler.channelInactive(), and there are no throwing functions that we can wrap in a do/catch to trap the error.

Copy link
Contributor Author

@sebsto sebsto Aug 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of capturing the new unfulfilled promise in channel.closeFuture.whenComplete, I can capture it in LambdaRuntimeClient.channelClosed() which is called by the latter. It's not a big difference.

Anyway, when we detect a connection is closed in the LambdaRuntimeClient, sometimes it happens after the call to LambdaChannelHandler.channelInactive()

And this is, IMHO, the code that allows us to fulfill the promise.

      if case .connected(_, let handler) = self.connectionState {
          if case .connected(_, let lambdaState) = handler.state {
              if case .waitingForNextInvocation(let continuation) = lambdaState {
                  continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost))
              }
          }
      }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you write a test that replicates this code being called?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, b0234f5

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you noticed, the connectionToControlPlaneLost error was not triggered by the previous commit. This commit adds a test to make sure the two types of errors are triggered: 156e6c7

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello @adam-fowler, can you check the updated test? It covers the two behaviours I’m observing (connection lost in the .idle state and in the .waitingForNextInvocation state).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I had a look at the code while running your new test and found what I think is an issue. channelInactive only cleans up if the state is .connected(_, .waitingForNextInvocation). I think the code should be the following. It is better to do the cleanup in channelInactive than here. The runtime client shouldn't be cleaning up after the channel handler state machine; that is up to the channel handler.

    func channelInactive(context: ChannelHandlerContext) {
        // fail any pending responses with last error or assume peer disconnected
        switch self.state {
        case .connected(_, let lambdaState):
            switch lambdaState {
            case .waitingForNextInvocation(let continuation):
                continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel)
            case .sentResponse(let continuation):
                continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel)
            case .idle, .sendingResponse, .waitingForResponse:
                break
            }
            self.state = .disconnected
        default:
            break
        }

        // we don't need to forward channelInactive to the delegate, as the delegate observes the
        // closeFuture
        context.fireChannelInactive()
    }

if case .waitingForNextInvocation(let continuation) = lambdaState {
continuation.resume(throwing: LambdaRuntimeError(code: .connectionToControlPlaneLost))
}
}
}

// close the channel
runtimeClient.channelClosed(channel)
runtimeClient.connectionState = .disconnected
}
}

Expand All @@ -382,6 +396,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
return handler
}
} catch {

switch self.connectionState {
case .disconnected, .connected:
fatalError("Unexpected state: \(self.connectionState)")
Expand Down Expand Up @@ -430,7 +445,6 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {
}

isolated.connectionState = .disconnected

}
}
}
Expand Down Expand Up @@ -463,7 +477,7 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
}
}

private var state: State = .disconnected
private(set) var state: State = .disconnected
private var lastError: Error?
private var reusableErrorBuffer: ByteBuffer?
private let logger: Logger
Expand Down Expand Up @@ -885,6 +899,7 @@ extension LambdaChannelHandler: ChannelInboundHandler {
// fail any pending responses with last error or assume peer disconnected
switch self.state {
case .connected(_, .waitingForNextInvocation(let continuation)):
self.state = .disconnected
continuation.resume(throwing: self.lastError ?? ChannelError.ioOnClosedChannel)
default:
break
Expand Down
1 change: 0 additions & 1 deletion Sources/AWSLambdaRuntime/LambdaRuntimeError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ package struct LambdaRuntimeError: Error {

case writeAfterFinishHasBeenSent
case finishAfterFinishHasBeenSent
case lostConnectionToControlPlane
case unexpectedStatusCodeForRequest

case nextInvocationMissingHeaderRequestID
Expand Down
2 changes: 1 addition & 1 deletion Sources/MockServer/MockHTTPServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ struct HttpServer {
} else if requestHead.uri.hasSuffix("/response") {
responseStatus = .accepted
} else if requestHead.uri.hasSuffix("/error") {
responseStatus = .ok
responseStatus = .accepted
} else {
responseStatus = .notFound
}
Expand Down
99 changes: 93 additions & 6 deletions Tests/AWSLambdaRuntimeTests/LambdaRuntimeClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ struct LambdaRuntimeClientTests {
.success((self.requestId, self.event))
}

func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
func processResponse(requestId: String, response: String?) -> Result<String?, ProcessResponseError> {
#expect(self.requestId == requestId)
#expect(self.event == response)
return .success(())
return .success(nil)
}

func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
Expand Down Expand Up @@ -102,9 +102,9 @@ struct LambdaRuntimeClientTests {
.success((self.requestId, self.event))
}

func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
func processResponse(requestId: String, response: String?) -> Result<String?, ProcessResponseError> {
#expect(self.requestId == requestId)
return .success(())
return .success(nil)
}

mutating func captureHeaders(_ headers: HTTPHeaders) {
Expand Down Expand Up @@ -197,10 +197,10 @@ struct LambdaRuntimeClientTests {
.success((self.requestId, self.event))
}

func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
func processResponse(requestId: String, response: String?) -> Result<String?, ProcessResponseError> {
#expect(self.requestId == requestId)
#expect(self.event == response)
return .success(())
return .success(nil)
}

func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
Expand Down Expand Up @@ -238,4 +238,91 @@ struct LambdaRuntimeClientTests {
}
}
}

/// Mock server behaviour that serves invocations normally and then asks the
/// mock server to close the connection once a response has been accepted.
struct DisconnectAfterSendingResponseBehavior: LambdaServerBehavior {
    /// Hands out a new invocation with a random request ID and the payload "hello".
    func getInvocation() -> GetInvocationResult {
        let requestID = UUID().uuidString
        return .success((requestID, "hello"))
    }

    /// Accepts every response and returns the "delayed-disconnect" marker,
    /// which triggers the server closing the connection after having
    /// accepted the response.
    func processResponse(requestId: String, response: String?) -> Result<String?, ProcessResponseError> {
        let disconnectMarker: String? = "delayed-disconnect"
        return .success(disconnectMarker)
    }

    /// The function is never expected to report an invocation error in this scenario.
    func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
        Issue.record("should not report error")
        return .failure(.internalServerError)
    }

    /// The function is never expected to report an initialization error in this scenario.
    func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
        Issue.record("should not report init error")
        return .failure(.internalServerError)
    }
}

/// Mock server behaviour whose invocations carry the request ID "disconnect".
///
/// NOTE(review): presumably the mock server treats the "disconnect" request ID
/// as a signal to drop the connection — confirm against the mock server handler.
struct DisconnectBehavior: LambdaServerBehavior {
    /// Always succeeds with the request ID "disconnect" and the payload "0".
    func getInvocation() -> GetInvocationResult {
        let invocation = ("disconnect", "0")
        return .success(invocation)
    }

    /// Accepts every response without requesting any follow-up action.
    func processResponse(requestId: String, response: String?) -> Result<String?, ProcessResponseError> {
        return .success(nil)
    }

    /// The function is never expected to report an invocation error in this scenario.
    func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
        Issue.record("should not report error")
        return .failure(.internalServerError)
    }

    /// The function is never expected to report an initialization error in this scenario.
    func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
        Issue.record("should not report init error")
        return .failure(.internalServerError)
    }
}

/// Checks that the runtime client reports an error when the mock server
/// closes the connection, for each of the supplied server behaviours.
///
/// The test keeps fetching invocations and posting responses for up to one
/// second. It passes when that loop ends with one of the accepted errors:
/// a `LambdaRuntimeError` with code `.connectionToControlPlaneLost`,
/// `ChannelError.ioOnClosedChannel`, or an `IOError` whose errno is
/// `ECONNRESET` or `EPIPE`. Finishing without an error, a
/// `CancellationError`, or any other error type is recorded as an issue.
@Test(
    "Server closing the connection when waiting for next invocation throws an error",
    arguments: [DisconnectBehavior(), DisconnectAfterSendingResponseBehavior()] as [any LambdaServerBehavior]
)
func testChannelCloseFutureWithWaitingForNextInvocation(behavior: LambdaServerBehavior) async throws {
    try await withMockServer(behaviour: behavior) { serverPort in
        try await LambdaRuntimeClient.withRuntimeClient(
            configuration: LambdaRuntimeClient.Configuration(ip: "127.0.0.1", port: serverPort),
            eventLoop: NIOSingletons.posixEventLoopGroup.next(),
            logger: self.logger
        ) { client in
            do {
                // Generate traffic until the server drops the connection or the
                // deadline passes, whichever happens first. The value produced
                // by the timeout helper is irrelevant and discarded.
                let _ = try await timeout(deadline: .seconds(1)) {
                    while true {
                        let (_, responseWriter) = try await client.nextInvocation()
                        try await responseWriter.writeAndFinish(ByteBuffer(string: "hello"))
                    }
                }

                // Reaching this point means no error was thrown at all.
                Issue.record("Connection reset test did not throw an error")
            } catch is CancellationError {
                // NOTE(review): presumably raised when the timeout helper fires
                // before any connection error is observed — confirm.
                Issue.record("Runtime client did not send connection closed error")
            } catch let error as LambdaRuntimeError {
                logger.trace("LambdaRuntimeError - expected")
                #expect(error.code == .connectionToControlPlaneLost)
            } catch let error as ChannelError {
                logger.trace("ChannelError - expected")
                #expect(error == .ioOnClosedChannel)
            } catch let error as IOError {
                logger.trace("IOError - expected")
                #expect(error.errnoCode == ECONNRESET || error.errnoCode == EPIPE)
            } catch {
                Issue.record("Unexpected error type: \(error)")
            }
        }
    }
}
}
28 changes: 22 additions & 6 deletions Tests/AWSLambdaRuntimeTests/MockLambdaServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ final class HTTPHandler: ChannelInboundHandler {
var responseStatus: HTTPResponseStatus
var responseBody: String?
var responseHeaders: [(String, String)]?
var disconnectAfterSend = false

// Handle post-init-error first to avoid matching the less specific post-error suffix.
if request.head.uri.hasSuffix(Consts.postInitErrorURL) {
Expand Down Expand Up @@ -202,8 +203,11 @@ final class HTTPHandler: ChannelInboundHandler {
behavior.captureHeaders(request.head.headers)

switch behavior.processResponse(requestId: String(requestId), response: requestBody) {
case .success:
case .success(let next):
responseStatus = .accepted
if next == "delayed-disconnect" {
disconnectAfterSend = true
}
case .failure(let error):
responseStatus = .init(statusCode: error.rawValue)
}
Expand All @@ -223,14 +227,21 @@ final class HTTPHandler: ChannelInboundHandler {
} else {
responseStatus = .notFound
}
self.writeResponse(context: context, status: responseStatus, headers: responseHeaders, body: responseBody)
self.writeResponse(
context: context,
status: responseStatus,
headers: responseHeaders,
body: responseBody,
closeConnection: disconnectAfterSend
)
}

func writeResponse(
context: ChannelHandlerContext,
status: HTTPResponseStatus,
headers: [(String, String)]? = nil,
body: String? = nil
body: String? = nil,
closeConnection: Bool = false
) {
var headers = HTTPHeaders(headers ?? [])
headers.add(name: "Content-Length", value: "\(body?.utf8.count ?? 0)")
Expand All @@ -253,14 +264,19 @@ final class HTTPHandler: ChannelInboundHandler {
}

let loopBoundContext = NIOLoopBound(context, eventLoop: context.eventLoop)

let keepAlive = self.keepAlive
context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in
let context = loopBoundContext.value
if closeConnection {
context.close(promise: nil)
return
}

if case .failure(let error) = result {
logger.error("write error \(error)")
}

if !keepAlive {
let context = loopBoundContext.value
context.close().whenFailure { error in
logger.error("close error \(error)")
}
Expand All @@ -271,7 +287,7 @@ final class HTTPHandler: ChannelInboundHandler {

protocol LambdaServerBehavior: Sendable {
func getInvocation() -> GetInvocationResult
func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError>
func processResponse(requestId: String, response: String?) -> Result<String?, ProcessResponseError>
func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError>
func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError>

Expand Down
Loading
Loading