Skip to content

Commit

Permalink
refactor P2P api
Browse files Browse the repository at this point in the history
Co-authored-by: Mark Tullsen <mtullsen@users.noreply.github.com>
  • Loading branch information
coot and mtullsen committed Oct 28, 2022
1 parent 8396e9d commit 2c2cafd
Showing 1 changed file with 105 additions and 112 deletions.
217 changes: 105 additions & 112 deletions ouroboros-network/src/Ouroboros/Network/Diffusion/P2P.hs
Expand Up @@ -445,6 +445,7 @@ type NodeToNodePeerSelectionActions (mode :: MuxMode) ntnAddr m a =
(NodeToNodePeerConnectionHandle mode ntnAddr m a)
m


data Interfaces ntnFd ntnAddr ntnVersion ntnVersionData
ntcFd ntcAddr ntcVersion ntcVersionData
resolver resolverError
Expand Down Expand Up @@ -656,57 +657,22 @@ runM Interfaces
-> throwIO (UnexpectedIPv6Address addr)
Nothing -> pure ()

lookupReqs <- case (cmIPv4Address, cmIPv6Address) of
(Just _, Nothing) -> return LookupReqAOnly
(Nothing, Just _) -> return LookupReqAAAAOnly
(Just _, Just _) -> return LookupReqAAndAAAA
_ ->
throwIO (NoSocket :: Failure RemoteAddress)

-- control channel for the server; only required in
-- @'InitiatorResponderMode' :: 'MuxMode'@
cmdInMode
<- case diffusionMode of
InitiatorOnlyDiffusionMode ->
-- action which we pass to connection handler
pure (HasInitiator CMDInInitiatorMode)
InitiatorAndResponderDiffusionMode -> do
-- we pass 'Server.newOutboundConnection serverControlChannel' to
-- connection handler
HasInitiatorResponder <$>
(CMDInInitiatorResponderMode
<$> Server.newControlChannel
<*> Server.newObservableStateVar ntnInbgovRng)

localControlChannel <- Server.newControlChannel
localServerStateVar <- Server.newObservableStateVar ntcInbgovRng

-- RNGs used for picking random peers from the ledger and for
-- demoting/promoting peers.
policyRngVar <- newTVarIO policyRng

churnModeVar <- newTVarIO ChurnModeNormal

peerSelectionTargetsVar <- newTVarIO $ daPeerSelectionTargets {
-- Start with a smaller number of active peers, the churn governor will increase
-- it to the configured value after a delay.
targetNumberOfActivePeers =
min 2 (targetNumberOfActivePeers daPeerSelectionTargets)
}

let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0

--
-- local connection manager
--
localThread :: Maybe (m Void)
--
-- local connection manager
--
let localThread :: Maybe (m Void)
localThread =
case daLocalAddress of
Nothing -> Nothing
Just localAddr ->
Just $ withLocalSocket tracer diNtcGetFileDescriptor diNtcSnocket localAddr
$ \localSocket -> do
let localConnectionHandler :: NodeToClientConnectionHandler
localControlChannel <- Server.newControlChannel
localServerStateVar <- Server.newObservableStateVar ntcInbgovRng

let localConnectionLimits = AcceptedConnectionsLimit maxBound maxBound 0

localConnectionHandler :: NodeToClientConnectionHandler
ntcFd ntcAddr ntcVersion ntcVersionData m
localConnectionHandler =
makeConnectionHandler
Expand Down Expand Up @@ -751,34 +717,69 @@ runM Interfaces
ntcVersionData m)
-> do

--
-- run local server
--
--
-- run local server
--

traceWith tracer . RunLocalServer
=<< Snocket.getLocalAddr diNtcSnocket localSocket

Async.withAsync
(Server.run
ServerArguments {
serverSockets = localSocket :| [],
serverSnocket = diNtcSnocket,
serverTracer = dtLocalServerTracer,
serverTrTracer = nullTracer, -- TODO: issue #3320
serverInboundGovernorTracer = dtLocalInboundGovernorTracer,
serverInboundIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT,
serverConnectionLimits = localConnectionLimits,
serverConnectionManager = localConnectionManager,
serverControlChannel = localControlChannel,
serverObservableStateVar = localServerStateVar
}) Async.wait
traceWith tracer . RunLocalServer
=<< Snocket.getLocalAddr diNtcSnocket localSocket

Async.withAsync
(Server.run
ServerArguments {
serverSockets = localSocket :| [],
serverSnocket = diNtcSnocket,
serverTracer = dtLocalServerTracer,
serverTrTracer = nullTracer, -- TODO: issue #3320
serverInboundGovernorTracer = dtLocalInboundGovernorTracer,
serverInboundIdleTimeout = local_PROTOCOL_IDLE_TIMEOUT,
serverConnectionLimits = localConnectionLimits,
serverConnectionManager = localConnectionManager,
serverControlChannel = localControlChannel,
serverObservableStateVar = localServerStateVar
}) Async.wait

--
-- remote connection manager
--
--
-- remote connection manager
--
let remoteThread :: m Void
remoteThread = do
lookupReqs <- case (cmIPv4Address, cmIPv6Address) of
(Just _, Nothing) -> return LookupReqAOnly
(Nothing, Just _) -> return LookupReqAAAAOnly
(Just _, Just _) -> return LookupReqAAndAAAA
_ ->
throwIO (NoSocket :: Failure RemoteAddress)

-- control channel for the server; only required in
-- @'InitiatorResponderMode' :: 'MuxMode'@
cmdInMode
<- case diffusionMode of
InitiatorOnlyDiffusionMode ->
-- action which we pass to connection handler
pure (HasInitiator CMDInInitiatorMode)
InitiatorAndResponderDiffusionMode ->
-- we pass 'Server.newOutboundConnection serverControlChannel' to
-- connection handler
-- GR-FIXME: last comment? not understandable in context. DEI?
HasInitiatorResponder <$>
(CMDInInitiatorResponderMode
<$> Server.newControlChannel
<*> Server.newObservableStateVar ntnInbgovRng)

-- RNGs used for picking random peers from the ledger and for
-- demoting/promoting peers.
policyRngVar <- newTVarIO policyRng

churnModeVar <- newTVarIO ChurnModeNormal

peerSelectionTargetsVar <- newTVarIO $ daPeerSelectionTargets {
-- Start with a smaller number of active peers, the churn governor will increase
-- it to the configured value after a delay.
targetNumberOfActivePeers =
min 2 (targetNumberOfActivePeers daPeerSelectionTargets)
}

remoteThread :: m Void
remoteThread =
withLedgerPeers
ledgerPeersRng
diNtnToPeerAddr
Expand Down Expand Up @@ -809,14 +810,11 @@ runM Interfaces
cmAddressType = diNtnAddressType,
cmSnocket = diNtnSnocket,
connectionDataFlow = uncurry diNtnDataFlow,
cmPrunePolicy =
case cmdInMode of
HasInitiator CMDInInitiatorMode ->
-- Server is not running, it will not be able to
-- advise which connections to prune. It's also not
-- expected that the governor targets will be larger
-- than limits imposed by 'cmConnectionsLimits'.
simplePrunePolicy,
cmPrunePolicy = simplePrunePolicy,
-- Server is not running, it will not be able to
-- advise which connections to prune. It's also not
-- expected that the governor targets will be larger
-- than limits imposed by 'cmConnectionsLimits'.
cmConnectionsLimits = daAcceptedConnectionsLimit,
cmTimeWaitTimeout = daTimeWaitTimeout,
cmOutboundIdleTimeout = daProtocolIdleTimeout
Expand Down Expand Up @@ -882,35 +880,33 @@ runM Interfaces
:: NodeToNodePeerSelectionActions
InitiatorMode ntnAddr m Void) ->

Async.withAsync
(Governor.peerSelectionGovernor
dtTracePeerSelectionTracer
dtDebugPeerSelectionInitiatorTracer
dtTracePeerSelectionCounters
fuzzRng
peerSelectionActions
(Diffusion.Policies.simplePeerSelectionPolicy
policyRngVar (readTVar churnModeVar) daPeerMetrics))
$ \governorThread ->
Async.withAsync
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
daPeerMetrics
churnModeVar
churnRng
daBlockFetchMode
daPeerSelectionTargets
peerSelectionTargetsVar)
$ \churnGovernorThread ->

-- wait for any thread to fail
snd <$> Async.waitAny
(maybeToList mbLocalPeerSelectionActionsThread
++ [ governorThread
, ledgerPeerThread
, churnGovernorThread
])

(Governor.peerSelectionGovernor
dtTracePeerSelectionTracer
dtDebugPeerSelectionInitiatorTracer
dtTracePeerSelectionCounters
fuzzRng
peerSelectionActions
(Diffusion.Policies.simplePeerSelectionPolicy
policyRngVar (readTVar churnModeVar) daPeerMetrics))
$ \governorThread ->
Async.withAsync
(Governor.peerChurnGovernor
dtTracePeerSelectionTracer
daPeerMetrics
churnModeVar
churnRng
daBlockFetchMode
daPeerSelectionTargets
peerSelectionTargetsVar)
$ \churnGovernorThread ->
-- wait for any thread to fail
snd <$> Async.waitAny
(maybeToList mbLocalPeerSelectionActionsThread
++ [ governorThread
, ledgerPeerThread
, churnGovernorThread
])

-- InitiatorResponderMode
--
Expand All @@ -935,10 +931,7 @@ runM Interfaces
cmAddressType = diNtnAddressType,
cmSnocket = diNtnSnocket,
connectionDataFlow = uncurry diNtnDataFlow,
cmPrunePolicy =
case cmdInMode of
HasInitiatorResponder (CMDInInitiatorResponderMode _ serverStateVar) ->
Diffusion.Policies.prunePolicy serverStateVar,
cmPrunePolicy = Diffusion.Policies.prunePolicy observableStateVar,
cmConnectionsLimits = daAcceptedConnectionsLimit,
cmTimeWaitTimeout = daTimeWaitTimeout,
cmOutboundIdleTimeout = daProtocolIdleTimeout
Expand Down Expand Up @@ -967,6 +960,7 @@ runM Interfaces
InitiatorResponderMode ntnFd ntnAddr ntnVersion m ()
) -> do
diInstallSigUSR1Handler connectionManager

--
-- peer state actions
--
Expand All @@ -985,7 +979,6 @@ runM Interfaces
$ \(peerStateActions
:: NodeToNodePeerStateActions
InitiatorResponderMode ntnAddr m ()) ->

--
-- Run peer selection (p2p governor)
--
Expand Down Expand Up @@ -1193,7 +1186,6 @@ run tracers tracersExtra args argsExtra apps appsExtra = do
}
tracers tracersExtra args argsExtra apps appsExtra


--
-- Data flow
--
Expand Down Expand Up @@ -1321,3 +1313,4 @@ withLocalSocket tracer getFileDescriptor sn localAddress k =

-- pre-configured systemd socket
Left sd -> k sd

0 comments on commit 2c2cafd

Please sign in to comment.