diff --git a/config/util.go b/config/util.go index dba200e4f..8655a6e1e 100644 --- a/config/util.go +++ b/config/util.go @@ -49,7 +49,8 @@ func DefaultJSONMarshal(v interface{}) ([]byte, error) { return bs, nil } -// SetIfNotDefault dest to the value of src if src is not the default value. +// SetIfNotDefault sets dest to the value of src if src is not the default +// value of the type. // dest must be a pointer. func SetIfNotDefault(src interface{}, dest interface{}) { switch src.(type) { diff --git a/config_test.go b/config_test.go index 4c60ec6ea..c51ae2142 100644 --- a/config_test.go +++ b/config_test.go @@ -57,14 +57,16 @@ var testingIpfsCfg = []byte(`{ "proxy_read_timeout": "10m0s", "proxy_read_header_timeout": "10m0s", "proxy_write_timeout": "10m0s", - "proxy_idle_timeout": "1m0s" + "proxy_idle_timeout": "1m0s", + "pin_method": "pin" }`) var testingTrackerCfg = []byte(` { "pinning_timeout": "30s", "unpinning_timeout": "15s", - "max_pin_queue_size": 4092 + "max_pin_queue_size": 4092, + "concurrent_pins": 1, } `) diff --git a/docs/ipfs-cluster-guide.md b/docs/ipfs-cluster-guide.md index 60d1865bb..59aae0231 100644 --- a/docs/ipfs-cluster-guide.md +++ b/docs/ipfs-cluster-guide.md @@ -121,9 +121,21 @@ Each section of the configuration file and the options in it depend on their ass "proxy_read_timeout": "10m0s", // Here and below, timeouts for network operations "proxy_read_header_timeout": "5s", "proxy_write_timeout": "10m0s", - "proxy_idle_timeout": "1m0s" + "proxy_idle_timeout": "1m0s", + "pin_method": "pin" // Supports "pin" and "refs". "refs" will fetch all deps before pinning. + // Use refs when GC is disabled on ipfs. + // Increase maptracker.concurrent_pins to take advantange of concurrency. } }, + "pin_tracker": { + "maptracker": { + "pinning_timeout": "1h0m0s", // How long before we transition a pinning CID to error state + "unpinning_timeout": "5m0s", // How long before we transition an unpinning CID to error state + "max_pin_queue_size": 4096, // How many pins to hold in the pinning queue + "concurrent_pins": 1 // How many concurrent pin requests we can perform. + // Useful with ipfshttp.pin_method set to "refs" + } + } "monitor": { "monbasic": { "check_interval": "15s" // How often to check for expired alerts. See cluster monitoring section diff --git a/ipfsconn/ipfshttp/config.go b/ipfsconn/ipfshttp/config.go index dcb748200..5e76502b9 100644 --- a/ipfsconn/ipfshttp/config.go +++ b/ipfsconn/ipfshttp/config.go @@ -22,6 +22,7 @@ const ( DefaultProxyReadHeaderTimeout = 5 * time.Second DefaultProxyWriteTimeout = 10 * time.Minute DefaultProxyIdleTimeout = 60 * time.Second + DefaultPinMethod = "pin" ) // Config is used to initialize a Connector and allows to customize @@ -52,6 +53,11 @@ type Config struct { // Server-side amount of time a Keep-Alive connection will be // kept idle before being reused ProxyIdleTimeout time.Duration + + // "pin" or "refs". "pin" uses a "pin/add" call. "refs" uses a + // "refs -r" call followed by "pin/add". "refs" allows fetching in + // parallel but should be used with GC disabled. + PinMethod string } type jsonConfig struct { @@ -62,6 +68,7 @@ type jsonConfig struct { ProxyReadHeaderTimeout string `json:"proxy_read_header_timeout"` ProxyWriteTimeout string `json:"proxy_write_timeout"` ProxyIdleTimeout string `json:"proxy_idle_timeout"` + PinMethod string `json:"pin_method"` } // ConfigKey provides a human-friendly identifier for this type of Config. @@ -80,6 +87,7 @@ func (cfg *Config) Default() error { cfg.ProxyReadHeaderTimeout = DefaultProxyReadHeaderTimeout cfg.ProxyWriteTimeout = DefaultProxyWriteTimeout cfg.ProxyIdleTimeout = DefaultProxyIdleTimeout + cfg.PinMethod = DefaultPinMethod return nil } @@ -98,22 +106,29 @@ func (cfg *Config) Validate() error { return errors.New("ipfshttp.connect_swarms_delay is invalid") } - if cfg.ProxyReadTimeout <= 0 { + if cfg.ProxyReadTimeout < 0 { return errors.New("ipfshttp.proxy_read_timeout is invalid") } - if cfg.ProxyReadHeaderTimeout <= 0 { + if cfg.ProxyReadHeaderTimeout < 0 { return errors.New("ipfshttp.proxy_read_header_timeout is invalid") } - if cfg.ProxyWriteTimeout <= 0 { + if cfg.ProxyWriteTimeout < 0 { return errors.New("ipfshttp.proxy_write_timeout is invalid") } - if cfg.ProxyIdleTimeout <= 0 { + if cfg.ProxyIdleTimeout < 0 { return errors.New("ipfshttp.proxy_idle_timeout invalid") } + + switch cfg.PinMethod { + case "refs", "pin": + default: + return errors.New("ipfshttp.pin_method invalid value") + } return nil + } // LoadJSON parses a JSON representation of this Config as generated by ToJSON. @@ -125,6 +140,8 @@ func (cfg *Config) LoadJSON(raw []byte) error { return err } + cfg.Default() + proxyAddr, err := ma.NewMultiaddr(jcfg.ProxyListenMultiaddress) if err != nil { return fmt.Errorf("error parsing ipfs_proxy_listen_multiaddress: %s", err) @@ -137,22 +154,40 @@ func (cfg *Config) LoadJSON(raw []byte) error { cfg.ProxyAddr = proxyAddr cfg.NodeAddr = nodeAddr - // errors ignored as Validate() below will catch them - t, _ := time.ParseDuration(jcfg.ProxyReadTimeout) + // only overwrite defaults when we can parse the time + // Note for these 0 is a valid value. + t, err := time.ParseDuration(jcfg.ProxyReadTimeout) + if err != nil { + return fmt.Errorf("error parsing proxy_read_timeout: %s", err) + } cfg.ProxyReadTimeout = t - t, _ = time.ParseDuration(jcfg.ProxyReadHeaderTimeout) + t, err = time.ParseDuration(jcfg.ProxyReadHeaderTimeout) + if err != nil { + return fmt.Errorf("error parsing proxy_read_header_timeout: %s", err) + } cfg.ProxyReadHeaderTimeout = t - t, _ = time.ParseDuration(jcfg.ProxyWriteTimeout) + t, err = time.ParseDuration(jcfg.ProxyWriteTimeout) + if err != nil { + return fmt.Errorf("error parsing proxy_write_timeout: %s", err) + } cfg.ProxyWriteTimeout = t - t, _ = time.ParseDuration(jcfg.ProxyIdleTimeout) + t, err = time.ParseDuration(jcfg.ProxyIdleTimeout) + if err != nil { + return fmt.Errorf("error parsing proxy_idle_timeout: %s", err) + } cfg.ProxyIdleTimeout = t - t, _ = time.ParseDuration(jcfg.ConnectSwarmsDelay) + t, err = time.ParseDuration(jcfg.ConnectSwarmsDelay) + if err != nil { + return fmt.Errorf("error parsing connect_swarms_delay: %s", err) + } cfg.ConnectSwarmsDelay = t + config.SetIfNotDefault(jcfg.PinMethod, &cfg.PinMethod) + return cfg.Validate() } @@ -175,6 +210,7 @@ func (cfg *Config) ToJSON() (raw []byte, err error) { jcfg.ProxyWriteTimeout = cfg.ProxyWriteTimeout.String() jcfg.ProxyIdleTimeout = cfg.ProxyIdleTimeout.String() jcfg.ConnectSwarmsDelay = cfg.ConnectSwarmsDelay.String() + jcfg.PinMethod = cfg.PinMethod raw, err = config.DefaultJSONMarshal(jcfg) return diff --git a/ipfsconn/ipfshttp/config_test.go b/ipfsconn/ipfshttp/config_test.go index ba3cf7e1f..0bcd16070 100644 --- a/ipfsconn/ipfshttp/config_test.go +++ b/ipfsconn/ipfshttp/config_test.go @@ -13,7 +13,8 @@ var cfgJSON = []byte(` "proxy_read_timeout": "10m0s", "proxy_read_header_timeout": "5s", "proxy_write_timeout": "10m0s", - "proxy_idle_timeout": "1m0s" + "proxy_idle_timeout": "1m0s", + "pin_method": "pin" } `) @@ -85,25 +86,25 @@ func TestDefault(t *testing.T) { } cfg.Default() - cfg.ProxyReadTimeout = 0 + cfg.ProxyReadTimeout = -1 if cfg.Validate() == nil { t.Fatal("expected error validating") } cfg.Default() - cfg.ProxyReadHeaderTimeout = 0 + cfg.ProxyReadHeaderTimeout = -2 if cfg.Validate() == nil { t.Fatal("expected error validating") } cfg.Default() - cfg.ProxyIdleTimeout = 0 + cfg.ProxyIdleTimeout = -1 if cfg.Validate() == nil { t.Fatal("expected error validating") } cfg.Default() - cfg.ProxyWriteTimeout = 0 + cfg.ProxyWriteTimeout = -3 if cfg.Validate() == nil { t.Fatal("expected error validating") } diff --git a/ipfsconn/ipfshttp/ipfshttp.go b/ipfsconn/ipfshttp/ipfshttp.go index b9083395b..d5e66db24 100644 --- a/ipfsconn/ipfshttp/ipfshttp.go +++ b/ipfsconn/ipfshttp/ipfshttp.go @@ -605,6 +605,16 @@ func (ipfs *Connector) Pin(hash *cid.Cid) error { return err } if !pinStatus.IsPinned() { + switch ipfs.config.PinMethod { + case "refs": + path := fmt.Sprintf("refs?arg=%s&recursive=true", hash) + err := ipfs.postDiscardBody(path) + if err != nil { + return err + } + logger.Debugf("Refs for %s sucessfully fetched", hash) + } + path := fmt.Sprintf("pin/add?arg=%s", hash) _, err = ipfs.post(path) if err == nil { @@ -721,7 +731,7 @@ func (ipfs *Connector) post(path string) ([]byte, error) { msg = fmt.Sprintf("IPFS unsuccessful: %d: %s", res.StatusCode, ipfsErr.Message) } else { - msg = fmt.Sprintf("IPFS-get '%s' unsuccessful: %d: %s", + msg = fmt.Sprintf("IPFS-post '%s' unsuccessful: %d: %s", path, res.StatusCode, body) } @@ -730,6 +740,35 @@ func (ipfs *Connector) post(path string) ([]byte, error) { return body, nil } +// like post() but discarding the response body after reading it +// useful for any responses that we have to wait for but are not interested +// in parsing +func (ipfs *Connector) postDiscardBody(path string) error { + logger.Debugf("posting and discarding body: %s", path) + url := fmt.Sprintf("%s/%s", + ipfs.apiURL(), + path) + + res, err := http.Post(url, "", nil) + if err != nil { + logger.Error("error posting:", err) + return err + } + defer res.Body.Close() + _, err = io.Copy(ioutil.Discard, res.Body) + if err != nil { + logger.Errorf("error reading response body: %s", err) + return err + } + + if res.StatusCode != http.StatusOK { + msg := fmt.Sprintf("IPFS-post '%s' unsuccessful: %d", path, res.StatusCode) + return errors.New(msg) + } + + return nil +} + // apiURL is a short-hand for building the url of the IPFS // daemon API. func (ipfs *Connector) apiURL() string { diff --git a/ipfsconn/ipfshttp/ipfshttp_test.go b/ipfsconn/ipfshttp/ipfshttp_test.go index 026dddb81..1a4b3f89b 100644 --- a/ipfsconn/ipfshttp/ipfshttp_test.go +++ b/ipfsconn/ipfshttp/ipfshttp_test.go @@ -99,6 +99,33 @@ func TestIPFSPin(t *testing.T) { } } +func TestIPFSPinRefs(t *testing.T) { + ipfs, mock := testIPFSConnector(t) + defer mock.Close() + defer ipfs.Shutdown() + + ipfs.config.PinMethod = "refs" + + c, _ := cid.Decode(test.TestCid1) + err := ipfs.Pin(c) + if err != nil { + t.Error("expected success pinning cid") + } + pinSt, err := ipfs.PinLsCid(c) + if err != nil { + t.Fatal("expected success doing ls") + } + if !pinSt.IsPinned() { + t.Error("cid should have been pinned") + } + + c2, _ := cid.Decode(test.ErrorCid) + err = ipfs.Pin(c2) + if err == nil { + t.Error("expected error pinning cid") + } +} + func TestIPFSUnpin(t *testing.T) { ipfs, mock := testIPFSConnector(t) defer mock.Close() diff --git a/pintracker/maptracker/config.go b/pintracker/maptracker/config.go index 213356be3..51f3c4525 100644 --- a/pintracker/maptracker/config.go +++ b/pintracker/maptracker/config.go @@ -15,6 +15,7 @@ const ( DefaultPinningTimeout = 60 * time.Minute DefaultUnpinningTimeout = 5 * time.Minute DefaultMaxPinQueueSize = 4096 + DefaultConcurrentPins = 1 ) // Config allows to initialize a Monitor and customize some parameters. @@ -28,12 +29,17 @@ type Config struct { // MaxPinQueueSize specifies how many pin or unpin requests we can hold in the queue // If higher, they will automatically marked with an error. MaxPinQueueSize int + // ConcurrentPins specifies how many pin requests can be sent to the ipfs + // daemon in parallel. If the pinning method is "refs", it might increase + // speed. Unpin requests are always processed one by one. + ConcurrentPins int } type jsonConfig struct { PinningTimeout string `json:"pinning_timeout"` UnpinningTimeout string `json:"unpinning_timeout"` MaxPinQueueSize int `json:"max_pin_queue_size"` + ConcurrentPins int `json:"concurrent_pins"` } // ConfigKey provides a human-friendly identifier for this type of Config. @@ -46,6 +52,7 @@ func (cfg *Config) Default() error { cfg.PinningTimeout = DefaultPinningTimeout cfg.UnpinningTimeout = DefaultUnpinningTimeout cfg.MaxPinQueueSize = DefaultMaxPinQueueSize + cfg.ConcurrentPins = DefaultConcurrentPins return nil } @@ -61,6 +68,10 @@ func (cfg *Config) Validate() error { if cfg.MaxPinQueueSize <= 0 { return errors.New("maptracker.max_pin_queue_size too low") } + + if cfg.ConcurrentPins <= 0 { + return errors.New("maptracker.concurrent_pins is too low") + } return nil } @@ -89,6 +100,7 @@ func (cfg *Config) LoadJSON(raw []byte) error { config.SetIfNotDefault(pinningTimeo, &cfg.PinningTimeout) config.SetIfNotDefault(unpinningTimeo, &cfg.UnpinningTimeout) config.SetIfNotDefault(jcfg.MaxPinQueueSize, &cfg.MaxPinQueueSize) + config.SetIfNotDefault(jcfg.ConcurrentPins, &cfg.ConcurrentPins) return cfg.Validate() } @@ -100,6 +112,7 @@ func (cfg *Config) ToJSON() ([]byte, error) { jcfg.PinningTimeout = cfg.PinningTimeout.String() jcfg.UnpinningTimeout = cfg.UnpinningTimeout.String() jcfg.MaxPinQueueSize = cfg.MaxPinQueueSize + jcfg.ConcurrentPins = cfg.ConcurrentPins return config.DefaultJSONMarshal(jcfg) } diff --git a/pintracker/maptracker/maptracker.go b/pintracker/maptracker/maptracker.go index 0f9ee543c..a19ff1c73 100644 --- a/pintracker/maptracker/maptracker.go +++ b/pintracker/maptracker/maptracker.go @@ -62,7 +62,9 @@ func NewMapPinTracker(cfg *Config, pid peer.ID) *MapPinTracker { pinCh: make(chan api.Pin, cfg.MaxPinQueueSize), unpinCh: make(chan api.Pin, cfg.MaxPinQueueSize), } - go mpt.pinWorker() + for i := 0; i < mpt.config.ConcurrentPins; i++ { + go mpt.pinWorker() + } go mpt.unpinWorker() return mpt } diff --git a/test/ipfs_mock.go b/test/ipfs_mock.go index 54f8c5808..30457d461 100644 --- a/test/ipfs_mock.go +++ b/test/ipfs_mock.go @@ -63,6 +63,11 @@ type mockAddResp struct { Bytes uint64 } +type mockRefsResp struct { + Ref string + Err string +} + type mockSwarmPeersResp struct { Peers []mockIpfsPeer } @@ -250,6 +255,17 @@ func (m *IpfsMock) handler(w http.ResponseWriter, r *http.Request) { } j, _ := json.Marshal(resp) w.Write(j) + case "refs": + query := r.URL.Query() + arg, ok := query["arg"] + if !ok { + goto ERROR + } + resp := mockRefsResp{ + Ref: arg[0], + } + j, _ := json.Marshal(resp) + w.Write(j) case "version": w.Write([]byte("{\"Version\":\"m.o.c.k\"}")) default: