Skip to content

Commit

Permalink
Handle lame duck mode (#34)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Feb 15, 2024
1 parent 0f03482 commit 53a83cb
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
11 changes: 9 additions & 2 deletions Sources/NatsSwift/NatsConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ class ConnectionHandler: ChannelInboundHandler {
self.handleIncomingHMessage(msg)
case .info(let serverInfo):
logger.debug("info \(op)")
self.serverInfo = serverInfo
self.serverInfo = serverInfo
if serverInfo.lameDuckMode {
self.fire(.lameDuckMode)
}
default:
logger.debug("unknown operation type")
}
Expand Down Expand Up @@ -539,14 +542,16 @@ public enum NatsEventKind: String {
case connected = "connected"
case disconnected = "disconnected"
case closed = "closed"
case lameDuckMode = "lameDuckMode"
case error = "error"
static let all = [connected, disconnected, closed, error]
static let all = [connected, disconnected, closed, lameDuckMode, error]
}

public enum NatsEvent {
case connected
case disconnected
case closed
case lameDuckMode
case error(NatsError)

func kind() -> NatsEventKind {
Expand All @@ -557,6 +562,8 @@ public enum NatsEvent {
return .disconnected
case .closed:
return .closed
case .lameDuckMode:
return .lameDuckMode
case .error(_):
return .error
}
Expand Down
11 changes: 8 additions & 3 deletions Sources/NatsSwift/NatsProto.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,13 @@ struct ServerInfo: Codable, Equatable {
let clientIp: String
/// Whether the server supports headers.
let headers: Bool
/// Whether server goes into lame duck mode.
let lameDuckMode: Bool?
/// Whether server goes into lame duck
private let _lameDuckMode: Bool?
var lameDuckMode: Bool {
get{
return _lameDuckMode ?? false
}
}

private static let prefix = NatsOperation.info.rawValue.data(using: .utf8)!

Expand All @@ -225,7 +230,7 @@ struct ServerInfo: Codable, Equatable {
case connectUrls = "connect_urls"
case clientIp = "client_ip"
case headers
case lameDuckMode = "ldm"
case _lameDuckMode = "ldm"
}

internal static func parse(data: Data) throws -> ServerInfo {
Expand Down
19 changes: 19 additions & 0 deletions Tests/NatsSwiftTests/Integration/ConnectionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -361,5 +361,24 @@ class CoreNatsTests: XCTestCase {
}
XCTFail("Expected error from connect")
}

func testLameDuckMode() async throws {
natsServer.start()
logger.logLevel = .debug

let client = ClientOptions().url(URL(string: natsServer.clientURL)!).build()

let expectation = XCTestExpectation(
description: "client was not notified of connection established event")
client.on(.lameDuckMode) { event in
XCTAssertEqual(event.kind(), NatsEventKind.lameDuckMode)
expectation.fulfill()
}
try await client.connect()

natsServer.setLameDuckMode()
await fulfillment(of: [expectation], timeout: 1.0)
try await client.close()
}
}

23 changes: 20 additions & 3 deletions Tests/NatsSwiftTests/NatsServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class NatsServer {
private var process: Process?
private var natsServerPort: Int?
private var tlsEnabled = false
private var pidFile: URL?

// TODO: When implementing JetStream, creating and deleting store dir should be handled in start/stop methods
func start(port: Int = -1, cfg: String? = nil, file: StaticString = #file, line: UInt = #line) {
Expand All @@ -28,8 +29,11 @@ class NatsServer {
let process = Process()
let pipe = Pipe()

let fileManager = FileManager.default
pidFile = fileManager.temporaryDirectory.appendingPathComponent("nats-server.pid")

process.executableURL = URL(fileURLWithPath: "/usr/bin/env")
process.arguments = ["nats-server", "-p", "\(port)"]
process.arguments = ["nats-server", "-p", "\(port)", "-P", pidFile!.path]
if let cfg {
process.arguments?.append(contentsOf: ["-c", cfg])
}
Expand Down Expand Up @@ -75,8 +79,10 @@ class NatsServer {
self.natsServerPort = serverPort
}

func stop(file: StaticString = #file, line: UInt = #line) {
XCTAssertNotNil(self.process, "nats-server is not running", file: file, line: line)
func stop() {
if process == nil {
return
}

self.process?.terminate()
process?.waitUntilExit()
Expand All @@ -85,6 +91,17 @@ class NatsServer {
tlsEnabled = false
}

func setLameDuckMode(file: StaticString = #file, line: UInt = #line) {
let process = Process()

process.executableURL = URL(fileURLWithPath: "/usr/bin/env")
process.arguments = ["nats-server", "--signal", "ldm=\(self.pidFile!.path)"]

XCTAssertNoThrow(
try process.run(), "error setting lame duck mode", file: file, line: line)
self.process = nil
}

private func extractPort(from string: String) -> Int? {
let pattern = "Listening for client connections on [^:]+:(\\d+)"

Expand Down

0 comments on commit 53a83cb

Please sign in to comment.