Skip to content

Commit

Permalink
JobPool: improve haddocks, clean the code
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Jun 21, 2022
1 parent 08cfd48 commit 0740cec
Showing 1 changed file with 25 additions and 11 deletions.
36 changes: 25 additions & 11 deletions network-mux/src/Control/Concurrent/JobPool.hs
Expand Up @@ -22,18 +22,27 @@ import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map

import Control.Exception (SomeAsyncException (..))
import Control.Monad (when)
import Control.Monad (when, void)
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadFork (MonadThread (..))
import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadThrow

-- | JobPool allows to submit asynchronous jobs, wait for their completion or
-- cancel. Jobs are grouped, each group can be cancelled separately.
--
data JobPool group m a = JobPool {
jobsVar :: !(TVar m (Map (group, ThreadId m) (Async m ()))),
completionQueue :: !(TQueue m a)
}

data Job group m a = Job (m a) (SomeException -> m a) group String
-- | An asynchronous job which belongs to some group and its exception handler.
--
data Job group m a =
Job (m a) -- ^ job
(SomeException -> m a) -- ^ error handler
group -- ^ job group
String -- ^ thread label

withJobPool :: forall group m a b.
(MonadAsync m, MonadThrow m, MonadLabelledSTM m)
Expand All @@ -56,7 +65,7 @@ withJobPool =
-- condition).
close :: JobPool group m a -> m ()
close JobPool{jobsVar} = do
jobs <- atomically (readTVar jobsVar)
jobs <- readTVarIO jobsVar
mapM_ uninterruptibleCancel jobs

forkJob :: forall group m a.
Expand Down Expand Up @@ -99,20 +108,25 @@ readGroupSize JobPool{jobsVar} group =
. Map.filterWithKey (\(group', _) _ -> group' == group)
<$> readTVar jobsVar

-- | Wait for next successfully completed job. Unlike 'wait' it will not throw
-- if a job errors.
--
waitForJob :: MonadSTM m => JobPool group m a -> STM m a
waitForJob JobPool{completionQueue} = readTQueue completionQueue


-- | Cancel all threads in a given group. Blocks until all threads terminated.
--
cancelGroup :: ( MonadAsync m
, Eq group
)
=> JobPool group m a -> group -> m ()
cancelGroup JobPool { jobsVar } group = do
jobs <- atomically (readTVar jobsVar)
_ <- Map.traverseWithKey (\(group', _) thread ->
when (group' == group) $
cancel thread
)
jobs
return ()
jobs <- readTVarIO jobsVar
void $ Map.traverseWithKey
(\(group', _) thread ->
when (group' == group) $
cancel thread
)
jobs


0 comments on commit 0740cec

Please sign in to comment.