Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
stefantalpalaru committed May 18, 2019
1 parent f1a8edc commit 618bb63
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 3 deletions.
23 changes: 20 additions & 3 deletions lib/pure/concurrency/threadpool.nim
Expand Up @@ -133,6 +133,8 @@ type
q: ToFreeQueue
readyForTask: Semaphore

const threadpoolWaitMs {.intdefine.}: int = 100

proc blockUntil*(fv: FlowVarBase) =
## Waits until the value for the ``fv`` arrives.
##
Expand Down Expand Up @@ -201,6 +203,8 @@ proc finished(fv: FlowVarBase) =
inc q.len
release(q.lock)
fv.data = nil
# the worker thread waits for "data" to be set to nil before shutting down
owner.data = nil

proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)

Expand Down Expand Up @@ -241,21 +245,24 @@ proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
result = cast[ptr T](fv.data)
finished(fv)

proc `^`*[T](fv: FlowVar[ref T]): ref T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
let src = cast[ref T](fv.data)
deepCopy result, src
finished(fv)

proc `^`*[T](fv: FlowVar[T]): T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
when T is string or T is seq:
# XXX closures? deepCopy?
result = cast[T](fv.data)
let src = cast[T](fv.data)
deepCopy result, src
else:
result = fv.blob
finished(fv)

proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
Expand Down Expand Up @@ -334,6 +341,16 @@ proc slave(w: ptr Worker) {.thread.} =
if w.shutdown:
w.shutdown = false
atomicDec currentPoolSize
while true:
if w.data != nil:
sleep(threadpoolWaitMs)
else:
# The flowvar finalizer ("finished()") set w.data to nil, so we can
# safely terminate the thread.
#
# TODO: look for scenarios in which the flowvar is never finalized, so
# a shut down thread gets stuck in this loop until the main thread exits.
break
break
when declared(atomicStoreN):
atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
Expand Down Expand Up @@ -576,7 +593,7 @@ proc sync*() =
if not allReady: break
allReady = allReady and workersData[i].ready
if allReady: break
sleep(100)
sleep(threadpoolWaitMs)
# We cannot "blockUntil(gSomeReady)" because workers may be shut down between
# the time we establish that some are not "ready" and the time we wait for a
# "signal(gSomeReady)" from inside "slave()" that can never come.
Expand Down
2 changes: 2 additions & 0 deletions tests/parallel/tconvexhull.nim
Expand Up @@ -52,6 +52,8 @@ proc convex_hull[T](points: var seq[T], cmp: proc(x, y: T): int {.closure.}) : s
result = concat(^ul[0], ^ul[1])

var s = map(toSeq(0..99999), proc(x: int): Point = (float(x div 1000), float(x mod 1000)))
# On some runs, this pool size reduction will set the "shutdown" attribute on the
# worker thread that executes our spawned task, before we can read the flowvars.
setMaxPoolSize 2

#echo convex_hull[Point](s, cmpPoint)
Expand Down

0 comments on commit 618bb63

Please sign in to comment.