Navigation Menu

Skip to content

Commit

Permalink
typed-protocols-examples: unbounded buffered channel
Browse files Browse the repository at this point in the history
A channel based on `TQueue`.  It is useful for testing pipelined
protocols, where pipelining depth is not taken into account.
  • Loading branch information
coot committed May 23, 2022
1 parent bfbd973 commit df23f06
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
38 changes: 35 additions & 3 deletions typed-protocols-examples/src/Network/TypedProtocol/Channel.hs
Expand Up @@ -16,6 +16,7 @@ module Network.TypedProtocol.Channel
#endif
, createConnectedChannels
, createConnectedBufferedChannels
, createConnectedBufferedChannelsUnbounded
, createPipelineTestChannels
, channelEffect
, delayChannel
Expand All @@ -30,6 +31,7 @@ import qualified Data.ByteString as BS
import qualified Data.ByteString.Internal as BS (createAndTrim')
import qualified Data.ByteString.Lazy as LBS
import Data.ByteString.Lazy.Internal (smallChunkSize)
import Data.Proxy
import Data.Word (Word8)
import Foreign.C.Error (eAGAIN, eWOULDBLOCK, getErrno, throwErrno)
import Foreign.C.Types
Expand Down Expand Up @@ -142,12 +144,20 @@ mvarsAsChannel bufferRead bufferWrite =
--
-- This is primarily useful for testing protocols.
--
createConnectedChannels :: MonadSTM m => m (Channel m a, Channel m a)
createConnectedChannels :: forall m a. (MonadLabelledSTM m, MonadTraceSTM m, Show a) => m (Channel m a, Channel m a)
createConnectedChannels = do
-- Create two TMVars to act as the channel buffer (one for each direction)
-- and use them to make both ends of a bidirectional channel
bufferA <- atomically $ newEmptyTMVar
bufferB <- atomically $ newEmptyTMVar
bufferA <- atomically $ do
v <- newEmptyTMVar
labelTMVar v "buffer-a"
traceTMVar (Proxy :: Proxy m) v $ \_ a -> pure $ TraceString ("buffer-a: " ++ show a)
return v
bufferB <- atomically $ do
v <- newEmptyTMVar
traceTMVar (Proxy :: Proxy m) v $ \_ a -> pure $ TraceString ("buffer-b: " ++ show a)
labelTMVar v "buffer-b"
return v

return (mvarsAsChannel bufferB bufferA,
mvarsAsChannel bufferA bufferB)
Expand Down Expand Up @@ -180,6 +190,28 @@ createConnectedBufferedChannels sz = do
tryRecv = atomically (fmap Just <$> tryReadTBQueue bufferRead)


-- | Create a pair of channels that are connected via two unbounded buffers.
--
-- This is primarily useful for testing protocols.
--
createConnectedBufferedChannelsUnbounded :: forall m a. MonadSTM m
=> m (Channel m a, Channel m a)
createConnectedBufferedChannelsUnbounded = do
-- Create two TQueues to act as the channel buffers (one for each
-- direction) and use them to make both ends of a bidirectional channel
bufferA <- atomically $ newTQueue
bufferB <- atomically $ newTQueue

return (queuesAsChannel bufferB bufferA,
queuesAsChannel bufferA bufferB)
where
queuesAsChannel bufferRead bufferWrite =
Channel{send, recv, tryRecv}
where
send x = atomically (writeTQueue bufferWrite x)
recv = atomically ( Just <$> readTQueue bufferRead)
tryRecv = atomically (fmap Just <$> tryReadTQueue bufferRead)

-- | Create a pair of channels that are connected via N-place buffers.
--
-- This variant /fails/ when 'send' would exceed the maximum buffer size.
Expand Down
Expand Up @@ -313,8 +313,8 @@ prop_connect_pipelined5 choices (Positive omax) (NonNegative n) =

-- | Run a non-pipelined client and server over a channel using a codec.
--
prop_channel :: ( MonadSTM m, MonadAsync m, MonadCatch m, MonadMask m
, MonadThrow (STM m))
prop_channel :: ( MonadLabelledSTM m, MonadTraceSTM m, MonadAsync m
, MonadCatch m, MonadMask m, MonadThrow (STM m))
=> NonNegative Int
-> m Bool
prop_channel (NonNegative n) = do
Expand Down

0 comments on commit df23f06

Please sign in to comment.