Skip to content

Commit

Permalink
RelayAddress renamed as RelayAccessPoint
Browse files Browse the repository at this point in the history
'DomainAddress' is renamed to 'DomainAccessPoint'.

'RelayAccessPoint' constructors are uniform: they take either ip or
domain and address.  We also provide 'RelayDomainAccessPoint'
bidirectional pattern which links 'RelayAccessDomain' and
'DomainAccessPoint'

Both types, and their instances are moved to their own module.
  • Loading branch information
coot committed Sep 28, 2021
1 parent aeda67a commit 3d76034
Show file tree
Hide file tree
Showing 13 changed files with 293 additions and 247 deletions.
Expand Up @@ -34,7 +34,7 @@ import Ouroboros.Consensus.Shelley.Ledger.Ledger
instance c ~ EraCrypto era
=> LedgerSupportsPeerSelection (ShelleyBlock era) where
getPeers ShelleyLedgerState { shelleyLedgerState } = catMaybes
[ (poolStake,) <$> Map.lookup stakePool poolRelayAddresses
[ (poolStake,) <$> Map.lookup stakePool poolRelayAccessPoints
| (stakePool, poolStake) <- orderByStake poolDistr
]
where
Expand Down Expand Up @@ -64,36 +64,38 @@ instance c ~ EraCrypto era
. SL.nesEs
$ shelleyLedgerState

relayToRelayAddress :: SL.StakePoolRelay -> Maybe RelayAddress
relayToRelayAddress (SL.SingleHostAddr (SJust (Port port)) (SJust ipv4) _) =
Just $ RelayAddress (IPv4 ipv4) (fromIntegral port)
relayToRelayAddress (SL.SingleHostAddr (SJust (Port port)) SNothing (SJust ipv6)) =
Just $ RelayAddress (IPv6 ipv6) (fromIntegral port)
relayToRelayAddress (SL.SingleHostName (SJust (Port port)) dnsName) =
Just $ RelayDomain $ DomainAddress (encodeUtf8 $ dnsToText dnsName) (fromIntegral port)
relayToRelayAddress _ =
relayToRelayAccessPoint :: SL.StakePoolRelay -> Maybe RelayAccessPoint
relayToRelayAccessPoint (SL.SingleHostAddr (SJust (Port port)) (SJust ipv4) _) =
Just $ RelayAccessAddress (IPv4 ipv4) (fromIntegral port)
relayToRelayAccessPoint (SL.SingleHostAddr (SJust (Port port))
SNothing
(SJust ipv6)) =
Just $ RelayAccessAddress (IPv6 ipv6) (fromIntegral port)
relayToRelayAccessPoint (SL.SingleHostName (SJust (Port port)) dnsName) =
Just $ RelayAccessDomain (encodeUtf8 $ dnsToText dnsName) (fromIntegral port)
relayToRelayAccessPoint _ =
-- This could be an unsupported relay (SRV records) or an unusable
-- relay such as a relay with an IP address but without a port number.
Nothing

-- | Note that a stake pool can have multiple registered relays
pparamsRelayAddresses ::
(RelayAddress -> StakePoolRelay)
pparamsRelayAccessPoints ::
(RelayAccessPoint -> StakePoolRelay)
-> SL.PoolParams c
-> Maybe (NonEmpty StakePoolRelay)
pparamsRelayAddresses injStakePoolRelay =
pparamsRelayAccessPoints injStakePoolRelay =
NE.nonEmpty
. force
. mapMaybe (fmap injStakePoolRelay . relayToRelayAddress)
. mapMaybe (fmap injStakePoolRelay . relayToRelayAccessPoint)
. toList
. SL._poolRelays

-- | Combine the stake pools registered in the future and the current pool
-- parameters, and remove duplicates.
poolRelayAddresses ::
poolRelayAccessPoints ::
Map (SL.KeyHash 'SL.StakePool c) (NonEmpty StakePoolRelay)
poolRelayAddresses =
poolRelayAccessPoints =
Map.unionWith
(\futureRelays currentRelays -> NE.nub (futureRelays <> currentRelays))
(Map.mapMaybe (pparamsRelayAddresses FutureRelay) futurePoolParams)
(Map.mapMaybe (pparamsRelayAddresses CurrentRelay) poolParams)
(Map.mapMaybe (pparamsRelayAccessPoints FutureRelay) futurePoolParams)
(Map.mapMaybe (pparamsRelayAccessPoints CurrentRelay) poolParams)
Expand Up @@ -2,39 +2,39 @@ module Ouroboros.Consensus.Ledger.SupportsPeerSelection (
LedgerSupportsPeerSelection (..)
, PoolStake (..)
, StakePoolRelay (..)
, stakePoolRelayAddress
, stakePoolRelayAccessPoint
-- * Re-exports for convenience
, DomainAddress (..)
, DomainAccessPoint (..)
, IP (..)
, PortNumber
, RelayAddress (..)
, RelayAccessPoint (..)
) where

import Control.DeepSeq (NFData (..))
import Data.List.NonEmpty (NonEmpty)

import Ouroboros.Network.PeerSelection.LedgerPeers
(DomainAddress (..), IP (..), PoolStake (..), PortNumber,
RelayAddress (..))
(DomainAccessPoint (..), IP (..), PoolStake (..),
PortNumber, RelayAccessPoint (..))

import Ouroboros.Consensus.Ledger.Abstract (LedgerState)

-- | A relay registered for a stake pool
data StakePoolRelay =
-- | One of the current relays
CurrentRelay RelayAddress
CurrentRelay RelayAccessPoint

-- | One of the future relays
| FutureRelay RelayAddress
| FutureRelay RelayAccessPoint
deriving (Show, Eq)

instance NFData StakePoolRelay where
rnf (CurrentRelay ra) = rnf ra
rnf (FutureRelay ra) = rnf ra

stakePoolRelayAddress :: StakePoolRelay -> RelayAddress
stakePoolRelayAddress (CurrentRelay ra) = ra
stakePoolRelayAddress (FutureRelay ra) = ra
stakePoolRelayAccessPoint :: StakePoolRelay -> RelayAccessPoint
stakePoolRelayAccessPoint (CurrentRelay ra) = ra
stakePoolRelayAccessPoint (FutureRelay ra) = ra

class LedgerSupportsPeerSelection blk where
-- | Return peers registered in the ledger ordered by descending 'PoolStake'.
Expand Down
6 changes: 3 additions & 3 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Expand Up @@ -738,14 +738,14 @@ getPeersFromCurrentLedger ::
(IOLike m, LedgerSupportsPeerSelection blk)
=> NodeKernel m remotePeer localPeer blk
-> (LedgerState blk -> Bool)
-> STM m (Maybe [(PoolStake, NonEmpty RelayAddress)])
-> STM m (Maybe [(PoolStake, NonEmpty RelayAccessPoint)])
getPeersFromCurrentLedger kernel p = do
immutableLedger <-
ledgerState <$> ChainDB.getImmutableLedger (getChainDB kernel)
return $ do
guard (p immutableLedger)
return
$ map (second (fmap stakePoolRelayAddress))
$ map (second (fmap stakePoolRelayAccessPoint))
$ force
$ getPeers immutableLedger

Expand All @@ -759,7 +759,7 @@ getPeersFromCurrentLedgerAfterSlot ::
)
=> NodeKernel m remotePeer localPeer blk
-> SlotNo
-> STM m (Maybe [(PoolStake, NonEmpty RelayAddress)])
-> STM m (Maybe [(PoolStake, NonEmpty RelayAccessPoint)])
getPeersFromCurrentLedgerAfterSlot kernel slotNo =
getPeersFromCurrentLedger kernel afterSlotNo
where
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Expand Up @@ -77,6 +77,7 @@ library
Ouroboros.Network.PeerSelection.KnownPeers
Ouroboros.Network.PeerSelection.LedgerPeers
Ouroboros.Network.PeerSelection.LocalRootPeers
Ouroboros.Network.PeerSelection.RelayAccessPoint
Ouroboros.Network.PeerSelection.RootPeersDNS.DNSActions
Ouroboros.Network.PeerSelection.RootPeersDNS
Ouroboros.Network.PeerSelection.Governor
Expand Down
Expand Up @@ -7,10 +7,10 @@
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.PeerSelection.LedgerPeers (
DomainAddress (..),
DomainAccessPoint (..),
IP.IP (..),
LedgerPeersConsensusInterface (..),
RelayAddress (..),
RelayAccessPoint (..),
PoolStake (..),
AccPoolStake (..),
TraceLedgerPeers (..),
Expand Down Expand Up @@ -48,7 +48,7 @@ import System.Random

import Cardano.Slotting.Slot (SlotNo)
import Ouroboros.Network.PeerSelection.RootPeersDNS
(RelayAddress (..), DomainAddress (..))
(RelayAccessPoint (..), DomainAccessPoint (..))

import Text.Printf

Expand All @@ -62,14 +62,14 @@ isLedgerPeersEnabled _ = True
newtype NumberOfPeers = NumberOfPeers Word16 deriving Show

newtype LedgerPeersConsensusInterface m = LedgerPeersConsensusInterface {
lpGetPeers :: SlotNo -> STM m (Maybe [(PoolStake, NonEmpty RelayAddress)])
lpGetPeers :: SlotNo -> STM m (Maybe [(PoolStake, NonEmpty RelayAccessPoint)])
}

-- | Trace LedgerPeers events.
data TraceLedgerPeers =
PickedPeer !RelayAddress !AccPoolStake ! PoolStake
PickedPeer !RelayAccessPoint !AccPoolStake !PoolStake
-- ^ Trace for a peer picked with accumulated and relative stake of its pool.
| PickedPeers !NumberOfPeers ![RelayAddress]
| PickedPeers !NumberOfPeers ![RelayAccessPoint]
-- ^ Trace for the number of peers we wanted to pick and the list of peers picked.
| FetchingNewLedgerState !Int
-- ^ Trace for fetching a new list of peers from the ledger. Int is the number of peers
Expand Down Expand Up @@ -131,16 +131,16 @@ newtype AccPoolStake = AccPoolStake { unAccPoolStake :: Rational }
-- O(log n) time by taking advantage of Map.lookupGE (returns the smallest key greater or equal
-- to the provided value).
--
accPoolStake :: [(PoolStake, NonEmpty RelayAddress)]
-> Map AccPoolStake (PoolStake, NonEmpty RelayAddress)
accPoolStake :: [(PoolStake, NonEmpty RelayAccessPoint)]
-> Map AccPoolStake (PoolStake, NonEmpty RelayAccessPoint)
accPoolStake pl =
let pl' = reRelativeStake pl
ackList = foldl' fn [] pl' in
Map.fromList ackList
where
fn :: [(AccPoolStake, (PoolStake, NonEmpty RelayAddress))]
-> (PoolStake, NonEmpty RelayAddress)
-> [(AccPoolStake, (PoolStake, NonEmpty RelayAddress))]
fn :: [(AccPoolStake, (PoolStake, NonEmpty RelayAccessPoint))]
-> (PoolStake, NonEmpty RelayAccessPoint)
-> [(AccPoolStake, (PoolStake, NonEmpty RelayAccessPoint))]
fn [] (s, rs) =
[(AccPoolStake (unPoolStake s), (s, rs))]
fn ps (s, !rs) =
Expand All @@ -156,8 +156,8 @@ accPoolStake pl =
-- of down stream peers smaller pools are likely to get.
-- https://en.wikipedia.org/wiki/Penrose_method
--
reRelativeStake :: [(PoolStake, NonEmpty RelayAddress)]
-> [(PoolStake, NonEmpty RelayAddress)]
reRelativeStake :: [(PoolStake, NonEmpty RelayAccessPoint)]
-> [(PoolStake, NonEmpty RelayAccessPoint)]
reRelativeStake pl =
let total = sum $ map (adjustment . fst) pl
pl' = map (\(s, rls) -> (adjustment s / total, rls)) pl
Expand All @@ -178,13 +178,13 @@ reRelativeStake pl =
pickPeers :: forall m. Monad m
=> StdGen
-> Tracer m TraceLedgerPeers
-> Map AccPoolStake (PoolStake, NonEmpty RelayAddress)
-> Map AccPoolStake (PoolStake, NonEmpty RelayAccessPoint)
-> NumberOfPeers
-> m (StdGen, [RelayAddress])
-> m (StdGen, [RelayAccessPoint])
pickPeers inRng _ pools _ | Map.null pools = return (inRng, [])
pickPeers inRng tracer pools (NumberOfPeers cnt) = go inRng cnt []
where
go :: StdGen -> Word16 -> [RelayAddress] -> m (StdGen, [RelayAddress])
go :: StdGen -> Word16 -> [RelayAccessPoint] -> m (StdGen, [RelayAccessPoint])
go rng 0 picked = return (rng, picked)
go rng n picked =
let (r :: Word64, rng') = random rng
Expand All @@ -210,15 +210,16 @@ ledgerPeersThread :: forall m.
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerAfter
-> LedgerPeersConsensusInterface m
-> ([DomainAddress] -> m (Map DomainAddress (Set SockAddr)))
-> ([DomainAccessPoint] -> m (Map DomainAccessPoint (Set SockAddr)))
-> STM m NumberOfPeers
-> (Maybe (Set SockAddr, DiffTime) -> STM m ())
-> m Void
ledgerPeersThread inRng tracer readUseLedgerAfter LedgerPeersConsensusInterface{..} doResolve
getReq putRsp =
go inRng (Time 0) Map.empty
where
go :: StdGen -> Time -> Map AccPoolStake (PoolStake, NonEmpty RelayAddress) -> m Void
go :: StdGen -> Time -> Map AccPoolStake (PoolStake, NonEmpty RelayAccessPoint)
-> m Void
go rng oldTs peerMap = do
useLedgerAfter <- atomically readUseLedgerAfter
traceWith tracer (TraceUseLedgerAfter useLedgerAfter)
Expand Down Expand Up @@ -284,11 +285,11 @@ ledgerPeersThread inRng tracer readUseLedgerAfter LedgerPeersConsensusInterface{

-- Divide the picked peers form the ledger into addresses we can use directly and
-- domain names that we need to resolve.
splitPeers :: (Set SockAddr, [DomainAddress])
-> RelayAddress
-> (Set SockAddr, [DomainAddress])
splitPeers (addrs, domains) (RelayDomain domain) = (addrs, domain : domains)
splitPeers (addrs, domains) (RelayAddress ip port) =
splitPeers :: (Set SockAddr, [DomainAccessPoint])
-> RelayAccessPoint
-> (Set SockAddr, [DomainAccessPoint])
splitPeers (addrs, domains) (RelayDomainAccessPoint domain) = (addrs, domain : domains)
splitPeers (addrs, domains) (RelayAccessAddress ip port) =
let !addr = IP.toSockAddr (ip, port) in
(Set.insert addr addrs, domains)

Expand All @@ -303,7 +304,7 @@ withLedgerPeers :: forall m a.
-> Tracer m TraceLedgerPeers
-> STM m UseLedgerAfter
-> LedgerPeersConsensusInterface m
-> ([DomainAddress] -> m (Map DomainAddress (Set SockAddr)))
-> ([DomainAccessPoint] -> m (Map DomainAccessPoint (Set SockAddr)))
-> ( (NumberOfPeers -> m (Maybe (Set SockAddr, DiffTime)))
-> Async m Void
-> m a )
Expand Down
@@ -0,0 +1,111 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ViewPatterns #-}

module Ouroboros.Network.PeerSelection.RelayAccessPoint
( DomainAccessPoint (..)
, RelayAccessPoint (.., RelayDomainAccessPoint)
, IP.IP (..)

-- * Socket type re-exports
, Socket.PortNumber,
) where

import Control.DeepSeq (NFData (..))

import Data.Aeson
import qualified Data.IP as IP
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import Text.Read (readMaybe)

import qualified Network.DNS as DNS
import qualified Network.Socket as Socket

-- | A product of a 'DNS.Domain' and 'Socket.PortNumber'. After resolving the
-- domain we will use the 'Socket.PortNumber' to form 'Socket.SockAddr'.
--
data DomainAccessPoint = DomainAccessPoint {
dapDomain :: !DNS.Domain,
dapPortNumber :: !Socket.PortNumber
}
deriving (Show, Eq, Ord)

instance FromJSON DomainAccessPoint where
parseJSON = withObject "DomainAccessPoint" $ \v ->
DomainAccessPoint
<$> (encodeUtf8 <$> v .: "addr")
<*> ((fromIntegral :: Int -> Socket.PortNumber) <$> v .: "port")

instance ToJSON DomainAccessPoint where
toJSON da =
object
[ "addr" .= decodeUtf8 (dapDomain da)
, "port" .= (fromIntegral (dapPortNumber da) :: Int)
]

-- | A relay can have either an IP address and a port number or
-- a domain with a port number
--
data RelayAccessPoint = RelayAccessDomain !DNS.Domain !Socket.PortNumber
| RelayAccessAddress !IP.IP !Socket.PortNumber
deriving (Show, Eq, Ord)


-- | 'RelayDomainAccessPoint' a bidirectional pattern which links
-- 'RelayAccessDomain' and 'DomainAccessPoint'.
--
pattern RelayDomainAccessPoint :: DomainAccessPoint -> RelayAccessPoint
pattern RelayDomainAccessPoint dap <- (viewRelayAccessPoint -> Just dap)
where
RelayDomainAccessPoint DomainAccessPoint {dapDomain, dapPortNumber} =
RelayAccessDomain dapDomain dapPortNumber

{-# COMPLETE RelayDomainAccessPoint, RelayAccessAddress #-}

viewRelayAccessPoint :: RelayAccessPoint -> Maybe DomainAccessPoint
viewRelayAccessPoint (RelayAccessDomain dapDomain dapPortNumber) =
Just DomainAccessPoint {dapDomain, dapPortNumber}
viewRelayAccessPoint RelayAccessAddress {} =
Nothing


-- 'IP' nor 'IPv6' is strict, 'IPv4' is strict only because it's a newtype for
-- a primitive type ('Word32').
--
instance NFData RelayAccessPoint where
rnf (RelayAccessDomain !_domain !_port) = ()
rnf (RelayAccessAddress ip !_port) =
case ip of
IP.IPv4 ipv4 -> rnf (IP.fromIPv4w ipv4)
IP.IPv6 ipv6 -> rnf (IP.fromIPv6w ipv6)

instance FromJSON RelayAccessPoint where
parseJSON = withObject "RelayAccessPoint" $ \v -> do
addr <- v .: "addr"
port <- v .: "port"
return (toRelayAccessPoint addr port)

instance ToJSON RelayAccessPoint where
toJSON (RelayAccessDomain addr port) =
object
[ "addr" .= decodeUtf8 addr
, "port" .= (fromIntegral port :: Int)
]
toJSON (RelayAccessAddress ip port) =
object
[ "addr" .= Text.pack (show ip)
, "port" .= (fromIntegral port :: Int)
]

-- | Parse a address field as either an IP address or a DNS address.
-- Returns corresponding RelayAccessPoint.
--
toRelayAccessPoint :: Text -> Int -> RelayAccessPoint
toRelayAccessPoint address port =
case readMaybe (Text.unpack address) of
Nothing -> RelayAccessDomain (encodeUtf8 address) (fromIntegral port)
Just addr -> RelayAccessAddress addr (fromIntegral port)

0 comments on commit 3d76034

Please sign in to comment.