Skip to content

Commit

Permalink
Feat #326: Adds "refs -r" pinning method support + multiple pin workers
Browse files Browse the repository at this point in the history
This fixes #326. It adds a new `pin_method` configuration option to the
`ipfshttp` component, which allows configuring it to perform `refs -r <cid>` before
the `pin/add` call. By fetching content before pinning, we don't have
a global lock in place, and we can have several pin-requests to
ipfs in parallel.

It also adds a `concurrent_pins` option to the pin tracker, which
launches more pin workers so it can potentially trigger more pins at
the same time. This is a minimal intervention in the pintracker as #308
is still pending.

Documentation for the configuration file has been updated.

License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
  • Loading branch information
hsanjuan committed Mar 9, 2018
1 parent 4e1f590 commit fb4812e
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 21 deletions.
3 changes: 2 additions & 1 deletion config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func DefaultJSONMarshal(v interface{}) ([]byte, error) {
return bs, nil
}

// SetIfNotDefault dest to the value of src if src is not the default value.
// SetIfNotDefault sets dest to the value of src if src is not the default
// value of the type.
// dest must be a pointer.
func SetIfNotDefault(src interface{}, dest interface{}) {
switch src.(type) {
Expand Down
6 changes: 4 additions & 2 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ var testingIpfsCfg = []byte(`{
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "10m0s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
"proxy_idle_timeout": "1m0s",
"pin_method": "pin"
}`)

var testingTrackerCfg = []byte(`
{
"pinning_timeout": "30s",
"unpinning_timeout": "15s",
"max_pin_queue_size": 4092
"max_pin_queue_size": 4092,
"concurrent_pins": 1
}
`)

Expand Down
14 changes: 13 additions & 1 deletion docs/ipfs-cluster-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,21 @@ Each section of the configuration file and the options in it depend on their ass
"proxy_read_timeout": "10m0s", // Here and below, timeouts for network operations
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
"proxy_idle_timeout": "1m0s",
"pin_method": "pin" // Supports "pin" and "refs". "refs" will fetch all deps before pinning.
// Use refs when GC is disabled on ipfs.
// Increase maptracker.concurrent_pins to take advantage of concurrency.
}
},
"pin_tracker": {
"maptracker": {
"pinning_timeout": "1h0m0s", // How long before we transition a pinning CID to error state
"unpinning_timeout": "5m0s", // How long before we transition an unpinning CID to error state
"max_pin_queue_size": 4096, // How many pins to hold in the pinning queue
"concurrent_pins": 1 // How many concurrent pin requests we can perform.
// Useful with ipfshttp.pin_method set to "refs"
}
},
"monitor": {
"monbasic": {
"check_interval": "15s" // How often to check for expired alerts. See cluster monitoring section
Expand Down
56 changes: 46 additions & 10 deletions ipfsconn/ipfshttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
DefaultProxyReadHeaderTimeout = 5 * time.Second
DefaultProxyWriteTimeout = 10 * time.Minute
DefaultProxyIdleTimeout = 60 * time.Second
DefaultPinMethod = "pin"
)

// Config is used to initialize a Connector and allows to customize
Expand Down Expand Up @@ -52,6 +53,11 @@ type Config struct {
// Server-side amount of time a Keep-Alive connection will be
// kept idle before being reused
ProxyIdleTimeout time.Duration

// "pin" or "refs". "pin" uses a "pin/add" call. "refs" uses a
// "refs -r" call followed by "pin/add". "refs" allows fetching in
// parallel but should be used with GC disabled.
PinMethod string
}

type jsonConfig struct {
Expand All @@ -62,6 +68,7 @@ type jsonConfig struct {
ProxyReadHeaderTimeout string `json:"proxy_read_header_timeout"`
ProxyWriteTimeout string `json:"proxy_write_timeout"`
ProxyIdleTimeout string `json:"proxy_idle_timeout"`
PinMethod string `json:"pin_method"`
}

// ConfigKey provides a human-friendly identifier for this type of Config.
Expand All @@ -80,6 +87,7 @@ func (cfg *Config) Default() error {
cfg.ProxyReadHeaderTimeout = DefaultProxyReadHeaderTimeout
cfg.ProxyWriteTimeout = DefaultProxyWriteTimeout
cfg.ProxyIdleTimeout = DefaultProxyIdleTimeout
cfg.PinMethod = DefaultPinMethod

return nil
}
Expand All @@ -98,22 +106,29 @@ func (cfg *Config) Validate() error {
return errors.New("ipfshttp.connect_swarms_delay is invalid")
}

if cfg.ProxyReadTimeout <= 0 {
if cfg.ProxyReadTimeout < 0 {
return errors.New("ipfshttp.proxy_read_timeout is invalid")
}

if cfg.ProxyReadHeaderTimeout <= 0 {
if cfg.ProxyReadHeaderTimeout < 0 {
return errors.New("ipfshttp.proxy_read_header_timeout is invalid")
}

if cfg.ProxyWriteTimeout <= 0 {
if cfg.ProxyWriteTimeout < 0 {
return errors.New("ipfshttp.proxy_write_timeout is invalid")
}

if cfg.ProxyIdleTimeout <= 0 {
if cfg.ProxyIdleTimeout < 0 {
return errors.New("ipfshttp.proxy_idle_timeout invalid")
}

switch cfg.PinMethod {
case "refs", "pin":
default:
return errors.New("ipfshttp.pin_method invalid value")
}
return nil

}

// LoadJSON parses a JSON representation of this Config as generated by ToJSON.
Expand All @@ -125,6 +140,8 @@ func (cfg *Config) LoadJSON(raw []byte) error {
return err
}

cfg.Default()

proxyAddr, err := ma.NewMultiaddr(jcfg.ProxyListenMultiaddress)
if err != nil {
return fmt.Errorf("error parsing ipfs_proxy_listen_multiaddress: %s", err)
Expand All @@ -137,22 +154,40 @@ func (cfg *Config) LoadJSON(raw []byte) error {
cfg.ProxyAddr = proxyAddr
cfg.NodeAddr = nodeAddr

// errors ignored as Validate() below will catch them
t, _ := time.ParseDuration(jcfg.ProxyReadTimeout)
// only overwrite defaults when we can parse the time
// Note for these 0 is a valid value.
t, err := time.ParseDuration(jcfg.ProxyReadTimeout)
if err != nil {
return fmt.Errorf("error parsing proxy_read_timeout: %s", err)
}
cfg.ProxyReadTimeout = t

t, _ = time.ParseDuration(jcfg.ProxyReadHeaderTimeout)
t, err = time.ParseDuration(jcfg.ProxyReadHeaderTimeout)
if err != nil {
return fmt.Errorf("error parsing proxy_read_header_timeout: %s", err)
}
cfg.ProxyReadHeaderTimeout = t

t, _ = time.ParseDuration(jcfg.ProxyWriteTimeout)
t, err = time.ParseDuration(jcfg.ProxyWriteTimeout)
if err != nil {
return fmt.Errorf("error parsing proxy_write_timeout: %s", err)
}
cfg.ProxyWriteTimeout = t

t, _ = time.ParseDuration(jcfg.ProxyIdleTimeout)
t, err = time.ParseDuration(jcfg.ProxyIdleTimeout)
if err != nil {
return fmt.Errorf("error parsing proxy_idle_timeout: %s", err)
}
cfg.ProxyIdleTimeout = t

t, _ = time.ParseDuration(jcfg.ConnectSwarmsDelay)
t, err = time.ParseDuration(jcfg.ConnectSwarmsDelay)
if err != nil {
return fmt.Errorf("error parsing connect_swarms_delay: %s", err)
}
cfg.ConnectSwarmsDelay = t

config.SetIfNotDefault(jcfg.PinMethod, &cfg.PinMethod)

return cfg.Validate()
}

Expand All @@ -175,6 +210,7 @@ func (cfg *Config) ToJSON() (raw []byte, err error) {
jcfg.ProxyWriteTimeout = cfg.ProxyWriteTimeout.String()
jcfg.ProxyIdleTimeout = cfg.ProxyIdleTimeout.String()
jcfg.ConnectSwarmsDelay = cfg.ConnectSwarmsDelay.String()
jcfg.PinMethod = cfg.PinMethod

raw, err = config.DefaultJSONMarshal(jcfg)
return
Expand Down
11 changes: 6 additions & 5 deletions ipfsconn/ipfshttp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ var cfgJSON = []byte(`
"proxy_read_timeout": "10m0s",
"proxy_read_header_timeout": "5s",
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s"
"proxy_idle_timeout": "1m0s",
"pin_method": "pin"
}
`)

Expand Down Expand Up @@ -85,25 +86,25 @@ func TestDefault(t *testing.T) {
}

cfg.Default()
cfg.ProxyReadTimeout = 0
cfg.ProxyReadTimeout = -1
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

cfg.Default()
cfg.ProxyReadHeaderTimeout = 0
cfg.ProxyReadHeaderTimeout = -2
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

cfg.Default()
cfg.ProxyIdleTimeout = 0
cfg.ProxyIdleTimeout = -1
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}

cfg.Default()
cfg.ProxyWriteTimeout = 0
cfg.ProxyWriteTimeout = -3
if cfg.Validate() == nil {
t.Fatal("expected error validating")
}
Expand Down
41 changes: 40 additions & 1 deletion ipfsconn/ipfshttp/ipfshttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,16 @@ func (ipfs *Connector) Pin(hash *cid.Cid) error {
return err
}
if !pinStatus.IsPinned() {
switch ipfs.config.PinMethod {
case "refs":
path := fmt.Sprintf("refs?arg=%s&recursive=true", hash)
err := ipfs.postDiscardBody(path)
if err != nil {
return err
}
logger.Debugf("Refs for %s sucessfully fetched", hash)
}

path := fmt.Sprintf("pin/add?arg=%s", hash)
_, err = ipfs.post(path)
if err == nil {
Expand Down Expand Up @@ -721,7 +731,7 @@ func (ipfs *Connector) post(path string) ([]byte, error) {
msg = fmt.Sprintf("IPFS unsuccessful: %d: %s",
res.StatusCode, ipfsErr.Message)
} else {
msg = fmt.Sprintf("IPFS-get '%s' unsuccessful: %d: %s",
msg = fmt.Sprintf("IPFS-post '%s' unsuccessful: %d: %s",
path, res.StatusCode, body)
}

Expand All @@ -730,6 +740,35 @@ func (ipfs *Connector) post(path string) ([]byte, error) {
return body, nil
}

// postDiscardBody issues a POST request to the given IPFS API path, like
// post(), but reads the response body to the end and throws it away instead
// of returning it. It is meant for calls whose response we have to wait for
// but are not interested in parsing. Any status code other than 200 is
// reported as an error.
func (ipfs *Connector) postDiscardBody(path string) error {
	logger.Debugf("posting and discarding body: %s", path)
	endpoint := fmt.Sprintf("%s/%s", ipfs.apiURL(), path)

	resp, err := http.Post(endpoint, "", nil)
	if err != nil {
		logger.Error("error posting:", err)
		return err
	}
	defer resp.Body.Close()

	// Consume the whole body before looking at the status code, so that
	// we only return once the full response has been received.
	if _, copyErr := io.Copy(ioutil.Discard, resp.Body); copyErr != nil {
		logger.Errorf("error reading response body: %s", copyErr)
		return copyErr
	}

	if resp.StatusCode == http.StatusOK {
		return nil
	}
	return errors.New(fmt.Sprintf("IPFS-post '%s' unsuccessful: %d", path, resp.StatusCode))
}

// apiURL is a short-hand for building the url of the IPFS
// daemon API.
func (ipfs *Connector) apiURL() string {
Expand Down
27 changes: 27 additions & 0 deletions ipfsconn/ipfshttp/ipfshttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,33 @@ func TestIPFSPin(t *testing.T) {
}
}

// TestIPFSPinRefs exercises Pin with the connector's pin method set to
// "refs": pinning test.TestCid1 must succeed and show up as pinned, while
// pinning test.ErrorCid must return an error.
func TestIPFSPinRefs(t *testing.T) {
	ipfs, mock := testIPFSConnector(t)
	defer mock.Close()
	defer ipfs.Shutdown()

	ipfs.config.PinMethod = "refs"

	goodCid, _ := cid.Decode(test.TestCid1)
	if pinErr := ipfs.Pin(goodCid); pinErr != nil {
		t.Error("expected success pinning cid")
	}

	status, lsErr := ipfs.PinLsCid(goodCid)
	if lsErr != nil {
		t.Fatal("expected success doing ls")
	}
	if !status.IsPinned() {
		t.Error("cid should have been pinned")
	}

	badCid, _ := cid.Decode(test.ErrorCid)
	if pinErr := ipfs.Pin(badCid); pinErr == nil {
		t.Error("expected error pinning cid")
	}
}

func TestIPFSUnpin(t *testing.T) {
ipfs, mock := testIPFSConnector(t)
defer mock.Close()
Expand Down
13 changes: 13 additions & 0 deletions pintracker/maptracker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
DefaultPinningTimeout = 60 * time.Minute
DefaultUnpinningTimeout = 5 * time.Minute
DefaultMaxPinQueueSize = 4096
DefaultConcurrentPins = 1
)

// Config allows to initialize a MapPinTracker and customize some parameters.
Expand All @@ -28,12 +29,17 @@ type Config struct {
// MaxPinQueueSize specifies how many pin or unpin requests we can hold in the queue
// If higher, they will automatically be marked with an error.
MaxPinQueueSize int
// ConcurrentPins specifies how many pin requests can be sent to the ipfs
// daemon in parallel. If the pinning method is "refs", it might increase
// speed. Unpin requests are always processed one by one.
ConcurrentPins int
}

type jsonConfig struct {
PinningTimeout string `json:"pinning_timeout"`
UnpinningTimeout string `json:"unpinning_timeout"`
MaxPinQueueSize int `json:"max_pin_queue_size"`
ConcurrentPins int `json:"concurrent_pins"`
}

// ConfigKey provides a human-friendly identifier for this type of Config.
Expand All @@ -46,6 +52,7 @@ func (cfg *Config) Default() error {
cfg.PinningTimeout = DefaultPinningTimeout
cfg.UnpinningTimeout = DefaultUnpinningTimeout
cfg.MaxPinQueueSize = DefaultMaxPinQueueSize
cfg.ConcurrentPins = DefaultConcurrentPins
return nil
}

Expand All @@ -61,6 +68,10 @@ func (cfg *Config) Validate() error {
if cfg.MaxPinQueueSize <= 0 {
return errors.New("maptracker.max_pin_queue_size too low")
}

if cfg.ConcurrentPins <= 0 {
return errors.New("maptracker.concurrent_pins is too low")
}
return nil
}

Expand Down Expand Up @@ -89,6 +100,7 @@ func (cfg *Config) LoadJSON(raw []byte) error {
config.SetIfNotDefault(pinningTimeo, &cfg.PinningTimeout)
config.SetIfNotDefault(unpinningTimeo, &cfg.UnpinningTimeout)
config.SetIfNotDefault(jcfg.MaxPinQueueSize, &cfg.MaxPinQueueSize)
config.SetIfNotDefault(jcfg.ConcurrentPins, &cfg.ConcurrentPins)

return cfg.Validate()
}
Expand All @@ -100,6 +112,7 @@ func (cfg *Config) ToJSON() ([]byte, error) {
jcfg.PinningTimeout = cfg.PinningTimeout.String()
jcfg.UnpinningTimeout = cfg.UnpinningTimeout.String()
jcfg.MaxPinQueueSize = cfg.MaxPinQueueSize
jcfg.ConcurrentPins = cfg.ConcurrentPins

return config.DefaultJSONMarshal(jcfg)
}
4 changes: 3 additions & 1 deletion pintracker/maptracker/maptracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ func NewMapPinTracker(cfg *Config, pid peer.ID) *MapPinTracker {
pinCh: make(chan api.Pin, cfg.MaxPinQueueSize),
unpinCh: make(chan api.Pin, cfg.MaxPinQueueSize),
}
go mpt.pinWorker()
for i := 0; i < mpt.config.ConcurrentPins; i++ {
go mpt.pinWorker()
}
go mpt.unpinWorker()
return mpt
}
Expand Down

0 comments on commit fb4812e

Please sign in to comment.