Skip to content
Permalink
Browse files

Merge #636

636: Changes in `ouroboros-network` needed for consensus integration r=coot a=coot



Co-authored-by: Marcin Szamotulski <profunctor@pm.me>
  • Loading branch information...
iohk-bors and coot committed Jun 12, 2019
2 parents c3109b8 + 92ef893 commit 5ac27236352813a0ac3c764ea8277d3651c71cc6
@@ -436,7 +436,7 @@ runClient tracer clientOptions genesisConfig epochSlots db = case clientOptions
addrInfosLocal <- Network.getAddrInfo (Just addrInfoHints) (Just "127.0.0.1") (Just "0")
addrInfosRemote <- Network.getAddrInfo (Just addrInfoHints) (Just host) (Just port)
case (addrInfosLocal, addrInfosRemote) of
(addrInfoLocal : _, addrInfoRemote : _) -> connectTo
(addrInfoLocal : _, addrInfoRemote : _) -> connectToNode
encodeTerm
decodeTerm
(initiatorVersions epochSlots chainSyncClient)
@@ -149,7 +149,7 @@ main = do
stopCondition = const (pure Nothing)
tracer = contramap (\txt -> ("", Info, txt)) trace
client = Client.chainSyncClient (clientFold tracer genesisConfig stopCondition cvs)
connectTo
connectToNode
encodeTerm
decodeTerm
(initiatorVersions epochSlots client)
@@ -15,9 +15,11 @@ import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Resource (ResourceT, runResourceT)
import Control.Monad.Trans.Class (lift)
import Control.Tracer (nullTracer)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Map as Map
import Data.Text (Text)
import Data.Word (Word16)
import Data.Void (Void)

import qualified Cardano.Chain.Slotting as Cardano

@@ -68,7 +70,7 @@ initiatorVersions
:: ( Monad m, MonadST m, MonadUnliftIO m, MonadThrow m, MonadThrow (ResourceT m) )
=> Cardano.EpochSlots -- ^ Needed for the codec, sadly
-> ChainSyncClient Block Point (ResourceT m) ()
-> Versions VNumber (CodecCBORTerm Text) (MuxApplication InitiatorApp Ptcl m)
-> Versions VNumber (CodecCBORTerm Text) (MuxApplication InitiatorApp Ptcl m LBS.ByteString () Void)
initiatorVersions epochSlots client = Versions $ Map.fromList
[ (VNumber 0, Sigma () (Version clientMuxApp unitCodecCBORTerm))
]
@@ -82,7 +84,7 @@ responderVersions
:: ( Monad m, MonadST m, MonadUnliftIO m, MonadThrow m, MonadThrow (ResourceT m) )
=> Cardano.EpochSlots -- ^ Needed for the codec; must match that of the initiator.
-> ChainSyncServer Block Point (ResourceT m) ()
-> Versions VNumber (CodecCBORTerm Text) (MuxApplication ResponderApp Ptcl m)
-> Versions VNumber (CodecCBORTerm Text) (MuxApplication ResponderApp Ptcl m LBS.ByteString Void ())
responderVersions epochSlots server = Versions $ Map.fromList
[ (VNumber 0, Sigma () (Version serverMuxApp unitCodecCBORTerm))
]
@@ -14,6 +14,7 @@ import qualified Data.ByteString.Lazy as LBS
import qualified Data.Map as Map
import qualified Data.Set as Set
import Data.Set (Set)
import Data.Void (Void)

import Control.Concurrent (threadDelay)
import Control.Concurrent.STM (STM, atomically, check)
@@ -138,18 +139,18 @@ instance MiniProtocolLimits DemoProtocol0 where

clientPingPong :: Bool -> IO ()
clientPingPong pipelined =
connectTo
connectToNode
(\(DictVersion codec) -> encodeTerm codec)
(\(DictVersion codec) -> decodeTerm codec)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication)
Nothing
defaultLocalSocketAddrInfo
where
muxApplication :: MuxApplication InitiatorApp DemoProtocol0 IO
muxApplication :: MuxApplication InitiatorApp DemoProtocol0 IO LBS.ByteString () Void
muxApplication =
simpleMuxInitiatorApplication protocols

protocols :: DemoProtocol0 -> MuxPeer DeserialiseFailure IO ()
protocols :: DemoProtocol0 -> MuxPeer DeserialiseFailure IO LBS.ByteString ()
protocols PingPong0 | pipelined =
MuxPeerPipelined
(contramap show stdoutTracer)
@@ -177,11 +178,11 @@ serverPingPong =
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \serverAsync ->
wait serverAsync -- block until async exception
where
muxApplication :: MuxApplication ResponderApp DemoProtocol0 IO
muxApplication :: MuxApplication ResponderApp DemoProtocol0 IO LBS.ByteString Void ()
muxApplication =
simpleMuxResponderApplication protocols

protocols :: DemoProtocol0 -> MuxPeer DeserialiseFailure IO ()
protocols :: DemoProtocol0 -> MuxPeer DeserialiseFailure IO LBS.ByteString ()
protocols PingPong0 =
MuxPeer
(contramap show stdoutTracer)
@@ -219,18 +220,18 @@ instance MiniProtocolLimits DemoProtocol1 where

clientPingPong2 :: IO ()
clientPingPong2 =
connectTo
connectToNode
(\(DictVersion codec) -> encodeTerm codec)
(\(DictVersion codec) -> decodeTerm codec)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication)
Nothing
defaultLocalSocketAddrInfo
where
muxApplication :: MuxApplication InitiatorApp DemoProtocol1 IO
muxApplication :: MuxApplication InitiatorApp DemoProtocol1 IO LBS.ByteString () Void
muxApplication =
simpleMuxInitiatorApplication protocols

protocols :: DemoProtocol1 -> MuxPeer DeserialiseFailure IO ()
protocols :: DemoProtocol1 -> MuxPeer DeserialiseFailure IO LBS.ByteString ()
protocols PingPong1 =
MuxPeer
(contramap (show . (,) (1 :: Int)) stdoutTracer)
@@ -271,11 +272,11 @@ serverPingPong2 =
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication) $ \serverAsync ->
wait serverAsync -- block until async exception
where
muxApplication :: MuxApplication ResponderApp DemoProtocol1 IO
muxApplication :: MuxApplication ResponderApp DemoProtocol1 IO LBS.ByteString Void ()
muxApplication =
simpleMuxResponderApplication protocols

protocols :: DemoProtocol1 -> MuxPeer DeserialiseFailure IO ()
protocols :: DemoProtocol1 -> MuxPeer DeserialiseFailure IO LBS.ByteString ()
protocols PingPong1 =
MuxPeer
(contramap (show . (,) (1 :: Int)) stdoutTracer)
@@ -310,18 +311,18 @@ instance MiniProtocolLimits DemoProtocol2 where
clientChainSync :: [FilePath] -> IO ()
clientChainSync sockAddrs =
forConcurrently_ sockAddrs $ \sockAddr ->
connectTo
connectToNode
(\(DictVersion codec) -> encodeTerm codec)
(\(DictVersion codec) -> decodeTerm codec)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) muxApplication)
Nothing
(mkLocalSocketAddrInfo sockAddr)
where
muxApplication :: MuxApplication InitiatorApp DemoProtocol2 IO
muxApplication :: MuxApplication InitiatorApp DemoProtocol2 IO LBS.ByteString () Void
muxApplication =
simpleMuxInitiatorApplication protocols

protocols :: DemoProtocol2 -> MuxPeer DeserialiseFailure IO ()
protocols :: DemoProtocol2 -> MuxPeer DeserialiseFailure IO LBS.ByteString ()
protocols ChainSync2 =
MuxPeer
(contramap show stdoutTracer)
@@ -341,11 +342,11 @@ serverChainSync sockAddr =
where
prng = mkSMGen 0

muxApplication :: MuxApplication ResponderApp DemoProtocol2 IO
muxApplication :: MuxApplication ResponderApp DemoProtocol2 IO LBS.ByteString Void ()
muxApplication =
simpleMuxResponderApplication protocols

protocols :: DemoProtocol2 -> MuxPeer DeserialiseFailure IO ()
protocols :: DemoProtocol2 -> MuxPeer DeserialiseFailure IO LBS.ByteString ()
protocols ChainSync2 =
MuxPeer
(contramap show stdoutTracer)
@@ -383,7 +384,7 @@ clientBlockFetch sockAddrs = do
currentChainVar <- newTVarIO genesisChainFragment

let muxApplication :: FilePath
-> MuxApplication InitiatorApp DemoProtocol3 IO
-> MuxApplication InitiatorApp DemoProtocol3 IO LBS.ByteString () Void
muxApplication peerid =
MuxInitiatorApplication (protocols peerid)

@@ -474,7 +475,7 @@ clientBlockFetch sockAddrs = do

peerAsyncs <- sequence
[ async $
connectTo
connectToNode
(\(DictVersion codec) -> encodeTerm codec)
(\(DictVersion codec) -> decodeTerm codec)
(simpleSingletonVersions (0::Int) (NodeToNodeVersionData 0) (DictVersion nodeToNodeCodecCBORTerm) (muxApplication sockAddr))
@@ -520,11 +521,11 @@ serverBlockFetch sockAddr =
where
prng = mkSMGen 0

muxApplication :: MuxApplication ResponderApp DemoProtocol3 IO
muxApplication :: MuxApplication ResponderApp DemoProtocol3 IO LBS.ByteString Void ()
muxApplication =
simpleMuxResponderApplication protocols

protocols :: DemoProtocol3 -> MuxPeer DeserialiseFailure IO ()
protocols :: DemoProtocol3 -> MuxPeer DeserialiseFailure IO LBS.ByteString ()
protocols ChainSync3 =
MuxPeer
(contramap show stdoutTracer)
@@ -1,6 +1,7 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

@@ -40,10 +41,10 @@ import Ouroboros.Network.Mux.Types
-- | muxStart starts a mux bearer for the specified protocols corresponding to
-- one of the provided Versions.
-- TODO: replace MonadSay with iohk-monitoring-framework.
muxStart :: forall m appType ptcl.
muxStart :: forall m appType ptcl a b.
( MonadAsync m, MonadSay m, MonadSTM m, MonadThrow m, MonadThrow (STM m)
, MonadMask m , Ord ptcl, Enum ptcl, Bounded ptcl, Show ptcl, MiniProtocolLimits ptcl)
=> MuxApplication appType ptcl m
=> MuxApplication appType ptcl m BL.ByteString a b
-> MuxBearer ptcl m
-> m ()
muxStart app bearer = do
@@ -34,8 +34,7 @@ module Ouroboros.Network.Mux.Interface
, DictVersion (..)
) where

import Data.ByteString.Lazy (ByteString)
import Data.Functor (void)
import Data.Void (Void)

import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow ( MonadCatch
@@ -98,34 +97,34 @@ type family HasResponder (appType :: AppType) :: Bool where
-- serving downstream peers using server side of each protocol and getting
-- updates from upstream peers using client side of each of the protocols.
--
data MuxApplication (appType :: AppType) ptcl m where
data MuxApplication (appType :: AppType) ptcl m bytes a b where
MuxInitiatorApplication
-- Initiator application; most simple application will be @'runPeer'@ or
-- @'runPipelinedPeer'@ supplied with a codec and a @'Peer'@ for each
-- @ptcl@. But it allows to handle resources if just application of
-- @'runPeer'@ is not enough. It will be run as @'ModeInitiator'@.
:: (ptcl -> Channel m ByteString -> m ())
-> MuxApplication InitiatorApp ptcl m
:: (ptcl -> Channel m bytes -> m a)
-> MuxApplication InitiatorApp ptcl m bytes a Void

MuxResponderApplication
-- Responder application; similarly to the @'MuxInitiatorApplication'@ but it
-- will be run using @'ModeResponder'@.
:: (ptcl -> Channel m ByteString -> m ())
-> MuxApplication ResponderApp ptcl m
:: (ptcl -> Channel m bytes -> m a)
-> MuxApplication ResponderApp ptcl m bytes Void a

MuxInitiatorAndResponderApplication
-- Initiator and server applications.
:: (ptcl -> Channel m ByteString -> m ())
-> (ptcl -> Channel m ByteString -> m ())
-> MuxApplication InitiatorAndResponderApp ptcl m
:: (ptcl -> Channel m bytes -> m a)
-> (ptcl -> Channel m bytes -> m b)
-> MuxApplication InitiatorAndResponderApp ptcl m bytes a b

-- |
-- Accessor for the client side of a @'MuxApplication'@.
--
initiatorApplication
:: HasInitiator appType ~ True
=> (MuxApplication appType ptcl m)
-> (ptcl -> Channel m ByteString -> m ())
=> MuxApplication appType ptcl m bytes a b
-> (ptcl -> Channel m bytes -> m a)
initiatorApplication (MuxInitiatorApplication app) = \ptcl channel -> app ptcl channel
initiatorApplication (MuxInitiatorAndResponderApplication app _) = \ptcl channel -> app ptcl channel

@@ -134,25 +133,25 @@ initiatorApplication (MuxInitiatorAndResponderApplication app _) = \ptcl channel
--
responderApplication
:: HasResponder appType ~ True
=> (MuxApplication appType ptcl m)
-> (ptcl -> Channel m ByteString -> m ())
=> MuxApplication appType ptcl m bytes a b
-> (ptcl -> Channel m bytes -> m b)
responderApplication (MuxResponderApplication app) = app
responderApplication (MuxInitiatorAndResponderApplication _ app) = app

-- |
-- This type is only necessary to use the @'simpleMuxClient'@ and
-- @'simpleMuxServer'@ smart constructors.
data MuxPeer failure m a where
data MuxPeer failure m bytes a where
MuxPeer :: Tracer m (TraceSendRecv ps)
-> Codec ps failure m ByteString
-> Codec ps failure m bytes
-> Peer ps pr st m a
-> MuxPeer failure m a
-> MuxPeer failure m bytes a

MuxPeerPipelined
:: Tracer m (TraceSendRecv ps)
-> Codec ps failure m ByteString
-> Codec ps failure m bytes
-> PeerPipelined ps pr st m a
-> MuxPeer failure m a
-> MuxPeer failure m bytes a


-- |
@@ -164,8 +163,8 @@ runMuxPeer
, MonadAsync m
, Exception failure
)
=> MuxPeer failure m a
-> Channel m ByteString
=> MuxPeer failure m bytes a
-> Channel m bytes
-> m a
runMuxPeer (MuxPeer tracer codec peer) channel =
runPeer tracer codec channel peer
@@ -184,10 +183,10 @@ simpleMuxInitiatorApplication
=> MonadCatch m
=> MonadAsync m
=> Exception failure
=> (ptcl -> MuxPeer failure m a)
-> MuxApplication InitiatorApp ptcl m
=> (ptcl -> MuxPeer failure m bytes a)
-> MuxApplication InitiatorApp ptcl m bytes a Void
simpleMuxInitiatorApplication fn = MuxInitiatorApplication $ \ptcl channel ->
void $ runMuxPeer (fn ptcl) channel
runMuxPeer (fn ptcl) channel


-- |
@@ -198,7 +197,7 @@ simpleMuxResponderApplication
=> MonadCatch m
=> MonadAsync m
=> Exception failure
=> (ptcl -> MuxPeer failure m a)
-> MuxApplication ResponderApp ptcl m
=> (ptcl -> MuxPeer failure m bytes a)
-> MuxApplication ResponderApp ptcl m bytes Void a
simpleMuxResponderApplication fn = MuxResponderApplication $ \ptcl channel ->
void $ runMuxPeer (fn ptcl) channel
runMuxPeer (fn ptcl) channel
@@ -24,7 +24,9 @@ import qualified Codec.CBOR.Term as CBOR
import Codec.Serialise (Serialise (..))
import Codec.SerialiseTerm

import Ouroboros.Network.Mux.Types (ProtocolEnum(..))
import Ouroboros.Network.Mux.Types ( MiniProtocolLimits (..)
, ProtocolEnum(..)
)

import Ouroboros.Network.NodeToNode (DictVersion (..))

@@ -52,8 +54,13 @@ instance ProtocolEnum NodeToClientProtocols where
toProtocolEnum 6 = Just LocalTxSubmission
toProtocolEnum _ = Nothing

-- |
-- Enumeration of node to client protocol versions.
instance MiniProtocolLimits NodeToClientProtocols where
-- TODO: provide sensible limits
-- https://github.com/input-output-hk/ouroboros-network/issues/575
maximumMessageSize _ = 0xffffffff
maximumIngressQueue _ = 0xffffffff

-- | Enumeration of node to client protocol versions.
--
data NodeToClientVersion = NodeToClientV_1
deriving (Eq, Ord, Enum, Show, Typeable)
@@ -66,8 +73,7 @@ instance Serialise NodeToClientVersion where
1 -> return NodeToClientV_1
_ -> fail "decode NodeToNodeVersion: unknown tag"

-- |
-- Version data for NodeToClient protocol v1
-- | Version data for NodeToClient protocol v1
--
newtype NodeToClientVersionData = NodeToClientVersionData
{ networkMagic :: Word16 }

0 comments on commit 5ac2723

Please sign in to comment.
You can’t perform that action at this time.