Skip to content

Commit

Permalink
Format code with swift-format
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Feb 1, 2024
1 parent ba679e8 commit 646d019
Show file tree
Hide file tree
Showing 21 changed files with 362 additions and 235 deletions.
29 changes: 15 additions & 14 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,27 @@ let package = Package(
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.2"),
.package(url: "https://github.com/nats-io/nkeys.swift.git", from: "0.1.1")
.package(url: "https://github.com/nats-io/nkeys.swift.git", from: "0.1.1"),
],
targets: [
.target(name: "NatsSwift", dependencies: [
.product(name: "NIO", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "NIOFoundationCompat", package: "swift-nio"),
.product(name: "NKeys", package: "nkeys.swift")
]),
.target(
name: "NatsSwift",
dependencies: [
.product(name: "NIO", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "NIOFoundationCompat", package: "swift-nio"),
.product(name: "NKeys", package: "nkeys.swift"),
]),
.testTarget(
name: "NatsSwiftTests",
dependencies: ["NatsSwift"],
resources: [
.process("Integration/Resources")
]
name: "NatsSwiftTests",
dependencies: ["NatsSwift"],
resources: [
.process("Integration/Resources")
]
),

.executableTarget(name: "Benchmark", dependencies: ["NatsSwift"]),
.executableTarget(name: "BenchmarkPubSub", dependencies: ["NatsSwift"]),
.executableTarget(name: "BenchmarkSub", dependencies: ["NatsSwift"])
.executableTarget(name: "BenchmarkSub", dependencies: ["NatsSwift"]),
]
)

6 changes: 3 additions & 3 deletions Sources/Benchmark/main.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import NatsSwift
import Foundation
import NatsSwift

let nats = ClientOptions()
.url(URL(string: "nats://localhost:4222")!)
Expand All @@ -22,6 +22,6 @@ for _ in 0..<numMsgs {
}
try! await nats.flush()
let elapsed = DispatchTime.now().uptimeNanoseconds - now.uptimeNanoseconds
let msgsPerSec: Double = Double(numMsgs)/(Double(elapsed)/1_000_000_000)
print("Elapsed: \(elapsed / 1000000)ms")
let msgsPerSec: Double = Double(numMsgs) / (Double(elapsed) / 1_000_000_000)
print("Elapsed: \(elapsed / 1_000_000)ms")
print("\(msgsPerSec) msgs/s")
14 changes: 8 additions & 6 deletions Sources/BenchmarkPubSub/main.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import NatsSwift
import Foundation
import NatsSwift

let nats = ClientOptions().urls([URL(string: "nats://localhost:4222")!]).build()
print("Connecting...")
Expand Down Expand Up @@ -27,7 +27,9 @@ try await withThrowingTaskGroup(of: Void.self) { group in
let payload = msg.payload!
if String(data: payload, encoding: .utf8) != "\(i)" {
let emptyString = ""
print("invalid payload; expected: \(i); got: \(String(data: payload, encoding: .utf8) ?? emptyString)")
print(
"invalid payload; expected: \(i); got: \(String(data: payload, encoding: .utf8) ?? emptyString)"
)
}
guard let headers = msg.headers else {
print("empty headers!")
Expand All @@ -36,7 +38,7 @@ try await withThrowingTaskGroup(of: Void.self) { group in
if headers != hm {
print("invalid headers; expected: \(hm); got: \(headers)")
}
if i%1000 == 0 {
if i % 1000 == 0 {
print("received \(i) msgs")
}
i += 1
Expand All @@ -50,7 +52,7 @@ try await withThrowingTaskGroup(of: Void.self) { group in
hm.insert(try! HeaderName("another"), HeaderValue("one"))
for i in 0..<numMsgs {
try nats.publish("\(i)".data(using: .utf8)!, subject: "foo", headers: hm)
if i%1000 == 0 {
if i % 1000 == 0 {
print("published \(i) msgs")
}
}
Expand All @@ -62,6 +64,6 @@ try await withThrowingTaskGroup(of: Void.self) { group in

try! await nats.flush()
let elapsed = DispatchTime.now().uptimeNanoseconds - now.uptimeNanoseconds
let msgsPerSec: Double = Double(numMsgs)/(Double(elapsed)/1_000_000_000)
print("Elapsed: \(elapsed / 1000000)ms")
let msgsPerSec: Double = Double(numMsgs) / (Double(elapsed) / 1_000_000_000)
print("Elapsed: \(elapsed / 1_000_000)ms")
print("\(msgsPerSec) msgs/s")
12 changes: 7 additions & 5 deletions Sources/BenchmarkSub/main.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import NatsSwift
import Foundation
import NatsSwift

let nats = ClientOptions().url(URL(string: "nats://localhost:4222")!).build()
print("Connecting...")
Expand Down Expand Up @@ -28,18 +28,20 @@ for i in 1...numMsgs {
}
if String(data: payload, encoding: .utf8) != "\(i)" {
let emptyString = ""
print("invalid payload; expected: \(i); got: \(String(data: payload, encoding: .utf8) ?? emptyString)")
print(
"invalid payload; expected: \(i); got: \(String(data: payload, encoding: .utf8) ?? emptyString)"
)
}
guard msg?.headers != nil else {
print("empty headers!")
continue
}
if i%1000 == 0 {
if i % 1000 == 0 {
print("received \(i) msgs")
}
}

let elapsed = DispatchTime.now().uptimeNanoseconds - now.uptimeNanoseconds
let msgsPerSec: Double = Double(numMsgs)/(Double(elapsed)/1_000_000_000)
print("Elapsed: \(elapsed / 1000000)ms")
let msgsPerSec: Double = Double(numMsgs) / (Double(elapsed) / 1_000_000_000)
print("Elapsed: \(elapsed / 1_000_000)ms")
print("\(msgsPerSec) msgs/s")
34 changes: 23 additions & 11 deletions Sources/NatsSwift/Extensions/Data+Parser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ extension Data {
return self.dropFirst(prefix.count)
}

func split(separator: Data, maxSplits: Int = .max, omittingEmptySubsequences: Bool = true) -> [Data] {
func split(separator: Data, maxSplits: Int = .max, omittingEmptySubsequences: Bool = true)
-> [Data]
{
var chunks: [Data] = []
var start = startIndex
var end = startIndex
Expand Down Expand Up @@ -77,7 +79,9 @@ extension Data {
var lineData: Data
if let range = self[startIndex...].range(of: Data.crlf) {
let lineEndIndex = range.lowerBound
nextLineStartIndex = self.index(range.upperBound, offsetBy: 0, limitedBy: self.endIndex) ?? self.endIndex
nextLineStartIndex =
self.index(range.upperBound, offsetBy: 0, limitedBy: self.endIndex)
?? self.endIndex
lineData = self[startIndex..<lineEndIndex]
} else {
remainder = self[startIndex..<self.endIndex]
Expand Down Expand Up @@ -105,34 +109,37 @@ extension Data {
}
payload.append(self[payloadStartIndex..<payloadEndIndex])
msg.payload = payload
startIndex = self.index(payloadEndIndex, offsetBy: Data.crlf.count, limitedBy: self.endIndex) ?? self.endIndex
startIndex =
self.index(
payloadEndIndex, offsetBy: Data.crlf.count, limitedBy: self.endIndex)
?? self.endIndex
serverOps.append(.Message(msg))
continue
}
//TODO(jrm): Add HMSG handling here too.
} else if case .HMessage(var msg) = serverOp {
//TODO(jrm): Add HMSG handling here too.
} else if case .HMessage(var msg) = serverOp {
if msg.length == 0 {
serverOps.append(serverOp)
} else {
let headersStartIndex = nextLineStartIndex
let headersEndIndex = nextLineStartIndex + msg.headersLength
let payloadStartIndex = headersEndIndex
let payloadEndIndex = nextLineStartIndex + msg.length

var payload: Data?
if msg.length > msg.headersLength {
payload = Data()
}
var headers = HeaderMap()

// if the whole msg length (including training crlf) is longer
// than the remaining chunk, break and return the remainder
if payloadEndIndex + Data.crlf.count > endIndex {
remainder = self[startIndex..<self.endIndex]
break
}

let headersData = self[headersStartIndex..<headersEndIndex]
let headersData = self[headersStartIndex..<headersEndIndex]
if let headersString = String(data: headersData, encoding: .utf8) {
let headersArray = headersString.split(separator: "\r\n")
// TODO: unused now, but probably we should validate?
Expand All @@ -141,7 +148,9 @@ extension Data {
for header in headersArray.dropFirst() {
let headerParts = header.split(separator: ":")
if headerParts.count == 2 {
headers.append(try! HeaderName(String(headerParts[0])), HeaderValue(String(headerParts[1])))
headers.append(
try! HeaderName(String(headerParts[0])),
HeaderValue(String(headerParts[1])))
} else {
logger.error("Error parsing header: \(header)")
}
Expand All @@ -153,8 +162,11 @@ extension Data {
payload.append(self[payloadStartIndex..<payloadEndIndex])
msg.payload = payload
}

startIndex = self.index(payloadEndIndex, offsetBy: Data.crlf.count, limitedBy: self.endIndex) ?? self.endIndex

startIndex =
self.index(
payloadEndIndex, offsetBy: Data.crlf.count, limitedBy: self.endIndex)
?? self.endIndex
serverOps.append(.HMessage(msg))
continue
}
Expand Down
7 changes: 4 additions & 3 deletions Sources/NatsSwift/Extensions/String+Utilities.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
import Foundation

extension String {
private static let charactersToTrim: CharacterSet = .whitespacesAndNewlines.union(CharacterSet(charactersIn: "'"))
private static let charactersToTrim: CharacterSet = .whitespacesAndNewlines.union(
CharacterSet(charactersIn: "'"))

static func hash() -> String {
let uuid = String.uuid()
return uuid[0...7]
}

func trimWhitespacesAndApostrophes() -> String {
return self.trimmingCharacters(in: String.charactersToTrim)
}
Expand All @@ -21,7 +22,7 @@ extension String {
return UUID().uuidString.trimmingCharacters(in: .punctuationCharacters)
}

subscript (bounds: CountableClosedRange<Int>) -> String {
subscript(bounds: CountableClosedRange<Int>) -> String {
let start = index(startIndex, offsetBy: bounds.lowerBound)
let end = index(startIndex, offsetBy: bounds.upperBound)
return String(self[start...end])
Expand Down
26 changes: 16 additions & 10 deletions Sources/NatsSwift/NatsClient/NatsClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
// NatsSwift
//

import Dispatch
import Foundation
import Logging
import NIO
import NIOFoundationCompat
import Dispatch

import Logging

var logger = Logger(label: "NatsSwift")

Expand Down Expand Up @@ -69,44 +68,51 @@ public class Client {
}

extension Client {
public func connect() async throws {
public func connect() async throws {
//TODO(jrm): reafactor for reconnection and review error handling.
//TODO(jrm): handle response
logger.debug("connect")
guard let connectionHandler = self.connectionHandler else {
throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
throw NSError(
domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
}
try await connectionHandler.connect()
}

public func close() async throws {
logger.debug("close")
guard let connectionHandler = self.connectionHandler else {
throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
throw NSError(
domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
}
try await connectionHandler.close()
}

public func publish(_ payload: Data, subject: String, reply: String? = nil, headers: HeaderMap? = nil) throws {
public func publish(
_ payload: Data, subject: String, reply: String? = nil, headers: HeaderMap? = nil
) throws {
logger.debug("publish")
guard let connectionHandler = self.connectionHandler else {
throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
throw NSError(
domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
}
try connectionHandler.write(operation: ClientOp.Publish((subject, reply, payload, headers)))
}

public func flush() async throws {
logger.debug("flush")
guard let connectionHandler = self.connectionHandler else {
throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
throw NSError(
domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
}
connectionHandler.channel?.flush()
}

public func subscribe(to subject: String) async throws -> Subscription {
logger.info("subscribe to subject \(subject)")
guard let connectionHandler = self.connectionHandler else {
throw NSError(domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
throw NSError(
domain: "nats_swift", code: 1, userInfo: ["message": "empty connection handler"])
}
return try await connectionHandler.subscribe(subject)

Expand Down
2 changes: 1 addition & 1 deletion Sources/NatsSwift/NatsClient/NatsClientOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
// NatsSwift
//

import Dispatch
import Foundation
import NIO
import NIOFoundationCompat
import Dispatch

public class ClientOptions {
private var urls: [URL] = []
Expand Down
Loading

0 comments on commit 646d019

Please sign in to comment.