Skip to content

Commit

Permalink
Merge #3852
Browse files Browse the repository at this point in the history
3852: Add block-fetch to Diffusion tests r=coot a=bolt12

# Description

This PR makes it so header-body split and block-fetch run in Diffusion tests.



Co-authored-by: Armando Santos <armando@well-typed.com>
  • Loading branch information
iohk-bors[bot] and bolt12 committed Aug 12, 2022
2 parents 4dcfa97 + 276588a commit b9adb9a
Show file tree
Hide file tree
Showing 19 changed files with 800 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ import qualified Ouroboros.Network.Driver.Simple as Driver
import Ouroboros.Network.MockChain.Chain (Chain)
import qualified Ouroboros.Network.MockChain.Chain as Chain
import qualified Ouroboros.Network.Mux as Mux
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion,
isPipeliningEnabled)
import Ouroboros.Network.Protocol.BlockFetch.Codec (codecBlockFetchId)
import Ouroboros.Network.Protocol.BlockFetch.Server
(BlockFetchBlockSender (SendMsgNoBlocks, SendMsgStartBatch),
Expand Down Expand Up @@ -166,7 +167,7 @@ runBlockFetchTest BlockFetchClientTestSetup{..} = withRegistry \registry -> do
blockFetchCfg

let runBlockFetchClient peerId =
bracketFetchClient fetchClientRegistry ntnVersion peerId \clientCtx -> do
bracketFetchClient fetchClientRegistry ntnVersion isPipeliningEnabled peerId \clientCtx -> do
let bfClient = blockFetchClient
ntnVersion
(readTVar varControlMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ import Ouroboros.Network.AnchoredFragment (AnchoredFragment,
import qualified Ouroboros.Network.AnchoredFragment as AF
import qualified Ouroboros.Network.AnchoredSeq as AS
import Ouroboros.Network.Block (Tip, getTipBlockNo)
import Ouroboros.Network.BlockFetch.ClientState
(WhetherReceivingTentativeBlocks (..))
import Ouroboros.Network.Mux (ControlMessage (..), ControlMessageSTM)
import Ouroboros.Network.NodeToNode.Version (isPipeliningEnabled)
import Ouroboros.Network.PeerSelection.PeerMetric.Type
Expand Down Expand Up @@ -735,10 +737,10 @@ chainSyncClient mkPipelineDecision0 tracer cfg
whenJust (isInvalidBlock hash) $ \reason ->
disconnect $ InvalidBlock hdrPoint hash reason
disconnectWhenInvalid $
if isPipeliningEnabled version
-- Disconnect if the parent block of `hdr` is known to be invalid.
then headerPrevHash hdr
else BlockHash (headerHash hdr)
case isPipeliningEnabled version of
-- Disconnect if the parent block of `hdr` is known to be invalid.
ReceivingTentativeBlocks -> headerPrevHash hdr
NotReceivingTentativeBlocks -> BlockHash (headerHash hdr)

-- Get the ledger view required to validate the header
-- NOTE: This will block if we are too far behind.
Expand Down Expand Up @@ -1020,7 +1022,9 @@ invalidBlockRejector tracer version getIsInvalidBlock getCandidate =
-- it's explicit, only skip it if it's annotated as tentative
mapM_ (uncurry disconnect) $ firstJust
(\hdr -> (hdr,) <$> isInvalidBlock (headerHash hdr))
( (if isPipeliningEnabled version then drop 1 else id)
( (case isPipeliningEnabled version of
ReceivingTentativeBlocks -> drop 1
NotReceivingTentativeBlocks -> id)
$ AF.toNewestFirst theirFrag
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -540,9 +540,9 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
bracketWithPrivateRegistry
(chainSyncHeaderServerFollower
(getChainDB kernel)
( if isPipeliningEnabled version
then ChainDB.TentativeChain
else ChainDB.SelectedChain
( case isPipeliningEnabled version of
ReceivingTentativeBlocks -> ChainDB.TentativeChain
NotReceivingTentativeBlocks -> ChainDB.SelectedChain
)
)
ChainDB.followerClose
Expand All @@ -564,7 +564,8 @@ mkApps kernel Tracers {..} mkCodecs ByteLimits {..} genChainSyncTimeout ReportPe
-> m (NodeToNodeInitiatorResult, Maybe bBF)
aBlockFetchClient version controlMessageSTM them channel = do
labelThisThread "BlockFetchClient"
bracketFetchClient (getFetchClientRegistry kernel) version them $ \clientCtx -> do
bracketFetchClient (getFetchClientRegistry kernel) version
isPipeliningEnabled them $ \clientCtx -> do
((), trailing) <- runPipelinedPeerWithLimits
(contramap (TraceLabelPeer them) tBlockFetchTracer)
(cBlockFetchCodec (mkCodecs version))
Expand Down
4 changes: 3 additions & 1 deletion ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,15 @@ instance Hashable LocalAddress where
hashWithSalt s (LocalAddress path) = hashWithSalt s path

newtype TestAddress addr = TestAddress { getTestAddress :: addr }
deriving (Eq, Ord, Typeable)
deriving (Eq, Ord, Typeable, Generic)

instance Show addr => Show (TestAddress addr) where
showsPrec d (TestAddress addr) =
showString "TestAddress "
. showParen True (showsPrec d addr)

instance Hashable addr => Hashable (TestAddress addr)

-- | We support either sockets or named pipes.
--
-- There are three families of addresses: 'SocketFamily' usef for Berkeley
Expand Down
6 changes: 4 additions & 2 deletions ouroboros-network-framework/src/Simulation/Network/Snocket.hs
Original file line number Diff line number Diff line change
Expand Up @@ -759,8 +759,7 @@ mkSnocket state tr = Snocket { getLocalAddr
conn <- mkConnection tr bearerInfo connId
writeTVar fdVarLocal (FDConnecting connId conn)
modifyTVar (nsConnections state)
(Map.insert (normaliseId connId)
(dualConnection conn))
(Map.insert (normaliseId connId) conn)
-- so far it looks like normal open, it still might turn up
-- a simultaneous open if the other side will open the
-- connection before it would be put on its accept loop
Expand Down Expand Up @@ -859,6 +858,9 @@ mkSnocket state tr = Snocket { getLocalAddr
atomically $ modifyTVar (nsConnections state)
(Map.delete (normaliseId connId))
throwIO e

-- TODO: SimOpen and NormalOpen are irrelevant here
-- If 'o' is SimOpen then 'connState' is already 'ESTABLISHED'
Right (fd_', o) -> do
-- successful open

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
module Ouroboros.Network.Testing.Data.AbsBearerInfo
( AbsBearerInfoScript (..)
, canFail
, NonFailingAbsBearerInfo (..)
, NonFailingAbsBearerInfoScript (..)
, AbsDelay (..)
, delay
Expand Down Expand Up @@ -259,8 +260,9 @@ instance Arbitrary AbsBearerInfo where
| a <- shrink (abiSDUSize abi)
]

newtype AbsBearerInfoScript =
AbsBearerInfoScript { unBIScript :: Script AbsBearerInfo }
newtype AbsBearerInfoScript = AbsBearerInfoScript {
unBIScript :: Script AbsBearerInfo
}
deriving Show via (Script AbsBearerInfo)
deriving stock Eq

Expand Down Expand Up @@ -288,15 +290,16 @@ instance Arbitrary AbsBearerInfoScript where
, script' /= script
]

newtype NonFailingAbsBearerInfoScript =
NonFailingAbsBearerInfoScript { unNFBIScript :: Script AbsBearerInfo }
deriving Show via (Script AbsBearerInfo)
newtype NonFailingAbsBearerInfo = NonFailingAbsBearerInfo {
unNFBI :: AbsBearerInfo
}
deriving Show via AbsBearerInfo
deriving stock Eq

toNonFailingAbsBearerInfoScript :: AbsBearerInfoScript
-> NonFailingAbsBearerInfoScript
toNonFailingAbsBearerInfoScript (AbsBearerInfoScript script) =
NonFailingAbsBearerInfoScript $ fmap unfail script
toNonFailingAbsBearerInfo :: AbsBearerInfo
-> NonFailingAbsBearerInfo
toNonFailingAbsBearerInfo script =
NonFailingAbsBearerInfo $ unfail script
where
unfail :: AbsBearerInfo -> AbsBearerInfo
unfail bi =
Expand All @@ -311,6 +314,23 @@ toNonFailingAbsBearerInfoScript (AbsBearerInfoScript script) =
unfailAtt (SpeedAttenuation speed _ _) = NoAttenuation speed
unfailAtt a = a

instance Arbitrary NonFailingAbsBearerInfo where
arbitrary = toNonFailingAbsBearerInfo <$> arbitrary
shrink (NonFailingAbsBearerInfo script) =
NonFailingAbsBearerInfo <$> shrink script

newtype NonFailingAbsBearerInfoScript = NonFailingAbsBearerInfoScript {
unNFBIScript :: Script AbsBearerInfo
}
deriving Show via (Script AbsBearerInfo)
deriving stock Eq

toNonFailingAbsBearerInfoScript :: AbsBearerInfoScript
-> NonFailingAbsBearerInfoScript
toNonFailingAbsBearerInfoScript (AbsBearerInfoScript script) =
NonFailingAbsBearerInfoScript
$ fmap (unNFBI . toNonFailingAbsBearerInfo) script

instance Arbitrary NonFailingAbsBearerInfoScript where
arbitrary = toNonFailingAbsBearerInfoScript <$> arbitrary
shrink (NonFailingAbsBearerInfoScript script) =
Expand Down
15 changes: 7 additions & 8 deletions ouroboros-network/demo/chain-sync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

{-# OPTIONS_GHC -Wno-unticked-promoted-constructors #-}
Expand Down Expand Up @@ -293,7 +292,7 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
-- TODO: this currently needs MuxPeerRaw because of the resource
-- bracket
MuxPeerRaw $ \channel ->
bracketFetchClient registry maxBound connectionId $ \clientCtx ->
bracketFetchClient registry maxBound isPipeliningEnabled connectionId $ \clientCtx ->
runPipelinedPeer
nullTracer -- (contramap (show . TraceLabelPeer connectionId) stdoutTracer)
codecBlockFetch
Expand Down Expand Up @@ -503,15 +502,15 @@ chainSyncClient' syncTracer _currentChainVar candidateChainVar =
BlockHeader (Point BlockHeader) (Point BlockHeader) IO ()
handleNext =
ChainSync.ClientStNext {
recvMsgRollForward = \header _pHead ->
ChainSync.recvMsgRollForward = \header _pHead ->
ChainSync.ChainSyncClient $ do
addBlock header
--FIXME: the notTooFarAhead bit is not working
-- it seems the current chain is always of length 1.
-- notTooFarAhead
return requestNext

, recvMsgRollBackward = \pIntersect _pHead ->
, ChainSync.recvMsgRollBackward = \pIntersect _pHead ->
ChainSync.ChainSyncClient $ do
rollback pIntersect
return requestNext
Expand Down Expand Up @@ -554,10 +553,10 @@ chainSyncServer seed =
BlockHeader (Point BlockHeader) (Point BlockHeader) IO ()
idleState blocks =
ChainSync.ServerStIdle {
recvMsgRequestNext = do threadDelay 500000
return (Left (nextState blocks)),
recvMsgFindIntersect = \_ -> return (intersectState blocks),
recvMsgDoneClient = return ()
ChainSync.recvMsgRequestNext = do threadDelay 500000
return (Left (nextState blocks)),
ChainSync.recvMsgFindIntersect = \_ -> return (intersectState blocks),
ChainSync.recvMsgDoneClient = return ()
}

nextState :: [Block]
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/ouroboros-network.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ test-suite test
Test.Ouroboros.Network.PeerSelection.PeerGraph
Test.Ouroboros.Network.NodeToNode.Version
Test.Ouroboros.Network.NodeToClient.Version
Test.Ouroboros.Network.Orphans
Test.Ouroboros.Network.ShrinkCarefully
Test.Ouroboros.Network.Testnet
Test.Ouroboros.Network.Testnet.Simulation.Node
Expand Down
5 changes: 2 additions & 3 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import Ouroboros.Network.Block
import Network.TypedProtocol.Core
import Network.TypedProtocol.Pipelined
import Ouroboros.Network.Mux (ControlMessageSTM)
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..))
import Ouroboros.Network.Protocol.BlockFetch.Type

import Ouroboros.Network.AnchoredFragment (AnchoredFragment)
Expand Down Expand Up @@ -74,11 +73,11 @@ type BlockFetchClient header block m a =
-- | The implementation of the client side of block fetch protocol designed to
-- work in conjunction with our fetch logic.
--
blockFetchClient :: forall header block m.
blockFetchClient :: forall header block versionNumber m.
(MonadSTM m, MonadThrow m, MonadTime m,
HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block)
=> NodeToNodeVersion
=> versionNumber
-> ControlMessageSTM m
-> FetchedMetricsTracer m
-> FetchClientContext header block m
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.BlockFetch.ClientRegistry
( -- * Registry of block fetch clients
FetchClientRegistry
FetchClientRegistry (..)
, newFetchClientRegistry
, bracketFetchClient
, bracketKeepAliveClient
Expand All @@ -32,8 +30,6 @@ import Control.Tracer (Tracer)

import Ouroboros.Network.BlockFetch.ClientState
import Ouroboros.Network.DeltaQ
import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..),
isPipeliningEnabled)



Expand All @@ -47,13 +43,22 @@ import Ouroboros.Network.NodeToNode.Version (NodeToNodeVersion (..),
-- and shut down.
--
data FetchClientRegistry peer header block m =
FetchClientRegistry
(StrictTMVar m (Tracer m (TraceLabelPeer peer (TraceFetchClientState header)),
WhetherReceivingTentativeBlocks -> STM m (FetchClientPolicy header block m)))
(StrictTVar m (Map peer (FetchClientStateVars m header)))
(StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())))
(StrictTVar m (Map peer PeerGSV))
(StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())))
FetchClientRegistry {
fcrCtxVar
:: StrictTMVar
m ( Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
, WhetherReceivingTentativeBlocks
-> STM m (FetchClientPolicy header block m)
),
fcrFetchRegistry
:: StrictTVar m (Map peer (FetchClientStateVars m header)),
fcrSyncRegistry
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ())),
fcrDqRegistry
:: StrictTVar m (Map peer PeerGSV),
fcrKeepRegistry
:: StrictTVar m (Map peer (ThreadId m, StrictTMVar m ()))
}

newFetchClientRegistry :: MonadSTM m
=> m (FetchClientRegistry peer header block m)
Expand All @@ -69,16 +74,19 @@ newFetchClientRegistry = FetchClientRegistry <$> newEmptyTMVarIO
--
-- It also manages synchronisation with the corresponding chain sync client.
--
bracketFetchClient :: forall m a peer header block.
bracketFetchClient :: forall m a peer header block version.
(MonadThrow m, MonadSTM m, MonadFork m, MonadMask m,
Ord peer)
=> FetchClientRegistry peer header block m
-> NodeToNodeVersion
-> version
-> (version -> WhetherReceivingTentativeBlocks)
-- ^ is pipelining enabled function
-> peer
-> (FetchClientContext header block m -> m a)
-> m a
bracketFetchClient (FetchClientRegistry ctxVar
fetchRegistry syncRegistry dqRegistry keepRegistry) version peer action = do
fetchRegistry syncRegistry dqRegistry keepRegistry)
version isPipeliningEnabled peer action = do
ksVar <- newEmptyTMVarIO
bracket (register ksVar) (uncurry (unregister ksVar)) (action . fst)
where
Expand All @@ -100,9 +108,7 @@ bracketFetchClient (FetchClientRegistry ctxVar

-- allocate the policy specific for this peer's negotiated version
policy <- do
let pipeliningEnabled
| isPipeliningEnabled version = ReceivingTentativeBlocks
| otherwise = NotReceivingTentativeBlocks
let pipeliningEnabled = isPipeliningEnabled version
mkPolicy pipeliningEnabled

stateVars <- newFetchClientStateVars
Expand Down Expand Up @@ -236,7 +242,6 @@ bracketKeepAliveClient(FetchClientRegistry _ctxVar
assert (peer `Map.member` m) $
Map.delete peer m


setFetchClientContext :: MonadSTM m
=> FetchClientRegistry peer header block m
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
Expand Down
8 changes: 6 additions & 2 deletions ouroboros-network/src/Ouroboros/Network/NodeToNode/Version.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import Data.Typeable (Typeable)

import qualified Codec.CBOR.Term as CBOR

import Ouroboros.Network.BlockFetch.ClientState
(WhetherReceivingTentativeBlocks (..))
import Ouroboros.Network.CodecCBORTerm
import Ouroboros.Network.Magic
import Ouroboros.Network.Protocol.Handshake.Version (Accept (..),
Expand Down Expand Up @@ -145,5 +147,7 @@ data ConnectionMode = UnidirectionalMode | DuplexMode

-- | Check whether a version enabling diffusion pipelining has been
-- negotiated.
isPipeliningEnabled :: NodeToNodeVersion -> Bool
isPipeliningEnabled v = v >= NodeToNodeV_8
isPipeliningEnabled :: NodeToNodeVersion -> WhetherReceivingTentativeBlocks
isPipeliningEnabled v
| v >= NodeToNodeV_8 = ReceivingTentativeBlocks
| otherwise = NotReceivingTentativeBlocks

0 comments on commit b9adb9a

Please sign in to comment.