Skip to content

Commit

Permalink
Avoid ordering peers based on peerid in blockfetch
Browse files Browse the repository at this point in the history
Peers where ordered based on SockAddr, which generally meant that all
peers would pick the same second choice peer to download blocks from in
Deadline mode. And the same third pick and so on.

This changes ranking so that peers are sorted based on G.
  • Loading branch information
karknu committed Jan 17, 2022
1 parent d92cfe3 commit 749c6dc
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
39 changes: 24 additions & 15 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/Decision.hs
Expand Up @@ -47,6 +47,7 @@ import Ouroboros.Network.BlockFetch.ClientState (FetchRequest (..),
import Ouroboros.Network.BlockFetch.DeltaQ
(PeerFetchInFlightLimits (..), PeerGSV (..), SizeInBytes,
calculatePeerFetchInFlightLimits, comparePeerGSV,
comparePeerGSV',
estimateExpectedResponseDuration,
estimateResponseDeadlineProbability)

Expand Down Expand Up @@ -162,7 +163,8 @@ fetchDecisions
fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
plausibleCandidateChain,
compareCandidateChains,
blockFetchSize
blockFetchSize,
peerSalt
}
fetchMode
currentChain
Expand All @@ -183,6 +185,7 @@ fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
-- Reorder chains based on consensus policy and network timing data.
. prioritisePeerChains
fetchMode
peerSalt
compareCandidateChains
blockFetchSize
. map swizzleIG
Expand All @@ -207,7 +210,7 @@ fetchDecisions fetchDecisionPolicy@FetchDecisionPolicy {
where
-- Data swizzling functions to get the right info into each stage.
swizzleI (c, p@(_, inflight,_,_, _)) = (c, inflight, p)
swizzleIG (c, p@(_, inflight,gsvs,_, _)) = (c, inflight, gsvs, p)
swizzleIG (c, p@(_, inflight,gsvs,peer,_)) = (c, inflight, gsvs, peer, p)
swizzleSI (c, p@(status,inflight,_,_, _)) = (c, status, inflight, p)
swizzleSIG (c, p@(status,inflight,gsvs,peer,_)) = (c, status, inflight, gsvs, peer, p)

Expand Down Expand Up @@ -605,19 +608,21 @@ filterWithMaxSlotNo p maxSlotNo =
AF.filterWithStop p ((> maxSlotNo) . MaxSlotNo . blockSlot)

prioritisePeerChains
:: forall header peer. HasHeader header
:: forall extra header peer.
( HasHeader header
, Hashable peer
, Ord peer
)
=> FetchMode
-> Int
-> (AnchoredFragment header -> AnchoredFragment header -> Ordering)
-> (header -> SizeInBytes)
-> [(FetchDecision (CandidateFragments header), PeerFetchInFlight header,
PeerGSV,
peer)]
-> [(FetchDecision [AnchoredFragment header], peer)]
prioritisePeerChains FetchModeDeadline compareCandidateChains blockFetchSize =
--TODO: last tie-breaker is still original order (which is probably
-- peerid order). We should use a random tie breaker so that adversaries
-- cannot get any advantage.

peer,
extra )]
-> [(FetchDecision [AnchoredFragment header], extra)]
prioritisePeerChains FetchModeDeadline salt compareCandidateChains blockFetchSize =
map (\(decision, peer) ->
(fmap (\(_,_,fragment) -> fragment) decision, peer))
. concatMap ( concat
Expand Down Expand Up @@ -647,9 +652,11 @@ prioritisePeerChains FetchModeDeadline compareCandidateChains blockFetchSize =
`on`
(\(band, chain, _fragments) -> (band, chain))))))
. map annotateProbabilityBand
. sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) ->
comparePeerGSV' salt (a,ap) (b,bp))
where
annotateProbabilityBand (Left decline, _, _, peer) = (Left decline, peer)
annotateProbabilityBand (Right (chain,fragments), inflight, gsvs, peer) =
annotateProbabilityBand (Left decline, _, _, _, peer) = (Left decline, peer)
annotateProbabilityBand (Right (chain,fragments), inflight, gsvs, _, peer) =
(Right (band, chain, fragments), peer)
where
band = probabilityBand $
Expand All @@ -667,7 +674,7 @@ prioritisePeerChains FetchModeDeadline compareCandidateChains blockFetchSize =

chainHeadPoint (_,ChainSuffix c,_) = AF.headPoint c

prioritisePeerChains FetchModeBulkSync compareCandidateChains blockFetchSize =
prioritisePeerChains FetchModeBulkSync salt compareCandidateChains blockFetchSize =
map (\(decision, peer) ->
(fmap (\(_, _, fragment) -> fragment) decision, peer))
. sortBy (comparingFst
Expand All @@ -679,9 +686,11 @@ prioritisePeerChains FetchModeBulkSync compareCandidateChains blockFetchSize =
`on`
(\(duration, chain, _fragments) -> (chain, duration)))))
. map annotateDuration
. sortBy (\(_,_,a,ap,_) (_,_,b,bp,_) ->
comparePeerGSV' salt (a,ap) (b,bp))
where
annotateDuration (Left decline, _, _, peer) = (Left decline, peer)
annotateDuration (Right (chain,fragments), inflight, gsvs, peer) =
annotateDuration (Left decline, _, _, _, peer) = (Left decline, peer)
annotateDuration (Right (chain,fragments), inflight, gsvs, _, peer) =
(Right (duration, chain, fragments), peer)
where
-- TODO: consider if we should put this into bands rather than just
Expand Down
13 changes: 13 additions & 0 deletions ouroboros-network/src/Ouroboros/Network/BlockFetch/DeltaQ.hs
Expand Up @@ -15,6 +15,7 @@ module Ouroboros.Network.BlockFetch.DeltaQ (
estimateResponseDeadlineProbability,
estimateExpectedResponseDuration,
comparePeerGSV,
comparePeerGSV',
-- estimateBlockFetchResponse,
-- blockArrivalShedule,
) where
Expand Down Expand Up @@ -69,6 +70,18 @@ comparePeerGSV activePeers salt (a, a_p) (b, b_p) =
inboundGSV = GSV g_in _s_in _v_in
} = g_out + g_in

-- | Order two PeerGSVs based on `g`.
-- Like comparePeerGSV but doesn't take active status into account
comparePeerGSV' :: forall peer.
( Hashable peer
, Ord peer
)
=> Int
-> (PeerGSV, peer)
-> (PeerGSV, peer)
-> Ordering
comparePeerGSV' = comparePeerGSV Set.empty


calculatePeerFetchInFlightLimits :: PeerGSV -> PeerFetchInFlightLimits
calculatePeerFetchInFlightLimits PeerGSV {
Expand Down

0 comments on commit 749c6dc

Please sign in to comment.