Skip to content
Permalink
Browse files

fixes #11275 (#11276)

  • Loading branch information...
stefantalpalaru authored and Araq committed May 20, 2019
1 parent a63c2a2 commit 13b3e4af8a419f39b139d716ae45acb51750e787
Showing with 22 additions and 3 deletions.
  1. +20 −3 lib/pure/concurrency/threadpool.nim
  2. +2 −0 tests/parallel/tconvexhull.nim
@@ -133,6 +133,8 @@ type
q: ToFreeQueue
readyForTask: Semaphore

const threadpoolWaitMs {.intdefine.}: int = 100

proc blockUntil*(fv: FlowVarBase) =
## Waits until the value for the ``fv`` arrives.
##
@@ -201,6 +203,8 @@ proc finished(fv: FlowVarBase) =
inc q.len
release(q.lock)
fv.data = nil
# the worker thread waits for "data" to be set to nil before shutting down
owner.data = nil

proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)

@@ -241,21 +245,24 @@ proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
result = cast[ptr T](fv.data)
finished(fv)

proc `^`*[T](fv: FlowVar[ref T]): ref T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
let src = cast[ref T](fv.data)
deepCopy result, src
finished(fv)

proc `^`*[T](fv: FlowVar[T]): T =
## Blocks until the value is available and then returns this value.
blockUntil(fv)
when T is string or T is seq:
# XXX closures? deepCopy?
result = cast[T](fv.data)
let src = cast[T](fv.data)
deepCopy result, src
else:
result = fv.blob
finished(fv)

proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
@@ -334,6 +341,16 @@ proc slave(w: ptr Worker) {.thread.} =
if w.shutdown:
w.shutdown = false
atomicDec currentPoolSize
while true:
if w.data != nil:
sleep(threadpoolWaitMs)
else:
# The flowvar finalizer ("finished()") set w.data to nil, so we can
# safely terminate the thread.
#
# TODO: look for scenarios in which the flowvar is never finalized, so
# a shut down thread gets stuck in this loop until the main thread exits.
break
break
when declared(atomicStoreN):
atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
@@ -576,7 +593,7 @@ proc sync*() =
if not allReady: break
allReady = allReady and workersData[i].ready
if allReady: break
sleep(100)
sleep(threadpoolWaitMs)
# We cannot "blockUntil(gSomeReady)" because workers may be shut down between
# the time we establish that some are not "ready" and the time we wait for a
# "signal(gSomeReady)" from inside "slave()" that can never come.
@@ -52,6 +52,8 @@ proc convex_hull[T](points: var seq[T], cmp: proc(x, y: T): int {.closure.}) : s
result = concat(^ul[0], ^ul[1])

var s = map(toSeq(0..99999), proc(x: int): Point = (float(x div 1000), float(x mod 1000)))
# On some runs, this pool size reduction will set the "shutdown" attribute on the
# worker thread that executes our spawned task, before we can read the flowvars.
setMaxPoolSize 2

#echo convex_hull[Point](s, cmpPoint)

0 comments on commit 13b3e4a

Please sign in to comment.
You can’t perform that action at this time.