Skip to content

Commit

Permalink
pin: Rewrite to store pins in IPFS objects
Browse files Browse the repository at this point in the history
WARNING: No migration performed! That needs to come in a separate
commit, perhaps amended into this one.

This is the minimal rewrite, only changing the storage from
JSON(+extra keys) in Datastore to IPFS objects. All of the pinning
state is still loaded in memory, and written from scratch on Flush. To
do more would require API changes, e.g. adding error returns.

Set/Multiset is not cleanly separated into a library, yet, as it's API
is expected to change radically.
  • Loading branch information
tv42 committed May 11, 2015
1 parent 11ada40 commit 1673b9c
Show file tree
Hide file tree
Showing 7 changed files with 497 additions and 78 deletions.
35 changes: 1 addition & 34 deletions pin/indirect.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package pin

import (
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
"github.com/ipfs/go-ipfs/util"
)
import "github.com/ipfs/go-ipfs/util"

type indirectPin struct {
refCounts map[util.Key]uint64
Expand All @@ -15,36 +12,6 @@ func newIndirectPin() *indirectPin {
}
}

func loadIndirPin(d ds.Datastore, k ds.Key) (*indirectPin, error) {
var rcStore map[string]uint64
err := loadSet(d, k, &rcStore)
if err != nil {
return nil, err
}

refcnt := make(map[util.Key]uint64)
var keys []util.Key
for encK, v := range rcStore {
if v > 0 {
k := util.B58KeyDecode(encK)
keys = append(keys, k)
refcnt[k] = v
}
}
// log.Debugf("indirPin keys: %#v", keys)

return &indirectPin{refCounts: refcnt}, nil
}

func storeIndirPin(d ds.Datastore, k ds.Key, p *indirectPin) error {

rcStore := map[string]uint64{}
for k, v := range p.refCounts {
rcStore[util.B58KeyEncode(k)] = v
}
return storeSet(d, k, rcStore)
}

func (i *indirectPin) Increment(k util.Key) {
i.refCounts[k]++
}
Expand Down
6 changes: 6 additions & 0 deletions pin/internal/pb/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package pb

//go:generate protoc --gogo_out=. header.proto

// kludge to get vendoring right in protobuf output
//go:generate sed -i s,github.com/,github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/,g header.pb.go
59 changes: 59 additions & 0 deletions pin/internal/pb/header.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions pin/internal/pb/header.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto2";

package ipfs.pin;

option go_package = "pb";

message Set {
// 1 for now, library will refuse to handle entries with an unrecognized version.
optional uint32 version = 1;
// how many of the links are subtrees
optional uint32 fanout = 2;
// hash seed for subtree selection, a random number
optional fixed32 seed = 3;
}
131 changes: 88 additions & 43 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
package pin

import (
"encoding/json"
"errors"
"fmt"
"sync"

Expand All @@ -16,9 +14,16 @@ import (
)

var log = util.Logger("pin")
var recursePinDatastoreKey = ds.NewKey("/local/pins/recursive/keys")
var directPinDatastoreKey = ds.NewKey("/local/pins/direct/keys")
var indirectPinDatastoreKey = ds.NewKey("/local/pins/indirect/keys")

var pinDatastoreKey = ds.NewKey("/local/pins")

var emptyKey = util.B58KeyDecode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n")

const (
linkDirect = "direct"
linkRecursive = "recursive"
linkIndirect = "indirect"
)

type PinMode int

Expand Down Expand Up @@ -55,8 +60,11 @@ type pinner struct {
recursePin set.BlockSet
directPin set.BlockSet
indirPin *indirectPin
dserv mdag.DAGService
dstore ds.ThreadSafeDatastore
// Track the keys used for storing the pinning state, so gc does
// not delete them.
internalPin map[util.Key]struct{}
dserv mdag.DAGService
dstore ds.ThreadSafeDatastore
}

// NewPinner creates a new pinner using the given datastore as a backend
Expand Down Expand Up @@ -188,13 +196,19 @@ func (p *pinner) pinLinks(ctx context.Context, node *mdag.Node) error {
return nil
}

func (p *pinner) isInternalPin(key util.Key) bool {
_, ok := p.internalPin[key]
return ok
}

// IsPinned returns whether or not the given key is pinned
func (p *pinner) IsPinned(key util.Key) bool {
p.lock.RLock()
defer p.lock.RUnlock()
return p.recursePin.HasKey(key) ||
p.directPin.HasKey(key) ||
p.indirPin.HasKey(key)
p.indirPin.HasKey(key) ||
p.isInternalPin(key)
}

func (p *pinner) RemovePinWithMode(key util.Key, mode PinMode) {
Expand All @@ -217,30 +231,51 @@ func (p *pinner) RemovePinWithMode(key util.Key, mode PinMode) {
func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error) {
p := new(pinner)

rootKeyI, err := d.Get(pinDatastoreKey)
if err != nil {
return nil, fmt.Errorf("cannot load pin state: %v", err)
}
rootKey := util.Key(rootKeyI.([]byte))

ctx := context.TODO()
root, err := dserv.Get(ctx, rootKey)
if err != nil {
return nil, fmt.Errorf("cannot find pinning root object: %v", err)
}

internalPin := map[util.Key]struct{}{
rootKey: struct{}{},
}
recordInternal := func(k util.Key) {
internalPin[k] = struct{}{}
}

{ // load recursive set
var recurseKeys []util.Key
if err := loadSet(d, recursePinDatastoreKey, &recurseKeys); err != nil {
return nil, err
recurseKeys, err := loadSet(ctx, dserv, root, linkRecursive, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load recursive pins: %v", err)
}
p.recursePin = set.SimpleSetFromKeys(recurseKeys)
}

{ // load direct set
var directKeys []util.Key
if err := loadSet(d, directPinDatastoreKey, &directKeys); err != nil {
return nil, err
directKeys, err := loadSet(ctx, dserv, root, linkDirect, recordInternal)
if err != nil {
return nil, fmt.Errorf("cannot load direct pins: %v", err)
}
p.directPin = set.SimpleSetFromKeys(directKeys)
}

{ // load indirect set
var err error
p.indirPin, err = loadIndirPin(d, indirectPinDatastoreKey)
refcnt, err := loadMultiset(ctx, dserv, root, linkIndirect, recordInternal)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot load indirect pins: %v", err)
}
p.indirPin = &indirectPin{refCounts: refcnt}
}

p.internalPin = internalPin

// assign services
p.dserv = dserv
p.dstore = d
Expand Down Expand Up @@ -268,44 +303,54 @@ func (p *pinner) Flush() error {
p.lock.Lock()
defer p.lock.Unlock()

err := storeSet(p.dstore, directPinDatastoreKey, p.directPin.GetKeys())
if err != nil {
return err
}
ctx := context.TODO()

err = storeSet(p.dstore, recursePinDatastoreKey, p.recursePin.GetKeys())
if err != nil {
return err
internalPin := make(map[util.Key]struct{})
recordInternal := func(k util.Key) {
internalPin[k] = struct{}{}
}

err = storeIndirPin(p.dstore, indirectPinDatastoreKey, p.indirPin)
if err != nil {
return err
root := &mdag.Node{}
{
n, err := storeSet(ctx, p.dserv, p.directPin.GetKeys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkDirect, n); err != nil {
return err
}
}
return nil
}

// helpers to marshal / unmarshal a pin set
func storeSet(d ds.Datastore, k ds.Key, val interface{}) error {
buf, err := json.Marshal(val)
if err != nil {
return err
{
n, err := storeSet(ctx, p.dserv, p.recursePin.GetKeys(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkRecursive, n); err != nil {
return err
}
}

return d.Put(k, buf)
}
{
n, err := storeMultiset(ctx, p.dserv, p.indirPin.GetRefs(), recordInternal)
if err != nil {
return err
}
if err := root.AddNodeLink(linkIndirect, n); err != nil {
return err
}
}

func loadSet(d ds.Datastore, k ds.Key, val interface{}) error {
buf, err := d.Get(k)
k, err := p.dserv.Add(root)
if err != nil {
return err
}

bf, ok := buf.([]byte)
if !ok {
return errors.New("invalid pin set value in datastore")
internalPin[k] = struct{}{}
if err := p.dstore.Put(pinDatastoreKey, []byte(k)); err != nil {
return fmt.Errorf("cannot store pin state: %v", err)
}
return json.Unmarshal(bf, val)
p.internalPin = internalPin
return nil
}

// PinWithMode allows the user to have fine grained control over pin
Expand Down

0 comments on commit 1673b9c

Please sign in to comment.