diff --git a/lib/pure/concurrency/threadpool.nim b/lib/pure/concurrency/threadpool.nim
index 04be704be8ba..56fd74d8606c 100644
--- a/lib/pure/concurrency/threadpool.nim
+++ b/lib/pure/concurrency/threadpool.nim
@@ -133,6 +133,8 @@ type
     q: ToFreeQueue
     readyForTask: Semaphore
 
+const threadpoolWaitMs {.intdefine.}: int = 100
+
 proc blockUntil*(fv: FlowVarBase) =
   ## Waits until the value for the ``fv`` arrives.
   ##
@@ -201,6 +203,8 @@ proc finished(fv: FlowVarBase) =
   inc q.len
   release(q.lock)
   fv.data = nil
+  # the worker thread waits for "data" to be set to nil before shutting down
+  owner.data = nil
 
 proc fvFinalizer[T](fv: FlowVar[T]) = finished(fv)
 
@@ -241,21 +245,24 @@ proc unsafeRead*[T](fv: FlowVar[ref T]): ptr T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   result = cast[ptr T](fv.data)
+  finished(fv)
 
 proc `^`*[T](fv: FlowVar[ref T]): ref T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   let src = cast[ref T](fv.data)
   deepCopy result, src
+  finished(fv)
 
 proc `^`*[T](fv: FlowVar[T]): T =
   ## Blocks until the value is available and then returns this value.
   blockUntil(fv)
   when T is string or T is seq:
-    # XXX closures? deepCopy?
-    result = cast[T](fv.data)
+    let src = cast[T](fv.data)
+    deepCopy result, src
   else:
     result = fv.blob
+  finished(fv)
 
 proc blockUntilAny*(flowVars: openArray[FlowVarBase]): int =
   ## Awaits any of the given ``flowVars``. Returns the index of one ``flowVar``
@@ -334,6 +341,16 @@ proc slave(w: ptr Worker) {.thread.} =
     if w.shutdown:
       w.shutdown = false
       atomicDec currentPoolSize
+      while true:
+        if w.data != nil:
+          sleep(threadpoolWaitMs)
+        else:
+          # The flowvar finalizer ("finished()") set w.data to nil, so we can
+          # safely terminate the thread.
+          #
+          # TODO: look for scenarios in which the flowvar is never finalized, so
+          # a shut down thread gets stuck in this loop until the main thread exits.
+          break
       break
     when declared(atomicStoreN):
       atomicStoreN(addr(w.ready), true, ATOMIC_SEQ_CST)
@@ -576,7 +593,7 @@ proc sync*() =
       if not allReady: break
       allReady = allReady and workersData[i].ready
     if allReady: break
-    sleep(100)
+    sleep(threadpoolWaitMs)
     # We cannot "blockUntil(gSomeReady)" because workers may be shut down between
     # the time we establish that some are not "ready" and the time we wait for a
     # "signal(gSomeReady)" from inside "slave()" that can never come.
diff --git a/tests/parallel/tconvexhull.nim b/tests/parallel/tconvexhull.nim
index cc01b5c78eb3..184a131a2f8a 100644
--- a/tests/parallel/tconvexhull.nim
+++ b/tests/parallel/tconvexhull.nim
@@ -52,6 +52,8 @@ proc convex_hull[T](points: var seq[T], cmp: proc(x, y: T): int {.closure.}) : seq[T]=
   result = concat(^ul[0], ^ul[1])
 
 var s = map(toSeq(0..99999), proc(x: int): Point = (float(x div 1000), float(x mod 1000)))
+# On some runs, this pool size reduction will set the "shutdown" attribute on the
+# worker thread that executes our spawned task, before we can read the flowvars.
 setMaxPoolSize 2
 
 #echo convex_hull[Point](s, cmpPoint)
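
Note (not part of the patch itself): since threadpoolWaitMs is declared with the
{.intdefine.} pragma, the 100 ms polling interval used by both sync() and the new
shutdown loop in slave() can be overridden at compile time, e.g. with
-d:threadpoolWaitMs=10.

The sketch below is a minimal, hypothetical reproduction of the race this patch
addresses, modeled on the tconvexhull.nim change: the pool is shrunk while spawned
results are still unread, which previously could mark the worker executing a
spawned task for shutdown before its flowvars were read. The work proc, the task
count, and the file name are illustrative assumptions, not part of the patch.

import threadpool

proc work(x: int): int = x * 2

var fvs: seq[FlowVar[int]]
for i in 0 ..< 8:
  let fv = spawn work(i)
  fvs.add fv

# Shrink the pool while results are still unread; surplus workers get their
# "shutdown" flag set, as in the tconvexhull.nim scenario above.
setMaxPoolSize 2

for fv in fvs:
  # With the patch, `^` calls finished(), which clears the owning worker's
  # "data" slot and releases a shut-down worker from its wait loop in slave().
  echo ^fv

sync()

Compile with --threads:on. Before the patch, a worker could terminate while one
of these flowvars was still unread; with it, slave() polls every threadpoolWaitMs
milliseconds until finished() has cleared the worker's "data" slot.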