From ba2b03aec7b096cf1410a4ea4af92a334dd1c584 Mon Sep 17 00:00:00 2001 From: Dhaval Shreyas Date: Mon, 22 Feb 2021 12:39:33 -0800 Subject: [PATCH] Handling Queued Events --- Workflow/Sources/SubtreeManager.swift | 31 +++++------------ Workflow/Tests/ConcurrencyTests.swift | 50 ++++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/Workflow/Sources/SubtreeManager.swift b/Workflow/Sources/SubtreeManager.swift index d1aeca350..c7ca3fd5e 100644 --- a/Workflow/Sources/SubtreeManager.swift +++ b/Workflow/Sources/SubtreeManager.swift @@ -81,15 +81,6 @@ extension WorkflowNode { /// Enable the eventPipes for the previous rendering. The eventPipes are not valid until this has /// be called. If is an error to call this twice without generating a new rendering. func enableEvents() { - /// Check for queued events. If there are any, apply the first and yield to the next render loop. - let queuedEvents = eventPipes.compactMap { pipe in - pipe.pendingOutput() - } - if !queuedEvents.isEmpty { - handle(output: queuedEvents[0]) - return - } - /// Enable all action pipes. for eventPipe in eventPipes { eventPipe.enable { [weak self] output in @@ -285,6 +276,14 @@ extension WorkflowNode.SubtreeManager { func handle(action: Action) { let output = Output.update(AnyWorkflowAction(action), source: .external) + if case .pending = eventPipe.validationState { + // Workflow is currently processing an `event`. + // Scheduling it to be processed after. + DispatchQueue.workflowExecution.async { [weak self] in + self?.eventPipe.handle(event: output) + } + return + } eventPipe.handle(event: output) } } @@ -298,7 +297,6 @@ extension WorkflowNode.SubtreeManager { enum ValidationState { case preparing case pending - case queued(Output) case valid(handler: (Output) -> Void) case invalid } @@ -315,10 +313,7 @@ extension WorkflowNode.SubtreeManager { fatalError("[\(WorkflowType.self)] Sink sent an action inside `render`. Sinks are not valid until `render` has completed.") case .pending: - validationState = .queued(event) - - case .queued: - fatalError("[\(WorkflowType.self)] Action sent to pipe while already in the `queueing` state.") + fatalError("[\(WorkflowType.self)] Action sent to pipe while in the `pending` state.") case .valid(let handler): handler(event) @@ -335,14 +330,6 @@ extension WorkflowNode.SubtreeManager { validationState = .pending } - func pendingOutput() -> Output? { - if case .queued(let output) = validationState { - return output - } else { - return nil - } - } - func enable(with handler: @escaping (Output) -> Void) { guard case .pending = validationState else { fatalError("EventPipe can only be enabled from the `pending` state") diff --git a/Workflow/Tests/ConcurrencyTests.swift b/Workflow/Tests/ConcurrencyTests.swift index ee8a7db5c..a8587ed5f 100644 --- a/Workflow/Tests/ConcurrencyTests.swift +++ b/Workflow/Tests/ConcurrencyTests.swift @@ -55,19 +55,20 @@ final class ConcurrencyTests: XCTestCase { disposable?.dispose() } - // Events emitted between `render` on a workflow and `enableEvents` are queued and will be delivered immediately when `enableEvents` is called. + // Events emitted between `render` on a workflow and `enableEvents` are queued and will be delivered asynchronously after rendering is updated. func test_queuedEvents() { let host = WorkflowHost(workflow: TestWorkflow()) - let expectation = XCTestExpectation() + let renderingExpectation = expectation(description: "Waiting on rendering values.") var first = true let disposable = host.rendering.signal.observeValues { rendering in if first { - expectation.fulfill() first = false // Emit an event when the rendering is first received. rendering.update() + } else { + renderingExpectation.fulfill() } } @@ -77,9 +78,44 @@ final class ConcurrencyTests: XCTestCase { // Updating the screen will cause two events - the `update` here, and the update caused by the first time the rendering changes. initialScreen.update() + waitForExpectations(timeout: 1) + XCTAssertEqual(2, host.rendering.value.count) - wait(for: [expectation], timeout: 1.0) + disposable?.dispose() + } + + func test_multipleQueuedEvents() { + let host = WorkflowHost(workflow: TestWorkflow()) + + let renderingExpectation = expectation(description: "Waiting on rendering values.") + var renderingValuesCount = 0 + + let disposable = host.rendering.signal.observeValues { rendering in + if renderingValuesCount == 0 { + // Emit two events. + rendering.update() + rendering.update() + } else if renderingValuesCount == 1 { + // Wait for another rendering + } else if renderingValuesCount == 2 { + renderingExpectation.fulfill() + } else { + XCTFail("Unexpected rendering") + } + + renderingValuesCount += 1 + } + + let initialScreen = host.rendering.value + XCTAssertEqual(0, initialScreen.count) + + // Updating the screen will cause three events. + initialScreen.update() + + waitForExpectations(timeout: 1) + + XCTAssertEqual(3, host.rendering.value.count) disposable?.dispose() } @@ -192,16 +228,22 @@ final class ConcurrencyTests: XCTestCase { let host = WorkflowHost(workflow: TestWorkflow(), debugger: debugger) var first = true + + let renderingsComplete = expectation(description: "Waiting for renderings") let disposable = host.rendering.signal.observeValues { rendering in if first { first = false rendering.update() + } else { + renderingsComplete.fulfill() } } let initialScreen = host.rendering.value initialScreen.update() + waitForExpectations(timeout: 1) + XCTAssertEqual(2, debugger.snapshots.count) XCTAssertEqual("1", debugger.snapshots[0].stateDescription) XCTAssertEqual("2", debugger.snapshots[1].stateDescription)