Skip to content

Commit

Permalink
Update the pipelined peer driver.
Browse files Browse the repository at this point in the history
The SenderCollect case of runPipelinedPeer relies on the new tryReadTBQueue.
  • Loading branch information
dcoutts committed Feb 11, 2019
1 parent a2bfdf5 commit 1e11f58
Showing 1 changed file with 55 additions and 30 deletions.
85 changes: 55 additions & 30 deletions typed-protocols/src/Network/TypedProtocol/Driver.hs
Expand Up @@ -34,6 +34,8 @@ import Network.TypedProtocol.Codec

import Control.Monad.Class.MonadSTM

import Numeric.Natural (Natural)


-- $intro
--
Expand Down Expand Up @@ -121,75 +123,98 @@ runDecoder Channel{recv} = go
runPipelinedPeer
:: forall ps (st :: ps) pk failure bytes m a.
MonadSTM m
=> Codec ps pk failure m bytes
=> Natural
-> Codec ps pk failure m bytes
-> Channel m bytes
-> PeerSender ps pk st m a
-> PeerPipelined ps pk st m a
-> m a
runPipelinedPeer codec channel peer = do
queue <- atomically $ newTBQueue 10 --TODO: size?
fork $ runPipelinedPeerReceiverQueue queue codec channel
runPipelinedPeerSender queue codec channel peer
runPipelinedPeer maxOutstanding codec channel (PeerPipelined peer) = do
receiveQueue <- atomically $ newTBQueue maxOutstanding
collectQueue <- atomically $ newTBQueue maxOutstanding
fork $ runPipelinedPeerReceiverQueue receiveQueue collectQueue
codec channel
runPipelinedPeerSender receiveQueue collectQueue
codec channel peer
--TODO: manage the fork + exceptions here


data ReceiveHandler ps pk m where
ReceiveHandler :: PeerReceiver ps pk (st :: ps) (st' :: ps) m
-> ReceiveHandler ps pk m
data ReceiveHandler ps pk m c where
ReceiveHandler :: PeerReceiver ps pk (st :: ps) (st' :: ps) m c
-> ReceiveHandler ps pk m c


runPipelinedPeerSender
:: forall ps (st :: ps) pk failure bytes m a.
:: forall ps (st :: ps) pk failure bytes c m a.
MonadSTM m
=> TBQueue m (ReceiveHandler ps pk m)
=> TBQueue m (ReceiveHandler ps pk m c)
-> TBQueue m c
-> Codec ps pk failure m bytes
-> Channel m bytes
-> PeerSender ps pk st m a
-> PeerSender ps pk st Z c m a
-> m a
runPipelinedPeerSender queue Codec{encode} Channel{send} = go
runPipelinedPeerSender receiveQueue collectQueue Codec{encode} Channel{send} =
go Zero
where
go :: forall st'. PeerSender ps pk st' m a -> m a
go (SenderEffect k) = k >>= go
go (SenderDone _ x) = return x
go :: forall st' n. Nat n -> PeerSender ps pk st' n c m a -> m a
go n (SenderEffect k) = k >>= go n
go Zero (SenderDone _ x) = return x

go (SenderYield stok msg receiver k) = do
atomically (writeTBQueue queue (ReceiveHandler receiver))
go n (SenderYield stok msg k) = do
send (encode stok msg)
go k
go n k

go n (SenderPipeline stok msg receiver k) = do
atomically (writeTBQueue receiveQueue (ReceiveHandler receiver))
send (encode stok msg)
go (Succ n) k

go (Succ n) (SenderCollect Nothing k) = do
c <- atomically (readTBQueue collectQueue)
go n (k c)

go (Succ n) (SenderCollect (Just k') k) = do
mc <- atomically (tryReadTBQueue collectQueue)
case mc of
Nothing -> go (Succ n) k'
Just c -> go n (k c)


runPipelinedPeerReceiverQueue
:: forall ps pk failure bytes m.
:: forall ps pk failure bytes m c.
MonadSTM m
=> TBQueue m (ReceiveHandler ps pk m)
=> TBQueue m (ReceiveHandler ps pk m c)
-> TBQueue m c
-> Codec ps pk failure m bytes
-> Channel m bytes
-> m ()
runPipelinedPeerReceiverQueue queue codec channel = go Nothing
runPipelinedPeerReceiverQueue receiveQueue collectQueue codec channel = go Nothing
where
go :: Maybe bytes -> m ()
go trailing = do
ReceiveHandler receiver <- atomically (readTBQueue queue)
trailing' <- runPipelinedPeerReceiver codec channel trailing receiver
ReceiveHandler receiver <- atomically (readTBQueue receiveQueue)
--TODO: use 'try' here once we have MonadCatch
(c, trailing') <- runPipelinedPeerReceiver codec channel trailing receiver
atomically (writeTBQueue collectQueue c)
go trailing'


runPipelinedPeerReceiver
:: forall ps (st :: ps) (stdone :: ps) pk failure bytes m.
:: forall ps (st :: ps) (stdone :: ps) pk failure bytes m c.
Monad m
=> Codec ps pk failure m bytes
-> Channel m bytes
-> Maybe bytes
-> PeerReceiver ps pk (st :: ps) (stdone :: ps) m
-> m (Maybe bytes)
-> PeerReceiver ps pk (st :: ps) (stdone :: ps) m c
-> m (c, Maybe bytes)
runPipelinedPeerReceiver Codec{decode} channel = go
where
go :: forall st' st''.
Maybe bytes
-> PeerReceiver ps pk st' st'' m
-> m (Maybe bytes)
-> PeerReceiver ps pk st' st'' m c
-> m (c, Maybe bytes)
go trailing (ReceiverEffect k) = k >>= go trailing

go trailing ReceiverDone = return trailing
go trailing (ReceiverDone x) = return (x, trailing)

go trailing (ReceiverAwait stok k) = do
decoder <- decode stok
Expand Down

0 comments on commit 1e11f58

Please sign in to comment.