Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

switched to exposing STM to enhance composability

  • Loading branch information...
commit 4e00896f07808e955e32506fef9750477f2cb1c8 1 parent f309928
Paolo Veronelli authored
22 Control/Concurrent/Killable.hs
View
@@ -0,0 +1,22 @@
+{-# LANGUAGE FlexibleInstances #-}
+-- | Ad hoc overload of term kill
+--
+--
+
+
+
+module Control.Concurrent.Killable (Killable (kill)) where
+
+import Prelude hiding (mapM_)
+import Control.Concurrent (killThread, ThreadId)
+import Data.Foldable
+
+-- | Objects that forked threads and can be killed
+class Killable a where
+ kill :: a -> IO ()
+
+instance Killable ThreadId where
+ kill = killThread
+
+instance Foldable a => Killable (a ThreadId) where
+ kill = mapM_ killThread
63 System/Timer/Updatable.hs
View
@@ -1,11 +1,11 @@
-- | An updatable timer is a timer for which it is possible to update the remaining time.
--
--- Any number of threads can be hanged on one timer and they will all continue when the timer rings or when it is destroyed.
+-- Methods are exposed in STM to give composability. IO wrappers for those are exported also.
--
-- Parallel and serial update politics are implemented.
--
-- In the example we start a timer with a time to wait of 10 seconds, hang 2 threads which will wait for it to finish and update
--- it after 5 seconds to wait for 6 seconds.
+-- it after 5 seconds to wait for other 6 seconds.
-- It will complete and run its action and the hanged threads after 11 seconds because of its parallel nature.
-- The serial timer would have ringed after 16 seconds.
--
@@ -15,20 +15,33 @@
-- import Data.Maybe
-- main = do
-- t <- parallel (return 5) $ 10^7
--- forkIO $ wait t >>= print . (+1) . fromJust
--- forkIO $ wait t >>= print . (+2) . fromJust
+-- forkIO $ waitIO t >>= print . (+1) . fromJust
+-- forkIO $ waitIO t >>= print . (+2) . fromJust
-- threadDelay $ 5 * 10 ^ 6
--- renew t $ 6 * 10 ^ 6
--- wait t >>= print . fromJust
+-- renewIO t $ 6 * 10 ^ 6
+-- waitIO t >>= print . fromJust
-- @
-module System.Timer.Updatable (Updatable (..), parallel, serial, Delay) where
+module System.Timer.Updatable
+ (Delay
+ -- * Datatype
+ , Updatable
+ , wait
+ , renew
+ -- * IO wrappers
+ , waitIO
+ , renewIO
+ -- * Builders
+ , parallel
+ , serial
+ ) where
import Data.Maybe
-import Control.Concurrent
-import Control.Monad (when, void)
+import Control.Concurrent (forkIO, threadDelay)
+import Control.Monad (when, void, forever)
import Control.Concurrent.STM
+import Control.Concurrent.Killable
-- | A delay in microseconds
type Delay = Int
@@ -36,17 +49,33 @@ type Delay = Int
-- | Abstract timers that can be updated. Hanging via wait function can be done by any number of threads, which is sinchronization.
data Updatable a = Updatable {
wait :: STM (Maybe a), -- ^ wait until the timer rings, or signal Nothing if timer is destroyed
- renew :: Delay -> IO (), -- ^ update the delay in the timer
- kill :: IO () -- ^ destoy the timer
+ renew :: Delay -> STM (), -- ^ update the delay in the timer
+ _kill :: IO ()
}
+instance Killable (Updatable a) where
+ kill = _kill
+
+-- | Wait in IO
+waitIO :: Updatable a -> IO (Maybe a)
+waitIO = atomically . wait
+
+-- | Renew in IO
+renewIO :: Updatable a -> Delay -> IO ()
+renewIO u = atomically . renew u
+
-- wrap the logic with a framework for signalling the time is over
engine :: IO () -> (Delay -> IO ()) -> IO a -> Delay -> IO (Updatable a)
engine k t w d0 = do
+ deltas <- newTChanIO
x <- newEmptyTMVarIO
t d0
+ z <- forkIO . forever $ atomically (readTChan deltas) >>= t
p <- forkIO $ w >>= atomically . putTMVar x . Just
- return $ Updatable (takeTMVar x >>= \r -> putTMVar x r >> return r) t (k >> killThread p >> atomically (putTMVar x Nothing))
+ return $ Updatable
+ (takeTMVar x >>= \r -> putTMVar x r >> return r)
+ (writeTChan deltas)
+ (k >> kill [p,z] >> atomically (putTMVar x Nothing))
-- | Create and start a parallel updatable timer. This timer renew actions will start parallel timers. Last timer that is over will compute the given action.
parallel :: IO a -- ^ the action to run when timer rings
@@ -64,7 +93,7 @@ parallel a d0 = do
z <- readTVar tz
when (z > 0) retry
a
- k = atomically (readTVar tp) >>= mapM_ killThread
+ k = atomically (readTVar tp) >>= kill
engine k t w d0
-- | Create and start a serial updatable timer. This timer renew action will schedule new timer after the running one. The timer will run the given action after the sum of all scheduled times is over.
@@ -84,9 +113,9 @@ serial a d0 = do
main = do
t <- parallel (return 5) $ 10^7
- forkIO $ atomically (wait t) >>= print . (+1) . fromJust
- forkIO $ atomically (wait t) >>= print . (+2) . fromJust
+ forkIO $ waitIO t >>= print . (+1) . fromJust
+ forkIO $ waitIO t >>= print . (+2) . fromJust
threadDelay $ 5 * 10 ^ 6
- renew t $ 6 * 10 ^ 6
- atomically (wait t) >>= print . fromJust
+ renewIO t $ 6 * 10 ^ 6
+ waitIO t >>= print . fromJust
3  timers-updatable.cabal
View
@@ -6,7 +6,7 @@ Name: timers-updatable
-- The package version. See the Haskell package versioning policy
-- (http://www.haskell.org/haskellwiki/Package_versioning_policy) for
-- standards guiding when and how versions should be incremented.
-Version: 0.1
+Version: 0.2
-- A short (one-line) description of the package.
Synopsis: timers which are updatable in the remaining time
@@ -49,6 +49,7 @@ Library
-- Modules exported by the library.
Exposed-modules:
System.Timer.Updatable
+ Control.Concurrent.Killable
-- Packages needed in order to build this package.
Build-depends:
Please sign in to comment.
Something went wrong with that request. Please try again.