{-# LANGUAGE DeriveDataTypeable #-}
-- | This module provides data dependency resolution and
-- fault tolerance via /promises/ (known elsewhere as /futures/).
-- It's implemented in terms of the "Remote.Process" module.
module Remote.Task (
-- * Tasks and promises
TaskM, Promise, PromiseList(..),
runTask,
newPromise, newPromiseAt, newPromiseNear, newPromiseHere, newPromiseAtRole,
toPromise, toPromiseAt, toPromiseNear, toPromiseImm,
readPromise,
-- * MapReduce
MapReduce(..),
mapReduce,
-- * Useful auxiliaries
chunkify,
shuffle,
tsay,
tlogS,
Locality(..),
TaskException(..),
-- * Internals, not for general use
__remoteCallMetaData, serialEncodeA, serialDecodeA
) where
import Remote.Reg (putReg,getEntryByIdent,RemoteCallMetaData)
import Remote.Encoding (serialEncodePure,hGetPayload,hPutPayload,Payload,getPayloadContent,Serializable,serialDecode,serialEncode)
import Remote.Process (roundtripQuery, ServiceException(..), TransmitStatus(..),diffTime,getConfig,Config(..),matchProcessDown,terminate,nullPid,monitorProcess,TransmitException(..),MonitorAction(..),ptry,LogConfig(..),getLogConfig,setNodeLogConfig,nodeFromPid,LogLevel(..),LogTarget(..),logS,getLookup,say,LogSphere,NodeId,ProcessM,ProcessId,PayloadDisposition(..),getSelfPid,getSelfNode,matchUnknownThrow,receiveWait,receiveTimeout,roundtripResponse,roundtripResponseAsync,roundtripQueryImpl,match,makePayloadClosure,spawn,spawnLocal,spawnLocalAnd,setDaemonic,send,makeClosure)
import Remote.Closure (Closure(..))
import Remote.Peer (getPeers)
import Data.Dynamic (Dynamic, toDyn, fromDynamic)
import System.IO (withFile,IOMode(..))
import System.Directory (renameFile)
import Data.Binary (Binary,get,put,putWord8,getWord8)
import Control.Exception (SomeException,Exception,throw)
import Data.Typeable (Typeable)
import Control.Applicative (Applicative(..))
import Control.Monad (liftM,when)
import Control.Monad.Trans (liftIO)
import Control.Concurrent.MVar (MVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,takeMVar,putMVar,readMVar,withMVar)
import qualified Data.Map as Map (Map,insert,lookup,empty,insertWith',toList)
import Data.List ((\\),union,nub,groupBy,sortBy,delete)
import Data.Time (UTCTime,getCurrentTime)
-- imports required for hashClosure; is there a lighter-weight way of doing this?
import Data.Digest.Pure.MD5 (md5)
import Data.ByteString.Lazy.UTF8 (fromString)
import qualified Data.ByteString.Lazy as B (concat)
----------------------------------------------
-- * Promises and tasks
----------------------------------------------
type PromiseId = Integer
type Hash = String
data PromiseList a = PlChunk a (Promise (PromiseList a))
| PlNil deriving Typeable
instance (Serializable a) => Binary (PromiseList a) where
put (PlChunk a p) = putWord8 0 >> put a >> put p
put PlNil = putWord8 1
get = do w <- getWord8
case w of
0 -> do a <- get
p <- get
return $ PlChunk a p
1 -> return PlNil
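-- A 'PromiseList' is a promise-linked list: each cell carries one value plus a
-- promise for the rest of the list. As an illustrative sketch (this helper is
-- hypothetical, not part of the module), a consumer could force the whole list
-- using nothing but 'readPromise':
--
-- > plToList :: (Serializable a) => PromiseList a -> TaskM [a]
-- > plToList PlNil = return []
-- > plToList (PlChunk x rest) = do next <- readPromise rest
-- >                                more <- plToList next
-- >                                return (x : more)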
-- | The basic data type for expressing data dependence
-- in the 'TaskM' monad. A Promise represents a value that
-- may or may not have been computed yet; thus, it's like
-- a distributed thunk (in the sense of a non-strict unit
-- of evaluation). These are created by 'newPromise' and friends,
-- and the underlying value can be gotten with 'readPromise'.
data Promise a = PromiseBasic { _psRedeemer :: ProcessId, _psId :: PromiseId }
| PromiseImmediate a deriving Typeable
-- psRedeemer should maybe be wrapped in an IORef so that it can be updated in case of node failure
instance (Serializable a) => Binary (Promise a) where
put (PromiseBasic a b) = putWord8 0 >> put a >> put b
put (PromiseImmediate a) = putWord8 1 >> put a
get = do a <- getWord8
case a of
0 -> do b <- get
c <- get
return $ PromiseBasic b c
1 -> do b <- get
return $ PromiseImmediate b
-- | Stores the data produced by a promise, in one of its
-- various forms. If it's currently in memory, we keep it
-- as a payload, to be decoded by its ultimate user (who
-- of course has the static type information), the time
-- it was last touched (so we know when to flush it),
-- and perhaps also a decoded version, so that it doesn't
-- need to be decoded repeatedly: this makes things go a lot
-- faster. If it's been flushed to disk, we keep track of
-- where, and if the promise didn't complete, but threw
-- an exception during its execution, we mark that here
-- as well: the exception will be propagated to
-- dependents.
data PromiseStorage = PromiseInMemory PromiseData UTCTime (Maybe Dynamic)
| PromiseOnDisk FilePath
| PromiseException String
type PromiseData = Payload
{- UNUSED
type TimeStamp = UTCTime
-}
-- | Keeps track of what we know about currently running promises.
-- The closure and locality are provided by the initial call to
-- newPromise, the nodeboss is where it is currently running.
-- We need this info to deal with complaints.
data PromiseRecord = PromiseRecord ProcessId (Closure PromiseData) Locality
data MasterState = MasterState
{
-- | Promise IDs are allocated serially from here
msNextId :: PromiseId,
-- | All currently known nodes, with the role, node ID, and node boss. Updated asynchronously by prober thread
msNodes :: MVar [(String,NodeId,ProcessId)],
-- | Given a nodeboss, which promises belong to it. Not sure what this is good for
msAllocation :: Map.Map ProcessId [PromiseId],
-- | Given a promise, what we know about it: its nodeboss, its closure, and its locality preference
msPromises :: Map.Map PromiseId PromiseRecord,
-- | The locality preference of new worker tasks, if not specified otherwise
msDefaultLocality :: Locality
}
data MmNewPromise = MmNewPromise (Closure Payload) Locality Queueing deriving (Typeable)
instance Binary MmNewPromise where
get = do a <- get
l <- get
q <- get
return $ MmNewPromise a l q
put (MmNewPromise a l q) = put a >> put l >> put q
data MmNewPromiseResponse = MmNewPromiseResponse ProcessId PromiseId
| MmNewPromiseResponseFail deriving (Typeable)
instance Binary MmNewPromiseResponse where
put (MmNewPromiseResponse a b) =
do putWord8 0
put a
put b
put MmNewPromiseResponseFail = putWord8 1
get = do a <- getWord8
case a of
0 -> do b <- get
c <- get
return $ MmNewPromiseResponse b c
1 -> return MmNewPromiseResponseFail
data MmStatus = MmStatus deriving Typeable
instance Binary MmStatus where
get = return MmStatus
put MmStatus = put ()
data MmStatusResponse = MmStatusResponse [NodeId] (Map.Map ProcessId [PromiseId]) deriving Typeable
instance Binary MmStatusResponse where
get = do a <- get
b <- get
return $ MmStatusResponse a b
put (MmStatusResponse a b) = put a >> put b
data MmComplain = MmComplain ProcessId PromiseId deriving (Typeable)
instance Binary MmComplain where
put (MmComplain a b) = put a >> put b
get = do a <- get
b <- get
return $ MmComplain a b
data MmComplainResponse = MmComplainResponse ProcessId deriving (Typeable)
instance Binary MmComplainResponse where
put (MmComplainResponse a) = put a
get = do a <- get
return $ MmComplainResponse a
data TmNewPeer = TmNewPeer NodeId deriving (Typeable)
instance Binary TmNewPeer where
get = do a <- get
return $ TmNewPeer a
put (TmNewPeer nid) = put nid
data NmStart = NmStart PromiseId (Closure Payload) Queueing deriving (Typeable)
instance Binary NmStart where
get = do a <- get
b <- get
c <- get
return $ NmStart a b c
put (NmStart a b c) = put a >> put b >> put c
data NmStartResponse = NmStartResponse Bool deriving (Typeable)
instance Binary NmStartResponse where
get = do a <- get
return $ NmStartResponse a
put (NmStartResponse a) = put a
data NmRedeem = NmRedeem PromiseId deriving (Typeable)
instance Binary NmRedeem where
get = do a <- get
return $ NmRedeem a
put (NmRedeem prid) = put prid
data NmRedeemResponse = NmRedeemResponse Payload
| NmRedeemResponseUnknown
| NmRedeemResponseException deriving (Typeable)
instance Binary NmRedeemResponse where
get = do a <- getWord8
case a of
0 -> do b <- get
return $ NmRedeemResponse b
1 -> return NmRedeemResponseUnknown
2 -> return NmRedeemResponseException
put (NmRedeemResponse a) = putWord8 0 >> put a
put (NmRedeemResponseUnknown) = putWord8 1
put (NmRedeemResponseException) = putWord8 2
data TaskException = TaskException String deriving (Show,Typeable)
instance Exception TaskException
-- | (Currently ignored.)
data Queueing = QuNone
| QuExclusive
| QuSmall deriving (Typeable,Ord,Eq)
defaultQueueing :: Queueing
defaultQueueing = QuNone
instance Binary Queueing where
put QuNone = putWord8 0
put QuExclusive = putWord8 1
put QuSmall = putWord8 2
get = do a <- getWord8
case a of
0 -> return QuNone
1 -> return QuExclusive
2 -> return QuSmall
-- | A specification of preference
-- of where a promise should be allocated,
-- among the nodes visible to the master.
data Locality = LcUnrestricted -- ^ The promise can be placed anywhere.
| LcDefault -- ^ The default preference is applied, which is for nodes having a role of NODE or WORKER
| LcByRole [String] -- ^ Nodes having the given roles will be preferred
| LcByNode [NodeId] -- ^ The given nodes will be preferred
instance Binary Locality where
put LcUnrestricted = putWord8 0
put LcDefault = putWord8 1
put (LcByRole a) = putWord8 2 >> put a
put (LcByNode a) = putWord8 3 >> put a
get = do a <- getWord8
case a of
0 -> return LcUnrestricted
1 -> return LcDefault
2 -> do r <- get
return $ LcByRole r
3 -> do r <- get
return $ LcByNode r
defaultLocality :: Locality
defaultLocality = LcByRole ["WORKER","NODE"]
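-- For illustration, a caller could request placement like this (the closure
-- @clo@ and node @nd@ are hypothetical, and preferences may not be honored):
--
-- > newPromiseAt (LcByRole ["WORKER"]) clo   -- prefer nodes whose cfgRole is WORKER
-- > newPromiseAt (LcByNode [nd]) clo         -- prefer one particular node
-- > newPromiseAt LcDefault clo               -- same placement as plain newPromise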
taskError :: String -> a
taskError s = throw $ TaskException s
serialEncodeA :: (Serializable a) => a -> TaskM Payload
serialEncodeA = liftTask . liftIO . serialEncode
serialDecodeA :: (Serializable a) => Payload -> TaskM (Maybe a)
serialDecodeA = liftTask . liftIO . serialDecode
monitorTask :: ProcessId -> ProcessId -> ProcessM TransmitStatus
monitorTask monitor monitee
= do res <- ptry $ monitorProcess monitor monitee MaMonitor
case res of
Right _ -> return QteOK
Left (ServiceException e) -> return $ QteOther e
roundtripImpl :: (Serializable a, Serializable b) => ProcessId -> a -> ProcessM (Either TransmitStatus b)
roundtripImpl pid dat = roundtripQueryImpl 0 PldUser pid dat id []
roundtrip :: (Serializable a, Serializable b) => ProcessId -> a -> TaskM (Either TransmitStatus b)
roundtrip apid dat =
TaskM $ \ts ->
case Map.lookup apid (tsMonitoring ts) of
Nothing -> do mypid <- getSelfPid
res0 <- monitorTask mypid apid
case res0 of
QteOK ->
do res <- roundtripImpl apid dat
return (ts {tsMonitoring=Map.insert apid () (tsMonitoring ts)},res)
_ -> return (ts,Left res0)
Just _ -> do res <- roundtripImpl apid dat
return (ts,res)
-- roundtrip a b = liftTask $ roundtripQueryUnsafe PldUser a b
spawnDaemonic :: ProcessM () -> ProcessM ProcessId
spawnDaemonic p = spawnLocalAnd p setDaemonic
runWorkerNode :: ProcessId -> NodeId -> ProcessM ProcessId
runWorkerNode masterpid nid =
do clo <- makeClosure "Remote.Task.runWorkerNode__impl" (masterpid) :: ProcessM (Closure (ProcessM ()))
spawn nid clo
runWorkerNode__impl :: Payload -> ProcessM ()
runWorkerNode__impl pl =
do setDaemonic -- maybe it's good to have the node manager be daemonic, but prolly not. If so, the MASTERPID must be terminated when user-provided MASTERPROC ends
mpid <- liftIO $ serialDecode pl
case mpid of
Just masterpid -> handler masterpid
Nothing -> error "Failure to extract in rwn__impl"
where handler masterpid = startNodeManager masterpid
passthrough__implPl :: Payload -> TaskM Payload
passthrough__implPl pl = return pl
passthrough__closure :: (Serializable a) => a -> Closure (TaskM a)
passthrough__closure a = Closure "Remote.Task.passthrough__implPl" (serialEncodePure a)
__remoteCallMetaData :: RemoteCallMetaData
__remoteCallMetaData x = putReg runWorkerNode__impl "Remote.Task.runWorkerNode__impl"
(putReg passthrough__implPl "Remote.Task.passthrough__implPl" x)
updatePromiseInMemory :: PromiseStorage -> IO PromiseStorage
updatePromiseInMemory (PromiseInMemory p _ d) = do utc <- getCurrentTime
return $ PromiseInMemory p utc d
updatePromiseInMemory other = return other
makePromiseInMemory :: PromiseData -> Maybe Dynamic -> IO PromiseStorage
makePromiseInMemory p dyn = do utc <- getCurrentTime
return $ PromiseInMemory p utc dyn
forwardLogs :: Maybe ProcessId -> ProcessM ()
forwardLogs masterpid =
do lc <- getLogConfig
selfnid <- getSelfNode
let newlc = lc {logTarget = case masterpid of
Just mp
| nodeFromPid mp /= selfnid -> LtForward $ nodeFromPid mp
_ -> LtStdout}
in setNodeLogConfig newlc
hashClosure :: Closure a -> Hash
hashClosure (Closure s pl) = show $ md5 $ B.concat [fromString s, getPayloadContent pl]
undiskify :: FilePath -> MVar PromiseStorage -> ProcessM (Maybe PromiseData)
undiskify fpIn mps =
do wrap $ liftIO $ modifyMVar mps (\val ->
case val of
PromiseOnDisk fp ->
do pl <- withFile fp ReadMode hGetPayload
inmem <- makePromiseInMemory pl Nothing
return (inmem,Just pl)
PromiseInMemory payload _ _ -> return (val,Just payload)
_ -> return (val,Nothing))
where wrap a = do res <- ptry a
case res of
Left e -> do logS "TSK" LoCritical $ "Error reading promise from file "++fpIn++": "++show (e::IOError)
return Nothing
Right r -> return r
diskify :: FilePath -> MVar PromiseStorage -> Bool -> ProcessM ()
diskify fp mps reallywrite =
do cfg <- getConfig
when (cfgPromiseFlushDelay cfg > 0)
(handler (cfgPromiseFlushDelay cfg))
where
handler delay =
do _ <- receiveTimeout delay []
again <- wrap $ liftIO $ modifyMVar mps (\val ->
case val of
PromiseInMemory payload utc _ ->
do now <- getCurrentTime
if diffTime now utc > delay
then do when reallywrite $
do liftIO $ withFile tmp WriteMode (\h -> hPutPayload h payload)
renameFile tmp fp
return (PromiseOnDisk fp,False)
else return (val,True)
_ -> return (val,False))
when again
(diskify fp mps reallywrite)
tmp = fp ++ ".tmp"
wrap a = do res <- ptry a
case res of
Left z -> do logS "TSK" LoImportant $ "Error writing promise to disk on file "++fp++": "++show (z::IOError)
return False
Right v -> return v
startNodeWorker :: ProcessId -> NodeBossState ->
MVar PromiseStorage -> Closure Payload -> ProcessM ()
startNodeWorker masterpid nbs mps clo@(Closure cloname cloarg) =
do self <- getSelfPid
_ <- spawnLocalAnd (starter self) (prefix self)
return ()
where
prefix nodeboss =
do self <- getSelfPid
monitorProcess self nodeboss MaLink
setDaemonic
starter nodeboss = -- TODO try to do an undiskify here, if the promise is left over from a previous, failed run
let initialState = TaskState {tsMaster=masterpid,tsNodeBoss=Just nodeboss,
tsPromiseCache=nsPromiseCache nbs, tsRedeemerForwarding=nsRedeemerForwarding nbs,
tsMonitoring=Map.empty}
tasker = do tbl <- liftTask $ getLookup
case getEntryByIdent tbl cloname of
Just funval ->
do val <- funval cloarg
p <- liftTaskIO $ makePromiseInMemory val Nothing
liftTaskIO $ putMVar mps p
cfg <- liftTask $ getConfig
let cachefile = cfgPromisePrefix cfg++hashClosure clo
liftTask $ diskify cachefile mps True
Nothing -> taskError $ "Failed looking up "++cloname++" in closure table"
in do res <- ptry $ runTaskM tasker initialState :: ProcessM (Either SomeException (TaskState,()))
case res of
Left ex -> liftIO (putMVar mps (PromiseException (show ex))) >> throw ex
Right _ -> return ()
data NodeBossState =
NodeBossState
{
nsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
nsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId)
}
startNodeManager :: ProcessId -> ProcessM ()
startNodeManager masterpid =
let
handler :: NodeBossState -> ProcessM a
handler state =
let promisecache = nsPromiseCache state
nmStart = roundtripResponse (\(NmStart promise clo _queueing) ->
do promisestore <- liftIO $ newEmptyMVar
ret <- liftIO $ modifyMVar promisecache
(\pc -> let newpc = Map.insert promise promisestore pc
in return (newpc,True))
when (ret)
(startNodeWorker masterpid state promisestore clo)
return (NmStartResponse ret,state))
nmTermination = matchProcessDown masterpid $
do forwardLogs Nothing
logS "TSK" LoInformation $ "Terminating nodeboss after my master "++show masterpid++" is gone"
terminate
nmRedeem = roundtripResponseAsync (\(NmRedeem promise) ans ->
let answerer = do pc <- liftIO $ readMVar promisecache
case Map.lookup promise pc of
Nothing -> ans NmRedeemResponseUnknown
Just v -> do rv <- liftIO $ readMVar v -- possibly long wait
case rv of
PromiseInMemory rrv _ _ ->
do liftIO $ modifyMVar_ v (\_ -> updatePromiseInMemory rv)
ans (NmRedeemResponse rrv)
PromiseOnDisk fp -> do mpd <- undiskify fp v
case mpd of
Nothing ->
ans (NmRedeemResponseUnknown)
Just a ->
ans (NmRedeemResponse a)
diskify fp v False
PromiseException _ -> ans NmRedeemResponseException
in do _ <- spawnLocal answerer
return state) False
in receiveWait [nmStart, nmRedeem, nmTermination, matchUnknownThrow] >>= handler
in do forwardLogs $ Just masterpid
mypid <- getSelfPid
monitorProcess mypid masterpid MaMonitor
logS "TSK" LoInformation $ "Starting a nodeboss owned by " ++ show masterpid
pc <- liftIO $ newMVar Map.empty
pf <- liftIO $ newMVar Map.empty
let initState = NodeBossState {nsPromiseCache=pc,nsRedeemerForwarding=pf}
handler initState
-- | Starts a new context for executing a 'TaskM' environment.
-- The node on which this function is run becomes a new master
-- in a Task application; as a result, the application should
-- only call this function once. The master will attempt to
-- control all nodes that it can find; if you are going to be
-- running more than one CH application on a single network,
-- be sure to give each application a different network
-- magic (via cfgNetworkMagic). The master TaskM environment
-- created by this function can then spawn other threads,
-- locally or remotely, using 'newPromise' and friends.
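--
-- A minimal sketch of a master computation, using only this module's API:
--
-- > masterTask :: TaskM Int
-- > masterTask = do p <- toPromise (21 :: Int)
-- >                 v <- readPromise p
-- >                 return (v * 2)
-- >
-- > -- then, from a ProcessM computation (the calling node becomes the master):
-- > -- result <- runTask masterTask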
runTask :: TaskM a -> ProcessM a
runTask = startMaster
startMaster :: TaskM a -> ProcessM a
startMaster proc =
do mvmaster <- liftIO $ newEmptyMVar
mvdone <- liftIO $ newEmptyMVar
master <- runMaster (masterproc mvdone mvmaster)
liftIO $ putMVar mvmaster master
liftIO $ takeMVar mvdone
where masterproc mvdone mvmaster nodeboss =
do master <- liftIO $ takeMVar mvmaster
pc <- liftIO $ newMVar Map.empty
pf <- liftIO $ newMVar Map.empty
let initialState = TaskState {tsMaster=master,tsNodeBoss=Just nodeboss,
tsPromiseCache=pc, tsRedeemerForwarding=pf,
tsMonitoring=Map.empty}
res <- liftM snd $ runTaskM proc initialState
liftIO $ putMVar mvdone res
{- UNUSED
type LocationSelector = MasterState -> ProcessM (NodeId,ProcessId)
-}
selectLocation :: MasterState -> Locality -> ProcessM (Maybe (String,NodeId,ProcessId))
selectLocation ms locality =
let nodes = msNodes ms
in liftIO $ modifyMVar nodes
(\n -> case n of
[] -> return (n,Nothing)
_ -> let dflt = (rotate n,Just $ head n)
filterify f = case filter f n of
[] -> return dflt
(a:_) -> return ((delete a n) ++ [a],Just a)
in case cond locality of
LcUnrestricted -> return dflt
LcDefault -> return dflt
LcByRole l -> filterify (\(r,_,_) -> r `elem` l)
LcByNode l -> filterify (\(_,r,_) -> r `elem` l))
where rotate [] = []
rotate (h:t) = t ++ [h]
cond l = case l of
LcDefault -> msDefaultLocality ms
_ -> l
countLocations :: MasterState -> ProcessM Int
countLocations ms = liftIO $ withMVar (msNodes ms) (\a -> return $ length a)
findPeers :: ProcessM [(String,NodeId)]
findPeers = liftM (concat . (map (\(role,v) -> [ (role,x) | x <- v] )) . Map.toList) getPeers
sendSilent :: (Serializable a) => ProcessId -> a -> ProcessM ()
sendSilent pid a = do res <- ptry $ send pid a
case res of
Left (TransmitException _) -> return ()
Right _ -> return ()
{- UNUSED
getStatus :: TaskM ()
getStatus =
do master <- getMaster
res <- roundtrip master MmStatus
case res of
Left _ -> return ()
Right (MmStatusResponse nodes promises) ->
let verboseNodes = intercalate ", " (map show nodes)
verbosePromises = intercalate "\n" $ map (\(nb,l) -> (show nb)++" -- "++intercalate "," (map show l)) (Map.toList promises)
in tsay $ "\nKnown nodes: " ++ verboseNodes ++ "\n\nNodebosses: " ++ verbosePromises
-}
runMaster :: (ProcessId -> ProcessM ()) -> ProcessM ProcessId
runMaster masterproc =
let
probeOnce nodes seen masterpid =
do recentlist <- findPeers -- TODO if a node fails to response to a probe even once, it's gone forever; be more flexible
let newseen = seen `union` recentlist
let topidlist = recentlist \\ seen
let cleanOut n = filter (\(_,nid,_) -> nid `elem` (map snd recentlist)) n
newlypidded <- mapM (\(role,nid) ->
do pid <- runWorkerNode masterpid nid
return (role,nid,pid)) topidlist
(_newlist,totalseen) <- liftIO $ modifyMVar nodes (\oldlist ->
return ((cleanOut oldlist) ++ newlypidded,(recentlist,newseen)))
let newlyadded = totalseen \\ seen
mapM_ (\nid -> sendSilent masterpid (TmNewPeer nid)) (map snd newlyadded)
return totalseen
proberDelay = 10000000 -- how often do we check the network to see what nodes are available?
prober nodes seen masterpid =
do totalseen <- probeOnce nodes seen masterpid
_ <- receiveTimeout proberDelay [matchUnknownThrow]
prober nodes totalseen masterpid
master state =
let
tryAlloc clo promiseid locality queueing =
do ns <- selectLocation state locality
case ns of
Nothing -> do logS "TSK" LoCritical "Attempt to allocate a task, but no nodes found"
return Nothing
Just (_,nid,nodeboss) ->
do res <- roundtripQuery PldUser nodeboss (NmStart promiseid clo queueing) -- roundtripQuery monitors and then unmonitors, which generates a lot of traffic; we probably don't need to do this
case res of
Left e ->
do logS "TSK" LoImportant $ "Failed attempt to start "++show clo++" on " ++show nid ++": "++show e
return Nothing
Right (NmStartResponse True) -> return $ Just nodeboss
_ -> do logS "TSK" LoImportant $ "Failed attempt to start "++show clo++" on " ++show nid
return Nothing
basicAllocate clo promiseid locality queueing =
do count <- countLocations state
res1 <- tryAlloc clo promiseid locality queueing
case res1 of
Just _ -> return res1
Nothing -> -- TODO we should try all matching locations before moving on to Unrestricted
do res <- stubborn count $ tryAlloc clo promiseid LcUnrestricted queueing
case res of
Nothing -> do logS "TSK" LoCritical $ "Terminally failed to start "++show clo
return res
_ -> return res
statusMsg = roundtripResponse
(\x -> case x of
MmStatus ->
do thenodes <- liftIO $ readMVar $ msNodes state
let knownNodes = map (\(_,n,_) -> n) thenodes
proctree = msAllocation state
return (MmStatusResponse knownNodes proctree,state))
complainMsg = roundtripResponse
(\x -> case x of
MmComplain procid promid ->
case Map.lookup promid (msPromises state) of
Nothing -> return (MmComplainResponse nullPid,state) -- failure
Just (PromiseRecord curprocid curclo curlocality)
| curprocid /= procid -> return (MmComplainResponse curprocid,state)
| otherwise ->
do res <- basicAllocate curclo promid curlocality defaultQueueing
case res of
Nothing -> return (MmComplainResponse nullPid,state) -- failure
Just newprocid ->
let newpromises = Map.insert promid (PromiseRecord newprocid curclo curlocality) (msPromises state)
in return (MmComplainResponse newprocid,state {msPromises=newpromises}))
promiseMsg = roundtripResponse
(\x -> case x of
MmNewPromise clo locality queueing ->
do
let promiseid = msNextId state
res <- basicAllocate clo promiseid locality queueing
case res of
Just nodeboss ->
let newstate = state {msAllocation=newAllocation,msPromises=newPromises,msNextId=promiseid+1}
newAllocation = Map.insertWith' (\a b -> nub $ a++b) nodeboss [promiseid] (msAllocation state)
newPromises = Map.insert promiseid (PromiseRecord nodeboss clo locality) (msPromises state)
in return (MmNewPromiseResponse nodeboss promiseid,newstate)
Nothing ->
return (MmNewPromiseResponseFail,state))
simpleMsg = match
(\x -> case x of
TmNewPeer nid -> do logS "TSK" LoInformation $ "Found new peer " ++show nid
return state)
in receiveWait [simpleMsg, promiseMsg, complainMsg,statusMsg] >>= master -- TODO matchUnknownThrow
in do nodes <- liftIO $ newMVar []
selfnode <- getSelfNode
selfpid <- getSelfPid
let initState = MasterState {msNextId=0, msAllocation=Map.empty, msPromises=Map.empty, msNodes=nodes, msDefaultLocality = defaultLocality}
masterpid <- spawnDaemonic (master initState)
seennodes <- probeOnce nodes [] masterpid
let getByNid _ [] = Nothing
getByNid nid ((_,n,nodeboss):xs) = if nid==n then Just nodeboss else getByNid nid xs
res <- liftIO $ withMVar nodes (\n -> return $ getByNid selfnode n)
_ <- case res of
Nothing -> taskError "Can't find self: make sure cfgKnownHosts includes the master"
Just x -> spawnLocalAnd (masterproc x) (do myself <- getSelfPid
monitorProcess selfpid myself MaLinkError)
_ <- spawnDaemonic (prober nodes seennodes masterpid)
return masterpid
stubborn :: (Monad m) => Int -> m (Maybe a) -> m (Maybe a)
stubborn 0 a = a
stubborn n a | n>0
= do r <- a
case r of
Just _ -> return r
Nothing -> stubborn (n-1) a
-- TODO: setDefaultLocality :: Locality -> TaskM ()
-- | Like 'newPromise', but creates a promise whose
-- value is already known. In other words, it puts
-- a given, already-calculated value in a promise.
-- Conceptually (but not syntactically, due to closures),
-- you can consider it like this:
--
-- > toPromise a = newPromise (return a)
toPromise :: (Serializable a) => a -> TaskM (Promise a)
toPromise = toPromiseAt LcDefault
-- | A variant of 'toPromise' that lets the user
-- express a locality preference, i.e. some information
-- about which node will become the owner of the
-- new promise. These preferences will not necessarily
-- be respected.
toPromiseAt :: (Serializable a) => Locality -> a -> TaskM (Promise a)
toPromiseAt locality a = newPromiseAt locality (passthrough__closure a)
-- | Similar to 'toPromiseAt' and 'newPromiseNear'
toPromiseNear :: (Serializable a,Serializable b) => Promise b -> a -> TaskM (Promise a)
toPromiseNear (PromiseImmediate _) = toPromise
-- TODO should I consult tsRedeemerForwarding here?
toPromiseNear (PromiseBasic prhost _prid) = toPromiseAt (LcByNode [nodeFromPid prhost])
-- | Creates an /immediate promise/, which is to say, a promise
-- in name only. Unlike a regular promise (created by 'toPromise'),
-- this kind of promise contains the value directly. The
-- advantage is that promise redemption is very fast, requiring
-- no network communication. The downside is that the
-- underlying data will be copied along with the promise.
-- Useful only for small data.
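--
-- A rough comparison of the two flavors (@largeTable@ is a hypothetical value):
--
-- > do flag <- toPromiseImm True      -- value travels inside the Promise itself
-- >    tbl  <- toPromise largeTable   -- value stays with its owning node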
toPromiseImm :: (Serializable a) => a -> TaskM (Promise a)
toPromiseImm = return . PromiseImmediate
-- | Given a function (expressed here as a closure, see "Remote.Call")
-- that computes a value, returns a token identifying that value.
-- This token, a 'Promise' can be moved about even if the
-- value hasn't been computed yet. The computing function
-- will be started somewhere among the nodes visible to the
-- current master, preferring those nodes that correspond
-- to the 'defaultLocality'. Afterwards, attempts to
-- redeem the promise with 'readPromise' will contact the node
-- where the function is executing.
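--
-- As a sketch, assuming a function @fib :: Integer -> TaskM Integer@ has been
-- registered so that a closure generator @fib__closure@ exists (both names
-- here are hypothetical):
--
-- > do p <- newPromise (fib__closure 30)
-- >    -- other work can proceed while fib runs elsewhere
-- >    n <- readPromise p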
newPromise :: (Serializable a) => Closure (TaskM a) -> TaskM (Promise a)
newPromise = newPromiseAt LcDefault
-- | A variant of 'newPromise' that prefers to start
-- the computing function on the same node as the caller.
-- Useful if you plan to use the resulting value
-- locally.
newPromiseHere :: (Serializable a) => Closure (TaskM a) -> TaskM (Promise a)
newPromiseHere clo =
do mynode <- liftTask $ getSelfNode
newPromiseAt (LcByNode [mynode]) clo
-- | A variant of 'newPromise' that prefers to start
-- the computing function on the same node where some
-- other promise lives. The other promise is not
-- evaluated.
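--
-- A sketch of co-locating dependent work (the closures and @args@ are hypothetical):
--
-- > do tbl   <- newPromise (buildTable__closure args)
-- >    stats <- newPromiseNear tbl (summarize__closure args)  -- prefer tbl's node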
newPromiseNear :: (Serializable a, Serializable b) => Promise b -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseNear (PromiseImmediate _) = newPromise
newPromiseNear (PromiseBasic prhost _prid) = newPromiseAt (LcByNode [nodeFromPid prhost])
-- | A variant of 'newPromise' that prefers to start
-- the computing functions on some set of nodes that
-- have a given role (assigned by the cfgRole configuration
-- option).
newPromiseAtRole :: (Serializable a) => String -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseAtRole role clo = newPromiseAt (LcByRole [role]) clo
-- | A variant of 'newPromise' that lets the user
-- specify a 'Locality'. The other flavors of newPromise,
-- such as 'newPromiseAtRole', 'newPromiseNear', and
-- 'newPromiseHere', are just shorthand for a call to this function.
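--
-- For instance, these two requests are equivalent (for some closure @clo@):
--
-- > newPromiseAtRole "WORKER" clo
-- > newPromiseAt (LcByRole ["WORKER"]) clo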
newPromiseAt :: (Serializable a) => Locality -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseAt locality clo =
let realclo = makePayloadClosure clo
in case realclo of
Just plclo -> do master <- getMaster
res <- roundtrip master (MmNewPromise plclo locality defaultQueueing)
case res of
Right (MmNewPromiseResponse pid prid) -> return $ PromiseBasic pid prid
Right (MmNewPromiseResponseFail) ->
taskError $ "Spawning of closure "++show clo++" by newPromise failed"
Left tms -> taskError $ "Spawning of closure "++show clo++" by newPromise resulted in "++show tms
Nothing -> taskError $ "The specified closure, "++show clo++", can't produce payloads"
-- | Given a promise, gets the value that is being
-- calculated. If the calculation has finished,
-- the owning node will be contacted and the data
-- moved to the current node. If the calculation
-- has not finished, this function will block
-- until it has. If the calculation failed
-- by throwing an exception (e.g. divide by zero),
-- then this function will throw an exception as well
-- (a 'TaskException'). If the node owning the
-- promise is not accessible, the calculation
-- will be restarted.
readPromise :: (Serializable a) => Promise a -> TaskM a
readPromise (PromiseImmediate a) = return a
readPromise thepromise@(PromiseBasic prhost prid) =
do mp <- lookupCachedPromise prid
case mp of
Nothing -> do fprhost <- liftM (maybe prhost id) $ lookupForwardedRedeemer prid
res <- roundtrip fprhost (NmRedeem prid)
case res of
Left e -> do tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because of "++show e
complain fprhost prid
Right NmRedeemResponseUnknown ->
do tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because allegedly unknown"
complain fprhost prid
Right (NmRedeemResponse thedata) ->
do extracted <- extractFromPayload thedata
promiseinmem <- liftTaskIO $ makePromiseInMemory thedata (Just $ toDyn extracted)
putPromiseInCache prid promiseinmem
return extracted
Right NmRedeemResponseException ->
taskError "Failed promise redemption" -- don't redeem, this is a terminal failure
Just mv -> do val <- liftTaskIO $ readMVar mv -- possible long wait here
case val of -- TODO this read/write MVars should be combined!
PromiseInMemory v _utc thedyn ->
case thedyn of
Just thedynvalue ->
case fromDynamic thedynvalue of
Nothing -> do liftTask $ logS "TSK" LoStandard "Insufficiently dynamic promise cache"
extractFromPayload v
Just realval -> do updated <- liftTaskIO $ makePromiseInMemory v thedyn
putPromiseInCache prid updated
return realval
Nothing -> do extracted <- extractFromPayload v
updated <- liftTaskIO $ makePromiseInMemory v (Just $ toDyn extracted)
putPromiseInCache prid updated
return extracted
PromiseException _ -> taskError $ "Redemption of promise failed"
PromiseOnDisk fp -> do mpd <- liftTask $ undiskify fp mv
_ <- liftTask $ spawnLocal $ diskify fp mv False
case mpd of
Just dat -> extractFromPayload dat
_ -> taskError "Promise extraction from disk failed"
where extractFromPayload v = do out <- liftTaskIO $ serialDecode v
case out of
Just r -> return r
Nothing -> taskError "Unexpected payload type"
complain fprhost prid =
do master <- getMaster
response <- roundtrip master (MmComplain fprhost prid)
case response of
Left a -> taskError $ "Couldn't file complaint with master about " ++ show fprhost ++ " because " ++ show a
Right (MmComplainResponse newhost)
| newhost == nullPid -> taskError $ "Couldn't file complaint with master about " ++ show fprhost
| otherwise -> do setForwardedRedeemer prid newhost
readPromise thepromise
data TaskState = TaskState
{
tsMaster :: ProcessId,
tsNodeBoss :: Maybe ProcessId,
tsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
tsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId),
tsMonitoring :: Map.Map ProcessId ()
}
data TaskM a = TaskM { runTaskM :: TaskState -> ProcessM (TaskState, a) } deriving (Typeable)
instance Monad TaskM where
m >>= k = TaskM $ \ts -> do
(ts',a) <- runTaskM m ts
(ts'',a') <- runTaskM (k a) (ts')
return (ts'',a')
return x = TaskM $ \ts -> return $ (ts,x)
instance Functor TaskM where
f `fmap` m =
TaskM $ \ts ->
runTaskM m ts >>= \(ts', x) ->
return (ts', f x)
instance Applicative TaskM where
mf <*> mx =
TaskM $ \ts ->
runTaskM mf ts >>= \(ts', f) ->
runTaskM mx ts' >>= \(ts'', x) ->
return (ts'', f x)
pure = return
lookupForwardedRedeemer :: PromiseId -> TaskM (Maybe ProcessId)
lookupForwardedRedeemer q =
TaskM $ \ts ->
liftIO $ withMVar (tsRedeemerForwarding ts) $ (\fwd ->
let lo = Map.lookup q fwd
in return (ts,lo))
setForwardedRedeemer :: PromiseId -> ProcessId -> TaskM ()
setForwardedRedeemer from to =
TaskM $ \ts -> liftIO $ modifyMVar (tsRedeemerForwarding ts) (\fwd ->
let newmap = Map.insert from to fwd
in return ( newmap,(ts,()) ) )
lookupCachedPromise :: PromiseId -> TaskM (Maybe (MVar PromiseStorage))
lookupCachedPromise prid = TaskM $ \ts ->
do mv <- liftIO $ withMVar (tsPromiseCache ts)
(\pc -> return $ Map.lookup prid pc)
return (ts,mv)
putPromiseInCache :: PromiseId -> PromiseStorage -> TaskM ()
putPromiseInCache prid ps = TaskM $ \ts ->
do liftIO $ modifyMVar_ (tsPromiseCache ts)
(\pc -> do mv <- newMVar ps
return $ Map.insert prid mv pc)
return (ts,())
getMaster :: TaskM ProcessId
getMaster = TaskM $ \ts -> return (ts,tsMaster ts)
liftTask :: ProcessM a -> TaskM a
liftTask a = TaskM $ \ts -> a >>= (\x -> return (ts,x))
liftTaskIO :: IO a -> TaskM a
liftTaskIO = liftTask . liftIO
-- | A Task-monadic version of 'Remote.Process.say'.
-- Puts text messages in the log.
tsay :: String -> TaskM ()
tsay a = liftTask $ say a
-- | Writes various kinds of messages to the
-- "Remote.Process" log.
tlogS :: LogSphere -> LogLevel -> String -> TaskM ()
tlogS a b c = liftTask $ logS a b c
----------------------------------------------
-- * MapReduce
----------------------------------------------
-- | A data structure that stores the important
-- user-provided functions that are the namesakes
-- of the MapReduce algorithm.
-- The number of mapper processes can be controlled
-- by the user by controlling the length of the list
-- returned by mtChunkify. The number of reducer
-- promises is controlled by the number of values
-- returned by the shuffler.
-- The user must provide their own mapper and reducer.
-- For many cases, the default chunkifier ('chunkify')
-- and shuffler ('shuffle') are adequate.
data MapReduce rawinput input middle1 middle2 result
= MapReduce
{
mtMapper :: input -> Closure (TaskM [middle1]),
mtReducer :: middle2 -> Closure (TaskM result),
mtChunkify :: rawinput -> [input],
mtShuffle :: [middle1] -> [middle2]
}
-- | A convenient way to provide the 'mtShuffle' function
-- as part of 'mapReduce'.
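--
-- For example:
--
-- > shuffle [(1,'a'),(2,'b'),(1,'c')]  ==  [(1,"ac"),(2,"b")]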
shuffle :: Ord a => [(a,b)] -> [(a,[b])]
shuffle q =
let semi = groupBy (\(a,_) (b,_) -> a==b) (sortBy (\(a,_) (b,_) -> compare a b) q)
in map (\x -> (fst $ head x,map snd x)) semi
-- | A convenient way to provide the 'mtChunkify' function
-- as part of 'mapReduce'.
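--
-- For example, ten items split into (at most) three chunks:
--
-- > chunkify 3 [1..10 :: Int]  ==  [[1,2,3,4],[5,6,7,8],[9,10]]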
chunkify :: Int -> [a] -> [[a]]
chunkify numChunks l
| numChunks <= 0 = taskError "Can't chunkify into less than one chunk"
| otherwise = splitSize (ceiling ((fromIntegral (length l) / fromIntegral numChunks) :: Double)) l
where
splitSize _ [] = []
splitSize i v = let (first,second) = splitAt i v
in first : splitSize i second
-- | The MapReduce algorithm, implemented in a very
-- simple form on top of the Task layer. Its
-- use depends on four user-determined data types:
--
-- * input -- The data type given to each mapper: one chunk of the overall (rawinput) input, as produced by the chunkifier.
--
-- * middle1 -- The output of the mapper. This may include some /key/ which is used by the shuffler to allocate data to reducers.
-- If you use the default shuffler, 'shuffle', this type must have the form @Ord a => (a,b)@.
--
-- * middle2 -- The output of the shuffler. The default shuffler emits a type in the form @Ord a => (a,[b])@. Each middle2 output
-- by the shuffler is given to a separate reducer.
--
-- * result -- The output of the reducer, upon being given a bunch of middles.
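--
-- As a wiring sketch, a word count could look like this, assuming
-- remotable-generated closures (both hypothetical)
-- @countIn__closure :: [String] -> Closure (TaskM [(String,Int)])@ for the mapper and
-- @sumUp__closure :: (String,[Int]) -> Closure (TaskM (String,Int))@ for the reducer:
--
-- > wordCount :: MapReduce [String] [String] (String,Int) (String,[Int]) (String,Int)
-- > wordCount = MapReduce { mtMapper   = countIn__closure
-- >                       , mtReducer  = sumUp__closure
-- >                       , mtChunkify = chunkify 4
-- >                       , mtShuffle  = shuffle }
-- >
-- > countWords :: [String] -> TaskM [(String,Int)]
-- > countWords = mapReduce wordCount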
mapReduce :: (Serializable i,Serializable k,Serializable m,Serializable r) =>
MapReduce ri i k m r -> ri -> TaskM [r]
mapReduce mr inputs =
let chunks = (mtChunkify mr) inputs
in do
pmapResult <- mapM (\chunk ->
newPromise ((mtMapper mr) chunk) ) chunks
mapResult <- mapM readPromise pmapResult
let shuffled = (mtShuffle mr) (concat mapResult)
pres <- mapM (\mid2 ->
newPromise ((mtReducer mr) mid2)) shuffled
mapM readPromise pres