Skip to content

Commit

Permalink
SWIFT-1469 Provide AsyncSequence APIs for monitoring events (#764)
Browse files Browse the repository at this point in the history
  • Loading branch information
rchhaya committed Aug 4, 2022
1 parent 48e603d commit 2d579e2
Show file tree
Hide file tree
Showing 6 changed files with 593 additions and 62 deletions.
105 changes: 105 additions & 0 deletions Sources/MongoSwift/APM.swift
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,111 @@ private protocol CommandEventProtocol {
var serviceID: BSONObjectID? { get }
}

#if compiler(>=5.5.2) && canImport(_Concurrency)
/// An asynchronous way to monitor command events that uses `AsyncSequence`.
/// Only available for Swift 5.5.2 and higher.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct CommandEventStream {
fileprivate let stream: AsyncStream<CommandEvent>
private let cmdHandler: CommandEventHandler
/// Initialize the stream with a `CommandEventHandler`
internal init(cmdHandler: CommandEventHandler, stream: AsyncStream<CommandEvent>) {
self.cmdHandler = cmdHandler
self.stream = stream
}
}

/// An asynchronous way to monitor SDAM events that uses `AsyncSequence`.
/// Only available for Swift 5.5.2 and higher.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct SDAMEventStream {
fileprivate let stream: AsyncStream<SDAMEvent>
private let sdamHandler: SDAMEventHandler
/// Initialize the stream with an `SDAMEventHandler`
internal init(sdamHandler: SDAMEventHandler, stream: AsyncStream<SDAMEvent>) {
self.sdamHandler = sdamHandler
self.stream = stream
}
}

@available(macOS 10.15, *)
extension CommandEventStream: AsyncSequence {
/// The type of element produced by this `CommandEventStream`.
public typealias Element = CommandEvent

/// The asynchronous iterator of type `CommandEventStreamIterator`
/// that produces elements of this asynchronous sequence.
public typealias AsyncIterator = CommandEventStreamIterator

/// Creates the asynchronous iterator that produces elements of this `CommandEventStream`.
public func makeAsyncIterator() -> CommandEventStreamIterator {
CommandEventStreamIterator(cmdEventStream: self)
}
}

@available(macOS 10.15, *)
extension SDAMEventStream: AsyncSequence {
/// The type of element produced by this `SDAMEventStream`.
public typealias Element = SDAMEvent

/// The asynchronous iterator of type `SDAMEventStreamIterator`
/// that produces elements of this asynchronous sequence.
public typealias AsyncIterator = SDAMEventStreamIterator

/// Creates the asynchronous iterator that produces elements of this `SDAMEventStream`.
public func makeAsyncIterator() -> SDAMEventStreamIterator {
SDAMEventStreamIterator(sdamEventStream: self)
}
}

/// The associated iterator for the `CommandEventStream`.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct CommandEventStreamIterator: AsyncIteratorProtocol {
private var iterator: AsyncStream<CommandEvent>.AsyncIterator
private let cmdEventStream: CommandEventStream

/// Initialize the iterator
internal init(cmdEventStream: CommandEventStream) {
self.iterator = cmdEventStream.stream.makeAsyncIterator()
self.cmdEventStream = cmdEventStream
}

/// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element.
public mutating func next() async -> CommandEvent? {
await self.iterator.next()
}

/// The type of element iterated over by this `CommandEventStreamIterator`.
public typealias Element = CommandEvent
}

/// The associated iterator for the `SDAMEventStream`.
@available(macOS 10.15, *)
// sourcery: skipSyncExport
public struct SDAMEventStreamIterator: AsyncIteratorProtocol {
private var iterator: AsyncStream<SDAMEvent>.AsyncIterator
private let sdamEventStream: SDAMEventStream

/// Initialize the iterator
internal init(sdamEventStream: SDAMEventStream) {
self.iterator = sdamEventStream.stream.makeAsyncIterator()
self.sdamEventStream = sdamEventStream
}

/// Asynchronously advances to the next element and returns it, or ends the sequence if there is no next element.
public mutating func next() async -> SDAMEvent? {
await self.iterator.next()
}

/// The type of element iterated over by this `SDAMEventStreamIterator`.
public typealias Element = SDAMEvent
}

#endif

/// An event published when a command starts.
public struct CommandStartedEvent: MongoSwiftEvent, CommandEventProtocol {
/// Wrapper around a `mongoc_apm_command_started_t`.
Expand Down
145 changes: 138 additions & 7 deletions Sources/MongoSwift/MongoClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,15 @@ public class MongoClient {
/// - This value is only read in `deinit`. That occurs exactly once after the above modification is complete.
private var wasClosed = false

/// Handlers for command monitoring events.
/// Handlers for command monitoring events. Should only be accessed when holding `eventHandlerLock`.
internal var commandEventHandlers: [CommandEventHandler]

/// Handlers for SDAM monitoring events.
/// Handlers for SDAM monitoring events. Should only be accessed when holding `eventHandlerLock`.
internal var sdamEventHandlers: [SDAMEventHandler]

/// Lock used to synchronize access to the event handler arrays to prevent data races.
private let eventHandlerLock: Lock = .init()

/// Counter for generating client _ids.
internal static var clientIDGenerator = NIOAtomic<Int>.makeAtomic(value: 0)

Expand Down Expand Up @@ -402,6 +405,124 @@ public class MongoClient {
)
}

#if compiler(>=5.5.2) && canImport(_Concurrency)
@available(macOS 10.15, *)
internal class CmdHandler: CommandEventHandler {
private let continuation: AsyncStream<CommandEvent>.Continuation
internal init(continuation: AsyncStream<CommandEvent>.Continuation) {
self.continuation = continuation
}

// Satisfies the protocol
internal func handleCommandEvent(_ event: CommandEvent) {
self.continuation.yield(event)
}

internal func finish() {
self.continuation.finish()
}
}

@available(macOS 10.15, *)
internal class SDAMHandler: SDAMEventHandler {
private let continuation: AsyncStream<SDAMEvent>.Continuation
internal init(continuation: AsyncStream<SDAMEvent>.Continuation) {
self.continuation = continuation
}

// Satisfies the protocol
internal func handleSDAMEvent(_ event: SDAMEvent) {
self.continuation.yield(event)
}

internal func finish() {
self.continuation.finish()
}
}

/**
* Provides an `AsyncSequence` API for consuming command monitoring events.
*
* Example: printing the command events out would be written as
* ```
* for try await event in client.commandEventStream() {
* print(event)
* }
* ```
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`.
* The stream will be ended automatically if the `Task` it is running in is cancelled.
* - Returns: A `CommandEventStream` that implements `AsyncSequence`.
* - Note: Only the most recent 100 events are stored in the stream.
*/
@available(macOS 10.15, *)
public func commandEventStream() -> CommandEventStream {
var handler: CmdHandler?
let stream = AsyncStream(
CommandEvent.self,
bufferingPolicy: .bufferingNewest(100)
) { con in
let cmdHandler = CmdHandler(continuation: con)
handler = cmdHandler
self.addCommandEventHandler(cmdHandler)
}

// Ok to force unwrap since handler is set in the closure
// swiftlint:disable force_unwrapping
let commandEvents = CommandEventStream(cmdHandler: handler!, stream: stream)

return commandEvents
}

/**
* Provides an `AsyncSequence` API for consuming SDAM monitoring events.
*
* Example: printing the SDAM events out would be written as
* ```
* for try await event in client.sdamEventStream() {
* print(event)
* }
* ```
* If you are looping over the events in the stream, you may wish to do so in a dedicated `Task`.
* The stream will be ended automatically if the `Task` it is running in is cancelled.
* - Returns: An `SDAMEventStream` that implements `AsyncSequence`.
* - Note: Only the most recent 100 events are stored in the stream.
*/
@available(macOS 10.15, *)
public func sdamEventStream() -> SDAMEventStream {
var handler: SDAMHandler?
let stream = AsyncStream(
SDAMEvent.self,
bufferingPolicy: .bufferingNewest(100)
) { con in
let sdamHandler = SDAMHandler(continuation: con)
handler = sdamHandler
self.addSDAMEventHandler(sdamHandler)
}
// Ok to force unwrap since handler is set just above
// swiftlint:disable force_unwrapping
let sdamEvents = SDAMEventStream(sdamHandler: handler!, stream: stream)
return sdamEvents
}
#endif

// Check which handlers are assoc. with streams and finish them
private func closeHandlers() {
#if compiler(>=5.5.2) && canImport(_Concurrency)
if #available(macOS 10.15, *) {
for handler in commandEventHandlers {
if let cmdHandler = handler as? WeakEventHandler<CmdHandler> {
cmdHandler.handler?.finish()
}
}
for handler in sdamEventHandlers {
if let sdamHandler = handler as? WeakEventHandler<SDAMHandler> {
sdamHandler.handler?.finish()
}
}
}
#endif
}

/**
* Closes this `MongoClient`, closing all connections to the server and cleaning up internal state.
*
Expand All @@ -422,6 +543,7 @@ public class MongoClient {
self.operationExecutor.shutdown()
}
closeResult.whenComplete { _ in
self.closeHandlers()
self.wasClosed = true
}

Expand All @@ -441,6 +563,7 @@ public class MongoClient {
public func syncClose() throws {
try self.connectionPool.close()
try self.operationExecutor.syncShutdown()
self.closeHandlers()
self.wasClosed = true
}

Expand Down Expand Up @@ -786,7 +909,9 @@ public class MongoClient {
* to continue to receive events.
*/
public func addCommandEventHandler<T: CommandEventHandler>(_ handler: T) {
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler))
self.eventHandlerLock.withLock {
self.commandEventHandlers.append(WeakEventHandler<T>(referencing: handler))
}
}

/**
Expand All @@ -796,7 +921,9 @@ public class MongoClient {
* strong reference cycle and potentially result in memory leaks.
*/
public func addCommandEventHandler(_ handlerFunc: @escaping (CommandEvent) -> Void) {
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc))
self.eventHandlerLock.withLock {
self.commandEventHandlers.append(CallbackEventHandler(handlerFunc))
}
}

/**
Expand All @@ -806,7 +933,9 @@ public class MongoClient {
* to continue to receive events.
*/
public func addSDAMEventHandler<T: SDAMEventHandler>(_ handler: T) {
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler))
self.eventHandlerLock.withLock {
self.sdamEventHandlers.append(WeakEventHandler(referencing: handler))
}
}

/**
Expand All @@ -816,7 +945,9 @@ public class MongoClient {
* strong reference cycle and potentially result in memory leaks.
*/
public func addSDAMEventHandler(_ handlerFunc: @escaping (SDAMEvent) -> Void) {
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
self.eventHandlerLock.withLock {
self.sdamEventHandlers.append(CallbackEventHandler(handlerFunc))
}
}

/// Internal method to check the `ReadConcern` that was ultimately set on this client. **This method may block
Expand Down Expand Up @@ -882,7 +1013,7 @@ extension CallbackEventHandler: CommandEventHandler where EventType == CommandEv

/// Event handler that stores a weak reference to the underlying handler.
private class WeakEventHandler<T: AnyObject> {
private weak var handler: T?
internal weak var handler: T?

fileprivate init(referencing handler: T) {
self.handler = handler
Expand Down

0 comments on commit 2d579e2

Please sign in to comment.