Remove *Serial types. Use pointers for all types. #688

Merged · 8 commits · Mar 1, 2019
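The whole diff follows one pattern: the intermediate *Serial types (api.PinSerial, string-encoded CIDs and peer IDs) are dropped, and the api types travel as pointers instead. A condensed sketch of what that means for a single RPC call, modeled on the adder/util.go hunk further down (the pinLocally wrapper and package name are illustrative, not part of the patch):

```go
// Condensed from the adder/util.go hunk in this diff; names here are illustrative.
package example

import (
	"context"

	"github.com/ipfs/ipfs-cluster/api"
	rpc "github.com/libp2p/go-libp2p-gorpc"
)

// pinLocally issues the same local "Cluster.Pin" RPC shown below. Before this
// change the call read:
//
//	client.CallContext(ctx, "", "Cluster", "Pin", pin.ToSerial(), &struct{}{})
//
// with pin passed by value as api.Pin and converted to api.PinSerial first.
// Now the pointer is encoded directly and no Serial conversion is needed.
func pinLocally(ctx context.Context, client *rpc.Client, pin *api.Pin) error {
	return client.CallContext(ctx, "", "Cluster", "Pin", pin, &struct{}{})
}
```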
2 changes: 0 additions & 2 deletions add_test.go
@@ -55,7 +55,6 @@ func TestAddPeerDown(t *testing.T) {
	defer shutdownClusters(t, clusters, mock)
	sth := test.NewShardingTestHelper()
	defer sth.Clean(t)
-
	err := clusters[0].Shutdown(ctx)
	if err != nil {
		t.Fatal(err)
@@ -84,7 +83,6 @@ func TestAddPeerDown(t *testing.T) {
		if c.id == clusters[0].id {
			return
		}
-
		pin := c.StatusLocal(ctx, ci)
		if pin.Error != "" {
			t.Error(pin.Error)
2 changes: 1 addition & 1 deletion adder/adder.go
@@ -94,7 +94,7 @@ func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid
// FromFiles adds content from a files.Directory. The adder will no longer
// be usable after calling this method.
func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) {
-	logger.Debugf("adding from files")
+	logger.Debug("adding from files")
	a.setContext(ctx)

	if a.ctx.Err() != nil { // don't allow running twice
4 changes: 2 additions & 2 deletions adder/local/dag_service.go
@@ -57,7 +57,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
		return err
	}
	nodeSerial := &api.NodeWithMeta{
-		Cid:     node.Cid().String(),
+		Cid:     node.Cid(),
		Data:    node.RawData(),
		CumSize: size,
	}
@@ -78,7 +78,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, err
		"",
		"Cluster",
		"Pin",
-		rootPin.ToSerial(),
+		rootPin,
		&struct{}{},
	)
}
16 changes: 10 additions & 6 deletions adder/local/dag_service_test.go
@@ -7,6 +7,8 @@ import (
	"sync"
	"testing"

+	peer "github.com/libp2p/go-libp2p-peer"
+
	adder "github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/test"
@@ -19,21 +21,23 @@ type testRPC struct {
	pins   sync.Map
}

-func (rpcs *testRPC) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
-	rpcs.blocks.Store(in.Cid, in)
+func (rpcs *testRPC) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
+	rpcs.blocks.Store(in.Cid.String(), in)
	return nil
}

-func (rpcs *testRPC) Pin(ctx context.Context, in api.PinSerial, out *struct{}) error {
-	rpcs.pins.Store(in.Cid, in)
+func (rpcs *testRPC) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
+	rpcs.pins.Store(in.Cid.String(), in)
	return nil
}

-func (rpcs *testRPC) BlockAllocate(ctx context.Context, in api.PinSerial, out *[]string) error {
+func (rpcs *testRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
	if in.ReplicationFactorMin > 1 {
		return errors.New("we can only replicate to 1 peer")
	}
-	*out = []string{""}
+	// it does not matter since we use host == nil for RPC, so it uses the
+	// local one in all cases.
+	*out = []peer.ID{test.PeerID1}
	return nil
}

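The BlockAllocate stub can return test.PeerID1 because, as the new comment says, these tests run RPC over a nil host, so every call is answered locally. A sketch of how a mock object like testRPC above is typically wired into a *rpc.Client for that setup (the exact helper lives in the test code; this wiring is an assumption, not copied from it):

```go
// Sketch only: wiring a mock RPC service into go-libp2p-gorpc with no host.
package example

import (
	rpc "github.com/libp2p/go-libp2p-gorpc"
)

func newMockRPCClient(rpcObj interface{}) (*rpc.Client, error) {
	// With a nil host the server is only reachable through a client that
	// shares it, which is exactly what these dag_service tests need.
	server := rpc.NewServer(nil, "/mock/rpc/1.0.0")
	if err := server.RegisterName("Cluster", rpcObj); err != nil {
		return nil, err
	}
	// Calls made with an empty destination peer are dispatched to the local
	// server above instead of going over the network.
	return rpc.NewClientWithServer(nil, "/mock/rpc/1.0.0", server), nil
}
```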
4 changes: 2 additions & 2 deletions adder/sharding/dag.go
@@ -126,8 +126,8 @@ func makeDAG(ctx context.Context, dagObj map[string]cid.Cid) ([]ipld.Node, error
func putDAG(ctx context.Context, rpcC *rpc.Client, nodes []ipld.Node, dests []peer.ID) error {
	for _, n := range nodes {
		//logger.Debugf("The dag cbor Node Links: %+v", n.Links())
-		b := api.NodeWithMeta{
-			Cid:    n.Cid().String(), // Tests depend on this.
+		b := &api.NodeWithMeta{
+			Cid:    n.Cid(), // Tests depend on this.
			Data:   n.RawData(),
			Format: "cbor",
		}
15 changes: 5 additions & 10 deletions adder/sharding/dag_service.go
@@ -76,7 +76,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
		return err
	}
	nodeSerial := &api.NodeWithMeta{
-		Cid:     node.Cid().String(),
+		Cid:     node.Cid(),
		Data:    node.RawData(),
		CumSize: size,
	}
@@ -122,7 +122,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
	clusterDAGPin.MaxDepth = 0 // pin direct
	clusterDAGPin.Name = fmt.Sprintf("%s-clusterDAG", dgs.pinOpts.Name)
	clusterDAGPin.Type = api.ClusterDAGType
-	clusterDAGPin.Reference = dataRoot
+	clusterDAGPin.Reference = &dataRoot
	err = adder.Pin(ctx, dgs.rpcClient, clusterDAGPin)
	if err != nil {
		return dataRoot, err
@@ -131,7 +131,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
	// Pin the META pin
	metaPin := api.PinWithOpts(dataRoot, dgs.pinOpts)
	metaPin.Type = api.MetaType
-	metaPin.Reference = clusterDAG
+	metaPin.Reference = &clusterDAG
	metaPin.MaxDepth = 0 // irrelevant. Meta-pins are not pinned
	err = adder.Pin(ctx, dgs.rpcClient, metaPin)
	if err != nil {
@@ -180,14 +180,9 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n *api.NodeWithMeta) err

	logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid, len(dgs.shards), dgs.pinOpts.Name)

-	c, err := cid.Decode(n.Cid)
-	if err != nil {
-		return err
-	}
-
	// add the block to it if it fits and return
	if shard.Size()+n.Size() < shard.Limit() {
-		shard.AddLink(ctx, c, n.Size())
+		shard.AddLink(ctx, n.Cid, n.Size())
		return adder.PutBlock(ctx, dgs.rpcClient, n, shard.Allocations())
	}

@@ -207,7 +202,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n *api.NodeWithMeta) err
		return errors.New("block doesn't fit in empty shard: shard size too small?")
	}

-	_, err = dgs.flushCurrentShard(ctx)
+	_, err := dgs.flushCurrentShard(ctx)
	if err != nil {
		return err
	}
23 changes: 13 additions & 10 deletions adder/sharding/dag_service_test.go
@@ -10,6 +10,7 @@ import (
	adder "github.com/ipfs/ipfs-cluster/adder"
	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/test"
+	peer "github.com/libp2p/go-libp2p-peer"

	cid "github.com/ipfs/go-cid"
	logging "github.com/ipfs/go-log"
@@ -26,30 +27,32 @@ type testRPC struct {
	pins   sync.Map
}

-func (rpcs *testRPC) IPFSBlockPut(ctx context.Context, in api.NodeWithMeta, out *struct{}) error {
-	rpcs.blocks.Store(in.Cid, in.Data)
+func (rpcs *testRPC) IPFSBlockPut(ctx context.Context, in *api.NodeWithMeta, out *struct{}) error {
+	rpcs.blocks.Store(in.Cid.String(), in.Data)
	return nil
}

-func (rpcs *testRPC) Pin(ctx context.Context, in api.PinSerial, out *struct{}) error {
-	rpcs.pins.Store(in.Cid, in)
+func (rpcs *testRPC) Pin(ctx context.Context, in *api.Pin, out *struct{}) error {
+	rpcs.pins.Store(in.Cid.String(), in)
	return nil
}

-func (rpcs *testRPC) BlockAllocate(ctx context.Context, in api.PinSerial, out *[]string) error {
+func (rpcs *testRPC) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
	if in.ReplicationFactorMin > 1 {
		return errors.New("we can only replicate to 1 peer")
	}
-	*out = []string{""}
+	// it does not matter since we use host == nil for RPC, so it uses the
+	// local one in all cases
+	*out = []peer.ID{test.PeerID1}
	return nil
}

-func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (api.Pin, error) {
+func (rpcs *testRPC) PinGet(ctx context.Context, c cid.Cid) (*api.Pin, error) {
	pI, ok := rpcs.pins.Load(c.String())
	if !ok {
-		return api.Pin{}, errors.New("not found")
+		return nil, errors.New("not found")
	}
-	return pI.(api.PinSerial).ToPin(), nil
+	return pI.(*api.Pin), nil
}

func (rpcs *testRPC) BlockGet(ctx context.Context, c cid.Cid) ([]byte, error) {
@@ -110,7 +113,7 @@ func TestFromMultipart(t *testing.T) {

	// Print all pins
	// rpcObj.pins.Range(func(k, v interface{}) bool {
-	// 	p := v.(api.PinSerial)
+	// 	p := v.(*api.Pin)
	// 	j, _ := config.DefaultJSONMarshal(p)
	// 	fmt.Printf("%s", j)
	// 	return true
2 changes: 1 addition & 1 deletion adder/sharding/shard.go
@@ -92,7 +92,7 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
	// this sets allocations as priority allocation
	pin.Allocations = sh.allocations
	pin.Type = api.ShardType
-	pin.Reference = prev
+	pin.Reference = &prev
	pin.MaxDepth = 1
	pin.ShardSize = sh.Size() // use current size, not the limit
	if len(nodes) > len(sh.dagNode)+1 { // using an indirect graph
9 changes: 7 additions & 2 deletions adder/sharding/verify.go
@@ -2,6 +2,7 @@ package sharding

import (
	"context"
+	"errors"
	"fmt"
	"testing"

@@ -13,7 +14,7 @@ import (
// MockPinStore is used in VerifyShards
type MockPinStore interface {
	// Gets a pin
-	PinGet(context.Context, cid.Cid) (api.Pin, error)
+	PinGet(context.Context, cid.Cid) (*api.Pin, error)
}

// MockBlockStore is used in VerifyShards
@@ -36,7 +37,11 @@ func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlo
		return nil, fmt.Errorf("bad MetaPin type")
	}

-	clusterPin, err := pins.PinGet(ctx, metaPin.Reference)
+	if metaPin.Reference == nil {
+		return nil, errors.New("metaPin.Reference is unset")
+	}
+
+	clusterPin, err := pins.PinGet(ctx, *metaPin.Reference)
	if err != nil {
		return nil, fmt.Errorf("cluster pin was not pinned: %s", err)
	}
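The Reference field changes from a value to a *cid.Cid across these hunks, which is why shard.Flush and Finalize now take addresses (&prev, &dataRoot, &clusterDAG) and VerifyShards gains a nil check. A minimal sketch of that convention in isolation (refPin is a stand-in type, not the real api.Pin):

```go
// Stand-in type; only the *cid.Cid handling mirrors the patch.
package example

import cid "github.com/ipfs/go-cid"

type refPin struct {
	Reference *cid.Cid // nil means "no reference", so readers must check
}

// link stores the address of its argument, as shard.Flush does with &prev.
func link(prev cid.Cid) refPin {
	return refPin{Reference: &prev}
}

// follow mirrors the guard added to VerifyShards before dereferencing.
func follow(p refPin) (cid.Cid, bool) {
	if p.Reference == nil {
		return cid.Undef, false
	}
	return *p.Reference, true
}
```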
21 changes: 8 additions & 13 deletions adder/util.go
@@ -16,17 +16,12 @@ import (

// PutBlock sends a NodeWithMeta to the given destinations.
func PutBlock(ctx context.Context, rpc *rpc.Client, n *api.NodeWithMeta, dests []peer.ID) error {
-	c, err := cid.Decode(n.Cid)
-	if err != nil {
-		return err
-	}
-
-	format, ok := cid.CodecToStr[c.Type()]
+	format, ok := cid.CodecToStr[n.Cid.Type()]
	if !ok {
		format = ""
		logger.Warning("unsupported cid type, treating as v0")
	}
-	if c.Prefix().Version == 0 {
+	if n.Cid.Prefix().Version == 0 {
		format = "v0"
	}
	n.Format = format
@@ -40,7 +35,7 @@ func PutBlock(ctx context.Context, rpc *rpc.Client, n *api.NodeWithMeta, dests [
		dests,
		"Cluster",
		"IPFSBlockPut",
-		*n,
+		n,
		rpcutil.RPCDiscardReplies(len(dests)),
	)
	return rpcutil.CheckErrs(errs)
@@ -49,20 +44,20 @@ func PutBlock(ctx context.Context, rpc *rpc.Client, n *api.NodeWithMeta, dests [
// BlockAllocate helps allocating blocks to peers.
func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions) ([]peer.ID, error) {
	// Find where to allocate this file
-	var allocsStr []string
+	var allocsStr []peer.ID
	err := rpc.CallContext(
		ctx,
		"",
		"Cluster",
		"BlockAllocate",
-		api.PinWithOpts(cid.Undef, pinOpts).ToSerial(),
+		api.PinWithOpts(cid.Undef, pinOpts),
		&allocsStr,
	)
-	return api.StringsToPeers(allocsStr), err
+	return allocsStr, err
}

// Pin helps sending local RPC pin requests.
-func Pin(ctx context.Context, rpc *rpc.Client, pin api.Pin) error {
+func Pin(ctx context.Context, rpc *rpc.Client, pin *api.Pin) error {
	if pin.ReplicationFactorMin < 0 {
		pin.Allocations = []peer.ID{}
	}
@@ -72,7 +67,7 @@ func Pin(ctx context.Context, rpc *rpc.Client, pin api.Pin) error {
		"", // use ourself to pin
		"Cluster",
		"Pin",
-		pin.ToSerial(),
+		pin,
		&struct{}{},
	)
}
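With NodeWithMeta.Cid now a real cid.Cid, PutBlock no longer has to cid.Decode a string before choosing the block format. The same selection logic, pulled out into a stand-alone sketch for clarity (formatForCid is a hypothetical helper, not part of the patch):

```go
// Same format selection as PutBlock above, as a self-contained sketch.
package example

import cid "github.com/ipfs/go-cid"

func formatForCid(c cid.Cid) string {
	if c.Prefix().Version == 0 {
		return "v0" // CIDv0 blocks are always dag-pb; block/put calls this "v0"
	}
	format, ok := cid.CodecToStr[c.Type()]
	if !ok {
		return "" // unknown codec: leave empty, as the warning branch above does
	}
	return format
}
```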
20 changes: 12 additions & 8 deletions allocate.go
@@ -7,6 +7,7 @@ import (

	cid "github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p-peer"
+
	"go.opencensus.io/trace"

	"github.com/ipfs/ipfs-cluster/api"
@@ -59,13 +60,16 @@ func (c *Cluster) allocate(ctx context.Context, hash cid.Cid, rplMin, rplMax int
	}

	// Figure out who is holding the CID
-	currentPin, _ := c.PinGet(ctx, hash)
-	currentAllocs := currentPin.Allocations
+	var currentAllocs []peer.ID
+	currentPin, err := c.PinGet(ctx, hash)
+	if err == nil {
+		currentAllocs = currentPin.Allocations
+	}
	metrics := c.monitor.LatestMetrics(ctx, c.informer.Name())

-	currentMetrics := make(map[peer.ID]api.Metric)
-	candidatesMetrics := make(map[peer.ID]api.Metric)
-	priorityMetrics := make(map[peer.ID]api.Metric)
+	currentMetrics := make(map[peer.ID]*api.Metric)
+	candidatesMetrics := make(map[peer.ID]*api.Metric)
+	priorityMetrics := make(map[peer.ID]*api.Metric)

	// Divide metrics between current and candidates.
	// All metrics in metrics are valid (at least the
@@ -123,9 +127,9 @@ func (c *Cluster) obtainAllocations(
	ctx context.Context,
	hash cid.Cid,
	rplMin, rplMax int,
-	currentValidMetrics map[peer.ID]api.Metric,
-	candidatesMetrics map[peer.ID]api.Metric,
-	priorityMetrics map[peer.ID]api.Metric,
+	currentValidMetrics map[peer.ID]*api.Metric,
+	candidatesMetrics map[peer.ID]*api.Metric,
+	priorityMetrics map[peer.ID]*api.Metric,
) ([]peer.ID, error) {
	ctx, span := trace.StartSpan(ctx, "cluster/obtainAllocations")
	defer span.End()
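allocate() now keeps its working sets as map[peer.ID]*api.Metric and only trusts currentPin.Allocations when PinGet succeeds. A condensed sketch of the current/candidate split performed by the elided part of this hunk (the priority-peer map is omitted, and the []*api.Metric input shape is an assumption):

```go
// Condensed sketch of dividing the latest metrics between peers that already
// hold the CID and the remaining candidates, keyed to *api.Metric as in allocate().
package example

import (
	"github.com/ipfs/ipfs-cluster/api"
	peer "github.com/libp2p/go-libp2p-peer"
)

func divideMetrics(latest []*api.Metric, currentAllocs []peer.ID) (current, candidates map[peer.ID]*api.Metric) {
	current = make(map[peer.ID]*api.Metric)
	candidates = make(map[peer.ID]*api.Metric)

	holds := make(map[peer.ID]struct{}, len(currentAllocs))
	for _, p := range currentAllocs {
		holds[p] = struct{}{}
	}

	for _, m := range latest {
		if _, ok := holds[m.Peer]; ok {
			current[m.Peer] = m // peer already holds this CID
		} else {
			candidates[m.Peer] = m // peer could receive a new allocation
		}
	}
	return current, candidates
}
```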
2 changes: 1 addition & 1 deletion allocator/ascendalloc/ascendalloc.go
@@ -39,7 +39,7 @@ func (alloc AscendAllocator) Shutdown(_ context.Context) error { return nil }
func (alloc AscendAllocator) Allocate(
	ctx context.Context,
	c cid.Cid,
-	current, candidates, priority map[peer.ID]api.Metric,
+	current, candidates, priority map[peer.ID]*api.Metric,
) ([]peer.ID, error) {
	// sort our metrics
	first := util.SortNumeric(priority, false)