/
Boss.hs
69 lines (66 loc) · 2.88 KB
/
Boss.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import Control.Distributed.Process
import Control.Distributed.Process.Node (initRemoteTable)
import Control.Distributed.Process.Zookeeper
import Control.Monad (void)
import System.Environment (getArgs)
-- Usage: run the program with one or more arguments to submit them (joined
-- with single spaces) as a job to the current boss and then exit. Run it with
-- no arguments to start a persistent process that stands as a boss candidate;
-- if the elected boss lives on another node, this process acts as a worker
-- instead. Pressing <Enter> terminates a boss or a worker.
main :: IO ()
main =
  bootstrapWith
    defaultConfig -- {logTrace = sayTrace}
    "localhost" -- presumably the host this node listens on
    "0" -- presumably the port/service name ("0" = pick any free port)
    "localhost:2181" -- presumably the Zookeeper endpoint
    initRemoteTable
    $ do
      args <- liftIO getArgs
      case args of
        _ : _ -> submitJob (unwords args) -- transient requestor
        [] -> runPersistent -- boss or boss candidate + worker
  where
    -- Look up the globally registered boss and send it the job text. A
    -- missing boss is reported instead of crashing on a failed pattern match.
    -- The message is printed directly because this process exits right after.
    submitJob :: String -> Process ()
    submitJob job = do
      found <- whereisGlobal "boss"
      case found of
        Just boss -> send boss job
        Nothing ->
          liftIO $ putStrLn "No boss is registered - start a persistent process first."

    -- Stand for election as "boss". When the boss pid returned by the
    -- election belongs to this node we only wait for <Enter>; otherwise this
    -- process becomes a worker. A failed registration is reported explicitly.
    runPersistent :: Process ()
    runPersistent = do
      say "Starting persistent process - press <Enter> to exit."
      elected <- registerCandidate "boss" bossStartup
      case elected of
        Left err ->
          liftIO . putStrLn $ "Could not register as boss candidate: " ++ err
        Right boss -> do
          self <- getSelfNode
          if self == processNodeId boss -- like a boss
            then void $ liftIO getLine
            else runWorker

    -- Body of the boss candidate: announce itself, report how many workers
    -- are currently known, then enter the delegation loop.
    bossStartup :: Process ()
    bossStartup = do
      say "I'm the boss!"
      workers <- getCapable "worker"
      case length workers of
        0 -> say $ "I don't do any work! Start " ++ " a worker."
        n -> say $ "I've got " ++ show n ++ " workers right now."
      bossLoop 0

    -- Register this process under the name "worker", spawn a helper that
    -- turns <Enter> on stdin into a "stop" message, and process tasks until
    -- that message arrives.
    runWorker :: Process ()
    runWorker = do
      worker <- getSelfPid
      register "worker" worker
      say "Worker waiting for task..."
      void . spawnLocal $ do
        -- wait for exit
        void $ liftIO getLine
        send worker "stop"
      workLoop
-- Boss delegation loop. Waits for a job (a String message), looks up the
-- processes currently capable of "worker" and forwards the job to one of
-- them, rotating through the list using the counter i. The counter only
-- advances when a job was actually delegated.
--
-- Calling getCapable on every iteration is no problem because its results
-- are cached until the worker node is updated in Zookeeper.
bossLoop :: Int -> Process ()
bossLoop i = do
  job <- expect :: Process String
  crew <- getCapable "worker"
  if null crew
    then do
      say "Where is my workforce?!"
      bossLoop i
    else do
      -- crew is non-empty here and the index is reduced modulo its length
      let chosen = crew !! (i `mod` length crew)
      say $ "'Delegating' task: " ++ job ++ " to " ++ show chosen
      send chosen job
      bossLoop (i + 1)
-- Worker loop. Waits for String messages; any message whose first four
-- characters are "stop" ends the loop, every other message is logged as a
-- task and the loop continues.
workLoop :: Process ()
workLoop = do
  task <- expect :: Process String
  if take 4 task == "stop"
    then say "No more work for me!"
    else do
      say $ "Doing task: " ++ task
      workLoop