Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async connection #12

Merged
merged 7 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import PackageDescription

let package = Package(
name: "NatsSwift",
platforms: [
.macOS(.v10_15)
],
products: [
.library(name: "NatsSwift", targets: ["NatsSwift"])
],
Expand All @@ -19,3 +22,4 @@ let package = Package(
.testTarget(name: "NatsSwiftTests", dependencies: ["NatsSwift"])
]
)

5 changes: 2 additions & 3 deletions Sources/NatsSwift/NatsClient/NatsClient+ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extension NatsClient: ChannelInboundHandler {

switch type {
case .ping:
self.sendMessage(NatsMessage.pong())
self.sendMessage(OldNatsMessage.pong())
case .ok:
self.fire(.response)
case .error:
Expand Down Expand Up @@ -75,7 +75,6 @@ extension NatsClient: ChannelInboundHandler {
}
}


extension NatsClient {

// MARK: - Implement Internal Methods
Expand Down Expand Up @@ -108,7 +107,7 @@ extension NatsClient {
return
}

guard let message = NatsMessage.parse(messageStr) else { return }
guard let message = OldNatsMessage.parse(messageStr) else { return }

guard let handler = self.subjectHandlerStore[message.subject] else { return }

Expand Down
8 changes: 4 additions & 4 deletions Sources/NatsSwift/NatsClient/NatsClient+Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ extension NatsClient: NatsConnection {
// MARK: - Implement NatsConnection Protocol

/// Connect to the NATS server
open func connect() throws {
public func connect() throws {
logger.debug("Try to connect.")
guard self.state != .connected else {
logger.info("Already connected, skip connection.")
Expand Down Expand Up @@ -41,7 +41,7 @@ extension NatsClient: NatsConnection {
}

/// Disconnect from the NATS server
open func disconnect() {
public func disconnect() {
logger.debug("Try to disconnect.")
do {
try self.channel?.close().wait()
Expand All @@ -58,7 +58,7 @@ extension NatsClient: NatsConnection {

// MARK: - Internal Methods

open func reconnect() throws {
public func reconnect() throws {
self.fire(.reconnecting)

// disconnect - if not already
Expand Down Expand Up @@ -187,7 +187,7 @@ extension NatsClient: NatsConnection {
"pass": password
] as [String : Any]

self.sendMessage(NatsMessage.connect(config: config))
self.sendMessage(OldNatsMessage.connect(config: config))
}

}
6 changes: 3 additions & 3 deletions Sources/NatsSwift/NatsClient/NatsClient+Publish.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ extension NatsClient: NatsPublish {

open func publish(_ payload: String, to subject: String) {
logger.info("publish \(payload.count) characters to subject \(subject)")
sendMessage(NatsMessage.publish(payload: payload, subject: subject))
sendMessage(OldNatsMessage.publish(payload: payload, subject: subject))
}

open func publish(_ payload: String, to subject: NatsSubject) {
publish(payload, to: subject.subject)
}

open func reply(to message: NatsMessage, withPayload payload: String) {
open func reply(to message: OldNatsMessage, withPayload payload: String) {
guard let replySubject = message.replySubject else { return }
logger.info("reply \(payload.count) characters to subject \(replySubject.subject)")
publish(payload, to: replySubject.subject)
Expand Down Expand Up @@ -51,7 +51,7 @@ extension NatsClient: NatsPublish {
try publishSync(payload, to: subject.subject)
}

open func replySync(to message: NatsMessage, withPayload payload: String) throws {
open func replySync(to message: OldNatsMessage, withPayload payload: String) throws {
guard let replySubject = message.replySubject else { return }
try publishSync(payload, to: replySubject.subject)
}
Expand Down
20 changes: 10 additions & 10 deletions Sources/NatsSwift/NatsClient/NatsClient+Subscribe.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ extension NatsClient: NatsSubscribe {
// MARK: - Implement NatsSubscribe Protocol

@discardableResult
open func subscribe(to subject: String, _ handler: @escaping (NatsMessage) -> Void) -> NatsSubject {
open func subscribe(to subject: String, _ handler: @escaping (OldNatsMessage) -> Void) -> NatsSubject {
logger.info("subscribe to subject \(subject)")
let nsub = NatsSubject(subject: subject)

self.sendMessage(NatsMessage.subscribe(subject: nsub.subject, sid: nsub.id))
self.sendMessage(OldNatsMessage.subscribe(subject: nsub.subject, sid: nsub.id))

self.subjectHandlerStore[nsub] = handler

return nsub
}

@discardableResult
open func subscribe(to subject: String, asPartOf queue: String, _ handler: @escaping (NatsMessage) -> Void) -> NatsSubject {
open func subscribe(to subject: String, asPartOf queue: String, _ handler: @escaping (OldNatsMessage) -> Void) -> NatsSubject {
logger.info("subscribe to subject \(subject)")
let nsub = NatsSubject(subject: subject)

self.sendMessage(NatsMessage.subscribe(subject: nsub.subject, sid: nsub.id, queue: queue))
self.sendMessage(OldNatsMessage.subscribe(subject: nsub.subject, sid: nsub.id, queue: queue))

self.subjectHandlerStore[nsub] = handler

Expand All @@ -37,7 +37,7 @@ extension NatsClient: NatsSubscribe {

open func unsubscribe(from subject: NatsSubject) {
logger.info("unsubscribe from subject \(subject)")
self.sendMessage(NatsMessage.unsubscribe(sid: subject.id))
self.sendMessage(OldNatsMessage.unsubscribe(sid: subject.id))
self.subjectHandlerStore[subject] = nil

}
Expand All @@ -54,7 +54,7 @@ extension NatsClient: NatsSubscribe {
group.leave()
}

self.sendMessage(NatsMessage.unsubscribe(sid: subject.id))
self.sendMessage(OldNatsMessage.unsubscribe(sid: subject.id))

group.wait()

Expand All @@ -67,18 +67,18 @@ extension NatsClient: NatsSubscribe {
}

@discardableResult
open func subscribeSync(to subject: String, _ handler: @escaping (NatsMessage) -> Void) throws -> NatsSubject {
open func subscribeSync(to subject: String, _ handler: @escaping (OldNatsMessage) -> Void) throws -> NatsSubject {
return try subSync(to: subject, asPartOf: "", handler)
}

@discardableResult
open func subscribeSync(to subject: String, asPartOf queue: String, _ handler: @escaping (NatsMessage) -> Void) throws -> NatsSubject {
open func subscribeSync(to subject: String, asPartOf queue: String, _ handler: @escaping (OldNatsMessage) -> Void) throws -> NatsSubject {
return try subSync(to: subject, asPartOf: queue, handler)
}

// MARK: - Private methods

private func subSync(to subject: String, asPartOf queue: String, _ handler: @escaping (NatsMessage) -> Void) throws -> NatsSubject {
private func subSync(to subject: String, asPartOf queue: String, _ handler: @escaping (OldNatsMessage) -> Void) throws -> NatsSubject {
logger.info("subscribe synchronous from subject \(subject)")
let group = DispatchGroup()
group.enter()
Expand All @@ -91,7 +91,7 @@ extension NatsClient: NatsSubscribe {
}

let nsub = NatsSubject(subject: subject)
self.sendMessage(NatsMessage.subscribe(subject: nsub.subject, sid: nsub.id))
self.sendMessage(OldNatsMessage.subscribe(subject: nsub.subject, sid: nsub.id))

group.wait()

Expand Down
Loading
Loading