add opencensus tracing and metrics
This commit adds support for OpenCensus tracing
and metrics collection. This required adding
context.Context propagation throughout the cluster
codebase, and in particular to the ipfscluster
component interfaces.
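
A minimal sketch of that pattern as it appears across the
component interfaces in this diff (the component type and method
names below are illustrative only, not taken from the codebase):

    package example

    import (
        "context"

        "go.opencensus.io/trace"
    )

    // component stands in for any ipfscluster component
    // (allocator, monitor, API server, ...).
    type component struct{}

    // DoWork receives a context, opens a span covering the call,
    // and hands the derived context to nested calls so the trace
    // continues downstream.
    func (c *component) DoWork(ctx context.Context) error {
        ctx, span := trace.StartSpan(ctx, "component/DoWork")
        defer span.End()

        return c.nestedCall(ctx)
    }

    func (c *component) nestedCall(ctx context.Context) error {
        _, span := trace.StartSpan(ctx, "component/nestedCall")
        defer span.End()
        return nil
    }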

Tracing propagates across RPC and HTTP boundaries.
The current default tracing backend is Jaeger.
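
The exporter setup itself is not shown in the hunks below; a rough
sketch of how a Jaeger exporter is typically registered with
OpenCensus follows (the import path, agent address, and service
name are assumptions and may not match this commit exactly):

    package example

    import (
        "log"

        "contrib.go.opencensus.io/exporter/jaeger"
        "go.opencensus.io/trace"
    )

    func setupJaeger() {
        // Send spans to a local Jaeger agent; the endpoint and
        // service name are placeholders.
        je, err := jaeger.NewExporter(jaeger.Options{
            AgentEndpoint: "localhost:6831",
            Process:       jaeger.Process{ServiceName: "ipfs-cluster"},
        })
        if err != nil {
            log.Fatalf("failed to create Jaeger exporter: %s", err)
        }
        trace.RegisterExporter(je)
    }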

Metrics collection currently exports the metrics exposed
by the OpenCensus HTTP plugin, as well as the pprof
metrics, to a Prometheus endpoint for scraping.
The current default metrics backend is Prometheus.
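
A sketch of the kind of wiring this implies, assuming the
OpenCensus Prometheus exporter and the standard net/http/pprof
handlers (the import paths, namespace, and endpoint layout are
assumptions, not necessarily what this commit does):

    package example

    import (
        "net/http"
        "net/http/pprof"

        "contrib.go.opencensus.io/exporter/prometheus"
        "go.opencensus.io/plugin/ochttp"
        "go.opencensus.io/stats/view"
    )

    func serveMetrics(addr string) error {
        pe, err := prometheus.NewExporter(prometheus.Options{Namespace: "ipfscluster"})
        if err != nil {
            return err
        }
        view.RegisterExporter(pe)

        // Register the views exposed by the ochttp plugin.
        if err := view.Register(ochttp.DefaultServerViews...); err != nil {
            return err
        }

        mux := http.NewServeMux()
        // The exporter doubles as the Prometheus scrape handler.
        mux.Handle("/metrics", pe)
        // Expose pprof profiles on the same listener.
        mux.HandleFunc("/debug/pprof/", pprof.Index)
        mux.HandleFunc("/debug/pprof/profile", pprof.Profile)

        return http.ListenAndServe(addr, mux)
    }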

Metrics are exposed by default because their overhead is
low, and they can be turned off if desired. Tracing is off
by default because it has a much higher performance
overhead, though the size of the performance hit can be
reduced with a smaller sampling rate.
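
For example, a probability sampler keeps tracing overhead down by
recording only a fraction of requests (the 1% rate below is an
illustration, not the commit's default):

    package example

    import "go.opencensus.io/trace"

    func configureSampling() {
        // Record roughly 1% of traces instead of every request.
        trace.ApplyConfig(trace.Config{
            DefaultSampler: trace.ProbabilitySampler(0.01),
        })
    }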

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
lanzafame committed Feb 4, 2019
1 parent a244af9 commit 3b3f786
Showing 71 changed files with 2,757 additions and 1,225 deletions.
9 changes: 6 additions & 3 deletions add_test.go
@@ -3,6 +3,7 @@ package ipfscluster
// This file has tests for Add* using multiple cluster peers.

import (
"context"
"mime/multipart"
"testing"

@@ -11,6 +12,7 @@ import (
)

func TestAdd(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
@@ -34,7 +36,7 @@ func TestAdd(t *testing.T) {
pinDelay()

f := func(t *testing.T, c *Cluster) {
pin := c.StatusLocal(ci)
pin := c.StatusLocal(ctx, ci)
if pin.Error != "" {
t.Error(pin.Error)
}
@@ -48,12 +50,13 @@ func TestAdd(t *testing.T) {
}

func TestAddPeerDown(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

err := clusters[0].Shutdown()
err := clusters[0].Shutdown(ctx)
if err != nil {
t.Fatal(err)
}
@@ -82,7 +85,7 @@ func TestAddPeerDown(t *testing.T) {
return
}

pin := c.StatusLocal(ci)
pin := c.StatusLocal(ctx, ci)
if pin.Error != "" {
t.Error(pin.Error)
}
4 changes: 2 additions & 2 deletions adder/sharding/dag_service_test.go
@@ -44,15 +44,15 @@ func (rpcs *testRPC) BlockAllocate(ctx context.Context, in api.PinSerial, out *[
return nil
}

func (rpcs *testRPC) PinGet(c cid.Cid) (api.Pin, error) {
func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) {
pI, ok := rpcs.pins.Load(c.String())
if !ok {
return api.Pin{}, errors.New("not found")
}
return pI.(api.PinSerial).ToPin(), nil
}

func (rpcs *testRPC) BlockGet(c cid.Cid) ([]byte, error) {
func (rpcs *testRPC) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) {
bI, ok := rpcs.blocks.Load(c.String())
if !ok {
return nil, errors.New("not found")
16 changes: 9 additions & 7 deletions adder/sharding/verify.go
@@ -1,6 +1,7 @@
package sharding

import (
"context"
"fmt"
"testing"

@@ -12,20 +13,21 @@ import (
// MockPinStore is used in VerifyShards
type MockPinStore interface {
// Gets a pin
PinGet(cid.Cid) (api.Pin, error)
PinGet(context.Context, cid.Cid) (api.Pin, error)
}

// MockBlockStore is used in VerifyShards
type MockBlockStore interface {
// Gets a block
BlockGet(cid.Cid) ([]byte, error)
BlockGet(context.Context, cid.Cid) ([]byte, error)
}

// VerifyShards checks that a sharded CID has been correctly formed and stored.
// This is a helper function for testing. It returns a map with all the blocks
// from all shards.
func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) {
metaPin, err := pins.PinGet(rootCid)
ctx := context.Background()
metaPin, err := pins.PinGet(ctx, rootCid)
if err != nil {
return nil, fmt.Errorf("meta pin was not pinned: %s", err)
}
@@ -34,7 +36,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
return nil, fmt.Errorf("bad MetaPin type")
}

clusterPin, err := pins.PinGet(metaPin.Reference)
clusterPin, err := pins.PinGet(ctx, metaPin.Reference)
if err != nil {
return nil, fmt.Errorf("cluster pin was not pinned: %s", err)
}
@@ -46,7 +48,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
return nil, fmt.Errorf("clusterDAG should reference the MetaPin")
}

clusterDAGBlock, err := ipfs.BlockGet(clusterPin.Cid)
clusterDAGBlock, err := ipfs.BlockGet(ctx, clusterPin.Cid)
if err != nil {
return nil, fmt.Errorf("cluster pin was not stored: %s", err)
}
@@ -70,7 +72,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
return nil, err
}

shardPin, err := pins.PinGet(sh.Cid)
shardPin, err := pins.PinGet(ctx, sh.Cid)
if err != nil {
return nil, fmt.Errorf("shard was not pinned: %s %s", sh.Cid, err)
}
@@ -80,7 +82,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
}
ref = shardPin.Cid

shardBlock, err := ipfs.BlockGet(shardPin.Cid)
shardBlock, err := ipfs.BlockGet(ctx, shardPin.Cid)
if err != nil {
return nil, fmt.Errorf("shard block was not stored: %s", err)
}
16 changes: 13 additions & 3 deletions allocate.go
@@ -1,11 +1,13 @@
package ipfscluster

import (
"context"
"errors"
"fmt"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
"go.opencensus.io/trace"

"github.com/ipfs/ipfs-cluster/api"
)
@@ -44,7 +46,10 @@ import (
// into account if the given CID was previously in a "pin everywhere" mode,
// and will consider such Pins as currently unallocated ones, providing
// new allocations as available.
func (c *Cluster) allocate(hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/allocate")
defer span.End()

if (rplMin + rplMax) == 0 {
return nil, fmt.Errorf("bad replication factors: %d/%d", rplMin, rplMax)
}
@@ -54,9 +59,9 @@ func (c *Cluster) allocate(hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID
}

// Figure out who is holding the CID
currentPin, _ := c.PinGet(hash)
currentPin, _ := c.PinGet(ctx, hash)
currentAllocs := currentPin.Allocations
metrics := c.monitor.LatestMetrics(c.informer.Name())
metrics := c.monitor.LatestMetrics(ctx, c.informer.Name())

currentMetrics := make(map[peer.ID]api.Metric)
candidatesMetrics := make(map[peer.ID]api.Metric)
@@ -80,6 +85,7 @@ func (c *Cluster) allocate(hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID
}

newAllocs, err := c.obtainAllocations(
ctx,
hash,
rplMin,
rplMax,
@@ -114,12 +120,15 @@ func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID
}

func (c *Cluster) obtainAllocations(
ctx context.Context,
hash cid.Cid,
rplMin, rplMax int,
currentValidMetrics map[peer.ID]api.Metric,
candidatesMetrics map[peer.ID]api.Metric,
priorityMetrics map[peer.ID]api.Metric,
) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
defer span.End()

// The list of peers in current
validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
@@ -167,6 +176,7 @@ func (c *Cluster) obtainAllocations(

// the allocator returns a list of peers ordered by priority
finalAllocs, err := c.allocator.Allocate(
ctx,
hash,
currentValidMetrics,
candidatesMetrics,
11 changes: 8 additions & 3 deletions allocator/ascendalloc/ascendalloc.go
@@ -5,6 +5,8 @@
package ascendalloc

import (
"context"

"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"

@@ -28,14 +30,17 @@ func NewAllocator() AscendAllocator {
func (alloc AscendAllocator) SetClient(c *rpc.Client) {}

// Shutdown does nothing in this allocator
func (alloc AscendAllocator) Shutdown() error { return nil }
func (alloc AscendAllocator) Shutdown(_ context.Context) error { return nil }

// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (smallest to largest).
func (alloc AscendAllocator) Allocate(c cid.Cid, current,
candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
func (alloc AscendAllocator) Allocate(
ctx context.Context,
c cid.Cid,
current, candidates, priority map[peer.ID]api.Metric,
) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, false)
last := util.SortNumeric(candidates, false)
4 changes: 3 additions & 1 deletion allocator/ascendalloc/ascendalloc_test.go
@@ -1,6 +1,7 @@
package ascendalloc

import (
"context"
"testing"
"time"

@@ -96,10 +97,11 @@ var testCases = []testcase{
}

func Test(t *testing.T) {
ctx := context.Background()
alloc := &AscendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
6 changes: 4 additions & 2 deletions allocator/descendalloc/descendalloc.go
@@ -5,6 +5,8 @@
package descendalloc

import (
"context"

"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"

@@ -28,13 +30,13 @@ func NewAllocator() DescendAllocator {
func (alloc DescendAllocator) SetClient(c *rpc.Client) {}

// Shutdown does nothing in this allocator
func (alloc DescendAllocator) Shutdown() error { return nil }
func (alloc DescendAllocator) Shutdown(_ context.Context) error { return nil }

// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (largest to smallest).
func (alloc DescendAllocator) Allocate(c cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
func (alloc DescendAllocator) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, true)
last := util.SortNumeric(candidates, true)
4 changes: 3 additions & 1 deletion allocator/descendalloc/descendalloc_test.go
@@ -1,6 +1,7 @@
package descendalloc

import (
"context"
"testing"
"time"

@@ -96,10 +97,11 @@ var testCases = []testcase{
}

func Test(t *testing.T) {
ctx := context.Background()
alloc := &DescendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
3 changes: 3 additions & 0 deletions api/ipfsproxy/config.go
@@ -74,6 +74,9 @@ type Config struct {
// Establishes how long we should remember extracted headers before we
// refresh them with a new request. 0 means always.
ExtractHeadersTTL time.Duration

// Tracing flag used to skip tracing specific paths when not enabled.
Tracing bool
}

type jsonConfig struct {
23 changes: 21 additions & 2 deletions api/ipfsproxy/ipfsproxy.go
@@ -13,6 +13,10 @@ import (
"sync"
"time"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/trace"

"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
@@ -127,13 +131,28 @@ func New(cfg *Config) (*Server, error) {
return nil, err
}

var handler http.Handler
router := mux.NewRouter()
handler = router

if cfg.Tracing {
handler = &ochttp.Handler{
IsPublicEndpoint: true,
Propagation: &tracecontext.HTTPFormat{},
Handler: router,
StartOptions: trace.StartOptions{SpanKind: trace.SpanKindServer},
FormatSpanName: func(req *http.Request) string {
return "proxy:" + req.Host + ":" + req.URL.Path + ":" + req.Method
},
}
}

s := &http.Server{
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
ReadHeaderTimeout: cfg.ReadHeaderTimeout,
IdleTimeout: cfg.IdleTimeout,
Handler: router,
Handler: handler,
}

// See: https://github.com/ipfs/go-ipfs/issues/5168
@@ -216,7 +235,7 @@ func (proxy *Server) SetClient(c *rpc.Client) {

// Shutdown stops any listeners and stops the component from taking
// any requests.
func (proxy *Server) Shutdown() error {
func (proxy *Server) Shutdown(ctx context.Context) error {
proxy.shutdownLock.Lock()
defer proxy.shutdownLock.Unlock()

