Skip to content

Commit

Permalink
Use ledger as a source for publicRootPeers
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu authored and coot committed Jan 28, 2021
1 parent 62b2601 commit 5ee829b
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 205 deletions.
Expand Up @@ -65,11 +65,11 @@ instance c ~ EraCrypto era

relayToRelayAddress :: SL.StakePoolRelay -> Maybe RelayAddress
relayToRelayAddress (SL.SingleHostAddr (SJust (Port port)) (SJust ipv4) _) =
Just $ RelayAddressAddr (IPv4 ipv4) (fromIntegral port)
Just $ RelayAddress (IPv4 ipv4) (fromIntegral port)
relayToRelayAddress (SL.SingleHostAddr (SJust (Port port)) SNothing (SJust ipv6)) =
Just $ RelayAddressAddr (IPv6 ipv6) (fromIntegral port)
Just $ RelayAddress (IPv6 ipv6) (fromIntegral port)
relayToRelayAddress (SL.SingleHostName (SJust (Port port)) dnsName) =
Just $ RelayAddressDomain $ DomainAddress (encodeUtf8 $ dnsToText dnsName) (fromIntegral port)
Just $ RelayDomain $ DomainAddress (encodeUtf8 $ dnsToText dnsName) (fromIntegral port)
relayToRelayAddress _ =
-- This could be an unsupported relay (SRV records) or an unusable
-- relay such as a relay with an IP address but without a port number.
Expand Down
5 changes: 4 additions & 1 deletion ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -295,6 +295,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
(miniProtocolParameters nodeKernelArgs)
ntnApps
ntcApps
nodeKernel

llrnRunDataDiffusion registry diffusionApplications
where
Expand Down Expand Up @@ -342,11 +343,12 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
-> NodeToClientVersion
-> NTC.Apps m (ConnectionId addrNTC) ByteString ByteString ByteString ()
)
-> NodeKernel m (ConnectionId addrNTN) (ConnectionId addrNTC) blk
-> DiffusionApplications
addrNTN addrNTC
versionDataNTN versionDataNTC
m
mkDiffusionApplications miniProtocolParams ntnApps ntcApps =
mkDiffusionApplications miniProtocolParams ntnApps ntcApps kernel =
DiffusionApplications {
daApplicationInitiatorMode = combineVersions
[ simpleSingletonVersions
Expand All @@ -372,6 +374,7 @@ runWith RunNodeArgs{..} LowLevelRunNodeArgs{..} =
, daMiniProtocolParameters = miniProtocolParams
, daRethrowPolicy = consensusRethrowPolicy (Proxy @blk)
, daLocalRethrowPolicy = mempty
, daLedgerPeersCtx = LedgerPeersConsensusInterface (getPeersFromCurrentLedgerAfterSlot kernel)
}

-- | Did the ChainDB already have existing clean-shutdown marker on disk?
Expand Down
402 changes: 227 additions & 175 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs

Large diffs are not rendered by default.

134 changes: 126 additions & 8 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/LedgerPeers.hs
Expand Up @@ -13,14 +13,19 @@ module Ouroboros.Network.PeerSelection.LedgerPeers (
PoolStake (..),
AccPoolStake (..),
TraceLedgerPeers (..),
NumberOfPeers (..),
pickPeers,
accPoolStake,
runLedgerPeers,
UseLedgerAfter (..),

Socket.PortNumber
) where


import Control.Monad.Class.MonadSTM
import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadSTM.Strict
import Control.Monad.Class.MonadTime
import Control.Tracer (Tracer, traceWith)
import qualified Data.IP as IP
import Data.List (foldl')
Expand All @@ -29,8 +34,12 @@ import qualified Data.List.NonEmpty as NonEmpty
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Ratio
import qualified Data.Set as Set
import Data.Set (Set)
import Data.Word
import Data.Void (Void)
import qualified Network.Socket as Socket
import Network.Socket (SockAddr)
import System.Random

import Cardano.Slotting.Slot (SlotNo)
Expand All @@ -39,6 +48,15 @@ import Ouroboros.Network.PeerSelection.RootPeersDNS

import Text.Printf

-- | Only use the ledger after the given slot number.
data UseLedgerAfter = DontUseLedger | UseLedgerAfter SlotNo deriving Show

isLedgerPeersEnabled :: UseLedgerAfter -> Bool
isLedgerPeersEnabled DontUseLedger = False
isLedgerPeersEnabled _ = True

newtype NumberOfPeers = NumberOfPeers Word16 deriving Show

newtype LedgerPeersConsensusInterface m = LedgerPeersConsensusInterface {
lpGetPeers :: SlotNo -> STM m (Maybe [(PoolStake, NonEmpty RelayAddress)])
}
Expand All @@ -47,11 +65,15 @@ newtype LedgerPeersConsensusInterface m = LedgerPeersConsensusInterface {
data TraceLedgerPeers =
PickedPeer !RelayAddress !AccPoolStake ! PoolStake
-- ^ Trace for a peer picked with accumulated and relative stake of its pool.
| PickedPeers !Word16 ![RelayAddress]
| PickedPeers !NumberOfPeers ![RelayAddress]
-- ^ Trace for the number of peers we wanted to pick and the list of peers picked.
| FetchingNewLedgerState !Int
-- ^ Trace for fetching a new list of peers from the ledger. Int is the number of peers
-- returned.
| WaitingOnRequest
| RequestForPeers !NumberOfPeers
| ReusingLedgerState !Int !DiffTime
| FallingBackToBootstrapPeers


instance Show TraceLedgerPeers where
Expand All @@ -62,15 +84,23 @@ instance Show TraceLedgerPeers where
(fromRational (unAccPoolStake ackStake) :: Double)
(show $ unPoolStake stake)
(fromRational (unPoolStake stake) :: Double)
show (PickedPeers n peers) =
show (PickedPeers (NumberOfPeers n) peers) =
printf "PickedPeers %d %s" n (show peers)
show (FetchingNewLedgerState cnt) =
printf "Fetching new ledgerstate, %d registered pools"
cnt
show WaitingOnRequest = "WaitingOnRequest"
show (RequestForPeers (NumberOfPeers cnt)) = printf "RequestForPeers %d" cnt
show (ReusingLedgerState cnt age) =
printf "ReusingLedgerState %d peers age %s"
cnt
(show age)
show FallingBackToBootstrapPeers = "Falling back to bootstrap peers"


data RelayAddress = RelayAddressDomain DomainAddress
| RelayAddressAddr IP.IP Socket.PortNumber
-- | A relay can have either an IP address and a port number or
-- a domain with a port number
data RelayAddress = RelayDomain DomainAddress
| RelayAddress IP.IP Socket.PortNumber
deriving (Show, Eq, Ord)

-- | The relative stake of a stakepool in relation to the total amount staked.
Expand Down Expand Up @@ -125,10 +155,10 @@ pickPeers :: forall m. Monad m
=> StdGen
-> Tracer m TraceLedgerPeers
-> Map AccPoolStake (PoolStake, NonEmpty RelayAddress)
-> Word16
-> NumberOfPeers
-> m (StdGen, [RelayAddress])
pickPeers inRng _ pools _ | Map.null pools = return (inRng, [])
pickPeers inRng tracer pools cnt = go inRng cnt []
pickPeers inRng tracer pools (NumberOfPeers cnt) = go inRng cnt []
where
go :: StdGen -> Word16 -> [RelayAddress] -> m (StdGen, [RelayAddress])
go rng 0 picked = return (rng, picked)
Expand All @@ -144,3 +174,91 @@ pickPeers inRng tracer pools cnt = go inRng cnt []
relay = relays NonEmpty.!! ix
traceWith tracer $ PickedPeer relay ackStake stake
go rng'' (n - 1) (relay : picked)


-- | Run the LedgerPeers worker thread.
runLedgerPeers :: forall m.
( MonadAsync m
, MonadTime m
)
=> StdGen
-> Tracer m TraceLedgerPeers
-> UseLedgerAfter
-> LedgerPeersConsensusInterface m
-> ([DomainAddress] -> m (Map DomainAddress (Set SockAddr)))
-> STM m NumberOfPeers
-> (Maybe (Set SockAddr, DiffTime) -> STM m ())
-> m Void
runLedgerPeers inRng tracer useLedgerAfter LedgerPeersConsensusInterface{..} doResolve
getReq putRsp = do
go inRng (Time 0) Map.empty
where
go :: StdGen -> Time -> Map AccPoolStake (PoolStake, NonEmpty RelayAddress) -> m Void
go rng oldTs peerMap = do
let peerListLifeTime = if Map.null peerMap && isLedgerPeersEnabled useLedgerAfter
then 30
else 1847 -- Close to but not exactly 30min.

traceWith tracer WaitingOnRequest
numRequested <- atomically getReq
traceWith tracer $ RequestForPeers numRequested
!now <- getMonotonicTime
let age = diffTime now oldTs
(peerMap', ts) <- if age > peerListLifeTime
then
case useLedgerAfter of
DontUseLedger -> do
traceWith tracer $ FetchingNewLedgerState 0
return (Map.empty, now)
UseLedgerAfter slot -> do
peers_m <- atomically $ lpGetPeers slot
let peers = maybe Map.empty accPoolStake peers_m
traceWith tracer $ FetchingNewLedgerState $ Map.size peers
return (peers, now)

else do
traceWith tracer $ ReusingLedgerState (Map.size peerMap) age
return (peerMap, oldTs)

if Map.null peerMap'
then do
traceWith tracer FallingBackToBootstrapPeers
atomically $ putRsp Nothing
go rng ts peerMap'
else do
let ttl = 5 -- TTL, used as re-request interval by the governor.

(rng', !pickedPeers) <- pickPeers rng tracer peerMap' numRequested
traceWith tracer $ PickedPeers numRequested pickedPeers

let (plainAddrs, domains) = foldl' splitPeers (Set.empty, []) pickedPeers

domainAddrs <- doResolve domains

let (rng'', rngDomain) = split rng'
pickedAddrs = snd $ foldl' pickDomainAddrs (rngDomain, plainAddrs)
domainAddrs

atomically $ putRsp $ Just (pickedAddrs, ttl)
go rng'' ts peerMap'

-- Randomly pick one of the addresses returned in the DNS result.
pickDomainAddrs :: (StdGen, Set SockAddr)
-> Set SockAddr
-> (StdGen, Set SockAddr)
pickDomainAddrs (rng, pickedAddrs) addrs | Set.null addrs = (rng, pickedAddrs)
pickDomainAddrs (rng, pickedAddrs) addrs =
let (ix, rng') = randomR (0, Set.size addrs - 1) rng
!pickedAddr = Set.elemAt ix addrs in
(rng', Set.insert pickedAddr pickedAddrs)


-- Divide the picked peers form the ledger into addresses we can use directly and
-- domain names that we need to resolve.
splitPeers :: (Set SockAddr, [DomainAddress])
-> RelayAddress
-> (Set SockAddr, [DomainAddress])
splitPeers (addrs, domains) (RelayDomain domain) = (addrs, domain : domains)
splitPeers (addrs, domains) (RelayAddress ip port) =
let !addr = IP.toSockAddr (ip, port) in
(Set.insert addr addrs, domains)
Expand Up @@ -13,14 +13,16 @@ module Ouroboros.Network.PeerSelection.RootPeersDNS (
-- * DNS based provider for local root peers
localRootPeersProvider,
DomainAddress (..),
RelayAddress (..),
IP.IP (..),
TraceLocalRootPeers(..),

-- * DNS based provider for public root peers
publicRootPeersProvider,
TracePublicRootPeers(..),

-- DNS lookup support
resolveDomainAddresses,

-- * DNS type re-exports
DNS.ResolvConf,
DNS.Domain,
Expand All @@ -31,6 +33,7 @@ module Ouroboros.Network.PeerSelection.RootPeersDNS (
) where

import Data.Word (Word32)
import Data.List (foldl')
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Set as Set
Expand Down Expand Up @@ -59,13 +62,6 @@ import Network.Mux.Timeout

import Ouroboros.Network.PeerSelection.Types

-- | A relay can have either an IP address and a port number or
-- a domain with a port number.
-- TODO: move to a Ledger Peer file.
data RelayAddress = RelayAddressDomain DomainAddress
| RelayAddressAddr IP.IP Socket.PortNumber
deriving (Show, Eq, Ord)

-- | A product of a 'DNS.Domain' and 'Socket.PortNumber'. After resolving the
-- domain we will use the 'Socket.PortNumber' to form 'Socket.SockAddr'.
--
Expand Down Expand Up @@ -396,6 +392,66 @@ publicRootPeersProvider tracer timeout resolvConf domains action = do
-- TTL, and the governor will invoke its exponential backoff.
return (ips, ttl)

-- | Provides DNS resulution functionality.
--
resolveDomainAddresses :: Tracer IO TracePublicRootPeers
-> TimeoutFn IO
-> DNS.ResolvConf
-> [DomainAddress]
-> IO (Map DomainAddress (Set Socket.SockAddr))
resolveDomainAddresses tracer timeout resolvConf domains = do
traceWith tracer (TracePublicRootDomains domains)
#if !defined(mingw32_HOST_OS)
rr <- resolverResource resolvConf
#else
let rr = newResolverResource resolvConf
#endif
resourceVar <- newTVarIO rr
requestPublicRootPeers resourceVar
where
requestPublicRootPeers :: StrictTVar IO (Resource DNSorIOError DNS.Resolver)
-> IO (Map DomainAddress (Set Socket.SockAddr))
requestPublicRootPeers resourceVar = do
rr <- atomically $ readTVar resourceVar
(er, rr') <- withResource rr
atomically $ writeTVar resourceVar rr'
case er of
Left (DNSError err) -> throwIO err
Left (IOError err) -> throwIO err
Right resolver -> do
let lookups =
[ lookupAWithTTL timeout resolvConf resolver daDomain
| DomainAddress {daDomain} <- domains ]
-- The timeouts here are handled by the 'lookupAWithTTL'. They're
-- configured via the DNS.ResolvConf resolvTimeout field and defaults
-- to 3 sec.
results <- withAsyncAll lookups (atomically . mapM waitSTM)
sequence_
[ traceWith tracer $ case result of
Left dnserr -> TracePublicRootFailure daDomain dnserr
Right ipttls -> TracePublicRootResult daDomain ipttls
| (DomainAddress {daDomain}, result) <- zip domains results ]
return $ foldl' buildResult Map.empty $ zip domains results

buildResult :: Map DomainAddress (Set Socket.SockAddr)
-> (DomainAddress, Either DNS.DNSError [(IPv4, DNS.TTL)])
-> Map DomainAddress (Set Socket.SockAddr)
buildResult mr (_, Left _) = mr
buildResult mr (domain, Right ipsttls) =
Map.alter addFn domain mr
where
addFn :: Maybe (Set Socket.SockAddr) -> Maybe (Set Socket.SockAddr)
addFn Nothing =
let ips = map fst ipsttls
!addrs = map (Socket.SockAddrInet (daPortNumber domain) . IP.toHostAddress) ips
!addrSet = Set.fromList addrs in
Just addrSet
addFn (Just addrSet) =
let ips = map fst ipsttls
!addrs = map (Socket.SockAddrInet (daPortNumber domain) . IP.toHostAddress) ips
!addrSet' = Set.union addrSet (Set.fromList addrs) in
Just addrSet'


---------------------------------------------
-- Shared utils
Expand Down

0 comments on commit 5ee829b

Please sign in to comment.