Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions Workflow/Sources/SubtreeManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import Dispatch
import Foundation

extension WorkflowNode {
/// Manages the subtree of a workflow. Specifically, this type encapsulates the logic required to update and manage
Expand Down Expand Up @@ -353,21 +354,34 @@ extension WorkflowNode.SubtreeManager {
}

fileprivate final class ReusableSink<Action: WorkflowAction>: AnyReusableSink where Action.WorkflowType == WorkflowType {
func handle(action: Action) {
let output = Output.update(
action,
source: .external,
subtreeInvalidated: false // initial state
func handle(action: Action, deferralCount: Int = 0) {
assert(
Thread.isMainThread,
"Attempt to handle \(String(describing: action)) on a background thread. Actions should be sent on the main thread."
)
assert(
deferralCount < 1000,
"Anomalous number of repeated deferrals (\(deferralCount)) when handling action: \(String(describing: action))"
)

if case .pending = eventPipe.validationState {
// Workflow is currently processing an `event`.
// Scheduling it to be processed after.
// Enqueue a future attempt to handle this action.
DispatchQueue.workflowExecution.async { [weak self] in
self?.eventPipe.handle(event: output)
self?.handle(
action: action,
deferralCount: deferralCount + 1
)
}
return
}

let output = Output.update(
action,
source: .external,
subtreeInvalidated: false // initial state
)

eventPipe.handle(event: output)
}
}
Expand Down
131 changes: 117 additions & 14 deletions Workflow/Tests/WorkflowHostTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* limitations under the License.
*/

import ReactiveSwift
import XCTest

@_spi(WorkflowRuntimeConfig) @testable import Workflow

final class WorkflowHostTests: XCTestCase {
Expand Down Expand Up @@ -58,32 +60,92 @@ final class WorkflowHostTests: XCTestCase {
final class WorkflowHost_EventEmissionTests: XCTestCase {
// Previous versions of Workflow would fatalError under this scenario
func test_event_sent_to_invalidated_sink_during_action_handling() {
let root = Parent()
let host = WorkflowHost(workflow: root)
let host = WorkflowHost(workflow: Parent())
let (lifetime, token) = ReactiveSwift.Lifetime.make()
defer { _ = token }
let initialRendering = host.rendering.value
var observedRenderCount = 0

XCTAssertEqual(initialRendering.eventCount, 0)

let disposable = host.rendering.signal.observeValues { rendering in
XCTAssertEqual(rendering.eventCount, 1)
host
.rendering
.signal
.take(during: lifetime)
.observeValues { rendering in
XCTAssertEqual(rendering.eventCount, 1)

// emit another event using an old rendering
// while the first is still being processed, but
// the workflow that handles the event has been
// removed from the tree
if observedRenderCount == 0 {
initialRendering.eventHandler()
}

// emit another event using an old rendering
// while the first is still being processed, but
// the workflow that handles the event has been
// removed from the tree
if observedRenderCount == 0 {
initialRendering.eventHandler()
observedRenderCount += 1
}

observedRenderCount += 1
}
defer { disposable?.dispose() }

// send an event and cause a re-render
initialRendering.eventHandler()

XCTAssertEqual(observedRenderCount, 1)

drainMainQueueBySpinningRunLoop()

// Ensure the invalidated sink doesn't process the event
let nextRendering = host.rendering.value
XCTAssertEqual(nextRendering.eventCount, 1)
XCTAssertEqual(observedRenderCount, 1)
}

func test_reentrant_event_during_render() {
let host = WorkflowHost(workflow: ReentrancyWorkflow())
let (lifetime, token) = ReactiveSwift.Lifetime.make()
defer { _ = token }
let initialRendering = host.rendering.value

var emitReentrantEvent = false

let renderExpectation = expectation(description: "render")
renderExpectation.expectedFulfillmentCount = 2

var observedRenderCount = 0

host
.rendering
.signal
.take(during: lifetime)
.observeValues { val in
observedRenderCount += 1

defer { renderExpectation.fulfill() }
guard !emitReentrantEvent else { return }
emitReentrantEvent = true

// In a prior implementation, this would check state local
// to the underlying EventPipe and defer event handling
// into the future. If the RunLoop was spun after that
// point, the action could attempt to be handled and an
// we'd hit a trap when sending a sink an action in an
// invalid state.
//
// 'Real world' code could hit this case as there are some
// UI bindings that fire when a rendering/output is updated
// that call into system API that do sometimes spin the
// RunLoop manually (e.g. stuff calling into WebKit).
initialRendering.sink.send(.event)
drainMainQueueBySpinningRunLoop()
}

// Send an event and cause a re-render
initialRendering.sink.send(.event)

XCTAssertEqual(observedRenderCount, 1)

waitForExpectations(timeout: 1)

XCTAssertEqual(observedRenderCount, 2)
}
}

Expand Down Expand Up @@ -115,6 +177,35 @@ extension WorkflowHostTests {

// MARK: Utility Types

extension WorkflowHost_EventEmissionTests {
struct ReentrancyWorkflow: Workflow {
typealias State = Void
typealias Output = Never

struct Rendering {
var sink: Sink<Action>!
}

func render(state: Void, context: RenderContext<Self>) -> Rendering {
let sink = context.makeSink(of: Action.self)
return Rendering(sink: sink)
}

enum Action: WorkflowAction {
typealias WorkflowType = ReentrancyWorkflow

case event

func apply(
toState state: inout WorkflowType.State,
context: ApplyContext<WorkflowType>
) -> WorkflowType.Output? {
nil
}
}
}
}

extension WorkflowHost_EventEmissionTests {
struct Parent: Workflow {
struct Rendering {
Expand Down Expand Up @@ -182,3 +273,15 @@ extension WorkflowHost_EventEmissionTests {
}
}
}

private func drainMainQueueBySpinningRunLoop(timeoutSeconds: UInt = 1) {
var done = false
DispatchQueue.main.async {
done = true
}

let deadline = ContinuousClock.now + .seconds(timeoutSeconds)
while !done, ContinuousClock.now < deadline {
RunLoop.current.run(mode: .common, before: .now)
}
}
Loading