-
Notifications
You must be signed in to change notification settings - Fork 0
/
Externals.hs
238 lines (186 loc) · 8.63 KB
/
Externals.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
{-# LANGUAGE TypeFamilies, MultiParamTypeClasses, GADTs, FlexibleInstances, FlexibleContexts, ScopedTypeVariables, TypeSynonymInstances, StandaloneDeriving, DeriveDataTypeable, EmptyDataDecls, RecordWildCards, NamedFieldPuns #-}
module Control.Etage.Externals (
Neuron(..),
Impulse(..),
Nerve,
AxonConductive,
AxonNonConductive,
LiveNeuron,
sendFromNeuron,
getFromNeuron,
maybeGetFromNeuron,
slurpFromNeuron,
waitAndSlurpFromNeuron,
sendForNeuron,
getForNeuron,
maybeGetForNeuron,
slurpForNeuron,
waitAndSlurpForNeuron,
getNewestForNeuron,
NeuronMapCapability(..),
mkNeuronMapOnRandomCapability,
DissolvingException,
dissolving,
DissolveException,
attach',
detach,
detachAndWait,
detachMany,
detachManyAndWait,
ImpulseTranslator(..),
ImpulseTime,
ImpulseValue,
defaultOptions,
prepareEnvironment,
translateAndSend,
Translatable(..),
getCurrentImpulseTime,
impulseEq,
impulseCompare
) where
import Prelude hiding (catch)
import Control.Concurrent hiding (Chan, writeChan, readChan, isEmptyChan)
import Data.Data
import Data.Function
import Data.List
import Control.Exception
import Data.Time.Clock.POSIX
import GHC.Conc (forkOnIO, numCapabilities)
import System.IO
import System.Posix.Signals
import System.Random
import Control.Etage.Chan
import Control.Etage.Internals
-- | Sends an impulse on the \"from\" side of the given 'Nerve'.
--
-- When that side is an 'Axon' the impulse is written to its channel. When it is 'NoAxon' the impulse is
-- silently ignored: sending is allowed so that the same 'Neuron' definition can be used on all kinds of 'Nerve's.
sendFromNeuron :: Nerve from fromConductivity for forConductivity -> from -> IO ()
sendFromNeuron (Nerve fromAxon _) impulse = case fromAxon of
  Axon fromChan -> writeChan fromChan impulse
  NoAxon        -> return ()
-- | Reads one impulse from the channel on the \"from\" side of the 'Nerve' with 'readChan'.
--
-- The signature requires that side to be 'AxonConductive', which is why only the 'Axon' case is matched.
getFromNeuron :: Nerve from AxonConductive for forConductivity -> IO from
getFromNeuron (Nerve (Axon fromChan) _) = readChan fromChan
-- | Like 'getFromNeuron', but goes through 'maybeReadChan': yields 'Nothing' when the channel on the
-- \"from\" side reports itself empty and 'Just' the read impulse otherwise.
maybeGetFromNeuron :: Nerve from AxonConductive for forConductivity -> IO (Maybe from)
maybeGetFromNeuron (Nerve (Axon fromChan) _) = maybeReadChan fromChan
-- | Drains the channel on the \"from\" side of the 'Nerve' with 'slurpChan' and returns everything that was
-- read. As with 'slurpChan', the first-in (oldest) impulse is last in the returned list.
slurpFromNeuron :: Nerve from AxonConductive for forConductivity -> IO [from]
slurpFromNeuron (Nerve (Axon fromChan) _) = slurpChan fromChan
-- | First obtains one impulse with 'getFromNeuron', then drains whatever else is available with
-- 'slurpFromNeuron'. The impulse obtained first is appended at the end of the returned list, so the list
-- always has at least one element and its oldest impulse is last.
waitAndSlurpFromNeuron :: Nerve from AxonConductive for forConductivity -> IO [from]
waitAndSlurpFromNeuron nerve =
  getFromNeuron nerve >>= \firstImpulse ->
    fmap (++ [firstImpulse]) (slurpFromNeuron nerve)
-- | Writes an impulse to the channel on the \"for\" side of the 'Nerve'.
--
-- The signature requires that side to be 'AxonConductive', which is why only the 'Axon' case is matched.
sendForNeuron :: Nerve from fromConductivity for AxonConductive -> for -> IO ()
sendForNeuron (Nerve _ (Axon forChan)) impulse = writeChan forChan impulse
-- | Reads one impulse from the \"for\" side of the 'Nerve'.
--
-- With an 'Axon' the impulse is read from its channel with 'readChan'. With 'NoAxon' there is nothing to
-- read, and the call is delegated to 'waitForException' (presumably blocking until an exception is
-- delivered to the thread; see its definition in "Control.Etage.Internals").
getForNeuron :: Nerve from fromConductivity for forConductivity -> IO for
getForNeuron (Nerve _ forAxon) = case forAxon of
  Axon forChan -> readChan forChan
  NoAxon       -> waitForException
-- | Non-waiting variant of 'getForNeuron' built on 'maybeReadChan'.
--
-- With 'NoAxon' the result is always 'Nothing': getting is allowed so that the same 'Neuron' definition
-- can be used on all kinds of 'Nerve's.
maybeGetForNeuron :: Nerve from fromConductivity for forConductivity -> IO (Maybe for)
maybeGetForNeuron (Nerve _ forAxon) = case forAxon of
  Axon forChan -> maybeReadChan forChan
  NoAxon       -> return Nothing
-- | Drains the channel on the \"for\" side of the 'Nerve' with 'slurpChan'; the first-in (oldest) impulse
-- is last in the returned list.
--
-- With 'NoAxon' the result is always @[]@: getting is allowed so that the same 'Neuron' definition can be
-- used on all kinds of 'Nerve's.
slurpForNeuron :: Nerve from fromConductivity for forConductivity -> IO [for]
slurpForNeuron (Nerve _ forAxon) = case forAxon of
  Axon forChan -> slurpChan forChan
  NoAxon       -> return []
-- | First obtains one impulse with 'getForNeuron', then drains whatever else is available with
-- 'slurpForNeuron'. The impulse obtained first is appended at the end of the returned list, so the oldest
-- impulse is last.
waitAndSlurpForNeuron :: Nerve from fromConductivity for forConductivity -> IO [for]
waitAndSlurpForNeuron nerve =
  getForNeuron nerve >>= \firstImpulse ->
    fmap (++ [firstImpulse]) (slurpForNeuron nerve)
-- | Collects impulses with 'waitAndSlurpForNeuron' and keeps, for every data constructor (as reported by
-- 'toConstr'), only the first impulse in that list.
--
-- Because 'waitAndSlurpForNeuron' lists the oldest impulse last, the impulse kept for each constructor is
-- the most recently read one.
getNewestForNeuron :: (Data for, Impulse for) => Nerve from fromConductivity for forConductivity -> IO [for]
getNewestForNeuron nerve = fmap (nubBy sameConstructor) (waitAndSlurpForNeuron nerve)
  where
    -- Two impulses are duplicates when they were built with the same data constructor.
    sameConstructor x y = toConstr x == toConstr y
-- | Reads an element from the channel only if 'isEmptyChan' reports that it is not empty.
--
-- Returns 'Nothing' for an empty channel and 'Just' the element read with 'readChan' otherwise.
--
-- NOTE(review): the emptiness check and the read are two separate steps. If another reader can consume
-- from the same channel in between, 'readChan' presumably waits for the next element -- confirm against
-- the semantics of "Control.Etage.Chan".
maybeReadChan :: Chan a -> IO (Maybe a)
maybeReadChan chan = isEmptyChan chan >>= readUnlessEmpty
  where
    readUnlessEmpty True  = return Nothing
    readUnlessEmpty False = fmap Just (readChan chan)
-- | Repeatedly reads from the channel with 'maybeReadChan' until it yields 'Nothing' and returns all
-- elements read.
--
-- Every newly read element is consed onto the front of the accumulator, so the first-in (oldest) element
-- in the channel is last in the list.
slurpChan :: Chan a -> IO [a]
slurpChan chan = collect []
  where
    collect acc = maybeReadChan chan >>= maybe (return acc) (collect . (: acc))
-- | Describes how a neuron's thread should be forked (see 'divideNeuron').
data NeuronMapCapability
  = NeuronMapOnCapability Int    -- ^ Fork with 'forkOnIO', passing the given number.
  | NeuronFreelyMapOnCapability  -- ^ Fork with plain 'forkIO'.
  deriving (Eq, Ord, Read, Show)
-- | Builds a 'NeuronMapOnCapability' value whose number is drawn with 'randomRIO' from the inclusive
-- range @(1, 'numCapabilities')@.
--
-- NOTE(review): 'forkOnIO' is documented to interpret its argument modulo 'numCapabilities', so the value
-- 'numCapabilities' presumably addresses the same capability as @0@ -- confirm against the GHC.Conc
-- documentation for the targeted GHC version.
mkNeuronMapOnRandomCapability :: IO NeuronMapCapability
mkNeuronMapOnRandomCapability = fmap NeuronMapOnCapability (randomRIO (1, numCapabilities))
-- | Forks the given action into a new thread and returns its identifier.
--
-- How the thread is forked is decided by 'getNeuronMapCapability' applied to the options:
-- 'NeuronFreelyMapOnCapability' uses 'forkIO', while 'NeuronMapOnCapability' uses 'forkOnIO' with the
-- stored number.
divideNeuron :: Neuron n => NeuronOptions n -> IO () -> IO NeuronId
divideNeuron options action = case getNeuronMapCapability options of
  NeuronFreelyMapOnCapability      -> forkIO action
  NeuronMapOnCapability capability -> forkOnIO capability action
-- Standalone-derived 'Typeable1' instances for the three data families associated with the 'Neuron'
-- class: 'NeuronFromImpulse', 'NeuronForImpulse' and 'NeuronOptions'.
deriving instance Typeable1 (NeuronFromImpulse)
deriving instance Typeable1 (NeuronForImpulse)
deriving instance Typeable1 (NeuronOptions)
-- | Class of neurons.
--
-- An instance provides three associated data types (impulses coming from the neuron, impulses meant for
-- the neuron, and its options) together with the operations used by 'attach'' to run it. Every method has
-- a default implementation.
class (Typeable n, Impulse (NeuronFromImpulse n), Impulse (NeuronForImpulse n)) => Neuron n where
  -- | Impulses on the \"from\" side of the neuron's 'Nerve'.
  data NeuronFromImpulse n
  -- | Impulses on the \"for\" side of the neuron's 'Nerve'.
  data NeuronForImpulse n
  -- | Options passed to 'grow' and 'getNeuronMapCapability'.
  data NeuronOptions n

  -- | Produces the options value that 'attach'' hands to the caller's options setter.
  -- The default returns 'undefined' (unevaluated) as the options value.
  mkDefaultOptions :: IO (NeuronOptions n)
  mkDefaultOptions = return undefined

  -- | Tells how the neuron's thread should be forked. The default is 'NeuronFreelyMapOnCapability'.
  getNeuronMapCapability :: NeuronOptions n -> NeuronMapCapability
  getNeuronMapCapability _ = NeuronFreelyMapOnCapability

  -- | Creates the neuron value from its options; used by 'attach'' as the acquire step of a 'bracket'.
  -- The default ignores the options and returns 'undefined' (unevaluated) as the neuron value.
  grow :: NeuronOptions n -> IO n
  grow _ = return undefined

  -- | Cleans up the neuron value; used by 'attach'' as the release step of a 'bracket'.
  -- The default does nothing.
  dissolve :: n -> IO ()
  dissolve _ = return ()

  -- | The neuron's main action, run by 'attach'' between 'grow' and 'dissolve'.
  -- The default ignores both arguments and calls 'waitForException'.
  live :: Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> n -> IO ()
  live _ _ = waitForException

  -- | Starts the neuron on the given 'Nerve' after applying the options setter. The default is 'attach''.
  attach :: (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO LiveNeuron
  attach = attach'
-- | Default implementation of 'attach': grows a neuron in a new thread and lets it live on the given 'Nerve'.
--
-- The options are obtained from 'mkDefaultOptions' and passed through @optionsSetter@. The new thread is
-- forked with 'divideNeuron' while asynchronous exceptions are masked; inside it the neuron is created
-- with 'grow', run with 'live' (with the caller's masking state restored) and finally cleaned up with
-- 'dissolve'.
--
-- A 'DissolveException' ends the neuron quietly. Any other exception is rethrown to the thread that
-- called 'attach''. In every case the 'MVar' stored in the returned 'LiveNeuron' is filled once the
-- thread is done, which is what 'detachManyAndWait' waits for.
attach' :: Neuron n => (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO LiveNeuron
attach' optionsSetter nerve = mask $ \restore -> do
  parentThread <- myThreadId
  dissolvedSignal <- newEmptyMVar
  options <- fmap optionsSetter mkDefaultOptions
  let -- TODO: Should be dissolve wrapped in uninterruptibleMask
      lifecycle = bracket (grow options) dissolve (restore . live nerve)
      handlers =
        [ Handler (\(_ :: DissolveException) -> return ()) -- we ignore DissolveException
        , Handler (\(e :: SomeException) -> uninterruptible $ throwTo parentThread e)
        ]
      signalDissolved = uninterruptible $ putMVar dissolvedSignal ()
  neuronId <- divideNeuron options ((lifecycle `catches` handlers) `finally` signalDissolved)
  return $ LiveNeuron dissolvedSignal neuronId
-- | Exception thrown by 'dissolving'; it carries the 'show'n representation of the value passed to it.
data DissolvingException
  = DissolvingException String
  deriving (Show, Typeable)

instance Exception DissolvingException
-- | Throws (with 'throwIO') a 'DissolvingException' carrying the 'show'n representation of the argument.
dissolving :: Show n => n -> IO a
dissolving = throwIO . DissolvingException . show
-- | Exception thrown to a neuron's thread by 'detach'; 'attach'' catches and ignores it.
data DissolveException
  = DissolveException
  deriving (Show, Typeable)

instance Exception DissolveException
-- | Throws 'DissolveException' to the thread of the given 'LiveNeuron'.
--
-- The throw is performed with asynchronous exceptions masked and is retried through 'uninterruptible'
-- if it gets interrupted. This function does not wait for the neuron to finish.
detach :: LiveNeuron -> IO ()
detach (LiveNeuron _ neuronThread) =
  mask_ (uninterruptible (throwTo neuronThread DissolveException))
-- | Single-neuron version of 'detachManyAndWait': it calls that function with a one-element list.
detachAndWait :: LiveNeuron -> IO ()
detachAndWait = detachManyAndWait . (: [])
-- | Calls 'detach' on every given 'LiveNeuron', in list order, with asynchronous exceptions masked for
-- the whole traversal.
detachMany :: [LiveNeuron] -> IO ()
detachMany neurons = mask_ $ mapM_ detach neurons
-- | Detaches all given neurons with 'detachMany' and then waits for each of them, in list order, by
-- taking the 'MVar' stored in its 'LiveNeuron' (the one 'attach'' fills when the neuron's thread is
-- done). Everything runs with asynchronous exceptions masked, and each wait is retried through
-- 'uninterruptible' if it gets interrupted.
detachManyAndWait :: [LiveNeuron] -> IO ()
detachManyAndWait neurons = mask_ (detachMany neurons >> mapM_ awaitDissolved neurons)
  where
    awaitDissolved (LiveNeuron dissolved _) = uninterruptible (takeMVar dissolved)
-- Some operations are interruptible. Rather than making them truly uninterruptible (which can cause
-- deadlocks), we simply retry the interrupted operation.
-- For this to really work, all interruptible operations should be wrapped like this (so it is not a good
-- idea to use arbitrary IO operations in such code sections).
--
-- | Runs the action with asynchronous exceptions masked and, whenever it ends with an exception of any
-- kind, runs it again from the start. Consequently this only returns once the action has completed
-- without throwing.
uninterruptible :: IO a -> IO a
uninterruptible action = mask_ $ do
  outcome <- try action
  case outcome of
    Right result           -> return result
    Left (SomeException _) -> uninterruptible action
-- | Relates two impulse types @i@ and @j@ such that an impulse of type @i@ can be translated into
-- impulses of type @j@.
class (Impulse i, Impulse j) => ImpulseTranslator i j where
  -- | Translates one impulse into a list of impulses of the target type (used by 'translateAndSend',
  -- which sends each element of the list in order).
  translate :: i -> [j]
-- | Options setter that leaves the options unchanged; it can be passed as the first argument of 'attach'
-- to use the options as produced by 'mkDefaultOptions'.
defaultOptions :: Neuron n => NeuronOptions n -> NeuronOptions n
defaultOptions options = options
-- | Prepares the process environment: switches 'stderr' to line buffering and installs handlers for the
-- keyboard interrupt and software termination signals which throw 'UserInterrupt' to the thread that
-- called this function.
prepareEnvironment :: IO ()
prepareEnvironment = do
  hSetBuffering stderr LineBuffering
  callerThread <- myThreadId
  -- TODO: User interrupt sometimes hangs dissolving (does it still in GHC 7.0?)
  let interruptCaller = Catch (throwTo callerThread UserInterrupt)
      handledSignals =
        [ keyboardSignal      -- sigINT
        , softwareTermination -- sigTERM
        ]
  mapM_ (\signal -> installHandler signal interruptCaller Nothing) handledSignals
-- | Translates the impulse with 'translate' and sends every resulting impulse, in order, to the \"for\"
-- side of the 'Nerve' with 'sendForNeuron'.
translateAndSend :: ImpulseTranslator i for => Nerve from fromConductivity for AxonConductive -> i -> IO ()
translateAndSend nerve i = mapM_ send (translate i)
  where
    send = sendForNeuron nerve
-- | Wraps a 'Nerve' whose \"for\" side is 'AxonConductive' and whose \"for\" impulse type has an
-- 'ImpulseTranslator' instance from @i@, hiding all other type parameters of the 'Nerve'.
data Translatable i where
  Translatable
    :: ImpulseTranslator i for
    => Nerve from fromConductivity for AxonConductive
    -> Translatable i
-- | Returns the current time as an 'ImpulseTime', obtained from 'getPOSIXTime'.
getCurrentImpulseTime :: IO ImpulseTime
getCurrentImpulseTime = getPOSIXTime
-- | Equality of two impulses, possibly of different types: they are equal when both their
-- 'impulseTime's and their 'impulseValue's are equal.
impulseEq :: (Impulse i, Impulse j) => i -> j -> Bool
impulseEq a b = (impulseTime a, impulseValue a) == (impulseTime b, impulseValue b)
-- | Ordering of two impulses, possibly of different types: they are compared by 'impulseTime' first and,
-- only when the times are equal, by 'impulseValue'.
impulseCompare :: (Impulse i, Impulse j) => i -> j -> Ordering
impulseCompare a b = case impulseTime a `compare` impulseTime b of
  EQ         -> impulseValue a `compare` impulseValue b
  timeResult -> timeResult