Skip to content

Commit

Permalink
Add reconnect on errors in read loop
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio authored and Jarema committed Jan 23, 2024
1 parent 799e9cc commit 68072d8
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 15 deletions.
12 changes: 2 additions & 10 deletions Sources/NatsSwift/Extensions/Data+Parser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ extension Data {
self = other + self
}

internal func parseOutMessages() -> (ops: [ServerOp], remainder: Data?) {
internal func parseOutMessages() throws -> (ops: [ServerOp], remainder: Data?) {
var serverOps = [ServerOp]()
var startIndex = self.startIndex
var remainder: Data?
Expand All @@ -88,15 +88,7 @@ extension Data {
continue
}

let serverOp: ServerOp
do {
serverOp = try ServerOp.parse(from: lineData)
} catch {
// TODO(pp): handle this error properly (maybe surface in throw)
logger.error("Error parsing message: \(error)")
startIndex = nextLineStartIndex
continue
}
let serverOp = try ServerOp.parse(from: lineData)

// if it's a message, get the full payload and add to returned data
if case .Message(var msg) = serverOp {
Expand Down
5 changes: 5 additions & 0 deletions Sources/NatsSwift/Extensions/String+Utilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,16 @@
import Foundation

extension String {
private static let charactersToTrim: CharacterSet = .whitespacesAndNewlines.union(CharacterSet(charactersIn: "'"))

static func hash() -> String {
let uuid = String.uuid()
return uuid[0...7]
}

func trimWhitespacesAndApostrophes() -> String {
return self.trimmingCharacters(in: String.charactersToTrim)
}

static func uuid() -> String {
return UUID().uuidString.trimmingCharacters(in: .punctuationCharacters)
Expand Down
18 changes: 17 additions & 1 deletion Sources/NatsSwift/NatsConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,15 @@ class ConnectionHandler: ChannelInboundHandler {
}

self.parseRemainder = nil
let parseResult = inputChunk.parseOutMessages()
let parseResult: (ops: [ServerOp], remainder: Data?)
do {
parseResult = try inputChunk.parseOutMessages()
} catch {
// if parsing throws an error, return and reconnect
inputBuffer.clear()
context.fireErrorCaught(error)
return
}
if let remainder = parseResult.remainder {
self.parseRemainder = remainder
}
Expand Down Expand Up @@ -96,6 +104,14 @@ class ConnectionHandler: ChannelInboundHandler {
self.outstandingPings.store(0, ordering: AtomicStoreOrdering.relaxed)
case let .Error(err):
logger.debug("error \(err)")

let normalizedError = err.normalizedError
// on some errors, force reconnect
if normalizedError == "stale connection" || normalizedError == "maximum connections exceeded" {
inputBuffer.clear()
context.fireErrorCaught(err)
}
// TODO(pp): handle auth errors here
case let .Message(msg):
self.handleIncomingMessage(msg)
case let .HMessage(msg):
Expand Down
3 changes: 3 additions & 0 deletions Sources/NatsSwift/NatsError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ protocol NatsError: Error {

struct NatsConnectionError: NatsError {
var description: String
var normalizedError: String {
return description.trimWhitespacesAndApostrophes().lowercased()
}
init(_ description: String) {
self.description = description
}
Expand Down
6 changes: 3 additions & 3 deletions Sources/NatsSwift/NatsProto.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ enum ServerOp {
case Info(ServerInfo)
case Ping
case Pong
case Error(NatsError)
case Error(NatsConnectionError)
case Message(MessageInbound)
case HMessage(HMessageInbound)

static func parse(from message: Data) throws -> ServerOp {
guard message.count > 2 else {
throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "unable to parse inbound message: \(message)"])
Expand All @@ -53,7 +53,7 @@ enum ServerOp {
case .ok:
return Ok
case .error:
if let errMsg = message.toString() {
if let errMsg = message.removePrefix(Data(NatsOperation.error.rawBytes)).toString() {
return Error(NatsConnectionError(errMsg))
}
return Error(NatsConnectionError("unexpected error"))
Expand Down
2 changes: 1 addition & 1 deletion Tests/NatsSwiftTests/Unit/ParserTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ParserTests: XCTestCase {
if let prevRemainder {
chunkData.prepend(prevRemainder)
}
let res = chunkData.parseOutMessages()
let res = try! chunkData.parseOutMessages()
prevRemainder = res.remainder
ops.append(contentsOf: res.ops)
}
Expand Down

0 comments on commit 68072d8

Please sign in to comment.