Skip to content

Commit

Permalink
ouroboros-network update
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed May 30, 2023
1 parent 044c65c commit a27a3bf
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 91 deletions.
Expand Up @@ -41,7 +41,7 @@ import System.FS.IO (ioHasFS)
serve ::
SockAddr
-> N2N.Versions N2N.NodeToNodeVersion N2N.NodeToNodeVersionData
(OuroborosApplication 'ResponderMode SockAddr BL.ByteString IO Void ())
(OuroborosApplication 'ResponderMode (N2N.MinimalInitiatorContext SockAddr) (N2N.ResponderContext SockAddr) BL.ByteString IO Void ())
-> IO Void
serve sockAddr application = withIOManager \iocp -> do
let sn = Snocket.socketSnocket iocp
Expand Down
Expand Up @@ -45,8 +45,8 @@ import Ouroboros.Network.Block (ChainUpdate (..), Tip (..))
import Ouroboros.Network.Driver (runPeer)
import Ouroboros.Network.KeepAlive (keepAliveServer)
import Ouroboros.Network.Magic (NetworkMagic)
import Ouroboros.Network.Mux (MiniProtocol (..), MuxMode (..),
MuxPeer (..), OuroborosApplication (..),
import Ouroboros.Network.Mux (MiniProtocol (..), MiniProtocolCb (..), MuxMode (..),
OuroborosApplication (..),
RunMiniProtocol (..))
import Ouroboros.Network.NodeToNode (NodeToNodeVersionData (..),
Versions (..))
Expand All @@ -72,7 +72,7 @@ immDBServer ::
-> ImmutableDB m blk
-> NetworkMagic
-> Versions NodeToNodeVersion NodeToNodeVersionData
(OuroborosApplication 'ResponderMode addr BL.ByteString m Void ())
(OuroborosApplication 'ResponderMode (N2N.MinimalInitiatorContext addr) (N2N.ResponderContext addr) BL.ByteString m Void ())
immDBServer codecCfg encAddr decAddr immDB networkMagic = do
forAllVersions application
where
Expand All @@ -96,9 +96,9 @@ immDBServer codecCfg encAddr decAddr immDB networkMagic = do
application ::
NodeToNodeVersion
-> BlockNodeToNodeVersion blk
-> OuroborosApplication 'ResponderMode addr BL.ByteString m Void ()
-> OuroborosApplication 'ResponderMode (N2N.MinimalInitiatorContext addr) (N2N.ResponderContext addr) BL.ByteString m Void ()
application version blockVersion =
OuroborosApplication \_connId _controlMessageSTM -> miniprotocols
OuroborosApplication miniprotocols
where
miniprotocols =
[ mkMiniProtocol
Expand Down Expand Up @@ -127,23 +127,24 @@ immDBServer codecCfg encAddr decAddr immDB networkMagic = do
Consensus.N2N.defaultCodecs codecCfg blockVersion encAddr decAddr version

keepAliveProt =
MuxPeer nullTracer cKeepAliveCodec
$ keepAliveServerPeer keepAliveServer
MiniProtocolCb $ \_ctx channel ->
runPeer nullTracer cKeepAliveCodec channel
$ keepAliveServerPeer keepAliveServer
chainSyncProt =
MuxPeerRaw \channel ->
MiniProtocolCb $ \_ctx channel ->
withRegistry
$ runPeer nullTracer cChainSyncCodecSerialised channel
. chainSyncServerPeer
. chainSyncServer immDB ChainDB.getSerialisedHeaderWithPoint
blockFetchProt =
MuxPeerRaw \channel ->
MiniProtocolCb $ \_ctx channel ->
withRegistry
$ runPeer nullTracer cBlockFetchCodecSerialised channel
. blockFetchServerPeer
. blockFetchServer immDB ChainDB.getSerialisedBlockWithPoint
txSubmissionProt =
-- never reply, there is no timeout
MuxPeerRaw \_channel -> forever $ threadDelay 10
MiniProtocolCb $ \_ctx _channel -> forever $ threadDelay 10

mkMiniProtocol miniProtocolNum limits proto = MiniProtocol {
miniProtocolNum
Expand Down
Expand Up @@ -59,6 +59,22 @@ import GHC.Stack
import Network.TypedProtocol.Codec (AnyMessage (..), CodecFailure,
mapFailureCodec)
import qualified Network.TypedProtocol.Codec as Codec

import qualified Ouroboros.Network.AnchoredFragment as AF
import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..),
TraceLabelPeer (..))
import Ouroboros.Network.Channel
import Ouroboros.Network.Mock.Chain (Chain (Genesis))
import Ouroboros.Network.Point (WithOrigin (..))
import qualified Ouroboros.Network.Protocol.ChainSync.Type as CS
import Ouroboros.Network.ControlMessage (ControlMessage (..))
import Ouroboros.Network.NodeToNode (ExpandedInitiatorContext (..), ConnectionId (..),
MiniProtocolParameters (..), ResponderContext (..), IsBigLedgerPeer (..))
import Ouroboros.Network.PeerSelection.PeerMetric (nullMetric)
import Ouroboros.Network.Protocol.KeepAlive.Type
import Ouroboros.Network.Protocol.Limits (waitForever)
import Ouroboros.Network.Protocol.TxSubmission2.Type

import Ouroboros.Consensus.Block
import Ouroboros.Consensus.BlockchainTime
import Ouroboros.Consensus.Config
Expand Down Expand Up @@ -1237,28 +1253,36 @@ directedEdgeInner registry clock (version, blockVersion) (cfg, calcMessageDelay)
-> (String -> b -> RestartCause)
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ControlMessageSTM m
-> ConnectionId NodeId
-> ExpandedInitiatorContext NodeId m
-> Channel m msg
-> m (a, trailingBytes)
)
-- ^ client action to run on node1
-> ( LimitedApp' m NodeId blk
-> NodeToNodeVersion
-> ConnectionId NodeId
-> ResponderContext NodeId
-> Channel m msg
-> m (b, trailingBytes)
)
-- ^ server action to run on node2
-> (msg -> m ())
-> m (m RestartCause, m RestartCause)
miniProtocol proto retClient retServer client server middle = do
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( (retClient (proto <> ".client") . fst) <$> client app1 version (return Continue) (ConnectionId local (fromCoreNodeId node2)) chan
, (retServer (proto <> ".server") . fst) <$> server app2 version (ConnectionId local (fromCoreNodeId node1)) dualChan
)
(chan, dualChan) <-
createConnectedChannelsWithDelay registry (node1, node2, proto) middle
pure
( (retClient (proto <> ".client") . fst) <$> client app1 version initiatorCtx chan
, (retServer (proto <> ".server") . fst) <$> server app2 version responderCtx dualChan
)
where
initiatorCtx = ExpandedInitiatorContext {
eicConnectionId = ConnectionId local (fromCoreNodeId node2),
eicControlMessage = return Continue,
eicIsBigLedgerPeer = IsNotBigLedgerPeer
}
responderCtx = ResponderContext {
rcConnectionId = ConnectionId local (fromCoreNodeId node1)
}

(>>= withAsyncsWaitAny) $
fmap flattenPairs $
Expand Down
Expand Up @@ -65,6 +65,7 @@ import Ouroboros.Network.Block (Serialised, decodePoint, decodeTip,
encodePoint, encodeTip)
import Ouroboros.Network.BlockFetch
import Ouroboros.Network.Channel
import Ouroboros.Network.Context
import Ouroboros.Network.Driver
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToClient hiding
Expand Down Expand Up @@ -458,17 +459,21 @@ mkApps kernel Tracers {..} Codecs {..} Handlers {..} =
responder
:: N.NodeToClientVersion
-> Apps m (ConnectionId peer) b b b b a
-> OuroborosApplication 'ResponderMode peer b m Void a
-> OuroborosApplicationWithMinimalCtx 'ResponderMode peer b m Void a
responder version Apps {..} =
nodeToClientProtocols
(\peer _shouldStopSTM -> NodeToClientProtocols {
(NodeToClientProtocols {
localChainSyncProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aChainSyncServer peer))),
ResponderProtocolOnly $ MiniProtocolCb $ \ctx ->
aChainSyncServer (rcConnectionId ctx),
localTxSubmissionProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aTxSubmissionServer peer))),
ResponderProtocolOnly $ MiniProtocolCb $ \ctx ->
aTxSubmissionServer (rcConnectionId ctx),
localStateQueryProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aStateQueryServer peer))),
ResponderProtocolOnly $ MiniProtocolCb $ \ctx ->
aStateQueryServer (rcConnectionId ctx),
localTxMonitorProtocol =
(ResponderProtocolOnly (MuxPeerRaw (aTxMonitorServer peer)))
ResponderProtocolOnly $ MiniProtocolCb $ \ctx ->
aTxMonitorServer (rcConnectionId ctx)
})
version

0 comments on commit a27a3bf

Please sign in to comment.