From 256727a12b0818c67a8d31d22694f413ece066c1 Mon Sep 17 00:00:00 2001 From: Michael van Straten Date: Fri, 29 Sep 2023 13:07:39 +0200 Subject: [PATCH] Mockup implementation with swift-nio --- Package.resolved | 36 ++++ Package.swift | 13 +- Sources/SwiftyRedis/AsyncNetworking.swift | 187 ------------------ Sources/SwiftyRedis/AsyncUtils.swift | 104 ++++++++++ Sources/SwiftyRedis/Client.swift | 41 ++-- Sources/SwiftyRedis/Connection.swift | 14 +- Sources/SwiftyRedis/ResponseValueParser.swift | 5 +- Tests/SwiftyRedisTests/ConnectionTests.swift | 5 +- .../ResponseValueParserTests.swift | 11 +- 9 files changed, 189 insertions(+), 227 deletions(-) delete mode 100644 Sources/SwiftyRedis/AsyncNetworking.swift create mode 100644 Sources/SwiftyRedis/AsyncUtils.swift diff --git a/Package.resolved b/Package.resolved index ecd6980be..fc2e41ad6 100644 --- a/Package.resolved +++ b/Package.resolved @@ -9,6 +9,24 @@ "version" : "0.0.7" } }, + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "6c89474e62719ddcc1e9614989fff2f68208fe10", + "version" : "1.1.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections.git", + "state" : { + "revision" : "937e904258d22af6e447a0b72c0bc67583ef64a2", + "version" : "1.0.4" + } + }, { "identity" : "swift-docc-plugin", "kind" : "remoteSourceControl", @@ -17,6 +35,24 @@ "revision" : "3303b164430d9a7055ba484c8ead67a52f7b74f6", "version" : "1.0.0" } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio", + "state" : { + "revision" : "3db5c4aeee8100d2db6f1eaf3864afdad5dc68fd", + "version" : "2.59.0" + } + }, + { + "identity" : "swift-nio-ssl", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-ssl", + "state" : { + "revision" : "320bd978cceb8e88c125dcbb774943a92f6286e9", + "version" : "2.25.0" + } } ], "version" : 2 diff --git a/Package.swift b/Package.swift index df2464fc0..cc147b7fe 100644 --- a/Package.swift +++ b/Package.swift @@ -7,7 +7,7 @@ let package = Package( name: "SwiftyRedis", platforms: [ .iOS(.v13), - .macOS(.v10_15), + .macOS(.v13), .macCatalyst(.v13), .tvOS(.v13), .watchOS(.v6), @@ -24,13 +24,22 @@ let package = Package( // .package(url: /* package url */, from: "1.0.0"), .package(url: "https://github.com/apple/swift-docc-plugin", from: "1.0.0"), .package(url: "https://github.com/groue/Semaphore", from: "0.0.7"), + .package(url: "https://github.com/apple/swift-nio", from: "2.59.0"), + .package(url: "https://github.com/apple/swift-nio-ssl", from: "2.25.0") ], targets: [ // Targets are the basic building blocks of a package. A target can define a module or a test suite. // Targets can depend on other targets in this package, and on products in packages this package depends on. .target( name: "SwiftyRedis", - dependencies: ["Semaphore"] + dependencies: [ + "Semaphore", + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "NIOPosix", package: "swift-nio"), + .product(name: "NIO", package: "swift-nio"), + .product(name: "NIOConcurrencyHelpers", package: "swift-nio"), + .product(name: "NIOSSL", package: "swift-nio-ssl") + ] ), .testTarget( name: "SwiftyRedisTests", diff --git a/Sources/SwiftyRedis/AsyncNetworking.swift b/Sources/SwiftyRedis/AsyncNetworking.swift deleted file mode 100644 index e379ec3b9..000000000 --- a/Sources/SwiftyRedis/AsyncNetworking.swift +++ /dev/null @@ -1,187 +0,0 @@ -// -// AsyncNetworking.swift -// -// -// Created by Michael van Straten on 27.05.23. -// - -import Foundation -import Network - -/** - A struct representing an asynchronous data stream. - */ -struct AsyncDataStream { - let con: NWConnection - var buffer: Data = .init() - - /** - Polls for new data and appends it to the buffer. - */ - private mutating func poll() async throws { - switch try await con.recieve(minimumIncompleteLength: 0, maximumLength: .max) { - case let .some(newData): - buffer.append(newData) - case .none: - return - } - } - - /** - Checks if there is insufficient data in the buffer. - - Parameter requiredAmount: The required amount of data. - - Returns: `true` if there is insufficient data, `false` otherwise. - */ - private func hasInsufficientData(requiredAmount: Int) -> Bool { - return requiredAmount > buffer.count - } -} - -extension AsyncDataStream { - /** - Retrieves the next byte from the data stream. - - Returns: The next byte. - */ - mutating func next() async throws -> UInt8 { - while hasInsufficientData(requiredAmount: 1) { - try await poll() - } - - return buffer.popFirst()! - } - - /** - Retrieves the next `n` bytes from the data stream. - - Parameter n: The number of bytes to retrieve. - - Returns: The retrieved data. - */ - mutating func next(n: Int) async throws -> Data { - if n == 0 { - return Data() - } - - while hasInsufficientData(requiredAmount: n) { - try await poll() - } - - return Data((1 ... n).map { _ in self.buffer.removeFirst() }) - } - - /** - Retrieves data from the data stream until a specific subsequence is found. - - Parameter subsequence: The subsequence to search for. - - Returns: The retrieved data. - */ - mutating func nextUntil(subsequence: S) async throws -> Data { - var range = buffer.firstRange(of: subsequence) - while range == nil { - try await poll() - range = buffer.firstRange(of: subsequence) - } - - return try await next(n: range!.lowerBound - buffer.indices.lowerBound) - } -} - -extension NWConnection { - /** - Creates an ``AsyncDataStream`` from the current connection. - - Returns: An ``AsyncDataStream` instance. - */ - func dataStream() -> AsyncDataStream { - AsyncDataStream(con: self) - } -} - -extension NWConnection { - /** - Starts the connection asynchronously. - - Parameters: - - queue: The dispatch queue to use for handling events. - - Throws: An error if the connection fails to start. - */ - func start(queue: DispatchQueue) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.stateUpdateHandler = { newState in - switch newState { - case let .waiting(error), let .failed(error): - self.stateUpdateHandler = nil - continuation.resume(throwing: error) - case .ready: - self.stateUpdateHandler = nil - continuation.resume() - case .cancelled: - self.restart() - default: - break - } - } - self.start(queue: queue) - } - } - - /** - Sends data over the connection asynchronously. - - Parameters: - - content: The data to send. - - contentContext: The context for the content being sent. - - isComplete: A flag indicating if the content is complete. - - Throws: An error if the send operation fails. - */ - func send(content: Data?, contentContext _: NWConnection.ContentContext = .defaultMessage, isComplete _: Bool = true) async throws { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.send( - content: content, - completion: .contentProcessed { error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: ()) - } - } - ) - } - } - - /** - Receives data from the connection asynchronously. - - Parameters: - - minimumIncompleteLength: The minimum length of incomplete data to receive. - - maximumLength: The maximum length of data to receive. - - Returns: The received data, or `nil` if no data is available. - - Throws: An error if the receive operation fails. - */ - func recieve(minimumIncompleteLength: Int, maximumLength: Int) async throws -> Data? { - try await withCheckedThrowingContinuation { continuation in - self.receive(minimumIncompleteLength: minimumIncompleteLength, maximumLength: maximumLength) { data, _, _, network_error in - if let network_error = network_error { - continuation.resume(throwing: network_error) - } else { - continuation.resume(returning: data) - } - } - } - } -} - -/** - Maps each element of the sequence asynchronously. - - - Parameter transform: A closure that takes an element of the sequence as a parameter and returns a transformed value. - - Returns: An array containing the transformed values. - - Throws: Errors thrown by the `transform` closure. - - Note: This extension method is used internally by the ``ResponseValueParser`` and ``RedisPipeline`` class. - */ -extension Sequence { - func mapAsync( - _ transform: (Element) async throws -> T - ) async rethrows -> [T] { - var values = [T]() - - for element in self { - try await values.append(transform(element)) - } - - return values - } -} diff --git a/Sources/SwiftyRedis/AsyncUtils.swift b/Sources/SwiftyRedis/AsyncUtils.swift new file mode 100644 index 000000000..b9fcfddc8 --- /dev/null +++ b/Sources/SwiftyRedis/AsyncUtils.swift @@ -0,0 +1,104 @@ +// +// AsyncNetworking.swift +// +// +// Created by Michael van Straten on 27.05.23. +// + +import Foundation +import Network +@_spi(AsyncChannel) import NIOCore + +/** + A struct representing an asynchronous data stream. + */ +struct AsyncDataStream { + var con: NIOAsyncChannelInboundStream.AsyncIterator + var buffer: ByteBuffer = ByteBuffer() + + /** + Polls for new data and appends it to the buffer. + */ + private mutating func poll() async throws { + if let new_data = try await con.next() { + buffer.writeImmutableBuffer(new_data) + } + } + + /** + Checks if there is insufficient data in the buffer. + - Parameter requiredAmount: The required amount of data. + - Returns: `true` if there is insufficient data, `false` otherwise. + */ + private func hasInsufficientData(requiredAmount: Int) -> Bool { + return requiredAmount > buffer.readableBytes + } +} + +extension AsyncDataStream { + /** + Retrieves the next byte from the data stream. + - Returns: The next byte. + */ + mutating func next() async throws -> UInt8 { + while hasInsufficientData(requiredAmount: 1) { + try await poll() + } + + return buffer.readBytes(length: 1)![0] + } + + /** + Retrieves the next `n` bytes from the data stream. + - Parameter n: The number of bytes to retrieve. + - Returns: The retrieved data. + */ + mutating func next(n: Int) async throws -> Data { + if n == 0 { + return Data() + } + + while hasInsufficientData(requiredAmount: n) { + try await poll() + } + + return Data(self.buffer.readBytes(length: n)!) + } + + /** + Retrieves data from the data stream until a specific subsequence is found. + - Parameter subsequence: The subsequence to search for. + - Returns: The retrieved data. + */ + mutating func nextUntil(subsequence: S) async throws -> Data { + var range = buffer.readableBytesView.firstRange(of: subsequence) + while range == nil { + try await poll() + range = buffer.readableBytesView.firstRange(of: subsequence) + } + + return try await next(n: range!.lowerBound - buffer.readableBytesView.startIndex) + } +} + +/** + Maps each element of the sequence asynchronously. + + - Parameter transform: A closure that takes an element of the sequence as a parameter and returns a transformed value. + - Returns: An array containing the transformed values. + - Throws: Errors thrown by the `transform` closure. + - Note: This extension method is used internally by the ``ResponseValueParser`` and ``RedisPipeline`` class. + */ +extension Sequence { + func mapAsync( + _ transform: (Element) async throws -> T + ) async rethrows -> [T] { + var values = [T]() + + for element in self { + try await values.append(transform(element)) + } + + return values + } +} diff --git a/Sources/SwiftyRedis/Client.swift b/Sources/SwiftyRedis/Client.swift index 8629c47a6..98507503e 100644 --- a/Sources/SwiftyRedis/Client.swift +++ b/Sources/SwiftyRedis/Client.swift @@ -5,7 +5,8 @@ // Created by Michael Van straten on 11.07.22. // -import Network +@_spi(AsyncChannel) import NIOCore +@_spi(AsyncChannel) import NIOPosix /** The `RedisClient` acts as connector to the redis server. By itself it does not @@ -22,9 +23,9 @@ public class RedisClient { /// As RedisClient initialized to connect to localhost:6379 without using TLS. This can be used for easier development. public static let LOCAL = RedisClient(.init("localhost")) - let host: NWEndpoint.Host - let port: NWEndpoint.Port - let params: NWParameters + let host: String + let port: Int + // let tlsConfiguration: TLSConfiguration? let database: UInt let username: String? let password: String? @@ -42,16 +43,16 @@ public class RedisClient { - password: The password to use when authenticating. */ public init( - _ host: NWEndpoint.Host, - port: NWEndpoint.Port = .init(integerLiteral: 6379), - params: NWParameters = .tcp, + _ host: String, + port: Int = 6379, + // tlsConfiguration: TLSConfiguration? = .clientDefault, database: UInt = 0, username: String? = nil, password: String? = nil ) { self.host = host self.port = port - self.params = params + // self.tlsConfiguration = tlsConfiguration self.database = database self.username = username self.password = password @@ -63,10 +64,14 @@ public class RedisClient { - Returns: A connection object that can be used to send commands to the Redis server. - Throws: An error if the connection cannot be established. */ - public func get_connection() async throws -> RedisConnection { - let actual_connection = create_nw_connection() - try await actual_connection.start(queue: DispatchQueue(label: "redis-connection-updates")) - let redis_connection = RedisConnection(actual_connection) + public func get_connection(group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)) async throws -> RedisConnection { + let clientChannel = try await ClientBootstrap(group: group) + .connect(host: host, port: port) { channel in + channel.eventLoop.makeSucceededFuture(try! NIOAsyncChannel(synchronouslyWrapping: channel)) + } + + let redis_connection = RedisConnection(clientChannel) + if database != 0 { try await redis_connection.select(Int(database)) } @@ -82,16 +87,8 @@ public class RedisClient { - Returns: A connection object that can be used to subscribe to channels and receive messages. - Throws: An error if the connection cannot be established. */ - public func get_pub_sub_connection() async throws -> PubSubConnection { - let redis_connection = try await get_connection() + public func get_pub_sub_connection(group: EventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)) async throws -> PubSubConnection { + let redis_connection = try await get_connection(group: group) return PubSubConnection(con: redis_connection) } - - private func create_nw_connection() -> NWConnection { - return NWConnection( - host: host, - port: port, - using: params - ) - } } diff --git a/Sources/SwiftyRedis/Connection.swift b/Sources/SwiftyRedis/Connection.swift index 9c51f4060..43adfa41c 100644 --- a/Sources/SwiftyRedis/Connection.swift +++ b/Sources/SwiftyRedis/Connection.swift @@ -6,7 +6,7 @@ // import Foundation -import Network +@_spi(AsyncChannel) import NIOCore import Semaphore /** @@ -17,18 +17,18 @@ import Semaphore It is bound to a single database and can only be created from the ``RedisClient` class. */ public class RedisConnection { - var con: NWConnection + var channel: NIOAsyncChannel let parser: ResponseValueParser let semaphore: AsyncSemaphore /** Initializes a Redis connection object. - - Parameter actual_connection: The underlying network connection. + - Parameter clientChannel: The underlying network channel. */ - internal init(_ actual_connection: NWConnection) { - con = actual_connection - parser = ResponseValueParser(parse: con.dataStream()) + internal init(_ clientChannel: NIOAsyncChannel) { + channel = clientChannel + parser = ResponseValueParser(parse: AsyncDataStream(con: channel.inboundStream.makeAsyncIterator())) semaphore = AsyncSemaphore(value: 1) } @@ -39,7 +39,7 @@ public class RedisConnection { - Throws: An error if the command sending fails. */ internal func send_packed_command(_ cmd: Data) async throws { - try await con.send(content: cmd) + try await channel.outboundWriter.write(ByteBuffer(bytes: cmd)) } /** diff --git a/Sources/SwiftyRedis/ResponseValueParser.swift b/Sources/SwiftyRedis/ResponseValueParser.swift index 3a05a6002..3e9cd52ec 100644 --- a/Sources/SwiftyRedis/ResponseValueParser.swift +++ b/Sources/SwiftyRedis/ResponseValueParser.swift @@ -6,6 +6,7 @@ // import Foundation +@_spi(AsyncChannel) import NIOCore /** The ResponseValueParser class is responsible for parsing Redis server responses. @@ -18,8 +19,8 @@ class ResponseValueParser { - Parameter toParseStream: The `AsyncDataStream` to parse. */ - init(parse toParseStream: AsyncDataStream) { - stream = toParseStream + init(parse streamToParse: AsyncDataStream) { + stream = streamToParse } /** diff --git a/Tests/SwiftyRedisTests/ConnectionTests.swift b/Tests/SwiftyRedisTests/ConnectionTests.swift index 0bf112c89..0be3de362 100644 --- a/Tests/SwiftyRedisTests/ConnectionTests.swift +++ b/Tests/SwiftyRedisTests/ConnectionTests.swift @@ -5,15 +5,16 @@ // Created by Michael Van straten on 13.07.22. // -import Network @testable import SwiftyRedis +import NIO import XCTest final class ConnectionTests: XCTestCase { let client = RedisClient.LOCAL + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) func test_simple() async throws { - let connection = try await client.get_connection() + let connection = try await client.get_connection(group: group ) try await connection.acl_setuser("virginia", "on", "+GET", "allkeys", "(+SET ~app2*)") let user_info: RedisValue = try await connection.acl_getuser("virginia") print(user_info) diff --git a/Tests/SwiftyRedisTests/ResponseValueParserTests.swift b/Tests/SwiftyRedisTests/ResponseValueParserTests.swift index aa8bbff21..abed9f6d6 100644 --- a/Tests/SwiftyRedisTests/ResponseValueParserTests.swift +++ b/Tests/SwiftyRedisTests/ResponseValueParserTests.swift @@ -5,8 +5,9 @@ // Created by Michael Van straten on 12.07.22. // -import Network @testable import SwiftyRedis +@_spi(AsyncChannel) import NIOCore +import NIOEmbedded import XCTest final class ResponseValueParserTests: XCTestCase { @@ -71,7 +72,7 @@ final class ResponseValueParserTests: XCTestCase { private extension String { func parseRedisValue() async throws -> RedisValue { - let dummyStream = AsyncDataStream(finalData: data(using: .utf8)!) + let dummyStream = AsyncDataStream(finalData: ByteBuffer(string: self)) let parser = ResponseValueParser(parse: dummyStream) return try await parser.parse_value() @@ -79,9 +80,9 @@ private extension String { } private extension AsyncDataStream { - init(finalData: Data) { - let dummyConnection = NWConnection(host: .name("DummyConnection", .none), port: .any, using: .tls) - self.init(con: dummyConnection) + init(finalData: ByteBuffer) { + + self.init(con: try! NIOAsyncChannel(synchronouslyWrapping: EmbeddedChannel()).inboundStream.makeAsyncIterator()) buffer = finalData } }