Skip to content
Permalink
Browse files

Update the tests and other demos for the codec type change

All of these ones ignore the bytes passed in, so use fmap const on the
decoder.

The Byron case will use something like:
  fmap (\bytes -> BSL.toStrict . slice bytes)
to annotate the decoded result with the associated input bytes.

See for example decodeFullAnnotatedBytes from cardano-binary.
  • Loading branch information...
dcoutts committed Aug 13, 2019
1 parent 6b69e7b commit d1a045aae3be05580ea6622d69ab177bce58312c
@@ -49,6 +49,7 @@ import Ouroboros.Network.Socket
import Ouroboros.Network.Mux
import Ouroboros.Network.NodeToNode

import Network.TypedProtocol.Codec
import Network.TypedProtocol.Channel
import Network.TypedProtocol.Driver
import Network.TypedProtocol.Pipelined
@@ -60,13 +61,14 @@ import Network.TypedProtocol.PingPong.Server as PingPong
import Ouroboros.Network.Protocol.Handshake.Type
import Ouroboros.Network.Protocol.Handshake.Version

import Ouroboros.Network.Protocol.ChainSync.Codec as ChainSync
import Ouroboros.Network.Protocol.ChainSync.Server as ChainSync
import Ouroboros.Network.Protocol.ChainSync.Client as ChainSync
import qualified Ouroboros.Network.Protocol.ChainSync.Type as ChainSync
import qualified Ouroboros.Network.Protocol.ChainSync.Codec as ChainSync
import qualified Ouroboros.Network.Protocol.ChainSync.Server as ChainSync
import qualified Ouroboros.Network.Protocol.ChainSync.Client as ChainSync

import Ouroboros.Network.Protocol.BlockFetch.Type as BlockFetch
import Ouroboros.Network.Protocol.BlockFetch.Codec as BlockFetch
import Ouroboros.Network.Protocol.BlockFetch.Server as BlockFetch
import qualified Ouroboros.Network.Protocol.BlockFetch.Type as BlockFetch
import qualified Ouroboros.Network.Protocol.BlockFetch.Codec as BlockFetch
import qualified Ouroboros.Network.Protocol.BlockFetch.Server as BlockFetch

import Ouroboros.Network.BlockFetch
import Ouroboros.Network.BlockFetch.Client
@@ -343,8 +345,8 @@ clientChainSync sockAddrs =
protocols ChainSync2 =
MuxPeer
(contramap show stdoutTracer)
(codecChainSync CBOR.encode CBOR.decode CBOR.encode CBOR.decode)
(chainSyncClientPeer chainSyncClient)
codecChainSync
(ChainSync.chainSyncClientPeer chainSyncClient)


serverChainSync :: FilePath -> IO ()
@@ -370,8 +372,17 @@ serverChainSync sockAddr = do
protocols ChainSync2 =
MuxPeer
(contramap show stdoutTracer)
(codecChainSync CBOR.encode CBOR.decode CBOR.encode CBOR.decode)
(chainSyncServerPeer (chainSyncServer prng))
codecChainSync
(ChainSync.chainSyncServerPeer (chainSyncServer prng))

codecChainSync :: (CBOR.Serialise block, CBOR.Serialise point)
=> Codec (ChainSync.ChainSync block point)
CBOR.DeserialiseFailure
IO LBS.ByteString
codecChainSync =
ChainSync.codecChainSync
CBOR.encode (fmap const CBOR.decode)
CBOR.encode CBOR.decode



@@ -414,10 +425,10 @@ clientBlockFetch sockAddrs = do
bracket register unregister $ \chainVar ->
runPeer
nullTracer -- (contramap (show . TraceLabelPeer peerid) stdoutTracer)
(codecChainSync CBOR.encode CBOR.decode CBOR.encode CBOR.decode)
codecChainSync
peerid
channel
(chainSyncClientPeer
(ChainSync.chainSyncClientPeer
(chainSyncClient' syncTracer currentChainVar chainVar))
where
register = atomically $ do
@@ -433,7 +444,7 @@ clientBlockFetch sockAddrs = do
bracketFetchClient registry peerid $ \clientCtx ->
runPipelinedPeer
nullTracer -- (contramap (show . TraceLabelPeer peerid) stdoutTracer)
(codecBlockFetch CBOR.encode CBOR.encode CBOR.decode CBOR.decode)
codecBlockFetch
peerid
channel
(blockFetchClient clientCtx)
@@ -554,54 +565,67 @@ serverBlockFetch sockAddr = do
protocols ChainSync3 =
MuxPeer
(contramap show stdoutTracer)
(codecChainSync CBOR.encode CBOR.decode CBOR.encode CBOR.decode)
(chainSyncServerPeer (chainSyncServer prng))
codecChainSync
(ChainSync.chainSyncServerPeer (chainSyncServer prng))

protocols BlockFetch3 =
MuxPeer
(contramap show stdoutTracer)
(codecBlockFetch CBOR.encode CBOR.encode CBOR.decode CBOR.decode)
(blockFetchServerPeer (blockFetchServer prng))
codecBlockFetch
(BlockFetch.blockFetchServerPeer (blockFetchServer prng))

codecBlockFetch :: Codec (BlockFetch.BlockFetch Block)
CBOR.DeserialiseFailure
IO LBS.ByteString
codecBlockFetch =
BlockFetch.codecBlockFetch
CBOR.encode (fmap const CBOR.decode)
CBOR.encode CBOR.decode


--
-- Chain sync and block fetch protocol handlers
--

chainSyncClient :: ChainSyncClient BlockHeader (Point BlockHeader) IO ()
chainSyncClient :: ChainSync.ChainSyncClient
BlockHeader (Point BlockHeader) IO ()
chainSyncClient =
ChainSyncClient $ do
ChainSync.ChainSyncClient $ do
curvar <- newTVarIO genesisChainFragment
chainvar <- newTVarIO genesisChainFragment
let ChainSyncClient k = chainSyncClient' nullTracer curvar chainvar
let ChainSync.ChainSyncClient k =
chainSyncClient' nullTracer curvar chainvar
k

chainSyncClient' :: Tracer IO (Point BlockHeader, Point BlockHeader)
-> TVar (AF.AnchoredFragment BlockHeader)
-> TVar (AF.AnchoredFragment BlockHeader)
-> ChainSyncClient BlockHeader (Point BlockHeader) IO ()
-> ChainSync.ChainSyncClient
BlockHeader (Point BlockHeader) IO ()
chainSyncClient' syncTracer _currentChainVar candidateChainVar =
ChainSyncClient (return requestNext)
ChainSync.ChainSyncClient (return requestNext)
where
requestNext :: ClientStIdle BlockHeader (Point BlockHeader) IO ()
requestNext :: ChainSync.ClientStIdle
BlockHeader (Point BlockHeader) IO ()
requestNext =
SendMsgRequestNext
ChainSync.SendMsgRequestNext
handleNext
(return handleNext) -- wait case, could trace

handleNext :: ClientStNext BlockHeader (Point BlockHeader) IO ()
handleNext :: ChainSync.ClientStNext
BlockHeader (Point BlockHeader) IO ()
handleNext =
ClientStNext {
ChainSync.ClientStNext {
recvMsgRollForward = \header _pHead ->
ChainSyncClient $ do
ChainSync.ChainSyncClient $ do
addBlock header
--FIXME: the notTooFarAhead bit is not working
-- it seems the current chain is always of length 1.
-- notTooFarAhead
return requestNext

, recvMsgRollBackward = \pIntersect _pHead ->
ChainSyncClient $ do
ChainSync.ChainSyncClient $ do
rollback pIntersect
return requestNext
}
@@ -632,73 +656,77 @@ chainSyncClient' syncTracer _currentChainVar candidateChainVar =
-}
chainSyncServer :: RandomGen g
=> g
-> ChainSyncServer BlockHeader (Point BlockHeader) IO ()
-> ChainSync.ChainSyncServer
BlockHeader (Point BlockHeader) IO ()
chainSyncServer seed =
let blocks = chainGenerator seed in
ChainSyncServer (return (idleState blocks))
ChainSync.ChainSyncServer (return (idleState blocks))
where
idleState :: [Block]
-> ServerStIdle BlockHeader (Point BlockHeader) IO ()
-> ChainSync.ServerStIdle
BlockHeader (Point BlockHeader) IO ()
idleState blocks =
ServerStIdle {
ChainSync.ServerStIdle {
recvMsgRequestNext = do threadDelay 500000
return (Left (nextState blocks)),
recvMsgFindIntersect = \_ -> return (intersectState blocks),
recvMsgDoneClient = return ()
}

nextState :: [Block]
-> ServerStNext BlockHeader (Point BlockHeader) IO ()
-> ChainSync.ServerStNext
BlockHeader (Point BlockHeader) IO ()
nextState [] = error "chainSyncServer: impossible"
nextState (block:blocks) =
SendMsgRollForward
ChainSync.SendMsgRollForward
(blockHeader block)
-- pretend chain head is next one:
(blockPoint (blockHeader (head blocks)))
(ChainSyncServer (return (idleState blocks)))
(ChainSync.ChainSyncServer (return (idleState blocks)))

intersectState :: [Block]
-> ServerStIntersect BlockHeader (Point BlockHeader) IO ()
-> ChainSync.ServerStIntersect
BlockHeader (Point BlockHeader) IO ()
intersectState blocks =
SendMsgIntersectNotFound
ChainSync.SendMsgIntersectNotFound
-- pretend chain head is next one:
(blockPoint (blockHeader (head blocks)))
(ChainSyncServer (return (idleState blocks)))
(ChainSync.ChainSyncServer (return (idleState blocks)))


blockFetchServer :: RandomGen g
=> g
-> BlockFetchServer Block IO ()
-> BlockFetch.BlockFetchServer Block IO ()
blockFetchServer seed =
let blocks = chainGenerator seed in
idleState blocks
where
idleState blocks =
BlockFetchServer
BlockFetch.BlockFetchServer
(\range -> return (senderState range blocks))
()

senderState range blocks =
case selectBlockRange range blocks of
Nothing ->
SendMsgNoBlocks (return (idleState blocks))
BlockFetch.SendMsgNoBlocks (return (idleState blocks))

Just (batch, blocks') ->
SendMsgStartBatch $ do
BlockFetch.SendMsgStartBatch $ do
threadDelay 1000000
return (sendingState batch blocks')

sendingState [] blocks =
SendMsgBatchDone (return (idleState blocks))
BlockFetch.SendMsgBatchDone (return (idleState blocks))
sendingState (b:batch) blocks =
SendMsgBlock b $ do
BlockFetch.SendMsgBlock b $ do
threadDelay 1000000
return (sendingState batch blocks)

selectBlockRange :: ChainRange Block
selectBlockRange :: BlockFetch.ChainRange Block
-> [Block]
-> Maybe ([Block], [Block])
selectBlockRange (ChainRange lower upper) blocks0 = do
selectBlockRange (BlockFetch.ChainRange lower upper) blocks0 = do
(_, blocks1) <- splitBeforePoint lower blocks0
(bs, b:remaining) <- splitBeforePoint upper blocks1
return (bs ++ [b], remaining)
@@ -196,7 +196,7 @@ runFetchClient tracer registry peerid channel client =
runPipelinedPeer tracer codec peerid channel $
client clientCtx
where
codec = codecBlockFetch encode encode decode decode
codec = codecBlockFetch encode (fmap const decode) encode decode

runFetchServer :: (MonadThrow m, MonadST m,
Serialise block,
@@ -210,7 +210,7 @@ runFetchServer tracer peerid channel server =
runPeer tracer codec peerid channel $
blockFetchServerPeer server
where
codec = codecBlockFetch encode encode decode decode
codec = codecBlockFetch encode (fmap const decode) encode decode

runFetchClientAndServerAsync
:: (MonadCatch m, MonadAsync m, MonadFork m, MonadTimer m,
@@ -266,7 +266,7 @@ prop_channel createChannels chain points = do
(bodies, ()) <-
runConnectedPeers
createChannels nullTracer
(codecBlockFetch S.encode S.encode S.decode S.decode)
codec
"client" "server"
(blockFetchClientPeer (testClient chain points))
(blockFetchServerPeer (testServer chain))
@@ -298,6 +298,13 @@ prop_pipe_IO (TestChainAndPoints chain points) =
-- Codec properties
--

codec :: MonadST m
=> Codec (BlockFetch Block)
S.DeserialiseFailure
m ByteString
codec = codecBlockFetch S.encode (fmap const S.decode)
S.encode S.decode

instance Arbitrary (AnyMessageAndAgency (BlockFetch Block)) where
arbitrary = oneof
[ AnyMessageAndAgency (ClientAgency TokIdle) <$>
@@ -327,19 +334,19 @@ prop_codec_BlockFetch
:: AnyMessageAndAgency (BlockFetch Block)
-> Bool
prop_codec_BlockFetch msg =
runST (prop_codecM (codecBlockFetch S.encode S.encode S.decode S.decode) msg)
runST (prop_codecM codec msg)

prop_codec_splits2_BlockFetch
:: AnyMessageAndAgency (BlockFetch Block)
-> Bool
prop_codec_splits2_BlockFetch msg =
runST (prop_codec_splitsM splits2 (codecBlockFetch S.encode S.encode S.decode S.decode) msg)
runST (prop_codec_splitsM splits2 codec msg)

prop_codec_splits3_BlockFetch
:: AnyMessageAndAgency (BlockFetch Block)
-> Bool
prop_codec_splits3_BlockFetch msg =
runST (prop_codec_splitsM splits3 (codecBlockFetch S.encode S.encode S.decode S.decode) msg)
runST (prop_codec_splitsM splits3 codec msg)


--
@@ -189,19 +189,26 @@ instance (Eq header, Eq point) => Eq (AnyMessage (ChainSync header point)) where
AnyMessage MsgDone == AnyMessage MsgDone = True
_ == _ = False

codec :: (MonadST m, S.Serialise block, S.Serialise point)
=> Codec (ChainSync block point)
S.DeserialiseFailure
m ByteString
codec = codecChainSync S.encode (fmap const S.decode)
S.encode S.decode

prop_codec_ChainSync
:: AnyMessageAndAgency (ChainSync BlockHeader (Point BlockHeader))
-> Bool
prop_codec_ChainSync msg =
ST.runST $ prop_codecM (codecChainSync S.encode S.decode S.encode S.decode) msg
ST.runST $ prop_codecM codec msg

prop_codec_splits2_ChainSync
:: AnyMessageAndAgency (ChainSync BlockHeader (Point BlockHeader))
-> Bool
prop_codec_splits2_ChainSync msg =
ST.runST $ prop_codec_splitsM
splits2
(codecChainSync S.encode S.decode S.encode S.decode)
codec
msg

prop_codec_splits3_ChainSync
@@ -210,7 +217,7 @@ prop_codec_splits3_ChainSync
prop_codec_splits3_ChainSync msg =
ST.runST $ prop_codec_splitsM
splits3
(codecChainSync S.encode S.decode S.encode S.decode)
codec
msg

chainSyncDemo
@@ -237,8 +244,8 @@ chainSyncDemo clientChan serverChan (ChainProducerStateForkTest cps chain) = do

client = ChainSyncExamples.chainSyncClientExample chainVar (testClient doneVar (Chain.headPoint pchain))

void $ fork (void $ runPeer nullTracer (codecChainSync S.encode S.decode S.encode S.decode) "server" serverChan (chainSyncServerPeer server))
void $ fork (void $ runPeer nullTracer (codecChainSync S.encode S.decode S.encode S.decode) "client" clientChan (chainSyncClientPeer client))
void $ fork (void $ runPeer nullTracer codec "server" serverChan (chainSyncServerPeer server))
void $ fork (void $ runPeer nullTracer codec "client" clientChan (chainSyncClientPeer client))

atomically $ do
done <- readTVar doneVar
@@ -104,7 +104,9 @@ demo chain0 updates delay = do
(\ChainSyncPr ->
Mx.MuxPeer
nullTracer
(ChainSync.codecChainSync encode decode encode decode)
(ChainSync.codecChainSync
encode (fmap const decode)
encode decode)
(consumerPeer))

producerPeer :: Peer (ChainSync.ChainSync block (Point block)) AsServer ChainSync.StIdle m ()
@@ -113,7 +115,9 @@ demo chain0 updates delay = do
(\ChainSyncPr ->
Mx.MuxPeer
nullTracer
(ChainSync.codecChainSync encode decode encode decode)
(ChainSync.codecChainSync
encode (fmap const decode)
encode decode)
producerPeer)

clientAsync <- async $ Mx.runMuxWithQueues "consumer" (Mx.toApplication consumerApp) client_w client_r sduLen Nothing

0 comments on commit d1a045a

Please sign in to comment.
You can’t perform that action at this time.