Merge branch 'master' into feat/datastore
At this point, tracing in dsstate is missing

License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
hsanjuan committed Feb 7, 2019
Parents: 7deb628, fcea4f4 · Commit: bfdf575
Showing 74 changed files with 2,870 additions and 1,236 deletions.
README.md: 2 changes (1 addition, 1 deletion)
@@ -7,7 +7,7 @@
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
[![GoDoc](https://godoc.org/github.com/ipfs/ipfs-cluster?status.svg)](https://godoc.org/github.com/ipfs/ipfs-cluster)
[![Go Report Card](https://goreportcard.com/badge/github.com/ipfs/ipfs-cluster)](https://goreportcard.com/report/github.com/ipfs/ipfs-cluster)
[![Build Status](https://travis-ci.org/ipfs/ipfs-cluster.svg?branch=master)](https://travis-ci.org/ipfs/ipfs-cluster)
[![Build Status](https://travis-ci.com/ipfs/ipfs-cluster.svg?branch=master)](https://travis-ci.com/ipfs/ipfs-cluster)
[![Coverage Status](https://coveralls.io/repos/github/ipfs/ipfs-cluster/badge.svg?branch=master)](https://coveralls.io/github/ipfs/ipfs-cluster?branch=master)

> Pinset orchestration for IPFS.
add_test.go: 9 changes (6 additions, 3 deletions)
@@ -3,6 +3,7 @@ package ipfscluster
// This files has tests for Add* using multiple cluster peers.

import (
"context"
"mime/multipart"
"testing"

@@ -11,6 +12,7 @@ import (
)

func TestAdd(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
@@ -34,7 +36,7 @@ func TestAdd(t *testing.T) {
pinDelay()

f := func(t *testing.T, c *Cluster) {
pin := c.StatusLocal(ci)
pin := c.StatusLocal(ctx, ci)
if pin.Error != "" {
t.Error(pin.Error)
}
@@ -48,12 +50,13 @@ }
}

func TestAddPeerDown(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

err := clusters[0].Shutdown()
err := clusters[0].Shutdown(ctx)
if err != nil {
t.Fatal(err)
}
@@ -82,7 +85,7 @@ func TestAddPeerDown(t *testing.T) {
return
}

pin := c.StatusLocal(ci)
pin := c.StatusLocal(ctx, ci)
if pin.Error != "" {
t.Error(pin.Error)
}
adder/sharding/dag_service_test.go: 4 changes (2 additions, 2 deletions)
@@ -44,15 +44,15 @@ func (rpcs *testRPC) BlockAllocate(ctx context.Context, in api.PinSerial, out *[
return nil
}

func (rpcs *testRPC) PinGet(c cid.Cid) (api.Pin, error) {
func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) {
pI, ok := rpcs.pins.Load(c.String())
if !ok {
return api.Pin{}, errors.New("not found")
}
return pI.(api.PinSerial).ToPin(), nil
}

func (rpcs *testRPC) BlockGet(c cid.Cid) ([]byte, error) {
func (rpcs *testRPC) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) {
bI, ok := rpcs.blocks.Load(c.String())
if !ok {
return nil, errors.New("not found")
adder/sharding/verify.go: 16 changes (9 additions, 7 deletions)
@@ -1,6 +1,7 @@
package sharding

import (
"context"
"fmt"
"testing"

@@ -12,20 +13,21 @@ import (
// MockPinStore is used in VerifyShards
type MockPinStore interface {
// Gets a pin
PinGet(cid.Cid) (api.Pin, error)
PinGet(context.Context, cid.Cid) (api.Pin, error)
}

// MockBlockStore is used in VerifyShards
type MockBlockStore interface {
// Gets a block
BlockGet(cid.Cid) ([]byte, error)
BlockGet(context.Context, cid.Cid) ([]byte, error)
}

// VerifyShards checks that a sharded CID has been correctly formed and stored.
// This is a helper function for testing. It returns a map with all the blocks
// from all shards.
func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) {
metaPin, err := pins.PinGet(rootCid)
ctx := context.Background()
metaPin, err := pins.PinGet(ctx, rootCid)
if err != nil {
return nil, fmt.Errorf("meta pin was not pinned: %s", err)
}
@@ -34,7 +36,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
return nil, fmt.Errorf("bad MetaPin type")
}

clusterPin, err := pins.PinGet(metaPin.Reference)
clusterPin, err := pins.PinGet(ctx, metaPin.Reference)
if err != nil {
return nil, fmt.Errorf("cluster pin was not pinned: %s", err)
}
@@ -46,7 +48,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
return nil, fmt.Errorf("clusterDAG should reference the MetaPin")
}

clusterDAGBlock, err := ipfs.BlockGet(clusterPin.Cid)
clusterDAGBlock, err := ipfs.BlockGet(ctx, clusterPin.Cid)
if err != nil {
return nil, fmt.Errorf("cluster pin was not stored: %s", err)
}
@@ -70,7 +72,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
return nil, err
}

shardPin, err := pins.PinGet(sh.Cid)
shardPin, err := pins.PinGet(ctx, sh.Cid)
if err != nil {
return nil, fmt.Errorf("shard was not pinned: %s %s", sh.Cid, err)
}
@@ -80,7 +82,7 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
}
ref = shardPin.Cid

shardBlock, err := ipfs.BlockGet(shardPin.Cid)
shardBlock, err := ipfs.BlockGet(ctx, shardPin.Cid)
if err != nil {
return nil, fmt.Errorf("shard block was not stored: %s", err)
}
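
For reference, a minimal sketch (not part of this commit) of an in-memory store that satisfies the updated MockPinStore and MockBlockStore interfaces above once the context parameter is threaded through. The memStore type is a hypothetical stand-in for the sync.Map-backed testRPC in dag_service_test.go; the context argument is accepted but unused, as in those test helpers.

package sharding_test

import (
	"context"
	"errors"
	"sync"

	"github.com/ipfs/ipfs-cluster/api"

	cid "github.com/ipfs/go-cid"
)

// memStore keeps pins and raw blocks in memory, keyed by CID string.
type memStore struct {
	pins   sync.Map // cid string -> api.Pin
	blocks sync.Map // cid string -> []byte
}

// PinGet returns a previously stored pin. The context matches the new
// interface signature but is not used here.
func (m *memStore) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) {
	p, ok := m.pins.Load(c.String())
	if !ok {
		return api.Pin{}, errors.New("not found")
	}
	return p.(api.Pin), nil
}

// BlockGet returns the raw bytes of a stored block.
func (m *memStore) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) {
	b, ok := m.blocks.Load(c.String())
	if !ok {
		return nil, errors.New("not found")
	}
	return b.([]byte), nil
}
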
allocate.go: 16 changes (13 additions, 3 deletions)
@@ -1,11 +1,13 @@
package ipfscluster

import (
"context"
"errors"
"fmt"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p-peer"
"go.opencensus.io/trace"

"github.com/ipfs/ipfs-cluster/api"
)
@@ -44,7 +46,7 @@
// into account if the given CID was previously in a "pin everywhere" mode,
// and will consider such Pins as currently unallocated ones, providing
// new allocations as available.
func (c *Cluster) allocate(hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID, prioritylist []peer.ID) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/allocate")
defer span.End()

if (rplMin + rplMax) == 0 {
return nil, fmt.Errorf("bad replication factors: %d/%d", rplMin, rplMax)
}
@@ -54,9 +59,9 @@ func (c *Cluster) allocate(hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID
}

// Figure out who is holding the CID
currentPin, _ := c.PinGet(hash)
currentPin, _ := c.PinGet(ctx, hash)
currentAllocs := currentPin.Allocations
metrics := c.monitor.LatestMetrics(c.informer.Name())
metrics := c.monitor.LatestMetrics(ctx, c.informer.Name())

currentMetrics := make(map[peer.ID]api.Metric)
candidatesMetrics := make(map[peer.ID]api.Metric)
@@ -80,6 +85,7 @@ func (c *Cluster) allocate(hash cid.Cid, rplMin, rplMax int, blacklist []peer.ID
}

newAllocs, err := c.obtainAllocations(
ctx,
hash,
rplMin,
rplMax,
@@ -114,12 +120,15 @@ func allocationError(hash cid.Cid, needed, wanted int, candidatesValid []peer.ID
}

func (c *Cluster) obtainAllocations(
ctx context.Context,
hash cid.Cid,
rplMin, rplMax int,
currentValidMetrics map[peer.ID]api.Metric,
candidatesMetrics map[peer.ID]api.Metric,
priorityMetrics map[peer.ID]api.Metric,
) ([]peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
defer span.End()

// The list of peers in current
validAllocations := make([]peer.ID, 0, len(currentValidMetrics))
@@ -167,6 +176,7 @@ func (c *Cluster) obtainAllocations(

// the allocator returns a list of peers ordered by priority
finalAllocs, err := c.allocator.Allocate(
ctx,
hash,
currentValidMetrics,
candidatesMetrics,
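
For reference, the tracing pattern that allocate.go now applies at each layer, reduced to a standalone sketch: derive a child context with trace.StartSpan, close the span on return, and pass ctx down so nested calls become child spans. The doWork and nestedCall names are hypothetical; the "cluster/..." span naming follows the code above.

package main

import (
	"context"
	"errors"

	"go.opencensus.io/trace"
)

// doWork opens a span named after the component and operation, and
// always closes it when the function returns.
func doWork(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "cluster/doWork")
	defer span.End()

	// Nested calls receive ctx, so their spans become children of
	// "cluster/doWork" in the resulting trace.
	if err := nestedCall(ctx); err != nil {
		// Record the failure on the span before returning.
		span.SetStatus(trace.Status{Code: trace.StatusCodeUnknown, Message: err.Error()})
		return err
	}
	return nil
}

func nestedCall(ctx context.Context) error {
	_, span := trace.StartSpan(ctx, "cluster/nestedCall")
	defer span.End()
	return errors.New("example failure") // exercises the error path above
}

func main() {
	_ = doWork(context.Background())
}
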
allocator/ascendalloc/ascendalloc.go: 11 changes (8 additions, 3 deletions)
@@ -5,6 +5,8 @@
package ascendalloc

import (
"context"

"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"

@@ -28,14 +30,17 @@ func NewAllocator() AscendAllocator {
func (alloc AscendAllocator) SetClient(c *rpc.Client) {}

// Shutdown does nothing in this allocator
func (alloc AscendAllocator) Shutdown() error { return nil }
func (alloc AscendAllocator) Shutdown(_ context.Context) error { return nil }

// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (smallest to largest).
func (alloc AscendAllocator) Allocate(c cid.Cid, current,
candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
func (alloc AscendAllocator) Allocate(
ctx context.Context,
c cid.Cid,
current, candidates, priority map[peer.ID]api.Metric,
) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, false)
last := util.SortNumeric(candidates, false)
allocator/ascendalloc/ascendalloc_test.go: 4 changes (3 additions, 1 deletion)
@@ -1,6 +1,7 @@
package ascendalloc

import (
"context"
"testing"
"time"

@@ -96,10 +97,11 @@ var testCases = []testcase{
}

func Test(t *testing.T) {
ctx := context.Background()
alloc := &AscendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
allocator/descendalloc/descendalloc.go: 6 changes (4 additions, 2 deletions)
@@ -5,6 +5,8 @@
package descendalloc

import (
"context"

"github.com/ipfs/ipfs-cluster/allocator/util"
"github.com/ipfs/ipfs-cluster/api"

@@ -28,13 +30,13 @@ func NewAllocator() DescendAllocator {
func (alloc DescendAllocator) SetClient(c *rpc.Client) {}

// Shutdown does nothing in this allocator
func (alloc DescendAllocator) Shutdown() error { return nil }
func (alloc DescendAllocator) Shutdown(_ context.Context) error { return nil }

// Allocate returns where to allocate a pin request based on metrics which
// carry a numeric value such as "used disk". We do not pay attention to
// the metrics of the currently allocated peers and we just sort the
// candidates based on their metric values (largest to smallest).
func (alloc DescendAllocator) Allocate(c cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
func (alloc DescendAllocator) Allocate(ctx context.Context, c cid.Cid, current, candidates, priority map[peer.ID]api.Metric) ([]peer.ID, error) {
// sort our metrics
first := util.SortNumeric(priority, true)
last := util.SortNumeric(candidates, true)
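
For reference, a simplified illustration (not the actual util.SortNumeric) of the ordering the two allocators describe: ascendalloc hands back candidates sorted smallest-to-largest (e.g. least "used disk" first), descendalloc largest-to-smallest (e.g. most "freespace" first). Plain float64 values stand in for api.Metric, and the peer IDs are placeholders.

package main

import (
	"fmt"
	"sort"

	peer "github.com/libp2p/go-libp2p-peer"
)

// sortByValue orders peers by their numeric metric value, ascending or
// descending depending on the allocator flavour.
func sortByValue(values map[peer.ID]float64, descending bool) []peer.ID {
	peers := make([]peer.ID, 0, len(values))
	for p := range values {
		peers = append(peers, p)
	}
	sort.Slice(peers, func(i, j int) bool {
		if descending {
			return values[peers[i]] > values[peers[j]]
		}
		return values[peers[i]] < values[peers[j]]
	})
	return peers
}

func main() {
	vals := map[peer.ID]float64{
		peer.ID("peerA"): 10,
		peer.ID("peerB"): 3,
		peer.ID("peerC"): 7,
	}
	// Ascending yields peerB, peerC, peerA (3 < 7 < 10); descending
	// reverses that order.
	fmt.Println(sortByValue(vals, false))
	fmt.Println(sortByValue(vals, true))
}
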
allocator/descendalloc/descendalloc_test.go: 4 changes (3 additions, 1 deletion)
@@ -1,6 +1,7 @@
package descendalloc

import (
"context"
"testing"
"time"

@@ -96,10 +97,11 @@ var testCases = []testcase{
}

func Test(t *testing.T) {
ctx := context.Background()
alloc := &DescendAllocator{}
for i, tc := range testCases {
t.Logf("Test case %d", i)
res, err := alloc.Allocate(testCid, tc.current, tc.candidates, nil)
res, err := alloc.Allocate(ctx, testCid, tc.current, tc.candidates, nil)
if err != nil {
t.Fatal(err)
}
api/ipfsproxy/config.go: 3 changes (3 additions, 0 deletions)
@@ -74,6 +74,9 @@ type Config struct {
// Establishes how long we should remember extracted headers before we
// refresh them with a new request. 0 means always.
ExtractHeadersTTL time.Duration

// Tracing flag used to skip tracing specific paths when not enabled.
Tracing bool
}

type jsonConfig struct {
api/ipfsproxy/ipfsproxy.go: 23 changes (21 additions, 2 deletions)
@@ -13,6 +13,10 @@ import (
"sync"
"time"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/trace"

"github.com/ipfs/ipfs-cluster/adder/adderutils"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"
@@ -127,13 +131,28 @@ func New(cfg *Config) (*Server, error) {
return nil, err
}

var handler http.Handler
router := mux.NewRouter()
handler = router

if cfg.Tracing {
handler = &ochttp.Handler{
IsPublicEndpoint: true,
Propagation: &tracecontext.HTTPFormat{},
Handler: router,
StartOptions: trace.StartOptions{SpanKind: trace.SpanKindServer},
FormatSpanName: func(req *http.Request) string {
return "proxy:" + req.Host + ":" + req.URL.Path + ":" + req.Method
},
}
}

s := &http.Server{
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
ReadHeaderTimeout: cfg.ReadHeaderTimeout,
IdleTimeout: cfg.IdleTimeout,
Handler: router,
Handler: handler,
}

// See: https://github.com/ipfs/go-ipfs/issues/5168
@@ -216,7 +235,7 @@ func (proxy *Server) SetClient(c *rpc.Client) {

// Shutdown stops any listeners and stops the component from taking
// any requests.
func (proxy *Server) Shutdown() error {
func (proxy *Server) Shutdown(ctx context.Context) error {
proxy.shutdownLock.Lock()
defer proxy.shutdownLock.Unlock()

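
For reference, the ochttp wrapping that New performs when cfg.Tracing is set, reduced to a self-contained sketch. The /ping handler and listen address are placeholders, a plain ServeMux stands in for the proxy's gorilla/mux router, and registering a trace exporter (needed to actually ship spans anywhere) is left out.

package main

import (
	"fmt"
	"log"
	"net/http"

	"go.opencensus.io/plugin/ochttp"
	"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
	"go.opencensus.io/trace"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "pong")
	})

	// Wrap the router so incoming W3C tracecontext headers are extracted
	// and a server span is opened per request, named from host, path and
	// method, mirroring the proxy code above.
	var handler http.Handler = &ochttp.Handler{
		IsPublicEndpoint: true,
		Propagation:      &tracecontext.HTTPFormat{},
		Handler:          mux,
		StartOptions:     trace.StartOptions{SpanKind: trace.SpanKindServer},
		FormatSpanName: func(req *http.Request) string {
			return "proxy:" + req.Host + ":" + req.URL.Path + ":" + req.Method
		},
	}

	log.Fatal(http.ListenAndServe("127.0.0.1:8080", handler))
}
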
