diff --git a/Control/Concurrent/ParallelIO/Local.hs b/Control/Concurrent/ParallelIO/Local.hs index 8e8b26c..04d548c 100644 --- a/Control/Concurrent/ParallelIO/Local.hs +++ b/Control/Concurrent/ParallelIO/Local.hs @@ -21,7 +21,7 @@ module Control.Concurrent.ParallelIO.Local ( parallelFirst, parallelFirstE, -- * Pool management - Pool, withPool, startPool, stopPool, + Pool, withPool, withBindingPool, startPool, startBindingPool, stopPool, extraWorkerWhileBlocked, -- * Advanced pool management @@ -51,6 +51,7 @@ reflectExceptionsTo tid act = catchNonThreadKilled act (throwTo tid) -- | A thread pool, containing a maximum number of threads. The best way to -- construct one of these is using 'withPool'. data Pool = Pool { + pool_binding :: Bool, pool_threadcount :: Int, pool_sem :: QSem } @@ -64,9 +65,16 @@ data Pool = Pool { -- -- A better alternative is to see if you can use the 'withPool' variant. startPool :: Int -> IO Pool -startPool threadcount +startPool = startPool' False + +-- | 'startPool' binding threads to Haskell capabilities +startBindingPool :: Int -> IO Pool +startBindingPool = startPool' True + +startPool' :: Bool -> Int -> IO Pool +startPool' binding threadcount | threadcount < 1 = error $ "startPool: thread count must be strictly positive (was " ++ show threadcount ++ ")" - | otherwise = fmap (Pool threadcount) $ newQSem (threadcount - 1) + | otherwise = fmap (Pool binding threadcount) $ newQSem (threadcount - 1) -- | Clean up a thread pool. If you don't call this from the main thread then no one holds the queue, -- the queue gets GC'd, the threads find themselves blocked indefinitely, and you get exceptions. @@ -84,6 +92,9 @@ stopPool pool = replicateM_ (pool_threadcount pool - 1) $ killPoolWorkerFor pool withPool :: Int -> (Pool -> IO a) -> IO a withPool threadcount = bracket (startPool threadcount) stopPool +-- | 'withPool' variant binding threads to Haskell capabilities. +withBindingPool :: Int -> (Pool -> IO a) -> IO a +withBindingPool threadcount = bracket (startBindingPool threadcount) stopPool -- | You should wrap any IO action used from your worker threads that may block with this method. -- It temporarily spawns another worker thread to make up for the loss of the old blocked @@ -225,9 +236,12 @@ parallelE_ pool = fmap (map (either Just (\_ -> Nothing))) . parallelE pool parallel :: Pool -> [IO a] -> IO [a] parallel pool acts = mask $ \restore -> do main_tid <- myThreadId - resultvars <- forM acts $ \act -> do + nCaps <- getNumCapabilities + let enumeratedActs = zip (cycle [0..nCaps]) acts + resultvars <- forM enumeratedActs $ \(n, act) -> do resultvar <- newEmptyMVar - _tid <- forkIO $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ reflectExceptionsTo main_tid $ do + let fork = if pool_binding pool then forkOn n else forkIO + _tid <- fork $ bracket_ (killPoolWorkerFor pool) (spawnPoolWorkerFor pool) $ reflectExceptionsTo main_tid $ do res <- restore act -- Use tryPutMVar instead of putMVar so we get an exception if my brain has failed True <- tryPutMVar resultvar res