Skip to content

Commit

Permalink
[work-pool] Add work-pool subproject
Browse files Browse the repository at this point in the history
* This is an extraction from my mutant-manager project.
  • Loading branch information
mbj committed Jun 3, 2024
1 parent e4ec728 commit bd1b861
Show file tree
Hide file tree
Showing 9 changed files with 597 additions and 0 deletions.
3 changes: 3 additions & 0 deletions stack-9.4.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
Expand All @@ -97,4 +99,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions stack-9.6.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
Expand All @@ -89,4 +91,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions work-pool/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# work-pool

Minimal fixed max size, fixed worker count work pool with per worker boot and dynamic job production.
25 changes: 25 additions & 0 deletions work-pool/package.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
_common/package: !include "../common/package.yaml"

name: work-pool
synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot.
homepage: https://github.com/mbj/mhs#readme
github: mbj/mhs
version: 0.0.1

<<: *defaults

dependencies:
- base
- mprelude
- text
- unliftio

tests:
test:
<<: *test
dependencies:
- containers
- devtools
- tasty
- tasty-hunit
- work-pool
99 changes: 99 additions & 0 deletions work-pool/src/WorkPool.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
module WorkPool
( Config(..)
, Pool
, Source
, SourceDrained
, pushJob
, runPool
, runSource
)
where

import MPrelude
import Prelude (succ)

import qualified UnliftIO.Async as UnliftIO
import qualified UnliftIO.STM as UnliftIO

-- | Worker pool configuration
data Config m a = Config
{ produceJobs :: Pool a -> m ()
-- ^ function called from the main thread producing work, use `pushJob` to
-- create workable jobs.
, queueSize :: Natural
-- ^ maximum size of the jobs queued
, workerCount :: Natural
-- ^ number of workers to boot
, workerRun :: Natural -> Source a -> m SourceDrained
-- ^ function called when a worker is booted, argument is the worker index,
-- and a source to be drained with `runSource`
}

-- Internal queue event, supplying job or quitting the worker
data Event a = Quit | Job a

-- | Running pool
newtype Pool a = Pool
{ queue :: UnliftIO.TBQueue (Event a)
}

-- | Source to be drained
newtype Source a = Source
{ queue :: UnliftIO.TBQueue (Event a)
}

-- | Type forcing clients to drain the source via `runSource`
data SourceDrained = SourceDrained

-- | Add (dynamically) created a job to the pool
--
-- This function will block if the max queue size would be overflown.
-- As the workers create space in the queue this function will unblock.
pushJob :: MonadIO m => Pool a -> a -> m ()
pushJob Pool{..} item
= UnliftIO.atomically
$ UnliftIO.writeTBQueue queue (Job item)

-- | Drain a source
--
-- This function:
-- * is executed 0 or many times per worker.
-- * executes the action as long there are items in the queue.
-- * as long the producer did not exit: blocks if there are no items in the queue
-- * if the producer exits and the queue is empty: returns.
-- * is he only way to produce a `SourceDrained` value required by the `Config` api.
runSource :: MonadIO m => (a -> m ()) -> Source a -> m SourceDrained
runSource action Source{..} = go $> SourceDrained
where
go = UnliftIO.atomically (UnliftIO.readTBQueue queue) >>= \case
(Job item) -> action item >> go
Quit -> UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit

-- | Run worker pool with specified config
--
-- The function will return if either:
-- * the `produceJobs` function returns
-- * a worker or throws an error
-- * the producer throws an error.
--
-- Care is taken to not leak threads via the use if `withAsync` from the `async` package.
runPool :: MonadUnliftIO m => Config m a -> m ()
runPool Config{..} = do
pool@Pool{..} <- UnliftIO.atomically $ do
queue <- UnliftIO.newTBQueue queueSize
pure Pool{..}

boot pool $ \handlers -> do
produceJobs pool -- supply jobs
UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit -- signal workers to gracefully exit
traverse_ UnliftIO.wait handlers -- wait for workers to gracefully exit
where
boot Pool{..} withAsyncHandlers = go 0 []
where
go index asyncHandlers =
if index == workerCount
then withAsyncHandlers asyncHandlers
else
UnliftIO.withAsync
(workerRun index Source{..})
(\async -> go (succ index) (async:asyncHandlers))
88 changes: 88 additions & 0 deletions work-pool/test/Test.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import Control.Arrow (left)
import Control.Monad (when)
import MPrelude
import Test.Tasty
import Test.Tasty.HUnit

import qualified Data.List as List
import qualified Data.Set as Set
import qualified Data.String as String
import qualified Devtools
import qualified UnliftIO.Concurrent as UnliftIO
import qualified UnliftIO.Exception as UnliftIO
import qualified WorkPool

main :: IO ()
main
= defaultMain
$ testGroup "work-pool"
[ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"])
, mkSuccess 1 1
, mkSuccess 1 100
, mkSuccess 100 1
, mkSuccess 100 100
, mkSuccess 1000 1000
, producerFailure
, workerFailure
]
where
mkSuccess :: Natural -> Natural -> TestTree
mkSuccess queueSize workerCount =
testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do
output <- UnliftIO.newMVar []
WorkPool.runPool $ config output
assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output
where
config output = WorkPool.Config{..}
where
workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.SourceDrained
workerRun _index = WorkPool.runSource $ \value -> do
void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ())

workerFailure :: TestTree
workerFailure = testCase "worker failure" $ do
result <- UnliftIO.try (WorkPool.runPool config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config = WorkPool.Config{queueSize = 100, workerCount = 100, ..}

workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.SourceDrained
workerRun _index =
WorkPool.runSource $ \value ->
when (value == 100) $ UnliftIO.throwString "intentional error"

producerFailure :: TestTree
producerFailure = testCase "producer failure" $ do
result <- UnliftIO.try (WorkPool.runPool config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config :: WorkPool.Config IO Natural
config
= WorkPool.Config
{ produceJobs = produceJobsFailing
, queueSize = 100
, workerCount = 100
, ..
}

produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobsFailing pool = do
WorkPool.pushJob pool 1
UnliftIO.throwString "intentional error"

workerRun :: MonadIO m => Natural -> WorkPool.Source Natural -> m WorkPool.SourceDrained
workerRun _index = WorkPool.runSource $ const (pure ())

formatException :: UnliftIO.SomeException -> String
formatException
= String.unlines
. List.drop 2
. List.take 3
. String.lines
. UnliftIO.displayException

produceJobs :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobs pool = traverse_ (WorkPool.pushJob pool) values

values :: [Natural]
values = [0..1000]
122 changes: 122 additions & 0 deletions work-pool/test/stack-9.4-dependencies.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
Diff 0.4.1
OneTuple 0.4.1.1
QuickCheck 2.14.3
StateVar 1.2.2
aeson 2.1.2.1
alex 3.3.0.0
ansi-terminal 0.11.5
ansi-terminal-types 0.11.5
ansi-wl-pprint 0.6.9
array 0.5.4.0
assoc 1.1
async 2.2.5
attoparsec 0.14.4
base 4.17.2.1
base-compat 0.12.3
base-compat-batteries 0.12.3
base-orphans 0.9.1
bifunctors 5.5.15
binary 0.8.9.1
bitvec 1.1.5.0
bytestring 0.11.5.3
call-stack 0.4.0
clock 0.8.4
cmdargs 0.10.22
colour 2.3.6
comonad 5.0.8
conduit 1.3.5
containers 0.6.7
contravariant 1.5.5
cpphs 1.20.9.1
data-default 0.7.1.1
data-default-class 0.1.2.0
data-default-instances-containers 0.0.1
data-default-instances-dlist 0.0.1
data-default-instances-old-locale 0.0.1
data-fix 0.3.2
deepseq 1.4.8.0
deriving-aeson 0.2.9
devtools 0.2.0
directory 1.3.7.1
distributive 0.6.2.1
dlist 1.0
exceptions 0.10.5
extra 1.7.14
file-embed 0.0.15.0
filepath 1.4.2.2
filepattern 0.1.3
foldable1-classes-compat 0.1
generically 0.1.1
ghc 9.4.8
ghc-bignum 1.3
ghc-boot 9.4.8
ghc-boot-th 9.4.8
ghc-heap 9.4.8
ghc-lib-parser 9.4.8.20231111
ghc-lib-parser-ex 9.4.0.0
ghc-prim 0.9.1
ghci 9.4.8
happy 1.20.1.1
hashable 1.4.3.0
hlint 3.5
hpc 0.6.1.0
hscolour 1.24.4
indexed-traversable 0.1.3
indexed-traversable-instances 0.1.1.2
integer-logarithms 1.0.3.1
libyaml 0.1.2
mono-traversable 1.0.15.3
mprelude 0.2.3
mtl 2.2.2
old-locale 1.0.0.7
optparse-applicative 0.17.1.0
parsec 3.1.16.1
polyparse 1.13
pretty 1.1.3.6
primitive 0.8.0.0
process 1.6.18.0
random 1.2.1.1
refact 0.3.0.2
resourcet 1.2.6
rts 1.0.2
safe-exceptions 0.1.7.4
scientific 0.3.7.0
semialign 1.3
semigroupoids 5.3.7
source-constraints 0.0.5
split 0.2.3.5
splitmix 0.1.0.5
stm 2.5.1.0
strict 0.5
syb 0.7.2.4
tagged 0.8.7
tasty 1.4.3
tasty-expected-failure 0.12.3
tasty-hunit 0.10.1
tasty-mgolden 0.0.2
template-haskell 2.19.0.0
terminfo 0.4.1.5
text 2.0.2
text-short 0.1.5
th-abstraction 0.4.5.0
th-lift 0.8.4
these 1.2
time 1.12.2
time-compat 1.9.6.1
transformers 0.5.6.2
transformers-compat 0.7.2
typed-process 0.2.11.1
unbounded-delays 0.1.1.1
uniplate 1.6.13
unix 2.7.3
unliftio 0.2.25.0
unliftio-core 0.2.1.0
unordered-containers 0.2.19.1
utf8-string 1.0.2
uuid-types 1.0.5.1
vector 0.13.1.0
vector-algorithms 0.9.0.1
vector-stream 0.1.0.0
witherable 0.4.2
work-pool 0.0.1
yaml 0.11.11.2
Loading

0 comments on commit bd1b861

Please sign in to comment.