
Handling blocking tasks and long-running tasks #123

Open

mratsim opened this issue May 5, 2020 · 2 comments

mratsim (Owner) commented May 5, 2020

Somewhat linked to #22, possibly even a solution to it.

Vocabulary reminder:

  • We distinguish CPU-bound tasks as tasks that must be scheduled on an actual CPU to make progress.
  • IO-bound tasks don't need to be scheduled to make progress (e.g. waiting for user input, the file system, or a network response).

Confusingly, this is a different distinction from CPU-bound vs memory-bound workloads, which is about resources; here we are talking about progress. Improvements in nomenclature are welcome.

Blocking tasks

Blocking tasks, for example stdin.readline(), shouldn't be scheduled on the Weave threadpool.

While the task blocks, Weave completely loses control of the thread and it becomes unavailable for scheduling. This would mean:

  • in the best case, losing a worker thread that could help with an expensive computation (say, processing an image)
  • in the worst case, an accumulation of steal requests in the blocked thread and a stall of the runtime

Instead, a spawnBlocked or spawnDedicated function should be provided that spins up a thread which does not participate in Weave scheduling and can safely block.
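
As a usage sketch (hypothetical, since spawnDedicated is exactly what is being proposed here; we also assume it returns a Flowvar like spawn does):

proc watchInput(): string =
  stdin.readLine() # blocks until the user presses Enter; safe on a dedicated thread

let userInput = spawnDedicated watchInput() # hypothetical API, not in Weave today
# ... the Weave workers keep computing in the meantime ...
echo sync(userInput)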

Open questions:

  • do we use createThread "just-in-time" or do we prepare one or more threads in advance?
    • If creating just-in-time, we can just send the function and don't need to package it in a "BlockingTask"
    • But creating a thread just-in-time is expensive and will block the creating thread for a while ¯\_(ツ)_/¯
    • And we might never need it (the same can be said of Weave's base threadpool)
  • When the thread exits, do we keep it around or do we give it back to the OS?
    • We can expect that a program that does readline will do so in a while true loop, so recycling the thread is desirable (see the sketch after this list).
  • If we recycle the thread, do we ever release it?
    • Threads are a fairly scarce OS resource, so we should release them when we can. But how do we determine when?
      • Time-based thread joining is problematic in terms of side effects (would editing the system time or a leap second kill a thread?)
      • The runtime is pretty much decentralized, so events are tracked per worker, but the lifetime of dedicated blocking threads should be handled globally.
      • Can we joinIfNonBlocking()?
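
To make the recycling questions concrete, here is a minimal sketch in plain Nim (all of this is assumption, not Weave API: a single pre-created dedicated thread, tasks as plain nimcall function pointers, a nil sentinel for release, and no answer to the time-based joining problem):

type BlockingTask = proc () {.nimcall, gcsafe.}

var taskChan: Channel[BlockingTask] # global: system Channels must not be copied
var dedicated: Thread[void]

proc dedicatedLoop() {.thread.} =
  while true:
    let task = taskChan.recv() # a blocking recv is fine: this thread is not a Weave worker
    if task.isNil: break       # nil sentinel: release the thread
    task()

proc setupDedicated() =
  taskChan.open()
  createThread(dedicated, dedicatedLoop)

proc teardownDedicated() =
  taskChan.send(nil)    # ask the dedicated thread to exit ...
  joinThread(dedicated) # ... and give it back to the OS
  taskChan.close()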

Long-running tasks

A bit similar to the Actor model, we might want to spin up event-loop tasks that run for a very long time. Re-using the image compression example, in pseudo-code:

type
  Future[T] = object
    ## An IO Future from asyncdispatch or Chronos

  Flowvar[T] = object
    ## A CPU Future from Weave

  Image = object
  ImageRequest = tuple[respChannel: Connection, img: Image]
    ## an image request is a pair of
    ## - a connection to send back the processed image
    ## - an image
  PendingImage = tuple[inUse: bool, client: Connection, pending: Flowvar[Image]]
    ## A pending image is a processing slot

  Server = ptr object
    ## An image server
    connection: Connection # Incoming connections
    pendingImages: seq[PendingImage]

    shutdown: Atomic[bool] # Can be an atomic, a channel, anything, or Ctrl+C

proc imageServer(server: Server, address: string, port: Port) =
  server.init(address, port)

  var imgRequest = server.connection.recv() # We get an IO-future from asyncdispatch or Chronos

  while not server.shutdown.load(moRelaxed):
    if imgRequest.finished(): # Non-blocking check if we received a new request
      let req = imgRequest.get()
      let slot = server.pendingImages.findAvailableSlot() # Finds an available slot to store the processing Flowvar, extending the seq if required
      doAssert not server.pendingImages[slot].inUse
      server.pendingImages[slot].inUse = true
      server.pendingImages[slot].client = req.respChannel
      server.pendingImages[slot].pending = spawn process(req.img) # Create a task on the Weave threadpool

      # Handle the next connection request
      imgRequest = server.connection.recv()

    poll() # Allow asyncdispatch or Chronos to handle async events
    # `loadBalance()` for Weave is not needed because this event loop should not run on Weave threadpool.

    # Now look into all our pending images and send the result back if available
    for slot in 0 ..< server.pendingImages.len:
      if server.pendingImages[slot].inUse:
        if server.pendingImages[slot].pending.isReady(): # Ensure that we will not block the event loop
          server.pendingImages[slot].client.send sync(server.pendingImages[slot].pending)
          server.pendingImages[slot].client.closeConnection()
          server.pendingImages[slot].inUse = false

  server.engagingShutdownProcedures()

proc main() =

  # Initialization
  init(Weave)
  let server = createSharedU(Server) # Allocate memory for the server instance

  # Create the image server on a dedicated thread
  spawnDedicated imageServer(server, address, port)

  # The main thread should at least run Weave load balancing.
  # We can also run the async library's `poll()` (not needed here since `main` has no IO).
  # Note: we assume that `poll()` somehow prevents the `while true` loop from burning 100% of a CPU.
  #   The downside is added load-balancing latency, but awake Weave workers can wake each other
  #   up without the main thread, as long as `spawn` on the dedicated thread also sends a wakeup signal.
  # The only Weave procs that can block the main thread are `sync`, `syncScope` and `syncRoot`,
  # which make it work on pending (potentially computationally intense) tasks along with the threadpool.
  while true:
    loadBalance()
    poll()

This is all fine and dandy but:

  • We need this spawnDedicated
  • We don't expose Flowvar.isReady() to let async code check whether sync() will block (see the sketch below).
  • spawn currently enqueues the task on the current worker's queue. But a "dedicated" thread is not a worker in the Weave threadpool, so it should enqueue the task on (and wake up) a random worker from the threadpool.
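
For the isReady() point, the polling pattern the event loop needs would look like this (a sketch reusing the names from the pseudo-code above; isReady is assumed, as it is not exposed yet):

let fv: Flowvar[Image] = spawn process(req.img)
# ... later, on each event-loop iteration ...
if fv.isReady():     # non-blocking completion check
  let img = sync(fv) # guaranteed not to block the event loop
  req.respChannel.send(img)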

Fair scheduling

By design, Weave maximizes throughput: if a client requests processing of a big image, that processing will take more time, which may be unfair to other clients.
I don't see how that can be solved without resumable functions; once a task is started, it runs to completion.

Priority tasks

See Latency-optimized / job priorities / soft real-time parallel scheduling #88

Maximizing throughput in the async world

In the image server example, the main thread never participates in the image processing; all it does is load balancing (i.e. waking up idle threads when tasks are pending).

However, on a 4-core CPU, Weave currently spawns a threadpool with
3 threads and assumes that the main thread will participate in the work.
In an async context (and in the example), this is not true: sync, syncScope and syncRoot should not be called on the root task (except at shutdown) to avoid blocking async event handling, so we only use 75% of the available CPU for image processing.

Instead, we should make the number of threads an init parameter, so that there are 4 worker threads (mostly CPU) + 1 root thread (mostly IO) to maximize resource usage.
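
A possible shape for that parameter (hypothetical signature; init(Weave) takes no thread-count argument today):

import cpuinfo

init(Weave, numThreads = countProcessors()) # hypothetical parameter: 4 workers on a 4-core CPU
# The root thread is then free to do only IO and load balancing:
while true:
  loadBalance()
  poll()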

Autodetect blocking tasks

We could add autodetection of blocking tasks and migrate them outside the threadpool.

But I don't want to try to solve the Halting Problem.

olliNiinivaara commented May 5, 2020

Hello, I'm writing a web server that uses Weave:
https://github.com/olliNiinivaara/GuildenStern

Handling blocking tasks is exactly the interesting topic there!

There's a main event loop (in dispatcher.nim) that waits for sockets from a selector and then spawns tasks that will mostly:

  • receive and send socket data (I/O blocking)
  • do crypto, json parsing and other CPU bound black box algorithms
  • query and update databases

So tasks will spend a lot of time waiting, and there are limited opportunities to inject those loadBalance calls.

I have some preliminary benchmark results here, comparing 4-threaded, single-threaded and httpbeast-based serving, plus some statistics from Weave:
https://gist.github.com/olliNiinivaara/1ca0f8bb6ef233cd6314af49f50b980f

I have no idea how to interpret those weave statistics. If you could install GuildenStern and run the benchmarks yourself, I hope you might get some insight - not only about GuildenStern, but about how Weave behaves in this kind of environment.
(To begin with, I'm afraid that my 4 CPU cores are not enough to get very meaningful results...)

I'm certain that I'll have some questions to ask as my work advances. I'll try to write them up as generic issues so that they may also be interesting to a larger audience.

Keep up the good work!

mratsim (Owner) commented May 5, 2020

4 CPU cores should be plenty.

Looking at your use-case, and especially the ThreadContext, it seems like you need to pin some permanent data to a thread. I'll think about that; the "long-running tasks" part should handle it.

One thing to note regarding latency is that Weave currently maximizes throughput:

  • if you enqueue lots of tasks (via spawn), the enqueuing worker will start working on the last one,
  • though the thieves will still steal from the first enqueued (sketch below).
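
A sketch of what that means in practice (handleRequest is a made-up task; the enqueuing worker pops its own deque LIFO while thieves steal FIFO):

let a = spawn handleRequest(0) # enqueued first: the first to be stolen by a thief
let b = spawn handleRequest(1)
let c = spawn handleRequest(2) # enqueued last: what the spawning worker runs next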

I'm not familiar with webservers, but since there is a nice benchmark, I can play with it.

mratsim added a commit that referenced this issue May 6, 2020
* Expose isReady to check if sync will block (#123 #22)

* update README

* Add test

* The CI is very bad at precise sleep