Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Improved handling of promise complaints

  • Loading branch information...
commit e02cd9acfe121a9d0d43913985611161bd160cba 1 parent a6009c8
@jepst authored
Showing with 33 additions and 23 deletions.
  1. +1 −1  Remote/Encoding.hs
  2. +21 −18 Remote/Task.hs
  3. +11 −4 util/Diag.hs
View
2  Remote/Encoding.hs
@@ -138,7 +138,7 @@ genericPut = generic `extQ` genericString
genericString :: String -> Put
genericString = put.encode
--- | See 'genericPut'
+-- | This is the counterpart 'genericPut'
genericGet :: Data a => Get a
genericGet = generic `extR` genericString
where generic = (\id -> liftM id $ deserializeConstr $ \constr_rep ->
View
39 Remote/Task.hs
@@ -403,9 +403,9 @@ diskify fp mps reallywrite =
return False
Right v -> return v
-startNodeWorker :: ProcessId -> MVar (Map.Map PromiseId (MVar PromiseStorage)) ->
+startNodeWorker :: ProcessId -> NodeBossState ->
MVar PromiseStorage -> Closure Payload -> ProcessM ()
-startNodeWorker masterpid mpc mps clo@(Closure cloname cloarg) =
+startNodeWorker masterpid nbs mps clo@(Closure cloname cloarg) =
do self <- getSelfPid
spawnLocalAnd (starter self) (prefix self)
return ()
@@ -416,7 +416,7 @@ startNodeWorker masterpid mpc mps clo@(Closure cloname cloarg) =
setDaemonic
starter nodeboss = -- TODO try to do an undiskify here, if the promise is left over from a previous, failed run
let initialState = TaskState {tsMaster=masterpid,tsNodeBoss=Just nodeboss,
- tsPromiseCache=mpc, tsRedeemerForwarding=Map.empty,
+ tsPromiseCache=nsPromiseCache nbs, tsRedeemerForwarding=nsRedeemerForwarding nbs,
tsMonitoring=Map.empty}
tasker = do tbl <- liftTask $ getLookup
case getEntryByIdent tbl cloname of
@@ -436,7 +436,8 @@ startNodeWorker masterpid mpc mps clo@(Closure cloname cloarg) =
data NodeBossState =
NodeBossState
{
- nsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage))
+ nsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
+ nsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId)
}
startNodeManager :: ProcessId -> ProcessM ()
@@ -451,7 +452,7 @@ startNodeManager masterpid =
(\pc -> let newpc = Map.insert promise promisestore pc
in return (newpc,True))
when (ret)
- (startNodeWorker masterpid promisecache promisestore clo)
+ (startNodeWorker masterpid state promisestore clo)
return (NmStartResponse ret,state))
nmTermination = matchProcessDown masterpid $
do forwardLogs Nothing
@@ -482,7 +483,8 @@ startNodeManager masterpid =
monitorProcess mypid masterpid MaMonitor
logS "TSK" LoInformation $ "Starting a nodeboss owned by " ++ show masterpid
pc <- liftIO $ newMVar Map.empty
- let initState = NodeBossState {nsPromiseCache=pc}
+ pf <- liftIO $ newMVar Map.empty
+ let initState = NodeBossState {nsPromiseCache=pc,nsRedeemerForwarding=pf}
handler initState
-- | Starts a new context for executing a 'TaskM' environment.
@@ -509,8 +511,9 @@ startMaster proc =
where masterproc mvdone mvmaster nodeboss =
do master <- liftIO $ takeMVar mvmaster
pc <- liftIO $ newMVar Map.empty
+ pf <- liftIO $ newMVar Map.empty
let initialState = TaskState {tsMaster=master,tsNodeBoss=Just nodeboss,
- tsPromiseCache=pc, tsRedeemerForwarding=Map.empty,
+ tsPromiseCache=pc, tsRedeemerForwarding=pf,
tsMonitoring=Map.empty}
res <- liftM snd $ runTaskM proc initialState
liftIO $ putMVar mvdone res
@@ -779,7 +782,7 @@ readPromise (PromiseImmediate a) = return a
readPromise thepromise@(PromiseBasic prhost prid) =
do mp <- lookupCachedPromise prid
case mp of
- Nothing -> do fprhost <- lookupForwardedRedeemer prhost
+ Nothing -> do fprhost <- liftM (maybe prhost id) $ lookupForwardedRedeemer prid
res <- roundtrip fprhost (NmRedeem prid)
case res of
Left e -> do tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because of "++show e
@@ -826,7 +829,7 @@ readPromise thepromise@(PromiseBasic prhost prid) =
Left a -> taskError $ "Couldn't file complaint with master about " ++ show fprhost ++ " because " ++ show a
Right (MmComplainResponse newhost)
| newhost == nullPid -> taskError $ "Couldn't file complaint with master about " ++ show fprhost
- | otherwise -> do setForwardedRedeemer prhost newhost
+ | otherwise -> do setForwardedRedeemer prid newhost
readPromise thepromise
data TaskState = TaskState
@@ -834,7 +837,7 @@ data TaskState = TaskState
tsMaster :: ProcessId,
tsNodeBoss :: Maybe ProcessId,
tsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
- tsRedeemerForwarding :: Map.Map ProcessId ProcessId,
+ tsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId),
tsMonitoring :: Map.Map ProcessId ()
}
@@ -847,18 +850,18 @@ instance Monad TaskM where
return (ts'',a')
return x = TaskM $ \ts -> return $ (ts,x)
-lookupForwardedRedeemer :: ProcessId -> TaskM ProcessId
+lookupForwardedRedeemer :: PromiseId -> TaskM (Maybe ProcessId)
lookupForwardedRedeemer q =
TaskM $ \ts ->
- case Map.lookup q (tsRedeemerForwarding ts) of
- Nothing -> return (ts,q)
- Just a -> return (ts,a)
+ liftIO $ withMVar (tsRedeemerForwarding ts) $ (\fwd ->
+ let lo = Map.lookup q fwd
+ in return (ts,lo))
-setForwardedRedeemer :: ProcessId -> ProcessId -> TaskM ()
+setForwardedRedeemer :: PromiseId -> ProcessId -> TaskM ()
setForwardedRedeemer from to =
- TaskM $ \ts ->
- let newmap = Map.insert from to (tsRedeemerForwarding ts)
- in return (ts {tsRedeemerForwarding=newmap},())
+ TaskM $ \ts -> liftIO $ modifyMVar (tsRedeemerForwarding ts) (\fwd ->
+ let newmap = Map.insert from to fwd
+ in return ( newmap,(ts,()) ) )
lookupCachedPromise :: PromiseId -> TaskM (Maybe (MVar PromiseStorage))
lookupCachedPromise prid = TaskM $ \ts ->
View
15 util/Diag.hs
@@ -7,15 +7,19 @@ module Main (main) where
-- nodes, run this program to check your configuration.
import Remote (remoteInit, getPeers, getSelfNode, ProcessM)
-import Remote.Process (hostFromNid,getConfig,cfgNetworkMagic,cfgKnownHosts)
+import Remote.Process (hostFromNid,getConfig,cfgNetworkMagic,cfgKnownHosts,cfgPeerDiscoveryPort)
import qualified Data.Map as Map (elems)
import Control.Monad.Trans (liftIO)
-import Data.List (intercalate)
+import Data.List (intercalate,nub)
s :: String -> ProcessM ()
s = liftIO . putStrLn
+orNone :: [String] -> String
+orNone [] = "None"
+orNone a = intercalate "," a
+
initialProcess myRole =
do s "Cloud Haskell diagnostics\n"
mynid <- getSelfNode
@@ -24,8 +28,11 @@ initialProcess myRole =
s $ "I seem to be running on host \""++hostFromNid mynid++"\".\nIf that's wrong, set it using the cfgHostName option.\n"
s $ "My role is \""++myRole++"\".\nIf that's wrong, set it using the cfgRole option.\n"
s $ "My magic is \""++cfgNetworkMagic cfg++"\".\nIf that's wrong, set it using the cfgNetworkMagic option.\n"
- s $ "I will look for nodes on the following hosts,\n as well as any hosts on the local network: " ++ intercalate "," (cfgKnownHosts cfg)
- let hosts = intercalate ", " $ map (hostFromNid) (concat $ Map.elems peers)
+ s $ "I will look for nodes on the following hosts: " ++ orNone (cfgKnownHosts cfg)
+ s $ if cfgPeerDiscoveryPort cfg > 0
+ then "I will also look for nodes on the local network."
+ else "I will not look for nodes on the local network other than those named above."
+ let hosts = orNone $ nub $ map (hostFromNid) (concat $ Map.elems peers)
s $ "I have found nodes on the following hosts: "++hosts++".\nIf I'm not finding all the nodes you expected, make sure they:"
s $ "\tare running\n\tare not behind a firewall\n\thave the same magic\n\tare listed in cfgKnownHosts"
Please sign in to comment.
Something went wrong with that request. Please try again.