Skip to content

Commit

Permalink
typed-protocols: updated ouroboros-netowrk-framework
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Sep 26, 2021
1 parent 5e48c9d commit 1803408
Show file tree
Hide file tree
Showing 12 changed files with 529 additions and 390 deletions.
37 changes: 3 additions & 34 deletions ouroboros-network-framework/demo/ping-pong.hs
Expand Up @@ -36,11 +36,11 @@ import Ouroboros.Network.Protocol.Handshake.Codec
import Ouroboros.Network.Protocol.Handshake.Unversioned
import Ouroboros.Network.Protocol.Handshake.Version

import Network.TypedProtocol.Pipelined
import Network.TypedProtocol.PingPong.Type (PingPong)
import Network.TypedProtocol.PingPong.Client as PingPong
import Network.TypedProtocol.PingPong.Server as PingPong
import Network.TypedProtocol.PingPong.Codec.CBOR as PingPong
import Network.TypedProtocol.PingPong.Examples


main :: IO ()
Expand Down Expand Up @@ -123,10 +123,10 @@ clientPingPong pipelined =

pingPongInitiator | pipelined =
InitiatorProtocolOnly $
MuxPeerPipelined
MuxPeer
(contramap show stdoutTracer)
codecPingPong
(pingPongClientPeerPipelined (pingPongClientPipelinedMax 5))
(void $ pingPongClientPeerPipelined (pingPongClientPipelinedMax 5))

| otherwise =
InitiatorProtocolOnly $
Expand All @@ -136,10 +136,6 @@ clientPingPong pipelined =
(pingPongClientPeer (pingPongClientCount 5))


pingPongClientCount :: Applicative m => Int -> PingPongClient m ()
pingPongClientCount 0 = PingPong.SendMsgDone ()
pingPongClientCount n = SendMsgPing (pure (pingPongClientCount (n-1)))

serverPingPong :: IO Void
serverPingPong =
withIOManager $ \iomgr -> do
Expand Down Expand Up @@ -170,16 +166,6 @@ serverPingPong =
codecPingPong
(pingPongServerPeer pingPongServerStandard)

pingPongServerStandard
:: Applicative m
=> PingPongServer m ()
pingPongServerStandard =
PingPongServer {
recvMsgPing = pure pingPongServerStandard,
recvMsgDone = ()
}


--
-- Ping pong demo2
--
Expand Down Expand Up @@ -233,23 +219,6 @@ clientPingPong2 =
codecPingPong
(pingPongClientPeer (pingPongClientCount 5))

pingPongClientPipelinedMax
:: forall m. Monad m
=> Int
-> PingPongClientPipelined m ()
pingPongClientPipelinedMax c =
PingPongClientPipelined (go [] Zero 0)
where
go :: [Either Int Int] -> Nat o -> Int
-> PingPongSender o Int m ()
go acc o n | n < c
= SendMsgPingPipelined
(return n)
(go (Left n : acc) (Succ o) (succ n))
go _ Zero _ = SendMsgDonePipelined ()
go acc (Succ o) n = CollectPipelined
Nothing
(\n' -> go (Right n' : acc) o n)

serverPingPong2 :: IO Void
serverPingPong2 =
Expand Down
2 changes: 2 additions & 0 deletions ouroboros-network-framework/ouroboros-network-framework.cabal
Expand Up @@ -66,13 +66,15 @@ library
, hashable
, mtl
, nothunks
, singletons
, stm
, text
, time
, quiet

, cardano-prelude
, contra-tracer
, strict-containers

, io-classes >=0.1 && < 0.3
, monoidal-synchronisation
Expand Down
106 changes: 73 additions & 33 deletions ouroboros-network-framework/src/Ouroboros/Network/Channel.hs
Expand Up @@ -57,23 +57,31 @@ data Channel m a = Channel {
-- It may raise exceptions (as appropriate for the monad and kind of
-- channel).
--
recv :: m (Maybe a)
recv :: m (Maybe a),

-- | Try read some input from the channel. The outer @Nothing@
-- indicates that data is not available, the inner @Nothing@ indicates an
-- EOF.
--
tryRecv :: m (Maybe (Maybe a))
}

-- TODO: eliminate the second Channel type and these conversion functions.

fromChannel :: Mx.Channel m
-> Channel m LBS.ByteString
fromChannel Mx.Channel { Mx.send, Mx.recv } = Channel {
send = send,
recv = recv
fromChannel Mx.Channel { Mx.send, Mx.recv, Mx.tryRecv } = Channel {
send = send,
recv = recv,
tryRecv = tryRecv
}

toChannel :: Channel m LBS.ByteString
-> Mx.Channel m
toChannel Channel { send, recv } = Mx.Channel {
Mx.send = send,
Mx.recv = recv
toChannel Channel { send, recv, tryRecv } = Mx.Channel {
Mx.send = send,
Mx.recv = recv,
Mx.tryRecv = tryRecv
}

-- | Create a local pipe, with both ends in this process, and expose that as
Expand All @@ -96,9 +104,11 @@ isoKleisliChannel
-> (b -> m a)
-> Channel m a
-> Channel m b
isoKleisliChannel f finv Channel{send, recv} = Channel {
send = finv >=> send,
recv = recv >>= traverse f
isoKleisliChannel f finv Channel{send, recv, tryRecv} = Channel {
send = finv >=> send,
recv = recv >>= traverse f,
tryRecv = tryRecv >>= traverse (traverse f)

}


Expand All @@ -107,8 +117,9 @@ hoistChannel
-> Channel m a
-> Channel n a
hoistChannel nat channel = Channel
{ send = nat . send channel
, recv = nat (recv channel)
{ send = nat . send channel
, recv = nat (recv channel)
, tryRecv = nat (tryRecv channel)
}

-- | A 'Channel' with a fixed input, and where all output is discarded.
Expand All @@ -123,14 +134,16 @@ hoistChannel nat channel = Channel
fixedInputChannel :: MonadSTM m => [a] -> m (Channel m a)
fixedInputChannel xs0 = do
v <- atomically $ newTVar xs0
return Channel {send, recv = recv v}
return Channel {send, recv = recv v, tryRecv = tryRecv v}
where
recv v = atomically $ do
xs <- readTVar v
case xs of
[] -> return Nothing
(x:xs') -> writeTVar v xs' >> return (Just x)

tryRecv v = Just <$> recv v

send _ = return ()


Expand All @@ -142,10 +155,11 @@ mvarsAsChannel :: MonadSTM m
-> TMVar m a
-> Channel m a
mvarsAsChannel bufferRead bufferWrite =
Channel{send, recv}
Channel{send, recv, tryRecv}
where
send x = atomically (putTMVar bufferWrite x)
recv = atomically (Just <$> takeTMVar bufferRead)
send x = atomically (putTMVar bufferWrite x)
recv = atomically (Just <$> takeTMVar bufferRead)
tryRecv = Just <$> recv


-- | Create a pair of channels that are connected via one-place buffers.
Expand Down Expand Up @@ -178,9 +192,10 @@ createConnectedBufferedChannels sz = do
pure (wrap chan1, wrap chan2)
where
wrap :: Channel (STM m) a -> Channel m a
wrap Channel{send, recv} = Channel
{ send = atomically . send
, recv = atomically recv
wrap Channel{send, recv, tryRecv} = Channel
{ send = atomically . send
, recv = atomically recv
, tryRecv = atomically tryRecv
}

-- | As 'createConnectedBufferedChannels', but in 'STM'.
Expand All @@ -196,10 +211,11 @@ createConnectedBufferedChannelsSTM sz = do
queuesAsChannel bufferA bufferB)
where
queuesAsChannel bufferRead bufferWrite =
Channel{send, recv}
Channel{send, recv, tryRecv}
where
send x = writeTBQueue bufferWrite x
recv = Just <$> readTBQueue bufferRead
send x = writeTBQueue bufferWrite x
recv = Just <$> readTBQueue bufferRead
tryRecv = Just <$> recv


-- | Create a pair of channels that are connected via N-place buffers.
Expand All @@ -223,13 +239,14 @@ createPipelineTestChannels sz = do
queuesAsChannel bufferA bufferB)
where
queuesAsChannel bufferRead bufferWrite =
Channel{send, recv}
Channel{send, recv, tryRecv}
where
send x = atomically $ do
full <- isFullTBQueue bufferWrite
if full then error failureMsg
else writeTBQueue bufferWrite x
recv = atomically (Just <$> readTBQueue bufferRead)
send x = atomically $ do
full <- isFullTBQueue bufferWrite
if full then error failureMsg
else writeTBQueue bufferWrite x
recv = atomically (Just <$> readTBQueue bufferRead)
tryRecv = Just <$> recv

failureMsg = "createPipelineTestChannels: "
++ "maximum pipeline depth exceeded: " ++ show sz
Expand All @@ -248,7 +265,7 @@ handlesAsChannel :: IO.Handle -- ^ Read handle
-> IO.Handle -- ^ Write handle
-> Channel IO LBS.ByteString
handlesAsChannel hndRead hndWrite =
Channel{send, recv}
Channel{send, recv, tryRecv}
where
send :: LBS.ByteString -> IO ()
send chunk = do
Expand All @@ -262,6 +279,14 @@ handlesAsChannel hndRead hndWrite =
then return Nothing
else Just . LBS.fromStrict <$> BS.hGetSome hndRead smallChunkSize

tryRecv :: IO (Maybe (Maybe LBS.ByteString))
tryRecv = do
eof <- IO.hIsEOF hndRead
if eof
then return Nothing
else Just . Just <$> LBS.hGetNonBlocking hndRead smallChunkSize



-- | Transform a channel to add an extra action before /every/ send and after
-- /every/ receive.
Expand All @@ -272,7 +297,7 @@ channelEffect :: forall m a.
-> (Maybe a -> m ()) -- ^ Action after 'recv'
-> Channel m a
-> Channel m a
channelEffect beforeSend afterRecv Channel{send, recv} =
channelEffect beforeSend afterRecv Channel{send, recv, tryRecv} =
Channel{
send = \x -> do
beforeSend x
Expand All @@ -282,6 +307,13 @@ channelEffect beforeSend afterRecv Channel{send, recv} =
mx <- recv
afterRecv mx
return mx

, tryRecv = do
mx <- tryRecv
case mx of
Just x -> afterRecv x
_ -> return ()
return mx
}

-- | Delay a channel on the receiver end.
Expand All @@ -308,10 +340,11 @@ loggingChannel :: ( MonadSay m
=> id
-> Channel m a
-> Channel m a
loggingChannel ident Channel{send,recv} =
loggingChannel ident Channel{send,recv,tryRecv} =
Channel {
send = loggingSend,
recv = loggingRecv
send = loggingSend,
recv = loggingRecv,
tryRecv = loggingTryRecv
}
where
loggingSend a = do
Expand All @@ -324,3 +357,10 @@ loggingChannel ident Channel{send,recv} =
Nothing -> return ()
Just a -> say (show ident ++ ":recv:" ++ show a)
return msg

loggingTryRecv = do
msg <- tryRecv
case msg of
Just (Just a) -> say (show ident ++ ":recv:" ++ show a)
_ -> return ()
return msg
7 changes: 0 additions & 7 deletions ouroboros-network-framework/src/Ouroboros/Network/Driver.hs
Expand Up @@ -2,17 +2,10 @@
-- | Drivers for running 'Peer's with a 'Codec' and a 'Channel'.
--
module Ouroboros.Network.Driver (

-- * Normal peers
runPeer,
runPeerWithLimits,
TraceSendRecv(..),

-- * Pipelined peers
runPipelinedPeer,
runPipelinedPeerWithLimits,
) where

import Ouroboros.Network.Driver.Simple
import Ouroboros.Network.Driver.Limits

0 comments on commit 1803408

Please sign in to comment.