Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions Sources/PubNub/Helpers/LockIsolated.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import Foundation

/// A generic wrapper for isolating a mutable value with a lock.
@dynamicMemberLookup
public final class LockIsolated<Value>: @unchecked Sendable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmriboldi is there a particular reason why this class is public?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it doesn't need to be public.

private var _value: Value
private let lock = NSRecursiveLock()

/// Initializes lock-isolated state around a value.
///
/// - Parameter value: A value to isolate with a lock.
public init(_ value: @autoclosure @Sendable () throws -> Value) rethrows {
self._value = try value()
}

public subscript<Subject: Sendable>(dynamicMember keyPath: KeyPath<Value, Subject>) -> Subject {
self.lock.sync {
self._value[keyPath: keyPath]
}
}

/// Perform an operation with isolated access to the underlying value.
///
/// Useful for modifying a value in a single transaction.
///
/// ```swift
/// // Isolate an integer for concurrent read/write access:
/// var count = LockIsolated(0)
///
/// func increment() {
/// // Safely increment it:
/// self.count.withValue { $0 += 1 }
/// }
/// ```
///
/// - Parameter operation: An operation to be performed on the the underlying value with a lock.
/// - Returns: The result of the operation.
public func withValue<T: Sendable>(
_ operation: @Sendable (inout Value) throws -> T
) rethrows -> T {
try self.lock.sync {
var value = self._value
defer { self._value = value }
return try operation(&value)
}
}

/// Overwrite the isolated value with a new value.
///
/// ```swift
/// // Isolate an integer for concurrent read/write access:
/// var count = LockIsolated(0)
///
/// func reset() {
/// // Reset it:
/// self.count.setValue(0)
/// }
/// ```
///
/// > Tip: Use ``withValue(_:)`` instead of ``setValue(_:)`` if the value being set is derived
/// > from the current value. That is, do this:
/// >
/// > ```swift
/// > self.count.withValue { $0 += 1 }
/// > ```
/// >
/// > ...and not this:
/// >
/// > ```swift
/// > self.count.setValue(self.count + 1)
/// > ```
/// >
/// > ``withValue(_:)`` isolates the entire transaction and avoids data races between reading and
/// > writing the value.
///
/// - Parameter newValue: The value to replace the current isolated value with.
public func setValue(_ newValue: @autoclosure @Sendable () throws -> Value) rethrows {
try self.lock.sync {
self._value = try newValue()
}
}
}

extension LockIsolated where Value: Sendable {
/// The lock-isolated value.
public var value: Value {
self.lock.sync {
self._value
}
}
}

extension NSRecursiveLock {
@inlinable @discardableResult
@_spi(Internals) public func sync<R>(work: () throws -> R) rethrows -> R {
self.lock()
defer { self.unlock() }
return try work()
}
}
26 changes: 16 additions & 10 deletions Sources/PubNub/Helpers/WeakBox.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,37 @@ final class WeakBox<Element>: Hashable where Element: AnyObject, Element: Hashab
}

struct WeakSet<Element> where Element: AnyObject, Element: Hashable {
private var elements: Set<WeakBox<Element>> = []
private var elements: LockIsolated<Set<WeakBox<Element>>> = .init([])

init(_ elements: [Element]) {
elements.forEach { self.elements.update(with: WeakBox($0)) }
init(_ newElements: [Element]) {
self.elements.withValue({ [newElements] elements in
newElements.forEach { elements.update(with: WeakBox($0)) }
})
}

// NSSet Operations
var allObjects: [Element] {
return elements.compactMap { $0.underlying }
return self.elements.value.compactMap { $0.underlying }
}

var count: Int {
return self.elements.count
return self.elements.value.count
}

mutating func update(_ element: Element) {
elements.update(with: WeakBox(element))
self.elements.withValue({ [element] in
$0.update(with: WeakBox(element))
})
}

mutating func remove(_ element: Element) {
elements.remove(WeakBox(element))
self.elements.withValue({ [element] in
$0.remove(WeakBox(element))
})
}

mutating func removeAll() {
elements.removeAll()
self.elements.setValue(Set<WeakBox<Element>>())
}
}

Expand All @@ -62,10 +68,10 @@ extension WeakSet: Collection {
var endIndex: Set<WeakBox<Element>>.Index { return elements.endIndex }

subscript(position: Set<WeakBox<Element>>.Index) -> Element? {
return elements[position].underlying
return elements.value[position].underlying
}

func index(after index: Set<WeakBox<Element>>.Index) -> Set<WeakBox<Element>>.Index {
return elements.index(after: index)
return elements.value.index(after: index)
}
}
16 changes: 10 additions & 6 deletions Sources/PubNub/Subscription/SubscriptionSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class SubscriptionSession: EventEmitter, StatusEmitter {
return statusListener
}()

private var globalChannelSubscriptions: [String: Subscription] = [:]
private var globalChannelSubscriptions: LockIsolated<[String: Subscription]> = .init([:])
private var globalGroupSubscriptions: [String: Subscription] = [:]
private let strategy: any SubscriptionSessionStrategy

Expand Down Expand Up @@ -126,8 +126,10 @@ class SubscriptionSession: EventEmitter, StatusEmitter {
at: cursor?.timetoken
)
for subscription in channelSubscriptions {
subscription.subscriptionNames.compactMap { $0 }.forEach {
globalChannelSubscriptions[$0] = subscription
subscription.subscriptionNames.compactMap { $0 }.forEach { sub in
globalChannelSubscriptions.withValue { [sub] globalSubs in
globalSubs[sub] = subscription
}
}
}
for subscription in channelGroupSubscriptions {
Expand Down Expand Up @@ -163,12 +165,14 @@ class SubscriptionSession: EventEmitter, StatusEmitter {
presenceOnly ? [$0.presenceChannelName] : [$0, $0.presenceChannelName]
}
internalUnsubscribe(
from: globalChannelSubscriptions.compactMap { channelNamesToUnsubscribe.contains($0.key) ? $0.value : nil },
from: globalChannelSubscriptions.value.compactMap { channelNamesToUnsubscribe.contains($0.key) ? $0.value : nil },
and: globalGroupSubscriptions.compactMap { groupNamesToUnsubscribe.contains($0.key) ? $0.value : nil },
presenceOnly: presenceOnly
)
channelNamesToUnsubscribe.forEach {
globalChannelSubscriptions.removeValue(forKey: $0)
channelNamesToUnsubscribe.forEach { key in
globalChannelSubscriptions.withValue { globalSubs in
globalSubs.removeValue(forKey: key)
}
}
groupNamesToUnsubscribe.forEach {
globalGroupSubscriptions.removeValue(forKey: $0)
Expand Down