[work-pool] Add work-pool subproject #305

Merged
merged 1 commit into from Jun 14, 2024
3 changes: 3 additions & 0 deletions stack-9.4.yaml
@@ -71,6 +71,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
@@ -97,4 +99,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions stack-9.6.yaml
@@ -63,6 +63,8 @@ flags:
development: true
tasty-mgolden:
development: true
work-pool:
development: true
xray:
development: true
packages:
@@ -89,4 +91,5 @@ packages:
- source-constraints
- stack-deploy
- tasty-mgolden
- work-pool
- xray
3 changes: 3 additions & 0 deletions work-pool/README.md
@@ -0,0 +1,3 @@
# work-pool

Minimal work pool with a fixed maximum queue size, a fixed worker count, per-worker boot, and dynamic job production.
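A minimal usage sketch of the API added in this PR (the `Int` job type and `print` handler are illustrative; only the `WorkPool` names come from this change):

```haskell
import qualified WorkPool

-- Four workers drain a queue holding at most 16 pending jobs; the
-- producer pushes 100 jobs and then exits, after which the workers
-- finish the remaining jobs and runPool returns.
example :: IO ()
example = WorkPool.runPool WorkPool.Config
  { WorkPool.produceJobs = \pool -> mapM_ (WorkPool.pushJob pool) [1 .. 100 :: Int]
  , WorkPool.queueSize   = 16
  , WorkPool.workerCount = 4
  , WorkPool.workerRun   = \_index -> WorkPool.runSource print
  }
```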
25 changes: 25 additions & 0 deletions work-pool/package.yaml
@@ -0,0 +1,25 @@
_common/package: !include "../common/package.yaml"

name: work-pool
synopsis: Simple work pool with max queue size, dynamic resupply and explicit worker boot.
homepage: https://github.com/mbj/mhs#readme
github: mbj/mhs
version: 0.0.1

<<: *defaults

dependencies:
- base
- containers
- mprelude
- text
- unliftio

tests:
test:
<<: *test
dependencies:
- devtools
- tasty
- tasty-hunit
- work-pool
119 changes: 119 additions & 0 deletions work-pool/src/WorkPool.hs
@@ -0,0 +1,119 @@
module WorkPool
( Config(..)
, Done
, Pool
, Source
, pushJob
, readJobCount
, runPool
, runSource
)
where

import Data.Set (Set)
import MPrelude
import Prelude (succ)

import qualified Data.Set as Set
import qualified UnliftIO.Async as UnliftIO
import qualified UnliftIO.STM as UnliftIO

-- | Worker pool configuration
data Config m a = Config
{ produceJobs :: Pool a -> m ()
-- ^ function called to produce work; use `pushJob` to create workable jobs.
-- Once this function returns and all jobs are worked off, the workers exit.
, queueSize :: Natural
-- ^ maximum number of queued jobs
, workerCount :: Natural
-- ^ number of workers to boot
, workerRun :: Natural -> Source a -> m Done
-- ^ function called when a worker is booted; arguments are the worker index
-- and a source to be drained with `runSource`
Comment on lines +30 to +32
@dkubb dkubb Jun 7, 2024

suggestion: If nothing yet uses the index, I would probably remove it from the interface and add it when it is needed.

EDIT: It's possible the tests don't use it, but other code does. If so, then I'd suggest adding a test to exercise it.

}

-- Internal queue event, supplying a job or quitting the worker
data Event a = Quit | Job a

-- | Running pool
newtype Pool a = Pool
{ queue :: UnliftIO.TBQueue (Event a)
}

-- | Source to be drained
newtype Source a = Source
{ queue :: UnliftIO.TBQueue (Event a)
}

-- | Type forcing clients to drain the source via `runSource`
data Done = Done

-- | Add a (dynamically) created job to the pool
--
-- This function will block if the maximum queue size would be exceeded.
-- As the workers create space in the queue this function will unblock.
pushJob :: MonadIO m => Pool a -> a -> m ()
pushJob Pool{..} item
= UnliftIO.atomically
$ UnliftIO.writeTBQueue queue (Job item)

-- | Read the number of jobs in the pool
--
-- This function does not block. It's not guaranteed the count is still
-- correct when the function returns.
readJobCount :: MonadIO m => Pool a -> m Natural
readJobCount Pool{..}
= UnliftIO.atomically
$ UnliftIO.lengthTBQueue queue

-- | Drain a source
--
-- This function:
-- * is executed 0 or many times per worker.
-- * executes the action as long as there are items in the queue.
-- * as long as the producer did not exit: blocks if there are no items in the queue.
-- * if the producer exits and the queue is empty: returns.
-- * is the only way to produce the `Done` value required by the `Config` API.
runSource :: MonadIO m => (a -> m ()) -> Source a -> m Done
runSource action Source{..} = go $> Done
where
go = UnliftIO.atomically (UnliftIO.readTBQueue queue) >>= \case
(Job item) -> action item >> go
Quit -> UnliftIO.atomically $ UnliftIO.writeTBQueue queue Quit

-- | Run worker pool with specified config
--
-- The function will return if either:
-- * the `produceJobs` function returns,
-- * a worker throws an error, or
-- * the producer throws an error.
--
-- Care is taken not to leak threads via the use of `withAsync` from the `async` package.
runPool :: forall a m . MonadUnliftIO m => Config m a -> m ()
runPool Config{..} = boot =<< UnliftIO.atomically (UnliftIO.newTBQueue queueSize)
where
boot queue = go 0 []
where
go index workerHandlers =
if index == workerCount
then
UnliftIO.withAsync
(runProducer queue)
(\producerHandle -> waitAll (Set.fromList (producerHandle:workerHandlers)))
else
UnliftIO.withAsync
(workerRun index Source{..})
(\async -> go (succ index) (async:workerHandlers))

runProducer queue = do
produceJobs Pool{..}
UnliftIO.atomically (UnliftIO.writeTBQueue queue Quit)
pure Done

waitAll :: Set (UnliftIO.Async Done) -> m ()
waitAll remaining = case Set.toList remaining of
[] -> pure ()
[handler] -> void $ UnliftIO.wait handler
list -> do
(handler, _result) <- UnliftIO.waitAny list
waitAll $ Set.delete handler remaining
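The per-worker boot described in the README can be sketched as follows (`Job`, `acquireConnection`, and `processJob` are hypothetical; only the `WorkPool` names come from this PR):

```haskell
workerRun :: Natural -> WorkPool.Source Job -> IO WorkPool.Done
workerRun index source = do
  -- per-worker boot: acquire a resource once, before draining jobs
  conn <- acquireConnection index
  WorkPool.runSource (processJob conn) source
```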
88 changes: 88 additions & 0 deletions work-pool/test/Test.hs
@@ -0,0 +1,88 @@
import Control.Arrow (left)
import Control.Monad (when)
import MPrelude
import Test.Tasty
import Test.Tasty.HUnit

import qualified Data.List as List
import qualified Data.Set as Set
import qualified Data.String as String
import qualified Devtools
import qualified UnliftIO.Concurrent as UnliftIO
import qualified UnliftIO.Exception as UnliftIO
import qualified WorkPool

main :: IO ()
main
= defaultMain
$ testGroup "work-pool"
[ Devtools.testTree $$(Devtools.readDependencies [Devtools.Target "work-pool"])
, mkSuccess 1 1
, mkSuccess 1 100
, mkSuccess 100 1
, mkSuccess 100 100
, mkSuccess 1000 1000
, producerFailure
, workerFailure
]
where
mkSuccess :: Natural -> Natural -> TestTree
mkSuccess queueSize workerCount =
testCase ("queue size: " <> show queueSize <> ", workerCount: " <> show workerCount) $ do
output <- UnliftIO.newMVar []
WorkPool.runPool $ config output
assertEqual "" (Set.fromList values) =<< UnliftIO.readMVar output
where
config output = WorkPool.Config{..}
where
workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done
workerRun _index = WorkPool.runSource $ \value -> do
void $ UnliftIO.modifyMVar output $ \set -> pure (Set.insert value set, ())

workerFailure :: TestTree
workerFailure = testCase "worker failure" $ do
result <- UnliftIO.try (WorkPool.runPool config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config = WorkPool.Config{queueSize = 100, workerCount = 100, ..}

workerRun :: Natural -> WorkPool.Source Natural -> IO WorkPool.Done
workerRun _index =
WorkPool.runSource $ \value ->
when (value == 100) $ UnliftIO.throwString "intentional error"

producerFailure :: TestTree
producerFailure = testCase "producer failure" $ do
result <- UnliftIO.try (WorkPool.runPool config)
assertEqual "" (Left "intentional error\n") (left formatException result)
where
config :: WorkPool.Config IO Natural
config
= WorkPool.Config
{ produceJobs = produceJobsFailing
, queueSize = 100
, workerCount = 100
, ..
}

produceJobsFailing :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobsFailing pool = do
WorkPool.pushJob pool 1
UnliftIO.throwString "intentional error"

workerRun :: MonadIO m => Natural -> WorkPool.Source Natural -> m WorkPool.Done
workerRun _index = WorkPool.runSource $ const (pure ())

formatException :: UnliftIO.SomeException -> String
formatException
= String.unlines
. List.drop 2
. List.take 3
. String.lines
. UnliftIO.displayException

produceJobs :: MonadIO m => WorkPool.Pool Natural -> m ()
produceJobs pool = traverse_ (WorkPool.pushJob pool) values

values :: [Natural]
values = [0..1000]
122 changes: 122 additions & 0 deletions work-pool/test/stack-9.4-dependencies.txt
@@ -0,0 +1,122 @@
Diff 0.4.1
OneTuple 0.4.1.1
QuickCheck 2.14.3
StateVar 1.2.2
aeson 2.1.2.1
alex 3.3.0.0
ansi-terminal 0.11.5
ansi-terminal-types 0.11.5
ansi-wl-pprint 0.6.9
array 0.5.4.0
assoc 1.1
async 2.2.5
attoparsec 0.14.4
base 4.17.2.1
base-compat 0.12.3
base-compat-batteries 0.12.3
base-orphans 0.9.1
bifunctors 5.5.15
binary 0.8.9.1
bitvec 1.1.5.0
bytestring 0.11.5.3
call-stack 0.4.0
clock 0.8.4
cmdargs 0.10.22
colour 2.3.6
comonad 5.0.8
conduit 1.3.5
containers 0.6.7
contravariant 1.5.5
cpphs 1.20.9.1
data-default 0.7.1.1
data-default-class 0.1.2.0
data-default-instances-containers 0.0.1
data-default-instances-dlist 0.0.1
data-default-instances-old-locale 0.0.1
data-fix 0.3.2
deepseq 1.4.8.0
deriving-aeson 0.2.9
devtools 0.2.0
directory 1.3.7.1
distributive 0.6.2.1
dlist 1.0
exceptions 0.10.5
extra 1.7.14
file-embed 0.0.15.0
filepath 1.4.2.2
filepattern 0.1.3
foldable1-classes-compat 0.1
generically 0.1.1
ghc 9.4.8
ghc-bignum 1.3
ghc-boot 9.4.8
ghc-boot-th 9.4.8
ghc-heap 9.4.8
ghc-lib-parser 9.4.8.20231111
ghc-lib-parser-ex 9.4.0.0
ghc-prim 0.9.1
ghci 9.4.8
happy 1.20.1.1
hashable 1.4.3.0
hlint 3.5
hpc 0.6.1.0
hscolour 1.24.4
indexed-traversable 0.1.3
indexed-traversable-instances 0.1.1.2
integer-logarithms 1.0.3.1
libyaml 0.1.2
mono-traversable 1.0.15.3
mprelude 0.2.3
mtl 2.2.2
old-locale 1.0.0.7
optparse-applicative 0.17.1.0
parsec 3.1.16.1
polyparse 1.13
pretty 1.1.3.6
primitive 0.8.0.0
process 1.6.18.0
random 1.2.1.1
refact 0.3.0.2
resourcet 1.2.6
rts 1.0.2
safe-exceptions 0.1.7.4
scientific 0.3.7.0
semialign 1.3
semigroupoids 5.3.7
source-constraints 0.0.5
split 0.2.3.5
splitmix 0.1.0.5
stm 2.5.1.0
strict 0.5
syb 0.7.2.4
tagged 0.8.7
tasty 1.4.3
tasty-expected-failure 0.12.3
tasty-hunit 0.10.1
tasty-mgolden 0.0.2
template-haskell 2.19.0.0
terminfo 0.4.1.5
text 2.0.2
text-short 0.1.5
th-abstraction 0.4.5.0
th-lift 0.8.4
these 1.2
time 1.12.2
time-compat 1.9.6.1
transformers 0.5.6.2
transformers-compat 0.7.2
typed-process 0.2.11.1
unbounded-delays 0.1.1.1
uniplate 1.6.13
unix 2.7.3
unliftio 0.2.25.0
unliftio-core 0.2.1.0
unordered-containers 0.2.19.1
utf8-string 1.0.2
uuid-types 1.0.5.1
vector 0.13.1.0
vector-algorithms 0.9.0.1
vector-stream 0.1.0.0
witherable 0.4.2
work-pool 0.0.1
yaml 0.11.11.2