Skip to content

Commit

Permalink
separate job and thread management
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoichi Hirai committed May 28, 2011
1 parent 57378a8 commit ceacce9
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 34 deletions.
50 changes: 50 additions & 0 deletions doc/sample/participating.hs
@@ -0,0 +1,50 @@
{-# LANGUAGE TypeOperators #-}

import Control.Concurrent.Waitfree
import Control.Concurrent (threadDelay)
import System.Random (randomRIO)

-- Can everything be parametric to this type?
type FirstT = SucT ZeroT

ownID :: Thread t => t -> IO (t, t)
ownID _ = do
wait <- randomRIO (0, 20000)
threadDelay wait
return (t, t)

hOwnId :: Thread t => Hyp (K t (t, t) :*: HNil)
hOwnId = single ownID

putWithName :: Thread t => t -> String -> IO ()
putWithName th content = putStrLn $ "Thread " ++ (show $ atid th) ++ ": " ++ content

putResult :: Thread t => t -> String -> IO ThreadStatus
putResult th str = do
putWithName th $ "obtained " ++ str
return Finished

putOneResult :: (Thread t, Thread s) => s -> t -> IO ThreadStatus
putOneResult owner content = putResult owner $ show $ [atid content]

two :: (Thread s, Thread t) => Hyp (K s (s, t) :*: (K t (t, s) :*: HNil))
two = comm hOwnId putOneResult hOwnId putOneResult

putTwoResults :: (Thread s, Thread t, Thread u) => u -> (s,t) -> IO ThreadStatus
putTwoResults owner (c0, c1) = putResult owner $ show $ [atid c0, atid c1]

twoFin :: Hyp (HCons (K ZeroT ThreadStatus) (HCons (K FirstT ThreadStatus) HNil))
twoFin = two >>= (putTwoResults -*- putTwoResults -*- return)

duplicateTwo :: t -> a -> IO (a,a)
duplicateTwo _ x = return (x,x)

twoBeforeComm :: (Thread s, Thread t) =>
Hyp (HCons
(K s ((s, t), (s, t)))
(HCons (K t ((t, s), (t, s))) HNil))
twoBeforeComm = two >>= (duplicateTwo -*- duplicateTwo -*- return)

main :: IO ()
main = execute twoFin

6 changes: 4 additions & 2 deletions doc/sample/simple.hs
Expand Up @@ -35,8 +35,10 @@ twoPrints :: HCons (K ZeroT ((Handle, String), String))
-> Hyp (HCons (K ZeroT ()) (HCons (K (SucT ZeroT) ()) HNil))
twoPrints = printTaken -*- printTaken -*- return

rerror :: Thread t => t -> (Handle, a) -> IO ()
rerror th (h, _) = hPutStrLn h $ "Thread " ++ (show $ atid th) ++ " failed to read peer's input."
rerror :: Thread t => t -> (Handle, a) -> IO ThreadStatus
rerror th (h, _) = do
hPutStrLn h $ "Thread " ++ (show $ atid th) ++ " failed to read peer's input."
return Finished

content :: Hyp (K ZeroT () :*: (K (SucT ZeroT) () :*: HNil))
content =
Expand Down
76 changes: 44 additions & 32 deletions src/Control/Concurrent/Waitfree.hs
Expand Up @@ -14,6 +14,8 @@ module Control.Concurrent.Waitfree
, comm
, execute
, (-*-)
, ThreadStatus (TryAnotherJob, Finished)
, JobStatus (Having, Done)
)
where

Expand Down Expand Up @@ -43,10 +45,15 @@ instance Thread t => Thread (SucT t) where

-- | Each 'Thread' type has 'AbstractThreadId'
type AbstractThreadId = Int


-- | ThreadStatus shows whether a thread is finished or have to try executing another job.
data ThreadStatus = TryAnotherJob | Finished

-- | JobStatus shows whether a job is finished or not.
data JobStatus a = Having a | Done ThreadStatus

-- | A value of type 'K t a' represents a remote computation returning 'a' that is performed by a thread 't'.
newtype K t a = K (t, IO (Maybe a))
newtype K t a = K (t, IO (JobStatus a))

---
--- Hypersequent
Expand Down Expand Up @@ -102,7 +109,7 @@ cappy f (MakeHyp x) = MakeHyp $ do
(newls, newlx) <- y
return (ls ++ newls, newlx)

remote :: Thread t => IO (Maybe a) -> K t a
remote :: Thread t => IO (JobStatus a) -> K t a
remote y = K (t, y)

-- | 'single' creates a Hyp hypersequent consisting of a single remote computation.
Expand All @@ -111,7 +118,7 @@ single f = MakeHyp $ return $ ([], HCons (remote f') HNil)
where
f' = do
x <- f t
return $ Just x
return $ Having x

infixr 4 -*-

Expand All @@ -121,24 +128,24 @@ infixr 4 -*-
HCons (K t a) l -> Hyp (HCons (K t b) l')
(-*-) hdf = progress_ (extend $ peek $ lmaybe hdf)

lmaybe :: (t -> a -> IO b) -> (t -> Maybe a -> IO (Maybe b))
lmaybe _ _ Nothing = return Nothing
lmaybe f th (Just x) = do
lmaybe :: (t -> a -> IO b) -> (t -> JobStatus a -> IO (JobStatus b))
lmaybe _ _ (Done x) = return (Done x)
lmaybe f th (Having x) = do
y <- f th x
return $ Just y
return $ Having y

-- | 'peek' allows to look at the result of a remote computation
peek :: Thread t => (t -> (Maybe a) -> IO b) -> K t a -> IO b
peek :: Thread t => (t -> (JobStatus a) -> IO b) -> K t a -> IO b
peek f (K (th, content)) = do
content >>= f th

-- | 'comm' stands for communication. 'comm' combines two hypersequents with a communicating component from each hypersequent.
-- | 'comm hypersequent1 error1 hypersequent2 error2' where 'error1' and 'error2' specifies what to do in case of read failure.
comm :: (Thread s, Thread t, HAppend l l' l'') =>
Hyp (HCons (K t (b,a)) l)
-> (t -> b -> IO ())
-> (t -> b -> IO ThreadStatus)
-> Hyp (HCons (K s (d,c)) l')
-> (s -> d -> IO ())
-> (s -> d -> IO ThreadStatus)
-> Hyp (K t (b, c) :*: (K s (d, a) :*: l''))
comm (MakeHyp x) terror (MakeHyp y) serror = MakeHyp $ do
(s0, HCons (K (taT, ta)) l) <- x
Expand All @@ -151,8 +158,8 @@ comm (MakeHyp x) terror (MakeHyp y) serror = MakeHyp $ do

-- internal implementation of comm
comm_ :: Thread t => Thread s => HAppend l l' l'' => MVar a -> MVar c -> IORef (Maybe b) -> IORef (Maybe d) ->
([L], ((K t (b,a)) :*: l)) -> (t -> b -> IO ()) ->
([L], ((K s (d,c)) :*: l')) -> (s -> d -> IO ()) ->
([L], ((K t (b,a)) :*: l)) -> (t -> b -> IO ThreadStatus) ->
([L], ((K s (d,c)) :*: l')) -> (s -> d -> IO ThreadStatus) ->
([L], (K t (b,c)) :*: (K s (d,a) ):*: l'')
comm_ abox cbox bbox dbox (s0, HCons (K (taT, tba)) l) terror (s1, HCons (K (scT, sdc)) l') serror =
(news, HCons (K (taT, tbc)) (HCons (K (scT, sda)) (hAppend l l')))
Expand All @@ -164,13 +171,13 @@ comm_ abox cbox bbox dbox (s0, HCons (K (taT, tba)) l) terror (s1, HCons (K (scT
maybetb <- readIORef bbox
case maybetb of
Just tb -> do
terror taT tb
return Nothing
terror_result <- terror taT tb
return $ Done terror_result
Nothing -> error "this should not happen"
Just cva -> do
maybetb <- readIORef bbox
case maybetb of
Just tb -> return $ Just (tb, cva)
Just tb -> return $ Having (tb, cva)
Nothing -> error "this should not happen"
sda = do
aval <- tryTakeMVar abox
Expand All @@ -179,31 +186,36 @@ comm_ abox cbox bbox dbox (s0, HCons (K (taT, tba)) l) terror (s1, HCons (K (scT
maybesd <- readIORef dbox
case maybesd of
Just sd -> do
serror scT sd
return Nothing
serror_result <- serror scT sd
return $ Done serror_result
Nothing -> error "this should not happen"
Just ava -> do
maybesd <- readIORef dbox
case maybesd of
Nothing -> error "this should not happen"
Just sd -> return $ Just (sd, ava)
Just sd -> return $ Having (sd, ava)
news = s0 ++ s1 ++ [ta_task] ++ [sc_task]
ta_task = (atid taT,
do
maybeba <- tba
case maybeba of
Nothing -> writeMVar abox Nothing
Just (tb, ta) -> do
Done thStatus -> do writeMVar abox Nothing
return thStatus
Having (tb, ta) -> do
writeMVar abox $ Just ta
writeIORef bbox $ Just tb
return TryAnotherJob
)
sc_task = (atid scT,
do
maybedc <- sdc
case maybedc of
Nothing -> writeMVar cbox Nothing
Just (sd, sc) -> do writeMVar cbox $ Just sc
writeIORef dbox $ Just sd
Done thStatus -> do writeMVar cbox Nothing
return thStatus
Having (sd, sc) -> do
writeMVar cbox $ Just sc
writeIORef dbox $ Just sd
return TryAnotherJob
)


Expand All @@ -214,7 +226,6 @@ writeMVar box (Just v) = do
writeMVar _ Nothing = return ()



-- | 'execute' executes a 'Hyp' hypersequent.
execute :: Lconvertible l => Hyp l -> IO ()
execute (MakeHyp ls) = do
Expand All @@ -223,20 +234,21 @@ execute (MakeHyp ls) = do

-- below is internal

extend :: Thread t => (K t a -> IO (Maybe b)) -> K t a -> K t b
extend :: Thread t => (K t a -> IO (JobStatus b)) -> K t a -> K t b
extend trans r = K (t, trans r)

mute :: Thread t => K t a -> L
mute (K (th, a)) = (atid th, a >>= \_ -> return ())

mute (K (th, a)) = (atid th, a >>= \_ -> return Finished)

type JobChannel = [IO ()]
type JobChannel = [IO ThreadStatus]

worker :: JobChannel -> MVar () -> IO ()
worker [] fin = putMVar fin ()
worker (hd : tl) fin = do
hd
worker tl fin
result <- hd
case result of
TryAnotherJob -> worker tl fin
Finished -> putMVar fin ()

-- ThreadPool is finite map
type ThreadPool =
Expand All @@ -245,7 +257,7 @@ type ThreadPool =
type JobPool =
Map.Map AbstractThreadId JobChannel

type L = (AbstractThreadId, IO ())
type L = (AbstractThreadId, IO ThreadStatus)

class Lconvertible l where
htol :: l -> [L]
Expand Down

0 comments on commit ceacce9

Please sign in to comment.