diff --git a/Sources/NatsSwift/Extensions/Data+Parser.swift b/Sources/NatsSwift/Extensions/Data+Parser.swift index e20b99d..d360ebe 100644 --- a/Sources/NatsSwift/Extensions/Data+Parser.swift +++ b/Sources/NatsSwift/Extensions/Data+Parser.swift @@ -95,7 +95,7 @@ extension Data { let serverOp = try ServerOp.parse(from: lineData) // if it's a message, get the full payload and add to returned data - if case .Message(var msg) = serverOp { + if case .message(var msg) = serverOp { if msg.length == 0 { serverOps.append(serverOp) } else { @@ -113,11 +113,11 @@ extension Data { self.index( payloadEndIndex, offsetBy: Data.crlf.count, limitedBy: self.endIndex) ?? self.endIndex - serverOps.append(.Message(msg)) + serverOps.append(.message(msg)) continue } //TODO(jrm): Add HMSG handling here too. - } else if case .HMessage(var msg) = serverOp { + } else if case .hMessage(var msg) = serverOp { if msg.length == 0 { serverOps.append(serverOp) } else { @@ -167,7 +167,7 @@ extension Data { self.index( payloadEndIndex, offsetBy: Data.crlf.count, limitedBy: self.endIndex) ?? self.endIndex - serverOps.append(.HMessage(msg)) + serverOps.append(.hMessage(msg)) continue } diff --git a/Sources/NatsSwift/NatsClient/NatsClient.swift b/Sources/NatsSwift/NatsClient/NatsClient.swift index 30a4bc6..15232f6 100755 --- a/Sources/NatsSwift/NatsClient/NatsClient.swift +++ b/Sources/NatsSwift/NatsClient/NatsClient.swift @@ -13,10 +13,10 @@ var logger = Logger(label: "NatsSwift") /// Client connection states public enum NatsState { - case Pending - case Connected - case Disconnected - case Closed + case pending + case connected + case disconnected + case closed } public struct Auth { @@ -96,7 +96,7 @@ extension Client { throw NSError( domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"]) } - try connectionHandler.write(operation: ClientOp.Publish((subject, reply, payload, headers))) + try connectionHandler.write(operation: ClientOp.publish((subject, reply, payload, headers))) } public func flush() async throws { diff --git a/Sources/NatsSwift/NatsClient/NatsClientOptions.swift b/Sources/NatsSwift/NatsClient/NatsClientOptions.swift index 1341556..9089ea6 100644 --- a/Sources/NatsSwift/NatsClient/NatsClientOptions.swift +++ b/Sources/NatsSwift/NatsClient/NatsClientOptions.swift @@ -42,7 +42,7 @@ public class ClientOptions { return self } - public func username_and_password(_ username: String, _ password: String) -> ClientOptions { + public func usernameAndPassword(_ username: String, _ password: String) -> ClientOptions { if self.auth == nil { self.auth = Auth(user: username, password: password) } else { @@ -61,7 +61,7 @@ public class ClientOptions { return self } - public func credentials_file(_ credentials: URL) -> ClientOptions { + public func credentialsFile(_ credentials: URL) -> ClientOptions { if self.auth == nil { self.auth = Auth.fromCredentials(credentials) } else { diff --git a/Sources/NatsSwift/NatsConnection.swift b/Sources/NatsSwift/NatsConnection.swift index c55b99f..784acd8 100644 --- a/Sources/NatsSwift/NatsConnection.swift +++ b/Sources/NatsSwift/NatsConnection.swift @@ -25,7 +25,7 @@ class ConnectionHandler: ChannelInboundHandler { internal let pingInterval: TimeInterval typealias InboundIn = ByteBuffer - internal var state: NatsState = .Pending + internal var state: NatsState = .pending internal var subscriptions: [UInt64: Subscription] internal var subscriptionCounter = ManagedAtomic(0) internal var serverInfo: ServerInfo? @@ -66,9 +66,9 @@ class ConnectionHandler: ChannelInboundHandler { self.serverInfoContinuation = nil logger.debug("server info") switch op { - case let .Error(err): + case .error(let err): continuation.resume(throwing: err) - case let .Info(info): + case .info(let info): continuation.resume(returning: info) default: // ignore until we get either error or server info @@ -81,7 +81,7 @@ class ConnectionHandler: ChannelInboundHandler { self.connectionEstablishedContinuation = nil logger.debug("conn established") switch op { - case let .Error(err): + case .error(let err): continuation.resume(throwing: err) default: continuation.resume() @@ -90,19 +90,19 @@ class ConnectionHandler: ChannelInboundHandler { } switch op { - case .Ping: + case .ping: logger.debug("ping") do { - try self.write(operation: .Pong) + try self.write(operation: .pong) } catch { // TODO(pp): handle async error logger.error("error sending pong: \(error)") continue } - case .Pong: + case .pong: logger.debug("pong") self.outstandingPings.store(0, ordering: AtomicStoreOrdering.relaxed) - case let .Error(err): + case .error(let err): logger.debug("error \(err)") let normalizedError = err.normalizedError @@ -114,11 +114,11 @@ class ConnectionHandler: ChannelInboundHandler { context.fireErrorCaught(err) } // TODO(pp): handle auth errors here - case let .Message(msg): + case .message(let msg): self.handleIncomingMessage(msg) - case let .HMessage(msg): + case .hMessage(let msg): self.handleIncomingHMessage(msg) - case let .Info(serverInfo): + case .info(let serverInfo): logger.debug("info \(op)") self.serverInfo = serverInfo default: @@ -221,15 +221,15 @@ class ConnectionHandler: ChannelInboundHandler { self.connectionEstablishedContinuation = continuation Task.detached { do { - try self.write(operation: ClientOp.Connect(connect)) - try self.write(operation: ClientOp.Ping) + try self.write(operation: ClientOp.connect(connect)) + try self.write(operation: ClientOp.ping) self.channel?.flush() } catch { continuation.resume(throwing: error) } } } - self.state = .Connected + self.state = .pending guard let channel = self.channel else { throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "empty channel"]) } @@ -244,7 +244,7 @@ class ConnectionHandler: ChannelInboundHandler { } func close() async throws { - self.state = .Closed + self.state = .closed try await disconnect() try await self.group.shutdownGracefully() } @@ -261,7 +261,7 @@ class ConnectionHandler: ChannelInboundHandler { handleDisconnect() return } - let ping = ClientOp.Ping + let ping = ClientOp.ping do { try self.write(operation: ping) logger.debug("sent ping: \(pingsOut)") @@ -280,7 +280,7 @@ class ConnectionHandler: ChannelInboundHandler { func channelInactive(context: ChannelHandlerContext) { logger.debug("TCP channel inactive") - if self.state == .Connected { + if self.state == .pending { handleDisconnect() } } @@ -289,15 +289,15 @@ class ConnectionHandler: ChannelInboundHandler { // TODO(pp): implement Close() on the connection and call it here logger.debug("Encountered error on the channel: \(error)") context.close(promise: nil) - if self.state == .Connected { + if self.state == .pending { handleDisconnect() - } else if self.state == .Disconnected { + } else if self.state == .disconnected { handleReconnect() } } func handleDisconnect() { - self.state = .Disconnected + self.state = .disconnected if let channel = self.channel { let promise = channel.eventLoop.makePromise(of: Void.self) Task { @@ -337,7 +337,7 @@ class ConnectionHandler: ChannelInboundHandler { break } for (sid, sub) in self.subscriptions { - try write(operation: ClientOp.Subscribe((sid, sub.subject, nil))) + try write(operation: ClientOp.subscribe((sid, sub.subject, nil))) } } } @@ -378,7 +378,7 @@ class ConnectionHandler: ChannelInboundHandler { func subscribe(_ subject: String) async throws -> Subscription { let sid = self.subscriptionCounter.wrappingIncrementThenLoad( ordering: AtomicUpdateOrdering.relaxed) - try write(operation: ClientOp.Subscribe((sid, subject, nil))) + try write(operation: ClientOp.subscribe((sid, subject, nil))) let sub = Subscription(subject: subject) self.subscriptions[sid] = sub return sub diff --git a/Sources/NatsSwift/NatsProto.swift b/Sources/NatsSwift/NatsProto.swift index 9c79402..28e06be 100644 --- a/Sources/NatsSwift/NatsProto.swift +++ b/Sources/NatsSwift/NatsProto.swift @@ -33,39 +33,39 @@ internal enum NatsOperation: String { } enum ServerOp { - case Ok - case Info(ServerInfo) - case Ping - case Pong - case Error(NatsConnectionError) - case Message(MessageInbound) - case HMessage(HMessageInbound) + case ok + case info(ServerInfo) + case ping + case pong + case error(NatsConnectionError) + case message(MessageInbound) + case hMessage(HMessageInbound) - static func parse(from message: Data) throws -> ServerOp { - guard message.count > 2 else { + static func parse(from msg: Data) throws -> ServerOp { + guard msg.count > 2 else { throw NSError( domain: "nats_swift", code: 1, userInfo: ["message": "unable to parse inbound message: \(message)"]) } - let msgType = message.getMessageType() + let msgType = msg.getMessageType() switch msgType { case .message: - return try Message(MessageInbound.parse(data: message)) + return try message(MessageInbound.parse(data: msg)) case .hmessage: - return try HMessage(HMessageInbound.parse(data: message)) + return try hMessage(HMessageInbound.parse(data: msg)) case .info: - return try Info(ServerInfo.parse(data: message)) + return try info(ServerInfo.parse(data: msg)) case .ok: - return Ok + return ok case .error: - if let errMsg = message.removePrefix(Data(NatsOperation.error.rawBytes)).toString() { - return Error(NatsConnectionError(errMsg)) + if let errMsg = msg.removePrefix(Data(NatsOperation.error.rawBytes)).toString() { + return error(NatsConnectionError(errMsg)) } - return Error(NatsConnectionError("unexpected error")) + return error(NatsConnectionError("unexpected error")) case .ping: - return Ping + return ping case .pong: - return Pong + return pong default: throw NSError( domain: "nats_swift", code: 1, @@ -90,7 +90,7 @@ internal struct HMessageInbound: Equatable { internal static func parse(data: Data) throws -> HMessageInbound { let protoComponents = data - .dropFirst(NatsOperation.hmessage.rawValue.count) // Assuming the message starts with "HMSG " + .dropFirst(NatsOperation.hmessage.rawValue.count) // Assuming msg starts with "HMSG " .split(separator: space) .filter { !$0.isEmpty } @@ -146,7 +146,7 @@ internal struct MessageInbound: Equatable { internal static func parse(data: Data) throws -> MessageInbound { let protoComponents = data - .dropFirst(NatsOperation.message.rawValue.count) // Assuming the message starts with "MSG " + .dropFirst(NatsOperation.message.rawValue.count) // Assuming msg starts with "MSG " .split(separator: space) .filter { !$0.isEmpty } @@ -246,17 +246,17 @@ struct ServerInfo: Codable, Equatable { } enum ClientOp { - case Publish((subject: String, reply: String?, payload: Data?, headers: HeaderMap?)) - case Subscribe((sid: UInt64, subject: String, queue: String?)) - case Unsubscribe((sid: UInt64, max: UInt64?)) - case Connect(ConnectInfo) - case Ping - case Pong + case publish((subject: String, reply: String?, payload: Data?, headers: HeaderMap?)) + case subscribe((sid: UInt64, subject: String, queue: String?)) + case unsubscribe((sid: UInt64, max: UInt64?)) + case connect(ConnectInfo) + case ping + case pong internal func asBytes(using allocator: ByteBufferAllocator) throws -> ByteBuffer { var buffer: ByteBuffer switch self { - case let .Publish((subject, reply, payload, headers)): + case .publish((let subject, let reply, let payload, let headers)): if let payload = payload { buffer = allocator.buffer( capacity: payload.count + subject.utf8.count @@ -295,7 +295,7 @@ enum ClientOp { buffer.writeString("0\r\n") } - case let .Subscribe((sid, subject, queue)): + case .subscribe((let sid, let subject, let queue)): buffer = allocator.buffer(capacity: 0) if let queue { buffer.writeString( @@ -304,24 +304,24 @@ enum ClientOp { buffer.writeString("\(NatsOperation.subscribe.rawValue) \(subject) \(sid)\r\n") } - case let .Unsubscribe((sid, max)): + case .unsubscribe((let sid, let max)): buffer = allocator.buffer(capacity: 0) if let max { buffer.writeString("\(NatsOperation.unsubscribe.rawValue) \(sid) \(max)\r\n") } else { buffer.writeString("\(NatsOperation.unsubscribe.rawValue) \(sid)\r\n") } - case let .Connect(info): + case .connect(let info): let json = try JSONEncoder().encode(info) buffer = allocator.buffer(capacity: json.count + 5) buffer.writeString("\(NatsOperation.connect.rawValue) ") buffer.writeData(json) buffer.writeString("\r\n") return buffer - case .Ping: + case .ping: buffer = allocator.buffer(capacity: 8) buffer.writeString("\(NatsOperation.ping.rawValue)\r\n") - case .Pong: + case .pong: buffer = allocator.buffer(capacity: 8) buffer.writeString("\(NatsOperation.pong.rawValue)\r\n") } diff --git a/Tests/NatsSwiftTests/Integration/ConnectionTests.swift b/Tests/NatsSwiftTests/Integration/ConnectionTests.swift index 7acb0e7..4d99d5a 100755 --- a/Tests/NatsSwiftTests/Integration/ConnectionTests.swift +++ b/Tests/NatsSwiftTests/Integration/ConnectionTests.swift @@ -165,7 +165,7 @@ class CoreNatsTests: XCTestCase { natsServer.start(cfg: resourceURL.path) let client = ClientOptions() .url(URL(string: natsServer.clientURL)!) - .username_and_password("derek", "s3cr3t") + .usernameAndPassword("derek", "s3cr3t") .maxReconnects(5) .build() try await client.connect() @@ -175,14 +175,14 @@ class CoreNatsTests: XCTestCase { XCTAssertNotNil(client, "Client should not be nil") // Test if client with bad credentials throws an error - let bad_creds_client = ClientOptions() + let badCertsClient = ClientOptions() .url(URL(string: natsServer.clientURL)!) - .username_and_password("derek", "badpassword") + .usernameAndPassword("derek", "badpassword") .maxReconnects(5) .build() do { - try await bad_creds_client.connect() + try await badCertsClient.connect() XCTFail("Should have thrown an error") } catch { XCTAssertNotNil(error, "Error should not be nil") @@ -212,14 +212,14 @@ class CoreNatsTests: XCTestCase { XCTAssertNotNil(client, "Client should not be nil") // Test if client with bad credentials throws an error - let bad_creds_client = ClientOptions() + let badCertsClient = ClientOptions() .url(URL(string: natsServer.clientURL)!) .token("badtoken") .maxReconnects(5) .build() do { - try await bad_creds_client.connect() + try await badCertsClient.connect() XCTFail("Should have thrown an error") } catch { XCTAssertNotNil(error, "Error should not be nil") @@ -240,7 +240,7 @@ class CoreNatsTests: XCTestCase { let credsURL = testsDir.appendingPathComponent( "Integration/Resources/TestUser.creds", isDirectory: false) - let client = ClientOptions().url(URL(string: natsServer.clientURL)!).credentials_file( + let client = ClientOptions().url(URL(string: natsServer.clientURL)!).credentialsFile( credsURL ).build() try await client.connect() diff --git a/Tests/NatsSwiftTests/Unit/ParserTests.swift b/Tests/NatsSwiftTests/Unit/ParserTests.swift index bc0904c..53ca30e 100644 --- a/Tests/NatsSwiftTests/Unit/ParserTests.swift +++ b/Tests/NatsSwiftTests/Unit/ParserTests.swift @@ -37,12 +37,12 @@ class ParserTests: XCTestCase { name: "Single chunk, different operations", givenChunks: ["MSG foo 1 5\r\nhello\r\n+OK\r\nPONG\r\n"], expectedOps: [ - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "hello".data(using: .utf8)!, length: 5) ), - .Ok, - .Pong, + .ok, + .pong, ] ), TestCase( @@ -56,31 +56,31 @@ class ParserTests: XCTestCase { "hello\r\n", ], expectedOps: [ - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "hello".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "world".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "hello".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "world".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "hello".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "world".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "hello".data(using: .utf8)!, length: 5) ), @@ -96,15 +96,15 @@ class ParserTests: XCTestCase { "oo 1 5\r\nhello\r\n", ], expectedOps: [ - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "hello".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "world".data(using: .utf8)!, length: 5) ), - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: "hello".data(using: .utf8)!, length: 5) ), @@ -116,7 +116,7 @@ class ParserTests: XCTestCase { "MSG foo 1 7\r\nhe\r\nllo\r\n" ], expectedOps: [ - .Message( + .message( MessageInbound( subject: "foo", sid: 1, payload: Data("he\r\nllo".utf8), length: 7)) ] @@ -139,41 +139,41 @@ class ParserTests: XCTestCase { XCTAssertEqual(ops.count, tc.expectedOps.count) for (i, op) in ops.enumerated() { switch op { - case .Ok: - if case .Ok = tc.expectedOps[i] { + case .ok: + if case .ok = tc.expectedOps[i] { } else { XCTFail(fail(tn, tc.name)) } - case .Info(let info): - if case .Info(let expectedInfo) = tc.expectedOps[i] { + case .info(let info): + if case .info(let expectedInfo) = tc.expectedOps[i] { XCTAssertEqual(info, expectedInfo, fail(tn, tc.name)) } else { XCTFail(fail(tn, tc.name)) } - case .Ping: - if case .Ping = tc.expectedOps[i] { + case .ping: + if case .ping = tc.expectedOps[i] { } else { XCTFail(fail(tn, tc.name)) } - case .Pong: - if case .Pong = tc.expectedOps[i] { + case .pong: + if case .pong = tc.expectedOps[i] { } else { XCTFail(fail(tn, tc.name)) } - case .Error(_): - if case .Error(_) = tc.expectedOps[i] { + case .error(_): + if case .error(_) = tc.expectedOps[i] { } else { XCTFail(fail(tn, tc.name)) } - case .Message(let msg): - if case .Message(let expectedMessage) = tc.expectedOps[i] { + case .message(let msg): + if case .message(let expectedMessage) = tc.expectedOps[i] { XCTAssertEqual(msg, expectedMessage, fail(tn, tc.name)) } else { XCTFail(fail(tn, tc.name)) } - case .HMessage(let msg): - if case .HMessage(let expectedMessage) = tc.expectedOps[i] { + case .hMessage(let msg): + if case .hMessage(let expectedMessage) = tc.expectedOps[i] { XCTAssertEqual(msg, expectedMessage, fail(tn, tc.name)) } else { XCTFail(fail(tn, tc.name))