Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Jun 23, 2023
1 parent b872a85 commit 555a36a
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 208 deletions.
27 changes: 21 additions & 6 deletions dashboards/swarm/swarm.json
Original file line number Diff line number Diff line change
Expand Up @@ -3071,13 +3071,18 @@
{
"options": {
"0": {
"color": "green",
"color": "blue",
"index": 0,
"text": "Allowed"
"text": "Probing"
},
"1": {
"color": "purple",
"color": "green",
"index": 1,
"text": "Allowed"
},
"2": {
"color": "purple",
"index": 2,
"text": "Blocked"
}
},
Expand Down Expand Up @@ -3145,7 +3150,17 @@
"fixedColor": "purple",
"mode": "fixed"
},
"mappings": [],
"mappings": [
{
"options": {
"0": {
"index": 0,
"text": "-"
}
},
"type": "value"
}
],
"thresholds": {
"mode": "absolute",
"steps": [
Expand All @@ -3169,7 +3184,7 @@
"colorMode": "value",
"graphMode": "none",
"justifyMode": "auto",
"orientation": "auto",
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"lastNotNull"
Expand Down Expand Up @@ -3302,6 +3317,6 @@
"timezone": "",
"title": "libp2p Swarm",
"uid": "a15PyhO4z",
"version": 6,
"version": 7,
"weekStart": ""
}
163 changes: 102 additions & 61 deletions p2p/net/swarm/black_hole_detector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package swarm

import (
"fmt"
"sync"

ma "github.com/multiformats/go-multiaddr"
Expand All @@ -10,10 +11,24 @@ import (
type blackHoleState int

const (
blackHoleStateAllowed blackHoleState = iota
blackHoleStateProbing blackHoleState = iota
blackHoleStateAllowed
blackHoleStateBlocked
)

func (st blackHoleState) String() string {
switch st {
case blackHoleStateProbing:
return "Probing"
case blackHoleStateAllowed:
return "Allowed"
case blackHoleStateBlocked:
return "Blocked"
default:
return fmt.Sprintf("Unknown %d", st)
}
}

type blackHoleResult int

const (
Expand All @@ -22,32 +37,34 @@ const (
blackHoleResultBlocked
)

// blackHoleFilter provides black hole filtering logic for dials. On detecting a black holed
// network environment, subsequent dials are blocked and only 1 dial every n requests is allowed.
// This should be used in conjunction with an UDP or IPv6 address filter to detect UDP or
// IPv6 black hole.
// Requests are blocked if the success fraction in the last n outcomes is less than
// minSuccessFraction. If a request succeeds in Blocked state, the filter state is reset and n
// subsequent requests are allowed before reevaluating black hole status. Evaluating over n
// outcomes avoids situations where a dial was cancelled because a competing dial succeeded,
// the address was unreachable, and other false negatives.
// blackHoleFilter provides black hole filtering for dials. This filter should be used in
// concert with a UDP of IPv6 address filter to detect UDP or IPv6 black hole. In a black
// holed environments dial requests are blocked and only periodic probes to check the
// state of the black hole are allowed.
//
// Requests are blocked if the number of successes in the last n dials is less than
// minSuccesses. If a request succeeds in Blocked state, the filter state is reset and n
// subsequent requests are allowed before reevaluating black hole state. Dials cancelled
// when some other concurrent dial succeeded are counted as failures. A sufficiently large
// n prevents false negatives in such cases.
type blackHoleFilter struct {
// n is the minimum number of completed dials required before we start blocking.
// Every nth request is allowed irrespective of the status of the detector.
// n serves the dual purpose of being the minimum number of requests after which we
// probe the state of the black hole in blocked state and the minimum number of
// completed dials required before evaluating black hole state.
n int
// minSuccessFraction is the minimum success fraction required to allow dials.
minSuccessFraction float64
// minSuccesses is the minimum number of Success required in the last n dials
// to consider we are not blocked.
minSuccesses int
// name for the detector.
name string

// requests counts number of dial requests up to n. Resets to 0 every nth request.
// requests counts number of dial requests to peers. We handle request at a peer
// level and record results at individual address dial level.
requests int
// dialResults of the last `n` allowed dials. success is true.
// dialResults of the last `n` dials. A successful dial is true.
dialResults []bool
// successes is the count of successful dials in outcomes
successes int
// failures is the count of failed dials in outcomes
failures int
// state is the current state of the detector
state blackHoleState

Expand All @@ -72,16 +89,12 @@ func (b *blackHoleFilter) RecordResult(success bool) {

if success {
b.successes++
} else {
b.failures++
}
b.dialResults = append(b.dialResults, success)

if len(b.dialResults) > b.n {
if b.dialResults[0] {
b.successes--
} else {
b.failures--
}
b.dialResults = b.dialResults[1 : b.n+1]
}
Expand All @@ -90,7 +103,7 @@ func (b *blackHoleFilter) RecordResult(success bool) {
b.trackMetrics()
}

// HandleRequest handles a new dial request for the filter. It returns a
// HandleRequest returns the result of applying the black hole filter for the request.
func (b *blackHoleFilter) HandleRequest() blackHoleResult {
b.mu.Lock()
defer b.mu.Unlock()
Expand All @@ -101,7 +114,7 @@ func (b *blackHoleFilter) HandleRequest() blackHoleResult {

if b.state == blackHoleStateAllowed {
return blackHoleResultAllowed
} else if b.requests%b.n == 0 {
} else if b.state == blackHoleStateProbing || b.requests%b.n == 0 {
return blackHoleResultProbing
} else {
return blackHoleResultBlocked
Expand All @@ -110,47 +123,42 @@ func (b *blackHoleFilter) HandleRequest() blackHoleResult {

func (b *blackHoleFilter) reset() {
b.successes = 0
b.failures = 0
b.dialResults = b.dialResults[:0]
b.requests = 0
b.updateState()
}

func (b *blackHoleFilter) updateState() {
st := b.state
successFraction := 0.0

if len(b.dialResults) < b.n {
b.state = blackHoleStateProbing
} else if b.successes >= b.minSuccesses {
b.state = blackHoleStateAllowed
} else {
successFraction = float64(b.successes) / float64(b.successes+b.failures)
if successFraction >= b.minSuccessFraction {
b.state = blackHoleStateAllowed
} else {
b.state = blackHoleStateBlocked
}
b.state = blackHoleStateBlocked
}

if st != b.state {
if b.state == blackHoleStateAllowed {
log.Debugf("%s blackHoleDetector state changed to Allowed", b.name)
} else {
log.Debugf("%s blackHoleDetector state changed to Blocked. Success fraction is %0.3f", b.name, successFraction)
}
log.Debugf("%s blackHoleDetector state changed from %s to %s", b.name, st, b.state)
}
}

func (b *blackHoleFilter) trackMetrics() {
if b.metricsTracer == nil {
return
}
successFraction := 0.0
if b.successes+b.failures != 0 {
successFraction = float64(b.successes) / float64(b.successes+b.failures)
}

nextRequestAllowedAfter := 0
if b.state == blackHoleStateBlocked {
nextRequestAllowedAfter = b.n - (b.requests % b.n)
}

successFraction := 0.0
if len(b.dialResults) > 0 {
successFraction = float64(b.successes) / float64(len(b.dialResults))
}

b.metricsTracer.UpdatedBlackHoleFilterState(
b.name,
b.state,
Expand All @@ -160,34 +168,67 @@ func (b *blackHoleFilter) trackMetrics() {
}

// blackHoleDetector provides UDP and IPv6 black hole detection using a `blackHoleFilter`
// for each. For details of the black hole detection logic see `blackHoleFilter`
// for each. For details of the black hole detection logic see `blackHoleFilter`.
//
// black hole filtering is done at a peer dial level to ensure that periodic probes to
// detect change of the black hole state are actually dialed and are not skipped
// because of dial prioritisation logic.
type blackHoleDetector struct {
udp, ipv6 *blackHoleFilter
}

func (d *blackHoleDetector) HandleRequest(addr ma.Multiaddr) bool {
if !manet.IsPublicAddr(addr) {
return true
// FilterAddrs filters the peer's addresses removing black holed addresses
func (d *blackHoleDetector) FilterAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
hasUDP, hasIPv6 := false, false
for _, a := range addrs {
if !manet.IsPublicAddr(a) {
continue
}
if isProtocolAddr(a, ma.P_UDP) {
hasUDP = true
}
if isProtocolAddr(a, ma.P_IP6) {
hasIPv6 = true
}
}

udpres := blackHoleResultAllowed
if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) {
udpres = d.udp.HandleRequest()
udpRes := blackHoleResultAllowed
if d.udp != nil && hasUDP {
udpRes = d.udp.HandleRequest()
}

ipv6res := blackHoleResultAllowed
if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) {
ipv6res = d.ipv6.HandleRequest()
ipv6Res := blackHoleResultAllowed
if d.ipv6 != nil && hasIPv6 {
ipv6Res = d.ipv6.HandleRequest()
}

// Allow all probes irrespective of the state of the other filter
if udpres == blackHoleResultProbing || ipv6res == blackHoleResultProbing {
return true
}
return udpres != blackHoleResultBlocked && ipv6res != blackHoleResultBlocked
return ma.FilterAddrs(
addrs,
func(a ma.Multiaddr) bool {
if !manet.IsPublicAddr(a) {
return true
}
// allow all UDP addresses while probing irrespective of IPv6 black hole state
if udpRes == blackHoleResultProbing && isProtocolAddr(a, ma.P_UDP) {
return true
}
// allow all IPv6 addresses while probing irrespective of UDP black hole state
if ipv6Res == blackHoleResultProbing && isProtocolAddr(a, ma.P_IP6) {
return true
}

if udpRes == blackHoleResultBlocked && isProtocolAddr(a, ma.P_UDP) {
return false
}
if ipv6Res == blackHoleResultBlocked && isProtocolAddr(a, ma.P_IP6) {
return false
}
return true
},
)
}

// RecordResult updates the state of the relevant `blackHoleFilter` for addr
// RecordResult updates the state of the relevant `blackHoleFilter`s for addr
func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) {
if !manet.IsPublicAddr(addr) {
return
Expand All @@ -204,13 +245,13 @@ func newBlackHoleDetector(detectUDP, detectIPv6 bool, mt MetricsTracer) *blackHo
d := &blackHoleDetector{}

// A black hole is a binary property. On a network if UDP dials are blocked or there is
// no IPv6 connectivity, all dials will fail. So a low min success fraction like 0.01 is
// good enough.
// no IPv6 connectivity, all dials will fail. So a low success rate of 3 out 100 dials
// is good enough.
if detectUDP {
d.udp = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "UDP", metricsTracer: mt}
d.udp = &blackHoleFilter{n: 100, minSuccesses: 3, name: "UDP", metricsTracer: mt}
}
if detectIPv6 {
d.ipv6 = &blackHoleFilter{n: 100, minSuccessFraction: 0.01, name: "IPv6", metricsTracer: mt}
d.ipv6 = &blackHoleFilter{n: 100, minSuccesses: 3, name: "IPv6", metricsTracer: mt}
}
return d
}

0 comments on commit 555a36a

Please sign in to comment.