diff --git a/broadcaster.go b/broadcaster.go index 1b34f2f..f33e521 100644 --- a/broadcaster.go +++ b/broadcaster.go @@ -27,17 +27,19 @@ type ( id peer.ID } findCids struct { - cids []cid.Cid + cids []cid.Cid + timestamp time.Time } channeledSender struct { - ctx context.Context - cancel context.CancelFunc - id peer.ID - outgoing chan []cid.Cid - c *Cassette - unsentCids map[cid.Cid]struct{} - maxBatchSize int - maxBatchWait *time.Ticker + ctx context.Context + cancel context.CancelFunc + id peer.ID + mailbox chan findCids + c *Cassette + unsentTimestamp time.Time + unsentCids map[cid.Cid]struct{} + maxBatchSize int + maxBatchWait *time.Ticker } ) @@ -54,6 +56,7 @@ func (b *broadcaster) start(ctx context.Context) { refresh := func() { logger.Info("Refreshing broadcast list...") peers := b.c.h.Peerstore().Peers() + var count int for _, id := range peers { select { case <-ctx.Done(): @@ -64,10 +67,11 @@ func (b *broadcaster) start(ctx context.Context) { b.mailbox <- addRecipient{ id: id, } + count++ } } } - logger.Infow("Broadcast list refreshed", "size", len(peers)) + logger.Infow("Broadcast list refreshed", "size", count) } refresh() for { @@ -103,7 +107,8 @@ func (b *broadcaster) start(ctx context.Context) { select { case <-ctx.Done(): return - case recipient.outgoing <- c.cids: + case recipient.mailbox <- c: + b.c.metrics.notifyBroadcastRequested(ctx, int64(len(c.cids))) } } case addRecipient: @@ -113,10 +118,12 @@ func (b *broadcaster) start(ctx context.Context) { cs := b.newChanneledSender(c.id) go cs.start() recipients[c.id] = cs + b.c.metrics.notifyBroadcastRecipientAdded(ctx) case removeRecipient: if cs, exists := recipients[c.id]; exists { cs.shutdown() delete(recipients, c.id) + b.c.metrics.notifyBroadcastRecipientRemoved(ctx) } } } @@ -129,11 +136,14 @@ func (cs *channeledSender) start() { select { case <-cs.ctx.Done(): return - case cids, ok := <-cs.outgoing: + case fc, ok := <-cs.mailbox: if !ok { return } - for _, c := range cids { + if 
cs.unsentTimestamp.IsZero() || cs.unsentTimestamp.After(fc.timestamp) { + cs.unsentTimestamp = fc.timestamp + } + for _, c := range fc.cids { if _, exists := cs.unsentCids[c]; !exists { cs.unsentCids[c] = struct{}{} } @@ -173,19 +183,23 @@ func (cs *channeledSender) supportsHaves() bool { func (cs *channeledSender) shutdown() { cs.cancel() cs.maxBatchWait.Stop() - close(cs.outgoing) + close(cs.mailbox) } func (cs *channeledSender) sendUnsent() { + var wantHave bool + cidCount := int64(len(cs.unsentCids)) var wlt bitswap_message_pb.Message_Wantlist_WantType if cs.supportsHaves() { wlt = bitswap_message_pb.Message_Wantlist_Have + wantHave = true } else if cs.c.fallbackOnWantBlock { wlt = bitswap_message_pb.Message_Wantlist_Block } else { logger.Warnw("Peer does not support Want-Haves and fallback on Want-Blocks is disabled. Skipping broadcast.", "peer", cs.id, "skipped", len(cs.unsentCids)) // Clear unsent CIDs. cs.unsentCids = make(map[cid.Cid]struct{}) + cs.c.metrics.notifyBroadcastSkipped(cs.ctx, cidCount, time.Since(cs.unsentTimestamp)) return } msg := message.New(false) @@ -195,13 +209,16 @@ func (cs *channeledSender) sendUnsent() { } if err := cs.c.bsn.SendMessage(cs.ctx, cs.id, msg); err != nil { logger.Errorw("Failed to send message", "to", cs.id, "err", err) + cs.c.metrics.notifyBroadcastFailed(cs.ctx, cidCount, err, time.Since(cs.unsentTimestamp)) + } else { + cs.c.metrics.notifyBroadcastSucceeded(cs.ctx, cidCount, wantHave, time.Since(cs.unsentTimestamp)) } } func (b *broadcaster) newChanneledSender(id peer.ID) *channeledSender { cs := channeledSender{ id: id, - outgoing: make(chan []cid.Cid, b.c.messageSenderBuffer), + mailbox: make(chan findCids, b.c.messageSenderBuffer), c: b.c, unsentCids: make(map[cid.Cid]struct{}), maxBatchSize: b.c.maxBroadcastBatchSize, @@ -212,5 +229,5 @@ func (b *broadcaster) newChanneledSender(id peer.ID) *channeledSender { } func (b *broadcaster) broadcastWant(c []cid.Cid) { - b.mailbox <- findCids{cids: c} + b.mailbox <- 
findCids{cids: c, timestamp: time.Now()} } diff --git a/cassette.go b/cassette.go index 10d51bb..fe43222 100644 --- a/cassette.go +++ b/cassette.go @@ -2,13 +2,15 @@ package cassette import ( "context" - "net" "net/http" + "sync/atomic" + "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-libipfs/bitswap/network" "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multihash" ) @@ -18,13 +20,15 @@ var ( type Cassette struct { *options - s *http.Server + server *http.Server // Context and cancellation used to terminate streaming responses on shutdown. ctx context.Context cancel context.CancelFunc bsn network.BitSwapNetwork r *receiver broadcaster *broadcaster + + metrics *metrics } func New(o ...Option) (*Cassette, error) { @@ -35,45 +39,63 @@ func New(o ...Option) (*Cassette, error) { c := Cassette{ options: opts, } - c.s = &http.Server{ + c.server = &http.Server{ Addr: opts.httpListenAddr, Handler: c.serveMux(), } c.ctx, c.cancel = context.WithCancel(context.Background()) - c.s.RegisterOnShutdown(c.cancel) + c.metrics, err = newMetrics(&c) + if err != nil { + return nil, err + } return &c, nil } func (c *Cassette) Start(ctx context.Context) error { + if err := c.metrics.Start(ctx); err != nil { + return err + } c.bsn = network.NewFromIpfsHost(c.h, nil) var err error - c.r, err = newReceiver(c.findByMultihash) + c.r, err = newReceiver(c) if err != nil { return err } c.bsn.Start(c.r) c.broadcaster = newBroadcaster(c) c.broadcaster.start(ctx) - ln, err := net.Listen("tcp", c.s.Addr) - if err != nil { - return err - } - go func() { _ = c.s.Serve(ln) }() - logger.Infow("Server started", "id", c.h.ID(), "libp2pAddrs", c.h.Addrs(), "httpAddr", ln.Addr(), "protocols", c.h.Mux().Protocols()) + c.server.RegisterOnShutdown(c.cancel) + go func() { _ = c.server.ListenAndServe() }() + logger.Infow("Lookup server started", "id", c.h.ID(), "libp2pAddrs", c.h.Addrs(), "httpAddr", c.server.Addr, 
"protocols", c.h.Mux().Protocols()) return nil } func (c *Cassette) Find(ctx context.Context, k cid.Cid) chan peer.AddrInfo { + start := time.Now() + c.metrics.notifyLookupRequested(ctx) + var timeToFirstProvider time.Duration rch := make(chan peer.AddrInfo, 1) go func() { + var resultCount atomic.Int64 ctx, cancel := context.WithTimeout(ctx, c.maxWaitTimeout) + providersSoFar := make(map[peer.ID]struct{}) unregister := c.r.registerFoundHook(ctx, k, func(id peer.ID) { + if _, seen := providersSoFar[id]; seen { + return + } + providersSoFar[id] = struct{}{} addrs := c.h.Peerstore().Addrs(id) + if !c.addrFilterDisabled { + addrs = multiaddr.FilterAddrs(addrs, IsPubliclyDialableAddr) + } if len(addrs) > 0 { select { case <-ctx.Done(): return case rch <- peer.AddrInfo{ID: id, Addrs: addrs}: + if resultCount.Add(1) == 1 { + timeToFirstProvider = time.Since(start) + } } } }) @@ -85,6 +107,7 @@ func (c *Cassette) Find(ctx context.Context, k cid.Cid) chan peer.AddrInfo { targets := c.toFindTargets(k) c.broadcaster.broadcastWant(targets) <-ctx.Done() + c.metrics.notifyLookupResponded(context.Background(), resultCount.Load(), timeToFirstProvider, time.Since(start)) // TODO add option to stop based on provider count limit }() return rch @@ -117,5 +140,6 @@ func (c *Cassette) toFindTargets(k cid.Cid) []cid.Cid { func (c *Cassette) Shutdown(ctx context.Context) error { c.bsn.Stop() - return c.s.Shutdown(ctx) + _ = c.metrics.Shutdown(ctx) + return c.server.Shutdown(ctx) } diff --git a/go.mod b/go.mod index 18f180c..2638299 100644 --- a/go.mod +++ b/go.mod @@ -7,9 +7,15 @@ require ( github.com/ipfs/go-libipfs v0.6.1 github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-libp2p v0.26.2 + github.com/multiformats/go-multiaddr v0.8.0 github.com/multiformats/go-multicodec v0.7.0 github.com/multiformats/go-multihash v0.2.1 github.com/multiformats/go-varint v0.0.7 + github.com/prometheus/client_golang v1.14.0 + go.opentelemetry.io/otel v1.14.0 + 
go.opentelemetry.io/otel/exporters/prometheus v0.37.0 + go.opentelemetry.io/otel/metric v0.37.0 + go.opentelemetry.io/otel/sdk/metric v0.37.0 ) require ( @@ -24,6 +30,8 @@ require ( github.com/elastic/gosigar v0.14.2 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -60,7 +68,6 @@ require ( github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect - github.com/multiformats/go-multiaddr v0.8.0 // indirect github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.1.1 // indirect @@ -70,7 +77,6 @@ require ( github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.14.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect @@ -81,6 +87,8 @@ require ( github.com/quic-go/webtransport-go v0.5.2 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + go.opentelemetry.io/otel/sdk v1.14.0 // indirect + go.opentelemetry.io/otel/trace v1.14.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/dig v1.15.0 // indirect go.uber.org/fx v1.18.2 // indirect diff --git a/go.sum b/go.sum index b0f26e7..5e120c1 100644 --- a/go.sum +++ b/go.sum @@ -119,7 +119,11 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= @@ -478,7 +482,7 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= @@ -498,6 +502,18 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod 
h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM= +go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= +go.opentelemetry.io/otel/exporters/prometheus v0.37.0 h1:NQc0epfL0xItsmGgSXgfbH2C1fq2VLXkZoDFsfRNHpc= +go.opentelemetry.io/otel/exporters/prometheus v0.37.0/go.mod h1:hB8qWjsStK36t50/R0V2ULFb4u95X/Q6zupXLgvjTh8= +go.opentelemetry.io/otel/metric v0.37.0 h1:pHDQuLQOZwYD+Km0eb657A25NaRzy0a+eLyKfDXedEs= +go.opentelemetry.io/otel/metric v0.37.0/go.mod h1:DmdaHfGt54iV6UKxsV9slj2bBRJcKC1B1uvDLIioc1s= +go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY= +go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM= +go.opentelemetry.io/otel/sdk/metric v0.37.0 h1:haYBBtZZxiI3ROwSmkZnI+d0+AVzBWeviuYQDeBWosU= +go.opentelemetry.io/otel/sdk/metric v0.37.0/go.mod h1:mO2WV1AZKKwhwHTV3AKOoIEb9LbUaENZDuGUQd+j4A0= +go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= +go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..d43f7f4 --- /dev/null +++ b/metrics.go @@ -0,0 +1,251 @@ +package cassette + +import ( + "context" + "errors" + "net/http" + "net/http/pprof" + "runtime" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/metric/instrument" + 
"go.opentelemetry.io/otel/sdk/metric" +) + +type metrics struct { + c *Cassette + server *http.Server + exporter *prometheus.Exporter + + lookupRequestCounter instrument.Int64Counter + lookupResponseTTFPHistogram instrument.Int64Histogram + lookupResponseResultCountHistogram instrument.Int64Histogram + lookupResponseLatencyHistogram instrument.Int64Histogram + + broadcastInFlightTimeHistogram instrument.Int64Histogram + broadcastBatchSizeHistogram instrument.Int64Histogram + broadcastSkipCounter instrument.Int64Counter + broadcastFailureCounter instrument.Int64Counter + broadcastRecipientsUpDownCounter instrument.Int64UpDownCounter + broadcastInFlightUpDownCounter instrument.Int64UpDownCounter + + receiverErrorCounter instrument.Int64Counter + receiverConnectionUpDownCounter instrument.Int64UpDownCounter +} + +func newMetrics(c *Cassette) (*metrics, error) { + m := metrics{ + c: c, + server: &http.Server{ + Addr: c.metricsHttpListenAddr, + // TODO add other metrics server options. + }, + } + return &m, nil +} + +func (m *metrics) Start(ctx context.Context) error { + var err error + if m.exporter, err = prometheus.New( + prometheus.WithoutUnits(), + prometheus.WithoutScopeInfo(), + prometheus.WithoutTargetInfo()); err != nil { + return err + } + provider := metric.NewMeterProvider(metric.WithReader(m.exporter)) + meter := provider.Meter("ipni/cassette") + + if m.lookupRequestCounter, err = meter.Int64Counter( + "ipni/cassette/lookup_request_count", + instrument.WithUnit("1"), + instrument.WithDescription("The number of lookup requests received."), + ); err != nil { + return err + } + if m.lookupResponseTTFPHistogram, err = meter.Int64Histogram( + "ipni/cassette/lookup_response_first_provider_time", + instrument.WithUnit("ms"), + instrument.WithDescription("The elapsed to find the first provider in milliseconds."), + ); err != nil { + return err + } + if m.lookupResponseResultCountHistogram, err = meter.Int64Histogram( + "ipni/cassette/lookup_response_result_count", 
+ instrument.WithUnit("1"), + instrument.WithDescription("The number of providers found per lookup."), + ); err != nil { + return err + } + if m.lookupResponseLatencyHistogram, err = meter.Int64Histogram( + "ipni/cassette/lookup_response_latency", + instrument.WithUnit("ms"), + instrument.WithDescription("The lookup response latency."), + ); err != nil { + return err + } + if m.broadcastInFlightTimeHistogram, err = meter.Int64Histogram( + "ipni/cassette/broadcast_in_flight_time", + instrument.WithUnit("ms"), + instrument.WithDescription("The elapsed time between broadcast requested and broadcast sent."), + ); err != nil { + return err + } + if m.broadcastBatchSizeHistogram, err = meter.Int64Histogram( + "ipni/cassette/broadcast_batch_size", + instrument.WithUnit("1"), + instrument.WithDescription("The histogram of the number of CIDs in each broadcast message."), + ); err != nil { + return err + } + if m.broadcastSkipCounter, err = meter.Int64Counter( + "ipni/cassette/broadcast_skipped_count", + instrument.WithUnit("1"), + instrument.WithDescription("The number of CIDs skipped broadcast."), + ); err != nil { + return err + } + if m.broadcastFailureCounter, err = meter.Int64Counter( + "ipni/cassette/broadcast_failed_count", + instrument.WithUnit("1"), + instrument.WithDescription("The number of CIDs that failed broadcast."), + ); err != nil { + return err + } + if m.broadcastRecipientsUpDownCounter, err = meter.Int64UpDownCounter( + "ipni/cassette/broadcast_recipients_count", + instrument.WithUnit("1"), + instrument.WithDescription("The number of broadcast recipients."), + ); err != nil { + return err + } + if m.broadcastInFlightUpDownCounter, err = meter.Int64UpDownCounter( + "ipni/cassette/broadcast_in_flight_count", + instrument.WithUnit("1"), + instrument.WithDescription("The number of in-flight CIDs awaiting broadcast."), + ); err != nil { + return err + } + if m.receiverErrorCounter, err = meter.Int64Counter( + "ipni/cassette/receiver_error_count", + 
instrument.WithUnit("1"), + instrument.WithDescription("The number of errors observed by BitSwap receiver."), + ); err != nil { + return err + } + if m.receiverConnectionUpDownCounter, err = meter.Int64UpDownCounter( + "ipni/cassette/receiver_connection_count", + instrument.WithUnit("1"), + instrument.WithDescription("The number of connections observed by BitSwap receiver."), + ); err != nil { + return err + } + + m.server.Handler = m.serveMux() + go func() { _ = m.server.ListenAndServe() }() + m.server.RegisterOnShutdown(func() { + // TODO add timeout to exporter shutdown + if err := m.exporter.Shutdown(context.TODO()); err != nil { + logger.Errorw("Failed to shut down Prometheus exporter", "err", err) + } + }) + logger.Infow("Metric server started", "addr", m.server.Addr) + return nil +} + +func (m *metrics) serveMux() *http.ServeMux { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + if m.c.metricsEnablePprofDebug { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + mux.HandleFunc("/debug/pprof/gc", + func(w http.ResponseWriter, req *http.Request) { + runtime.GC() + }, + ) + } + return mux +} + +func (m *metrics) notifyBroadcastSkipped(ctx context.Context, batchSize int64, inFlightTime time.Duration) { + m.broadcastInFlightTimeHistogram.Record(ctx, inFlightTime.Milliseconds(), attribute.String("status", "skipped")) + m.broadcastSkipCounter.Add(ctx, batchSize) + m.broadcastInFlightUpDownCounter.Add(ctx, -batchSize) +} + +func (m *metrics) notifyBroadcastFailed(ctx context.Context, batchSize int64, err error, inFlightTime time.Duration) { + errKindAttr := errKindAttribute(err) + m.broadcastInFlightTimeHistogram.Record(ctx, inFlightTime.Milliseconds(), attribute.String("status", "failed"), errKindAttr) + 
m.broadcastFailureCounter.Add(ctx, batchSize, errKindAttr) + m.broadcastInFlightUpDownCounter.Add(ctx, -batchSize) +} + +func (m *metrics) notifyBroadcastSucceeded(ctx context.Context, batchSize int64, wantHaves bool, inFlightTime time.Duration) { + m.broadcastInFlightTimeHistogram.Record(ctx, inFlightTime.Milliseconds(), attribute.String("status", "succeeded")) + m.broadcastBatchSizeHistogram.Record(ctx, batchSize) + m.broadcastInFlightUpDownCounter.Add(ctx, -batchSize) +} + +func (m *metrics) notifyBroadcastRequested(ctx context.Context, cidCount int64) { + m.broadcastInFlightUpDownCounter.Add(ctx, cidCount) +} + +func (m *metrics) notifyBroadcastRecipientAdded(ctx context.Context) { + m.broadcastRecipientsUpDownCounter.Add(ctx, 1) +} +func (m *metrics) notifyBroadcastRecipientRemoved(ctx context.Context) { + m.broadcastRecipientsUpDownCounter.Add(ctx, -1) +} + +func (m *metrics) notifyReceiverErrored(ctx context.Context, err error) { + m.receiverErrorCounter.Add(ctx, 1, errKindAttribute(err)) +} + +func (m *metrics) notifyReceiverConnected(ctx context.Context) { + m.receiverConnectionUpDownCounter.Add(ctx, 1) +} + +func (m *metrics) notifyReceiverDisconnected(ctx context.Context) { + m.receiverConnectionUpDownCounter.Add(ctx, -1) +} + +func (m *metrics) notifyLookupRequested(ctx context.Context) { + m.lookupRequestCounter.Add(ctx, 1) +} + +func (m *metrics) notifyLookupResponded(ctx context.Context, resultCount int64, timeToFirstResult time.Duration, latency time.Duration) { + if resultCount > 0 { + m.lookupResponseTTFPHistogram.Record(ctx, timeToFirstResult.Milliseconds()) + } + m.lookupResponseResultCountHistogram.Record(ctx, resultCount) + m.lookupResponseLatencyHistogram.Record(ctx, latency.Milliseconds()) +} + +func errKindAttribute(err error) attribute.KeyValue { + // TODO check logs for other popular error kinds we might care about. 
+ var errKind string + switch { + case errors.Is(err, context.DeadlineExceeded): + errKind = "deadline-exceeded" + case errors.Is(err, context.Canceled): + errKind = "canceled" + case errors.Is(err, network.ErrReset): + errKind = "stream-reset" + case errors.Is(err, network.ErrResourceLimitExceeded): + errKind = "resource-limit" + default: + errKind = "other" + } + return attribute.String("error-kind", errKind) +} + +func (m *metrics) Shutdown(ctx context.Context) error { + return m.server.Shutdown(ctx) +} diff --git a/multiaddr.go b/multiaddr.go new file mode 100644 index 0000000..cff5387 --- /dev/null +++ b/multiaddr.go @@ -0,0 +1,26 @@ +package cassette + +import ( + "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" +) + +// IsPubliclyDialableAddr checks whether target can be dialled publicly. More specifically: +// - if it is of type IP, it is a public IP, and +// - if it is of type DNS, it is not localhost +// +// All other address types are treated as dialable. 
+func IsPubliclyDialableAddr(target multiaddr.Multiaddr) bool { + c, _ := multiaddr.SplitFirst(target) + if c == nil { + return false + } + switch c.Protocol().Code { + case multiaddr.P_IP4, multiaddr.P_IP6, multiaddr.P_IP6ZONE, multiaddr.P_IPCIDR: + return manet.IsPublicAddr(target) + case multiaddr.P_DNS, multiaddr.P_DNS4, multiaddr.P_DNS6, multiaddr.P_DNSADDR: + return c.Value() != "localhost" + default: + return true + } +} diff --git a/options.go b/options.go index 52324e8..5354a75 100644 --- a/options.go +++ b/options.go @@ -30,6 +30,8 @@ type ( options struct { h host.Host httpListenAddr string + metricsHttpListenAddr string + metricsEnablePprofDebug bool httpAllowOrigin string httpResponsePreferJson bool peers []peer.AddrInfo @@ -40,6 +42,7 @@ type ( messageSenderBuffer int recipientsRefreshInterval time.Duration fallbackOnWantBlock bool + addrFilterDisabled bool maxBroadcastBatchSize int maxBroadcastBatchWait time.Duration @@ -49,6 +52,8 @@ type ( func newOptions(o ...Option) (*options, error) { opts := options{ httpListenAddr: "0.0.0.0:40080", + metricsHttpListenAddr: "0.0.0.0:40081", + metricsEnablePprofDebug: true, ipniCascadeLabel: "legacy", httpAllowOrigin: "*", maxWaitTimeout: 5 * time.Second, diff --git a/receiver.go b/receiver.go index f520155..64acf4b 100644 --- a/receiver.go +++ b/receiver.go @@ -15,11 +15,11 @@ var _ network.Receiver = (*receiver)(nil) type ( receiver struct { - ctx context.Context - cancel context.CancelFunc - mailbox chan any - nextID atomic.Int64 - findByMultihash bool + c *Cassette + ctx context.Context + cancel context.CancelFunc + mailbox chan any + nextID atomic.Int64 } receivedMessageEvent struct { k string @@ -36,10 +36,10 @@ type ( } ) -func newReceiver(findByMultihash bool) (*receiver, error) { +func newReceiver(c *Cassette) (*receiver, error) { var r receiver r.ctx, r.cancel = context.WithCancel(context.Background()) - r.findByMultihash = findByMultihash + r.c = c r.mailbox = make(chan any) go func() { type 
registeredHook struct { @@ -107,7 +107,7 @@ func newReceiver(findByMultihash bool) (*receiver, error) { } func (r *receiver) keyFromCid(c cid.Cid) string { - if r.findByMultihash { + if r.c.findByMultihash { return string(c.Hash()) } return c.String() @@ -157,14 +157,17 @@ func (r *receiver) ReceiveMessage(ctx context.Context, sender peer.ID, in messag func (r *receiver) ReceiveError(err error) { // TODO hook this up to circuit breakers? logger.Errorw("Received Error", "err", err) + r.c.metrics.notifyReceiverErrored(r.ctx, err) } func (r *receiver) PeerConnected(id peer.ID) { logger.Debugw("peer connected", "id", id) + r.c.metrics.notifyReceiverConnected(r.ctx) } func (r *receiver) PeerDisconnected(id peer.ID) { logger.Debugw("peer disconnected", "id", id) + r.c.metrics.notifyReceiverDisconnected(r.ctx) } func (r *receiver) registerFoundHook(ctx context.Context, k cid.Cid, f func(id peer.ID)) func() {