Skip to content

Commit

Permalink
Examples and headers tweak
Browse files Browse the repository at this point in the history
  • Loading branch information
mtmk committed Feb 27, 2024
1 parent 3146904 commit 50ddb17
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 13 deletions.
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ let package = Package(
.executableTarget(name: "Benchmark", dependencies: ["NatsSwift"]),
.executableTarget(name: "BenchmarkPubSub", dependencies: ["NatsSwift"]),
.executableTarget(name: "BenchmarkSub", dependencies: ["NatsSwift"]),
.executableTarget(name: "Example", dependencies: ["NatsSwift"]),
]
)
5 changes: 2 additions & 3 deletions Sources/BenchmarkPubSub/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ let data = "foo".data(using: .utf8)!
// Warmup
print("Warming up...")
for _ in 0..<10_000 {
try! nats.publish(data, subject: "foo")
try! await nats.publish(data, subject: "foo")
}
print("Starting benchmark...")
let now = DispatchTime.now()
Expand Down Expand Up @@ -51,7 +51,7 @@ try await withThrowingTaskGroup(of: Void.self) { group in
hm.append(try! HeaderName("foo"), HeaderValue("baz"))
hm.insert(try! HeaderName("another"), HeaderValue("one"))
for i in 0..<numMsgs {
try nats.publish("\(i)".data(using: .utf8)!, subject: "foo", headers: hm)
try await nats.publish("\(i)".data(using: .utf8)!, subject: "foo", headers: hm)
if i % 1000 == 0 {
print("published \(i) msgs")
}
Expand All @@ -62,7 +62,6 @@ try await withThrowingTaskGroup(of: Void.self) { group in
try await group.waitForAll()
}

try! await nats.flush()
let elapsed = DispatchTime.now().uptimeNanoseconds - now.uptimeNanoseconds
let msgsPerSec: Double = Double(numMsgs) / (Double(elapsed) / 1_000_000_000)
print("Elapsed: \(elapsed / 1_000_000)ms")
Expand Down
118 changes: 111 additions & 7 deletions Sources/Example/main.swift
Original file line number Diff line number Diff line change
@@ -1,8 +1,112 @@
//
// File.swift
//
//
// Created by mtmk on 27/02/2024.
//

import Foundation
import NatsSwift

print("\n### Setup NATS Connection")

let nats = ClientOptions()
.url(URL(string: "nats://localhost:4222")!)
.logLevel(.error)
.build()

do {
print("connecting...")
try await nats.connect()
} catch {
print("Error: \(error)")
exit(1)
}

print("\n### Publish / Subscribe")
do {
print("subscribing...")
let sub = try await nats.subscribe(to: "foo.>")

let loop = Task {
print("starting message loop...")

for try await msg in sub {

if msg.subject == "foo.done" {
break
}

if let payload = msg.payload {
print("received \(msg.subject): \(String(data: payload, encoding: .utf8) ?? "")")
}
}

print("message loop done...")
}

print("publishing data...")
for i in 1...3 {
if let data = "data\(i)".data(using: .utf8) {
try await nats.publish(data, subject: "foo.\(i)")
}
}

print("signal done...")
try await nats.publish(Data(), subject: "foo.done")

try await loop.value

print("done")

} catch {
print("Error: \(error)")
exit(1)
}

print("\n### Publish / Subscribe with Headers")
do {
print("subscribing...")
let sub = try await nats.subscribe(to: "foo.>")

let loop = Task {
print("starting message loop...")

for try await msg in sub {

if msg.subject == "foo.done" {
break
}

if let payload = msg.payload {
print("received \(msg.subject): \(String(data: payload, encoding: .utf8) ?? "")")
}

if let headers = msg.headers {
for (name, value) in headers {
print(" \(name): \(value)")
}
}
}

print("message loop done...")
}

print("publishing data...")
for i in 1...3 {

var headers = HeaderMap()
headers["X-Type"] = "data"
headers["X-Index"] = "index-\(i)"

if let data = "data\(i)".data(using: .utf8) {
try await nats.publish(data, subject: "foo.\(i)", headers: headers)
}
}

print("signal done...")
try await nats.publish(Data(), subject: "foo.done")

try await loop.value

print("done")

} catch {
print("Error: \(error)")
exit(1)
}

print("bye")
11 changes: 10 additions & 1 deletion Sources/NatsSwift/NatsClient/NatsClientOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

import Dispatch
import Foundation
import Logging
import NIO
import NIOFoundationCompat

public class ClientOptions {
private var urls: [URL] = []
private var logerLevel: Logger.Level = .info
private var pingInterval: TimeInterval = 60.0
private var reconnectWait: TimeInterval = 2.0
private var maxReconnects: Int?
Expand All @@ -34,6 +36,11 @@ public class ClientOptions {
return self
}

public func logLevel(_ logerLevel: Logger.Level) -> ClientOptions {
self.logerLevel = logerLevel
return self
}

public func pingInterval(_ pingInterval: TimeInterval) -> ClientOptions {
self.pingInterval = pingInterval
return self
Expand Down Expand Up @@ -125,7 +132,9 @@ public class ClientOptions {
rootCertificate: rootCertificate,
retryOnFailedConnect: initialReconnect
)


logger.logLevel = self.logerLevel

return client
}
}
2 changes: 1 addition & 1 deletion Sources/NatsSwift/NatsConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class ConnectionHandler: ChannelInboundHandler {
// if there are more reconnect attempts than the number of servers,
// we are after the initial connect, so sleep between servers
let shouldSleep = self.reconnectAttempts >= self.urls.count
print(self.reconnectAttempts)
logger.debug("reconnect attempts: \(self.reconnectAttempts)")
for s in servers {
if let maxReconnects {
if reconnectAttempts >= maxReconnects {
Expand Down
70 changes: 69 additions & 1 deletion Sources/NatsSwift/NatsHeaders.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public struct HeaderName: Equatable, Hashable, CustomStringConvertible {
}

// Represents a NATS header map in Swift.
public struct HeaderMap: Equatable {
public struct HeaderMap: Equatable, Sequence {
private var inner: [HeaderName: [HeaderValue]]

public init() {
Expand Down Expand Up @@ -78,6 +78,10 @@ public struct HeaderMap: Equatable {
return inner[name] ?? []
}

public func makeIterator() -> HeaderMapIterator {
return HeaderMapIterator(dictionary: inner)
}

//TODO(jrm): can we use unsafe methods here? Probably yes.
func toBytes() -> [UInt8] {
var bytes: [UInt8] = []
Expand Down Expand Up @@ -108,4 +112,68 @@ extension HeaderMap {
}
}
}

public subscript(name: String) -> String? {
get {
return get(safeEncoded(name: name))?.description
}
set {
if let value = newValue {
insert(safeEncoded(name: name), HeaderValue(value))
} else {
inner[safeEncoded(name: name)] = nil
}
}
}

func safeEncoded(name: String) -> HeaderName {
let safeName = name.map { character -> String in
if character == ":" || character.asciiValue.map({ $0 < 33 || $0 > 126 }) ?? true {
// Encode the character manually, since it meets the criteria or is non-ASCII
let utf8 = String(character).utf8
return utf8.reduce("") { partialResult, byte in
partialResult + String(format: "%%%02X", byte)
}
} else {
// Return the character as is, since it doesn't need encoding
return String(character)
}
}.joined()

return try! HeaderName(safeName)
}
}

public struct HeaderMapIterator: IteratorProtocol {
private let dictionary: [HeaderName: [HeaderValue]]
private var keyIterator: Dictionary<HeaderName, [HeaderValue]>.Iterator
private var currentKey: HeaderName?
private var currentValueIterator: Array<HeaderValue>.Iterator?

init(dictionary: [HeaderName: [HeaderValue]]) {
self.dictionary = dictionary
self.keyIterator = dictionary.makeIterator()
}

public mutating func next() -> (name: HeaderName, value: HeaderValue)? {
// If there is a current array iterator and it has a next value, return the current key and value
if let value = currentValueIterator?.next() {
return (currentKey!, value)
}

// If the current array iterator is done or nil, move to the next key in the dictionary
if let nextKeyValPair = keyIterator.next() {
currentKey = nextKeyValPair.key
currentValueIterator = nextKeyValPair.value.makeIterator()

// Try to get the next value for the new key
if let value = currentValueIterator?.next() {
return (currentKey!, value)
}
}

// If no more keys or values, iteration is complete
return nil
}
}

0 comments on commit 50ddb17

Please sign in to comment.