Skip to content
Permalink
Browse files

simpleMuxClientApplication and simpleMuxServerApplication smart const…

…ructors for MuxApplication

Smart constructor for `MuxClientAndServerApplication` is not provided,
because taking care of all the possible choices of pipelining does not
make it easy.  The two provided smart constructors are for illustration
how to run an application, but also useful in expressing simple client
server application, like the ones we use in tests.
  • Loading branch information...
coot committed May 15, 2019
1 parent 136f22d commit 16587c828a3d7f88049a4f537d8d1363c551af68
@@ -13,6 +13,9 @@ module Ouroboros.Network.Mux.Interface
-- $interface
NetworkInterface (..)
, MuxApplication (..)
, MuxPeer (..)
, simpleMuxClientApplication
, simpleMuxServerApplication
, NetworkNode (..)

-- * Run mux layer on initiated connections
@@ -28,13 +31,21 @@ module Ouroboros.Network.Mux.Interface

import Data.ByteString.Lazy (ByteString)
import Data.Functor (void)
import Numeric.Natural (Natural)

import Control.Monad.Class.MonadAsync
import Control.Monad.Class.MonadThrow ( MonadCatch
, MonadThrow
)
import Control.Tracer (Tracer)

import Control.Exception (Exception)

import Network.TypedProtocol.Core
import Network.TypedProtocol.Codec
import Network.TypedProtocol.Channel
import Network.TypedProtocol.Driver
import Network.TypedProtocol.Pipelined

import Ouroboros.Network.Mux.Types

@@ -85,6 +96,57 @@ data MuxApplication ptcl m where
-> (ptcl -> Channel m ByteString -> m b)
-> MuxApplication ptcl m


-- |
-- This type is only necessary to use the @'simpleMuxClient'@ and
-- @'simpleMuxServer'@ smart constructors.
data MuxPeer failure m where
MuxPeer :: Tracer m (TraceSendRecv ps)
-> Codec ps failure m ByteString
-> Peer ps pr st m a
-> MuxPeer failure m

MuxPeerPipelined
:: Natural
-> Tracer m (TraceSendRecv ps)
-> Codec ps failure m ByteString
-> PeerPipelined ps pr st m a
-> MuxPeer failure m

-- |
-- Smart constructor for @'MuxClientApplication'@. It is a simple client, since
-- none of the applications requires resource handling to run in the monad @m@.
-- Each one is simply run either by @'runPeer'@ or @'runPipelinedPeer'@.
--
simpleMuxClientApplication
:: MonadThrow m
=> MonadCatch m
=> MonadAsync m
=> Exception failure
=> (ptcl -> MuxPeer failure m)
-> MuxApplication ptcl m
simpleMuxClientApplication fn = MuxClientApplication $ \ptcl channel ->
case fn ptcl of
MuxPeer tracer codec peer -> void $ runPeer tracer codec channel peer
MuxPeerPipelined n tracer codec peer -> void $ runPipelinedPeer n tracer codec channel peer


-- |
-- Smart constructor for @'MuxServerApplicatin'@, similar to @'simpleMuxClient'@.
--
simpleMuxServerApplication
:: MonadThrow m
=> MonadCatch m
=> MonadAsync m
=> Exception failure
=> (ptcl -> MuxPeer failure m)
-> MuxApplication ptcl m
simpleMuxServerApplication fn = MuxServerApplication $ \ptcl channel ->
case fn ptcl of
MuxPeer tracer codec peer -> void $ runPeer tracer codec channel peer
MuxPeerPipelined n tracer codec peer -> void $ runPipelinedPeer n tracer codec channel peer


-- |
-- Public network interface for 'ouroboros-network'.
--
@@ -20,16 +20,12 @@ import Test.Tasty.QuickCheck (testProperty)

import Control.Tracer (nullTracer)

import Network.TypedProtocol.Core
import Network.TypedProtocol.Driver

import Ouroboros.Network.Chain (Chain, ChainUpdate, Point)
import qualified Ouroboros.Network.Chain as Chain
import qualified Ouroboros.Network.ChainProducerState as CPS
import qualified Ouroboros.Network.Mux as Mx
import qualified Ouroboros.Network.Mux.Interface as Mx
import Ouroboros.Network.Pipe
import Ouroboros.Network.Protocol.ChainSync.Type as ChainSync
import Ouroboros.Network.Protocol.ChainSync.Client as ChainSync
import Ouroboros.Network.Protocol.ChainSync.Codec as ChainSync
import Ouroboros.Network.Protocol.ChainSync.Examples as ChainSync
@@ -83,18 +79,24 @@ demo chain0 updates = do
let Just expectedChain = Chain.applyChainUpdates updates chain0
target = Chain.headPoint expectedChain

consumerPeer :: Peer (ChainSync.ChainSync block (Point block)) AsClient ChainSync.StIdle IO ()
consumerPeer = ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar))
consumerPeers = Mx.MuxClientApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel consumerPeer

producerPeer :: Peer (ChainSync.ChainSync block (Point block)) AsServer ChainSync.StIdle IO ()
producerPeer = ChainSync.chainSyncServerPeer (ChainSync.chainSyncServerExample () producerVar)
producerPeers = Mx.MuxServerApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel producerPeer

_ <- async $ runNetworkNodeWithPipe producerPeers hndRead1 hndWrite2
_ <- async $ runNetworkNodeWithPipe consumerPeers hndRead2 hndWrite1
consumerApp :: Mx.MuxApplication Mxt.TestProtocols1 IO
consumerApp = Mx.simpleMuxClientApplication $
\Mxt.ChainSync1 ->
Mx.MuxPeer nullTracer
ChainSync.codecChainSync
(ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar)))

producerApp :: Mx.MuxApplication Mxt.TestProtocols1 IO
producerApp = Mx.simpleMuxServerApplication $
\Mxt.ChainSync1 ->
Mx.MuxPeer nullTracer
ChainSync.codecChainSync
(ChainSync.chainSyncServerPeer (ChainSync.chainSyncServerExample () producerVar))

_ <- async $ runNetworkNodeWithPipe producerApp hndRead1 hndWrite2
_ <- async $ runNetworkNodeWithPipe consumerApp hndRead2 hndWrite1

void $ fork $ sequence_
[ do threadDelay 10e-3 -- 10 milliseconds, just to provide interest
@@ -26,7 +26,6 @@ import System.IO.Error
#endif

import Network.TypedProtocol.Core
import Network.TypedProtocol.Driver
import qualified Network.TypedProtocol.ReqResp.Type as ReqResp
import qualified Network.TypedProtocol.ReqResp.Client as ReqResp
import qualified Network.TypedProtocol.ReqResp.Server as ReqResp
@@ -41,7 +40,6 @@ import Ouroboros.Network.Socket
import Ouroboros.Network.Chain (Chain, ChainUpdate, Point)
import qualified Ouroboros.Network.Chain as Chain
import qualified Ouroboros.Network.ChainProducerState as CPS
import qualified Ouroboros.Network.Protocol.ChainSync.Type as ChainSync
import qualified Ouroboros.Network.Protocol.ChainSync.Client as ChainSync
import qualified Ouroboros.Network.Protocol.ChainSync.Codec as ChainSync
import qualified Ouroboros.Network.Protocol.ChainSync.Examples as ChainSync
@@ -160,21 +158,17 @@ prop_socket_send_recv clientAddr serverAddr f xs = do
sv <- newEmptyTMVarM

let -- Server Node; only req-resp server
srvPeer :: Peer (ReqResp.ReqResp Int Int) AsServer ReqResp.StIdle IO ()
srvPeer = ReqResp.reqRespServerPeer (reqRespServerMapAccumL sv (\a -> pure . f a) 0)
srvPeers = MuxServerApplication $ \Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel srvPeer
serverApp = simpleMuxServerApplication $ \Mxt.ReqResp1 -> MuxPeer nullTracer ReqResp.codecReqResp (ReqResp.reqRespServerPeer (reqRespServerMapAccumL sv (\a -> pure . f a) 0))
serNet = NetworkInterface {
nodeAddress = serverAddr,
nodeApplication = srvPeers
nodeApplication = serverApp
}

-- Client Node; only req-resp client
cliPeer :: Peer (ReqResp.ReqResp Int Int) AsClient ReqResp.StIdle IO ()
cliPeer = ReqResp.reqRespClientPeer (reqRespClientMap cv xs)
cliPeers = MuxClientApplication $ \ Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel cliPeer
clientApp = simpleMuxClientApplication $ \ Mxt.ReqResp1 -> MuxPeer nullTracer ReqResp.codecReqResp (ReqResp.reqRespClientPeer (reqRespClientMap cv xs))
cliNet = NetworkInterface {
nodeAddress = clientAddr,
nodeApplication = cliPeers
nodeApplication = clientApp
}

res <-
@@ -197,9 +191,12 @@ prop_socket_recv_close f _ = ioProperty $ do

sv <- newEmptyTMVarM

let srvPeer :: Peer (ReqResp.ReqResp Int Int) AsServer ReqResp.StIdle IO ()
srvPeer = ReqResp.reqRespServerPeer (reqRespServerMapAccumL sv (\a -> pure . f a) 0)
srvPeers = MuxServerApplication $ \Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel srvPeer
let app :: MuxApplication Mxt.TestProtocols3 IO
app = simpleMuxServerApplication $
\Mxt.ReqResp1 ->
MuxPeer nullTracer
ReqResp.codecReqResp
(ReqResp.reqRespServerPeer (reqRespServerMapAccumL sv (\a -> pure . f a) 0))

bracket
(Socket.socket Socket.AF_INET Socket.Stream Socket.defaultProtocol)
@@ -220,7 +217,7 @@ prop_socket_recv_close f _ = ioProperty $ do
$ \(sd',_) -> do
bearer <- socketAsMuxBearer sd'
Mx.muxBearerSetState bearer Mx.Connected
Mx.muxStart (miniProtocolDescription srvPeers) bearer
Mx.muxStart (miniProtocolDescription app) bearer
)
$ \muxAsync -> do

@@ -249,12 +246,18 @@ prop_socket_client_connect_error _ xs = ioProperty $ do

cv <- newEmptyTMVarM

let cliPeer :: Peer (ReqResp.ReqResp Int Int) AsClient ReqResp.StIdle IO ()
cliPeer = ReqResp.reqRespClientPeer (reqRespClientMap cv xs)
cliPeers = MuxClientApplication $ \Mxt.ReqResp1 channel -> runPeer nullTracer ReqResp.codecReqResp channel cliPeer
let app :: MuxApplication Mxt.TestProtocols3 IO
app = simpleMuxClientApplication $
\Mxt.ReqResp1 ->
MuxPeer nullTracer
ReqResp.codecReqResp
(ReqResp.reqRespClientPeer (reqRespClientMap cv xs)
:: Peer (ReqResp.ReqResp Int Int) AsClient ReqResp.StIdle IO ()
)

ni = NetworkInterface {
nodeAddress = serverAddr,
nodeApplication = cliPeers
nodeApplication = app
}

(res :: Either IOException Bool)
@@ -279,22 +282,31 @@ demo chain0 updates = do

let Just expectedChain = Chain.applyChainUpdates updates chain0
target = Chain.headPoint expectedChain
consumerPeer :: Peer (ChainSync.ChainSync block (Point block)) AsClient ChainSync.StIdle IO ()
consumerPeer = ChainSync.chainSyncClientPeer

consumerApp :: MuxApplication Mxt.TestProtocols1 IO
consumerApp = simpleMuxClientApplication $
\Mxt.ChainSync1 ->
MuxPeer nullTracer
ChainSync.codecChainSync
(ChainSync.chainSyncClientPeer
(ChainSync.chainSyncClientExample consumerVar
(consumerClient done target consumerVar))
consumerPeers = MuxClientApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel consumerPeer
(consumerClient done target consumerVar)))

consumerNet = NetworkInterface {
nodeAddress = consumerAddress,
nodeApplication = consumerPeers
nodeApplication = consumerApp
}

producerPeer :: Peer (ChainSync.ChainSync block (Point block)) AsServer ChainSync.StIdle IO ()
producerPeer = ChainSync.chainSyncServerPeer (ChainSync.chainSyncServerExample () producerVar)
producerPeers = MuxServerApplication $ \Mxt.ChainSync1 channel -> runPeer nullTracer ChainSync.codecChainSync channel producerPeer
producerApp :: MuxApplication Mxt.TestProtocols1 IO
producerApp = simpleMuxServerApplication $
\Mxt.ChainSync1 ->
MuxPeer nullTracer
ChainSync.codecChainSync
(ChainSync.chainSyncServerPeer (ChainSync.chainSyncServerExample () producerVar))

producerNet = NetworkInterface {
nodeAddress = producerAddress,
nodeApplication = producerPeers
nodeApplication = producerApp
}

withNetworkNode producerNet $ \_ ->

0 comments on commit 16587c8

Please sign in to comment.
You can’t perform that action at this time.