Separate the decision engine ledger into two parts: score and wantlist
This is the first step toward making external decision logic (tagging
peers with score values) possible.

The wantlist still resides in the original `ledger` struct, while
sent/received byte accounting and scores are extracted into the new
`scoreledger` struct, managed by the original `scoreWorker()` logic.
The accounting is integrated into the `Engine` via the `ScoreLedger`
interface, making it possible to replace the original `scoreWorker()`
with other logic. The interface, however, doesn't let the score logic
touch peer tags directly: the logic decides on score values, while
tagging itself remains under the control of the `Engine`.

Note: with this commit it is not yet possible to replace the original
score logic, because there are no public methods for that.
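
To make the shape of the new interface concrete, here is a minimal sketch of a custom implementation. It is purely illustrative: as noted above, the engine does not yet expose a public way to install it, and the `flatScoreLedger` name and its fixed score of 1 are invented for this example. It assumes the `Receipt` and `ScorePeerFunc` types from this package; the `Engine` wires the reported scores to peer tags via the `ScorePeerFunc` callback it passes to `Init()` (see the hunks below).

package decision

import (
	"sync"

	peer "github.com/libp2p/go-libp2p-core/peer"
)

// flatScoreLedger is a hypothetical ScoreLedger that only counts bytes
// and reports the same flat score for every connected peer.
type flatScoreLedger struct {
	lk        sync.Mutex
	scorePeer ScorePeerFunc
	receipts  map[peer.ID]*Receipt
}

func (l *flatScoreLedger) Init(scorePeer ScorePeerFunc) {
	l.scorePeer = scorePeer
	l.receipts = make(map[peer.ID]*Receipt)
}

func (l *flatScoreLedger) GetReceipt(p peer.ID) *Receipt {
	l.lk.Lock()
	defer l.lk.Unlock()
	if r, ok := l.receipts[p]; ok {
		cp := *r // return a copy so callers can't race on our state
		return &cp
	}
	return &Receipt{Peer: p.String()}
}

func (l *flatScoreLedger) AddToSentBytes(p peer.ID, n int) {
	l.lk.Lock()
	defer l.lk.Unlock()
	if r, ok := l.receipts[p]; ok {
		r.Sent += uint64(n)
	}
}

func (l *flatScoreLedger) AddToReceivedBytes(p peer.ID, n int) {
	l.lk.Lock()
	defer l.lk.Unlock()
	if r, ok := l.receipts[p]; ok {
		r.Recv += uint64(n)
	}
}

func (l *flatScoreLedger) PeerConnected(p peer.ID) {
	l.lk.Lock()
	defer l.lk.Unlock()
	if _, ok := l.receipts[p]; !ok {
		l.receipts[p] = &Receipt{Peer: p.String()}
	}
	l.scorePeer(p, 1) // tag every partner with the same weight
}

func (l *flatScoreLedger) PeerDisconnected(p peer.ID) {
	l.lk.Lock()
	defer l.lk.Unlock()
	delete(l.receipts, p)
	l.scorePeer(p, 0) // a zero score makes the engine untag the peer
}

func (l *flatScoreLedger) Start() {} // no sampling loop to run
func (l *flatScoreLedger) Close() {}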
Paul Wolneykien committed Aug 21, 2020
1 parent 72d351c commit f4977b7af677e84ca7ee6872ffd4b2d3409609da
Showing 4 changed files with 415 additions and 225 deletions.
@@ -70,25 +70,6 @@ const (
// on their behalf.
queuedTagWeight = 10

// the alpha for the EWMA used to track short term usefulness
shortTermAlpha = 0.5

// the alpha for the EWMA used to track long term usefulness
longTermAlpha = 0.05

// how frequently the engine should sample usefulness. Peers that
// interact every shortTerm time period are considered "active".
shortTerm = 10 * time.Second

// long term ratio defines what "long term" means in terms of the
// shortTerm duration. Peers that interact once every longTermRatio are
// considered useful over the long term.
longTermRatio = 10

// long/short term scores for tagging peers
longTermScore = 10 // this is a high tag but it grows _very_ slowly.
shortTermScore = 10 // this is a high tag but it'll go away quickly if we aren't using the peer.

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock = 1024
@@ -119,6 +100,31 @@ type PeerTagger interface {
UntagPeer(p peer.ID, tag string)
}

// ScorePeerFunc assigns a specific score to a peer.
type ScorePeerFunc func(peer.ID, int)

// ScoreLedger is an external ledger dealing with peer scores.
type ScoreLedger interface {
// Initializes the score ledger. Should be called before Start().
Init(scorePeer ScorePeerFunc)
// Returns aggregated data about communication with a given peer.
GetReceipt(p peer.ID) *Receipt
// Increments the sent counter for the given peer.
AddToSentBytes(p peer.ID, n int)
// Increments the received counter for the given peer.
AddToReceivedBytes(p peer.ID, n int)
// PeerConnected should be called when a new peer connects,
// meaning the ledger should open accounting.
PeerConnected(p peer.ID)
// PeerDisconnected should be called when a peer disconnects to
// clean up the accounting.
PeerDisconnected(p peer.ID)
// Starts the ledger sampling process.
Start()
// Closes (stops) the sampling process.
Close()
}

// Engine manages sending requested blocks to peers.
type Engine struct {
// peerRequestQueue is a priority queue of requests received from peers.
@@ -145,9 +151,12 @@ type Engine struct {

lock sync.RWMutex // protects the fields immediately below

// ledgerMap lists Ledgers by their Partner key.
// ledgerMap lists block-related Ledgers by their Partner key.
ledgerMap map[peer.ID]*ledger

// an external ledger dealing with peer scores
scoreLedger ScoreLedger

ticker *time.Ticker

taskWorkerLock sync.Mutex
@@ -157,35 +166,29 @@ type Engine struct {
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int

// how frequently the engine should sample peer usefulness
peerSampleInterval time.Duration
// used by the tests to detect when a sample is taken
sampleCh chan struct{}

sendDontHaves bool

self peer.ID
}

// NewEngine creates a new block sending engine for the given block store
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID) *Engine {
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, shortTerm, nil)
return newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock, nil)
}

// This constructor is used by the tests
func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID,
maxReplaceSize int, peerSampleInterval time.Duration, sampleCh chan struct{}) *Engine {
maxReplaceSize int, scoreLedger ScoreLedger) *Engine {

e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
scoreLedger: scoreLedger,
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
peerTagger: peerTagger,
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
ticker: time.NewTicker(time.Millisecond * 100),
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
peerSampleInterval: peerSampleInterval,
sampleCh: sampleCh,
taskWorkerCount: taskWorkerCount,
sendDontHaves: true,
self: self,
@@ -197,6 +200,16 @@ func newEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger,
peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
peertaskqueue.TaskMerger(newTaskMerger()),
peertaskqueue.IgnoreFreezing(true))
if scoreLedger == nil {
e.scoreLedger = NewDefaultScoreLedger()
}
e.scoreLedger.Init(func(p peer.ID, score int) {
if score == 0 {
e.peerTagger.UntagPeer(p, e.tagUseful)
} else {
e.peerTagger.TagPeer(p, e.tagUseful, score)
}
})
return e
}

@@ -214,7 +227,11 @@ func (e *Engine) SetSendDontHaves(send bool) {
func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
// Start up blockstore manager
e.bsm.start(px)
px.Go(e.scoreWorker)
e.scoreLedger.Start()
px.Go(func(ppx process.Process) {
<-ppx.Closing()
e.scoreLedger.Close()
})

for i := 0; i < e.taskWorkerCount; i++ {
px.Go(func(px process.Process) {
@@ -223,109 +240,6 @@ func (e *Engine) StartWorkers(ctx context.Context, px process.Process) {
}
}

// scoreWorker keeps track of how "useful" our peers are, updating scores in the
// connection manager.
//
// It does this by tracking two scores: short-term usefulness and long-term
// usefulness. Short-term usefulness is sampled frequently and weights new
// observations heavily. Long-term usefulness is sampled less frequently and
// weights long-term trends heavily.
//
// In practice, we do this by keeping two EWMAs. If we see an interaction
// within the sampling period, we record the score; otherwise, we record a 0.
// The short-term one has a high alpha and is sampled every shortTerm period.
// The long-term one has a low alpha and is sampled every
// longTermRatio*shortTerm period.
//
// To calculate the final score, we sum the short-term and long-term scores,
// then adjust the result by ±25% based on our debt ratio. Peers that have
// historically been more useful to us than we are to them get the highest
// score.
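//
// For example (an illustration added here, assuming the debt-ratio Score()
// is normalized to [0, 1], as the ±25% adjustment implies): if a peer's
// short-term and long-term scores sum to 20, the multiplier
// (Score()*.5 + .75) ranges from .75 to 1.25, so a peer we have mostly
// received from (Score() == 1) gets int(20*1.25) = 25, while a peer we
// have mostly sent to (Score() == 0) gets int(20*.75) = 15.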
func (e *Engine) scoreWorker(px process.Process) {
ticker := time.NewTicker(e.peerSampleInterval)
defer ticker.Stop()

type update struct {
peer peer.ID
score int
}
var (
lastShortUpdate, lastLongUpdate time.Time
updates []update
)

for i := 0; ; i = (i + 1) % longTermRatio {
var now time.Time
select {
case now = <-ticker.C:
case <-px.Closing():
return
}

// The long term update ticks every `longTermRatio` short
// intervals.
updateLong := i == 0

e.lock.Lock()
for _, ledger := range e.ledgerMap {
ledger.lk.Lock()

// Update the short-term score.
if ledger.lastExchange.After(lastShortUpdate) {
ledger.shortScore = ewma(ledger.shortScore, shortTermScore, shortTermAlpha)
} else {
ledger.shortScore = ewma(ledger.shortScore, 0, shortTermAlpha)
}

// Update the long-term score.
if updateLong {
if ledger.lastExchange.After(lastLongUpdate) {
ledger.longScore = ewma(ledger.longScore, longTermScore, longTermAlpha)
} else {
ledger.longScore = ewma(ledger.longScore, 0, longTermAlpha)
}
}

// Calculate the new score.
//
// The accounting score adjustment prefers peers _we_
// need over peers that need us. This doesn't help with
// leeching.
score := int((ledger.shortScore + ledger.longScore) * ((ledger.Accounting.Score())*.5 + .75))

// Avoid updating the connection manager unless there's a change. This can be expensive.
if ledger.score != score {
// put these in a list so we can perform the updates outside the _global_ lock.
updates = append(updates, update{ledger.Partner, score})
ledger.score = score
}
ledger.lk.Unlock()
}
e.lock.Unlock()

// record the times.
lastShortUpdate = now
if updateLong {
lastLongUpdate = now
}

// apply the updates
for _, update := range updates {
if update.score == 0 {
e.peerTagger.UntagPeer(update.peer, e.tagUseful)
} else {
e.peerTagger.TagPeer(update.peer, e.tagUseful, update.score)
}
}
// Keep the memory. It's not much and it saves us from having to allocate.
updates = updates[:0]

// Used by the tests
if e.sampleCh != nil {
e.sampleCh <- struct{}{}
}
}
}

func (e *Engine) onPeerAdded(p peer.ID) {
e.peerTagger.TagPeer(p, e.tagQueued, queuedTagWeight)
}
@@ -347,21 +261,9 @@ func (e *Engine) WantlistForPeer(p peer.ID) []wl.Entry {
return entries
}

// LedgerForPeer returns aggregated data about blocks swapped and communication
// with a given peer.
// LedgerForPeer returns aggregated data about communication with a given peer.
func (e *Engine) LedgerForPeer(p peer.ID) *Receipt {
ledger := e.findOrCreate(p)

ledger.lk.Lock()
defer ledger.lk.Unlock()

return &Receipt{
Peer: ledger.Partner.String(),
Value: ledger.Accounting.Value(),
Sent: ledger.Accounting.BytesSent,
Recv: ledger.Accounting.BytesRecv,
Exchanged: ledger.ExchangeCount(),
}
return e.scoreLedger.GetReceipt(p)
}

// Each taskWorker pulls items off the request queue up to the maximum size
@@ -671,7 +573,7 @@ func (e *Engine) ReceiveFrom(from peer.ID, blks []blocks.Block, haves []cid.Cid)
// Record how many bytes were received in the ledger
for _, blk := range blks {
log.Debugw("Bitswap engine <- block", "local", e.self, "from", from, "cid", blk.Cid(), "size", len(blk.RawData()))
l.ReceivedBytes(len(blk.RawData()))
e.scoreLedger.AddToReceivedBytes(l.Partner, len(blk.RawData()))
}

l.lk.Unlock()
@@ -741,7 +643,7 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {

// Remove sent blocks from the want list for the peer
for _, block := range m.Blocks() {
l.SentBytes(len(block.RawData()))
e.scoreLedger.AddToSentBytes(l.Partner, len(block.RawData()))
l.wantList.RemoveType(block.Cid(), pb.Message_Wantlist_Block)
}

@@ -764,6 +666,8 @@ func (e *Engine) PeerConnected(p peer.ID) {
if !ok {
e.ledgerMap[p] = newLedger(p)
}

e.scoreLedger.PeerConnected(p)
}

// PeerDisconnected is called when a peer disconnects.
@@ -772,6 +676,8 @@ func (e *Engine) PeerDisconnected(p peer.ID) {
defer e.lock.Unlock()

delete(e.ledgerMap, p)

e.scoreLedger.PeerDisconnected(p)
}

// If the want is a want-have, and it's below a certain size, send the full
@@ -782,13 +688,11 @@ func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize in
}

func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
// NB not threadsafe
return e.findOrCreate(p).Accounting.BytesSent
return e.LedgerForPeer(p).Sent
}

func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
// NB not threadsafe
return e.findOrCreate(p).Accounting.BytesRecv
return e.LedgerForPeer(p).Recv
}

// ledger lazily instantiates a ledger
@@ -97,7 +97,7 @@ func newTestEngine(ctx context.Context, idStr string) engineSet {
func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngine(ctx, bs, fpt, "localhost", 0, peerSampleInterval, sampleCh)
e := newEngine(ctx, bs, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
@@ -185,7 +185,7 @@ func peerIsPartner(p peer.ID, e *Engine) bool {
func TestOutboxClosedWhenEngineClosed(t *testing.T) {
ctx := context.Background()
t.SkipNow() // TODO implement *Engine.Close
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
e := newEngine(ctx, blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())), &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
var wg sync.WaitGroup
wg.Add(1)
@@ -513,7 +513,7 @@ func TestPartnerWantHaveWantBlockNonActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))
for i, testCase := range testCases {
t.Logf("Test case %d:", i)
@@ -669,7 +669,7 @@ func TestPartnerWantHaveWantBlockActive(t *testing.T) {
testCases = onlyTestCases
}

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

var next envChan
@@ -854,7 +854,7 @@ func TestPartnerWantsThenCancels(t *testing.T) {
ctx := context.Background()
for i := 0; i < numRounds; i++ {
expected := make([][]string, 0, len(testcases))
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
e := newEngine(ctx, bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
for _, testcase := range testcases {
set := testcase[0]
@@ -879,7 +879,7 @@ func TestSendReceivedBlocksToPeersThatWantThem(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -923,7 +923,7 @@ func TestSendDontHave(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)
@@ -987,7 +987,7 @@ func TestWantlistForPeer(t *testing.T) {
partner := libp2ptest.RandPeerIDFatal(t)
otherPeer := libp2ptest.RandPeerIDFatal(t)

e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, shortTerm, nil)
e := newEngine(context.Background(), bs, &fakePeerTagger{}, "localhost", 0, NewTestScoreLedger(shortTerm, nil))
e.StartWorkers(context.Background(), process.WithTeardown(func() error { return nil }))

blks := testutil.GenerateBlocksOfSize(4, 8*1024)

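For reference, the two construction paths after this change, as a sketch assembled from the hunks above (not new API; `newEngine` is package-private and `NewTestScoreLedger` is a test helper, with ctx, bs, peerTagger, self, peerSampleInterval and sampleCh assumed to be in scope):

// Production path: NewEngine passes nil for the ledger, so newEngine
// falls back to the built-in logic via NewDefaultScoreLedger().
e := NewEngine(ctx, bs, peerTagger, self)

// Test path: inject a score ledger explicitly, here the test ledger
// that signals on sampleCh every time it takes a sample.
e = newEngine(ctx, bs, peerTagger, self, maxBlockSizeReplaceHasWithBlock,
	NewTestScoreLedger(peerSampleInterval, sampleCh))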