Skip to content

Commit

Permalink
Gather and report metrics on lookup success and broadcasting
Browse files Browse the repository at this point in the history
Implement prometheus metrics to report information on
* lookup success rate
* lookup result count
* lookup latency
* lookup time to first provider
* broadcaster end-to-end latency
* broadcaster in-flight messages
* broadcaster message batch size
* broadcaster success/skipped/failure rate
* receiver connected peers count
* receiver error rate

The changes here also implement:
* optional filtering of non-public addrs
* uniqueness check on found providers within a lookup response

Fixes #6
  • Loading branch information
masih committed Mar 21, 2023
1 parent 87a22ed commit a49ba2f
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 39 deletions.
49 changes: 33 additions & 16 deletions broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ type (
id peer.ID
}
findCids struct {
cids []cid.Cid
cids []cid.Cid
timestamp time.Time
}
channeledSender struct {
ctx context.Context
cancel context.CancelFunc
id peer.ID
outgoing chan []cid.Cid
c *Cassette
unsentCids map[cid.Cid]struct{}
maxBatchSize int
maxBatchWait *time.Ticker
ctx context.Context
cancel context.CancelFunc
id peer.ID
mailbox chan findCids
c *Cassette
unsentTimestamp time.Time
unsentCids map[cid.Cid]struct{}
maxBatchSize int
maxBatchWait *time.Ticker
}
)

Expand All @@ -54,6 +56,7 @@ func (b *broadcaster) start(ctx context.Context) {
refresh := func() {
logger.Info("Refreshing broadcast list...")
peers := b.c.h.Peerstore().Peers()
var count int
for _, id := range peers {
select {
case <-ctx.Done():
Expand All @@ -64,10 +67,11 @@ func (b *broadcaster) start(ctx context.Context) {
b.mailbox <- addRecipient{
id: id,
}
count++
}
}
}
logger.Infow("Broadcast list refreshed", "size", len(peers))
logger.Infow("Broadcast list refreshed", "size", count)
}
refresh()
for {
Expand Down Expand Up @@ -103,7 +107,8 @@ func (b *broadcaster) start(ctx context.Context) {
select {
case <-ctx.Done():
return
case recipient.outgoing <- c.cids:
case recipient.mailbox <- c:
b.c.metrics.notifyBroadcastRequested(ctx, int64(len(c.cids)))
}
}
case addRecipient:
Expand All @@ -113,10 +118,12 @@ func (b *broadcaster) start(ctx context.Context) {
cs := b.newChanneledSender(c.id)
go cs.start()
recipients[c.id] = cs
b.c.metrics.notifyBroadcastRecipientAdded(ctx)
case removeRecipient:
if cs, exists := recipients[c.id]; exists {
cs.shutdown()
delete(recipients, c.id)
b.c.metrics.notifyBroadcastRecipientRemoved(ctx)
}
}
}
Expand All @@ -129,11 +136,14 @@ func (cs *channeledSender) start() {
select {
case <-cs.ctx.Done():
return
case cids, ok := <-cs.outgoing:
case fc, ok := <-cs.mailbox:
if !ok {
return
}
for _, c := range cids {
if cs.unsentTimestamp.IsZero() || cs.unsentTimestamp.After(fc.timestamp) {
cs.unsentTimestamp = fc.timestamp
}
for _, c := range fc.cids {
if _, exists := cs.unsentCids[c]; !exists {
cs.unsentCids[c] = struct{}{}
}
Expand Down Expand Up @@ -173,19 +183,23 @@ func (cs *channeledSender) supportsHaves() bool {
func (cs *channeledSender) shutdown() {
cs.cancel()
cs.maxBatchWait.Stop()
close(cs.outgoing)
close(cs.mailbox)
}

func (cs *channeledSender) sendUnsent() {
var wantHave bool
cidCount := int64(len(cs.unsentCids))
var wlt bitswap_message_pb.Message_Wantlist_WantType
if cs.supportsHaves() {
wlt = bitswap_message_pb.Message_Wantlist_Have
wantHave = true
} else if cs.c.fallbackOnWantBlock {
wlt = bitswap_message_pb.Message_Wantlist_Block
} else {
logger.Warnw("Peer does not support Want-Haves and fallback on Want-Blocks is disabled. Skipping broadcast.", "peer", cs.id, "skipped", len(cs.unsentCids))
// Clear unsent CIDs.
cs.unsentCids = make(map[cid.Cid]struct{})
cs.c.metrics.notifyBroadcastSkipped(cs.ctx, cidCount, time.Since(cs.unsentTimestamp))
return
}
msg := message.New(false)
Expand All @@ -195,13 +209,16 @@ func (cs *channeledSender) sendUnsent() {
}
if err := cs.c.bsn.SendMessage(cs.ctx, cs.id, msg); err != nil {
logger.Errorw("Failed to send message", "to", cs.id, "err", err)
cs.c.metrics.notifyBroadcastFailed(cs.ctx, cidCount, err, time.Since(cs.unsentTimestamp))
} else {
cs.c.metrics.notifyBroadcastSucceeded(cs.ctx, cidCount, wantHave, time.Since(cs.unsentTimestamp))
}
}

func (b *broadcaster) newChanneledSender(id peer.ID) *channeledSender {
cs := channeledSender{
id: id,
outgoing: make(chan []cid.Cid, b.c.messageSenderBuffer),
mailbox: make(chan findCids, b.c.messageSenderBuffer),
c: b.c,
unsentCids: make(map[cid.Cid]struct{}),
maxBatchSize: b.c.maxBroadcastBatchSize,
Expand All @@ -212,5 +229,5 @@ func (b *broadcaster) newChanneledSender(id peer.ID) *channeledSender {
}

func (b *broadcaster) broadcastWant(c []cid.Cid) {
b.mailbox <- findCids{cids: c}
b.mailbox <- findCids{cids: c, timestamp: time.Now()}
}
48 changes: 36 additions & 12 deletions cassette.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package cassette

import (
"context"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-libipfs/bitswap/network"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multihash"
)

Expand All @@ -18,13 +20,15 @@ var (

type Cassette struct {
*options
s *http.Server
server *http.Server
// Context and cancellation used to terminate streaming responses on shutdown.
ctx context.Context
cancel context.CancelFunc
bsn network.BitSwapNetwork
r *receiver
broadcaster *broadcaster

metrics *metrics
}

func New(o ...Option) (*Cassette, error) {
Expand All @@ -35,45 +39,63 @@ func New(o ...Option) (*Cassette, error) {
c := Cassette{
options: opts,
}
c.s = &http.Server{
c.server = &http.Server{
Addr: opts.httpListenAddr,
Handler: c.serveMux(),
}
c.ctx, c.cancel = context.WithCancel(context.Background())
c.s.RegisterOnShutdown(c.cancel)
c.metrics, err = newMetrics(&c)
if err != nil {
return nil, err
}
return &c, nil
}

func (c *Cassette) Start(ctx context.Context) error {
if err := c.metrics.Start(ctx); err != nil {
return err
}
c.bsn = network.NewFromIpfsHost(c.h, nil)
var err error
c.r, err = newReceiver(c.findByMultihash)
c.r, err = newReceiver(c)
if err != nil {
return err
}
c.bsn.Start(c.r)
c.broadcaster = newBroadcaster(c)
c.broadcaster.start(ctx)
ln, err := net.Listen("tcp", c.s.Addr)
if err != nil {
return err
}
go func() { _ = c.s.Serve(ln) }()
logger.Infow("Server started", "id", c.h.ID(), "libp2pAddrs", c.h.Addrs(), "httpAddr", ln.Addr(), "protocols", c.h.Mux().Protocols())
c.server.RegisterOnShutdown(c.cancel)
go func() { _ = c.server.ListenAndServe() }()
logger.Infow("Lookup server started", "id", c.h.ID(), "libp2pAddrs", c.h.Addrs(), "httpAddr", c.server.Addr, "protocols", c.h.Mux().Protocols())
return nil
}

func (c *Cassette) Find(ctx context.Context, k cid.Cid) chan peer.AddrInfo {
start := time.Now()
c.metrics.notifyLookupRequested(ctx)
var timeToFirstProvider time.Duration
rch := make(chan peer.AddrInfo, 1)
go func() {
var resultCount atomic.Int64
ctx, cancel := context.WithTimeout(ctx, c.maxWaitTimeout)
providersSoFar := make(map[peer.ID]struct{})
unregister := c.r.registerFoundHook(ctx, k, func(id peer.ID) {
if _, seen := providersSoFar[id]; seen {
return
}
providersSoFar[id] = struct{}{}
addrs := c.h.Peerstore().Addrs(id)
if !c.addrFilterDisabled {
addrs = multiaddr.FilterAddrs(addrs, IsPubliclyDialableAddr)
}
if len(addrs) > 0 {
select {
case <-ctx.Done():
return
case rch <- peer.AddrInfo{ID: id, Addrs: addrs}:
if resultCount.Add(1) == 1 {
timeToFirstProvider = time.Since(start)
}
}
}
})
Expand All @@ -85,6 +107,7 @@ func (c *Cassette) Find(ctx context.Context, k cid.Cid) chan peer.AddrInfo {
targets := c.toFindTargets(k)
c.broadcaster.broadcastWant(targets)
<-ctx.Done()
c.metrics.notifyLookupResponded(context.Background(), resultCount.Load(), timeToFirstProvider, time.Since(start))
// TODO add option to stop based on provider count limit
}()
return rch
Expand Down Expand Up @@ -117,5 +140,6 @@ func (c *Cassette) toFindTargets(k cid.Cid) []cid.Cid {

func (c *Cassette) Shutdown(ctx context.Context) error {
c.bsn.Stop()
return c.s.Shutdown(ctx)
_ = c.metrics.Shutdown(ctx)
return c.server.Shutdown(ctx)
}
12 changes: 10 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ require (
github.com/ipfs/go-libipfs v0.6.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/libp2p/go-libp2p v0.26.2
github.com/multiformats/go-multiaddr v0.8.0
github.com/multiformats/go-multicodec v0.7.0
github.com/multiformats/go-multihash v0.2.1
github.com/multiformats/go-varint v0.0.7
github.com/prometheus/client_golang v1.14.0
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/exporters/prometheus v0.37.0
go.opentelemetry.io/otel/metric v0.37.0
go.opentelemetry.io/otel/sdk/metric v0.37.0
)

require (
Expand All @@ -24,6 +30,8 @@ require (
github.com/elastic/gosigar v0.14.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
Expand Down Expand Up @@ -60,7 +68,6 @@ require (
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.8.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
Expand All @@ -70,7 +77,6 @@ require (
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
Expand All @@ -81,6 +87,8 @@ require (
github.com/quic-go/webtransport-go v0.5.2 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.opentelemetry.io/otel/sdk v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.15.0 // indirect
go.uber.org/fx v1.18.2 // indirect
Expand Down
18 changes: 17 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
Expand Down Expand Up @@ -478,7 +482,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
Expand All @@ -498,6 +502,18 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM=
go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU=
go.opentelemetry.io/otel/exporters/prometheus v0.37.0 h1:NQc0epfL0xItsmGgSXgfbH2C1fq2VLXkZoDFsfRNHpc=
go.opentelemetry.io/otel/exporters/prometheus v0.37.0/go.mod h1:hB8qWjsStK36t50/R0V2ULFb4u95X/Q6zupXLgvjTh8=
go.opentelemetry.io/otel/metric v0.37.0 h1:pHDQuLQOZwYD+Km0eb657A25NaRzy0a+eLyKfDXedEs=
go.opentelemetry.io/otel/metric v0.37.0/go.mod h1:DmdaHfGt54iV6UKxsV9slj2bBRJcKC1B1uvDLIioc1s=
go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY=
go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM=
go.opentelemetry.io/otel/sdk/metric v0.37.0 h1:haYBBtZZxiI3ROwSmkZnI+d0+AVzBWeviuYQDeBWosU=
go.opentelemetry.io/otel/sdk/metric v0.37.0/go.mod h1:mO2WV1AZKKwhwHTV3AKOoIEb9LbUaENZDuGUQd+j4A0=
go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M=
go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
Expand Down
Loading

0 comments on commit a49ba2f

Please sign in to comment.