From ceacce9d33619af37cebd12d18b434110a6aa8d7 Mon Sep 17 00:00:00 2001 From: Yoichi Hirai Date: Sat, 28 May 2011 23:37:49 +0900 Subject: [PATCH] separate job and thread management --- doc/sample/participating.hs | 50 ++++++++++++++++++++ doc/sample/simple.hs | 6 ++- src/Control/Concurrent/Waitfree.hs | 76 +++++++++++++++++------------- 3 files changed, 98 insertions(+), 34 deletions(-) create mode 100644 doc/sample/participating.hs diff --git a/doc/sample/participating.hs b/doc/sample/participating.hs new file mode 100644 index 0000000..2872ccf --- /dev/null +++ b/doc/sample/participating.hs @@ -0,0 +1,50 @@ +{-# LANGUAGE TypeOperators #-} + +import Control.Concurrent.Waitfree +import Control.Concurrent (threadDelay) +import System.Random (randomRIO) + +-- Can everything be parametric to this type? +type FirstT = SucT ZeroT + +ownID :: Thread t => t -> IO (t, t) +ownID _ = do + wait <- randomRIO (0, 20000) + threadDelay wait + return (t, t) + +hOwnId :: Thread t => Hyp (K t (t, t) :*: HNil) +hOwnId = single ownID + +putWithName :: Thread t => t -> String -> IO () +putWithName th content = putStrLn $ "Thread " ++ (show $ atid th) ++ ": " ++ content + +putResult :: Thread t => t -> String -> IO ThreadStatus +putResult th str = do + putWithName th $ "obtained " ++ str + return Finished + +putOneResult :: (Thread t, Thread s) => s -> t -> IO ThreadStatus +putOneResult owner content = putResult owner $ show $ [atid content] + +two :: (Thread s, Thread t) => Hyp (K s (s, t) :*: (K t (t, s) :*: HNil)) +two = comm hOwnId putOneResult hOwnId putOneResult + +putTwoResults :: (Thread s, Thread t, Thread u) => u -> (s,t) -> IO ThreadStatus +putTwoResults owner (c0, c1) = putResult owner $ show $ [atid c0, atid c1] + +twoFin :: Hyp (HCons (K ZeroT ThreadStatus) (HCons (K FirstT ThreadStatus) HNil)) +twoFin = two >>= (putTwoResults -*- putTwoResults -*- return) + +duplicateTwo :: t -> a -> IO (a,a) +duplicateTwo _ x = return (x,x) + +twoBeforeComm :: (Thread s, Thread t) => + Hyp (HCons + (K s ((s, t), (s, t))) + (HCons (K t ((t, s), (t, s))) HNil)) +twoBeforeComm = two >>= (duplicateTwo -*- duplicateTwo -*- return) + +main :: IO () +main = execute twoFin + diff --git a/doc/sample/simple.hs b/doc/sample/simple.hs index a10e1f9..5d134be 100644 --- a/doc/sample/simple.hs +++ b/doc/sample/simple.hs @@ -35,8 +35,10 @@ twoPrints :: HCons (K ZeroT ((Handle, String), String)) -> Hyp (HCons (K ZeroT ()) (HCons (K (SucT ZeroT) ()) HNil)) twoPrints = printTaken -*- printTaken -*- return -rerror :: Thread t => t -> (Handle, a) -> IO () -rerror th (h, _) = hPutStrLn h $ "Thread " ++ (show $ atid th) ++ " failed to read peer's input." +rerror :: Thread t => t -> (Handle, a) -> IO ThreadStatus +rerror th (h, _) = do + hPutStrLn h $ "Thread " ++ (show $ atid th) ++ " failed to read peer's input." + return Finished content :: Hyp (K ZeroT () :*: (K (SucT ZeroT) () :*: HNil)) content = diff --git a/src/Control/Concurrent/Waitfree.hs b/src/Control/Concurrent/Waitfree.hs index 3de022f..525ba27 100644 --- a/src/Control/Concurrent/Waitfree.hs +++ b/src/Control/Concurrent/Waitfree.hs @@ -14,6 +14,8 @@ module Control.Concurrent.Waitfree , comm , execute , (-*-) + , ThreadStatus (TryAnotherJob, Finished) + , JobStatus (Having, Done) ) where @@ -43,10 +45,15 @@ instance Thread t => Thread (SucT t) where -- | Each 'Thread' type has 'AbstractThreadId' type AbstractThreadId = Int - + +-- | ThreadStatus shows whether a thread is finished or have to try executing another job. +data ThreadStatus = TryAnotherJob | Finished + +-- | JobStatus shows whether a job is finished or not. +data JobStatus a = Having a | Done ThreadStatus -- | A value of type 'K t a' represents a remote computation returning 'a' that is performed by a thread 't'. -newtype K t a = K (t, IO (Maybe a)) +newtype K t a = K (t, IO (JobStatus a)) --- --- Hypersequent @@ -102,7 +109,7 @@ cappy f (MakeHyp x) = MakeHyp $ do (newls, newlx) <- y return (ls ++ newls, newlx) -remote :: Thread t => IO (Maybe a) -> K t a +remote :: Thread t => IO (JobStatus a) -> K t a remote y = K (t, y) -- | 'single' creates a Hyp hypersequent consisting of a single remote computation. @@ -111,7 +118,7 @@ single f = MakeHyp $ return $ ([], HCons (remote f') HNil) where f' = do x <- f t - return $ Just x + return $ Having x infixr 4 -*- @@ -121,14 +128,14 @@ infixr 4 -*- HCons (K t a) l -> Hyp (HCons (K t b) l') (-*-) hdf = progress_ (extend $ peek $ lmaybe hdf) -lmaybe :: (t -> a -> IO b) -> (t -> Maybe a -> IO (Maybe b)) -lmaybe _ _ Nothing = return Nothing -lmaybe f th (Just x) = do +lmaybe :: (t -> a -> IO b) -> (t -> JobStatus a -> IO (JobStatus b)) +lmaybe _ _ (Done x) = return (Done x) +lmaybe f th (Having x) = do y <- f th x - return $ Just y + return $ Having y -- | 'peek' allows to look at the result of a remote computation -peek :: Thread t => (t -> (Maybe a) -> IO b) -> K t a -> IO b +peek :: Thread t => (t -> (JobStatus a) -> IO b) -> K t a -> IO b peek f (K (th, content)) = do content >>= f th @@ -136,9 +143,9 @@ peek f (K (th, content)) = do -- | 'comm hypersequent1 error1 hypersequent2 error2' where 'error1' and 'error2' specifies what to do in case of read failure. comm :: (Thread s, Thread t, HAppend l l' l'') => Hyp (HCons (K t (b,a)) l) - -> (t -> b -> IO ()) + -> (t -> b -> IO ThreadStatus) -> Hyp (HCons (K s (d,c)) l') - -> (s -> d -> IO ()) + -> (s -> d -> IO ThreadStatus) -> Hyp (K t (b, c) :*: (K s (d, a) :*: l'')) comm (MakeHyp x) terror (MakeHyp y) serror = MakeHyp $ do (s0, HCons (K (taT, ta)) l) <- x @@ -151,8 +158,8 @@ comm (MakeHyp x) terror (MakeHyp y) serror = MakeHyp $ do -- internal implementation of comm comm_ :: Thread t => Thread s => HAppend l l' l'' => MVar a -> MVar c -> IORef (Maybe b) -> IORef (Maybe d) -> - ([L], ((K t (b,a)) :*: l)) -> (t -> b -> IO ()) -> - ([L], ((K s (d,c)) :*: l')) -> (s -> d -> IO ()) -> + ([L], ((K t (b,a)) :*: l)) -> (t -> b -> IO ThreadStatus) -> + ([L], ((K s (d,c)) :*: l')) -> (s -> d -> IO ThreadStatus) -> ([L], (K t (b,c)) :*: (K s (d,a) ):*: l'') comm_ abox cbox bbox dbox (s0, HCons (K (taT, tba)) l) terror (s1, HCons (K (scT, sdc)) l') serror = (news, HCons (K (taT, tbc)) (HCons (K (scT, sda)) (hAppend l l'))) @@ -164,13 +171,13 @@ comm_ abox cbox bbox dbox (s0, HCons (K (taT, tba)) l) terror (s1, HCons (K (scT maybetb <- readIORef bbox case maybetb of Just tb -> do - terror taT tb - return Nothing + terror_result <- terror taT tb + return $ Done terror_result Nothing -> error "this should not happen" Just cva -> do maybetb <- readIORef bbox case maybetb of - Just tb -> return $ Just (tb, cva) + Just tb -> return $ Having (tb, cva) Nothing -> error "this should not happen" sda = do aval <- tryTakeMVar abox @@ -179,31 +186,36 @@ comm_ abox cbox bbox dbox (s0, HCons (K (taT, tba)) l) terror (s1, HCons (K (scT maybesd <- readIORef dbox case maybesd of Just sd -> do - serror scT sd - return Nothing + serror_result <- serror scT sd + return $ Done serror_result Nothing -> error "this should not happen" Just ava -> do maybesd <- readIORef dbox case maybesd of Nothing -> error "this should not happen" - Just sd -> return $ Just (sd, ava) + Just sd -> return $ Having (sd, ava) news = s0 ++ s1 ++ [ta_task] ++ [sc_task] ta_task = (atid taT, do maybeba <- tba case maybeba of - Nothing -> writeMVar abox Nothing - Just (tb, ta) -> do + Done thStatus -> do writeMVar abox Nothing + return thStatus + Having (tb, ta) -> do writeMVar abox $ Just ta writeIORef bbox $ Just tb + return TryAnotherJob ) sc_task = (atid scT, do maybedc <- sdc case maybedc of - Nothing -> writeMVar cbox Nothing - Just (sd, sc) -> do writeMVar cbox $ Just sc - writeIORef dbox $ Just sd + Done thStatus -> do writeMVar cbox Nothing + return thStatus + Having (sd, sc) -> do + writeMVar cbox $ Just sc + writeIORef dbox $ Just sd + return TryAnotherJob ) @@ -214,7 +226,6 @@ writeMVar box (Just v) = do writeMVar _ Nothing = return () - -- | 'execute' executes a 'Hyp' hypersequent. execute :: Lconvertible l => Hyp l -> IO () execute (MakeHyp ls) = do @@ -223,20 +234,21 @@ execute (MakeHyp ls) = do -- below is internal -extend :: Thread t => (K t a -> IO (Maybe b)) -> K t a -> K t b +extend :: Thread t => (K t a -> IO (JobStatus b)) -> K t a -> K t b extend trans r = K (t, trans r) mute :: Thread t => K t a -> L -mute (K (th, a)) = (atid th, a >>= \_ -> return ()) - +mute (K (th, a)) = (atid th, a >>= \_ -> return Finished) -type JobChannel = [IO ()] +type JobChannel = [IO ThreadStatus] worker :: JobChannel -> MVar () -> IO () worker [] fin = putMVar fin () worker (hd : tl) fin = do - hd - worker tl fin + result <- hd + case result of + TryAnotherJob -> worker tl fin + Finished -> putMVar fin () -- ThreadPool is finite map type ThreadPool = @@ -245,7 +257,7 @@ type ThreadPool = type JobPool = Map.Map AbstractThreadId JobChannel -type L = (AbstractThreadId, IO ()) +type L = (AbstractThreadId, IO ThreadStatus) class Lconvertible l where htol :: l -> [L]