Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backoff system implementation #1

Merged
merged 13 commits into from
Oct 12, 2021
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/libp2p/go-libp2p-backoff

replace github.com/libp2p/go-libp2p-core => /Users/petar/src/github.com/libp2p/go-libp2p-core

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove


require (
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-discovery v0.2.0
github.com/multiformats/go-multiaddr v0.2.1
)

go 1.13
233 changes: 233 additions & 0 deletions go.sum

Large diffs are not rendered by default.

54 changes: 54 additions & 0 deletions policy/exp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package policy

import (
"math/rand"
"time"

discovery "github.com/libp2p/go-libp2p-discovery"
)

type ExpBackoffPolicy struct {
Min time.Duration
Max time.Duration
Jitter discovery.Jitter
TimeUnits time.Duration
Base float64
Offset time.Duration
RNG *rand.Rand
}

func (p *ExpBackoffPolicy) NewBackoffState() BackoffState {
return &expBackoffState{
underlying: discovery.NewExponentialBackoff(
p.Min,
p.Max,
p.Jitter,
p.TimeUnits,
p.Base,
p.Offset,
p.RNG,
)(),
}
}

type expBackoffState struct {
offUntil time.Time
underlying discovery.BackoffStrategy
}

func (s *expBackoffState) Clear(now time.Time) {
s.offUntil = now
s.underlying.Reset()
}

func (s *expBackoffState) Backoff(now time.Time) {
// Unlike github.com/libp2p/go-libp2p-discovery, we do not
// count backoffs that occur on top of prior backoffs.
if now.After(s.offUntil) {
s.offUntil = now.Add(s.underlying.Delay())
}
}

func (s *expBackoffState) TimeToClear(now time.Time) time.Duration {
return time.Duration(0)
}
51 changes: 51 additions & 0 deletions policy/policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package policy

import "time"

// BackoffPolicy is a factory for backoff strategies of a given type.
// A strategy is the state and arithmetic logic of a timer.
type BackoffPolicy interface {
NewBackoffState() BackoffState
}

// BackoffState implements the runtime state for a specific backoff policy.
//
// Implementations of BackoffState are purely concerned with the "arithmetic"
// of computing when the respective timer should be cleared.
//
// This interface allows for the implementation of flexible backoff policies.
// For instance, a policy could treat a burst of backoffs as a single one.
//
// BackoffState is an analog of github.com/libp2p/go-libp2p-discovery.BackoffStrategy.
// The latter, however, is not able to describe logic that adapts to bursts of backoffs.
type BackoffState interface {

// Clear informs the policy of the current time and sets its state to cleared.
Clear(now time.Time)

// Backoff informs the policy of the current time and sets its state to backing off.
Backoff(now time.Time)

// TimeToClear informs the policy of the current time and returns the duration
// remaining until the back off state is cleared. Zero or negative durations indicate
// that the state is already cleared.
TimeToClear(now time.Time) time.Duration
}

type NoBackoffPolicy struct{}

func (NoBackoffPolicy) NewBackoffState() BackoffState {
return noBackoffState{}
}

type noBackoffState struct{}

func (noBackoffState) Clear(now time.Time) {
}

func (noBackoffState) Backoff(now time.Time) {
}

func (noBackoffState) TimeToClear(now time.Time) time.Duration {
return time.Duration(0)
}
68 changes: 68 additions & 0 deletions shared/shared.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package shared

import (
"sync"

"github.com/libp2p/go-libp2p-backoff/policy"
"github.com/libp2p/go-libp2p-backoff/tree"
"github.com/libp2p/go-libp2p-core/backoff"
ma "github.com/multiformats/go-multiaddr"
)

// CHECKOUT: https://github.com/libp2p/go-libp2p-discovery

var DefaultIPBackoffPolicy = policy.NoBackoffPolicy{} // TODO: use a real policy
var DefaultTransportBackoffPolicy = policy.NoBackoffPolicy{} // TODO: use a real policy
var DefaultSwarmBackoffPolicy = policy.NoBackoffPolicy{} // TODO: use a real policy
var DefaultProtocolBackoffPolicy = policy.NoBackoffPolicy{} // TODO: use a real policy

func NewSharedBackoffs() backoff.SharedBackoffs {
b := &sharedBackoffs{
root: tree.NewBackoffTreeTimer(nil, policy.NoBackoffPolicy{}),
}
b.root.StartGC()
return b
}

type sharedBackoffs struct {
rlk sync.Mutex
root *tree.BackoffTreeTimer
}

func (sh *sharedBackoffs) IP(addr ma.Multiaddr) backoff.BackoffTimer {
ipComp, _ := ma.SplitFirst(addr)
return sh.root.
Subtimer("dial", policy.NoBackoffPolicy{}). // this creates a namespace for dialing-related timers
Subtimer(ipComp.String(), DefaultIPBackoffPolicy)
}

func (sh *sharedBackoffs) Transport(addr ma.Multiaddr) backoff.BackoffTimer {
ipComp, rest := ma.SplitFirst(addr)
transportComp, _ := ma.SplitFirst(rest)
return sh.root.
Subtimer("dial", policy.NoBackoffPolicy{}). // this creates a namespace for dialing-related timers
Subtimer(ipComp.String(), DefaultIPBackoffPolicy).
Subtimer(transportComp.String(), DefaultTransportBackoffPolicy)
}

func (sh *sharedBackoffs) Swarm(addr ma.Multiaddr) backoff.BackoffTimer {
ipComp, rest := ma.SplitFirst(addr)
transportComp, _ := ma.SplitFirst(rest)
return sh.root.
Subtimer("dial", policy.NoBackoffPolicy{}). // this creates a namespace for dialing-related timers
Subtimer(ipComp.String(), DefaultIPBackoffPolicy).
Subtimer(transportComp.String(), DefaultTransportBackoffPolicy).
Subtimer("swarm", DefaultSwarmBackoffPolicy)
}

func (sh *sharedBackoffs) Protocol(addr ma.Multiaddr) backoff.BackoffTimer {
ipComp, rest := ma.SplitFirst(addr)
transportComp, rest2 := ma.SplitFirst(rest)
protocolComp, _ := ma.SplitFirst(rest2)
return sh.root.
Subtimer("dial", policy.NoBackoffPolicy{}). // this creates a namespace for dialing-related timers
Subtimer(ipComp.String(), DefaultIPBackoffPolicy).
Subtimer(transportComp.String(), DefaultTransportBackoffPolicy).
Subtimer("swarm", DefaultSwarmBackoffPolicy).
Subtimer(protocolComp.String(), DefaultProtocolBackoffPolicy)
}
105 changes: 105 additions & 0 deletions tree/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package tree

import (
"sync"
"time"

"github.com/libp2p/go-libp2p-backoff/policy"
"github.com/libp2p/go-libp2p-core/backoff"
)

func NewBackoffTreeTimer(parent backoff.BackoffTimer, policy policy.BackoffPolicy) *BackoffTreeTimer {
return &BackoffTreeTimer{
parent: parent,
state: policy.NewBackoffState(),
children: map[string]*BackoffTreeTimer{},
}
}

type BackoffTreeTimer struct {
parent backoff.BackoffTimer
sync.Mutex
clk sync.Mutex // lock for children
children map[string]*BackoffTreeTimer
slk sync.Mutex // lock for state
state policy.BackoffState
}

func (t *BackoffTreeTimer) Subtimer(childName string, policy policy.BackoffPolicy) *BackoffTreeTimer {
t.clk.Lock()
defer t.clk.Unlock()
if child := t.children[childName]; child != nil {
return child
} else {
d := NewBackoffTreeTimer(t, policy)
t.children[childName] = d
return d
}
}

var BackoffGCInterval = time.Minute

func (t *BackoffTreeTimer) StartGC() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a task like this should probably run within a context so that it can be completed on shutdown.

go func() {
for {
time.Sleep(BackoffGCInterval)
t.GC()
}
}()
}

// GC runs garbage collection on this timer's descendants.
// GC returns the number of subtrees of this node after garbage collection.
func (t *BackoffTreeTimer) GC() int {
t.clk.Lock()
defer t.clk.Unlock()
for name, child := range t.children {
if child.GC() == 0 && child.TimeToClear() <= 0 {
delete(t.children, name)
}
}
return len(t.children)
}

func (t *BackoffTreeTimer) NumChildren() int {
t.clk.Lock()
defer t.clk.Unlock()
return len(t.children)
}

// Wait implements BackoffTimer interface.
func (t *BackoffTreeTimer) Wait() {
time.Sleep(t.TimeToClear())
}

// TimeToClear implements BackoffTimer interface.
func (t *BackoffTreeTimer) TimeToClear() time.Duration {
pttc := time.Duration(0)
if p := t.parent; p != nil {
pttc = p.TimeToClear()
}
t.slk.Lock()
defer t.slk.Unlock()
return maxDur(pttc, t.state.TimeToClear(time.Now()))
}

func maxDur(dx, dy time.Duration) time.Duration {
if dx > dy {
return dx
}
return dy
}

// Clear implements BackoffTimer interface.
func (t *BackoffTreeTimer) Clear() {
t.slk.Lock()
defer t.slk.Unlock()
t.state.Clear(time.Now())
}

// Backoff implements BackoffTimer interface.
func (t *BackoffTreeTimer) Backoff() {
t.slk.Lock()
defer t.slk.Unlock()
t.state.Backoff(time.Now())
}
65 changes: 65 additions & 0 deletions tree/timer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package tree

import (
"testing"
"time"

"github.com/libp2p/go-libp2p-backoff/policy"
)

type TestBackoffPolicy struct{}

func (TestBackoffPolicy) NewBackoffState() policy.BackoffState {
return &testBackoffState{}
}

type testBackoffState struct {
backingOff bool
}

func (s *testBackoffState) Clear(now time.Time) {
s.backingOff = false
}

func (s *testBackoffState) Backoff(now time.Time) {
s.backingOff = true
}

func (s *testBackoffState) TimeToClear(now time.Time) time.Duration {
if s.backingOff {
return time.Second
} else {
return time.Duration(0)
}
}

func TestTimerGC1(t *testing.T) {
// create root timer
root := NewBackoffTreeTimer(nil, TestBackoffPolicy{})
// create a child timer, which is cleared on init
root.Subtimer("child", TestBackoffPolicy{})
if root.NumChildren() != 1 {
t.Errorf("expecting one child")
}
// run GC on the root (should remove the child timer)
if root.GC() != 0 {
t.Errorf("GC did not remove child")
}
}

func TestTimerGC2(t *testing.T) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a test with two children subtimers active in different go-routines would be potentially useful to find races.

// create root timer
root := NewBackoffTreeTimer(nil, TestBackoffPolicy{})
// create a child timer, which is cleared on init
child := root.Subtimer("child", TestBackoffPolicy{})
// create child of child, in backoff state
child2 := child.Subtimer("child2", TestBackoffPolicy{})
child2.Backoff()
if root.NumChildren() != 1 || child.NumChildren() != 1 {
t.Errorf("expecting root has one child with one child")
}
// run GC on the root (should remove the child timer)
if root.GC() != 1 {
t.Errorf("GC removed a child with a descendant in backoff mode")
}
}