Pre-create and pre-connect hosts in tests
License: MIT
Signed-off-by: Hector Sanjuan <code@hector.link>
hsanjuan committed Mar 29, 2018
1 parent d8e050a commit 9fa4aa0
Showing 1 changed file with 39 additions and 25 deletions.
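
In isolation, the pre-connect pattern this commit applies to the test hosts looks like the following self-contained Go sketch. The libp2p.New constructor (in its 2018, context-taking form), the two throwaway hosts, and the listen address are illustrative assumptions; the tests below build their hosts with NewClusterHost instead.

package main

import (
	"context"
	"fmt"

	libp2p "github.com/libp2p/go-libp2p"
	peerstore "github.com/libp2p/go-libp2p-peerstore"
)

func main() {
	ctx := context.Background()

	// Two throwaway hosts standing in for cluster peers (assumption).
	a, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		panic(err)
	}
	b, err := libp2p.New(ctx, libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
	if err != nil {
		panic(err)
	}

	// Pre-connect: record b's addresses in a's peerstore permanently,
	// then dial eagerly so later traffic finds an open connection.
	a.Peerstore().AddAddrs(b.ID(), b.Addrs(), peerstore.PermanentAddrTTL)
	if _, err := a.Network().DialPeer(ctx, b.ID()); err != nil {
		panic(err)
	}
	fmt.Println("connected:", a.Network().Connectedness(b.ID()))
}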
ipfscluster_test.go: 39 additions & 25 deletions
@@ -1,6 +1,7 @@
 package ipfscluster
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"math/rand"
@@ -22,9 +23,11 @@ import (
 	"github.com/ipfs/ipfs-cluster/state"
 	"github.com/ipfs/ipfs-cluster/state/mapstate"
 	"github.com/ipfs/ipfs-cluster/test"
+	peerstore "github.com/libp2p/go-libp2p-peerstore"
 
 	cid "github.com/ipfs/go-cid"
 	crypto "github.com/libp2p/go-libp2p-crypto"
+	host "github.com/libp2p/go-libp2p-host"
 	peer "github.com/libp2p/go-libp2p-peer"
 	ma "github.com/multiformats/go-multiaddr"
 )
@@ -124,16 +127,16 @@ func createComponents(t *testing.T, i int, clusterSecret []byte) (*Config, *raft.Config, API, IPFSConnector, state.State, PinTracker, PeerMonitor, PinAllocator, Informer, *test.IpfsMock) {
 	return clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock
 }
 
-func createCluster(t *testing.T, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
-	cl, err := NewCluster(nil, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
+func createCluster(t *testing.T, host host.Host, clusterCfg *Config, consensusCfg *raft.Config, api API, ipfs IPFSConnector, state state.State, tracker PinTracker, mon PeerMonitor, alloc PinAllocator, inf Informer) *Cluster {
+	cl, err := NewCluster(host, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
 	checkErr(t, err)
 	<-cl.Ready()
 	return cl
 }
 
 func createOnePeerCluster(t *testing.T, nth int, clusterSecret []byte) (*Cluster, *test.IpfsMock) {
 	clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf, mock := createComponents(t, nth, clusterSecret)
-	cl := createCluster(t, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
+	cl := createCluster(t, nil, clusterCfg, consensusCfg, api, ipfs, state, tracker, mon, alloc, inf)
 	return cl, mock
 }
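
createOnePeerCluster passes a nil host, which implies NewCluster falls back to creating its own when none is injected. A hypothetical sketch of that assumed guard, using this package's own types (the fallback itself is not shown in this diff):

// hostOrNew is a hypothetical helper illustrating the assumed fallback:
// use the injected host if present, otherwise build one from the config,
// as the test code later in this diff does with NewClusterHost.
func hostOrNew(ctx context.Context, h host.Host, cfg *Config) (host.Host, error) {
	if h != nil {
		return h, nil
	}
	return NewClusterHost(ctx, cfg)
}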

@@ -149,6 +152,8 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
 	allocs := make([]PinAllocator, nClusters, nClusters)
 	infs := make([]Informer, nClusters, nClusters)
 	ipfsMocks := make([]*test.IpfsMock, nClusters, nClusters)
+
+	hosts := make([]host.Host, nClusters, nClusters)
 	clusters := make([]*Cluster, nClusters, nClusters)
 
 	// Uncomment when testing with fixed ports
@@ -197,36 +202,43 @@ func createClusters(t *testing.T) ([]*Cluster, []*test.IpfsMock) {
 	// ----------------------------------------------
 
 	// Alternative way of starting using bootstrap
+	// Create hosts
+	var err error
+	for i := 0; i < nClusters; i++ {
+		hosts[i], err = NewClusterHost(context.Background(), cfgs[i])
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// open connections among all hosts
+	for _, h := range hosts {
+		for _, h2 := range hosts {
+			if h.ID() != h2.ID() {
+				h.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
+				_, err := h.Network().DialPeer(context.Background(), h2.ID())
+				if err != nil {
+					t.Fatal(err)
+				}
+			}
+		}
+	}
+
 	// Start first node
-	clusters[0] = createCluster(t, cfgs[0], concfgs[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0])
+	clusters[0] = createCluster(t, hosts[0], cfgs[0], concfgs[0], apis[0], ipfss[0], states[0], trackers[0], mons[0], allocs[0], infs[0])
 	// Find out where it bound
 	bootstrapAddr, _ := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", clusters[0].host.Addrs()[0], clusters[0].id.Pretty()))
 	// Use first node to bootstrap
 	for i := 1; i < nClusters; i++ {
 		cfgs[i].Bootstrap = []ma.Multiaddr{bootstrapAddr}
 	}
-	time.Sleep(200 * time.Millisecond)
+	waitForLeaderLoop(t, clusters[0:1])
 
 	// Start the rest
 	// We don't do this in parallel because it causes libp2p dial backoffs
 	for i := 1; i < nClusters; i++ {
-		clusters[i] = createCluster(t, cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i])
-		time.Sleep(200 * time.Millisecond)
-	}
-
-	// open connections among all peers. This ensures smoother operations.
-	// Best effort. Some errors do happen.
-	for _, c := range clusters {
-		peers, err := c.consensus.Peers()
-		if err != nil {
-			shutdownClusters(t, clusters, ipfsMocks)
-			t.Fatal(err)
-		}
-		for _, p := range peers {
-			if p != c.id {
-				c.host.Network().DialPeer(c.ctx, p)
-			}
-		}
+		clusters[i] = createCluster(t, hosts[i], cfgs[i], concfgs[i], apis[i], ipfss[i], states[i], trackers[i], mons[i], allocs[i], infs[i])
 	}
 
 	// ---------------------------------------------
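
The bootstrapAddr line above composes a dialable peer address by appending /ipfs/<peerID> to the host's first listen address. A standalone sketch of that composition; the listen address and peer ID values are hypothetical:

package main

import (
	"fmt"

	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	listen := "/ip4/127.0.0.1/tcp/10000"                    // hypothetical listen address
	pid := "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N" // hypothetical peer ID
	bootstrap, err := ma.NewMultiaddr(fmt.Sprintf("%s/ipfs/%s", listen, pid))
	if err != nil {
		panic(err)
	}
	fmt.Println(bootstrap) // /ip4/127.0.0.1/tcp/10000/ipfs/Qm...
}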
@@ -288,9 +300,13 @@ func ttlDelay() {
 // Makes sure new metrics have come in for the new leader.
 func waitForLeader(t *testing.T, clusters []*Cluster) {
 	ttlDelay()
+	waitForLeaderLoop(t, clusters)
+	ttlDelay()
+}
+
+func waitForLeaderLoop(t *testing.T, clusters []*Cluster) {
 	timer := time.NewTimer(time.Minute)
-	ticker := time.NewTicker(time.Second / 4)
+	ticker := time.NewTicker(100 * time.Millisecond)
 
 loop:
 	for {
@@ -310,8 +326,6 @@
 			break loop
 		}
 	}
-
-	ttlDelay()
 }
 
 func TestClustersVersion(t *testing.T) {
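
The loop body elided above follows Go's usual timer-plus-ticker polling shape, which the new waitForLeaderLoop reuses both during bootstrap and inside waitForLeader. A generic, runnable sketch of that shape; pollUntil and its condition are stand-ins, not the cluster's actual leader check:

package main

import (
	"errors"
	"fmt"
	"time"
)

// pollUntil retries cond on every tick until it returns true,
// or fails once the overall timeout fires.
func pollUntil(timeout, interval time.Duration, cond func() bool) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-timer.C:
			return errors.New("timed out")
		case <-ticker.C:
			if cond() {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	err := pollUntil(time.Minute, 100*time.Millisecond, func() bool {
		return time.Since(start) > time.Second // stand-in condition
	})
	fmt.Println(err)
}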