Skip to content

Commit

Permalink
Refactor the interface
Browse files Browse the repository at this point in the history
* allow for resource handling when running a protocol
* specify up front wheather to run client or server (or both) sides for
all the mini-protocols.
  • Loading branch information
coot committed May 15, 2019
1 parent e2e2415 commit 136f22d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 106 deletions.
134 changes: 47 additions & 87 deletions ouroboros-network/src/Ouroboros/Network/Mux/Interface.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Ouroboros.Network.Mux.Interface
-- * High level interface for the multiplex layer
-- $interface
NetworkInterface (..)
, MuxPeer (..)
, MuxApplication (..)
, NetworkNode (..)

-- * Run mux layer on initiated connections
Expand All @@ -26,22 +26,15 @@ module Ouroboros.Network.Mux.Interface
, miniProtocolDescription
) where

import qualified Codec.CBOR.Read as CBOR
import Data.ByteString.Lazy (ByteString)
import Data.Functor (void)
import Numeric.Natural (Natural)

import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow ( MonadCatch
, MonadThrow
)

import Control.Tracer

import Network.TypedProtocol.Core
import Network.TypedProtocol.Codec
import Network.TypedProtocol.Driver
import Network.TypedProtocol.Pipelined
import Network.TypedProtocol.Channel

import Ouroboros.Network.Mux.Types

Expand All @@ -56,64 +49,41 @@ import Ouroboros.Network.Mux.Types
-- a function that runs the mux layer on it.
--

-- |
-- Specification for peers of a protocol. This type instructs the multiplexing
-- layer to run a given client \/ server peers.
--
data MuxPeer m where
-- |
-- A non pipeliend peer together with a codec.
--
OnlyClient
:: forall ps (st :: ps) m a.
Tracer m (TraceSendRecv ps)
-> Codec ps CBOR.DeserialiseFailure m ByteString
-> Peer ps AsClient st m a
-> MuxPeer m

-- |
-- A pipelined peer together with a codec.
--
OnlyPipelinedClient
:: forall ps (st :: ps) m a.
Natural
-> Tracer m (TraceSendRecv ps)
-> Codec ps CBOR.DeserialiseFailure m ByteString
-> PeerPipelined ps AsClient st m a
-> MuxPeer m

-- |
-- Server peer with a codec
OnlyServer
:: forall ps (st :: ps) m a.
Tracer m (TraceSendRecv ps)
-> Codec ps CBOR.DeserialiseFailure m ByteString
-> Peer ps AsServer st m a
-> MuxPeer m

-- |
-- Client and server peers with the corresponding codec.
--
ClientAndServer
:: forall ps (st :: ps) m a.
Tracer m (TraceSendRecv ps)
-> Codec ps CBOR.DeserialiseFailure m ByteString
-> Peer ps AsClient st m a
-> Peer ps AsServer st m a
-> MuxPeer m

-- |
-- Pipelined client and a server with the correspnding codec.
--
PipelinedClientAndServer
:: forall ps (st :: ps) m a.
Natural
-> Tracer m (TraceSendRecv ps)
-> Codec ps CBOR.DeserialiseFailure m ByteString
-> PeerPipelined ps AsClient st m a
-> Peer ps AsServer st m a
-> MuxPeer m

-- |
-- Application run by mux layer.
--
-- * enumeration of client application, e.g. a wallet application communicating
-- with a node using ChainSync and TxSubmission protocols; this only requires
-- to run client side of each protocol.
--
-- * enumeration of server applications: this application type is mostly useful
-- tests.
--
-- * enumeration of both client and server applications, e.g. a full node
-- serving downstream peers using server side of each protocol and getting
-- updates from upstream peers using client side of each of the protocols.
--
data MuxApplication ptcl m where
MuxClientApplication
-- Client application; most simple application will be @'runPeer'@ or
-- @'runPipelinedPeer'@ supplied with a codec and a @'Peer'@ for each
-- @ptcl@. But it allows to handle resources if just application of
-- @'runPeer'@ is not enough. It will be run as @'muxInitiator'@.
:: (ptcl -> Channel m ByteString -> m a)
-> MuxApplication ptcl m

MuxServerApplication
-- Server application; similarly to the @'MuxClientApplication'@ but it
-- will be run using @'muxResponder'@.
:: (ptcl -> Channel m ByteString -> m a)
-> MuxApplication ptcl m

MuxClientAndServerApplication
-- Client and server applications.
:: (ptcl -> Channel m ByteString -> m a)
-> (ptcl -> Channel m ByteString -> m b)
-> MuxApplication ptcl m

-- |
-- Public network interface for 'ouroboros-network'.
Expand All @@ -124,12 +94,12 @@ data NetworkInterface ptcl addr m r = NetworkInterface {
-- listen for incoming connections. Some bearers do not have a notion of
-- address.
--
nodeAddress :: addr,
nodeAddress :: addr,

-- |
-- Map of protocols that we run
--
protocols :: ptcl -> MuxPeer m
nodeApplication :: MuxApplication ptcl m
}

-- | Low level network interface. It can be intiatiated using a socket, pair
Expand Down Expand Up @@ -206,30 +176,20 @@ miniProtocolDescription
, MonadCatch m
, MonadThrow m
)
=> MuxPeer m
-> MiniProtocolDescription ptcl m
miniProtocolDescription (OnlyClient tr codec peer) =
=> MuxApplication ptcl m
-> MiniProtocolDescriptions ptcl m
miniProtocolDescription (MuxClientApplication client) = \ptcl ->
MiniProtocolDescription {
mpdInitiator = Just (\chan -> void (runPeer tr codec chan peer)),
mpdInitiator = Just (void . client ptcl),
mpdResponder = Nothing
}
miniProtocolDescription (OnlyPipelinedClient omax tr codec peer) =
miniProtocolDescription (MuxServerApplication server) = \ptcl ->
MiniProtocolDescription {
mpdInitiator = Just (\chan -> void (runPipelinedPeer omax tr codec chan peer)),
mpdInitiator = Just (void . server ptcl),
mpdResponder = Nothing
}
miniProtocolDescription (OnlyServer tr codec peer) =
MiniProtocolDescription {
mpdInitiator = Nothing,
mpdResponder = Just (\chan -> void (runPeer tr codec chan peer))
}
miniProtocolDescription (ClientAndServer tr codec clientPeer serverPeer) =
MiniProtocolDescription {
mpdInitiator = Just (\chan -> void (runPeer tr codec chan clientPeer)),
mpdResponder = Just (\chan -> void (runPeer tr codec chan serverPeer))
}
miniProtocolDescription (PipelinedClientAndServer omax tr codec clientPeer serverPeer) =
miniProtocolDescription (MuxClientAndServerApplication client server) = \ptcl ->
MiniProtocolDescription {
mpdInitiator = Just (\chan -> void (runPipelinedPeer omax tr codec chan clientPeer)),
mpdResponder = Just (\chan -> void (runPeer tr codec chan serverPeer))
mpdInitiator = Just (void . client ptcl),
mpdResponder = Just (void . server ptcl)
}
6 changes: 3 additions & 3 deletions ouroboros-network/src/Ouroboros/Network/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ pipeAsMuxBearer pcRead pcWrite = do

runNetworkNodeWithPipe
:: (Mx.ProtocolEnum ptcl, Ord ptcl, Enum ptcl, Bounded ptcl)
=> (ptcl -> Mx.MuxPeer IO)
=> Mx.MuxApplication ptcl IO
-> Handle -- ^ read handle
-> Handle -- ^ write handle
-> IO ()
runNetworkNodeWithPipe protocols pcRead pcWrite = do
let mpds = Mx.miniProtocolDescription . protocols
runNetworkNodeWithPipe application pcRead pcWrite = do
let mpds = Mx.miniProtocolDescription application
bearer <- pipeAsMuxBearer pcRead pcWrite
Mx.muxStart mpds bearer
4 changes: 2 additions & 2 deletions ouroboros-network/src/Ouroboros/Network/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ withNetworkNode
=> NetworkInterface ptcl Socket.AddrInfo IO r
-> (NetworkNode Socket.AddrInfo IO r -> IO t)
-> IO t
withNetworkNode NetworkInterface {nodeAddress, protocols} k =
withNetworkNode NetworkInterface {nodeAddress, nodeApplication} k =
bracket mkSocket Socket.close $ \sd -> do

killVar <- newEmptyTMVarM
Expand All @@ -158,7 +158,7 @@ withNetworkNode NetworkInterface {nodeAddress, protocols} k =
where

mpds :: Mx.MiniProtocolDescriptions ptcl IO
mpds = miniProtocolDescription . protocols
mpds = miniProtocolDescription nodeApplication

-- Make the server listening socket
mkSocket :: IO Socket.Socket
Expand Down
5 changes: 3 additions & 2 deletions ouroboros-network/test/Test/Pipe.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import Test.Tasty.QuickCheck (testProperty)
import Control.Tracer (nullTracer)

import Network.TypedProtocol.Core
import Network.TypedProtocol.Driver

import Ouroboros.Network.Chain (Chain, ChainUpdate, Point)
import qualified Ouroboros.Network.Chain as Chain
Expand Down Expand Up @@ -86,11 +87,11 @@ demo chain0 updates = do
consumerPeer = ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar))
consumerPeers Mxt.ChainSync1 = Mx.OnlyClient nullTracer ChainSync.codecChainSync consumerPeer
consumerPeers = Mx.MuxClientApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel consumerPeer

producerPeer :: Peer (ChainSync.ChainSync block (Point block)) AsServer ChainSync.StIdle IO ()
producerPeer = ChainSync.chainSyncServerPeer (ChainSync.chainSyncServerExample () producerVar)
producerPeers Mxt.ChainSync1 = Mx.OnlyServer nullTracer ChainSync.codecChainSync producerPeer
producerPeers = Mx.MuxServerApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel producerPeer

_ <- async $ runNetworkNodeWithPipe producerPeers hndRead1 hndWrite2
_ <- async $ runNetworkNodeWithPipe consumerPeers hndRead2 hndWrite1
Expand Down
25 changes: 13 additions & 12 deletions ouroboros-network/test/Test/Socket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import System.IO.Error
#endif

import Network.TypedProtocol.Core
import Network.TypedProtocol.Driver
import qualified Network.TypedProtocol.ReqResp.Type as ReqResp
import qualified Network.TypedProtocol.ReqResp.Client as ReqResp
import qualified Network.TypedProtocol.ReqResp.Server as ReqResp
Expand Down Expand Up @@ -161,19 +162,19 @@ prop_socket_send_recv clientAddr serverAddr f xs = do
let -- Server Node; only req-resp server
srvPeer :: Peer (ReqResp.ReqResp Int Int) AsServer ReqResp.StIdle IO ()
srvPeer = ReqResp.reqRespServerPeer (reqRespServerMapAccumL sv (\a -> pure . f a) 0)
srvPeers Mxt.ReqResp1 = OnlyServer nullTracer ReqResp.codecReqResp srvPeer
srvPeers = MuxServerApplication $ \Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel srvPeer
serNet = NetworkInterface {
nodeAddress = serverAddr,
protocols = srvPeers
nodeApplication = srvPeers
}

-- Client Node; only req-resp client
cliPeer :: Peer (ReqResp.ReqResp Int Int) AsClient ReqResp.StIdle IO ()
cliPeer = ReqResp.reqRespClientPeer (reqRespClientMap cv xs)
cliPeers Mxt.ReqResp1 = OnlyClient nullTracer ReqResp.codecReqResp cliPeer
cliPeers = MuxClientApplication $ \ Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel cliPeer
cliNet = NetworkInterface {
nodeAddress = clientAddr,
protocols = cliPeers
nodeApplication = cliPeers
}

res <-
Expand All @@ -198,7 +199,7 @@ prop_socket_recv_close f _ = ioProperty $ do

let srvPeer :: Peer (ReqResp.ReqResp Int Int) AsServer ReqResp.StIdle IO ()
srvPeer = ReqResp.reqRespServerPeer (reqRespServerMapAccumL sv (\a -> pure . f a) 0)
srvPeers Mxt.ReqResp1 = OnlyServer nullTracer ReqResp.codecReqResp srvPeer
srvPeers = MuxServerApplication $ \Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel srvPeer

bracket
(Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol)
Expand All @@ -219,7 +220,7 @@ prop_socket_recv_close f _ = ioProperty $ do
$ \(sd',_) -> do
bearer <- socketAsMuxBearer sd'
Mx.muxBearerSetState bearer Mx.Connected
Mx.muxStart (miniProtocolDescription . srvPeers) bearer
Mx.muxStart (miniProtocolDescription srvPeers) bearer
)
$ \muxAsync -> do

Expand Down Expand Up @@ -250,10 +251,10 @@ prop_socket_client_connect_error _ xs = ioProperty $ do

let cliPeer :: Peer (ReqResp.ReqResp Int Int) AsClient ReqResp.StIdle IO ()
cliPeer = ReqResp.reqRespClientPeer (reqRespClientMap cv xs)
cliPeers Mxt.ReqResp1 = OnlyClient nullTracer ReqResp.codecReqResp cliPeer
cliPeers = MuxClientApplication $ \Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel cliPeer
ni = NetworkInterface {
nodeAddress = serverAddr,
protocols = cliPeers
nodeApplication = cliPeers
}

(res :: Either IOException Bool)
Expand Down Expand Up @@ -282,18 +283,18 @@ demo chain0 updates = do
consumerPeer = ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar))
consumerPeers Mxt.ChainSync1 = OnlyClient nullTracer ChainSync.codecChainSync consumerPeer
consumerPeers = MuxClientApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel consumerPeer
consumerNet = NetworkInterface {
nodeAddress = consumerAddress,
protocols = consumerPeers
nodeApplication = consumerPeers
}

producerPeer :: Peer (ChainSync.ChainSync block (Point block)) AsServer ChainSync.StIdle IO ()
producerPeer = ChainSync.chainSyncServerPeer (ChainSync.chainSyncServerExample () producerVar)
producerPeers Mxt.ChainSync1 = OnlyServer nullTracer ChainSync.codecChainSync producerPeer
producerPeers = MuxServerApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel producerPeer
producerNet = NetworkInterface {
nodeAddress = producerAddress,
protocols = producerPeers
nodeApplication = producerPeers
}

withNetworkNode producerNet $ \_ ->
Expand Down

0 comments on commit 136f22d

Please sign in to comment.