Skip to content

Commit

Permalink
Externalized LiveNeuron data type from Neron type class.
Browse files Browse the repository at this point in the history
  • Loading branch information
mitar committed Nov 26, 2010
1 parent a40e8d4 commit 599610d
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 93 deletions.
5 changes: 0 additions & 5 deletions lib/Control/Etage/Dump.hs
Expand Up @@ -42,7 +42,6 @@ instance Ord DumpForImpulse where
compare = impulseCompare

instance Neuron DumpNeuron where
data LiveNeuron DumpNeuron = LiveDumpNeuron NeuronDissolved NeuronId
data NeuronFromImpulse DumpNeuron
data NeuronForImpulse DumpNeuron where
DumpForImpulse :: Impulse i => i -> DumpForImpulse
Expand All @@ -51,10 +50,6 @@ instance Neuron DumpNeuron where
showInsteadOfDump :: Bool
} deriving (Eq, Show)

mkLiveNeuron = LiveDumpNeuron
getNeuronDissolved (LiveDumpNeuron dissolved _) = dissolved
getNeuronId (LiveDumpNeuron _ nid) = nid

mkDefaultOptions = return DumpOptions {
handle = stdout,
showInsteadOfDump = False
Expand Down
81 changes: 44 additions & 37 deletions lib/Control/Etage/Externals.hs
Expand Up @@ -6,6 +6,7 @@ module Control.Etage.Externals (
Nerve,
AxonConductive,
AxonNonConductive,
LiveNeuron,
sendFromNeuron,
getFromNeuron,
maybeGetFromNeuron,
Expand All @@ -19,12 +20,14 @@ module Control.Etage.Externals (
getNewestForNeuron,
NeuronMapCapability(..),
mkNeuronMapOnRandomCapability,
NeuronDissolved,
NeuronId,
DissolvingException,
dissolving,
DissolveException,
dissolveNeuron,
attach',
detach,
detachAndWait,
detachMany,
detachManyAndWait,
ImpulseTranslator(..),
ImpulseTime,
ImpulseValue,
Expand All @@ -37,6 +40,8 @@ module Control.Etage.Externals (
impulseCompare
) where

import Prelude hiding (catch)

import Control.Concurrent hiding (Chan, writeChan, readChan, isEmptyChan)
import Data.Data
import Data.Function
Expand Down Expand Up @@ -75,7 +80,7 @@ sendForNeuron (Nerve _ (Axon chan)) i = writeChan chan i

getForNeuron :: Nerve from fromConductivity for forConductivity -> IO for
getForNeuron (Nerve _ (Axon chan)) = readChan chan
getForNeuron (Nerve _ NoAxon) = newEmptyMVar >>= takeMVar
getForNeuron (Nerve _ NoAxon) = waitForException

maybeGetForNeuron :: Nerve from fromConductivity for forConductivity -> IO (Maybe for)
maybeGetForNeuron (Nerve _ (Axon chan)) = maybeReadChan chan
Expand Down Expand Up @@ -121,10 +126,6 @@ mkNeuronMapOnRandomCapability = do
c <- randomRIO (1, numCapabilities)
return $ NeuronMapOnCapability c

type NeuronDissolved = MVar ()
type NeuronId = ThreadId

-- TODO: Move dissolved MVar handling into divideNeuron
divideNeuron :: Neuron n => NeuronOptions n -> IO () -> IO NeuronId
divideNeuron options a = fork a
where fork = case getNeuronMapCapability options of
Expand All @@ -133,19 +134,12 @@ divideNeuron options a = fork a

deriving instance Typeable1 (NeuronFromImpulse)
deriving instance Typeable1 (NeuronForImpulse)
deriving instance Typeable1 (LiveNeuron)
deriving instance Typeable1 (NeuronOptions)

class (Typeable n, Impulse (NeuronFromImpulse n), Impulse (NeuronForImpulse n)) => Neuron n where
data LiveNeuron n
data NeuronFromImpulse n
data NeuronForImpulse n
data NeuronOptions n

-- TODO: Once defaults for associated type synonyms are implemented change to that, if possible
mkLiveNeuron :: NeuronDissolved -> NeuronId -> LiveNeuron n
getNeuronDissolved :: LiveNeuron n -> NeuronDissolved
getNeuronId :: LiveNeuron n -> NeuronId

mkDefaultOptions :: IO (NeuronOptions n)

Expand All @@ -155,33 +149,30 @@ class (Typeable n, Impulse (NeuronFromImpulse n), Impulse (NeuronForImpulse n))
dissolve :: n -> IO ()
live :: Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> n -> IO ()

attach :: (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO (LiveNeuron n)
detach :: LiveNeuron n -> IO ()
attach :: (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO LiveNeuron

mkDefaultOptions = return undefined

getNeuronMapCapability _ = NeuronFreelyMapOnCapability

grow _ = return undefined
dissolve _ = return ()
live _ _ = newEmptyMVar >>= takeMVar
live _ _ = waitForException

-- TODO: Move default implementation out of the class so that it can be reused/wrapped around in some other class instance definition
attach optionsSetter nerve = do
currentThread <- myThreadId
dissolved <- newEmptyMVar
defOptions <- mkDefaultOptions
let options = optionsSetter defOptions
sequel = putMVar dissolved ()
run = do
bracket (grow options) dissolve (unblock . live nerve) `catches` [
Handler (\(_ :: DissolveException) -> return ()), -- we ignore DissolveException
Handler (throwTo currentThread :: SomeException -> IO ())
] `onException` sequel -- TODO: Change to finally in GHC 7.0
sequel
nid <- block $ divideNeuron options run -- TODO: Change block to nonInterruptibleMask? Or remove? Or move to divideNeuron?
return $ mkLiveNeuron dissolved nid
detach = dissolveNeuron
attach = attach'

attach' :: Neuron n => (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO LiveNeuron
attach' optionsSetter nerve = mask $ \restore -> do
currentThread <- myThreadId
dissolved <- newEmptyMVar
defOptions <- mkDefaultOptions
let options = optionsSetter defOptions
nid <- divideNeuron options $ do
bracket (grow options) dissolve (restore . live nerve) `catches` [ -- TODO: Should be dissolve wrapped in uninterruptibleMask
Handler (\(_ :: DissolveException) -> return ()), -- we ignore DissolveException
Handler (\(e :: SomeException) -> uninterruptible $ throwTo currentThread e)
] `finally` (uninterruptible $ putMVar dissolved ())
return $ LiveNeuron dissolved nid

data DissolvingException = DissolvingException String deriving (Show, Typeable)

Expand All @@ -194,8 +185,24 @@ data DissolveException = DissolveException deriving (Show, Typeable)

instance Exception DissolveException

dissolveNeuron :: Neuron n => LiveNeuron n -> IO ()
dissolveNeuron n = throwTo (getNeuronId n) DissolveException
detach :: LiveNeuron -> IO ()
detach (LiveNeuron _ neuronId) = mask_ . uninterruptible $ throwTo neuronId DissolveException

detachAndWait :: LiveNeuron -> IO ()
detachAndWait n = detachManyAndWait [n]

detachMany :: [LiveNeuron] -> IO ()
detachMany = mask_ . mapM_ detach

detachManyAndWait :: [LiveNeuron] -> IO ()
detachManyAndWait neurons = mask_ $ do
detachMany neurons
mapM_ (\(LiveNeuron d _) -> uninterruptible $ takeMVar d) neurons

-- Some operations are interruptible, better than to make them uninterruptible (which can cause deadlocks) we simply retry interrupted operation
-- For this to really work all interruptible operations should be wrapped like this (so it is not good idea to use IO operations in such code sections)
uninterruptible :: IO a -> IO a
uninterruptible a = mask_ $ a `catch` (\(_ :: SomeException) -> uninterruptible a)

class (Impulse i, Impulse j) => ImpulseTranslator i j where
translate :: i -> [j]
Expand All @@ -209,7 +216,7 @@ prepareEnvironment = do

mainThreadId <- myThreadId

-- TODO: User interrupt sometimes hangs dissolving
-- TODO: User interrupt sometimes hangs dissolving (does it still in GHC 7.0?)
_ <- installHandler keyboardSignal (Catch (throwTo mainThreadId UserInterrupt)) Nothing -- sigINT
_ <- installHandler softwareTermination (Catch (throwTo mainThreadId UserInterrupt)) Nothing -- sigTERM

Expand Down
40 changes: 11 additions & 29 deletions lib/Control/Etage/Incubator.hs
Expand Up @@ -8,13 +8,12 @@ module Control.Etage.Incubator (
NerveNone,
NerveOnlyFrom,
NerveOnlyFor,
Incubation
Incubation,
-- * Internals
growNerve
) where

import Prelude hiding (catch)

import Control.Applicative
import Control.Concurrent hiding (Chan, newChan, dupChan)
import Control.Exception
import Control.Monad
import Control.Monad.Operational
Expand All @@ -39,19 +38,20 @@ newtype Incubation a = Incubation (Incubation' a) deriving (Monad, MonadIO, Appl
incubate :: Incubation () -> IO ()
incubate (Incubation program) = mask $ \restore -> do
(neurons, chans, attached) <- restore $ interpret [] [] [] program
let na = nub chans \\ nub attached
typ = unlines . map (\(ChanBox c) -> show $ neuronTypeOf c) $ na
unless (null na) $ hPutStrLn stderr $ "Warning: It seems not all created nerves were attached. This causes a memory leak as produced data is not consumed. You should probably just define those nerves as NerveOnlyFor or NerveNone. Dangling nerves for neurons:\n" ++ typ
waitForDissolve neurons
(flip finally) (detachManyAndWait neurons) $ do
let na = nub chans \\ nub attached
typ = unlines . map (\(ChanBox c) -> show $ neuronTypeOf c) $ na
unless (null na) $ hPutStrLn stderr $ "Warning: It seems not all created nerves were attached. This causes a memory leak as produced data is not consumed. You should probably just define those nerves as NerveOnlyFor or NerveNone. Dangling nerves for neurons:\n" ++ typ
restore waitForException

interpret :: [Living] -> [ChanBox] -> [ChanBox] -> Incubation' () -> IO ([Living], [ChanBox], [ChanBox])
interpret :: [LiveNeuron] -> [ChanBox] -> [ChanBox] -> Incubation' () -> IO ([LiveNeuron], [ChanBox], [ChanBox])
interpret neurons chans attached = viewT >=> (eval neurons chans attached)
where eval :: [Living] -> [ChanBox] -> [ChanBox] -> ProgramViewT IncubationOperation IO () -> IO ([Living], [ChanBox], [ChanBox])
where eval :: [LiveNeuron] -> [ChanBox] -> [ChanBox] -> ProgramViewT IncubationOperation IO () -> IO ([LiveNeuron], [ChanBox], [ChanBox])
eval ns cs ats (Return _) = return (ns, cs, ats)
eval ns cs ats (NeuronOperation optionsSetter :>>= is) = do
nerve <- liftIO $ growNerve
let c = getFromChan nerve
bracketOnError (attach optionsSetter nerve) detach $ \n -> (interpret ((Living n):ns) (c ++ cs) ats) . is $ nerve
bracketOnError (attach optionsSetter nerve) detach $ \n -> (interpret (n:ns) (c ++ cs) ats) . is $ nerve
eval ns cs ats (AttachOperation from for :>>= is) = do
let c = head . getFromChan $ from -- we know there exists from chan as type checking assures that (from is conductive)
(from', ats') <- if c `notElem` ats
Expand Down Expand Up @@ -108,21 +108,3 @@ dupNerve :: Nerve from AxonConductive for forConductivity -> IO (Nerve from Axon
dupNerve (Nerve (Axon c) for) = do
c' <- dupChan c
return $ Nerve (Axon c') for

data Living where
Living :: Neuron n => LiveNeuron n -> Living

-- Blocks thread until an exception arrives and cleans-up afterwards, waiting for all threads to finish
-- Should have MVar computations wrapped in uninterruptible and should not use any IO (because all this can be interrupted despite block)
waitForDissolve :: [Living] -> IO ()
waitForDissolve neurons = block $ do -- TODO: Change block to nonInterruptibleMask? Or remove?
_ <- (newEmptyMVar >>= takeMVar) `finally` do
-- TODO: Should also takeMVar go into detach? Or is better to first send exceptions and then wait?
mapM_ (\(Living l) -> uninterruptible $ detach l) neurons
mapM_ (\(Living l) -> uninterruptible $ takeMVar . getNeuronDissolved $ l) neurons
return ()

-- TODO: Remove with GHC 7.0 and masks?
-- Big hack to prevent interruption: it simply retries interrupted computation
uninterruptible :: IO a -> IO a
uninterruptible a = block $ a `catch` (\(_ :: SomeException) -> uninterruptible a)
30 changes: 27 additions & 3 deletions lib/Control/Etage/Internals.hs
Expand Up @@ -4,12 +4,17 @@ module Control.Etage.Internals (
Axon(..),
Nerve(..),
Impulse(..),
LiveNeuron(..),
ImpulseValue,
ImpulseTime,
AxonConductive,
AxonNonConductive
AxonNonConductive,
NeuronDissolved,
NeuronId,
waitForException
) where

import Control.Concurrent hiding (Chan)
import Data.Time.Clock.POSIX
import Data.Typeable
import Numeric
Expand All @@ -35,12 +40,31 @@ class (Show i, Typeable i) => Impulse i where
impulseTime :: i -> ImpulseTime
impulseValue :: i -> ImpulseValue

data AxonConductive deriving Typeable
data AxonNonConductive deriving Typeable
data AxonConductive deriving (Typeable)
data AxonNonConductive deriving (Typeable)

data Axon impulse conductivity where
Axon :: Impulse i => Chan i -> Axon i AxonConductive
NoAxon :: Axon i AxonNonConductive

data Nerve from fromConductivity for forConductivity where
Nerve :: (Impulse from, Impulse for) => Axon from fromConductivity -> Axon for forConductivity -> Nerve from fromConductivity for forConductivity

deriving instance Typeable4 Nerve

instance (Typeable forConductivity, Typeable fromConductivity, Typeable from, Typeable for) => Show (Nerve from fromConductivity for forConductivity) where
show = show . typeOf

type NeuronDissolved = MVar ()
type NeuronId = ThreadId

instance Show NeuronDissolved where
show = show . typeOf

data LiveNeuron = LiveNeuron NeuronDissolved NeuronId deriving (Eq, Typeable)

instance Show LiveNeuron where
show = show . typeOf

waitForException :: IO a
waitForException = newEmptyMVar >>= takeMVar
13 changes: 5 additions & 8 deletions lib/Control/Etage/Propagate.hs
Expand Up @@ -14,7 +14,6 @@ import Control.Etage.Externals

data (Typeable from, Typeable for, Typeable forConductivity) => PropagateNeuron from for forConductivity = PropagateNeuron (PropagateOptions from for forConductivity) deriving (Typeable)

type LivePropagateNeuron from for forConductivity = LiveNeuron (PropagateNeuron from for forConductivity)
type PropagateFromImpulse from for forConductivity = NeuronFromImpulse (PropagateNeuron from for forConductivity)
type PropagateForImpulse from for forConductivity = NeuronForImpulse (PropagateNeuron from for forConductivity)
type PropagateOptions from for forConductivity = NeuronOptions (PropagateNeuron from for forConductivity)
Expand All @@ -40,19 +39,17 @@ deriving instance Show (PropagateForImpulse from for forConductivity)
Internal 'Neuron' which implements 'propagate'.
-}
instance (Typeable from, Typeable for, Typeable forConductivity) => Neuron (PropagateNeuron from for forConductivity) where
data LiveNeuron (PropagateNeuron from for forConductivity) = LivePropagateNeuron NeuronDissolved NeuronId
data NeuronFromImpulse (PropagateNeuron from for forConductivity)
data NeuronForImpulse (PropagateNeuron from for forConductivity)
data NeuronOptions (PropagateNeuron from for forConductivity) = PropagateOptions {
from :: Nerve from AxonConductive for forConductivity,
for ::[Translatable from]
}

mkLiveNeuron = LivePropagateNeuron
getNeuronDissolved (LivePropagateNeuron dissolved _) = dissolved
getNeuronId (LivePropagateNeuron _ nid) = nid

mkDefaultOptions = return PropagateOptions { from = undefined, for = undefined }
mkDefaultOptions = return PropagateOptions {
from = undefined,
for = undefined
}

grow options = return $ PropagateNeuron options

Expand All @@ -63,5 +60,5 @@ instance (Typeable from, Typeable for, Typeable forConductivity) => Neuron (Prop
propagate :: forall from for forConductivity. (Typeable from, Typeable for, Typeable forConductivity) => Nerve from AxonConductive for forConductivity -> [Translatable from] -> IO ()
propagate from for = do
-- we do not manage this neuron, it will be cleaned by RTS at program exit
_ <- attach (\o -> o { from, for }) undefined :: IO (LivePropagateNeuron from for forConductivity)
_ <- attach (\o -> o { from, for } :: NeuronOptions (PropagateNeuron from for forConductivity)) undefined
return ()
5 changes: 0 additions & 5 deletions lib/Control/Etage/Sequence.hs
Expand Up @@ -37,7 +37,6 @@ instance (Real r, Random r, Show r, Typeable r) => Impulse (SequenceForImpulse r
deriving instance Show (SequenceForImpulse r)

instance (Real r, Random r, Show r, Typeable r) => Neuron (SequenceNeuron r) where
data LiveNeuron (SequenceNeuron r) = LiveSequenceNeuron NeuronDissolved NeuronId
data NeuronFromImpulse (SequenceNeuron r) = Value {
impulseTimestamp :: ImpulseTime, -- time is first so that ordering is first by time
value :: r
Expand All @@ -48,10 +47,6 @@ instance (Real r, Random r, Show r, Typeable r) => Neuron (SequenceNeuron r) whe
intervalSource :: [Int] -- microseconds
} deriving (Eq, Ord, Read, Show)

mkLiveNeuron = LiveSequenceNeuron
getNeuronDissolved (LiveSequenceNeuron dissolved _) = dissolved
getNeuronId (LiveSequenceNeuron _ nid) = nid

mkDefaultOptions = do
generator <- newStdGen
generator' <- newStdGen
Expand Down
7 changes: 1 addition & 6 deletions lib/Control/Etage/Worker.hs
Expand Up @@ -20,7 +20,7 @@ import Control.Etage
type WorkType = IO ()

instance Show WorkType where
show _ = "Work"
show = show . typeOf

data WorkerNeuron deriving (Typeable)

Expand All @@ -39,7 +39,6 @@ instance Impulse WorkerForImpulse where
deriving instance Show WorkerFromImpulse

instance Neuron WorkerNeuron where
data LiveNeuron WorkerNeuron = LiveWorkerNeuron NeuronDissolved NeuronId
data NeuronFromImpulse WorkerNeuron
data NeuronForImpulse WorkerNeuron = Work {
impulseTimestamp :: ImpulseTime,
Expand All @@ -49,10 +48,6 @@ instance Neuron WorkerNeuron where
mapOnCapability :: NeuronMapCapability
} deriving (Eq, Ord, Read, Show)

mkLiveNeuron = LiveWorkerNeuron
getNeuronDissolved (LiveWorkerNeuron dissolved _) = dissolved
getNeuronId (LiveWorkerNeuron _ nid) = nid

mkDefaultOptions = do
neuronMapCapability <- mkNeuronMapOnRandomCapability
return WorkerOptions {
Expand Down

0 comments on commit 599610d

Please sign in to comment.