Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 9 additions & 22 deletions Workflow/Sources/SubtreeManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,6 @@ extension WorkflowNode {
/// Enable the eventPipes for the previous rendering. The eventPipes are not valid until this has
/// be called. If is an error to call this twice without generating a new rendering.
func enableEvents() {
/// Check for queued events. If there are any, apply the first and yield to the next render loop.
let queuedEvents = eventPipes.compactMap { pipe in
pipe.pendingOutput()
}
if !queuedEvents.isEmpty {
handle(output: queuedEvents[0])
return
}

/// Enable all action pipes.
for eventPipe in eventPipes {
eventPipe.enable { [weak self] output in
Expand Down Expand Up @@ -285,6 +276,14 @@ extension WorkflowNode.SubtreeManager {
func handle(action: Action) {
let output = Output.update(AnyWorkflowAction(action), source: .external)

if case .pending = eventPipe.validationState {
// Workflow is currently processing an `event`.
// Scheduling it to be processed after.
DispatchQueue.workflowExecution.async { [weak self] in
self?.eventPipe.handle(event: output)
}
return
}
eventPipe.handle(event: output)
}
}
Expand All @@ -298,7 +297,6 @@ extension WorkflowNode.SubtreeManager {
enum ValidationState {
case preparing
case pending
case queued(Output)
case valid(handler: (Output) -> Void)
case invalid
}
Expand All @@ -315,10 +313,7 @@ extension WorkflowNode.SubtreeManager {
fatalError("[\(WorkflowType.self)] Sink sent an action inside `render`. Sinks are not valid until `render` has completed.")

case .pending:
validationState = .queued(event)

case .queued:
fatalError("[\(WorkflowType.self)] Action sent to pipe while already in the `queueing` state.")
fatalError("[\(WorkflowType.self)] Action sent to pipe while in the `pending` state.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this PR was to eliminate this error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this should not be possible, however the .pending state will continue to exist. fatal ensures we don't hit this spot.


case .valid(let handler):
handler(event)
Expand All @@ -335,14 +330,6 @@ extension WorkflowNode.SubtreeManager {
validationState = .pending
}

func pendingOutput() -> Output? {
if case .queued(let output) = validationState {
return output
} else {
return nil
}
}

func enable(with handler: @escaping (Output) -> Void) {
guard case .pending = validationState else {
fatalError("EventPipe can only be enabled from the `pending` state")
Expand Down
50 changes: 46 additions & 4 deletions Workflow/Tests/ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,20 @@ final class ConcurrencyTests: XCTestCase {
disposable?.dispose()
}

// Events emitted between `render` on a workflow and `enableEvents` are queued and will be delivered immediately when `enableEvents` is called.
// Events emitted between `render` on a workflow and `enableEvents` are queued and will be delivered asynchronously after rendering is updated.
func test_queuedEvents() {
let host = WorkflowHost(workflow: TestWorkflow())

let expectation = XCTestExpectation()
let renderingExpectation = expectation(description: "Waiting on rendering values.")
var first = true

let disposable = host.rendering.signal.observeValues { rendering in
if first {
expectation.fulfill()
first = false
// Emit an event when the rendering is first received.
rendering.update()
} else {
renderingExpectation.fulfill()
}
}

Expand All @@ -77,9 +78,44 @@ final class ConcurrencyTests: XCTestCase {
// Updating the screen will cause two events - the `update` here, and the update caused by the first time the rendering changes.
initialScreen.update()

waitForExpectations(timeout: 1)

XCTAssertEqual(2, host.rendering.value.count)

wait(for: [expectation], timeout: 1.0)
disposable?.dispose()
}

func test_multipleQueuedEvents() {
let host = WorkflowHost(workflow: TestWorkflow())

let renderingExpectation = expectation(description: "Waiting on rendering values.")
var renderingValuesCount = 0

let disposable = host.rendering.signal.observeValues { rendering in
if renderingValuesCount == 0 {
// Emit two events.
rendering.update()
rendering.update()
} else if renderingValuesCount == 1 {
// Wait for another rendering
} else if renderingValuesCount == 2 {
renderingExpectation.fulfill()
} else {
XCTFail("Unexpected rendering")
}

renderingValuesCount += 1
}

let initialScreen = host.rendering.value
XCTAssertEqual(0, initialScreen.count)

// Updating the screen will cause three events.
initialScreen.update()

waitForExpectations(timeout: 1)

XCTAssertEqual(3, host.rendering.value.count)

disposable?.dispose()
}
Expand Down Expand Up @@ -192,16 +228,22 @@ final class ConcurrencyTests: XCTestCase {
let host = WorkflowHost(workflow: TestWorkflow(), debugger: debugger)

var first = true

let renderingsComplete = expectation(description: "Waiting for renderings")
let disposable = host.rendering.signal.observeValues { rendering in
if first {
first = false
rendering.update()
} else {
renderingsComplete.fulfill()
}
}

let initialScreen = host.rendering.value
initialScreen.update()

waitForExpectations(timeout: 1)

XCTAssertEqual(2, debugger.snapshots.count)
XCTAssertEqual("1", debugger.snapshots[0].stateDescription)
XCTAssertEqual("2", debugger.snapshots[1].stateDescription)
Expand Down