Skip to content

Commit

Permalink
Move ByteBuffer extension to separate file
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <piotr@synadia.com>
  • Loading branch information
piotrpio committed Apr 5, 2024
1 parent 55c30d9 commit a0a3582
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 80 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ let package = Package(
),

.executableTarget(name: "bench", dependencies: ["Nats"]),
.executableTarget(name: "Benchmark", dependencies: ["Nats", .product(name: "NIOCore", package: "swift-nio")]),
.executableTarget(name: "Benchmark", dependencies: ["Nats"]),
.executableTarget(name: "BenchmarkPubSub", dependencies: ["Nats"]),
.executableTarget(name: "BenchmarkSub", dependencies: ["Nats"]),
.executableTarget(name: "Example", dependencies: ["Nats"]),
Expand Down
3 changes: 1 addition & 2 deletions Sources/Benchmark/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// limitations under the License.

import Foundation
import NIOCore
import Nats

let nats = NatsClientOptions()
Expand All @@ -30,7 +29,7 @@ for _ in 0..<10_000 {
}
print("Starting benchmark...")
let now = DispatchTime.now()
let numMsgs = 100_000
let numMsgs = 1_000_000
for _ in 0..<numMsgs {
try! await nats.publish(data, subject: "foo")
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/Nats/BatchBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ internal class BatchBuffer {
}
state.waitingPromises.removeAll()
case .failure(let error):
state.waitingPromises.forEach { $0.1.resume(throwing: error) }
for promise in state.waitingPromises {
promise.1.resume(throwing: error)
}
state.waitingPromises.removeAll()
}

Expand Down
87 changes: 87 additions & 0 deletions Sources/Nats/Extensions/ByteBuffer+Writer.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation
import NIO

extension ByteBuffer {
mutating func writeClientOp(_ op: ClientOp) {
switch op {
case .publish((let subject, let reply, let payload, let headers)):
if let payload = payload {
self.reserveCapacity(
minimumWritableBytes: payload.count + subject.utf8.count
+ NatsOperation.publish.rawValue.count + 12)
if headers != nil {
self.writeBytes(NatsOperation.hpublish.rawBytes)
} else {
self.writeBytes(NatsOperation.publish.rawBytes)
}
self.writeString(" ")
self.writeString(subject)
self.writeString(" ")
if let reply = reply {
self.writeString("\(reply) ")
}
if let headers = headers {
let headers = headers.toBytes()
let totalLen = headers.count + payload.count
let headersLen = headers.count
self.writeString("\(headersLen) \(totalLen)\r\n")
self.writeData(headers)
} else {
self.writeString("\(payload.count)\r\n")
}
self.writeData(payload)
self.writeString("\r\n")
} else {
self.reserveCapacity(
minimumWritableBytes: subject.utf8.count + NatsOperation.publish.rawValue.count
+ 12)
self.writeBytes(NatsOperation.publish.rawBytes)
self.writeString(" ")
self.writeString(subject)
if let reply = reply {
self.writeString("\(reply) ")
}
self.writeString("\r\n")
}

case .subscribe((let sid, let subject, let queue)):
if let queue {
self.writeString(
"\(NatsOperation.subscribe.rawValue) \(subject) \(queue) \(sid)\r\n")
} else {
self.writeString("\(NatsOperation.subscribe.rawValue) \(subject) \(sid)\r\n")
}

case .unsubscribe((let sid, let max)):
if let max {
self.writeString("\(NatsOperation.unsubscribe.rawValue) \(sid) \(max)\r\n")
} else {
self.writeString("\(NatsOperation.unsubscribe.rawValue) \(sid)\r\n")
}
case .connect(let info):
// This encode can't actually fail
let json = try! JSONEncoder().encode(info)
self.reserveCapacity(minimumWritableBytes: json.count + 5)
self.writeString("\(NatsOperation.connect.rawValue) ")
self.writeData(json)
self.writeString("\r\n")
case .ping:
self.writeString("\(NatsOperation.ping.rawValue)\r\n")
case .pong:
self.writeString("\(NatsOperation.pong.rawValue)\r\n")
}
}
}
4 changes: 0 additions & 4 deletions Sources/Nats/NatsClient/NatsClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ public class NatsClient {
internal var buffer: ByteBuffer
internal var connectionHandler: ConnectionHandler?

internal var batchBuffer: BatchBuffer? {
self.connectionHandler?.batchBuffer
}

internal init() {
self.buffer = allocator.buffer(capacity: 1024)
}
Expand Down
72 changes: 0 additions & 72 deletions Sources/Nats/NatsProto.swift
Original file line number Diff line number Diff line change
Expand Up @@ -251,78 +251,6 @@ struct ServerInfo: Codable, Equatable {
}
}

extension ByteBuffer {
mutating func writeClientOp(_ op: ClientOp) {
switch op {
case .publish((let subject, let reply, let payload, let headers)):
if let payload = payload {
self.reserveCapacity(
minimumWritableBytes: payload.count + subject.utf8.count
+ NatsOperation.publish.rawValue.count + 12)
if headers != nil {
self.writeBytes(NatsOperation.hpublish.rawBytes)
} else {
self.writeBytes(NatsOperation.publish.rawBytes)
}
self.writeString(" ")
self.writeString(subject)
self.writeString(" ")
if let reply = reply {
self.writeString("\(reply) ")
}
if let headers = headers {
let headers = headers.toBytes()
let totalLen = headers.count + payload.count
let headersLen = headers.count
self.writeString("\(headersLen) \(totalLen)\r\n")
self.writeData(headers)
} else {
self.writeString("\(payload.count)\r\n")
}
self.writeData(payload)
self.writeString("\r\n")
} else {
self.reserveCapacity(
minimumWritableBytes: subject.utf8.count + NatsOperation.publish.rawValue.count
+ 12)
self.writeBytes(NatsOperation.publish.rawBytes)
self.writeString(" ")
self.writeString(subject)
if let reply = reply {
self.writeString("\(reply) ")
}
self.writeString("\r\n")
}

case .subscribe((let sid, let subject, let queue)):
if let queue {
self.writeString(
"\(NatsOperation.subscribe.rawValue) \(subject) \(queue) \(sid)\r\n")
} else {
self.writeString("\(NatsOperation.subscribe.rawValue) \(subject) \(sid)\r\n")
}

case .unsubscribe((let sid, let max)):
if let max {
self.writeString("\(NatsOperation.unsubscribe.rawValue) \(sid) \(max)\r\n")
} else {
self.writeString("\(NatsOperation.unsubscribe.rawValue) \(sid)\r\n")
}
case .connect(let info):
// This encode can't actually fail
let json = try! JSONEncoder().encode(info)
self.reserveCapacity(minimumWritableBytes: json.count + 5)
self.writeString("\(NatsOperation.connect.rawValue) ")
self.writeData(json)
self.writeString("\r\n")
case .ping:
self.writeString("\(NatsOperation.ping.rawValue)\r\n")
case .pong:
self.writeString("\(NatsOperation.pong.rawValue)\r\n")
}
}
}

enum ClientOp {
case publish((subject: String, reply: String?, payload: Data?, headers: NatsHeaderMap?))
case subscribe((sid: UInt64, subject: String, queue: String?))
Expand Down

0 comments on commit a0a3582

Please sign in to comment.