Skip to content

Commit

Permalink
refactors cleanup in a goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
yhassanzadeh13 committed May 9, 2022
1 parent b3f58bc commit a9f4edf
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 12 deletions.
36 changes: 27 additions & 9 deletions backoff.go
@@ -1,6 +1,7 @@
package pubsub

import (
"context"
"sync"
"time"

Expand All @@ -10,8 +11,9 @@ import (
const (
MinBackoffDelay = 100 * time.Millisecond
MaxBackoffDelay = 10 * time.Second
TimeToLive = 10 * time.Minute
BackoffMultiplier = 2
TimeToLive = 10 * time.Minute
BackoffCleanupInterval = 1 * time.Minute
BackoffMultiplier = 2
)

type backoffHistory struct {
Expand All @@ -22,15 +24,21 @@ type backoffHistory struct {
type backoff struct {
mu sync.Mutex
info map[peer.ID]*backoffHistory
ct int // size threshold that kicks off the cleaner
ct int // size threshold that kicks off the cleaner
ci time.Duration // cleanup intervals
}

func newBackoff(sizeThreshold int) *backoff {
return &backoff{
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration) *backoff {
b := &backoff{
mu: sync.Mutex{},
ct: sizeThreshold,
ci: cleanupInterval,
info: make(map[peer.ID]*backoffHistory),
}

go b.cleanupLoop(ctx)

return b
}

func (b *backoff) updateAndGet(id peer.ID) time.Duration {
Expand All @@ -55,17 +63,27 @@ func (b *backoff) updateAndGet(id peer.ID) time.Duration {
h.lastTried = time.Now()
b.info[id] = h

if len(b.info) > b.ct {
b.cleanup()
}

return h.duration
}

func (b *backoff) cleanup() {
b.mu.Lock()
defer b.mu.Unlock()

for id, h := range b.info {
if time.Since(h.lastTried) > TimeToLive {
delete(b.info, id)
}
}
}

func (b *backoff) cleanupLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return // pubsub shutting down
case <-time.Tick(b.ci):
b.cleanup()
}
}
}
19 changes: 17 additions & 2 deletions backoff_test.go
@@ -1,6 +1,7 @@
package pubsub

import (
"context"
"fmt"
"math"
"testing"
Expand All @@ -12,7 +13,14 @@ import (
func TestBackoff_Update(t *testing.T){
id1 := peer.ID("peer-1")
id2 := peer.ID("peer-2")
b := newBackoff(10)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

size := 10
cleanupInterval := 5 * time.Second

b := newBackoff(ctx, size, cleanupInterval)

if len(b.info) > 0 {
t.Fatal("non-empty info map for backoff")
Expand Down Expand Up @@ -56,8 +64,12 @@ func TestBackoff_Update(t *testing.T){
}

func TestBackoff_Clean(t *testing.T){
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

size := 10
b := newBackoff(size)
cleanupInterval := 5 * time.Second
b := newBackoff(ctx, size, cleanupInterval)

for i := 0; i < size; i++{
id := peer.ID(fmt.Sprintf("peer-%d", i))
Expand All @@ -69,6 +81,9 @@ func TestBackoff_Clean(t *testing.T){
t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info))
}

// waits for a cleanup loop to kick-in
time.Sleep(cleanupInterval)

// next update should trigger cleanup
got := b.updateAndGet(peer.ID("some-new-peer"))
if got != time.Duration(0) {
Expand Down
2 changes: 1 addition & 1 deletion pubsub.go
Expand Up @@ -254,7 +254,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
newPeerError: make(chan peer.ID),
peerDead: make(chan struct{}, 1),
peerDeadPend: make(map[peer.ID]struct{}),
deadPeerBackoff: newBackoff(1000),
deadPeerBackoff: newBackoff(ctx, 1000, BackoffCleanupInterval),
cancelCh: make(chan *Subscription),
getPeers: make(chan *listPeerReq),
addSub: make(chan *addSubReq),
Expand Down

0 comments on commit a9f4edf

Please sign in to comment.