Taskpools starts workstealing and never stops when used with Chronos ThreadSignalPtr, causing hotspinning #41

Open
PhilippMDoerner opened this issue May 12, 2024 · 4 comments

PhilippMDoerner commented May 12, 2024

I'm trying to build a library on top of taskpools that abstracts away some of the complexity of communicating between tasks on threads. While doing so I noticed that when a task runs a while-loop and uses chronos/threadsync.ThreadSignalPtr, one CPU core gets maxed out at 100% doing nothing but work-stealing. And I mean explicitly calling trySteal over and over again.

Here is an example that should show this:

# test.nim
import chronos
import taskpools
import chronos/threadsync
import std/os

proc awaitReceiver(receiver: ThreadSignalPtr) {.gcsafe, nimcall, raises: [].} =
  while true:
    try:
      waitFor receiver.wait()
    except Exception: discard

var tp = Taskpool.new(numThreads = 2) 
let receiver = new(ThreadSignalPtr)[]
tp.spawn awaitReceiver(receiver)
discard receiver.fireSync()
sleep(0)
discard receiver.fireSync()
sleep(0)
tp.syncAll()
tp.shutDown()

Compiling this with nim r -f --debugger:native --cc:clang -d:TP_Debug -d:useMalloc test.nim >> log.txt results in this file:

Worker  1: eventLoop 1 - searching task from local deque
Worker  1: eventLoop 2 - becoming a thief
Worker  1: eventLoop 2.b - sleeping
Worker  0: schedule task 0x02ab1310 (parent 0x02ab1250, current 0x02ab1250)
Worker  1: eventLoop 2.b - waking
Worker  1: eventLoop 1 - searching task from local deque
Worker  1: eventLoop 2 - becoming a thief
Worker  1: eventLoop 2.a - stole task 0x02ab1310 (parent 0x02ab1250, current 0x00000000)
>>> Worker  0 enters barrier <<<
Worker  0: syncAll 1 - searching task from local deque
Worker  0: syncAll 2 - becoming a thief
Worker  0: syncAll 1 - searching task from local deque
Worker  0: syncAll 2 - becoming a thief
Worker  0: syncAll 1 - searching task from local deque
Worker  0: syncAll 2 - becoming a thief
Worker  0: syncAll 1 - searching task from local deque
Worker  0: syncAll 2 - becoming a thief
... the above 2 lines repeat ad-infinitum

I have no real idea what's going on here.
It seems like awaitReceiver is originally scheduled on Worker 0, which then gets stolen by Worker 1, then Worker 0 enters some kind of Barrier and starts 100%ing a CPU thread trying to steal a task?

Either way, is there something I can do about this?
Is this a bug?

@mratsim
Contributor

mratsim commented May 16, 2024

Taskpool doesn't implement thread-local barrier.

syncAll is a barrier for the master thread. As mentioned in the code here, I cannot put the master thread to sleep as there is no notification system to wake it up and it would lead to deadlocks.

if not taskNode.isNil:
  # 2.1 We stole some task
  debug: log("Worker %2d: syncAll 2.1 - stole task 0x%.08x (parent 0x%.08x, current 0x%.08x)\n", ctx.id, taskNode, taskNode.parent, ctx.currentTask)
  taskNode.runTask()
else:
  # 2.2 No task to steal
  if tp.eventNotifier.getParked() == tp.numThreads - 1:
    # 2.2.1 all threads besides the current are parked
    debugTermination:
      log("Worker %2d: syncAll 2.2.1 - termination, all other threads sleeping\n", ctx.id)
    foreignThreadsParked = true
  else:
    # 2.2.2 We don't park as there is no notif for task completion
    cpuRelax()

If it is guaranteed that a known number of threads will hit the barrier before release, that count can serve as the trigger. In that case you can use a POSIX barrier or implement your own, as is done for macOS.
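
For that first case (a known number of threads reaching the barrier), a hand-rolled barrier might look roughly like the sketch below. It uses only Nim's std/locks and is not the taskpools or POSIX implementation. `SimpleBarrier`, `init`, `wait` and `worker` are hypothetical names introduced for illustration, and the sketch assumes a Nim version with threads enabled and `broadcast` available in std/locks.

```nim
import std/[locks, atomics]

type
  SimpleBarrier = object   # hypothetical type, not part of taskpools
    lock: Lock
    cond: Cond
    expected: int          # number of threads that must arrive
    arrived: int           # threads that arrived in the current round
    generation: int        # bumped each time the barrier releases

proc init(b: var SimpleBarrier, expected: int) =
  initLock(b.lock)
  initCond(b.cond)
  b.expected = expected

proc wait(b: var SimpleBarrier) =
  acquire(b.lock)
  let gen = b.generation
  inc b.arrived
  if b.arrived == b.expected:
    # Last thread to arrive: reset for reuse and release everyone.
    b.arrived = 0
    inc b.generation
    broadcast(b.cond)
  else:
    # Sleep until the round we joined has been released.
    # The loop guards against spurious wakeups.
    while gen == b.generation:
      wait(b.cond, b.lock)
  release(b.lock)

const numThreads = 4
var barrier: SimpleBarrier
var beforeBarrier: Atomic[int]   # threads that reached the barrier
var sawAll: Atomic[int]          # threads that saw all arrivals after it

proc worker() {.thread.} =
  discard beforeBarrier.fetchAdd(1)
  barrier.wait()
  # Past the barrier, every thread must already have checked in.
  if beforeBarrier.load() == numThreads:
    discard sawAll.fetchAdd(1)

barrier.init(numThreads)
var threads: array[numThreads, Thread[void]]
for i in 0 ..< numThreads:
  createThread(threads[i], worker)
joinThreads(threads)
echo "threads that saw all arrivals: ", sawAll.load()
```

Waiting threads sleep on the condition variable instead of spinning, which is only safe here because the last arriving thread is guaranteed to wake them.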

If, on the other hand, the syncAll might be nested in a parallel section (for example, half the threads go into one branch of a tree and half into another), you need to count the descendant threads per branch, similar to syncScope in Constantine, Weave and Weave-io.

Then implement a wait primitive: https://github.com/mratsim/weave-io/blob/8672f1c/weave_io/threadpool.nim#L894-L932 and an appropriate parking/sleeping protocol that prevents deadlocks and livelocks.

@PhilippMDoerner
Author

I will admit I fundamentally lack knowledge of the threadpool problem domain.

From your post I vaguely conclude (combined with "Having async-awaitable tasks" being listed as a non-goal, which I guess applies here given the chronos involvement) that this is out of scope and thus should be closed?

I would be happy to start looking into threadpools, but I think this is a topic for Discord, and I think I'd best start off with basically baby's first threadpool. For example, I'm not even sure whether I benefit from work-stealing at all for what I want. So if I'm going to have to implement it myself anyway, what are the minimum problems I have to solve to write a minimal threadpool with a similar API but a potentially simpler, even if inefficient, implementation?

@arnetheduck
Member

> I have no real idea what's going on here.

indeed - what are you trying to achieve? first and foremost, your awaitReceiver is a single long-running task that never finishes - if you schedule it, it'll take up a worker thread from the task pool "forever".

Think of nim-taskpools as a scheduler for running "short jobs", one after the other, as efficiently as possible meaning that one job starts as soon as the other has ended.

Here, you're scheduling a never-ending job which takes a thread from the thread pool but never gives it back. The example cannot "end" so syncAll cannot return.
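
If the never-ending job is kept off the pool entirely, one option is a dedicated thread that sleeps until it is signalled. The following is a minimal sketch under that assumption, using only std/locks rather than chronos or taskpools. `Mailbox`, `serverLoop`, `fire` and `shutdownServer` are hypothetical names introduced for illustration, not part of either library.

```nim
import std/locks

type
  Mailbox = object     # hypothetical type for this sketch
    lock: Lock
    cond: Cond
    pending: int       # signals not yet handled
    running: bool      # set to false to request shutdown
    processed: int     # signals handled so far

var mailbox: Mailbox   # plain global: holds no GC-managed memory

proc serverLoop() {.thread.} =
  acquire(mailbox.lock)
  while true:
    # Sleep without burning CPU until there is work or a shutdown request.
    while mailbox.pending == 0 and mailbox.running:
      wait(mailbox.cond, mailbox.lock)
    if mailbox.pending > 0:
      # Real message processing would go here.
      dec mailbox.pending
      inc mailbox.processed
    else:
      # Nothing pending and shutdown was requested.
      break
  release(mailbox.lock)

proc fire() =
  acquire(mailbox.lock)
  inc mailbox.pending
  signal(mailbox.cond)
  release(mailbox.lock)

proc shutdownServer() =
  acquire(mailbox.lock)
  mailbox.running = false
  signal(mailbox.cond)   # wake the loop so it can observe the flag
  release(mailbox.lock)

initLock(mailbox.lock)
initCond(mailbox.cond)
mailbox.running = true

var server: Thread[void]
createThread(server, serverLoop)
fire()
fire()
shutdownServer()
joinThread(server)       # returns because the loop exits on shutdown
echo "processed: ", mailbox.processed   # prints "processed: 2"
```

Because the loop drains `pending` before checking the shutdown flag, both signals are handled before the thread exits, and the main thread blocks in `joinThread` rather than spinning.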

@PhilippMDoerner
Author

PhilippMDoerner commented May 21, 2024

> indeed - what are you trying to achieve?

The key goal of the library that I want to build is effortless multithreading for app-development for the purposes of pushing computation away from the thread running the GUI loop. It doesn't need to be the fastest, it just needs to be primarily easy to use and effortlessly correct. Meaning I need to be able to use it however I want and ideally never run into memory management or data-race problems, even if I want to start communicating with other threads via message-passing.

As an alternative to short-lived-tasks I wanted to also provide the possibility of having "long running threads". Those are essentially sort-of local webservers (I'm a webdev at work, those tend to be the terms I think in), that you send messages to via channels and that can send messages back via their own channels. And when you don't need them they just go into a low-powered state (which is what waitFor threadSignalPtr.wait() does as I understand).

Out of convenience, and because it would be neat (such a long-running thread could likely end up being used for async IO), I also wanted to provide an async event-loop so users can do their stuff on the backend thread and push computation to "later", when the thread sleeps because it has no messages to process. Which is how I arrived at jobs with while-loops that block on waitFor threadSignalPtr.wait()

A more accurate representation of what I want looks somewhat like this:

# test.nim
import chronos
import taskpools
import chronos/threadsync
import std/[atomics, os]

var isThreadRunning: Atomic[bool] = Atomic[bool]()
isThreadRunning.store(true)
proc awaitReceiver(receiver: ThreadSignalPtr) {.gcsafe, nimcall, raises: [].} =
  while isThreadRunning.load():
    echo "loop"
    try:
      waitFor receiver.wait() # In the while-loop but before this call somewhere I'd also start processing messages I received via mailboxes
    except Exception: discard
  echo "Thread ended"

proc shutDownServer(receiver: ThreadSignalPtr) = 
  isThreadRunning.store(false) # breaks the while-loop
  discard receiver.fireSync() # Force another loop so the server breaks the while-loop

var tp = Taskpool.new(numThreads = 2) 
let receiver = new(ThreadSignalPtr)[]
tp.spawn awaitReceiver(receiver)

sleep(10)
discard receiver.fireSync()
sleep(10)
discard receiver.fireSync()
sleep(10)

receiver.shutdownServer()
tp.syncAll()
tp.shutDown()

3 participants