Skip to content

Commit

Permalink
ipfshttp: add pin/unpin specific timeouts
Browse files Browse the repository at this point in the history
and get the tests passing and add Pin/UnpinQueued
tracker statuses back in.

License: MIT
Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
  • Loading branch information
lanzafame committed May 2, 2018
1 parent 356f44d commit 348bd9f
Show file tree
Hide file tree
Showing 11 changed files with 430 additions and 58 deletions.
8 changes: 7 additions & 1 deletion api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ const (
TrackerStatusUnpinning
// The IPFS daemon is not pinning the item
TrackerStatusUnpinned
// The IPFS deamon is not pinning the item but it is being tracked
// The IPFS daemon is not pinning the item but it is being tracked
TrackerStatusRemote
// The item has been queued for pinning on the IPFS daemon
TrackerStatusPinQueued
// The item has been queued for unpinning on the IPFS daemon
TrackerStatusUnpinQueued
)

// TrackerStatus represents the status of a tracked Cid in the PinTracker
Expand All @@ -63,6 +67,8 @@ var trackerStatusString = map[TrackerStatus]string{
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemote: "remote",
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
}

// String converts a TrackerStatus into a readable string.
Expand Down
8 changes: 6 additions & 2 deletions ipfscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func delay() {
}

func pinDelay() {
time.Sleep(400 * time.Millisecond)
time.Sleep(700 * time.Millisecond)
}

func ttlDelay() {
Expand Down Expand Up @@ -418,6 +418,8 @@ func TestClustersPin(t *testing.T) {
}
delay()
delay()
delay()
delay()
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll()
for _, v := range status {
Expand Down Expand Up @@ -451,7 +453,7 @@ func TestClustersPin(t *testing.T) {
}
delay()
delay()

delay()
funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll()
if l := len(status); l != 0 {
Expand Down Expand Up @@ -605,6 +607,7 @@ func TestClustersSyncLocal(t *testing.T) {
clusters[0].Pin(api.PinCid(h2))
pinDelay()
pinDelay()
pinDelay()

f := func(t *testing.T, c *Cluster) {
info, err := c.SyncLocal(h)
Expand Down Expand Up @@ -808,6 +811,7 @@ func TestClustersRecover(t *testing.T) {
t.Error("GlobalPinInfo should be for testrCid2")
}

pinDelay()
for _, c := range clusters {
inf, ok := ginfo.PeerMap[c.host.ID()]
if !ok {
Expand Down
67 changes: 40 additions & 27 deletions ipfsconn/ipfshttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ const (
DefaultProxyWriteTimeout = 10 * time.Minute
DefaultProxyIdleTimeout = 60 * time.Second
DefaultPinMethod = "pin"
DefaultClientTimeout = 10 * time.Second
DefaultClientPostTimeout = 60 * time.Second
DefaultIPFSRequestTimeout = 5 * time.Minute
DefaultPinTimeout = 24 * time.Hour
DefaultUnpinTimeout = 3 * time.Hour
)

// Config is used to initialize a Connector and allows to customize
Expand Down Expand Up @@ -61,11 +62,14 @@ type Config struct {
// parallel but should be used with GC disabled.
PinMethod string

// IPFS Daemon HTTP Client timeout
ClientTimeout time.Duration

// IPFS Daemon HTTP Client POST timeout
ClientPostTimeout time.Duration
IPFSRequestTimeout time.Duration

// Pin Operation timeout
PinTimeout time.Duration

// Unpin Operation timeout
UnpinTimeout time.Duration
}

type jsonConfig struct {
Expand All @@ -77,8 +81,9 @@ type jsonConfig struct {
ProxyWriteTimeout string `json:"proxy_write_timeout"`
ProxyIdleTimeout string `json:"proxy_idle_timeout"`
PinMethod string `json:"pin_method"`
ClientTimeout string `json:"client_timeout"`
ClientPostTimeout string `json:"client_post_timeout"`
IPFSRequestTimeout string `json:"ipfs_request_timeout"`
PinTimeout string `json:"pin_timeout"`
UnpinTimeout string `json:"unpin_timeout"`
}

// ConfigKey provides a human-friendly identifier for this type of Config.
Expand All @@ -98,56 +103,62 @@ func (cfg *Config) Default() error {
cfg.ProxyWriteTimeout = DefaultProxyWriteTimeout
cfg.ProxyIdleTimeout = DefaultProxyIdleTimeout
cfg.PinMethod = DefaultPinMethod
cfg.ClientTimeout = DefaultClientTimeout
cfg.ClientPostTimeout = DefaultClientPostTimeout
cfg.IPFSRequestTimeout = DefaultIPFSRequestTimeout
cfg.PinTimeout = DefaultPinTimeout
cfg.UnpinTimeout = DefaultUnpinTimeout

return nil
}

// Validate checks that the fields of this Config have sensible values,
// at least in appearance.
func (cfg *Config) Validate() error {
var err error
if cfg.ProxyAddr == nil {
return errors.New("ipfshttp.proxy_listen_multiaddress not set")
err = errors.New("ipfshttp.proxy_listen_multiaddress not set")
}
if cfg.NodeAddr == nil {
return errors.New("ipfshttp.node_multiaddress not set")
err = errors.New("ipfshttp.node_multiaddress not set")
}

if cfg.ConnectSwarmsDelay < 0 {
return errors.New("ipfshttp.connect_swarms_delay is invalid")
err = errors.New("ipfshttp.connect_swarms_delay is invalid")
}

if cfg.ProxyReadTimeout < 0 {
return errors.New("ipfshttp.proxy_read_timeout is invalid")
err = errors.New("ipfshttp.proxy_read_timeout is invalid")
}

if cfg.ProxyReadHeaderTimeout < 0 {
return errors.New("ipfshttp.proxy_read_header_timeout is invalid")
err = errors.New("ipfshttp.proxy_read_header_timeout is invalid")
}

if cfg.ProxyWriteTimeout < 0 {
return errors.New("ipfshttp.proxy_write_timeout is invalid")
err = errors.New("ipfshttp.proxy_write_timeout is invalid")
}

if cfg.ProxyIdleTimeout < 0 {
return errors.New("ipfshttp.proxy_idle_timeout invalid")
err = errors.New("ipfshttp.proxy_idle_timeout invalid")
}

switch cfg.PinMethod {
case "refs", "pin":
default:
return errors.New("ipfshttp.pin_method invalid value")
err = errors.New("ipfshttp.pin_method invalid value")
}

if cfg.ClientTimeout < 0 {
return errors.New("ipfshttp.client_timeout invalid")
if cfg.IPFSRequestTimeout < 0 {
err = errors.New("ipfshttp.ipfs_request_timeout invalid")
}

if cfg.ClientPostTimeout < 0 {
return errors.New("ipfshttp.client_post_timeout invalid")
if cfg.PinTimeout < 0 {
err = errors.New("ipfshttp.pin_timeout invalid")
}
return nil

if cfg.UnpinTimeout < 0 {
err = errors.New("ipfshttp.unpin_timeout invalid")
}
return err

}

Expand Down Expand Up @@ -181,8 +192,9 @@ func (cfg *Config) LoadJSON(raw []byte) error {
&config.DurationOpt{jcfg.ProxyWriteTimeout, &cfg.ProxyWriteTimeout, "proxy_write_timeout"},
&config.DurationOpt{jcfg.ProxyIdleTimeout, &cfg.ProxyIdleTimeout, "proxy_idle_timeout"},
&config.DurationOpt{jcfg.ConnectSwarmsDelay, &cfg.ConnectSwarmsDelay, "connect_swarms_delay"},
&config.DurationOpt{jcfg.ClientTimeout, &cfg.ClientTimeout, "client_timeout"},
&config.DurationOpt{jcfg.ClientPostTimeout, &cfg.ClientPostTimeout, "client_post_timeout"},
&config.DurationOpt{jcfg.IPFSRequestTimeout, &cfg.IPFSRequestTimeout, "ipfs_request_timeout"},
&config.DurationOpt{jcfg.PinTimeout, &cfg.PinTimeout, "pin_timeout"},
&config.DurationOpt{jcfg.UnpinTimeout, &cfg.UnpinTimeout, "unpin_timeout"},
)
if err != nil {
return err
Expand Down Expand Up @@ -213,8 +225,9 @@ func (cfg *Config) ToJSON() (raw []byte, err error) {
jcfg.ProxyIdleTimeout = cfg.ProxyIdleTimeout.String()
jcfg.ConnectSwarmsDelay = cfg.ConnectSwarmsDelay.String()
jcfg.PinMethod = cfg.PinMethod
jcfg.ClientTimeout = cfg.ClientTimeout.String()
jcfg.ClientPostTimeout = cfg.ClientPostTimeout.String()
jcfg.IPFSRequestTimeout = cfg.IPFSRequestTimeout.String()
jcfg.PinTimeout = cfg.PinTimeout.String()
jcfg.UnpinTimeout = cfg.UnpinTimeout.String()

raw, err = config.DefaultJSONMarshal(jcfg)
return
Expand Down
5 changes: 3 additions & 2 deletions ipfsconn/ipfshttp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ var cfgJSON = []byte(`
"proxy_write_timeout": "10m0s",
"proxy_idle_timeout": "1m0s",
"pin_method": "pin",
"client_timeout": "10s",
"client_post_timeout": "1m0s"
"ipfs_request_timeout": "5m0s",
"pin_timeout": "24h",
"unpin_timeout": "3h"
}
`)

Expand Down
31 changes: 17 additions & 14 deletions ipfsconn/ipfshttp/ipfshttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewConnector(cfg *Config) (*Connector, error) {
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed

c := &http.Client{
Timeout: cfg.ClientTimeout,
Timeout: cfg.IPFSRequestTimeout,
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -608,6 +608,8 @@ func (ipfs *Connector) ID() (api.IPFSID, error) {
// Pin performs a pin request against the configured IPFS
// daemon.
func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) error {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.PinTimeout)
defer cancel()
pinStatus, err := ipfs.PinLsCid(ctx, hash)
if err != nil {
return err
Expand All @@ -616,15 +618,15 @@ func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) e
switch ipfs.config.PinMethod {
case "refs":
path := fmt.Sprintf("refs?arg=%s&recursive=%t", hash, recursive)
err := ipfs.postDiscardBody(path)
err := ipfs.postDiscardBodyCtx(ctx, path)
if err != nil {
return err
}
logger.Debugf("Refs for %s sucessfully fetched", hash)
}

path := fmt.Sprintf("pin/add?arg=%s&recursive=%t", hash, recursive)
_, err = ipfs.post(path)
_, err = ipfs.postCtx(ctx, path)
if err == nil {
logger.Info("IPFS Pin request succeeded: ", hash)
}
Expand All @@ -637,13 +639,15 @@ func (ipfs *Connector) Pin(ctx context.Context, hash *cid.Cid, recursive bool) e
// Unpin performs an unpin request against the configured IPFS
// daemon.
func (ipfs *Connector) Unpin(ctx context.Context, hash *cid.Cid) error {
ctx, cancel := context.WithTimeout(ctx, ipfs.config.UnpinTimeout)
defer cancel()
pinStatus, err := ipfs.PinLsCid(ctx, hash)
if err != nil {
return err
}
if pinStatus.IsPinned() {
path := fmt.Sprintf("pin/rm?arg=%s", hash)
_, err := ipfs.post(path)
_, err := ipfs.postCtx(ctx, path)
if err == nil {
logger.Info("IPFS Unpin request succeeded:", hash)
}
Expand Down Expand Up @@ -710,15 +714,10 @@ func (ipfs *Connector) PinLsCid(ctx context.Context, hash *cid.Cid) (api.IPFSPin
return api.IPFSPinStatusFromString(pinObj.Type), nil
}

func (ipfs *Connector) doPost(client *http.Client, apiURL, path string) (*http.Response, error) {
func (ipfs *Connector) doPostCtx(ctx context.Context, client *http.Client, apiURL, path string) (*http.Response, error) {
logger.Debugf("posting %s", path)
urlstr := fmt.Sprintf("%s/%s", apiURL, path)

ctx, cancel := context.WithCancel(ipfs.ctx)
time.AfterFunc(ipfs.config.ClientPostTimeout, func() {
cancel()
})

req, err := http.NewRequest("POST", urlstr, nil)
if err != nil {
logger.Error("error creating POST request:", err)
Expand Down Expand Up @@ -753,7 +752,11 @@ func checkResponse(path string, code int, body []byte) error {
// the ipfs daemon, reads the full body of the response and
// returns it after checking for errors.
func (ipfs *Connector) post(path string) ([]byte, error) {
res, err := ipfs.doPost(ipfs.client, ipfs.apiURL(), path)
return ipfs.postCtx(ipfs.ctx, path)
}

func (ipfs *Connector) postCtx(ctx context.Context, path string) ([]byte, error) {
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path)
if err != nil {
return nil, err
}
Expand All @@ -766,10 +769,10 @@ func (ipfs *Connector) post(path string) ([]byte, error) {
return body, checkResponse(path, res.StatusCode, body)
}

// postDiscardBody makes a POST requests but discards the body
// postDiscardBodyCtx makes a POST requests but discards the body
// of the response directly after reading it.
func (ipfs *Connector) postDiscardBody(path string) error {
res, err := ipfs.doPost(ipfs.client, ipfs.apiURL(), path)
func (ipfs *Connector) postDiscardBodyCtx(ctx context.Context, path string) error {
res, err := ipfs.doPostCtx(ctx, ipfs.client, ipfs.apiURL(), path)
if err != nil {
return err
}
Expand Down
9 changes: 7 additions & 2 deletions pintracker/maptracker/maptracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (mpt *MapPinTracker) Track(c api.Pin) error {
}
}

mpt.set(c.Cid, api.TrackerStatusPinning)
mpt.set(c.Cid, api.TrackerStatusPinQueued)
select {
case mpt.pinCh <- c:
mpt.optracker.trackNewOperationCtx(mpt.ctx, c.Cid, operationPin)
Expand Down Expand Up @@ -326,7 +326,12 @@ func (mpt *MapPinTracker) Untrack(c *cid.Cid) error {
}
}
}
mpt.set(c, api.TrackerStatusUnpinning)

if pinStatus := mpt.get(c); pinStatus.Status == api.TrackerStatusUnpinned {
return nil
}

mpt.set(c, api.TrackerStatusUnpinQueued)
select {
case mpt.unpinCh <- api.PinCid(c):
mpt.optracker.trackNewOperationCtx(mpt.ctx, c, operationUnpin)
Expand Down
Loading

0 comments on commit 348bd9f

Please sign in to comment.