Skip to content

Commit

Permalink
Refactored DiffusionArguments to accomodate root peers as TVars
Browse files Browse the repository at this point in the history
Updated ouroboros-consensus to accomodate new DiffusionArguments type signature

Removed daStaticLocalRootPeers since RelayAddresses are used everywhere

Removed redundant imports

Fixed ToJSON/FromJSON instance of DomainAddress and RelayAddress
  • Loading branch information
bolt12 committed May 4, 2021
1 parent e1925d8 commit 73c1540
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 58 deletions.
4 changes: 2 additions & 2 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -605,7 +605,7 @@ stdVersionDataNTC networkMagic = NodeToClientVersionData

stdRunDataDiffusion ::
DiffusionTracers
-> DiffusionArguments
-> DiffusionArguments IO
-> DiffusionApplications
RemoteAddress LocalAddress
NodeToNodeVersionData NodeToClientVersionData
Expand All @@ -624,7 +624,7 @@ data StdRunNodeArgs m blk = StdRunNodeArgs
-- ^ If @True@, validate the ChainDB on init no matter what
, srnDatabasePath :: FilePath
-- ^ Location of the DBs
, srnDiffusionArguments :: DiffusionArguments
, srnDiffusionArguments :: DiffusionArguments m
, srnDiffusionTracers :: DiffusionTracers
, srnEnableInDevelopmentVersions :: Bool
-- ^ If @False@, then the node will limit the negotiated NTN and NTC
Expand Down
43 changes: 14 additions & 29 deletions ouroboros-network/src/Ouroboros/Network/Diffusion.hs
Expand Up @@ -49,7 +49,7 @@ import Control.Tracer (Tracer, nullTracer, traceWith)
import Data.Foldable (asum)
import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Map as Map
import Data.Map (Map)
import Data.Maybe (catMaybes, maybeToList)
import Data.Set (Set)
import Data.Void (Void)
Expand Down Expand Up @@ -88,11 +88,12 @@ import Ouroboros.Network.ConnectionHandler
import Ouroboros.Network.RethrowPolicy
import qualified Ouroboros.Network.Diffusion.Policies as Diffusion.Policies
import Ouroboros.Network.IOManager
import Ouroboros.Network.PeerSelection.RootPeersDNS ( DomainAddress
, resolveDomainAddresses
, RelayAddress (..)
)
import Ouroboros.Network.InboundGovernor (InboundGovernorTrace (..))
import Ouroboros.Network.PeerSelection.RootPeersDNS ( resolveDomainAddresses
, RelayAddress(..)
, TraceLocalRootPeers(..)
, TracePublicRootPeers(..)
)
import qualified Ouroboros.Network.PeerSelection.Governor as Governor
import Ouroboros.Network.PeerSelection.Governor.Types ( TracePeerSelection (..)
, DebugPeerSelection (..)
Expand Down Expand Up @@ -132,9 +133,6 @@ import Ouroboros.Network.NodeToNode ( ConnectionId (..)
, nodeToNodeHandshakeCodec
)
import qualified Ouroboros.Network.NodeToNode as NodeToNode
import Ouroboros.Network.PeerSelection.RootPeersDNS ( TraceLocalRootPeers (..)
, TracePublicRootPeers (..)
)


-- TODO: use LocalAddress where appropriate rather than 'path'.
Expand Down Expand Up @@ -266,7 +264,7 @@ nullTracers = DiffusionTracers {

-- | Network Node argumets
--
data DiffusionArguments = DiffusionArguments {
data DiffusionArguments m = DiffusionArguments {
daIPv4Address :: Maybe (Either Socket.Socket AddrInfo)
-- ^ an @IPv4@ socket ready to accept connections or an @IPv4@ addresses
, daIPv6Address :: Maybe (Either Socket.Socket AddrInfo)
Expand All @@ -277,10 +275,9 @@ data DiffusionArguments = DiffusionArguments {
, daPeerSelectionTargets :: PeerSelectionTargets
-- ^ selection targets for the peer governor

, daStaticLocalRootPeers :: [(Socket.SockAddr, PeerAdvertise)]
, daLocalRootPeers :: [(DomainAddress, PeerAdvertise)]
, daPublicRootPeers :: [DomainAddress]
, daUseLedgerAfter :: UseLedgerAfter
, daLocalRootPeersVar :: StrictTVar m [(Int, Map RelayAddress PeerAdvertise)]
, daPublicRootPeersVar :: StrictTVar m [RelayAddress]
, daUseLedgerAfterVar :: StrictTVar m UseLedgerAfter

, daAcceptedConnectionsLimit :: AcceptedConnectionsLimit
-- ^ parameters for limiting number of accepted connections
Expand Down Expand Up @@ -568,7 +565,7 @@ type NodeToNodePeerSelectionActions (mode :: MuxMode) a =
--
runDataDiffusion
:: DiffusionTracers
-> DiffusionArguments
-> DiffusionArguments IO
-> DiffusionApplications
RemoteAddress LocalAddress
NodeToNodeVersionData NodeToClientVersionData
Expand All @@ -579,10 +576,9 @@ runDataDiffusion tracers
, daIPv6Address
, daLocalAddress
, daPeerSelectionTargets
, daStaticLocalRootPeers
, daLocalRootPeers
, daPublicRootPeers
, daUseLedgerAfter
, daLocalRootPeersVar
, daPublicRootPeersVar
, daUseLedgerAfterVar
, daAcceptedConnectionsLimit
, daDiffusionMode
, daProtocolIdleTimeout
Expand Down Expand Up @@ -666,15 +662,6 @@ runDataDiffusion tracers
targetNumberOfActivePeers = min 2 (targetNumberOfActivePeers daPeerSelectionTargets)
}

-- v TODO: This is just a simple transformation
daLocalRootPeersVar <- newTVarIO $
([( 1
, Map.fromList $
map (\(d,p) -> (RelayDomain d, p))
daLocalRootPeers)])
daPublicRootPeersVar <- newTVarIO $ map RelayDomain daPublicRootPeers
daUseLedgerAfterVar <- newTVarIO daUseLedgerAfter

let -- snocket for remote communication.
snocket :: SocketSnocket
snocket = Snocket.socketSnocket iocp
Expand Down Expand Up @@ -853,7 +840,6 @@ runDataDiffusion tracers
dtTracePublicRootPeersTracer
timeout
(readTVar peerSelectionTargetsVar)
(Map.fromList daStaticLocalRootPeers)
daLocalRootPeersVar
daPublicRootPeersVar
peerStateActions
Expand Down Expand Up @@ -964,7 +950,6 @@ runDataDiffusion tracers
dtTracePublicRootPeersTracer
timeout
(readTVar peerSelectionTargetsVar)
(Map.fromList daStaticLocalRootPeers)
daLocalRootPeersVar
daPublicRootPeersVar
peerStateActions
Expand Down
Expand Up @@ -45,7 +45,9 @@ import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
import qualified Data.IntMap.Strict as IntMap
import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import Data.Text (Text)
import qualified Data.Text as Text
import Text.Read (readMaybe)
import Data.Void (Void)

import Control.Applicative ((<|>))
Expand Down Expand Up @@ -81,13 +83,13 @@ data DomainAddress = DomainAddress {
instance FromJSON DomainAddress where
parseJSON = withObject "DomainAddress" $ \v ->
DomainAddress
<$> (encodeUtf8 <$> v .: "domain")
<$> (encodeUtf8 <$> v .: "addr")
<*> ((fromIntegral :: Int -> Socket.PortNumber) <$> v .: "port")

instance ToJSON DomainAddress where
toJSON da =
object
[ "domain" .= decodeUtf8 (daDomain da)
[ "addr" .= decodeUtf8 (daDomain da)
, "port" .= (fromIntegral (daPortNumber da) :: Int)
]

Expand All @@ -99,27 +101,30 @@ data RelayAddress = RelayDomain !DomainAddress

instance FromJSON RelayAddress where
parseJSON = withObject "RelayAddress" $ \v -> do
isDomain <- v .: "isDomain"
if isDomain
then RelayDomain
<$> v .: "domain"
else RelayAddress
<$> (read . Text.unpack <$> v .: "addr")
<*> ((fromIntegral :: Int -> Socket.PortNumber) <$> v .: "port")
addr <- v .: "addr"
port <- v .: "port"
return (toRelayAddress addr port)

instance ToJSON RelayAddress where
toJSON (RelayDomain domain) =
toJSON (RelayDomain (DomainAddress addr port)) =
object
[ "isDomain" .= True
, "domain" .= domain
[ "addr" .= decodeUtf8 addr
, "port" .= (fromIntegral port :: Int)
]
toJSON (RelayAddress ip port) =
object
[ "isDomain" .= False
, "addr" .= Text.pack (show ip)
[ "addr" .= Text.pack (show ip)
, "port" .= (fromIntegral port :: Int)
]

-- | Parse a address field as either an IP address or a DNS address.
-- Returns corresponding RelayAddress.
--
toRelayAddress :: Text -> Int -> RelayAddress
toRelayAddress address port =
case readMaybe (Text.unpack address) of
Nothing -> RelayDomain (DomainAddress (encodeUtf8 address) (fromIntegral port))
Just addr -> RelayAddress addr (fromIntegral port)

-----------------------------------------------
-- Resource
Expand Down
15 changes: 2 additions & 13 deletions ouroboros-network/src/Ouroboros/Network/PeerSelection/Simple.hs
Expand Up @@ -16,7 +16,6 @@ import Control.Monad.Class.MonadSTM.Strict
import Control.Tracer (Tracer)

import Data.Map (Map)
import qualified Data.Map as Map
import Data.Set (Set)
import Data.Void (Void)

Expand All @@ -35,8 +34,6 @@ withPeerSelectionActions
-> Tracer IO TracePublicRootPeers
-> TimeoutFn IO
-> STM IO PeerSelectionTargets
-> Map Socket.SockAddr PeerAdvertise
-- ^ static local root peers
-> StrictTVar IO [(Int, Map RelayAddress PeerAdvertise)]
-- ^ local root peers
-> StrictTVar IO [RelayAddress]
Expand All @@ -48,20 +45,12 @@ withPeerSelectionActions
-- ^ continuation, recieves a handle to the local roots peer provider thread
-- (only if local root peers where non-empty).
-> IO a
withPeerSelectionActions localRootTracer publicRootTracer timeout readTargets staticLocalRootPeers
withPeerSelectionActions localRootTracer publicRootTracer timeout readTargets
localRootPeersVar publicRootPeersVar peerStateActions reqLedgerPeers getLedgerPeers k = do
localRootsVar <- newTVarIO []
let peerSelectionActions = PeerSelectionActions {
readPeerSelectionTargets = readTargets,
readLocalRootPeers = do
localRoots <- readTVar localRootsVar

-- TODO: Config support
-- For now use 1 target per ipaddress/domain name.
let staticLocalRootPeers' = map (\(sa, a) ->
(1, Map.singleton sa a)) $ Map.toList staticLocalRootPeers

pure $ staticLocalRootPeers' ++ localRoots,
readLocalRootPeers = readTVar localRootsVar,
requestPublicRootPeers = requestLedgerPeers,
requestPeerGossip = \_ -> pure [],
peerStateActions
Expand Down

0 comments on commit 73c1540

Please sign in to comment.