[work-pool] Add work-pool subproject
* This is an extraction from my mutant-manager project.
mbj committed Jun 2, 2024
1 parent e4ec728 commit c3bbefb
Showing 9 changed files with 608 additions and 0 deletions.
3 changes: 3 additions & 0 deletions stack-9.4.yaml
@@ -71,6 +71,8 @@ flags:
    development: true
  tasty-mgolden:
    development: true
  work-pool:
    development: true
  xray:
    development: true
packages:
@@ -97,4 +99,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions stack-9.6.yaml
@@ -63,6 +63,8 @@ flags:
    development: true
  tasty-mgolden:
    development: true
  work-pool:
    development: true
  xray:
    development: true
packages:
@@ -89,4 +91,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions work-pool/README.md
@@ -0,0 +1,3 @@
# work-pool

Minimal work pool with a fixed maximum queue size, a fixed worker count, per-worker boot, and dynamic job production.
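
A minimal usage sketch based on the `WorkPool` API introduced in this commit; the job values and the printing worker are illustrative only:

```haskell
import Data.Foldable (traverse_)
import Numeric.Natural (Natural)

import qualified WorkPool

main :: IO ()
main = WorkPool.runPool WorkPool.Config
  { WorkPool.produceJobs = \pool -> traverse_ (WorkPool.pushJob pool) jobs
    -- pushJob blocks whenever queueSize jobs are already pending
  , WorkPool.queueSize   = 10
  , WorkPool.workerCount = 4
  , WorkPool.workerRun   = \index source ->
      -- per-worker boot would happen here, before the source is drained
      WorkPool.runSource (\job -> print (index, job)) source
  }
  where
    jobs = [1 .. 100] :: [Natural]
```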
25 changes: 25 additions & 0 deletions work-pool/package.yaml
@@ -0,0 +1,25 @@
_common/package: !include "../common/package.yaml"

name: work-pool
synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot.
homepage: https://github.com/mbj/mhs#readme
github: mbj/mhs
version: 0.0.1

<<: *defaults

dependencies:
- base
- containers
- mprelude
- text
- unliftio

tests:
  test:
    <<: *test
    dependencies:
    - devtools
    - tasty
    - tasty-hunit
    - work-pool
109 changes: 109 additions & 0 deletions work-pool/src/WorkPool.hs
@@ -0,0 +1,109 @@
module WorkPool
  ( Config(..)
  , Done
  , Pool
  , Source
  , pushJob
  , runPool
  , runSource
  )
where

import Data.Set (Set)
import MPrelude
import Prelude (succ)

import qualified Data.Set as Set
import qualified UnliftIO.Async as UnliftIO
import qualified UnliftIO.STM as UnliftIO

-- | Worker pool configuration
data Config m a = Config
  { produceJobs :: Pool a -> m ()
  -- ^ Function called to produce work; use `pushJob` to enqueue workable jobs.
  -- Once this function returns and all queued jobs are worked off, the workers exit.
  , queueSize :: Natural
  -- ^ Maximum number of queued jobs
  , workerCount :: Natural
  -- ^ Number of workers to boot
  , workerRun :: Natural -> Source a -> m Done
  -- ^ Function called when a worker is booted; the arguments are the worker index
  -- and a source to be drained with `runSource`.
  }

-- Internal queue event, supplying a job or quitting the worker
data Event a = Quit | Job a

-- | Running pool
newtype Pool a = Pool
  { queue :: UnliftIO.TBQueue (Event a)
  }

-- | Source to be drained
newtype Source a = Source
  { queue :: UnliftIO.TBQueue (Event a)
  }

-- | Type forcing clients to drain the source via `runSource`
data Done = Done

-- | Add a (dynamically) created job to the pool
--
-- This function blocks if adding the job would exceed the maximum queue size.
-- It unblocks as soon as the workers free space in the queue.
pushJob :: MonadIO m => Pool a -> a -> m ()
pushJob Pool{..} item
  = UnliftIO.atomically
  $ UnliftIO.writeTBQueue queue (Job item)

-- | Drain a source
--
-- This function:
-- * is executed zero or more times per worker.
-- * executes the action as long as there are items in the queue.
-- * blocks on an empty queue as long as the producer has not exited.
-- * returns once the producer has exited and the queue is empty.
-- * is the only way to produce the `Done` value required by the `Config` API.
runSource :: MonadIO m => (a -> m ()) -> Source a -> m Done
runSource action Source{..} = go $> Done
  where
    go = UnliftIO.atomically (UnliftIO.readTBQueue queue) >>= \case
      (Job item) -> action item >> go
      Quit -> UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit

-- | Run a worker pool with the specified config
--
-- The function returns when either:
-- * the `produceJobs` function has returned and all queued jobs are worked off
-- * a worker throws an error
-- * the producer throws an error.
--
-- Care is taken not to leak threads via the use of `withAsync` from the `async` package.
runPool :: forall a m . MonadUnliftIO m => Config m a -> m ()
runPool Config{..} = boot =<< UnliftIO.atomically (UnliftIO.newTBQueue queueSize)
  where
    boot queue = go 0 []
      where
        go index workerHandlers =
          if index == workerCount
            then
              UnliftIO.withAsync
                (runProducer queue)
                (\producerHandle -> waitAll (Set.fromList (producerHandle:workerHandlers)))
            else
              UnliftIO.withAsync
                (workerRun index Source{..})
                (\async -> go (succ index) (async:workerHandlers))

    runProducer queue = do
      produceJobs Pool{..}
      UnliftIO.atomically (UnliftIO.writeTBQueue queue Quit)
      pure Done

    waitAll :: Set (UnliftIO.Async Done) -> m ()
    waitAll remaining = case Set.toList remaining of
      [] -> pure ()
      [handler] -> void $ UnliftIO.wait handler
      list -> do
        (handler, _result) <- UnliftIO.waitAny list
        waitAll $ Set.delete handler remaining
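
A sketch of the per-worker boot pattern the `workerRun` field is designed for, using a local accumulator as a stand-in for whatever boot work a real worker does; the summing worker below is illustrative only:

import Data.IORef (modifyIORef', newIORef, readIORef)
import Numeric.Natural (Natural)

import qualified WorkPool

sumWorker :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done
sumWorker index source = do
  -- boot: per-worker state is created once and shared by every job this worker runs
  total <- newIORef (0 :: Natural)
  -- runSource is the only way to obtain the Done value workerRun must return
  done <- WorkPool.runSource (\job -> modifyIORef' total (+ job)) source
  -- after the source is drained the worker may still report or clean up
  readIORef total >>= \t -> putStrLn ("worker " <> show index <> " summed " <> show t)
  pure done

Passed as `workerRun` in a `Config`, each of the `workerCount` workers gets its own `total` while draining the shared queue.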
88 changes: 88 additions & 0 deletions work-pool/test/Test.hs
@@ -0,0 +1,88 @@
import Control.Arrow (left)
import Control.Monad (when)
import MPrelude
import Test.Tasty
import Test.Tasty.HUnit

import qualified Data.List as List
import qualified Data.Set as Set
import qualified Data.String as String
import qualified Devtools
import qualified UnliftIO.Concurrent as UnliftIO
import qualified UnliftIO.Exception as UnliftIO
import qualified WorkPool

main :: IO ()
main
  = defaultMain
  $ testGroup "work-pool"
  [ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"])
  , mkSuccess 1 1
  , mkSuccess 1 100
  , mkSuccess 100 1
  , mkSuccess 100 100
  , mkSuccess 1000 1000
  , producerFailure
  , workerFailure
  ]
  where
    mkSuccess :: Natural -> Natural -> TestTree
    mkSuccess queueSize workerCount =
      testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do
        output <- UnliftIO.newMVar []
        WorkPool.runPool $ config output
        assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output
      where
        config output = WorkPool.Config{..}
          where
            workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done
            workerRun _index = WorkPool.runSource $ \value -> do
              void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ())

workerFailure :: TestTree
workerFailure = testCase "worker failure" $ do
  result <- UnliftIO.try (WorkPool.runPool config)
  assertEqual "" (Left "intentional error\n") (left formatException result)
  where
    config = WorkPool.Config{queueSize = 100, workerCount = 100, ..}

    workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done
    workerRun _index =
      WorkPool.runSource $ \value ->
        when (value == 100) $ UnliftIO.throwString "intentional error"

producerFailure :: TestTree
producerFailure = testCase "producer failure" $ do
  result <- UnliftIO.try (WorkPool.runPool config)
  assertEqual "" (Left "intentional error\n") (left formatException result)
  where
    config :: WorkPool.Config IO Natural
    config
      = WorkPool.Config
      { produceJobs = produceJobsFailing
      , queueSize = 100
      , workerCount = 100
      , ..
      }

    produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m ()
    produceJobsFailing pool = do
      WorkPool.pushJob pool 1
      UnliftIO.throwString "intentional error"

    workerRun :: MonadIO m => Natural -> WorkPool.Source Natural -> m WorkPool.Done
    workerRun _index = WorkPool.runSource $ const (pure ())

formatException :: UnliftIO.SomeException -> String
formatException
  = String.unlines
  . List.drop 2
  . List.take 3
  . String.lines
  . UnliftIO.displayException

produceJobs :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobs pool = traverse_ (WorkPool.pushJob pool) values

values :: [Natural]
values = [0..1000]
122 changes: 122 additions & 0 deletions work-pool/test/stack-9.4-dependencies.txt
@@ -0,0 +1,122 @@
Diff 0.4.1
OneTuple 0.4.1.1
QuickCheck 2.14.3
StateVar 1.2.2
aeson 2.1.2.1
alex 3.3.0.0
ansi-terminal 0.11.5
ansi-terminal-types 0.11.5
ansi-wl-pprint 0.6.9
array 0.5.4.0
assoc 1.1
async 2.2.5
attoparsec 0.14.4
base 4.17.2.1
base-compat 0.12.3
base-compat-batteries 0.12.3
base-orphans 0.9.1
bifunctors 5.5.15
binary 0.8.9.1
bitvec 1.1.5.0
bytestring 0.11.5.3
call-stack 0.4.0
clock 0.8.4
cmdargs 0.10.22
colour 2.3.6
comonad 5.0.8
conduit 1.3.5
containers 0.6.7
contravariant 1.5.5
cpphs 1.20.9.1
data-default 0.7.1.1
data-default-class 0.1.2.0
data-default-instances-containers 0.0.1
data-default-instances-dlist 0.0.1
data-default-instances-old-locale 0.0.1
data-fix 0.3.2
deepseq 1.4.8.0
deriving-aeson 0.2.9
devtools 0.2.0
directory 1.3.7.1
distributive 0.6.2.1
dlist 1.0
exceptions 0.10.5
extra 1.7.14
file-embed 0.0.15.0
filepath 1.4.2.2
filepattern 0.1.3
foldable1-classes-compat 0.1
generically 0.1.1
ghc 9.4.8
ghc-bignum 1.3
ghc-boot 9.4.8
ghc-boot-th 9.4.8
ghc-heap 9.4.8
ghc-lib-parser 9.4.8.20231111
ghc-lib-parser-ex 9.4.0.0
ghc-prim 0.9.1
ghci 9.4.8
happy 1.20.1.1
hashable 1.4.3.0
hlint 3.5
hpc 0.6.1.0
hscolour 1.24.4
indexed-traversable 0.1.3
indexed-traversable-instances 0.1.1.2
integer-logarithms 1.0.3.1
libyaml 0.1.2
mono-traversable 1.0.15.3
mprelude 0.2.3
mtl 2.2.2
old-locale 1.0.0.7
optparse-applicative 0.17.1.0
parsec 3.1.16.1
polyparse 1.13
pretty 1.1.3.6
primitive 0.8.0.0
process 1.6.18.0
random 1.2.1.1
refact 0.3.0.2
resourcet 1.2.6
rts 1.0.2
safe-exceptions 0.1.7.4
scientific 0.3.7.0
semialign 1.3
semigroupoids 5.3.7
source-constraints 0.0.5
split 0.2.3.5
splitmix 0.1.0.5
stm 2.5.1.0
strict 0.5
syb 0.7.2.4
tagged 0.8.7
tasty 1.4.3
tasty-expected-failure 0.12.3
tasty-hunit 0.10.1
tasty-mgolden 0.0.2
template-haskell 2.19.0.0
terminfo 0.4.1.5
text 2.0.2
text-short 0.1.5
th-abstraction 0.4.5.0
th-lift 0.8.4
these 1.2
time 1.12.2
time-compat 1.9.6.1
transformers 0.5.6.2
transformers-compat 0.7.2
typed-process 0.2.11.1
unbounded-delays 0.1.1.1
uniplate 1.6.13
unix 2.7.3
unliftio 0.2.25.0
unliftio-core 0.2.1.0
unordered-containers 0.2.19.1
utf8-string 1.0.2
uuid-types 1.0.5.1
vector 0.13.1.0
vector-algorithms 0.9.0.1
vector-stream 0.1.0.0
witherable 0.4.2
work-pool 0.0.1
yaml 0.11.11.2