Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Tree: 290adeab93
Fetching contributors…

Cannot retrieve contributors at this time

159 lines (141 sloc) 7.602 kB
{-# LANGUAGE DeriveDataTypeable #-}
-- | Exposes mechanisms for a program built on the "Remote.Process"
-- framework to discover nodes on the current network. Programs
-- can perform node discovery manually, or they can use "Remote.Task",
-- which does it automatically.
module Remote.Peer (PeerInfo,startDiscoveryService,getPeers,getPeersStatic,getPeersDynamic,findPeerByRole) where
import Prelude hiding (all, pi)
import Network.Socket (defaultHints,sendTo,recv,sClose,Socket,getAddrInfo,AddrInfoFlag(..),setSocketOption,addrFlags,addrSocketType,addrFamily,SocketType(..),Family(..),addrProtocol,SocketOption(..),AddrInfo,bindSocket,addrAddress,SockAddr(..),socket)
import Network.BSD (getProtocolNumber)
import Control.Concurrent.MVar (takeMVar, newMVar, modifyMVar_)
import Remote.Process (PeerInfo,pingNode,makeNodeFromHost,spawnLocalAnd,setDaemonic,TransmitStatus(..),TransmitException(..),PayloadDisposition(..),ptimeout,getSelfNode,sendSimple,cfgRole,cfgKnownHosts,cfgPeerDiscoveryPort,match,receiveWait,getSelfPid,getConfig,NodeId,PortId,ProcessM,ptry,localRegistryQueryNodes)
import Control.Monad.Trans (liftIO)
import Data.Typeable (Typeable)
import Data.Maybe (catMaybes)
import Data.Binary (Binary,get,put)
import Control.Exception (try,bracket,ErrorCall(..),throw)
import Data.List (nub)
import Control.Monad (filterM)
import qualified Data.Traversable as Traversable (mapM)
import qualified Data.Map as Map (unionsWith,insertWith,empty,lookup)
import qualified Data.Map as Map (keys)
-- | Payload a node sends back to the process that issued a discovery
-- broadcast: the responder's node id together with its configured role.
data DiscoveryInfo = DiscoveryInfo
  { discNodeId :: NodeId  -- ^ node id of the responding node
  , discRole   :: String  -- ^ role of the responding node
  } deriving (Typeable, Eq)
-- | Serialised as the node id followed by the role, each using its own
-- 'Binary' instance; 'get' reads the fields back in the same order.
instance Binary DiscoveryInfo where
  put (DiscoveryInfo nid role) = do
    put nid
    put role
  get = do
    nid  <- get
    role <- get
    return (DiscoveryInfo nid role)
-- | Creates an unbound IPv4 UDP socket suitable for the given port.
-- (Mostly copied from Network.Socket.)
--
-- The port is resolved through 'getAddrInfo' with passive, datagram,
-- @AF_INET@ hints; the first resolved address is used to create the socket
-- and is returned alongside it so that callers can bind to its
-- 'addrAddress'. The caller is responsible for closing the socket.
getUdpSocket :: PortId -> IO (Socket,AddrInfo)
getUdpSocket port = do
  udp <- getProtocolNumber "udp"
  let wanted = defaultHints
        { addrFlags      = [AI_PASSIVE, AI_ADDRCONFIG]
        , addrSocketType = Datagram
        , addrFamily     = AF_INET -- only INET supports broadcast
        , addrProtocol   = udp
        }
  candidates <- getAddrInfo (Just wanted) Nothing (Just (show port))
  -- NOTE(review): 'head' relies on getAddrInfo never yielding an empty
  -- list (it is expected to throw instead) -- confirm for the network
  -- version in use.
  let chosen = head candidates
  sock <- socket (addrFamily chosen) (addrSocketType chosen) (addrProtocol chosen)
  return (sock, chosen)
-- | Size limit shared by both directions of the discovery protocol: it is
-- the receive size passed to 'recv' in 'listenUdp' and the maximum
-- 'String' length that 'sendBroadcast' accepts before throwing.
maxPacket :: Int
maxPacket = 1024
-- | Waits for a single datagram on the given UDP port and returns its
-- contents, receiving with a size of 'maxPacket'.
--
-- A fresh socket is created for every call; 'bracket' guarantees it is
-- closed again, whether the receive succeeds or an exception is raised.
listenUdp :: PortId -> IO String
listenUdp port = bracket (getUdpSocket port) (sClose . fst) receiveOne
  where
    -- Bind to the resolved address (allowing address reuse) and hand back
    -- whatever the first received packet contains.
    receiveOne (sock, addr) = do
      setSocketOption sock ReuseAddr 1
      bindSocket sock (addrAddress addr)
      recv sock maxPacket
-- | Broadcasts the given string as one UDP datagram to the given port.
--
-- Strings longer than 'maxPacket' are rejected by throwing a
-- 'TransmitException' carrying 'QteOther'. Otherwise a temporary socket is
-- created, switched into broadcast mode, used for a single 'sendTo' and
-- then closed (also on exceptions, via 'bracket'). The number of bytes
-- reported by 'sendTo' is ignored.
sendBroadcast :: PortId -> String -> IO ()
sendBroadcast port str =
  if len > maxPacket
    then throw $ TransmitException $ QteOther $
           "sendBroadcast: Specified packet is too big for UDP broadcast, having a length of " ++ show len
    else bracket (fmap fst (getUdpSocket port)) sClose transmit
  where
    len = length str
    -- Host address (-1) is the all-ones IPv4 address, i.e. the broadcast
    -- destination.
    destination = SockAddrInet (toEnum port) (-1)
    transmit sock = do
      setSocketOption sock Broadcast 1
      _sent <- sendTo sock str destination
      return ()
-- | Returns information about every node on the current network that
-- this node knows about, combining the static and the dynamic discovery
-- mechanisms (see 'getPeersStatic' and 'getPeersDynamic').
--
-- The dynamic query waits 500000 microseconds for replies. Both results
-- are merged per role with duplicates removed, and the merged result is
-- passed through 'verifyPeerInfo' before being returned. This function
-- depends on the configuration values @cfgKnownHosts@ and
-- @cfgPeerDiscoveryPort@.
getPeers :: ProcessM PeerInfo
getPeers = do
  fromKnownHosts <- getPeersStatic
  fromBroadcast  <- getPeersDynamic 500000
  verifyPeerInfo (Map.unionsWith merge [fromKnownHosts, fromBroadcast])
  where
    -- Concatenate the node lists of a role and drop repeated nodes.
    merge xs ys = nub (xs ++ ys)
-- | Runs 'pingNode' on every node listed in the given 'PeerInfo' and
-- keeps, for each role, only the nodes for which it returns 'True'.
verifyPeerInfo :: PeerInfo -> ProcessM PeerInfo
verifyPeerInfo = Traversable.mapM (filterM pingNode) -- TODO ping should require a response
-- | Returns a 'PeerInfo' listing the known nodes grouped by role.
--
-- The information is gathered by querying the local node registry of
-- every host named in the @cfgKnownHosts@ entry of this node's config.
-- Hosts that do not answer are assumed to be down, so nodes running on
-- them are not part of the result. The per-host answers are merged per
-- role with duplicate nodes removed.
getPeersStatic :: ProcessM PeerInfo
getPeersStatic = do
  cfg <- getConfig
  replies <- mapM queryHost (cfgKnownHosts cfg)
  return (Map.unionsWith merge (catMaybes replies))
  where
    -- NOTE(review): the node id is built with port 0 -- presumably this
    -- addresses the host's local registry; confirm against makeNodeFromHost.
    queryHost host = localRegistryQueryNodes (makeNodeFromHost host 0)
    merge xs ys = nub (xs ++ ys)
-- | Returns a 'PeerInfo' listing the known nodes grouped by role.
--
-- The information is gathered by sending a UDP broadcast on the local
-- network; active nodes running the discovery service should answer with
-- their information. If nodes run outside of the local network, or if UDP
-- broadcasts are blocked by firewall configuration, this will not return
-- useful information; use 'getPeersStatic' in that case.
--
-- The argument is how long, in microseconds, to wait for answers. A
-- number like 50000 is usually good enough, unless the network is highly
-- congested or has high latency (note that 'getPeers' passes 500000).
--
-- When @cfgPeerDiscoveryPort@ is 0 no broadcast is sent and an empty
-- result is returned. An 'IOError' raised while broadcasting is ignored.
getPeersDynamic :: Int -> ProcessM PeerInfo
getPeersDynamic t = do
  self <- getSelfPid
  cfg  <- getConfig
  let port = cfgPeerDiscoveryPort cfg
  if port == 0
    then return Map.empty
    else do
      -- TODO should send broadcast multiple times in case of packet loss
      _ <- liftIO $ try $ sendBroadcast port (show self) :: ProcessM (Either IOError ())
      replies <- liftIO $ newMVar []
      -- 'collect' never finishes by itself; the timeout is what ends it.
      _ <- ptimeout t (collect replies)
      gathered <- liftIO $ takeMVar replies
      return $ foldl addNode Map.empty (nub gathered)
  where
    -- File one answer under its role, in front of the nodes already there.
    addNode acc di = Map.insertWith (++) (discRole di) [discNodeId di] acc
    -- Keep receiving answers, pushing each onto the shared list.
    collect box = receiveWait [match (remember box)] >> collect box
    remember box di = liftIO $ modifyMVar_ box (\seen -> return (di : seen))
-- | Given a 'PeerInfo' returned by 'getPeersDynamic' or 'getPeersStatic',
-- gives the list of nodes registered under a particular role. If no node
-- of that role is known, the empty list is returned.
findPeerByRole :: PeerInfo -> String -> [NodeId]
findPeerByRole disc role =
  case Map.lookup role disc of
    Just nodes -> nodes
    Nothing    -> []
-- | Lists every role that has an entry in the given 'PeerInfo', in
-- ascending key order.
--
-- Depends on @keys@ being part of the qualified "Data.Map" import list.
findRoles :: PeerInfo -> [String]
findRoles = Map.keys
-- | Handles one discovery request: waits for a datagram on
-- @cfgPeerDiscoveryPort@, interprets its contents as the address to reply
-- to and sends that process a 'DiscoveryInfo' describing this node (its
-- node id and @cfgRole@).
--
-- A positive @delay@ limits the wait to that many time units as
-- understood by 'ptimeout'; a delay of zero or less waits indefinitely.
-- The result is 'True' only when the reply was sent with status 'QteOK';
-- a timeout or any other outcome yields 'False'.
waitForDiscovery :: Int -> ProcessM Bool
waitForDiscovery delay =
  if delay <= 0
    then answerOne
    else do
      outcome <- ptimeout delay answerOne
      return (maybe False id outcome)
  where
    answerOne = do
      cfg <- getConfig
      request <- liftIO $ listenUdp (cfgPeerDiscoveryPort cfg)
      self <- getSelfNode
      let reply = DiscoveryInfo { discNodeId = self, discRole = cfgRole cfg }
      -- NOTE(review): 'read' is lazy, so a malformed request presumably
      -- fails inside sendSimple and is caught here as an ErrorCall --
      -- confirm that sendSimple forces its destination under ptry.
      sent <- ptry $ sendSimple (read request) reply PldUser
                :: ProcessM (Either ErrorCall TransmitStatus)
      case sent of
        Right QteOK -> return True
        _           -> return False
-- | Starts the discovery process, allowing this node to respond to
-- queries from 'getPeersDynamic'. You don't want to call this yourself,
-- as it's called for you in 'Remote.Init.remoteInit'.
--
-- Nothing is started when @cfgPeerDiscoveryPort@ is 0. Otherwise a local
-- process is spawned (set up with 'setDaemonic') that answers discovery
-- requests one after another, forever.
startDiscoveryService :: ProcessM ()
startDiscoveryService = do
  cfg <- getConfig
  case cfgPeerDiscoveryPort cfg of
    0 -> return ()
    _ -> do
      _ <- spawnLocalAnd responder setDaemonic
      return ()
  where
    -- Answer a single request without a timeout, then start over.
    responder = waitForDiscovery 0 >> responder
Jump to Line
Something went wrong with that request. Please try again.