Skip to content

Commit

Permalink
Cancel unstructured task when current task is cancelled.
Browse files Browse the repository at this point in the history
  • Loading branch information
swhitty committed Aug 2, 2022
1 parent e31495e commit 27d1fbd
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions Sources/Operators/AsyncSequence+SwitchToLatest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
self.currentTask = task
}

@Sendable
func cancelCurrentTask() {
self.currentTask?.cancel()
}

/// iterates over the upstream sequence and maintain the current async iterator while cancelling the current .next() task for each new element
func startUpstreamIterator() async throws {
guard !self.hasStarted else { return }
Expand Down Expand Up @@ -130,7 +135,7 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:

// if a task is cancelled while waiting with the next element (a new element arrived in the root iterator)
// we create a new task and wait for the elements from the new child iterator
while noValueHasBeenEmitted {
while noValueHasBeenEmitted, !Task.isCancelled {
currentTask = Task {
do {
return try await localUpstreamIteratorManager.nextOnCurrentChildIterator()
Expand All @@ -141,7 +146,9 @@ public struct AsyncSwitchToLatestSequence<UpstreamAsyncSequence: AsyncSequence>:
}
}
localUpstreamIteratorManager.setCurrentTask(task: currentTask)
emittedElement = try await currentTask.value
emittedElement = try await withTaskCancellationHandler(handler: localUpstreamIteratorManager.cancelCurrentTask) {
try await currentTask.value
}
noValueHasBeenEmitted = (emittedElement == nil && currentTask.isCancelled)
}

Expand Down

0 comments on commit 27d1fbd

Please sign in to comment.