Skip to content

Commit

Permalink
Consensus address type variable changes
Browse files Browse the repository at this point in the history
  • Loading branch information
bolt12 committed Nov 22, 2022
1 parent 3f2c8fa commit ce644b3
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 153 deletions.
22 changes: 12 additions & 10 deletions ouroboros-consensus-test/src/Test/ThreadNet/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS

import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode (MiniProtocolParameters (..))
import Ouroboros.Network.NodeToNode (ConnectionId (..),
MiniProtocolParameters (..))
import Ouroboros.Network.PeerSelection.PeerMetric (nullMetric)
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
Expand Down Expand Up @@ -1226,22 +1227,23 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)

atomically $ writeTVar edgeStatusVar EUp

let miniProtocol ::
let local = CoreId (CoreNodeId 0)
miniProtocol ::
String
-- ^ protocol name
-> (String -> a -> RestartCause)
-> (String -> b -> RestartCause)
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> NodeId
-> ConnectionId NodeId
-> Channel m msg
-> m (a, trailingBytes)
)
-- ^ client action to run on node1
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> NodeId
-> ConnectionId NodeId
-> Channel m msg
-> m (b, trailingBytes)
)
Expand All @@ -1252,8 +1254,8 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( (retClient (proto <> ".client") . fst) <$> client app1 version (return Continue) (fromCoreNodeId node2) chan
, (retServer (proto <> ".server") . fst) <$> server app2 version (fromCoreNodeId node1) dualChan
( (retClient (proto <> ".client") . fst) <$> client app1 version (return Continue) (ConnectionId local (fromCoreNodeId node2)) chan
, (retServer (proto <> ".server") . fst) <$> server app2 version (ConnectionId local (fromCoreNodeId node1)) dualChan
)

(>>= withAsyncsWaitAny) $
Expand Down Expand Up @@ -1598,14 +1600,14 @@ withAsyncsWaitAny = go [] . NE.toList
-- its use in this module
--
-- Used internal to this module, essentially as an abbreviation.
data LimitedApp m peer blk =
LimitedApp (LimitedApp' m peer blk)
data LimitedApp m addr blk =
LimitedApp (LimitedApp' m addr blk)

-- | Argument of 'LimitedApp' data constructor
--
-- Used internal to this module, essentially as an abbreviation.
type LimitedApp' m peer blk =
NTN.Apps m peer
type LimitedApp' m addr blk =
NTN.Apps m addr
-- The 'ChainSync' and 'BlockFetch' protocols use @'Serialised' x@ for
-- the servers and @x@ for the clients. Since both have to match to be
-- sent across a channel, we can't use @'AnyMessage' ..@, instead, we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import Ouroboros.Consensus.Util.ResourceRegistry
import Ouroboros.Consensus.Util.STM (blockUntilJust,
forkLinkedWatcher)

import Ouroboros.Network.ConnectionId (ConnectionId (..))
import Test.Util.ChainUpdates
import qualified Test.Util.LogicalClock as LogicalClock
import Test.Util.LogicalClock (Tick (..))
Expand Down Expand Up @@ -116,8 +117,8 @@ prop_blockFetch bfcts@BlockFetchClientTestSetup{..} =
-------------------------------------------------------------------------------}

data BlockFetchClientOutcome = BlockFetchClientOutcome {
bfcoBlockFetchResults :: Map PeerId (Either SomeException ())
, bfcoFetchedBlocks :: Map PeerId Word
bfcoBlockFetchResults :: Map (ConnectionId PeerId) (Either SomeException ())
, bfcoFetchedBlocks :: Map (ConnectionId PeerId) Word
, bfcoTrace :: [(Tick, String)]
}

Expand Down Expand Up @@ -276,9 +277,9 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do


mkTestBlockFetchConsensusInterface ::
STM m (Map PeerId (AnchoredFragment (Header TestBlock)))
STM m (Map (ConnectionId PeerId) (AnchoredFragment (Header TestBlock)))
-> BlockFetchClientInterface.ChainDbView m TestBlock
-> BlockFetchConsensusInterface PeerId (Header TestBlock) TestBlock m
-> BlockFetchConsensusInterface (ConnectionId PeerId) (Header TestBlock) TestBlock m
mkTestBlockFetchConsensusInterface getCandidates chainDbView =
BlockFetchClientInterface.mkBlockFetchConsensusInterface
(TestBlockConfig numCoreNodes)
Expand Down Expand Up @@ -322,7 +323,7 @@ ntnVersion = maxBound
data BlockFetchClientTestSetup = BlockFetchClientTestSetup {
-- | A 'Schedule' of 'ChainUpdate's for every peer. This emulates
-- the candidate fragments provided by the ChainSync client.
peerUpdates :: Map PeerId (Schedule ChainUpdate)
peerUpdates :: Map (ConnectionId PeerId) (Schedule ChainUpdate)
-- | BlockFetch 'FetchMode'
, blockFetchMode :: FetchMode
, blockFetchCfg :: BlockFetchConfiguration
Expand All @@ -348,7 +349,9 @@ instance Condense BlockFetchClientTestSetup where
instance Arbitrary BlockFetchClientTestSetup where
arbitrary = do
numPeers <- chooseInt (1, 3)
let peerIds = PeerId <$> [1 .. numPeers]
let local = PeerId 0
peerIds = map (ConnectionId local)
$ PeerId <$> [1 .. numPeers]
peerUpdates <-
Map.fromList . zip peerIds
<$> replicateM numPeers genUpdateSchedule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ data Handlers m peer blk = Handlers {
}

mkHandlers
:: forall m blk remotePeer localPeer.
:: forall m blk addrNTN localPeer.
( IOLike m
, LedgerSupportsMempool blk
, LedgerSupportsProtocol blk
, QueryLedger blk
, ConfigSupportsNode blk
)
=> NodeKernelArgs m remotePeer localPeer blk
-> NodeKernel m remotePeer localPeer blk
-> Handlers m localPeer blk
=> NodeKernelArgs m addrNTN localPeer blk
-> NodeKernel m addrNTN localPeer blk
-> Handlers m localPeer blk
mkHandlers NodeKernelArgs {cfg, tracers} NodeKernel {getChainDB, getMempool} =
Handlers {
hChainSyncServer =
Expand Down Expand Up @@ -383,7 +383,7 @@ data Apps m peer bCS bTX bSQ bTM a = Apps {

-- | Construct the 'NetworkApplication' for the node-to-client protocols
mkApps
:: forall m remotePeer localPeer blk e bCS bTX bSQ bTM.
:: forall m addrNTN localPeer blk e bCS bTX bSQ bTM.
( IOLike m
, Exception e
, ShowProxy blk
Expand All @@ -393,7 +393,7 @@ mkApps
, ShowProxy (GenTxId blk)
, ShowQuery (BlockQuery blk)
)
=> NodeKernel m remotePeer localPeer blk
=> NodeKernel m addrNTN localPeer blk
-> Tracers m localPeer blk e
-> Codecs blk e m bCS bTX bSQ bTM
-> Handlers m localPeer blk
Expand Down
80 changes: 40 additions & 40 deletions ouroboros-consensus/src/Ouroboros/Consensus/Network/NodeToNode.hs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ import Ouroboros.Consensus.Storage.Serialisation (SerialisedHeader)
-------------------------------------------------------------------------------}

-- | Protocol handlers for node-to-node (remote) communication
data Handlers m peer blk = Handlers {
data Handlers m addr blk = Handlers {
hChainSyncClient
:: peer
:: ConnectionId addr
-> NodeToNodeVersion
-> ControlMessageSTM m
-> HeaderMetricsTracer m
Expand Down Expand Up @@ -147,41 +147,41 @@ data Handlers m peer blk = Handlers {
, hTxSubmissionClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> peer
-> ConnectionId addr
-> TxSubmissionClient (GenTxId blk) (GenTx blk) m ()

, hTxSubmissionServer
:: NodeToNodeVersion
-> peer
-> ConnectionId addr
-> TxSubmissionServerPipelined (GenTxId blk) (GenTx blk) m ()

, hKeepAliveClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> peer
-> StrictTVar m (Map peer PeerGSV)
-> ConnectionId addr
-> StrictTVar m (Map (ConnectionId addr) PeerGSV)
-> KeepAliveInterval
-> KeepAliveClient m ()

, hKeepAliveServer
:: NodeToNodeVersion
-> peer
-> ConnectionId addr
-> KeepAliveServer m ()
}

mkHandlers
:: forall m blk remotePeer localPeer.
:: forall m blk addrNTN localPeer.
( IOLike m
, MonadTime m
, MonadTimer m
, LedgerSupportsMempool blk
, HasTxId (GenTx blk)
, LedgerSupportsProtocol blk
, Ord remotePeer
, Ord addrNTN
)
=> NodeKernelArgs m remotePeer localPeer blk
-> NodeKernel m remotePeer localPeer blk
-> Handlers m remotePeer blk
=> NodeKernelArgs m addrNTN localPeer blk
-> NodeKernel m addrNTN localPeer blk
-> Handlers m addrNTN blk
mkHandlers
NodeKernelArgs {keepAliveRng, miniProtocolParameters}
NodeKernel {getChainDB, getMempool, getTopLevelConfig, getTracers = tracers} =
Expand Down Expand Up @@ -392,33 +392,33 @@ type ServerApp m peer bytes a =
-- | Applications for the node-to-node protocols
--
-- See 'Network.Mux.Types.MuxApplication'
data Apps m peer bCS bBF bTX bKA a b = Apps {
data Apps m addr bCS bBF bTX bKA a b = Apps {
-- | Start a chain sync client that communicates with the given upstream
-- node.
aChainSyncClient :: ClientApp m peer bCS a
aChainSyncClient :: ClientApp m (ConnectionId addr) bCS a

-- | Start a chain sync server.
, aChainSyncServer :: ServerApp m peer bCS b
, aChainSyncServer :: ServerApp m (ConnectionId addr) bCS b

-- | Start a block fetch client that communicates with the given
-- upstream node.
, aBlockFetchClient :: ClientApp m peer bBF a
, aBlockFetchClient :: ClientApp m (ConnectionId addr) bBF a

-- | Start a block fetch server.
, aBlockFetchServer :: ServerApp m peer bBF b
, aBlockFetchServer :: ServerApp m (ConnectionId addr) bBF b

-- | Start a transaction submission v2 client that communicates with the
-- given upstream node.
, aTxSubmission2Client :: ClientApp m peer bTX a
, aTxSubmission2Client :: ClientApp m (ConnectionId addr) bTX a

-- | Start a transaction submission v2 server.
, aTxSubmission2Server :: ServerApp m peer bTX b
, aTxSubmission2Server :: ServerApp m (ConnectionId addr) bTX b

-- | Start a keep-alive client.
, aKeepAliveClient :: ClientApp m peer bKA a
, aKeepAliveClient :: ClientApp m (ConnectionId addr) bKA a

-- | Start a keep-alive server.
, aKeepAliveServer :: ServerApp m peer bKA b
, aKeepAliveServer :: ServerApp m (ConnectionId addr) bKA b
}


Expand Down Expand Up @@ -472,32 +472,32 @@ byteLimits = ByteLimits {

-- | Construct the 'NetworkApplication' for the node-to-node protocols
mkApps
:: forall m remotePeer localPeer blk e bCS bBF bTX bKA.
:: forall m addrNTN localPeer blk e bCS bBF bTX bKA.
( IOLike m
, MonadTimer m
, Ord remotePeer
, Ord addrNTN
, Exception e
, LedgerSupportsProtocol blk
, ShowProxy blk
, ShowProxy (Header blk)
, ShowProxy (TxId (GenTx blk))
, ShowProxy (GenTx blk)
)
=> NodeKernel m remotePeer localPeer blk -- ^ Needed for bracketing only
-> Tracers m remotePeer blk e
=> NodeKernel m addrNTN localPeer blk -- ^ Needed for bracketing only
-> Tracers m (ConnectionId addrNTN) blk e
-> (NodeToNodeVersion -> Codecs blk e m bCS bCS bBF bBF bTX bKA)
-> ByteLimits bCS bBF bTX bKA
-> m ChainSyncTimeout
-> ReportPeerMetrics m remotePeer
-> Handlers m remotePeer blk
-> Apps m remotePeer bCS bBF bTX bKA NodeToNodeInitiatorResult ()
-> ReportPeerMetrics m (ConnectionId addrNTN)
-> Handlers m addrNTN blk
-> Apps m addrNTN bCS bBF bTX bKA NodeToNodeInitiatorResult ()
mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPeerMetrics {..} Handlers {..} =
Apps {..}
where
aChainSyncClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bCS
-> m (NodeToNodeInitiatorResult, Maybe bCS)
aChainSyncClient version controlMessageSTM them channel = do
Expand Down Expand Up @@ -531,7 +531,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe

aChainSyncServer
:: NodeToNodeVersion
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bCS
-> m ((), Maybe bCS)
aChainSyncServer version them channel = do
Expand Down Expand Up @@ -559,7 +559,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
aBlockFetchClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bBF
-> m (NodeToNodeInitiatorResult, Maybe bBF)
aBlockFetchClient version controlMessageSTM them channel = do
Expand All @@ -578,7 +578,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe

aBlockFetchServer
:: NodeToNodeVersion
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bBF
-> m ((), Maybe bBF)
aBlockFetchServer version them channel = do
Expand All @@ -596,7 +596,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
aTxSubmission2Client
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bTX
-> m (NodeToNodeInitiatorResult, Maybe bTX)
aTxSubmission2Client version controlMessageSTM them channel = do
Expand All @@ -612,7 +612,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe

aTxSubmission2Server
:: NodeToNodeVersion
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bTX
-> m ((), Maybe bTX)
aTxSubmission2Server version them channel = do
Expand All @@ -628,7 +628,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
aKeepAliveClient
:: NodeToNodeVersion
-> ControlMessageSTM m
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bKA
-> m (NodeToNodeInitiatorResult, Maybe bKA)
aKeepAliveClient version _controlMessageSTM them channel = do
Expand All @@ -649,7 +649,7 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe

aKeepAliveServer
:: NodeToNodeVersion
-> remotePeer
-> ConnectionId addrNTN
-> Channel m bKA
-> m ((), Maybe bKA)
aKeepAliveServer version _them channel = do
Expand All @@ -676,8 +676,8 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
initiator
:: MiniProtocolParameters
-> NodeToNodeVersion
-> Apps m (ConnectionId peer) b b b b a c
-> OuroborosBundle 'InitiatorMode peer b m a Void
-> Apps m addr b b b b a c
-> OuroborosBundle 'InitiatorMode addr b m a Void
initiator miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
miniProtocolParameters
Expand Down Expand Up @@ -707,8 +707,8 @@ initiator miniProtocolParameters version Apps {..} =
initiatorAndResponder
:: MiniProtocolParameters
-> NodeToNodeVersion
-> Apps m (ConnectionId peer) b b b b a c
-> OuroborosBundle 'InitiatorResponderMode peer b m a c
-> Apps m addr b b b b a c
-> OuroborosBundle 'InitiatorResponderMode addr b m a c
initiatorAndResponder miniProtocolParameters version Apps {..} =
nodeToNodeProtocols
miniProtocolParameters
Expand Down
Loading

0 comments on commit ce644b3

Please sign in to comment.