From 0f6c943f321026cb6ebd3a83b7719c52b74001ac Mon Sep 17 00:00:00 2001 From: Christian Riboldi Date: Wed, 28 Aug 2024 14:39:56 -0600 Subject: [PATCH] Wrap the elements array and globalChannelSubscriptions Dictionary in a LockIsolated helper that will avoid threading related crashes. --- Sources/PubNub/Helpers/LockIsolated.swift | 100 ++++++++++++++++++ Sources/PubNub/Helpers/WeakBox.swift | 26 +++-- .../Subscription/SubscriptionSession.swift | 16 +-- 3 files changed, 126 insertions(+), 16 deletions(-) create mode 100644 Sources/PubNub/Helpers/LockIsolated.swift diff --git a/Sources/PubNub/Helpers/LockIsolated.swift b/Sources/PubNub/Helpers/LockIsolated.swift new file mode 100644 index 00000000..5b14c857 --- /dev/null +++ b/Sources/PubNub/Helpers/LockIsolated.swift @@ -0,0 +1,100 @@ +import Foundation + +/// A generic wrapper for isolating a mutable value with a lock. +@dynamicMemberLookup +public final class LockIsolated: @unchecked Sendable { + private var _value: Value + private let lock = NSRecursiveLock() + + /// Initializes lock-isolated state around a value. + /// + /// - Parameter value: A value to isolate with a lock. + public init(_ value: @autoclosure @Sendable () throws -> Value) rethrows { + self._value = try value() + } + + public subscript(dynamicMember keyPath: KeyPath) -> Subject { + self.lock.sync { + self._value[keyPath: keyPath] + } + } + + /// Perform an operation with isolated access to the underlying value. + /// + /// Useful for modifying a value in a single transaction. + /// + /// ```swift + /// // Isolate an integer for concurrent read/write access: + /// var count = LockIsolated(0) + /// + /// func increment() { + /// // Safely increment it: + /// self.count.withValue { $0 += 1 } + /// } + /// ``` + /// + /// - Parameter operation: An operation to be performed on the the underlying value with a lock. + /// - Returns: The result of the operation. + public func withValue( + _ operation: @Sendable (inout Value) throws -> T + ) rethrows -> T { + try self.lock.sync { + var value = self._value + defer { self._value = value } + return try operation(&value) + } + } + + /// Overwrite the isolated value with a new value. + /// + /// ```swift + /// // Isolate an integer for concurrent read/write access: + /// var count = LockIsolated(0) + /// + /// func reset() { + /// // Reset it: + /// self.count.setValue(0) + /// } + /// ``` + /// + /// > Tip: Use ``withValue(_:)`` instead of ``setValue(_:)`` if the value being set is derived + /// > from the current value. That is, do this: + /// > + /// > ```swift + /// > self.count.withValue { $0 += 1 } + /// > ``` + /// > + /// > ...and not this: + /// > + /// > ```swift + /// > self.count.setValue(self.count + 1) + /// > ``` + /// > + /// > ``withValue(_:)`` isolates the entire transaction and avoids data races between reading and + /// > writing the value. + /// + /// - Parameter newValue: The value to replace the current isolated value with. + public func setValue(_ newValue: @autoclosure @Sendable () throws -> Value) rethrows { + try self.lock.sync { + self._value = try newValue() + } + } +} + +extension LockIsolated where Value: Sendable { + /// The lock-isolated value. + public var value: Value { + self.lock.sync { + self._value + } + } +} + +extension NSRecursiveLock { + @inlinable @discardableResult + @_spi(Internals) public func sync(work: () throws -> R) rethrows -> R { + self.lock() + defer { self.unlock() } + return try work() + } +} diff --git a/Sources/PubNub/Helpers/WeakBox.swift b/Sources/PubNub/Helpers/WeakBox.swift index ffba2013..4e8ada4c 100644 --- a/Sources/PubNub/Helpers/WeakBox.swift +++ b/Sources/PubNub/Helpers/WeakBox.swift @@ -29,31 +29,37 @@ final class WeakBox: Hashable where Element: AnyObject, Element: Hashab } struct WeakSet where Element: AnyObject, Element: Hashable { - private var elements: Set> = [] + private var elements: LockIsolated>> = .init([]) - init(_ elements: [Element]) { - elements.forEach { self.elements.update(with: WeakBox($0)) } + init(_ newElements: [Element]) { + self.elements.withValue({ [newElements] elements in + newElements.forEach { elements.update(with: WeakBox($0)) } + }) } // NSSet Operations var allObjects: [Element] { - return elements.compactMap { $0.underlying } + return self.elements.value.compactMap { $0.underlying } } var count: Int { - return self.elements.count + return self.elements.value.count } mutating func update(_ element: Element) { - elements.update(with: WeakBox(element)) + self.elements.withValue({ [element] in + $0.update(with: WeakBox(element)) + }) } mutating func remove(_ element: Element) { - elements.remove(WeakBox(element)) + self.elements.withValue({ [element] in + $0.remove(WeakBox(element)) + }) } mutating func removeAll() { - elements.removeAll() + self.elements.setValue(Set>()) } } @@ -62,10 +68,10 @@ extension WeakSet: Collection { var endIndex: Set>.Index { return elements.endIndex } subscript(position: Set>.Index) -> Element? { - return elements[position].underlying + return elements.value[position].underlying } func index(after index: Set>.Index) -> Set>.Index { - return elements.index(after: index) + return elements.value.index(after: index) } } diff --git a/Sources/PubNub/Subscription/SubscriptionSession.swift b/Sources/PubNub/Subscription/SubscriptionSession.swift index 0b3fe32a..f2af6753 100644 --- a/Sources/PubNub/Subscription/SubscriptionSession.swift +++ b/Sources/PubNub/Subscription/SubscriptionSession.swift @@ -64,7 +64,7 @@ class SubscriptionSession: EventEmitter, StatusEmitter { return statusListener }() - private var globalChannelSubscriptions: [String: Subscription] = [:] + private var globalChannelSubscriptions: LockIsolated<[String: Subscription]> = .init([:]) private var globalGroupSubscriptions: [String: Subscription] = [:] private let strategy: any SubscriptionSessionStrategy @@ -126,8 +126,10 @@ class SubscriptionSession: EventEmitter, StatusEmitter { at: cursor?.timetoken ) for subscription in channelSubscriptions { - subscription.subscriptionNames.compactMap { $0 }.forEach { - globalChannelSubscriptions[$0] = subscription + subscription.subscriptionNames.compactMap { $0 }.forEach { sub in + globalChannelSubscriptions.withValue { [sub] globalSubs in + globalSubs[sub] = subscription + } } } for subscription in channelGroupSubscriptions { @@ -163,12 +165,14 @@ class SubscriptionSession: EventEmitter, StatusEmitter { presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName] } internalUnsubscribe( - from: globalChannelSubscriptions.compactMap { channelNamesToUnsubscribe.contains($0.key) ? $0.value : nil }, + from: globalChannelSubscriptions.value.compactMap { channelNamesToUnsubscribe.contains($0.key) ? $0.value : nil }, and: globalGroupSubscriptions.compactMap { groupNamesToUnsubscribe.contains($0.key) ? $0.value : nil }, presenceOnly: presenceOnly ) - channelNamesToUnsubscribe.forEach { - globalChannelSubscriptions.removeValue(forKey: $0) + channelNamesToUnsubscribe.forEach { key in + globalChannelSubscriptions.withValue { globalSubs in + globalSubs.removeValue(forKey: key) + } } groupNamesToUnsubscribe.forEach { globalGroupSubscriptions.removeValue(forKey: $0)