Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: pintracker revamp #383

Merged
merged 14 commits into from
May 7, 2018
7 changes: 3 additions & 4 deletions allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import (

// This file gathers allocation logic used when pinning or re-pinning
// to find which peers should be allocated to a Cid. Allocation is constrained
// by ReplicationFactorMin and ReplicationFactorMax parametres obtained
// by ReplicationFactorMin and ReplicationFactorMax parameters obtained
// from the Pin object.

//The allocation
// process has several steps:
// The allocation process has several steps:
//
// * Find which peers are pinning a CID
// * Obtain the last values for the configured informer metrics from the
Expand Down Expand Up @@ -164,7 +163,7 @@ func (c *Cluster) obtainAllocations(

// Reminder: rplMin <= rplMax AND >0

if wanted < 0 { // alocations above maximum threshold: drop some
if wanted < 0 { // allocations above maximum threshold: drop some
// This could be done more intelligently by dropping them
// according to the allocator order (i.e. free-ing peers
// with most used space first).
Expand Down
2 changes: 1 addition & 1 deletion allocator/descendalloc/descendalloc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package descendalloc implements an ipfscluster.util.Allocator returns
// Package descendalloc implements an ipfscluster.PinAllocator returns
// allocations based on sorting the metrics in descending order. Thus, peers
// with largest metrics are first in the list. This allocator can be used with a
// number of informers, as long as they provide a numeric metric value.
Expand Down
8 changes: 7 additions & 1 deletion api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ const (
TrackerStatusUnpinning
// The IPFS daemon is not pinning the item
TrackerStatusUnpinned
// The IPFS deamon is not pinning the item but it is being tracked
// The IPFS daemon is not pinning the item but it is being tracked
TrackerStatusRemote
// The item has been queued for pinning on the IPFS daemon
TrackerStatusPinQueued
// The item has been queued for unpinning on the IPFS daemon
TrackerStatusUnpinQueued
)

// TrackerStatus represents the status of a tracked Cid in the PinTracker
Expand All @@ -63,6 +67,8 @@ var trackerStatusString = map[TrackerStatus]string{
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemote: "remote",
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
}

// String converts a TrackerStatus into a readable string.
Expand Down
9 changes: 5 additions & 4 deletions cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ipfscluster

import (
"context"
"errors"
"os"
"path/filepath"
Expand Down Expand Up @@ -51,28 +52,28 @@ func (ipfs *mockConnector) ID() (api.IPFSID, error) {
}, nil
}

func (ipfs *mockConnector) Pin(c *cid.Cid, b bool) error {
func (ipfs *mockConnector) Pin(ctx context.Context, c *cid.Cid, b bool) error {
if ipfs.returnError {
return errors.New("")
}
return nil
}

func (ipfs *mockConnector) Unpin(c *cid.Cid) error {
func (ipfs *mockConnector) Unpin(ctx context.Context, c *cid.Cid) error {
if ipfs.returnError {
return errors.New("")
}
return nil
}

func (ipfs *mockConnector) PinLsCid(c *cid.Cid) (api.IPFSPinStatus, error) {
func (ipfs *mockConnector) PinLsCid(ctx context.Context, c *cid.Cid) (api.IPFSPinStatus, error) {
if ipfs.returnError {
return api.IPFSPinStatusError, errors.New("")
}
return api.IPFSPinStatusRecursive, nil
}

func (ipfs *mockConnector) PinLs(filter string) (map[string]api.IPFSPinStatus, error) {
func (ipfs *mockConnector) PinLs(ctx context.Context, filter string) (map[string]api.IPFSPinStatus, error) {
if ipfs.returnError {
return nil, errors.New("")
}
Expand Down
6 changes: 3 additions & 3 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ var testingIpfsCfg = []byte(`{
"proxy_read_header_timeout": "10m0s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s",
"pin_method": "pin"
"pin_method": "pin",
"pin_timeout": "30s",
"unpin_timeout": "15s"
}`)

var testingTrackerCfg = []byte(`
{
"pinning_timeout": "30s",
"unpinning_timeout": "15s",
"max_pin_queue_size": 4092,
"concurrent_pins": 1
}
Expand Down
16 changes: 9 additions & 7 deletions ipfscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package ipfscluster

import (
"context"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/state"

Expand Down Expand Up @@ -70,10 +72,10 @@ type API interface {
type IPFSConnector interface {
Component
ID() (api.IPFSID, error)
Pin(*cid.Cid, bool) error
Unpin(*cid.Cid) error
PinLsCid(*cid.Cid) (api.IPFSPinStatus, error)
PinLs(typeFilter string) (map[string]api.IPFSPinStatus, error)
Pin(context.Context, *cid.Cid, bool) error
Unpin(context.Context, *cid.Cid) error
PinLsCid(context.Context, *cid.Cid) (api.IPFSPinStatus, error)
PinLs(ctx context.Context, typeFilter string) (map[string]api.IPFSPinStatus, error)
// ConnectSwarms make sure this peer's IPFS daemon is connected to
// other peers IPFS daemons.
ConnectSwarms() error
Expand Down Expand Up @@ -119,10 +121,10 @@ type PinTracker interface {
// Sync makes sure that the Cid status reflect the real IPFS status.
// It returns the local status of the Cid.
Sync(*cid.Cid) (api.PinInfo, error)
// Recover retriggers a Pin/Unpin operation in a Cids with error status.
Recover(*cid.Cid) (api.PinInfo, error)
// RecoverAll calls Recover() for all pins tracked.
RecoverAll() ([]api.PinInfo, error)
// Recover retriggers a Pin/Unpin operation in a Cids with error status.
Recover(*cid.Cid) (api.PinInfo, error)
}

// Informer provides Metric information from a peer. The metrics produced by
Expand All @@ -142,7 +144,7 @@ type Informer interface {
type PinAllocator interface {
Component
// Allocate returns the list of peers that should be assigned to
// Pin content in oder of preference (from the most preferred to the
// Pin content in order of preference (from the most preferred to the
// least). The "current" map contains valid metrics for peers
// which are currently pinning the content. The candidates map
// contains the metrics for all peers which are eligible for pinning
Expand Down
4 changes: 2 additions & 2 deletions ipfscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func delay() {
}

func pinDelay() {
time.Sleep(400 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
}

func ttlDelay() {
Expand Down Expand Up @@ -451,7 +451,6 @@ func TestClustersPin(t *testing.T) {
}
delay()
delay()

funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll()
if l := len(status); l != 0 {
Expand Down Expand Up @@ -909,6 +908,7 @@ func TestClustersReplication(t *testing.T) {
}
if numLocal != nClusters-1 {
t.Errorf("Expected %d local pins but got %d", nClusters-1, numLocal)
t.Error(pinfos)
}

if numRemote != 1 {
Expand Down
55 changes: 46 additions & 9 deletions ipfsconn/ipfshttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const (
DefaultProxyWriteTimeout = 10 * time.Minute
DefaultProxyIdleTimeout = 60 * time.Second
DefaultPinMethod = "pin"
DefaultIPFSRequestTimeout = 5 * time.Minute
DefaultPinTimeout = 24 * time.Hour
DefaultUnpinTimeout = 3 * time.Hour
)

// Config is used to initialize a Connector and allows to customize
Expand Down Expand Up @@ -58,6 +61,15 @@ type Config struct {
// "refs -r" call followed by "pin/add". "refs" allows fetching in
// parallel but should be used with GC disabled.
PinMethod string

// IPFS Daemon HTTP Client POST timeout
IPFSRequestTimeout time.Duration

// Pin Operation timeout
PinTimeout time.Duration

// Unpin Operation timeout
UnpinTimeout time.Duration
}

type jsonConfig struct {
Expand All @@ -69,6 +81,9 @@ type jsonConfig struct {
ProxyWriteTimeout string `json:"proxy_write_timeout"`
ProxyIdleTimeout string `json:"proxy_idle_timeout"`
PinMethod string `json:"pin_method"`
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
PinTimeout string `json:"pin_timeout"`
UnpinTimeout string `json:"unpin_timeout"`
}

// ConfigKey provides a human-friendly identifier for this type of Config.
Expand All @@ -88,46 +103,62 @@ func (cfg *Config) Default() error {
cfg.ProxyWriteTimeout = DefaultProxyWriteTimeout
cfg.ProxyIdleTimeout = DefaultProxyIdleTimeout
cfg.PinMethod = DefaultPinMethod
cfg.IPFSRequestTimeout = DefaultIPFSRequestTimeout
cfg.PinTimeout = DefaultPinTimeout
cfg.UnpinTimeout = DefaultUnpinTimeout

return nil
}

// Validate checks that the fields of this Config have sensible values,
// at least in appearance.
func (cfg *Config) Validate() error {
var err error
if cfg.ProxyAddr == nil {
return errors.New("ipfshttp.proxy_listen_multiaddress not set")
err = errors.New("ipfshttp.proxy_listen_multiaddress not set")
}
if cfg.NodeAddr == nil {
return errors.New("ipfshttp.node_multiaddress not set")
err = errors.New("ipfshttp.node_multiaddress not set")
}

if cfg.ConnectSwarmsDelay < 0 {
return errors.New("ipfshttp.connect_swarms_delay is invalid")
err = errors.New("ipfshttp.connect_swarms_delay is invalid")
}

if cfg.ProxyReadTimeout < 0 {
return errors.New("ipfshttp.proxy_read_timeout is invalid")
err = errors.New("ipfshttp.proxy_read_timeout is invalid")
}

if cfg.ProxyReadHeaderTimeout < 0 {
return errors.New("ipfshttp.proxy_read_header_timeout is invalid")
err = errors.New("ipfshttp.proxy_read_header_timeout is invalid")
}

if cfg.ProxyWriteTimeout < 0 {
return errors.New("ipfshttp.proxy_write_timeout is invalid")
err = errors.New("ipfshttp.proxy_write_timeout is invalid")
}

if cfg.ProxyIdleTimeout < 0 {
return errors.New("ipfshttp.proxy_idle_timeout invalid")
err = errors.New("ipfshttp.proxy_idle_timeout invalid")
}

switch cfg.PinMethod {
case "refs", "pin":
default:
return errors.New("ipfshttp.pin_method invalid value")
err = errors.New("ipfshttp.pin_method invalid value")
}
return nil

if cfg.IPFSRequestTimeout < 0 {
err = errors.New("ipfshttp.ipfs_request_timeout invalid")
}

if cfg.PinTimeout < 0 {
err = errors.New("ipfshttp.pin_timeout invalid")
}

if cfg.UnpinTimeout < 0 {
err = errors.New("ipfshttp.unpin_timeout invalid")
}
return err

}

Expand Down Expand Up @@ -161,6 +192,9 @@ func (cfg *Config) LoadJSON(raw []byte) error {
&config.DurationOpt{jcfg.ProxyWriteTimeout, &cfg.ProxyWriteTimeout, "proxy_write_timeout"},
&config.DurationOpt{jcfg.ProxyIdleTimeout, &cfg.ProxyIdleTimeout, "proxy_idle_timeout"},
&config.DurationOpt{jcfg.ConnectSwarmsDelay, &cfg.ConnectSwarmsDelay, "connect_swarms_delay"},
&config.DurationOpt{jcfg.IPFSRequestTimeout, &cfg.IPFSRequestTimeout, "ipfs_request_timeout"},
&config.DurationOpt{jcfg.PinTimeout, &cfg.PinTimeout, "pin_timeout"},
&config.DurationOpt{jcfg.UnpinTimeout, &cfg.UnpinTimeout, "unpin_timeout"},
)
if err != nil {
return err
Expand Down Expand Up @@ -191,6 +225,9 @@ func (cfg *Config) ToJSON() (raw []byte, err error) {
jcfg.ProxyIdleTimeout = cfg.ProxyIdleTimeout.String()
jcfg.ConnectSwarmsDelay = cfg.ConnectSwarmsDelay.String()
jcfg.PinMethod = cfg.PinMethod
jcfg.IPFSRequestTimeout = cfg.IPFSRequestTimeout.String()
jcfg.PinTimeout = cfg.PinTimeout.String()
jcfg.UnpinTimeout = cfg.UnpinTimeout.String()

raw, err = config.DefaultJSONMarshal(jcfg)
return
Expand Down
5 changes: 4 additions & 1 deletion ipfsconn/ipfshttp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ var cfgJSON = []byte(`
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s",
"pin_method": "pin"
"pin_method": "pin",
"ipfs_request_timeout": "5m0s",
"pin_timeout": "24h",
"unpin_timeout": "3h"
}
`)

Expand Down
Loading