diff --git a/lib/Control/Etage/Dump.hs b/lib/Control/Etage/Dump.hs index d945235..c518c38 100644 --- a/lib/Control/Etage/Dump.hs +++ b/lib/Control/Etage/Dump.hs @@ -42,7 +42,6 @@ instance Ord DumpForImpulse where compare = impulseCompare instance Neuron DumpNeuron where - data LiveNeuron DumpNeuron = LiveDumpNeuron NeuronDissolved NeuronId data NeuronFromImpulse DumpNeuron data NeuronForImpulse DumpNeuron where DumpForImpulse :: Impulse i => i -> DumpForImpulse @@ -51,10 +50,6 @@ instance Neuron DumpNeuron where showInsteadOfDump :: Bool } deriving (Eq, Show) - mkLiveNeuron = LiveDumpNeuron - getNeuronDissolved (LiveDumpNeuron dissolved _) = dissolved - getNeuronId (LiveDumpNeuron _ nid) = nid - mkDefaultOptions = return DumpOptions { handle = stdout, showInsteadOfDump = False diff --git a/lib/Control/Etage/Externals.hs b/lib/Control/Etage/Externals.hs index e13bae1..ed13a97 100644 --- a/lib/Control/Etage/Externals.hs +++ b/lib/Control/Etage/Externals.hs @@ -6,6 +6,7 @@ module Control.Etage.Externals ( Nerve, AxonConductive, AxonNonConductive, + LiveNeuron, sendFromNeuron, getFromNeuron, maybeGetFromNeuron, @@ -19,12 +20,14 @@ module Control.Etage.Externals ( getNewestForNeuron, NeuronMapCapability(..), mkNeuronMapOnRandomCapability, - NeuronDissolved, - NeuronId, DissolvingException, dissolving, DissolveException, - dissolveNeuron, + attach', + detach, + detachAndWait, + detachMany, + detachManyAndWait, ImpulseTranslator(..), ImpulseTime, ImpulseValue, @@ -37,6 +40,8 @@ module Control.Etage.Externals ( impulseCompare ) where +import Prelude hiding (catch) + import Control.Concurrent hiding (Chan, writeChan, readChan, isEmptyChan) import Data.Data import Data.Function @@ -75,7 +80,7 @@ sendForNeuron (Nerve _ (Axon chan)) i = writeChan chan i getForNeuron :: Nerve from fromConductivity for forConductivity -> IO for getForNeuron (Nerve _ (Axon chan)) = readChan chan -getForNeuron (Nerve _ NoAxon) = newEmptyMVar >>= takeMVar +getForNeuron (Nerve _ NoAxon) = waitForException maybeGetForNeuron :: Nerve from fromConductivity for forConductivity -> IO (Maybe for) maybeGetForNeuron (Nerve _ (Axon chan)) = maybeReadChan chan @@ -121,10 +126,6 @@ mkNeuronMapOnRandomCapability = do c <- randomRIO (1, numCapabilities) return $ NeuronMapOnCapability c -type NeuronDissolved = MVar () -type NeuronId = ThreadId - --- TODO: Move dissolved MVar handling into divideNeuron divideNeuron :: Neuron n => NeuronOptions n -> IO () -> IO NeuronId divideNeuron options a = fork a where fork = case getNeuronMapCapability options of @@ -133,19 +134,12 @@ divideNeuron options a = fork a deriving instance Typeable1 (NeuronFromImpulse) deriving instance Typeable1 (NeuronForImpulse) -deriving instance Typeable1 (LiveNeuron) deriving instance Typeable1 (NeuronOptions) class (Typeable n, Impulse (NeuronFromImpulse n), Impulse (NeuronForImpulse n)) => Neuron n where - data LiveNeuron n data NeuronFromImpulse n data NeuronForImpulse n data NeuronOptions n - - -- TODO: Once defaults for associated type synonyms are implemented change to that, if possible - mkLiveNeuron :: NeuronDissolved -> NeuronId -> LiveNeuron n - getNeuronDissolved :: LiveNeuron n -> NeuronDissolved - getNeuronId :: LiveNeuron n -> NeuronId mkDefaultOptions :: IO (NeuronOptions n) @@ -155,8 +149,7 @@ class (Typeable n, Impulse (NeuronFromImpulse n), Impulse (NeuronForImpulse n)) dissolve :: n -> IO () live :: Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> n -> IO () - attach :: (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO (LiveNeuron n) - detach :: LiveNeuron n -> IO () + attach :: (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO LiveNeuron mkDefaultOptions = return undefined @@ -164,24 +157,22 @@ class (Typeable n, Impulse (NeuronFromImpulse n), Impulse (NeuronForImpulse n)) grow _ = return undefined dissolve _ = return () - live _ _ = newEmptyMVar >>= takeMVar + live _ _ = waitForException - -- TODO: Move default implementation out of the class so that it can be reused/wrapped around in some other class instance definition - attach optionsSetter nerve = do - currentThread <- myThreadId - dissolved <- newEmptyMVar - defOptions <- mkDefaultOptions - let options = optionsSetter defOptions - sequel = putMVar dissolved () - run = do - bracket (grow options) dissolve (unblock . live nerve) `catches` [ - Handler (\(_ :: DissolveException) -> return ()), -- we ignore DissolveException - Handler (throwTo currentThread :: SomeException -> IO ()) - ] `onException` sequel -- TODO: Change to finally in GHC 7.0 - sequel - nid <- block $ divideNeuron options run -- TODO: Change block to nonInterruptibleMask? Or remove? Or move to divideNeuron? - return $ mkLiveNeuron dissolved nid - detach = dissolveNeuron + attach = attach' + +attach' :: Neuron n => (NeuronOptions n -> NeuronOptions n) -> Nerve (NeuronFromImpulse n) fromConductivity (NeuronForImpulse n) forConductivity -> IO LiveNeuron +attach' optionsSetter nerve = mask $ \restore -> do + currentThread <- myThreadId + dissolved <- newEmptyMVar + defOptions <- mkDefaultOptions + let options = optionsSetter defOptions + nid <- divideNeuron options $ do + bracket (grow options) dissolve (restore . live nerve) `catches` [ -- TODO: Should be dissolve wrapped in uninterruptibleMask + Handler (\(_ :: DissolveException) -> return ()), -- we ignore DissolveException + Handler (\(e :: SomeException) -> uninterruptible $ throwTo currentThread e) + ] `finally` (uninterruptible $ putMVar dissolved ()) + return $ LiveNeuron dissolved nid data DissolvingException = DissolvingException String deriving (Show, Typeable) @@ -194,8 +185,24 @@ data DissolveException = DissolveException deriving (Show, Typeable) instance Exception DissolveException -dissolveNeuron :: Neuron n => LiveNeuron n -> IO () -dissolveNeuron n = throwTo (getNeuronId n) DissolveException +detach :: LiveNeuron -> IO () +detach (LiveNeuron _ neuronId) = mask_ . uninterruptible $ throwTo neuronId DissolveException + +detachAndWait :: LiveNeuron -> IO () +detachAndWait n = detachManyAndWait [n] + +detachMany :: [LiveNeuron] -> IO () +detachMany = mask_ . mapM_ detach + +detachManyAndWait :: [LiveNeuron] -> IO () +detachManyAndWait neurons = mask_ $ do + detachMany neurons + mapM_ (\(LiveNeuron d _) -> uninterruptible $ takeMVar d) neurons + +-- Some operations are interruptible, better than to make them uninterruptible (which can cause deadlocks) we simply retry interrupted operation +-- For this to really work all interruptible operations should be wrapped like this (so it is not good idea to use IO operations in such code sections) +uninterruptible :: IO a -> IO a +uninterruptible a = mask_ $ a `catch` (\(_ :: SomeException) -> uninterruptible a) class (Impulse i, Impulse j) => ImpulseTranslator i j where translate :: i -> [j] @@ -209,7 +216,7 @@ prepareEnvironment = do mainThreadId <- myThreadId - -- TODO: User interrupt sometimes hangs dissolving + -- TODO: User interrupt sometimes hangs dissolving (does it still in GHC 7.0?) _ <- installHandler keyboardSignal (Catch (throwTo mainThreadId UserInterrupt)) Nothing -- sigINT _ <- installHandler softwareTermination (Catch (throwTo mainThreadId UserInterrupt)) Nothing -- sigTERM diff --git a/lib/Control/Etage/Incubator.hs b/lib/Control/Etage/Incubator.hs index f422f23..3084706 100644 --- a/lib/Control/Etage/Incubator.hs +++ b/lib/Control/Etage/Incubator.hs @@ -8,13 +8,12 @@ module Control.Etage.Incubator ( NerveNone, NerveOnlyFrom, NerveOnlyFor, - Incubation + Incubation, + -- * Internals + growNerve ) where -import Prelude hiding (catch) - import Control.Applicative -import Control.Concurrent hiding (Chan, newChan, dupChan) import Control.Exception import Control.Monad import Control.Monad.Operational @@ -39,19 +38,20 @@ newtype Incubation a = Incubation (Incubation' a) deriving (Monad, MonadIO, Appl incubate :: Incubation () -> IO () incubate (Incubation program) = mask $ \restore -> do (neurons, chans, attached) <- restore $ interpret [] [] [] program - let na = nub chans \\ nub attached - typ = unlines . map (\(ChanBox c) -> show $ neuronTypeOf c) $ na - unless (null na) $ hPutStrLn stderr $ "Warning: It seems not all created nerves were attached. This causes a memory leak as produced data is not consumed. You should probably just define those nerves as NerveOnlyFor or NerveNone. Dangling nerves for neurons:\n" ++ typ - waitForDissolve neurons + (flip finally) (detachManyAndWait neurons) $ do + let na = nub chans \\ nub attached + typ = unlines . map (\(ChanBox c) -> show $ neuronTypeOf c) $ na + unless (null na) $ hPutStrLn stderr $ "Warning: It seems not all created nerves were attached. This causes a memory leak as produced data is not consumed. You should probably just define those nerves as NerveOnlyFor or NerveNone. Dangling nerves for neurons:\n" ++ typ + restore waitForException -interpret :: [Living] -> [ChanBox] -> [ChanBox] -> Incubation' () -> IO ([Living], [ChanBox], [ChanBox]) +interpret :: [LiveNeuron] -> [ChanBox] -> [ChanBox] -> Incubation' () -> IO ([LiveNeuron], [ChanBox], [ChanBox]) interpret neurons chans attached = viewT >=> (eval neurons chans attached) - where eval :: [Living] -> [ChanBox] -> [ChanBox] -> ProgramViewT IncubationOperation IO () -> IO ([Living], [ChanBox], [ChanBox]) + where eval :: [LiveNeuron] -> [ChanBox] -> [ChanBox] -> ProgramViewT IncubationOperation IO () -> IO ([LiveNeuron], [ChanBox], [ChanBox]) eval ns cs ats (Return _) = return (ns, cs, ats) eval ns cs ats (NeuronOperation optionsSetter :>>= is) = do nerve <- liftIO $ growNerve let c = getFromChan nerve - bracketOnError (attach optionsSetter nerve) detach $ \n -> (interpret ((Living n):ns) (c ++ cs) ats) . is $ nerve + bracketOnError (attach optionsSetter nerve) detach $ \n -> (interpret (n:ns) (c ++ cs) ats) . is $ nerve eval ns cs ats (AttachOperation from for :>>= is) = do let c = head . getFromChan $ from -- we know there exists from chan as type checking assures that (from is conductive) (from', ats') <- if c `notElem` ats @@ -108,21 +108,3 @@ dupNerve :: Nerve from AxonConductive for forConductivity -> IO (Nerve from Axon dupNerve (Nerve (Axon c) for) = do c' <- dupChan c return $ Nerve (Axon c') for - -data Living where - Living :: Neuron n => LiveNeuron n -> Living - --- Blocks thread until an exception arrives and cleans-up afterwards, waiting for all threads to finish --- Should have MVar computations wrapped in uninterruptible and should not use any IO (because all this can be interrupted despite block) -waitForDissolve :: [Living] -> IO () -waitForDissolve neurons = block $ do -- TODO: Change block to nonInterruptibleMask? Or remove? - _ <- (newEmptyMVar >>= takeMVar) `finally` do - -- TODO: Should also takeMVar go into detach? Or is better to first send exceptions and then wait? - mapM_ (\(Living l) -> uninterruptible $ detach l) neurons - mapM_ (\(Living l) -> uninterruptible $ takeMVar . getNeuronDissolved $ l) neurons - return () - --- TODO: Remove with GHC 7.0 and masks? --- Big hack to prevent interruption: it simply retries interrupted computation -uninterruptible :: IO a -> IO a -uninterruptible a = block $ a `catch` (\(_ :: SomeException) -> uninterruptible a) diff --git a/lib/Control/Etage/Internals.hs b/lib/Control/Etage/Internals.hs index 0322c91..de6ac3e 100644 --- a/lib/Control/Etage/Internals.hs +++ b/lib/Control/Etage/Internals.hs @@ -4,12 +4,17 @@ module Control.Etage.Internals ( Axon(..), Nerve(..), Impulse(..), + LiveNeuron(..), ImpulseValue, ImpulseTime, AxonConductive, - AxonNonConductive + AxonNonConductive, + NeuronDissolved, + NeuronId, + waitForException ) where +import Control.Concurrent hiding (Chan) import Data.Time.Clock.POSIX import Data.Typeable import Numeric @@ -35,8 +40,8 @@ class (Show i, Typeable i) => Impulse i where impulseTime :: i -> ImpulseTime impulseValue :: i -> ImpulseValue -data AxonConductive deriving Typeable -data AxonNonConductive deriving Typeable +data AxonConductive deriving (Typeable) +data AxonNonConductive deriving (Typeable) data Axon impulse conductivity where Axon :: Impulse i => Chan i -> Axon i AxonConductive @@ -44,3 +49,22 @@ data Axon impulse conductivity where data Nerve from fromConductivity for forConductivity where Nerve :: (Impulse from, Impulse for) => Axon from fromConductivity -> Axon for forConductivity -> Nerve from fromConductivity for forConductivity + +deriving instance Typeable4 Nerve + +instance (Typeable forConductivity, Typeable fromConductivity, Typeable from, Typeable for) => Show (Nerve from fromConductivity for forConductivity) where + show = show . typeOf + +type NeuronDissolved = MVar () +type NeuronId = ThreadId + +instance Show NeuronDissolved where + show = show . typeOf + +data LiveNeuron = LiveNeuron NeuronDissolved NeuronId deriving (Eq, Typeable) + +instance Show LiveNeuron where + show = show . typeOf + +waitForException :: IO a +waitForException = newEmptyMVar >>= takeMVar diff --git a/lib/Control/Etage/Propagate.hs b/lib/Control/Etage/Propagate.hs index 22b3f2f..272f373 100644 --- a/lib/Control/Etage/Propagate.hs +++ b/lib/Control/Etage/Propagate.hs @@ -14,7 +14,6 @@ import Control.Etage.Externals data (Typeable from, Typeable for, Typeable forConductivity) => PropagateNeuron from for forConductivity = PropagateNeuron (PropagateOptions from for forConductivity) deriving (Typeable) -type LivePropagateNeuron from for forConductivity = LiveNeuron (PropagateNeuron from for forConductivity) type PropagateFromImpulse from for forConductivity = NeuronFromImpulse (PropagateNeuron from for forConductivity) type PropagateForImpulse from for forConductivity = NeuronForImpulse (PropagateNeuron from for forConductivity) type PropagateOptions from for forConductivity = NeuronOptions (PropagateNeuron from for forConductivity) @@ -40,7 +39,6 @@ deriving instance Show (PropagateForImpulse from for forConductivity) Internal 'Neuron' which implements 'propagate'. -} instance (Typeable from, Typeable for, Typeable forConductivity) => Neuron (PropagateNeuron from for forConductivity) where - data LiveNeuron (PropagateNeuron from for forConductivity) = LivePropagateNeuron NeuronDissolved NeuronId data NeuronFromImpulse (PropagateNeuron from for forConductivity) data NeuronForImpulse (PropagateNeuron from for forConductivity) data NeuronOptions (PropagateNeuron from for forConductivity) = PropagateOptions { @@ -48,11 +46,10 @@ instance (Typeable from, Typeable for, Typeable forConductivity) => Neuron (Prop for ::[Translatable from] } - mkLiveNeuron = LivePropagateNeuron - getNeuronDissolved (LivePropagateNeuron dissolved _) = dissolved - getNeuronId (LivePropagateNeuron _ nid) = nid - - mkDefaultOptions = return PropagateOptions { from = undefined, for = undefined } + mkDefaultOptions = return PropagateOptions { + from = undefined, + for = undefined + } grow options = return $ PropagateNeuron options @@ -63,5 +60,5 @@ instance (Typeable from, Typeable for, Typeable forConductivity) => Neuron (Prop propagate :: forall from for forConductivity. (Typeable from, Typeable for, Typeable forConductivity) => Nerve from AxonConductive for forConductivity -> [Translatable from] -> IO () propagate from for = do -- we do not manage this neuron, it will be cleaned by RTS at program exit - _ <- attach (\o -> o { from, for }) undefined :: IO (LivePropagateNeuron from for forConductivity) + _ <- attach (\o -> o { from, for } :: NeuronOptions (PropagateNeuron from for forConductivity)) undefined return () diff --git a/lib/Control/Etage/Sequence.hs b/lib/Control/Etage/Sequence.hs index 57e74ff..c6112b5 100644 --- a/lib/Control/Etage/Sequence.hs +++ b/lib/Control/Etage/Sequence.hs @@ -37,7 +37,6 @@ instance (Real r, Random r, Show r, Typeable r) => Impulse (SequenceForImpulse r deriving instance Show (SequenceForImpulse r) instance (Real r, Random r, Show r, Typeable r) => Neuron (SequenceNeuron r) where - data LiveNeuron (SequenceNeuron r) = LiveSequenceNeuron NeuronDissolved NeuronId data NeuronFromImpulse (SequenceNeuron r) = Value { impulseTimestamp :: ImpulseTime, -- time is first so that ordering is first by time value :: r @@ -48,10 +47,6 @@ instance (Real r, Random r, Show r, Typeable r) => Neuron (SequenceNeuron r) whe intervalSource :: [Int] -- microseconds } deriving (Eq, Ord, Read, Show) - mkLiveNeuron = LiveSequenceNeuron - getNeuronDissolved (LiveSequenceNeuron dissolved _) = dissolved - getNeuronId (LiveSequenceNeuron _ nid) = nid - mkDefaultOptions = do generator <- newStdGen generator' <- newStdGen diff --git a/lib/Control/Etage/Worker.hs b/lib/Control/Etage/Worker.hs index 10ef06d..caf4b72 100644 --- a/lib/Control/Etage/Worker.hs +++ b/lib/Control/Etage/Worker.hs @@ -20,7 +20,7 @@ import Control.Etage type WorkType = IO () instance Show WorkType where - show _ = "Work" + show = show . typeOf data WorkerNeuron deriving (Typeable) @@ -39,7 +39,6 @@ instance Impulse WorkerForImpulse where deriving instance Show WorkerFromImpulse instance Neuron WorkerNeuron where - data LiveNeuron WorkerNeuron = LiveWorkerNeuron NeuronDissolved NeuronId data NeuronFromImpulse WorkerNeuron data NeuronForImpulse WorkerNeuron = Work { impulseTimestamp :: ImpulseTime, @@ -49,10 +48,6 @@ instance Neuron WorkerNeuron where mapOnCapability :: NeuronMapCapability } deriving (Eq, Ord, Read, Show) - mkLiveNeuron = LiveWorkerNeuron - getNeuronDissolved (LiveWorkerNeuron dissolved _) = dissolved - getNeuronId (LiveWorkerNeuron _ nid) = nid - mkDefaultOptions = do neuronMapCapability <- mkNeuronMapOnRandomCapability return WorkerOptions {