Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat(Benchmarks): Add real world dup blocks test
Browse files Browse the repository at this point in the history
- add a delay generator that similates real world latencies one might encounter on the internet
- modify virtual network to accept different latencies for different
peers based on using NextWaitTime on passed delay
- modify dup_blocks_test subtestDistributeAndFetch to accept a custom
delay
- Add a real world benchmarks that simulates the kinds of problems one might
encounter bitswaping with a long lived session and a large swarm of
peers with real world latency distributions (that causes #8 not to
function well in practice)
  • Loading branch information
hannahhoward committed Nov 14, 2018
1 parent 663e702 commit 39fa3c7
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 28 deletions.
73 changes: 52 additions & 21 deletions dup_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,71 +33,102 @@ type runStats struct {
var benchmarkLog []runStats

func BenchmarkDups2Nodes(b *testing.B) {
fixedDelay := delay.Fixed(10 * time.Millisecond)
b.Run("AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, oneAtATime)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, oneAtATime)
})
b.Run("AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, allToAll, batchFetchAll)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, allToAll, batchFetchAll)
})

b.Run("Overlap1-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap1, oneAtATime)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap1, oneAtATime)
})

b.Run("Overlap2-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap2, batchFetchBy10)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap2, batchFetchBy10)
})

b.Run("Overlap3-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, oneAtATime)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, oneAtATime)
})
b.Run("Overlap3-BatchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchBy10)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchBy10)
})
b.Run("Overlap3-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, fetchAllConcurrent)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, fetchAllConcurrent)
})
b.Run("Overlap3-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, batchFetchAll)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, batchFetchAll)
})
b.Run("Overlap3-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 3, 100, overlap3, unixfsFileFetch)
subtestDistributeAndFetch(b, 3, 100, fixedDelay, overlap3, unixfsFileFetch)
})
b.Run("10Nodes-AllToAll-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, oneAtATime)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, oneAtATime)
})
b.Run("10Nodes-AllToAll-BatchFetchBy10", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchBy10)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchBy10)
})
b.Run("10Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, batchFetchAll)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, batchFetchAll)
})
b.Run("10Nodes-AllToAll-AllConcurrent", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, fetchAllConcurrent)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, fetchAllConcurrent)
})
b.Run("10Nodes-AllToAll-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, allToAll, unixfsFileFetch)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, allToAll, unixfsFileFetch)
})
b.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, oneAtATime)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, oneAtATime)
})
b.Run("10Nodes-OnePeerPerBlock-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, batchFetchAll)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, batchFetchAll)
})
b.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(b *testing.B) {
subtestDistributeAndFetch(b, 10, 100, onePeerPerBlock, unixfsFileFetch)
subtestDistributeAndFetch(b, 10, 100, fixedDelay, onePeerPerBlock, unixfsFileFetch)
})
b.Run("200Nodes-AllToAll-BigBatch", func(b *testing.B) {
subtestDistributeAndFetch(b, 200, 20, allToAll, batchFetchAll)
subtestDistributeAndFetch(b, 200, 20, fixedDelay, allToAll, batchFetchAll)
})

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("benchmark.json", out, 0666)
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, df distFunc, ff fetchFunc) {
const fastSpeed = 60 * time.Millisecond
const mediumSpeed = 200 * time.Millisecond
const slowSpeed = 800 * time.Millisecond
const superSlowSpeed = 4000 * time.Millisecond
const distribution = 20 * time.Millisecond

func BenchmarkDupsManyNodesRealWorldNetwork(b *testing.B) {
fastNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.0, 0.0, distribution, nil)
fastNetworkDelay := delay.Delay(fastSpeed, fastNetworkDelayGenerator)
averageNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, slowSpeed-fastSpeed,
0.3, 0.3, distribution, nil)
averageNetworkDelay := delay.Delay(fastSpeed, averageNetworkDelayGenerator)
slowNetworkDelayGenerator := tn.InternetLatencyDelayGenerator(
mediumSpeed-fastSpeed, superSlowSpeed-fastSpeed,
0.3, 0.3, distribution, nil)
slowNetworkDelay := delay.Delay(fastSpeed, slowNetworkDelayGenerator)

b.Run("200Nodes-AllToAll-BigBatch-FastNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, fastNetworkDelay, allToAll, batchFetchAll)
})
b.Run("200Nodes-AllToAll-BigBatch-AverageVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, averageNetworkDelay, allToAll, batchFetchAll)
})
b.Run("200Nodes-AllToAll-BigBatch-SlowVariableSpeedNetwork", func(b *testing.B) {
subtestDistributeAndFetch(b, 300, 200, slowNetworkDelay, allToAll, batchFetchAll)
})
}

func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, df distFunc, ff fetchFunc) {
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
sg := NewTestSessionGenerator(net)
defer sg.Close()

Expand Down
63 changes: 63 additions & 0 deletions testnet/internet_latency_delay_generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package bitswap

import (
"math/rand"
"time"

"github.com/ipfs/go-ipfs-delay"
)

var sharedRNG = rand.New(rand.NewSource(time.Now().UnixNano()))

// InternetLatencyDelayGenerator generates three clusters of delays,
// typical of the type of peers you would encounter on the interenet
// Given a base delay time T, the wait time generated will be either:
// 1. A normalized distribution around the base time
// 2. A normalized distribution around the base time plus a "medium" delay
// 3. A normalized distribution around the base time plus a "large" delay
// The size of the medium & large delays are determined when the generator
// is constructed, as well as the relative percentages with which delays fall
// into each of the three different clusters, and the standard deviation for
// the normalized distribution
// This can be used to generate a number of scenarios typical of latency
// distribution among peers on the internet
func InternetLatencyDelayGenerator(
mediumDelay time.Duration,
largeDelay time.Duration,
percentMedium float64,
percentLarge float64,
std time.Duration,
rng *rand.Rand) delay.Generator {
if rng == nil {
rng = sharedRNG
}

return &internetLatencyDelayGenerator{
mediumDelay: mediumDelay,
largeDelay: largeDelay,
percentLarge: percentLarge,
percentMedium: percentMedium,
std: std,
rng: rng,
}
}

type internetLatencyDelayGenerator struct {
mediumDelay time.Duration
largeDelay time.Duration
percentLarge float64
percentMedium float64
std time.Duration
rng *rand.Rand
}

func (d *internetLatencyDelayGenerator) NextWaitTime(t time.Duration) time.Duration {
clusterDistribution := d.rng.Float64()
baseDelay := time.Duration(d.rng.NormFloat64()*float64(d.std)) + t
if clusterDistribution < d.percentLarge {
return baseDelay + d.largeDelay
} else if clusterDistribution < d.percentMedium+d.percentLarge {
return baseDelay + d.mediumDelay
}
return baseDelay
}
69 changes: 69 additions & 0 deletions testnet/internet_latency_delay_generator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package bitswap

import (
"math"
"math/rand"
"testing"
"time"
)

const testSeed = 99

func TestInternetLatencyDelayNextWaitTimeDistribution(t *testing.T) {
initialValue := 1000 * time.Millisecond
deviation := 100 * time.Millisecond
mediumDelay := 1000 * time.Millisecond
largeDelay := 3000 * time.Millisecond
percentMedium := 0.2
percentLarge := 0.4
buckets := make(map[string]int)
internetLatencyDistributionDelay := InternetLatencyDelayGenerator(
mediumDelay,
largeDelay,
percentMedium,
percentLarge,
deviation,
rand.New(rand.NewSource(testSeed)))

buckets["fast"] = 0
buckets["medium"] = 0
buckets["slow"] = 0
buckets["outside_1_deviation"] = 0

// strategy here is rather than mock randomness, just use enough samples to
// get approximately the distribution you'd expect
for i := 0; i < 10000; i++ {
next := internetLatencyDistributionDelay.NextWaitTime(initialValue)
if math.Abs((next - initialValue).Seconds()) <= deviation.Seconds() {
buckets["fast"]++
} else if math.Abs((next - initialValue - mediumDelay).Seconds()) <= deviation.Seconds() {
buckets["medium"]++
} else if math.Abs((next - initialValue - largeDelay).Seconds()) <= deviation.Seconds() {
buckets["slow"]++
} else {
buckets["outside_1_deviation"]++
}
}
totalInOneDeviation := float64(10000 - buckets["outside_1_deviation"])
oneDeviationPercentage := totalInOneDeviation / 10000
fastPercentageResult := float64(buckets["fast"]) / totalInOneDeviation
mediumPercentageResult := float64(buckets["medium"]) / totalInOneDeviation
slowPercentageResult := float64(buckets["slow"]) / totalInOneDeviation

// see 68-95-99 rule for normal distributions
if math.Abs(oneDeviationPercentage-0.6827) >= 0.1 {
t.Fatal("Failed to distribute values normally based on standard deviation")
}

if math.Abs(fastPercentageResult+percentMedium+percentLarge-1) >= 0.1 {
t.Fatal("Incorrect percentage of values distributed around fast delay time")
}

if math.Abs(mediumPercentageResult-percentMedium) >= 0.1 {
t.Fatal("Incorrect percentage of values distributed around medium delay time")
}

if math.Abs(slowPercentageResult-percentLarge) >= 0.1 {
t.Fatal("Incorrect percentage of values distributed around slow delay time")
}
}
46 changes: 39 additions & 7 deletions testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bitswap
import (
"context"
"errors"
"sort"
"sync"
"sync/atomic"
"time"
Expand All @@ -24,6 +25,7 @@ var log = logging.Logger("bstestnet")

func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {
return &network{
latencies: make(map[peer.ID]map[peer.ID]time.Duration),
clients: make(map[peer.ID]*receiverQueue),
delay: d,
routingserver: rs,
Expand All @@ -33,6 +35,7 @@ func VirtualNetwork(rs mockrouting.Server, d delay.D) Network {

type network struct {
mu sync.Mutex
latencies map[peer.ID]map[peer.ID]time.Duration
clients map[peer.ID]*receiverQueue
routingserver mockrouting.Server
delay delay.D
Expand Down Expand Up @@ -87,6 +90,18 @@ func (n *network) SendMessage(
n.mu.Lock()
defer n.mu.Unlock()

latencies, ok := n.latencies[from]
if !ok {
latencies = make(map[peer.ID]time.Duration)
n.latencies[from] = latencies
}

latency, ok := latencies[to]
if !ok {
latency = n.delay.NextWaitTime()
latencies[to] = latency
}

receiver, ok := n.clients[to]
if !ok {
return errors.New("cannot locate peer on network")
Expand All @@ -98,7 +113,7 @@ func (n *network) SendMessage(
msg := &message{
from: from,
msg: mes,
shouldSend: time.Now().Add(n.delay.Get()),
shouldSend: time.Now().Add(latency),
}
receiver.enqueue(msg)

Expand Down Expand Up @@ -229,21 +244,38 @@ func (rq *receiverQueue) enqueue(m *message) {
}
}

func (rq *receiverQueue) Swap(i, j int) {
rq.queue[i], rq.queue[j] = rq.queue[j], rq.queue[i]
}

func (rq *receiverQueue) Len() int {
return len(rq.queue)
}

func (rq *receiverQueue) Less(i, j int) bool {
return rq.queue[i].shouldSend.UnixNano() < rq.queue[j].shouldSend.UnixNano()
}

func (rq *receiverQueue) process() {
for {
rq.lk.Lock()
sort.Sort(rq)
if len(rq.queue) == 0 {
rq.active = false
rq.lk.Unlock()
return
}
m := rq.queue[0]
rq.queue = rq.queue[1:]
rq.lk.Unlock()

time.Sleep(time.Until(m.shouldSend))
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
if time.Until(m.shouldSend).Seconds() < 0.1 {
rq.queue = rq.queue[1:]
rq.lk.Unlock()
time.Sleep(time.Until(m.shouldSend))
atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1)
rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg)
} else {
rq.lk.Unlock()
time.Sleep(100 * time.Millisecond)
}
}
}

Expand Down

0 comments on commit 39fa3c7

Please sign in to comment.