Skip to content

Commit

Permalink
Rename to NatsSwift
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Nov 27, 2023
1 parent 2f1273f commit b03a7e0
Show file tree
Hide file tree
Showing 28 changed files with 84 additions and 84 deletions.
12 changes: 6 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
// swift-tools-version:5.0
// swift-tools-version:5.7

import PackageDescription

let package = Package(
name: "SwiftyNats",
name: "NatsSwift",
products: [
.library(name: "SwiftyNats", targets: ["SwiftyNats"])
.library(name: "NatsSwift", targets: ["NatsSwift"])
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.33.0"),
.package(url: "https://github.com/apple/swift-nio.git", from: "2.0.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.2")
],
targets: [
.target(name: "SwiftyNats", dependencies: [
.target(name: "NatsSwift", dependencies: [
.product(name: "NIO", package: "swift-nio"),
.product(name: "Logging", package: "swift-log")
]),
.testTarget(name: "SwiftyNatsTests", dependencies: ["SwiftyNats"])
.testTarget(name: "NatsSwiftTests", dependencies: ["NatsSwift"])
]
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import Dispatch
extension NatsClient: NatsConnection {

// MARK: - Implement NatsConnection Protocol

/// Connect to the NATS server
open func connect() throws {
logger.debug("Try to connect.")
guard self.state != .connected else {
logger.info("Already connected, skip connection.")
return
}

self.dispatchGroup.enter()

#if os(Linux)
thread = Thread { self.setupConnection() }
#else
Expand Down Expand Up @@ -60,7 +60,7 @@ extension NatsClient: NatsConnection {

open func reconnect() throws {
self.fire(.reconnecting)

// disconnect - if not already
if state == .connected {
self.disconnect()
Expand All @@ -70,7 +70,7 @@ extension NatsClient: NatsConnection {
}

// MARK: - Private Methods

fileprivate func _setupConnection() {
self.connectionError = nil
// If we have a list of `connectUrls` in our current server
Expand All @@ -96,7 +96,7 @@ extension NatsClient: NatsConnection {
self.dispatchGroup.leave()
RunLoop.current.run()
}

#if os(macOS) || os(tvOS) || os(iOS)
@objc fileprivate func setupConnection() {
_setupConnection()
Expand All @@ -106,11 +106,11 @@ extension NatsClient: NatsConnection {
_setupConnection()
}
#endif

/// open the client connection to the streaming serer
fileprivate func openStream(to url: String) throws {
group = MultiThreadedEventLoopGroup(numberOfThreads: 1)

guard let server = URL(string: url) else {
throw NatsConnectionError("Invalid url provided: (\(url))")
}
Expand All @@ -134,14 +134,14 @@ extension NatsClient: NatsConnection {
break
}
}

let bootstrap = ClientBootstrap(group: self.group!)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelInitializer { channel in
channel.pipeline.addHandler(self)
}.connectTimeout( TimeAmount.seconds(5) )


let futureConnection = bootstrap.connect(host: host, port: port)
futureConnection.whenFailure({ err in
logger.error("\(err.localizedDescription)")
Expand All @@ -155,7 +155,7 @@ extension NatsClient: NatsConnection {
}
})
_ = try futureConnection.wait()

// after the connection is done, we need to wait for the server answer
let timeout: TimeInterval = 5
let waiterStartTime = Date()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,47 @@
//

extension NatsClient: NatsEventBus {

// MARK: - Implement NatsEvents Protocol

@discardableResult
open func on(_ events: [NatsEvent], _ handler: @escaping (NatsEvent) -> Void) -> String {

return self.addListeners(for: events, using: handler)

}

@discardableResult
open func on(_ event: NatsEvent, _ handler: @escaping (NatsEvent) -> Void) -> String {

return self.addListeners(for: [event], using: handler)

}

@discardableResult
open func on(_ event: NatsEvent, autoOff: Bool, _ handler: @escaping (NatsEvent) -> Void) -> String {

return self.addListeners(for: [event], using: handler, autoOff)

}

@discardableResult
open func on(_ events: [NatsEvent], autoOff: Bool, _ handler: @escaping (NatsEvent) -> Void) -> String {

return self.addListeners(for: events, using: handler, autoOff)

}

open func off(_ id: String) {

self.removeListener(id)

}

// MARK: - Implement internal methods

internal func fire(_ event: NatsEvent) {

guard let handlerStore = self.eventHandlerStore[event] else { return }

handlerStore.forEach {
Expand All @@ -53,15 +53,15 @@ extension NatsClient: NatsEventBus {
removeListener($0.listenerId)
}
}

}

// MARK: - Implement private methods

fileprivate func addListeners(for events: [NatsEvent], using handler: @escaping (NatsEvent) -> Void, _ autoOff: Bool = false) -> String {

let id = String.hash()

for event in events {
if self.eventHandlerStore[event] == nil {
self.eventHandlerStore[event] = []
Expand All @@ -70,20 +70,20 @@ extension NatsClient: NatsEventBus {
}

return id

}

fileprivate func removeListener(_ id: String) {

for event in NatsEvent.all {

let handlerStore = self.eventHandlerStore[event]
if let store = handlerStore {
self.eventHandlerStore[event] = store.filter { $0.listenerId != id }
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ extension NatsClient: NatsQueue {
}

open func flushQueue(maxWait: TimeInterval? = nil) throws {

let startTimestamp = Date().timeIntervalSinceReferenceDate

self.disconnect()

DispatchQueue.global(qos: .default).async { [weak self] in
self?.messageQueue.waitUntilAllOperationsAreFinished()
}

while true {
if self.queueCount == 0 { break }
if let maxSeconds = maxWait {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ open class NatsClient: NSObject {
internal var thread: Thread?
internal var channel: Channel?
internal let dispatchGroup = DispatchGroup()

// Buffer where incoming messages will be stroed
internal var inputBuffer: ByteBuffer?

public init(_ aUrls: [String], _ config: NatsClientConfig) {
for u in aUrls { self.urls.append(u) }
self.config = config

writeQueue.maxConcurrentOperationCount = 1
logger.debug("Init NatsClient with config: \(config)")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@ var logger = Logger(label: "SwiftyNats")
public let libVersion = "2.2"

public struct NatsClientConfig {

// logging
public var loglevel: Logger.Level = .error {
didSet {
logger.logLevel = loglevel
}
}

// Required for nats server
public let verbose: Bool
public let pedantic: Bool
public let name: String
let lang: String = "Swift"
let version: String = libVersion

// Internal config vars
public var internalQueueMax: Int = Int.max

public init(
verbose: Bool = false,
pedantic: Bool = false,
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,25 @@
import Foundation

public struct NatsMessage {

public let payload: String?
public let byteCount: UInt32?
public let subject: NatsSubject
public let replySubject: NatsSubject?
public let mid: String

init(payload: String?, byteCount: UInt32?, subject: NatsSubject, replySubject: NatsSubject? = nil) {
self.payload = payload
self.byteCount = byteCount
self.subject = subject
self.replySubject = replySubject
self.mid = String.hash()
}

}

extension NatsMessage {

internal static func publish(payload: String, subject: String) -> String {
guard let data = payload.data(using: String.Encoding.utf8) else { return "" }
return "\(NatsOperation.publish.rawValue) \(subject) \(data.count)\r\n\(payload)\r\n"
Expand All @@ -46,26 +46,26 @@ extension NatsMessage {
guard let payload = data.toString() else { return "" }
return "\(NatsOperation.connect.rawValue) \(payload)\r\n"
}

internal static func parse(_ message: String) -> NatsMessage? {

logger.debug("Parsing message: \(message)")

let components = message.components(separatedBy: CharacterSet.newlines).filter { !$0.isEmpty }

if components.count <= 0 { return nil }

let payload = components[1]
let header = components[0]
.removePrefix(NatsOperation.message.rawValue)
.components(separatedBy: CharacterSet.whitespaces)
.filter { !$0.isEmpty }

let subject: String
let sid: String
let byteCount: UInt32?
let replySubject: String?

switch (header.count) {
case 3:
subject = header[0]
Expand All @@ -82,14 +82,14 @@ extension NatsMessage {
default:
return nil
}

return NatsMessage(
payload: payload,
byteCount: byteCount,
subject: NatsSubject(subject: subject, id: sid),
replySubject: replySubject == nil ? nil : NatsSubject(subject: replySubject!)
)

}

}
File renamed without changes.
Loading

0 comments on commit b03a7e0

Please sign in to comment.