
[kotlin] Experiment with simplified queueing architecture #762

Closed
zach-klippenstein opened this issue Jan 24, 2020 · 1 comment
Labels
enhancement (New feature or request), kotlin, optimization (Issues related to benchmarking and optimization), proposal

Comments

@zach-klippenstein
Collaborator

zach-klippenstein commented Jan 24, 2020

Queue-per-worker is actually more complicated, and probably less performant, than it needs to be.

A simpler implementation could involve a single, global (to the instance of the workflow runtime) Channel of values that look something like:

data class PendingUpdate<O : Any>(
  /** Returns true if the worker/event that enqueued this update has since been cancelled. */
  val isDisposed: () -> Boolean,
  /** Applies the action to the node's state, returning the root workflow's output, if any. */
  val applyUpdate: () -> O?
)

When a Worker is run for the first time, you subscribe to it by enqueueing one of these structs:

scope.launch {
  // This subscription's own job: once it's cancelled, any updates it already enqueued are stale.
  val job = coroutineContext[Job]!!
  workerFlow.collect { value ->
    val pendingUpdate = PendingUpdate(
      isDisposed = job::isCancelled,
      applyUpdate = { workflowNode.applyActionForWorkerValue(value) }
    )
    // With a zero-capacity channel, this suspends until the loop is ready for the next update.
    channel.send(pendingUpdate)
  }
}

Where applyActionForWorkerValue is a function that gets the latest action handler for this worker, calculates the action, and applies it to the node's current state, returning either null or the root workflow's output.
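
A minimal sketch of that shape, assuming the node holds the handler registered during the latest render pass (all names below are illustrative, not the real WorkflowNode internals):

// Hypothetical sketch; not the actual WorkflowNode API.
class WorkerUpdateApplier<StateT, OutputT : Any>(
  private var state: StateT,
  // Returns the action handler most recently registered for this worker in a render pass.
  private val latestHandler: () -> (value: Any?, state: StateT) -> Pair<StateT, OutputT?>
) {
  // Calculates the action for the emitted value, applies it to the current state, and
  // returns the root workflow's output if the action produced one.
  fun applyActionForWorkerValue(value: Any?): OutputT? {
    val (newState, output) = latestHandler()(value, state)
    state = newState
    return output
  }
}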

This means each worker is only subscribed once, the first time it is run, and the node just needs to keep track of all those jobs so it can dispose them when the workflow is torn down. The channel could have a capacity of 0, so all workers experience backpressure immediately on contention, but I think we could also buffer pending updates without negative effect (since the consumer will check for disposed events before applying them anyway).
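
As a rough sketch of that bookkeeping (the channel and class names below are hypothetical, not the actual runtime types):

import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel

// Runtime-level queue. RENDEZVOUS (capacity 0) makes workers suspend on send until the
// loop is ready; an unlimited buffer would also work, since the consumer checks
// isDisposed before applying anyway.
val updates = Channel<PendingUpdate<Any>>(Channel.RENDEZVOUS)

// Per-node bookkeeping for the worker subscription jobs launched above.
class WorkerSubscriptions {
  private val jobs = mutableListOf<Job>()

  fun track(job: Job) { jobs += job }

  // Called when the workflow node is torn down.
  fun cancelAll() {
    jobs.forEach { it.cancel() }
    jobs.clear()
  }
}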

Rendering/UI Events

There are a couple of options for supporting non-blocking rendering events; launch-per-event is the simpler, more elegant solution.

Secondary queue

Since UI events can't handle backpressure, an additional, unbounded channel would be created at the runtime level to pump rendering events into (i.e. backing the RenderContext.actionSink). A coroutine would be launched for the lifetime of the workflow that would just forward events from this channel into the main one. Or the consumer could select over both (the former is slightly more fair, I think).
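
A sketch of this option, with illustrative channel names:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch

// Unbounded, so the actionSink never suspends a UI caller.
val renderingEvents = Channel<PendingUpdate<Any>>(Channel.UNLIMITED)

// Launched once for the lifetime of the runtime: forwards rendering events into the main queue.
fun CoroutineScope.forwardRenderingEvents(main: SendChannel<PendingUpdate<Any>>) = launch {
  for (event in renderingEvents) {
    main.send(event)
  }
}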

Launch-per-event

An alternative solution is to launch a new coroutine in the workflow node's CoroutineScope every time an event is sent. No secondary queue would be required, and the coroutine runtime would take care of clearing cancelled workflows' events from the queue. It's important to process UI events in the order in which they were sent; fortunately, channels are "fair", so even if rendering events come in faster than they can be consumed, they will be processed in FIFO order.
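
A sketch of this option, reusing the hypothetical main updates channel from the sketches above:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch

// Called when the node's actionSink receives an event; never blocks the sender.
fun <O : Any> CoroutineScope.sendRenderingEvent(
  updates: SendChannel<PendingUpdate<O>>,
  update: PendingUpdate<O>
) {
  // One coroutine per event. If this node's scope is cancelled before the event is consumed,
  // the pending send is cancelled along with it, which clears the stale event.
  launch { updates.send(update) }
}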

Consuming PendingUpdates

In WorkflowLoop, after the render pass finishes, you receive the next PendingUpdate from the queue. If isDisposed returns true, it means that struct represents an update for a worker/rendering-event that is now stale, so dequeue another and repeat until you find an undisposed update. Once isDisposed returns false, just call applyUpdate, emit the top-level output if present, and then do another render pass.
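
A sketch of that consumption step, with emitOutput and renderPass standing in for the real WorkflowLoop machinery:

import kotlinx.coroutines.channels.ReceiveChannel

// One iteration of the loop's update phase.
suspend fun <O : Any> processNextUpdate(
  updates: ReceiveChannel<PendingUpdate<O>>,
  emitOutput: suspend (O) -> Unit,
  renderPass: () -> Unit
) {
  // Skip stale updates whose worker/rendering event has since been disposed.
  var update = updates.receive()
  while (update.isDisposed()) {
    update = updates.receive()
  }
  update.applyUpdate()?.let { emitOutput(it) }
  renderPass()
}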

Originally posted by @zach-klippenstein in square/workflow#907 (comment)

@zach-klippenstein added the enhancement (New feature or request) and proposal labels on Jan 24, 2020
zach-klippenstein referenced this issue in square/workflow Jan 25, 2020
…ry tick pass.

This doesn't change any behavior; there's no need to define the function at the tick pass, so this is not only more efficient but also easier to read, since there are fewer moving parts.

This is also required for #910.
zach-klippenstein referenced this issue in square/workflow Jan 25, 2020
Based on #916.

Implements and closes #910.
@rjrjr transferred this issue from square/workflow on May 4, 2022
@rjrjr added the optimization (Issues related to benchmarking and optimization) label on May 4, 2022
@steve-the-edwards
Contributor

This queueing strategy is obsolete given that we now select over all the nodes' channels, and that Workers are treated like any other Workflow node.

We could take some cues from here, like moving things into a single channel rather than selecting, but the basic shape laid out here isn't the key change anymore.

Closing as such.
