Skip to content

Commit

Permalink
connectThroughQueue: connect server sides through a bounded queue
Browse files Browse the repository at this point in the history
  • Loading branch information
coot committed Dec 5, 2018
1 parent a2318d5 commit 6a8bd36
Showing 1 changed file with 69 additions and 1 deletion.
70 changes: 69 additions & 1 deletion src/Ouroboros/Network/Protocol/BlockFetch/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,27 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.Protocol.BlockFetch.Server where
module Ouroboros.Network.Protocol.BlockFetch.Server
( BlockFetchServerReceiver (..)
, constantReceiver
, blockFetchServerReceiverStream
, BlockFetchServer (..)
, BlockFetchSender (..)
, BlockFetchSendBlocks (..)
, blockFetchServerStream
, connectThroughQueue
)
where

import Data.Functor (($>))
import Numeric.Natural (Natural)
import Pipes (Producer)
import qualified Pipes

import Protocol.Core

import Ouroboros.Network.MonadClass.MonadSTM (MonadSTM (..))

import Ouroboros.Network.Protocol.BlockFetch.Type

{-------------------------------------------------------------------------------
Expand Down Expand Up @@ -136,3 +152,55 @@ blockFetchServerStream server = lift $ handleStAwait <$> runBlockFetchServer ser
part (MessageBlock block) (lift $ sendBlocks <$> msender)
sendBlocks (SendMessageBatchDone server') =
part MessageBatchDone (blockFetchServerStream server')

-- | Connection between the server side of @'BlockFetchClientProtocol'@ and the
-- server side of @'BlockFetchServerProtocol'@>
--
connectThroughQueue'
:: forall header block m.
MonadSTM m
=> TBQueue m (ChainRange header)
-> (ChainRange header -> m (Maybe (Producer block m ())))
-> ( BlockFetchServerReceiver header m ()
, BlockFetchServer header block m ()
)
connectThroughQueue' queue blockStream = (receiver, server)
where
receiver :: BlockFetchServerReceiver header m ()
receiver = constantReceiver (atomically . writeTBQueue queue) ()

server :: BlockFetchServer header block m ()
server = BlockFetchServer $ do
mstream <- atomically (readTBQueue queue) >>= blockStream
case mstream :: Maybe (Producer block m ()) of
Nothing -> return $ SendMessageNoBlocks server
Just stream -> do
nxt <- Pipes.next stream
case nxt of
Left _ -> return $ SendMessageNoBlocks server
Right (block, stream') -> return $ SendMessageStartBatch block (sendStream stream')

sendStream
:: Producer block m ()
-> m (BlockFetchSendBlocks header block m ())
sendStream stream = do
nxt <- Pipes.next stream
case nxt of
Left _ -> return $ SendMessageBatchDone server
Right (b, stream') -> return $ SendMessageBlock b (sendStream stream')

-- | Connect server side of @'BlockFetchClientProtocol'@ and
-- @'BlockFetchSErverProtocol'@ thought a freshly constructed @'TBQueue'@.
--
connectThroughQueue
:: forall header block m.
MonadSTM m
=> Natural
-- ^ queue size
-> (ChainRange header -> m (Maybe (Producer block m ())))
-> m ( BlockFetchServerReceiver header m ()
, BlockFetchServer header block m ()
)
connectThroughQueue queueSize blockStream = do
queue <- atomically $ newTBQueue queueSize
return $ connectThroughQueue' queue blockStream

0 comments on commit 6a8bd36

Please sign in to comment.