diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index 9d0e02e6..57b5b477 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -29,7 +29,7 @@ jobs: strategy: matrix: os: [ubuntu-latest] - image: ["swift:6.1"] + image: ["swift:6.2"] container: image: ${{ matrix.image }} diff --git a/Package.swift b/Package.swift index 21d93cbf..70cc0e40 100644 --- a/Package.swift +++ b/Package.swift @@ -3,7 +3,7 @@ import PackageDescription -let defaultSwiftSettings: [SwiftSetting] = +var defaultSwiftSettings: [SwiftSetting] = [ .swiftLanguageMode(.v6), .enableExperimentalFeature("AvailabilityMacro=valkeySwift 1.0:macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0"), @@ -15,6 +15,12 @@ let defaultSwiftSettings: [SwiftSetting] = .enableExperimentalFeature("Lifetimes"), ] +#if compiler(>=6.2) +defaultSwiftSettings.append( + .enableUpcomingFeature("NonisolatedNonsendingByDefault") +) +#endif + let package = Package( name: "valkey-swift", products: [ diff --git a/Sources/Valkey/Cluster/ValkeyClusterClient.swift b/Sources/Valkey/Cluster/ValkeyClusterClient.swift index 6165c94c..2ce839e2 100644 --- a/Sources/Valkey/Cluster/ValkeyClusterClient.swift +++ b/Sources/Valkey/Cluster/ValkeyClusterClient.swift @@ -463,20 +463,18 @@ public final class ValkeyClusterClient: Sendable { /// - Parameters: /// - keys: Keys affected by operation. This is used to choose the cluster node /// - readOnly: Is this connection only going to be used with readonly commands - /// - isolation: Actor isolation /// - operation: Closure handling Valkey connection /// - Returns: Value returned by closure @inlinable public func withConnection( forKeys keys: some Collection, readOnly: Bool = false, - isolation: isolated (any Actor)? = #isolation, - operation: (ValkeyConnection) async throws -> sending Value + operation: (ValkeyConnection) async throws -> Value ) async throws -> Value { let hashSlots = keys.compactMap { HashSlot(key: $0) } let nodeSelection = getNodeSelection(readOnly: readOnly) let node = try await self.nodeClient(for: hashSlots, nodeSelection: nodeSelection) - return try await node.withConnection(isolation: isolation, operation: operation) + return try await node.withConnection(operation: operation) } @inlinable diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 2b5cd5d5..3641b960 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -80,7 +80,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { /// connection. /// /// To avoid the cost of acquiring the connection and then closing it, it is always - /// preferable to use ``ValkeyClient/withConnection(isolation:operation:)`` which + /// preferable to use ``ValkeyClient/withConnection(operation:)`` which /// uses a persistent connection pool to provide connections to your Valkey database. /// /// - Parameters: @@ -88,7 +88,6 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { /// - configuration: Configuration of Valkey connection /// - eventLoop: EventLoop to run connection on /// - logger: Logger for connection - /// - isolation: Actor isolation /// - operation: Closure handling Valkey connection /// - Returns: Return value of operation closure public static func withConnection( @@ -96,7 +95,6 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { configuration: ValkeyConnectionConfiguration = .init(), eventLoop: any EventLoop = MultiThreadedEventLoopGroup.singleton.any(), logger: Logger, - isolation: isolated (any Actor)? = #isolation, operation: (ValkeyConnection) async throws -> sending Value ) async throws -> sending Value { let connection = try await connect( diff --git a/Sources/Valkey/Documentation.docc/Pubsub.md b/Sources/Valkey/Documentation.docc/Pubsub.md index f64a9711..30f559bb 100644 --- a/Sources/Valkey/Documentation.docc/Pubsub.md +++ b/Sources/Valkey/Documentation.docc/Pubsub.md @@ -16,7 +16,7 @@ try await valkeyClient.publish(channel: "channel1", message: "Hello, World!") ### Subscribing -Use ``ValkeyConnection/subscribe(to:isolation:process:)-(String...,_,_)`` to subscribe to a single or multiple channels and receive every message published to the channel via an AsyncSequence. When you exit the closure provided, the Valkey client sends the relevant `UNSUBSCRIBE` messages. +Use ``ValkeyConnection/subscribe(to:process:)-(String...,_)`` to subscribe to a single or multiple channels and receive every message published to the channel via an AsyncSequence. When you exit the closure provided, the Valkey client sends the relevant `UNSUBSCRIBE` messages. ```swift try await valkeyClient.withConnection { connection in @@ -43,7 +43,7 @@ try await connection.subscribe(to: ["channel1"]) { subscription in ### Patterns -Valkey allows you to use glob style patterns to subscribe to a range of channels. These are available with the function ``ValkeyConnection/psubscribe(to:isolation:process:)-([String],_,_)``. This is formatted in a similar manner to normal subscriptions. +Valkey allows you to use glob style patterns to subscribe to a range of channels. These are available with the function ``ValkeyConnection/psubscribe(to:process:)-([String],_)``. This is formatted in a similar manner to normal subscriptions. ```swift try await connection.subscribe(to: ["channel*"]) { subscription in @@ -85,7 +85,7 @@ try await connection.clientTracking( #### Subscribing to Invalidation Events -Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(isolation:process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated. +Once tracking is enabled you can subscribe to invalidation events using ``ValkeyConnection/subscribeKeyInvalidations(process:)``. The AsyncSequence passed to the `process` closure is a list of keys that have been invalidated. ```swift try await connection.subscribeKeyInvalidations { keys in diff --git a/Sources/Valkey/Node/ValkeyNodeClient.swift b/Sources/Valkey/Node/ValkeyNodeClient.swift index 7a4bab84..55a4d6b8 100644 --- a/Sources/Valkey/Node/ValkeyNodeClient.swift +++ b/Sources/Valkey/Node/ValkeyNodeClient.swift @@ -140,12 +140,10 @@ extension ValkeyNodeClient { /// Get connection from connection pool and run operation using connection /// /// - Parameters: - /// - isolation: Actor isolation /// - operation: Closure handling Valkey connection /// - Returns: Value returned by closure public func withConnection( - isolation: isolated (any Actor)? = #isolation, - operation: (ValkeyConnection) async throws -> sending Value + operation: (ValkeyConnection) async throws -> Value ) async throws -> Value { let lease = try await self.connectionPool.leaseConnection() defer { lease.release() } diff --git a/Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift b/Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift index b3eeb427..e65f167a 100644 --- a/Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift +++ b/Sources/Valkey/Subscriptions/ValkeyClient+subscribe.swift @@ -13,13 +13,11 @@ extension ValkeyClient { /// Run operation with the valkey subscription connection /// /// - Parameters: - /// - isolation: Actor isolation /// - operation: Closure to run with subscription connection @inlinable func withSubscriptionConnection( - isolation: isolated (any Actor)? = #isolation, - _ operation: (ValkeyConnection) async throws -> sending Value - ) async throws -> sending Value { + _ operation: (ValkeyConnection) async throws -> Value + ) async throws -> Value { let node = self.node let id = node.subscriptionConnectionIDGenerator.next() @@ -49,16 +47,14 @@ extension ValkeyClient { /// all subscriptions. /// /// - Parameters: - /// - isolation: Actor isolation /// - process: Closure that is called with async sequence of key invalidations and the client id /// of the connection the subscription is running on. /// - Returns: Return value of closure @inlinable public func subscribeKeyInvalidations( - isolation: isolated (any Actor)? = #isolation, - process: (AsyncMapSequence, Int) async throws -> sending Value - ) async throws -> sending Value { - try await withSubscriptionConnection { connection in + process: (AsyncMapSequence, Int) async throws -> Value + ) async throws -> Value { + try await self.withSubscriptionConnection { connection in let id = try await connection.clientId() return try await connection.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in let keys = subscription.map { ValkeyKey($0.message) } @@ -71,16 +67,15 @@ extension ValkeyClient { /// AsyncSequence /// /// This should not be called directly, used the related commands - /// ``ValkeyClient/subscribe(to:isolation:process:)`` or - /// ``ValkeyClient/psubscribe(to:isolation:process:)`` + /// ``ValkeyClient/subscribe(to:process:)`` or + /// ``ValkeyClient/psubscribe(to:process:)`` @inlinable public func _subscribe( command: some ValkeySubscribeCommand, - isolation: isolated (any Actor)? = #isolation, - process: (ValkeySubscription) async throws -> sending Value - ) async throws -> sending Value { + process: (ValkeySubscription) async throws -> Value + ) async throws -> Value { try await self.withSubscriptionConnection { connection in - try await connection._subscribe(command: command, isolation: isolation, process: process) + try await connection._subscribe(command: command, process: process) } } } diff --git a/Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift b/Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift index 0d1f3a4b..f60a2e68 100644 --- a/Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift +++ b/Sources/Valkey/Subscriptions/ValkeyClusterClient+subscribe.swift @@ -13,13 +13,11 @@ extension ValkeyClusterClient { /// Run operation with the valkey subscription connection /// /// - Parameters: - /// - isolation: Actor isolation /// - operation: Closure to run with subscription connection @inlinable func withSubscriptionConnection( - isolation: isolated (any Actor)? = #isolation, - _ operation: (ValkeyConnection) async throws -> sending Value - ) async throws -> sending Value { + _ operation: (ValkeyConnection) async throws -> Value + ) async throws -> Value { let node = try await self.nodeClient(for: [], nodeSelection: .primary) let id = node.subscriptionConnectionIDGenerator.next() @@ -49,16 +47,14 @@ extension ValkeyClusterClient { /// all subscriptions. /// /// - Parameters: - /// - isolation: Actor isolation /// - process: Closure that is called with async sequence of key invalidations and the client id /// of the connection the subscription is running on. /// - Returns: Return value of closure @inlinable public func subscribeKeyInvalidations( - isolation: isolated (any Actor)? = #isolation, - process: (AsyncMapSequence, Int) async throws -> sending Value - ) async throws -> sending Value { - try await withSubscriptionConnection { connection in + process: (AsyncMapSequence, Int) async throws -> Value + ) async throws -> Value { + try await self.withSubscriptionConnection { connection in let id = try await connection.clientId() return try await connection.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in let keys = subscription.map { ValkeyKey($0.message) } @@ -71,16 +67,15 @@ extension ValkeyClusterClient { /// AsyncSequence /// /// This should not be called directly, used the related commands - /// ``ValkeyClusterClient/subscribe(to:isolation:process:)`` or - /// ``ValkeyClusterClient/psubscribe(to:isolation:process:)`` + /// ``ValkeyClusterClient/subscribe(to:process:)`` or + /// ``ValkeyClusterClient/psubscribe(to:process:)`` @inlinable public func _subscribe( command: some ValkeySubscribeCommand, - isolation: isolated (any Actor)? = #isolation, - process: (ValkeySubscription) async throws -> sending Value - ) async throws -> sending Value { + process: (ValkeySubscription) async throws -> Value + ) async throws -> Value { try await self.withSubscriptionConnection { connection in - try await connection._subscribe(command: command, isolation: isolation, process: process) + try await connection._subscribe(command: command, process: process) } } } diff --git a/Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift b/Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift index 710e2ec4..ca38a767 100644 --- a/Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift +++ b/Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift @@ -18,15 +18,13 @@ extension ValkeyConnection { /// /// - Parameters: /// - shardchannels: list of shard channels to subscribe to - /// - isolation: Actor isolation /// - process: Closure that is called with subscription async sequence /// - Returns: Return value of closure @inlinable - public func ssubscribe( + public nonisolated func ssubscribe( to shardchannels: String..., - isolation: isolated (any Actor)? = #isolation, - process: (ValkeySubscription) async throws -> sending Value - ) async throws -> sending Value { + process: (ValkeySubscription) async throws -> Value + ) async throws -> Value { try await self.ssubscribe(to: shardchannels, process: process) } @@ -39,18 +37,15 @@ extension ValkeyConnection { /// /// - Parameters: /// - shardchannels: list of shard channels to subscribe to - /// - isolation: Actor isolation /// - process: Closure that is called with subscription async sequence /// - Returns: Return value of closure @inlinable - public func ssubscribe( + public nonisolated func ssubscribe( to shardchannels: [String], - isolation: isolated (any Actor)? = #isolation, - process: (ValkeySubscription) async throws -> sending Value - ) async throws -> sending Value { + process: (ValkeySubscription) async throws -> Value + ) async throws -> Value { try await self._subscribe( command: SSUBSCRIBE(shardchannels: shardchannels), - isolation: isolation, process: process ) } @@ -65,14 +60,12 @@ extension ValkeyConnection { /// channel /// /// - Parameters: - /// - isolation: Actor isolation /// - process: Closure that is called with async sequence of key invalidations /// - Returns: Return value of closure @inlinable - public func subscribeKeyInvalidations( - isolation: isolated (any Actor)? = #isolation, - process: (AsyncMapSequence) async throws -> sending Value - ) async throws -> sending Value { + public nonisolated func subscribeKeyInvalidations( + process: (AsyncMapSequence) async throws -> Value + ) async throws -> Value { try await self.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in let keys = subscription.map { ValkeyKey($0.message) } return try await process(keys) @@ -83,15 +76,14 @@ extension ValkeyConnection { /// AsyncSequence /// /// This should not be called directly, used the related commands - /// ``ValkeyConnection/subscribe(to:isolation:process:)`` or - /// ``ValkeyConnection/psubscribe(to:isolation:process:)`` + /// ``ValkeyConnection/subscribe(to:process:)`` or + /// ``ValkeyConnection/psubscribe(to:process:)`` @inlinable - public func _subscribe( + public nonisolated func _subscribe( command: some ValkeySubscribeCommand, - isolation: isolated (any Actor)? = #isolation, - process: (ValkeySubscription) async throws -> sending Value - ) async throws -> sending Value { - let (id, stream) = try await subscribe(command: command, filters: command.filters) + process: (ValkeySubscription) async throws -> Value + ) async throws -> Value { + let (id, stream) = try await self.subscribe(command: command, filters: command.filters) let value: Value do { value = try await process(stream) @@ -120,7 +112,7 @@ extension ValkeyConnection { if Task.isCancelled { throw ValkeyClientError(.cancelled) } - let subscriptionID: Int = try await withCheckedThrowingContinuation { continuation in + let subscriptionID: Int = try await withCheckedThrowingContinuation(isolation: self) { continuation in self.channelHandler.subscribe( command: command, streamContinuation: streamContinuation, diff --git a/Sources/Valkey/Subscriptions/ValkeySubscription.swift b/Sources/Valkey/Subscriptions/ValkeySubscription.swift index cdddfcd8..e45ea006 100644 --- a/Sources/Valkey/Subscriptions/ValkeySubscription.swift +++ b/Sources/Valkey/Subscriptions/ValkeySubscription.swift @@ -51,8 +51,19 @@ public struct ValkeySubscription: AsyncSequence, Sendable { public struct AsyncIterator: AsyncIteratorProtocol { var base: BaseAsyncSequence.AsyncIterator + #if compiler(>=6.2) + @concurrent public mutating func next() async throws -> Element? { try await self.base.next() } + #else + public mutating func next() async throws -> Element? { + try await self.base.next() + } + #endif + + public mutating func next(isolation actor: isolated (any Actor)?) async throws(any Error) -> ValkeySubscriptionMessage? { + try await self.base.next(isolation: actor) + } } } diff --git a/Sources/Valkey/ValkeyClient.swift b/Sources/Valkey/ValkeyClient.swift index 139a49c3..940dafd9 100644 --- a/Sources/Valkey/ValkeyClient.swift +++ b/Sources/Valkey/ValkeyClient.swift @@ -117,15 +117,13 @@ extension ValkeyClient { /// Get connection from connection pool and run operation using connection /// /// - Parameters: - /// - isolation: Actor isolation /// - operation: Closure handling Valkey connection /// - Returns: Value returned by closure @inlinable public func withConnection( - isolation: isolated (any Actor)? = #isolation, operation: (ValkeyConnection) async throws -> sending Value ) async throws -> Value { - try await self.node.withConnection(isolation: isolation, operation: operation) + try await self.node.withConnection(operation: operation) } } diff --git a/Sources/Valkey/ValkeyClientProtocol.swift b/Sources/Valkey/ValkeyClientProtocol.swift index 09e1587c..fa041e5f 100644 --- a/Sources/Valkey/ValkeyClientProtocol.swift +++ b/Sources/Valkey/ValkeyClientProtocol.swift @@ -50,13 +50,12 @@ public protocol ValkeyClientProtocol: Sendable { /// AsyncSequence /// /// This should not be called directly, used the related commands - /// ``ValkeyClientProtocol/subscribe(to:isolation:process:)`` or - /// ``ValkeyClientProtocol/psubscribe(to:isolation:process:)`` + /// ``ValkeyClientProtocol/subscribe(to:process:)`` or + /// ``ValkeyClientProtocol/psubscribe(to:process:)`` func _subscribe( command: some ValkeySubscribeCommand, - isolation: isolated (any Actor)?, - process: (Subscription) async throws -> sending Value - ) async throws -> sending Value + process: (Subscription) async throws -> Value + ) async throws -> Value } @available(valkeySwift 1.0, *) @@ -70,16 +69,14 @@ extension ValkeyClientProtocol { /// /// - Parameters: /// - channels: list of channels to subscribe to - /// - isolation: Actor isolation /// - process: Closure that is called with subscription async sequence /// - Returns: Return value of closure @inlinable public func subscribe( to channels: String..., - isolation: isolated (any Actor)? = #isolation, - process: (Subscription) async throws -> sending Value - ) async throws -> sending Value { - try await self.subscribe(to: channels, isolation: isolation, process: process) + process: (Subscription) async throws -> Value + ) async throws -> Value { + try await self.subscribe(to: channels, process: process) } /// Subscribe to list of channels and run closure with subscription @@ -91,18 +88,15 @@ extension ValkeyClientProtocol { /// /// - Parameters: /// - channels: list of channels to subscribe to - /// - isolation: Actor isolation /// - process: Closure that is called with subscription async sequence /// - Returns: Return value of closure @inlinable public func subscribe( to channels: [String], - isolation: isolated (any Actor)? = #isolation, - process: (Subscription) async throws -> sending Value - ) async throws -> sending Value { + process: (Subscription) async throws -> Value + ) async throws -> Value { try await self._subscribe( command: SUBSCRIBE(channels: channels), - isolation: isolation, process: process ) } @@ -116,16 +110,14 @@ extension ValkeyClientProtocol { /// /// - Parameters: /// - patterns: list of channel patterns to subscribe to - /// - isolation: Actor isolation /// - process: Closure that is called with subscription async sequence /// - Returns: Return value of closure @inlinable public func psubscribe( to patterns: String..., - isolation: isolated (any Actor)? = #isolation, - process: (Subscription) async throws -> sending Value - ) async throws -> sending Value { - try await self.psubscribe(to: patterns, isolation: isolation, process: process) + process: (Subscription) async throws -> Value + ) async throws -> Value { + try await self.psubscribe(to: patterns, process: process) } /// Subscribe to list of pattern matching channels and run closure with subscription @@ -137,18 +129,15 @@ extension ValkeyClientProtocol { /// /// - Parameters: /// - patterns: list of channel patterns to subscribe to - /// - isolation: Actor isolation /// - process: Closure that is called with subscription async sequence /// - Returns: Return value of closure @inlinable public func psubscribe( to patterns: [String], - isolation: isolated (any Actor)? = #isolation, - process: (Subscription) async throws -> sending Value - ) async throws -> sending Value { + process: (Subscription) async throws -> Value + ) async throws -> Value { try await self._subscribe( command: PSUBSCRIBE(patterns: patterns), - isolation: isolation, process: process ) } diff --git a/Sources/ValkeyConnectionPool/ConnectionPool.swift b/Sources/ValkeyConnectionPool/ConnectionPool.swift index d86e3972..3758e999 100644 --- a/Sources/ValkeyConnectionPool/ConnectionPool.swift +++ b/Sources/ValkeyConnectionPool/ConnectionPool.swift @@ -582,13 +582,13 @@ protocol TaskGroupProtocol { // We need to call this `addTask_` because some Swift versions define this // under exactly this name and others have different attributes. So let's pick // a name that doesn't clash anywhere and implement it using the standard `addTask`. - mutating func addTask_(operation: @escaping @Sendable () async -> Void) + mutating func addTask_(operation: @isolated(any) @escaping @Sendable () async -> Void) } @available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) extension DiscardingTaskGroup: TaskGroupProtocol { @inlinable - mutating func addTask_(operation: @escaping @Sendable () async -> Void) { + mutating func addTask_(operation: @isolated(any) @escaping @Sendable () async -> Void) { self.addTask(priority: nil, operation: operation) } } @@ -596,7 +596,7 @@ extension DiscardingTaskGroup: TaskGroupProtocol { @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension TaskGroup: TaskGroupProtocol { @inlinable - mutating func addTask_(operation: @escaping @Sendable () async -> Void) { + mutating func addTask_(operation: @isolated(any) @escaping @Sendable () async -> Void) { self.addTask(priority: nil, operation: operation) } }