Skip to content
Permalink
Browse files

Fix #803: Add "follower_mode" to the config

Peers configured with follower_mode = true fail to add/pin/unpin.

Additionally they do not contact other peers when doing Status, Sync or
Recover and report on themselves.

They still contact other peers when doing "peers ls", as this is an OpenRPC
endpoint.

This is merely improving user interaction with a cluster peer and avoids
getting into confusing places:

* pin/unpin seems to work even no one trusts them
* status will query all peers in the peerset only to get auth errors and
ignore them, becoming way slower than it could be

This is not a security feature.
  • Loading branch information...
hsanjuan committed Jul 30, 2019
1 parent eace903 commit 084e763468542cec3dc56dd69844bcc4eb25b65e
Showing with 130 additions and 8 deletions.
  1. +34 −8 cluster.go
  2. +10 −0 cluster_config.go
  3. +82 −0 ipfscluster_test.go
  4. +4 −0 rpc_api.go
@@ -43,6 +43,10 @@ const (
reBootstrapInterval = 30 * time.Second
)

var (
errFollowerMode = errors.New("this peer is configured as in follower mode. Write operations are disabled")
)

// Cluster is the main IPFS cluster component. It provides
// the go-API for it and orchestrates the components that make up the system.
type Cluster struct {
@@ -1183,6 +1187,7 @@ func (c *Cluster) PinGet(ctx context.Context, h cid.Cid) (*api.Pin, error) {
func (c *Cluster) Pin(ctx context.Context, h cid.Cid, opts api.PinOptions) (*api.Pin, error) {
_, span := trace.StartSpan(ctx, "cluster/Pin")
defer span.End()

ctx = trace.NewContext(c.ctx, span)
pin := api.PinWithOpts(h, opts)
result, _, err := c.pin(ctx, pin, []peer.ID{})
@@ -1283,6 +1288,10 @@ func (c *Cluster) pin(
ctx, span := trace.StartSpan(ctx, "cluster/pin")
defer span.End()

if c.config.FollowerMode {
return nil, false, errFollowerMode
}

if pin.Cid == cid.Undef {
return pin, false, errors.New("bad pin object")
}
@@ -1336,6 +1345,10 @@ func (c *Cluster) Unpin(ctx context.Context, h cid.Cid) (*api.Pin, error) {
defer span.End()
ctx = trace.NewContext(c.ctx, span)

if c.config.FollowerMode {
return nil, errFollowerMode
}

logger.Info("IPFS cluster unpinning:", h)
pin, err := c.PinGet(ctx, h)
if err != nil {
@@ -1423,6 +1436,7 @@ func (c *Cluster) UnpinPath(ctx context.Context, path string) (*api.Pin, error)
// sharded across the entire cluster.
func (c *Cluster) AddFile(reader *multipart.Reader, params *api.AddParams) (cid.Cid, error) {
// TODO: add context param and tracing

var dags adder.ClusterDAGService
if params.Shard {
dags = sharding.New(c.rpcClient, params.PinOptions, nil)
@@ -1495,10 +1509,16 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
PeerMap: make(map[string]*api.PinInfo),
}

members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
var members []peer.ID
var err error
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}
lenMembers := len(members)

@@ -1551,10 +1571,16 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
infos := make([]*api.GlobalPinInfo, 0)
fullMap := make(map[cid.Cid]*api.GlobalPinInfo)

members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
var members []peer.ID
var err error
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}
lenMembers := len(members)

@@ -36,6 +36,7 @@ const (
DefaultConnMgrHighWater = 400
DefaultConnMgrLowWater = 100
DefaultConnMgrGracePeriod = 2 * time.Minute
DefaultFollowerMode = false
)

// ConnMgrConfig configures the libp2p host connection manager.
@@ -130,6 +131,11 @@ type Config struct {
// when not wanting to rely on the monitoring system which needs a revamp.
DisableRepinning bool

// FollowerMode disables broadcast requests from this peer
// (sync, recover, status) and disallows pinset management
// operations (Pin/Unpin).
FollowerMode bool

// Peerstore file specifies the file on which we persist the
// libp2p host peerstore addresses. This file is regularly saved.
PeerstoreFile string
@@ -157,6 +163,7 @@ type configJSON struct {
MonitorPingInterval string `json:"monitor_ping_interval"`
PeerWatchInterval string `json:"peer_watch_interval"`
DisableRepinning bool `json:"disable_repinning"`
FollowerMode bool `json:"follower_mode,omitempty"`
PeerstoreFile string `json:"peerstore_file,omitempty"`
}

@@ -336,6 +343,7 @@ func (cfg *Config) setDefaults() {
cfg.PeerWatchInterval = DefaultPeerWatchInterval
cfg.DisableRepinning = DefaultDisableRepinning
cfg.PeerstoreFile = "" // empty so it gets ommited.
cfg.FollowerMode = DefaultFollowerMode
cfg.RPCPolicy = DefaultRPCPolicy
}

@@ -405,6 +413,7 @@ func (cfg *Config) applyConfigJSON(jcfg *configJSON) error {

cfg.LeaveOnShutdown = jcfg.LeaveOnShutdown
cfg.DisableRepinning = jcfg.DisableRepinning
cfg.FollowerMode = jcfg.FollowerMode

return cfg.Validate()
}
@@ -449,6 +458,7 @@ func (cfg *Config) toConfigJSON() (jcfg *configJSON, err error) {
jcfg.PeerWatchInterval = cfg.PeerWatchInterval.String()
jcfg.DisableRepinning = cfg.DisableRepinning
jcfg.PeerstoreFile = cfg.PeerstoreFile
jcfg.FollowerMode = cfg.FollowerMode

return
}
@@ -6,6 +6,7 @@ import (
"flag"
"fmt"
"math/rand"
"mime/multipart"
"os"
"path/filepath"
"sort"
@@ -1897,3 +1898,84 @@ func TestClustersDisabledRepinning(t *testing.T) {
t.Errorf("expected %d replicas for pin, got %d", nClusters-2, numPinned)
}
}

func TestClustersFollowerMode(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)

_, err := clusters[0].Pin(ctx, test.Cid1, api.PinOptions{})
if err != nil {
t.Fatal(err)
}
_, err = clusters[0].Pin(ctx, test.ErrorCid, api.PinOptions{})
if err != nil {
t.Fatal(err)
}

// Let the pins arrive
pinDelay()

// Set Cluster1 to follower mode
clusters[1].config.FollowerMode = true

t.Run("follower cannot pin", func(t *testing.T) {
_, err := clusters[1].PinPath(ctx, "/ipfs/"+test.Cid2.String(), api.PinOptions{})
if err != errFollowerMode {
t.Error("expected follower mode error")
}
_, err = clusters[1].Pin(ctx, test.Cid2, api.PinOptions{})
if err != errFollowerMode {
t.Error("expected follower mode error")
}
})

t.Run("follower cannot unpin", func(t *testing.T) {
_, err := clusters[1].UnpinPath(ctx, "/ipfs/"+test.Cid1.String())
if err != errFollowerMode {
t.Error("expected follower mode error")
}
_, err = clusters[1].Unpin(ctx, test.Cid1)
if err != errFollowerMode {
t.Error("expected follower mode error")
}
})

t.Run("follower cannot add", func(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)
params := api.DefaultAddParams()
params.Shard = false
params.Name = "testlocal"
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
_, err = clusters[1].AddFile(r, params)
if err != errFollowerMode {
t.Error("expected follower mode error")
}
})

t.Run("follower syncs itself", func(t *testing.T) {
gpis, err := clusters[1].SyncAll(ctx)
if err != nil {
t.Error("sync should work")
}
if len(gpis) != 1 {
t.Fatal("globalPinInfo should have 1 pins (in error)")
}
if len(gpis[0].PeerMap) != 1 {
t.Fatal("globalPinInfo[0] should only have one peer")
}
})

t.Run("follower status itself only", func(t *testing.T) {
gpi, err := clusters[1].Status(ctx, test.Cid1)
if err != nil {
t.Error("status should work")
}
if len(gpi.PeerMap) != 1 {
t.Fatal("globalPinInfo[0] should only have one peer")
}
})
}
@@ -368,6 +368,10 @@ func (rpcapi *ClusterRPCAPI) RecoverLocal(ctx context.Context, in cid.Cid, out *
// BlockAllocate returns allocations for blocks. This is used in the adders.
// It's different from pin allocations when ReplicationFactor < 0.
func (rpcapi *ClusterRPCAPI) BlockAllocate(ctx context.Context, in *api.Pin, out *[]peer.ID) error {
if rpcapi.c.config.FollowerMode {
return errFollowerMode
}

err := rpcapi.c.setupPin(ctx, in)
if err != nil {
return err

0 comments on commit 084e763

Please sign in to comment.
You can’t perform that action at this time.