diff --git a/add_test.go b/add_test.go new file mode 100644 index 000000000..39002e824 --- /dev/null +++ b/add_test.go @@ -0,0 +1,96 @@ +package ipfscluster + +// This files has tests for Add* using multiple cluster peers. + +import ( + "mime/multipart" + "testing" + + "github.com/ipfs/ipfs-cluster/api" + "github.com/ipfs/ipfs-cluster/test" +) + +func TestAdd(t *testing.T) { + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + sth := test.NewShardingTestHelper() + defer sth.Clean(t) + + t.Run("local", func(t *testing.T) { + params := api.DefaultAddParams() + params.Shard = false + params.Name = "testlocal" + mfr, closer := sth.GetTreeMultiReader(t) + defer closer.Close() + r := multipart.NewReader(mfr, mfr.Boundary()) + ci, err := clusters[0].AddFile(r, params) + if err != nil { + t.Fatal(err) + } + if ci.String() != test.ShardingDirBalancedRootCID { + t.Fatal("unexpected root CID for local add") + } + + pinDelay() + + f := func(t *testing.T, c *Cluster) { + pin := c.StatusLocal(ci) + if pin.Error != "" { + t.Error(pin.Error) + } + if pin.Status != api.TrackerStatusPinned { + t.Error("item should be pinned") + } + } + + runF(t, clusters, f) + }) +} + +func TestAddPeerDown(t *testing.T) { + clusters, mock := createClusters(t) + defer shutdownClusters(t, clusters, mock) + sth := test.NewShardingTestHelper() + defer sth.Clean(t) + + err := clusters[0].Shutdown() + if err != nil { + t.Fatal(err) + } + + waitForLeaderAndMetrics(t, clusters) + + t.Run("local", func(t *testing.T) { + params := api.DefaultAddParams() + params.Shard = false + params.Name = "testlocal" + mfr, closer := sth.GetTreeMultiReader(t) + defer closer.Close() + r := multipart.NewReader(mfr, mfr.Boundary()) + ci, err := clusters[1].AddFile(r, params) + if err != nil { + t.Fatal(err) + } + if ci.String() != test.ShardingDirBalancedRootCID { + t.Fatal("unexpected root CID for local add") + } + + pinDelay() + + f := func(t *testing.T, c *Cluster) { + if c.id == clusters[0].id { + return + } + + pin := c.StatusLocal(ci) + if pin.Error != "" { + t.Error(pin.Error) + } + if pin.Status != api.TrackerStatusPinned { + t.Error("item should be pinned") + } + } + + runF(t, clusters, f) + }) +} diff --git a/cluster.go b/cluster.go index 1d5b03db8..53f563ac5 100644 --- a/cluster.go +++ b/cluster.go @@ -31,6 +31,8 @@ import ( // consensus layer. var ReadyTimeout = 30 * time.Second +var pingMetricName = "ping" + // Cluster is the main IPFS cluster component. It provides // the go-API for it and orchestrates the components that make up the system. type Cluster struct { @@ -252,7 +254,7 @@ func (c *Cluster) pushPingMetrics() { ticker := time.NewTicker(c.config.MonitorPingInterval) for { metric := api.Metric{ - Name: "ping", + Name: pingMetricName, Peer: c.id, Valid: true, } @@ -279,7 +281,7 @@ func (c *Cluster) alertsHandler() { if err == nil && leader == c.id { logger.Warningf("Peer %s received alert for %s in %s", c.id, alrt.MetricName, alrt.Peer.Pretty()) switch alrt.MetricName { - case "ping": + case pingMetricName: c.repinFromPeer(alrt.Peer) } } diff --git a/rpc_api.go b/rpc_api.go index feb84f438..f6068ec3c 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -202,10 +202,14 @@ func (rpcapi *RPCAPI) BlockAllocate(ctx context.Context, in api.PinSerial, out * // Return the current peer list. if pin.ReplicationFactorMin < 0 { - peers, err := rpcapi.c.consensus.Peers() - if err != nil { - return err + // Returned metrics are Valid and belong to current + // Cluster peers. + metrics := rpcapi.c.monitor.LatestMetrics(pingMetricName) + peers := make([]peer.ID, len(metrics), len(metrics)) + for i, m := range metrics { + peers[i] = m.Peer } + *out = api.PeersToStrings(peers) return nil }