Skip to content

Commit

Permalink
Hacking on an improvement to pipelined protocols
Browse files Browse the repository at this point in the history
Allowing the handlers to return a result which is collected in the
main thread of the peer.
  • Loading branch information
dcoutts committed Feb 11, 2019
1 parent 5f4880d commit 126c8ce
Show file tree
Hide file tree
Showing 6 changed files with 548 additions and 272 deletions.
86 changes: 56 additions & 30 deletions typed-protocols/src/Network/TypedProtocol/Driver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import Network.TypedProtocol.Codec

import Control.Monad.Class.MonadSTM

import Numeric.Natural (Natural)


-- $intro
--
Expand Down Expand Up @@ -121,75 +123,99 @@ runDecoder Channel{recv} = go
runPipelinedPeer
:: forall ps (st :: ps) pk failure bytes m a.
MonadSTM m
=> Codec ps pk failure m bytes
=> Natural
-> Codec ps pk failure m bytes
-> Channel m bytes
-> PeerSender ps pk st m a
-> PeerPipelined ps pk st m a
-> m a
runPipelinedPeer codec channel peer = do
queue <- atomically $ newTBQueue 10 --TODO: size?
fork $ runPipelinedPeerReceiverQueue queue codec channel
runPipelinedPeerSender queue codec channel peer
runPipelinedPeer maxOutstanding codec channel (PeerPipelined peer) = do
receiveQueue <- atomically $ newTBQueue maxOutstanding
collectQueue <- atomically $ newTBQueue maxOutstanding
fork $ runPipelinedPeerReceiverQueue receiveQueue collectQueue
codec channel
runPipelinedPeerSender receiveQueue collectQueue
codec channel peer
--TODO: manage the fork + exceptions here


data ReceiveHandler ps pk m where
ReceiveHandler :: PeerReceiver ps pk (st :: ps) (st' :: ps) m
-> ReceiveHandler ps pk m
data ReceiveHandler ps pk m c where
ReceiveHandler :: PeerReceiver ps pk (st :: ps) (st' :: ps) m c
-> ReceiveHandler ps pk m c


runPipelinedPeerSender
:: forall ps (st :: ps) pk failure bytes m a.
:: forall ps (st :: ps) pk failure bytes c m a.
MonadSTM m
=> TBQueue m (ReceiveHandler ps pk m)
=> TBQueue m (ReceiveHandler ps pk m c)
-> TBQueue m c
-> Codec ps pk failure m bytes
-> Channel m bytes
-> PeerSender ps pk st m a
-> PeerSender ps pk st Z c m a
-> m a
runPipelinedPeerSender queue Codec{encode} Channel{send} = go
runPipelinedPeerSender receiveQueue collectQueue Codec{encode} Channel{send} =
go Zero
where
go :: forall st'. PeerSender ps pk st' m a -> m a
go (SenderEffect k) = k >>= go
go (SenderDone _ x) = return x
go :: forall st' n. Nat n -> PeerSender ps pk st' n c m a -> m a
go n (SenderEffect k) = k >>= go n
go Zero (SenderDone _ x) = return x

go (SenderYield stok msg receiver k) = do
atomically (writeTBQueue queue (ReceiveHandler receiver))
go n (SenderYield stok msg k) = do
send (encode stok msg)
go k
go n k

go n (SenderPipeline stok msg receiver k) = do
atomically (writeTBQueue receiveQueue (ReceiveHandler receiver))
send (encode stok msg)
go (Succ n) k

go (Succ n) (SenderCollect Nothing k) = do
c <- atomically (readTBQueue collectQueue)
go n (k c)

go (Succ n) (SenderCollect (Just k') k) = do
fail "TODO: need tryReadTBQueue"
-- mc <- atomically (tryReadTBQueue collectQueue)
-- case mc of
-- Nothing -> go (Succ n) k'
-- Just c -> go n (k c)


runPipelinedPeerReceiverQueue
:: forall ps pk failure bytes m.
:: forall ps pk failure bytes m c.
MonadSTM m
=> TBQueue m (ReceiveHandler ps pk m)
=> TBQueue m (ReceiveHandler ps pk m c)
-> TBQueue m c
-> Codec ps pk failure m bytes
-> Channel m bytes
-> m ()
runPipelinedPeerReceiverQueue queue codec channel = go Nothing
runPipelinedPeerReceiverQueue receiveQueue collectQueue codec channel = go Nothing
where
go :: Maybe bytes -> m ()
go trailing = do
ReceiveHandler receiver <- atomically (readTBQueue queue)
trailing' <- runPipelinedPeerReceiver codec channel trailing receiver
ReceiveHandler receiver <- atomically (readTBQueue receiveQueue)
--TODO: use 'try' here once we have MonadCatch
(c, trailing') <- runPipelinedPeerReceiver codec channel trailing receiver
atomically (writeTBQueue collectQueue c)
go trailing'


runPipelinedPeerReceiver
:: forall ps (st :: ps) (stdone :: ps) pk failure bytes m.
:: forall ps (st :: ps) (stdone :: ps) pk failure bytes m c.
Monad m
=> Codec ps pk failure m bytes
-> Channel m bytes
-> Maybe bytes
-> PeerReceiver ps pk (st :: ps) (stdone :: ps) m
-> m (Maybe bytes)
-> PeerReceiver ps pk (st :: ps) (stdone :: ps) m c
-> m (c, Maybe bytes)
runPipelinedPeerReceiver Codec{decode} channel = go
where
go :: forall st' st''.
Maybe bytes
-> PeerReceiver ps pk st' st'' m
-> m (Maybe bytes)
-> PeerReceiver ps pk st' st'' m c
-> m (c, Maybe bytes)
go trailing (ReceiverEffect k) = k >>= go trailing

go trailing ReceiverDone = return trailing
go trailing (ReceiverDone x) = return (x, trailing)

go trailing (ReceiverAwait stok k) = do
decoder <- decode stok
Expand Down
71 changes: 60 additions & 11 deletions typed-protocols/src/Network/TypedProtocol/PingPong/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,47 +69,96 @@ pingPongClientPeer (SendMsgPing next) =
client <- next
pure $ pingPongClientPeer client


--
-- Pipelined client
--

-- | A ping-pong client designed for running the 'PingPong' protocol in
-- a pipelined way.
--
data PingPongSender m a where
data PingPongClientPipelined m a where
-- | A 'PingPongSender', but starting with zero outstanding pipelined
-- responses, and for any internal collect type @c@.
PingPongClientPipelined ::
PingPongSender Z c m a
-> PingPongClientPipelined m a


data PingPongSender n c m a where
-- |
-- Send a `Ping` message but alike in `PingPongClient` do not await for the
-- resopnse, instead supply a monadic action which will run on a received
-- `Pong` message.
SendMsgPingPipelined
:: m () -- receive action
-> PingPongSender m a -- continuation
-> PingPongSender m a
:: m c -- pong receive action
-> PingPongSender (S n) c m a -- continuation
-> PingPongSender n c m a

-- | Collect the result of a previous pipelined receive action.
--
-- This (optionally) provides two choices:
--
-- * Continue without a pipelined result
-- * Continue with a pipelined result
--
-- Since presenting the first choice is optional, this allows expressing
-- both a blocking collect and a non-blocking collect. This allows
-- implementations to express policies such as sending a short sequence
-- of messages and then waiting for all replies, but also a maximum pipelining
-- policy that keeps a large number of messages in flight but collects results
-- eagerly.
--
CollectPipelined
:: Maybe (PingPongSender (S n) c m a)
-> (c -> PingPongSender n c m a)
-> PingPongSender (S n) c m a

-- | Termination of the ping-pong protocol.
--
-- Note that all pipelined results must be collected before terminating.
--
SendMsgDonePipelined
:: a -> PingPongSender m a
:: a -> PingPongSender Z c m a



pingPongClientPeerPipelined
:: Monad m
=> PingPongClientPipelined m a
-> PeerPipelined PingPong AsClient StIdle m a
pingPongClientPeerPipelined (PingPongClientPipelined peer) =
PeerPipelined (pingPongClientPeerSender peer)


pingPongClientPeerSender
:: Monad m
=> PingPongSender m a
-> PeerSender PingPong AsClient StIdle m a
=> PingPongSender n c m a
-> PeerSender PingPong AsClient StIdle n c m a

pingPongClientPeerSender (SendMsgDonePipelined result) =
-- Send `MsgDone` and complete the protocol
SenderYield
(ClientAgency TokIdle)
MsgDone
ReceiverDone
(SenderDone TokDone result)

pingPongClientPeerSender (SendMsgPingPipelined receive next) =
-- Piplined yield: send `MsgPing`, imediatelly follow with the next step.
-- Await for a response in a continuation.
SenderYield
SenderPipeline
(ClientAgency TokIdle)
MsgPing
-- response handler
(ReceiverAwait (ServerAgency TokBusy) $ \MsgPong ->
ReceiverEffect $ do
receive
return ReceiverDone)
x <- receive
return (ReceiverDone x))
-- run the next step of the ping-pong protocol.
(pingPongClientPeerSender next)

pingPongClientPeerSender (CollectPipelined mNone collect) =
SenderCollect
(fmap pingPongClientPeerSender mNone)
(pingPongClientPeerSender . collect)

118 changes: 103 additions & 15 deletions typed-protocols/src/Network/TypedProtocol/PingPong/Examples.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE BangPatterns #-}

module Network.TypedProtocol.PingPong.Examples where

import Network.TypedProtocol.PingPong.Server
import Network.TypedProtocol.PingPong.Client

import Control.Monad.Class.MonadSTM
import Network.TypedProtocol.Pipelined


-- | The standard stateless ping-pong server instance.
Expand All @@ -28,10 +31,10 @@ pingPongServerCount
=> PingPongServer m Int
pingPongServerCount = go 0
where
go !c = PingPongServer {
recvMsgPing = pure $ go (succ c),
recvMsgDone = c
}
go !c = PingPongServer {
recvMsgPing = pure $ go (succ c),
recvMsgDone = c
}


-- | An example ping-pong client that sends pings as fast as possible forever‽
Expand All @@ -50,14 +53,99 @@ pingPongClientCount 0 = SendMsgDone ()
pingPongClientCount n = SendMsgPing (pure (pingPongClientCount (n-1)))


pingPongSenderCount
:: MonadSTM m
=> TVar m Int
-> Int
-> PingPongSender m ()
pingPongSenderCount var = go
where
go 0 = SendMsgDonePipelined ()
go n = SendMsgPingPipelined (atomically $ modifyTVar var succ)
(go (pred n))
--
-- Pipelined examples
--


-- | A pipelined ping-pong client that sends eagerly rather than waiting to
-- collect any replies. This is maximum pipelining in some sense, and
-- correspondingly it gives minimum choice to the environment (drivers).
--
-- It returns the interleaving of ping indexes sent, and collected.
--
pingPongClientPipelinedMax
:: forall m. Monad m
=> Int
-> PingPongClientPipelined m [Either Int Int]
pingPongClientPipelinedMax c =
PingPongClientPipelined (go [] Zero 0)
where
go :: [Either Int Int] -> Nat o -> Int
-> PingPongSender o Int m [Either Int Int]
go acc o n | n < c
= SendMsgPingPipelined
(return n)
(go (Left n : acc) (Succ o) (succ n))
go acc Zero _ = SendMsgDonePipelined (reverse acc)
go acc (Succ o) n = CollectPipelined
Nothing
(\n' -> go (Right n' : acc) o n)


-- | A pipelined ping-pong client that sends eagerly but always tries to
-- collect any replies if they are available. This allows pipelining but
-- keeps it to a minimum, and correspondingly it gives maximum choice to the
-- environment (drivers).
--
-- It returns the interleaving of ping indexes sent, and collected.
--
pingPongClientPipelinedMin
:: forall m. Monad m
=> Int
-> PingPongClientPipelined m [Either Int Int]
pingPongClientPipelinedMin c =
PingPongClientPipelined (go [] Zero 0)
where
go :: [Either Int Int] -> Nat o -> Int
-> PingPongSender o Int m [Either Int Int]
go acc (Succ o) n = CollectPipelined
(if n < c then Just (ping acc (Succ o) n)
else Nothing)
(\n' -> go (Right n' : acc) o n)
go acc Zero n | n < c
= ping acc Zero n
go acc Zero _ = SendMsgDonePipelined (reverse acc)

ping :: [Either Int Int] -> Nat o -> Int
-> PingPongSender o Int m [Either Int Int]
ping acc o n = SendMsgPingPipelined
(return n)
(go (Left n : acc) (Succ o) (succ n))


-- | A pipelined ping-pong client that sends eagerly up to some maximum limit
-- of outstanding requests. It is also always ready to collect any replies if
-- they are available. This allows limited pipelining and correspondingly
-- limited choice to the environment (drivers).
--
-- It returns the interleaving of ping indexes sent, and collected.
--
pingPongClientPipelinedLimited
:: forall m. Monad m
=> Int -> Int
-> PingPongClientPipelined m [Either Int Int]
pingPongClientPipelinedLimited omax c =
PingPongClientPipelined (go [] Zero 0)
where
go :: [Either Int Int] -> Nat o -> Int
-> PingPongSender o Int m [Either Int Int]
go acc (Succ o) n = CollectPipelined
(if n < c && int (Succ o) < omax
then Just (ping acc (Succ o) n)
else Nothing)
(\n' -> go (Right n' : acc) o n)
go acc Zero n | n < c
= ping acc Zero n
go acc Zero _ = SendMsgDonePipelined (reverse acc)

ping :: [Either Int Int] -> Nat o -> Int
-> PingPongSender o Int m [Either Int Int]
ping acc o n = SendMsgPingPipelined
(return n)
(go (Left n : acc) (Succ o) (succ n))

-- this isn't supposed to be efficient, it's just for the example
int :: Nat n -> Int
int Zero = 0
int (Succ n) = succ (int n)
Loading

0 comments on commit 126c8ce

Please sign in to comment.