Skip to content

Commit

Permalink
Merge pull request #544 from ipfs/fix/update-go-cid
Browse files Browse the repository at this point in the history
update go-cid and go-libp2p
  • Loading branch information
hsanjuan committed Sep 24, 2018
2 parents e751fbe + 31474f6 commit 275a8b2
Show file tree
Hide file tree
Showing 35 changed files with 197 additions and 192 deletions.
22 changes: 11 additions & 11 deletions adder/adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type ClusterDAGService interface {
ipld.DAGService
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder.
Finalize(ctx context.Context, ipfsRoot *cid.Cid) (*cid.Cid, error)
Finalize(ctx context.Context, ipfsRoot cid.Cid) (cid.Cid, error)
}

// Adder is used to add content to IPFS Cluster using an implementation of
Expand Down Expand Up @@ -81,7 +81,7 @@ func (a *Adder) setContext(ctx context.Context) {

// FromMultipart adds content from a multipart.Reader. The adder will
// no longer be usable after calling this method.
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (*cid.Cid, error) {
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid, error) {
logger.Debugf("adding from multipart with params: %+v", a.params)

f := &files.MultipartFile{
Expand All @@ -94,12 +94,12 @@ func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (*cid.Ci

// FromFiles adds content from a files.File. The adder will no longer
// be usable after calling this method.
func (a *Adder) FromFiles(ctx context.Context, f files.File) (*cid.Cid, error) {
func (a *Adder) FromFiles(ctx context.Context, f files.File) (cid.Cid, error) {
logger.Debugf("adding from files")
a.setContext(ctx)

if a.ctx.Err() != nil { // don't allow running twice
return nil, a.ctx.Err()
return cid.Undef, a.ctx.Err()
}

defer a.cancel()
Expand All @@ -108,7 +108,7 @@ func (a *Adder) FromFiles(ctx context.Context, f files.File) (*cid.Cid, error) {
ipfsAdder, err := ipfsadd.NewAdder(a.ctx, a.dgs)
if err != nil {
logger.Error(err)
return nil, err
return cid.Undef, err
}

ipfsAdder.Hidden = a.params.Hidden
Expand All @@ -122,12 +122,12 @@ func (a *Adder) FromFiles(ctx context.Context, f files.File) (*cid.Cid, error) {
// Set up prefix
prefix, err := merkledag.PrefixForCidVersion(a.params.CidVersion)
if err != nil {
return nil, fmt.Errorf("bad CID Version: %s", err)
return cid.Undef, fmt.Errorf("bad CID Version: %s", err)
}

hashFunCode, ok := multihash.Names[strings.ToLower(a.params.HashFun)]
if !ok {
return nil, fmt.Errorf("unrecognized hash function: %s", a.params.HashFun)
return cid.Undef, fmt.Errorf("unrecognized hash function: %s", a.params.HashFun)
}
prefix.MhType = hashFunCode
prefix.MhLength = -1
Expand All @@ -136,28 +136,28 @@ func (a *Adder) FromFiles(ctx context.Context, f files.File) (*cid.Cid, error) {
for {
select {
case <-a.ctx.Done():
return nil, a.ctx.Err()
return cid.Undef, a.ctx.Err()
default:
err := addFile(f, ipfsAdder)
if err == io.EOF {
goto FINALIZE
}
if err != nil {
logger.Error("error adding to cluster: ", err)
return nil, err
return cid.Undef, err
}
}
}

FINALIZE:
adderRoot, err := ipfsAdder.Finalize()
if err != nil {
return nil, err
return cid.Undef, err
}
clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot.Cid())
if err != nil {
logger.Error("error finalizing adder:", err)
return nil, err
return cid.Undef, err
}
logger.Infof("%s successfully added to cluster", clusterRoot)
return clusterRoot, nil
Expand Down
2 changes: 1 addition & 1 deletion adder/adder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (dag *mockCDAGServ) AddMany(ctx context.Context, nodes []ipld.Node) error {
return nil
}

func (dag *mockCDAGServ) Finalize(ctx context.Context, root *cid.Cid) (*cid.Cid, error) {
func (dag *mockCDAGServ) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
return root, nil
}

Expand Down
2 changes: 1 addition & 1 deletion adder/adderutils/adderutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func AddMultipartHTTPHandler(
params *api.AddParams,
reader *multipart.Reader,
w http.ResponseWriter,
) (*cid.Cid, error) {
) (cid.Cid, error) {
var dags adder.ClusterDAGService
output := make(chan *api.AddedOutput, 200)
flusher, flush := w.(http.Flusher)
Expand Down
2 changes: 1 addition & 1 deletion adder/ipfsadd/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Adder struct {
Chunker string
root ipld.Node
mroot *mfs.Root
tempRoot *cid.Cid
tempRoot cid.Cid
Prefix *cid.Prefix
CidBuilder cid.Builder
liveNodes uint64
Expand Down
2 changes: 1 addition & 1 deletion adder/local/dag_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
}

// Finalize pins the last Cid added to this DAGService.
func (dgs *DAGService) Finalize(ctx context.Context, root *cid.Cid) (*cid.Cid, error) {
func (dgs *DAGService) Finalize(ctx context.Context, root cid.Cid) (cid.Cid, error) {
// Cluster pin the result
rootPin := api.PinWithOpts(root, dgs.pinOpts)
rootPin.Allocations = dgs.dests
Expand Down
12 changes: 6 additions & 6 deletions adder/sharding/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func CborDataToNode(raw []byte, format string) (ipld.Node, error) {
return shardNode, nil
}

func makeDAGSimple(ctx context.Context, dagObj map[string]*cid.Cid) (ipld.Node, error) {
func makeDAGSimple(ctx context.Context, dagObj map[string]cid.Cid) (ipld.Node, error) {
node, err := cbor.WrapObject(
dagObj,
hashFn, mh.DefaultLengths[hashFn],
Expand All @@ -83,7 +83,7 @@ func makeDAGSimple(ctx context.Context, dagObj map[string]*cid.Cid) (ipld.Node,
// carry links to the data nodes being tracked. The head of the output slice
// is always the root of the shardDAG, i.e. the ipld node that should be
// recursively pinned to track the shard
func makeDAG(ctx context.Context, dagObj map[string]*cid.Cid) ([]ipld.Node, error) {
func makeDAG(ctx context.Context, dagObj map[string]cid.Cid) ([]ipld.Node, error) {
// FIXME: We have a 4MB limit on the block size enforced by bitswap:
// https://github.com/libp2p/go-libp2p-net/blob/master/interface.go#L20

Expand All @@ -93,11 +93,11 @@ func makeDAG(ctx context.Context, dagObj map[string]*cid.Cid) ([]ipld.Node, erro
return []ipld.Node{n}, err
}
// Indirect node required
leafNodes := make([]ipld.Node, 0) // shardNodes with links to data
indirectObj := make(map[string]*cid.Cid) // shardNode with links to shardNodes
leafNodes := make([]ipld.Node, 0) // shardNodes with links to data
indirectObj := make(map[string]cid.Cid) // shardNode with links to shardNodes
numFullLeaves := len(dagObj) / MaxLinks
for i := 0; i <= numFullLeaves; i++ {
leafObj := make(map[string]*cid.Cid)
leafObj := make(map[string]cid.Cid)
for j := 0; j < MaxLinks; j++ {
c, ok := dagObj[fmt.Sprintf("%d", i*MaxLinks+j)]
if !ok { // finished with this leaf before filling all the way
Expand Down Expand Up @@ -182,7 +182,7 @@ func indirectCount(linkNum int) uint64 {
if q == 0 { // no indirect node needed
return 0
}
dummyIndirect := make(map[string]*cid.Cid)
dummyIndirect := make(map[string]cid.Cid)
for key := 0; key <= q; key++ {
dummyIndirect[fmt.Sprintf("%d", key)] = nil
}
Expand Down
14 changes: 7 additions & 7 deletions adder/sharding/dag_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ type DAGService struct {
// Current shard being built
currentShard *shard
// Last flushed shard CID
previousShard *cid.Cid
previousShard cid.Cid

// shard tracking
shards map[string]*cid.Cid
shards map[string]cid.Cid

startTime time.Time
totalSize uint64
Expand All @@ -58,7 +58,7 @@ func New(rpc *rpc.Client, opts api.PinOptions, out chan<- *api.AddedOutput) *DAG
pinOpts: opts,
output: out,
addedSet: cid.NewSet(),
shards: make(map[string]*cid.Cid),
shards: make(map[string]cid.Cid),
startTime: time.Now(),
}
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {

// Finalize finishes sharding, creates the cluster DAG and pins it along
// with the meta pin for the root node of the content.
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot *cid.Cid) (*cid.Cid, error) {
func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid, error) {
lastCid, err := dgs.flushCurrentShard(ctx)
if err != nil {
return lastCid, err
Expand Down Expand Up @@ -214,7 +214,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n *api.NodeWithMeta) err
return dgs.ingestBlock(ctx, n) // <-- retry ingest
}

func (dgs *DAGService) logStats(metaPin, clusterDAGPin *cid.Cid) {
func (dgs *DAGService) logStats(metaPin, clusterDAGPin cid.Cid) {
duration := time.Since(dgs.startTime)
seconds := uint64(duration) / uint64(time.Second)
var rate string
Expand Down Expand Up @@ -252,10 +252,10 @@ func (dgs *DAGService) sendOutput(ao *api.AddedOutput) {
}

// flushes the dgs.currentShard and returns the LastLink()
func (dgs *DAGService) flushCurrentShard(ctx context.Context) (*cid.Cid, error) {
func (dgs *DAGService) flushCurrentShard(ctx context.Context) (cid.Cid, error) {
shard := dgs.currentShard
if shard == nil {
return nil, errors.New("cannot flush a nil shard")
return cid.Undef, errors.New("cannot flush a nil shard")
}

lens := len(dgs.shards)
Expand Down
4 changes: 2 additions & 2 deletions adder/sharding/dag_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ func (rpcs *testRPC) BlockAllocate(ctx context.Context, in api.PinSerial, out *[
return nil
}

func (rpcs *testRPC) PinGet(c *cid.Cid) (api.Pin, error) {
func (rpcs *testRPC) PinGet(c cid.Cid) (api.Pin, error) {
pI, ok := rpcs.pins.Load(c.String())
if !ok {
return api.Pin{}, errors.New("not found")
}
return pI.(api.PinSerial).ToPin(), nil
}

func (rpcs *testRPC) BlockGet(c *cid.Cid) ([]byte, error) {
func (rpcs *testRPC) BlockGet(c cid.Cid) ([]byte, error) {
bI, ok := rpcs.blocks.Load(c.String())
if !ok {
return nil, errors.New("not found")
Expand Down
14 changes: 7 additions & 7 deletions adder/sharding/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type shard struct {
pinOptions api.PinOptions
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]*cid.Cid
dagNode map[string]cid.Cid
currentSize uint64
sizeLimit uint64
}
Expand All @@ -49,15 +49,15 @@ func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard
rpc: rpc,
allocations: allocs,
pinOptions: opts,
dagNode: make(map[string]*cid.Cid),
dagNode: make(map[string]cid.Cid),
currentSize: 0,
sizeLimit: opts.ShardSize,
}, nil
}

// AddLink tries to add a new block to this shard if it's not full.
// Returns true if the block was added
func (sh *shard) AddLink(ctx context.Context, c *cid.Cid, s uint64) {
func (sh *shard) AddLink(ctx context.Context, c cid.Cid, s uint64) {
linkN := len(sh.dagNode)
linkName := fmt.Sprintf("%d", linkN)
logger.Debugf("shard: add link: %s", linkName)
Expand All @@ -74,16 +74,16 @@ func (sh *shard) Allocations() []peer.ID {
// Flush completes the allocation of this shard by building a CBOR node
// and adding it to IPFS, then pinning it in cluster. It returns the Cid of the
// shard.
func (sh *shard) Flush(ctx context.Context, shardN int, prev *cid.Cid) (*cid.Cid, error) {
func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid, error) {
logger.Debugf("shard %d: flush", shardN)
nodes, err := makeDAG(ctx, sh.dagNode)
if err != nil {
return nil, err
return cid.Undef, err
}

err = putDAG(ctx, sh.rpc, nodes, sh.allocations)
if err != nil {
return nil, err
return cid.Undef, err
}

rootCid := nodes[0].Cid()
Expand Down Expand Up @@ -123,7 +123,7 @@ func (sh *shard) Limit() uint64 {
// the last link of the last shard is the data root for the
// full sharded DAG (the CID that would have resulted from
// adding the content to a single IPFS daemon).
func (sh *shard) LastLink() *cid.Cid {
func (sh *shard) LastLink() cid.Cid {
l := len(sh.dagNode)
lastLink := fmt.Sprintf("%d", l-1)
return sh.dagNode[lastLink]
Expand Down
10 changes: 5 additions & 5 deletions adder/sharding/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@ import (
// MockPinStore is used in VerifyShards
type MockPinStore interface {
// Gets a pin
PinGet(*cid.Cid) (api.Pin, error)
PinGet(cid.Cid) (api.Pin, error)
}

// MockBlockStore is used in VerifyShards
type MockBlockStore interface {
// Gets a block
BlockGet(*cid.Cid) ([]byte, error)
BlockGet(cid.Cid) ([]byte, error)
}

// VerifyShards checks that a sharded CID has been correctly formed and stored.
// This is a helper function for testing. It returns a map with all the blocks
// from all shards.
func VerifyShards(t *testing.T, rootCid *cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) {
func VerifyShards(t *testing.T, rootCid cid.Cid, pins MockPinStore, ipfs MockBlockStore, expectedShards int) (map[string]struct{}, error) {
metaPin, err := pins.PinGet(rootCid)
if err != nil {
return nil, fmt.Errorf("meta pin was not pinned: %s", err)
Expand Down Expand Up @@ -62,7 +62,7 @@ func VerifyShards(t *testing.T, rootCid *cid.Cid, pins MockPinStore, ipfs MockBl
}

shardBlocks := make(map[string]struct{})
var ref *cid.Cid
var ref cid.Cid
// traverse shards in order
for i := 0; i < len(shards); i++ {
sh, _, err := clusterDAGNode.ResolveLink([]string{fmt.Sprintf("%d", i)})
Expand All @@ -75,7 +75,7 @@ func VerifyShards(t *testing.T, rootCid *cid.Cid, pins MockPinStore, ipfs MockBl
return nil, fmt.Errorf("shard was not pinned: %s %s", sh.Cid, err)
}

if ref != nil && !shardPin.Reference.Equals(ref) {
if ref != cid.Undef && !shardPin.Reference.Equals(ref) {
t.Errorf("Ref (%s) should point to previous shard (%s)", ref, shardPin.Reference)
}
ref = shardPin.Cid
Expand Down
10 changes: 5 additions & 5 deletions adder/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func BlockAllocate(ctx context.Context, rpc *rpc.Client, pinOpts api.PinOptions)
"",
"Cluster",
"BlockAllocate",
api.PinWithOpts(nil, pinOpts).ToSerial(),
api.PinWithOpts(cid.Undef, pinOpts).ToSerial(),
&allocsStr,
)
return api.StringsToPeers(allocsStr), err
Expand Down Expand Up @@ -87,24 +87,24 @@ type BaseDAGService struct {
}

// Get always returns errNotFound
func (dag BaseDAGService) Get(ctx context.Context, key *cid.Cid) (ipld.Node, error) {
func (dag BaseDAGService) Get(ctx context.Context, key cid.Cid) (ipld.Node, error) {
return nil, ErrDAGNotFound
}

// GetMany returns an output channel that always emits an error
func (dag BaseDAGService) GetMany(ctx context.Context, keys []*cid.Cid) <-chan *ipld.NodeOption {
func (dag BaseDAGService) GetMany(ctx context.Context, keys []cid.Cid) <-chan *ipld.NodeOption {
out := make(chan *ipld.NodeOption, 1)
out <- &ipld.NodeOption{Err: fmt.Errorf("failed to fetch all nodes")}
close(out)
return out
}

// Remove is a nop
func (dag BaseDAGService) Remove(ctx context.Context, key *cid.Cid) error {
func (dag BaseDAGService) Remove(ctx context.Context, key cid.Cid) error {
return nil
}

// RemoveMany is a nop
func (dag BaseDAGService) RemoveMany(ctx context.Context, keys []*cid.Cid) error {
func (dag BaseDAGService) RemoveMany(ctx context.Context, keys []cid.Cid) error {
return nil
}
Loading

0 comments on commit 275a8b2

Please sign in to comment.