Skip to content

Commit

Permalink
Thread binding poool
Browse files Browse the repository at this point in the history
  • Loading branch information
qrilka committed Oct 31, 2016
1 parent 4ca9a64 commit 19a9b18
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions Control/Concurrent/ParallelIO/Local.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Control.Concurrent.ParallelIO.Local (
parallelFirst, parallelFirstE,

-- * Pool management
Pool, withPool, startPool, stopPool,
Pool, withPool, withBindingPool, startPool, startBindingPool, stopPool,
extraWorkerWhileBlocked,

-- * Advanced pool management
Expand Down Expand Up @@ -51,6 +51,7 @@ reflectExceptionsTo tid act = catchNonThreadKilled act (throwTo tid)
-- | A thread pool, containing a maximum number of threads. The best way to
-- construct one of these is using 'withPool'.
data Pool = Pool {
pool_binding :: Bool,
pool_threadcount :: Int,
pool_sem :: QSem
}
Expand All @@ -64,9 +65,16 @@ data Pool = Pool {
--
-- A better alternative is to see if you can use the 'withPool' variant.
startPool :: Int -> IO Pool
startPool threadcount
startPool = startPool' False

-- | 'startPool' binding threads to Haskell capabilities
startBindingPool :: Int -> IO Pool
startBindingPool = startPool' True

startPool' :: Bool -> Int -> IO Pool
startPool' binding threadcount
| threadcount < 1 = error $ "startPool: thread count must be strictly positive (was " ++ show threadcount ++ ")"
| otherwise = fmap (Pool threadcount) $ newQSem (threadcount - 1)
| otherwise = fmap (Pool binding threadcount) $ newQSem (threadcount - 1)

-- | Clean up a thread pool. If you don't call this from the main thread then no one holds the queue,
-- the queue gets GC'd, the threads find themselves blocked indefinitely, and you get exceptions.
Expand All @@ -84,6 +92,9 @@ stopPool pool = replicateM_ (pool_threadcount pool - 1) $ killPoolWorkerFor pool
withPool :: Int -> (Pool -> IO a) -> IO a
withPool threadcount = bracket (startPool threadcount) stopPool

-- | 'withPool' variant binding threads to Haskell capabilities.
withBindingPool :: Int -> (Pool -> IO a) -> IO a
withBindingPool threadcount = bracket (startBindingPool threadcount) stopPool

-- | You should wrap any IO action used from your worker threads that may block with this method.
-- It temporarily spawns another worker thread to make up for the loss of the old blocked
Expand Down Expand Up @@ -225,9 +236,12 @@ parallelE_ pool = fmap (map (either Just (\_ -> Nothing))) . parallelE pool
parallel :: Pool -> [IO a] -> IO [a]
parallel pool acts = mask $ \restore -> do
main_tid <- myThreadId
resultvars <- forM acts $ \act -> do
nCaps <- getNumCapabilities
let enumeratedActs = zip (cycle [0..nCaps]) acts
resultvars <- forM enumeratedActs $ \(n, act) -> do
resultvar <- newEmptyMVar
_tid <- forkIO $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ reflectExceptionsTo main_tid $ do
let fork = if pool_binding pool then forkOn n else forkIO
_tid <- fork $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ reflectExceptionsTo main_tid $ do
res <- restore act
-- Use tryPutMVar instead of putMVar so we get an exception if my brain has failed
True <- tryPutMVar resultvar res
Expand Down

0 comments on commit 19a9b18

Please sign in to comment.