Skip to content

Commit

Permalink
Hashable peer for stable unique comparision in Blockfetch
Browse files Browse the repository at this point in the history
  • Loading branch information
karknu committed Aug 11, 2020
1 parent 2f81dbd commit af96110
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 14 deletions.
Expand Up @@ -935,6 +935,7 @@ runThreadNetwork systemTime ThreadNetworkArgs
, bfcDecisionLoopInterval = 0.0 -- Mock testsuite can use sub-second slot
-- interval which doesn't play nice with
-- blockfetch descision interval.
, bfcSalt = 0
}
}

Expand Down
10 changes: 6 additions & 4 deletions ouroboros-consensus/src/Ouroboros/Consensus/Node.hs
Expand Up @@ -40,7 +40,7 @@ import Control.Tracer (Tracer, contramap)
import Data.ByteString.Lazy (ByteString)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import System.Random (randomRIO)
import System.Random (randomIO, randomRIO)

import Ouroboros.Network.BlockFetch (BlockFetchConfiguration (..))
import Ouroboros.Network.Diffusion
Expand Down Expand Up @@ -426,6 +426,7 @@ mkNodeArgs
-> IO (NodeArgs IO RemoteConnectionId LocalConnectionId blk)
mkNodeArgs registry cfg mInitBlockForging tracers btime chainDB = do
mBlockForging <- sequence mInitBlockForging
bfsalt <- randomIO -- Per-node specific value used by blockfetch when ranking peers.
return NodeArgs
{ tracers
, registry
Expand All @@ -438,15 +439,16 @@ mkNodeArgs registry cfg mInitBlockForging tracers btime chainDB = do
, maxTxCapacityOverride = NoMaxTxCapacityOverride
, mempoolCapacityOverride = NoMempoolCapacityBytesOverride
, miniProtocolParameters = defaultMiniProtocolParameters
, blockFetchConfiguration = defaultBlockFetchConfiguration
, blockFetchConfiguration = defaultBlockFetchConfiguration bfsalt
}
where
defaultBlockFetchConfiguration :: BlockFetchConfiguration
defaultBlockFetchConfiguration = BlockFetchConfiguration
defaultBlockFetchConfiguration :: Int -> BlockFetchConfiguration
defaultBlockFetchConfiguration bfsalt = BlockFetchConfiguration
{ bfcMaxConcurrencyBulkSync = 1
, bfcMaxConcurrencyDeadline = 1
, bfcMaxRequestsInflight = blockFetchPipeliningMax defaultMiniProtocolParameters
, bfcDecisionLoopInterval = 0.01 -- 10ms
, bfcSalt = bfsalt
}

-- | We allow the user running the node to customise the 'NodeArgs' through
Expand Down
5 changes: 5 additions & 0 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeId.hs
Expand Up @@ -12,6 +12,7 @@ module Ouroboros.Consensus.NodeId (
) where

import Codec.Serialise (Serialise)
import Data.Hashable
import Data.Word
import GHC.Generics (Generic)
import Quiet
Expand All @@ -34,6 +35,8 @@ instance Condense NodeId where
condense (CoreId (CoreNodeId i)) = "c" ++ show i
condense (RelayId i ) = "r" ++ show i

instance Hashable NodeId

-- | Core node ID
newtype CoreNodeId = CoreNodeId {
unCoreNodeId :: Word64
Expand All @@ -42,5 +45,7 @@ newtype CoreNodeId = CoreNodeId {
deriving newtype (Condense, Serialise, NoUnexpectedThunks)
deriving Show via Quiet CoreNodeId

instance Hashable CoreNodeId

fromCoreNodeId :: CoreNodeId -> NodeId
fromCoreNodeId = CoreId
2 changes: 2 additions & 0 deletions ouroboros-consensus/src/Ouroboros/Consensus/NodeKernel.hs
Expand Up @@ -24,6 +24,7 @@ module Ouroboros.Consensus.NodeKernel (

import Control.Monad
import Control.Monad.Except
import Data.Hashable (Hashable)
import Data.Map.Strict (Map)
import Data.Maybe (isJust)
import Data.Proxy
Expand Down Expand Up @@ -133,6 +134,7 @@ initNodeKernel
, RunNode blk
, NoUnexpectedThunks remotePeer
, Ord remotePeer
, Hashable remotePeer
)
=> NodeArgs m remotePeer localPeer blk
-> m (NodeKernel m remotePeer localPeer blk)
Expand Down
Expand Up @@ -64,6 +64,7 @@ library
, containers >=0.5 && <0.7
, dns < 4.0
, iproute >=1.7 && < 1.8
, hashable
, mtl
, stm
, text
Expand Down
Expand Up @@ -6,6 +6,7 @@ module Ouroboros.Network.ConnectionId where

import Cardano.Prelude (UseIsNormalForm (..), NoUnexpectedThunks (..))

import Data.Hashable
import GHC.Generics (Generic)


Expand All @@ -20,3 +21,5 @@ data ConnectionId addr = ConnectionId {
}
deriving (Eq, Ord, Show, Generic)
deriving NoUnexpectedThunks via (UseIsNormalForm (ConnectionId addr))

instance Hashable a => Hashable (ConnectionId a)
3 changes: 3 additions & 0 deletions ouroboros-network-framework/src/Ouroboros/Network/Snocket.hs
Expand Up @@ -26,6 +26,7 @@ import Control.Exception
import Control.Monad (when)
import Control.Monad.Class.MonadTime (DiffTime)
import Control.Tracer (Tracer)
import Data.Hashable
#if !defined(mingw32_HOST_OS)
import Network.Socket ( Family (AF_UNIX) )
#endif
Expand Down Expand Up @@ -131,6 +132,8 @@ berkeleyAccept ioManager sock = go
newtype LocalAddress = LocalAddress { getFilePath :: FilePath }
deriving (Show, Eq, Ord)

instance Hashable LocalAddress where
hashWithSalt s (LocalAddress path) = hashWithSalt s path

-- | We support either sockets or named pipes.
--
Expand Down
11 changes: 11 additions & 0 deletions ouroboros-network-framework/src/Ouroboros/Network/Socket.hs
Expand Up @@ -12,6 +12,10 @@
-- it is useful to have 'HasInitiator' constraint on 'connectToNode' & friends.
{-# OPTIONS_GHC -Wno-redundant-constraints #-}

-- For Hashable SockAddr
{-# OPTIONS_GHC -Wno-orphans #-}


-- |
-- Module exports interface for running a node over a socket over TCP \/ IP.
--
Expand Down Expand Up @@ -74,10 +78,12 @@ import Control.Monad.Class.MonadThrow
import Control.Exception (throwIO)
import qualified Codec.CBOR.Read as CBOR
import qualified Codec.CBOR.Term as CBOR
import Data.Hashable
import Data.Typeable (Typeable)
import qualified Data.ByteString.Lazy as BL
import Data.Proxy (Proxy (..))
import Data.Void
import Data.Word (Word16)

import qualified Network.Socket as Socket

Expand Down Expand Up @@ -141,6 +147,11 @@ sockAddrFamily (Socket.SockAddrInet _ _ ) = Socket.AF_INET
sockAddrFamily (Socket.SockAddrInet6 _ _ _ _) = Socket.AF_INET6
sockAddrFamily (Socket.SockAddrUnix _ ) = Socket.AF_UNIX

instance Hashable Socket.SockAddr where
hashWithSalt s (Socket.SockAddrInet p a ) = hashWithSalt s (fromIntegral p :: Word16, a)
hashWithSalt s (Socket.SockAddrInet6 p _ a _ ) = hashWithSalt s (fromIntegral p :: Word16, a)
hashWithSalt s (Socket.SockAddrUnix p ) = hashWithSalt s p

-- | We place an upper limit of `30s` on the time we wait on receiving an SDU.
-- There is no upper bound on the time we wait when waiting for a new SDU.
-- This makes it possible for miniprotocols to use timeouts that are larger
Expand Down
3 changes: 2 additions & 1 deletion ouroboros-network/demo/chain-sync.hs
Expand Up @@ -384,7 +384,8 @@ clientBlockFetch sockAddrs = withIOManager $ \iocp -> do
bfcMaxConcurrencyBulkSync = 1,
bfcMaxConcurrencyDeadline = 2,
bfcMaxRequestsInflight = 10,
bfcDecisionLoopInterval = 0.01
bfcDecisionLoopInterval = 0.01,
bfcSalt = 0
})
>> return ()

Expand Down
8 changes: 7 additions & 1 deletion ouroboros-network/src/Ouroboros/Network/BlockFetch.hs
Expand Up @@ -99,6 +99,7 @@ module Ouroboros.Network.BlockFetch (
SizeInBytes,
) where

import Data.Hashable (Hashable)
import Data.Map.Strict (Map)
import Data.Void

Expand Down Expand Up @@ -218,7 +219,10 @@ data BlockFetchConfiguration =
bfcMaxRequestsInflight :: !Word,

-- | Desired intervall between calls to fetchLogicIteration
bfcDecisionLoopInterval :: !DiffTime
bfcDecisionLoopInterval :: !DiffTime,

-- | Salt used when comparing peers
bfcSalt :: !Int
}

-- | Execute the block fetch logic. It monitors the current chain and candidate
Expand All @@ -236,6 +240,7 @@ blockFetchLogic :: forall peer header block m.
, MonadMonotonicTime m
, MonadSTM m
, Ord peer
, Hashable peer
)
=> Tracer m [TraceLabelPeer peer (FetchDecision [Point header])]
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
Expand Down Expand Up @@ -270,6 +275,7 @@ blockFetchLogic decisionTracer clientStateTracer
maxConcurrencyBulkSync = bfcMaxConcurrencyBulkSync,
maxConcurrencyDeadline = bfcMaxConcurrencyDeadline,
decisionLoopInterval = bfcDecisionLoopInterval,
peerSalt = bfcSalt,

plausibleCandidateChain,
compareCandidateChains,
Expand Down
Expand Up @@ -26,6 +26,7 @@ import qualified Data.Set as Set

import Data.List (foldl', groupBy, sortBy, transpose)
import Data.Function (on)
import Data.Hashable
import Data.Maybe (mapMaybe)
import Data.Set (Set)

Expand Down Expand Up @@ -58,6 +59,7 @@ data FetchDecisionPolicy header = FetchDecisionPolicy {
maxConcurrencyBulkSync :: Word,
maxConcurrencyDeadline :: Word,
decisionLoopInterval :: DiffTime,
peerSalt :: Int,

plausibleCandidateChain :: AnchoredFragment header
-> AnchoredFragment header -> Bool,
Expand Down Expand Up @@ -146,7 +148,8 @@ type CandidateFragments header = (ChainSuffix header, [AnchoredFragment header])


fetchDecisions
:: (Eq peer,
:: (Ord peer,
Hashable peer,
HasHeader header,
HeaderHash header ~ HeaderHash block)
=> FetchDecisionPolicy header
Expand Down Expand Up @@ -779,6 +782,7 @@ obviously take that into account when considering later peer chains.
fetchRequestDecisions
:: forall extra header peer.
( Eq peer
, Hashable peer
, HasHeader header
)
=> FetchDecisionPolicy header
Expand Down Expand Up @@ -880,7 +884,7 @@ fetchRequestDecisions fetchDecisionPolicy fetchMode chains =
nPreferedPeers =
map snd
. take (fromIntegral maxConcurrentFetchPeers)
. sortBy (\(a, _) (b, _) -> comparePeerGSV a b)
. sortBy (\a b -> comparePeerGSV (peerSalt fetchDecisionPolicy) a b)
. map (\(_, _, _, gsv, p, _) -> (gsv, p))
$ chains

Expand Down
15 changes: 12 additions & 3 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/DeltaQ.hs
Expand Up @@ -18,6 +18,7 @@ module Ouroboros.Network.BlockFetch.DeltaQ (
) where

import Data.Fixed as Fixed (Pico)
import Data.Hashable
import Control.Monad.Class.MonadTime

import Ouroboros.Network.DeltaQ
Expand All @@ -29,9 +30,17 @@ data PeerFetchInFlightLimits = PeerFetchInFlightLimits {
}
deriving Show

-- Order two PeerGSVs based on `g`.
comparePeerGSV :: PeerGSV -> PeerGSV -> Ordering
comparePeerGSV a b = compare (gs a) (gs b)
-- | Order two PeerGSVs based on `g`.
-- Incase the g values are within +/- 5% of each other `peer` is used as a tie breaker.
-- The salt is unique per running node, which avoids all nodes prefering the same peer in case of
-- a tie.
comparePeerGSV :: Hashable peer => Int -> (PeerGSV, peer) -> (PeerGSV, peer) -> Ordering
comparePeerGSV salt (a, a_p) (b, b_p) =
let gs_a = gs a
gs_b = gs b in
if abs (gs_a - gs_b) < 0.05*gs_a
then compare (hashWithSalt salt a_p) (hashWithSalt salt b_p)
else compare gs_a gs_b
where
gs :: PeerGSV -> DiffTime
gs PeerGSV { outboundGSV = GSV g_out _s_out _v_out,
Expand Down
7 changes: 5 additions & 2 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/State.hs
Expand Up @@ -17,6 +17,7 @@ module Ouroboros.Network.BlockFetch.State (
) where

import Data.Functor.Contravariant (contramap)
import Data.Hashable (Hashable)
import qualified Data.Map.Strict as Map
import Data.Map.Strict (Map)
import qualified Data.Set as Set
Expand Down Expand Up @@ -62,6 +63,7 @@ fetchLogicIterations
, MonadMonotonicTime m
, MonadSTM m
, Ord peer
, Hashable peer
)
=> Tracer m [TraceLabelPeer peer (FetchDecision [Point header])]
-> Tracer m (TraceLabelPeer peer (TraceFetchClientState header))
Expand Down Expand Up @@ -107,7 +109,7 @@ iterateForever x0 m = go x0 where go x = m x >>= go
-- * deciding for each peer if we will initiate a new fetch request
--
fetchLogicIteration
:: (MonadSTM m, Ord peer,
:: (Hashable peer, MonadSTM m, Ord peer,
HasHeader header, HasHeader block,
HeaderHash header ~ HeaderHash block)
=> Tracer m [TraceLabelPeer peer (FetchDecision [Point header])]
Expand Down Expand Up @@ -174,7 +176,8 @@ fetchLogicIteration decisionTracer clientStateTracer
fetchDecisionsForStateSnapshot
:: (HasHeader header,
HeaderHash header ~ HeaderHash block,
Ord peer)
Ord peer,
Hashable peer)
=> FetchDecisionPolicy header
-> FetchStateSnapshot peer header block m
-> [( FetchDecision (FetchRequest header),
Expand Down
Expand Up @@ -132,7 +132,8 @@ blockFetchExample1 decisionTracer clientStateTracer clientMsgTracer
bfcMaxConcurrencyBulkSync = 1,
bfcMaxConcurrencyDeadline = 2,
bfcMaxRequestsInflight = 10,
bfcDecisionLoopInterval = 0.01
bfcDecisionLoopInterval = 0.01,
bfcSalt = 0
})
>> return ()

Expand Down

0 comments on commit af96110

Please sign in to comment.