-
Notifications
You must be signed in to change notification settings - Fork 2
/
fork.hs
79 lines (73 loc) · 3.09 KB
/
fork.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
module Main
where
------------------------------------------------------------------------
-- Device that sends data streams coming from a connected publisher
-- to all connected subscribers;
-- subscribers can be connected and removed through the "ifc" program.
-- The program cannot be stopped by the INT signal.
-- Instead, it is stopped by using the "stop" command with ifc.
------------------------------------------------------------------------
import Command
import Helper (getPorts, address, onErr_, onErr)
import Network.Mom.Patterns
import Control.Concurrent
import System.Posix.Signals
-- | Entry point: wires the incoming and the outgoing endpoint together
--   in a device called "Forker" and, next to it, runs a server called
--   "Commander" through which the device is controlled at runtime.
--
--   'getPorts' yields the two data endpoints plus the remaining
--   arguments; the latter must be exactly a link type and a port
--   describing the command endpoint, otherwise 'usage' is raised.
main :: IO ()
main = do
  ((inLink, inPort), (outLink, outPort), rest) <- getPorts
  (cmdLink, cmdPort) <- commandPort rest
  let subscriber = pollEntry "Subscriber" XSub
                     (address inLink "tcp" "localhost" inPort []) inLink [""]
      publisher  = pollEntry "Publisher" XPub
                     (address outLink "tcp" "localhost" outPort []) outLink []
  -- SIGINT is ignored on purpose: the process is meant to be ended
  -- with the Stop command only.
  _ <- installHandler sigINT Ignore Nothing
  withContext 1 $ \ctx ->
    withDevice ctx "Forker" noparam (-1)
               [subscriber, publisher] idIn idOut onErr_
               (\_ -> putStrLn "Timeout!")
               (\_ -> putThrough) $ \dev -> do
      cmdChan <- newChan
      resChan <- newChan
      withServer ctx "Commander" noparam 1
                 (address cmdLink "tcp" "localhost" cmdPort []) cmdLink
                 toCmd fromResult onErr
                 (\_ -> one Empty)
                 (fetch1 (commander cmdChan resChan)) $
        serve cmdChan resChan dev
  where
    -- The arguments left over by 'getPorts' must be exactly
    -- [link type, port]; anything else is echoed and ends in 'usage'.
    commandPort [l, p] = maybe usage return (getThird l p)
    commandPort other  = print other >> usage

    -- Control loop: takes one command at a time from the command
    -- channel, applies it to the device and acknowledges it with OK
    -- on the result channel. Only Stop leaves the loop.
    serve cmdChan resChan dev srv = loop
      where
        ack  = writeChan resChan OK
        loop = do
          cmd <- readChan cmdChan
          putStrLn $ srvName srv ++ " received: " ++ show cmd
          case cmd of
            Empty -> ack >> loop
            -- NOTE(review): the short delay presumably lets the OK
            -- reply go out before the server is torn down - confirm.
            Stop  -> ack >> threadDelay 10000
            Tmo t -> changeTimeout dev t >> ack >> loop
            AddPort i l p -> do
              addDevice dev $
                pollEntry i XPub (address l "tcp" "localhost" p []) l []
              ack
              loop
            RemPort i -> remDevice dev i >> ack >> loop
-- | Interpret the description of the third (command) endpoint.
--
--   The first argument is the link type, decoded by 'parseLink';
--   the second argument is the port number in decimal notation.
--
--   The result is 'Nothing' if the link type is rejected by 'parseLink'
--   or if the port is not a well-formed 'Int'. The port is parsed
--   eagerly and totally, so a malformed port is reported to the caller
--   right away instead of surfacing later as a
--   @Prelude.read: no parse@ exception when the value is first used.
getThird :: String -> String -> Maybe (LinkType, Int)
getThird l p = do
  t    <- parseLink l
  port <- parsePort p
  return (t, port)
  where
    parsePort :: String -> Maybe Int
    parsePort s = case reads s of
      -- Accept exactly one parse; like 'read', tolerate nothing but
      -- white space after the number.
      [(n, rest)] | null (words rest) -> Just n
      _                               -> Nothing
-- | Abort with the command line synopsis as error message.
--   Never returns normally, hence the fully polymorphic result type.
usage :: IO a
usage = error (concat synopsis)
  where
    -- one entry per endpoint expected on the command line
    synopsis =
      [ "fork 'bind' | 'connect' <incoming port>\n"
      , " 'bind' | 'connect' <outgoing port>\n"
      , " 'bind' | 'connect' <command port>\n"
      ]
-- | Fetch helper of the command server: passes the received command on
--   through the first channel and then blocks until an answer shows up
--   on the second channel; that answer is the result of the helper.
--   The two leading arguments handed in by the caller are not used.
commander :: Chan Command -> Chan Result -> FetchHelper' Command Result
commander requests replies _ _ cmd = do
  writeChan requests cmd
  readChan replies