Fix #803: Add "follower_mode" to the config
Peers configured with follower_mode = true reject add/pin/unpin operations.

Additionally, they do not contact other peers when doing Status, Sync, or
Recover; they report only on themselves.

They still contact other peers when doing "peers ls", as this is an open RPC
endpoint.

This merely improves user interaction with a cluster peer and avoids
confusing situations:

* pin/unpin appears to work even though no other peer trusts this one
* status queries every peer in the peerset only to receive auth errors and
ignore them, becoming far slower than it needs to be

This is not a security feature.
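
For reference (not part of the diff below), the option maps to a follower_mode key in the "cluster" section of the service configuration, matching the JSON tag added in cluster_config.go. A minimal sketch, with unrelated fields omitted:

{
  "cluster": {
    "disable_repinning": false,
    "follower_mode": true
  }
}

Since the tag uses omitempty and the default is false, the key only shows up in generated configurations when it is set.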
hsanjuan committed Jul 30, 2019
1 parent eace903 commit 084e763
Showing 4 changed files with 130 additions and 8 deletions.
42 changes: 34 additions & 8 deletions cluster.go
@@ -43,6 +43,10 @@ const (
reBootstrapInterval = 30 * time.Second
)

var (
errFollowerMode = errors.New("this peer is configured to be in follower mode. Write operations are disabled")
)

// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
type Cluster struct {
@@ -1183,6 +1187,7 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (*api.Pin, error) {
func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()

ctx = trace.NewContext(c.ctx, span)
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
@@ -1283,6 +1288,10 @@ func (c *Cluster) pin(
ctx, span := trace.StartSpan(ctx, "cluster/pin")
defer span.End()

if c.config.FollowerMode {
return nil, false, errFollowerMode
}

if pin.Cid == cid.Undef {
return pin, false, errors.New("bad pin object")
}
@@ -1336,6 +1345,10 @@ func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) {
defer span.End()
ctx = trace.NewContext(c.ctx, span)

if c.config.FollowerMode {
return nil, errFollowerMode
}

logger.Info("IPFS cluster unpinning:", h)
pin, err := c.PinGet(ctx, h)
if err != nil {
@@ -1423,6 +1436,7 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (*api.Pin, error)
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.Cid, error) {
// TODO: add context param and tracing

var dags adder.ClusterDAGService
if params.Shard {
dags = sharding.New(c.rpcClient, params.PinOptions, nil)
@@ -1495,10 +1509,16 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
PeerMap: make(map[string]*api.PinInfo),
}

members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
var members []peer.ID
var err error
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}
lenMembers := len(members)

@@ -1551,10 +1571,16 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
infos := make([]*api.GlobalPinInfo, 0)
fullMap := make(map[cid.Cid]*api.GlobalPinInfo)

members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
var members []peer.ID
var err error
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}
lenMembers := len(members)

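The same member-selection logic now appears in both globalPinInfoCid and globalPinInfoSlice above. Purely as an illustration of the pattern (not something this commit does), it could live in a small helper; statusPeers is a hypothetical name:

func (c *Cluster) statusPeers(ctx context.Context) ([]peer.ID, error) {
	// In follower mode, report only on this peer instead of
	// broadcasting to the whole peerset.
	if c.config.FollowerMode {
		return []peer.ID{c.host.ID()}, nil
	}
	return c.consensus.Peers(ctx)
}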
10 changes: 10 additions & 0 deletions cluster_config.go
@@ -36,6 +36,7 @@ const (
DefaultConnMgrHighWater = 400
DefaultConnMgrLowWater = 100
DefaultConnMgrGracePeriod = 2 * time.Minute
DefaultFollowerMode = false
)

// ConnMgrConfig configures the libp2p host connection manager.
@@ -130,6 +131,11 @@ type Config struct {
// when not wanting to rely on the monitoring system which needs a revamp.
DisableRepinning bool

// FollowerMode disables broadcast requests from this peer
// (sync, recover, status) and disallows pinset management
// operations (Pin/Unpin).
FollowerMode bool

// Peerstore file specifies the file on which we persist the
// libp2p host peerstore addresses. This file is regularly saved.
PeerstoreFile string
@@ -157,6 +163,7 @@ type configJSON struct {
MonitorPingInterval string `json:"monitor_ping_interval"`
PeerWatchInterval string `json:"peer_watch_interval"`
DisableRepinning bool `json:"disable_repinning"`
FollowerMode bool `json:"follower_mode,omitempty"`
PeerstoreFile string `json:"peerstore_file,omitempty"`
}

@@ -336,6 +343,7 @@ func (cfg *Config) setDefaults() {
cfg.PeerWatchInterval = DefaultPeerWatchInterval
cfg.DisableRepinning = DefaultDisableRepinning
cfg.PeerstoreFile = "" // empty so it gets omitted.
cfg.FollowerMode = DefaultFollowerMode
cfg.RPCPolicy = DefaultRPCPolicy
}

@@ -405,6 +413,7 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {

cfg.LeaveOnShutdown = jcfg.LeaveOnShutdown
cfg.DisableRepinning = jcfg.DisableRepinning
cfg.FollowerMode = jcfg.FollowerMode

return cfg.Validate()
}
@@ -449,6 +458,7 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String()
jcfg.DisableRepinning = cfg.DisableRepinning
jcfg.PeerstoreFile = cfg.PeerstoreFile
jcfg.FollowerMode = cfg.FollowerMode

return
}
82 changes: 82 additions & 0 deletions ipfscluster_test.go
@@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"math/rand"
"mime/multipart"
"os"
"path/filepath"
"sort"
@@ -1897,3 +1898,84 @@ func TestClustersDisabledRepinning(t *testing.T) {
t.Errorf("expected %d replicas for pin, got %d", nClusters-2, numPinned)
}
}

func TestClustersFollowerMode(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)

_, err := clusters[0].Pin(ctx, test.Cid1, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
_, err = clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{})
if err != nil {
t.Fatal(err)
}

// Let the pins arrive
pinDelay()

// Set Cluster1 to follower mode
clusters[1].config.FollowerMode = true

t.Run("follower cannot pin", func(t *testing.T) {
_, err := clusters[1].PinPath(ctx, "/ipfs/"+test.Cid2.String(), api.PinOptions{})
if err != errFollowerMode {
t.Error("expected follower mode error")
}
_, err = clusters[1].Pin(ctx, test.Cid2, api.PinOptions{})
if err != errFollowerMode {
t.Error("expected follower mode error")
}
})

t.Run("follower cannot unpin", func(t *testing.T) {
_, err := clusters[1].UnpinPath(ctx, "/ipfs/"+test.Cid1.String())
if err != errFollowerMode {
t.Error("expected follower mode error")
}
_, err = clusters[1].Unpin(ctx, test.Cid1)
if err != errFollowerMode {
t.Error("expected follower mode error")
}
})

t.Run("follower cannot add", func(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
params := api.DefaultAddParams()
params.Shard = false
params.Name = "testlocal"
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
_, err = clusters[1].AddFile(r, params)
if err != errFollowerMode {
t.Error("expected follower mode error")
}
})

t.Run("follower syncs itself", func(t *testing.T) {
gpis, err := clusters[1].SyncAll(ctx)
if err != nil {
t.Error("sync should work")
}
if len(gpis) != 1 {
t.Fatal("globalPinInfo should have 1 pins (in error)")
}
if len(gpis[0].PeerMap) != 1 {
t.Fatal("globalPinInfo[0] should only have one peer")
}
})

t.Run("follower status itself only", func(t *testing.T) {
gpi, err := clusters[1].Status(ctx, test.Cid1)
if err != nil {
t.Error("status should work")
}
if len(gpi.PeerMap) != 1 {
t.Fatal("globalPinInfo[0] should only have one peer")
}
})
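
// Illustrative addition (not part of this commit): per the commit
// message, followers still contact other peers for "peers ls", so
// listing peers from the follower should still return the full peerset.
t.Run("follower still lists all peers", func(t *testing.T) {
	ids := clusters[1].Peers(ctx)
	if len(ids) != nClusters {
		t.Errorf("expected %d peers, got %d", nClusters, len(ids))
	}
})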
}
4 changes: 4 additions & 0 deletions rpc_api.go
@@ -368,6 +368,10 @@ func (rpcapi *ClusterRPCAPI) RecoverLocal(ctx context.Context, in cid.Cid, out *
// BlockAllocate returns allocations for blocks. This is used in the adders.
// It's different from pin allocations when ReplicationFactor < 0.
func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
if rpcapi.c.config.FollowerMode {
return errFollowerMode
}

err := rpcapi.c.setupPin(ctx, in)
if err != nil {
return err
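For context (not shown in this diff): AddFile goes through the adder components, which request block allocations from the local Cluster over RPC, and that request is what the new guard short-circuits on a follower. A rough sketch of the call shape, assuming go-libp2p-gorpc's client (rpc.Client) and a hypothetical helper name and parameters:

func requestAllocations(ctx context.Context, rpcClient *rpc.Client, opts api.PinOptions) ([]peer.ID, error) {
	// On a follower peer, the Cluster.BlockAllocate handler above returns
	// errFollowerMode, so adds fail before any blocks are sent.
	var allocs []peer.ID
	err := rpcClient.CallContext(ctx, "", "Cluster", "BlockAllocate",
		api.PinWithOpts(cid.Undef, opts), &allocs)
	return allocs, err
}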

