| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,245 @@ | ||
| package cluster_test | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "os" | ||
| "testing" | ||
| "time" | ||
|
|
||
| icid "github.com/ipfs/go-cid" | ||
| icore "github.com/ipfs/go-ipfs/core" | ||
| "github.com/ipfs/go-ipfs/repo/fsrepo" | ||
| "github.com/textileio/go-textile/core" | ||
| "github.com/textileio/go-textile/ipfs" | ||
| "github.com/textileio/go-textile/keypair" | ||
| "github.com/textileio/go-textile/pb" | ||
| "github.com/textileio/go-textile/repo/config" | ||
| ) | ||
|
|
||
| var vars = struct { | ||
| repoPath1 string | ||
| repoPath2 string | ||
|
|
||
| node1 *core.Textile | ||
| node2 *core.Textile | ||
|
|
||
| cid icid.Cid | ||
| }{ | ||
| repoPath1: "testdata/.cluster1", | ||
| repoPath2: "testdata/.cluster2", | ||
|
|
||
| cid: icid.Undef, | ||
| } | ||
|
|
||
| func TestInitCluster(t *testing.T) { | ||
| _ = os.RemoveAll(vars.repoPath1) | ||
| _ = os.RemoveAll(vars.repoPath2) | ||
|
|
||
| accnt1 := keypair.Random() | ||
| accnt2 := keypair.Random() | ||
|
|
||
| swarmPort1 := core.GetRandomPort() | ||
| swarmPort2 := core.GetRandomPort() | ||
|
|
||
| err := core.InitRepo(core.InitConfig{ | ||
| Account: accnt1, | ||
| RepoPath: vars.repoPath1, | ||
| ApiAddr: fmt.Sprintf("127.0.0.1:%s", core.GetRandomPort()), | ||
| SwarmPorts: swarmPort1, | ||
| Cluster: true, | ||
| ClusterBindMultiaddr: "/ip4/0.0.0.0/tcp/9096", | ||
| Debug: true, | ||
| }) | ||
| if err != nil { | ||
| t.Fatalf("init node1 failed: %s", err) | ||
| } | ||
| err = core.InitRepo(core.InitConfig{ | ||
| Account: accnt2, | ||
| RepoPath: vars.repoPath2, | ||
| ApiAddr: fmt.Sprintf("127.0.0.1:%s", core.GetRandomPort()), | ||
| SwarmPorts: swarmPort2, | ||
| Cluster: true, | ||
| ClusterBindMultiaddr: "/ip4/0.0.0.0/tcp/9097", | ||
| Debug: true, | ||
| }) | ||
| if err != nil { | ||
| t.Fatalf("init node2 failed: %s", err) | ||
| } | ||
|
|
||
| // update bootstraps | ||
| addr1, err := getPeerAddress(vars.repoPath1, swarmPort1) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| addr2, err := getPeerAddress(vars.repoPath2, swarmPort2) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| err = updateClusterBootstraps(vars.repoPath1, []string{addr2}) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| err = updateClusterBootstraps(vars.repoPath2, []string{addr1}) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } | ||
|
|
||
| func TestStartCluster(t *testing.T) { | ||
| var err error | ||
| vars.node1, err = core.NewTextile(core.RunConfig{ | ||
| RepoPath: vars.repoPath1, | ||
| Debug: true, | ||
| }) | ||
| if err != nil { | ||
| t.Fatalf("create node1 failed: %s", err) | ||
| } | ||
| vars.node2, err = core.NewTextile(core.RunConfig{ | ||
| RepoPath: vars.repoPath2, | ||
| Debug: true, | ||
| }) | ||
| if err != nil { | ||
| t.Fatalf("create node2 failed: %s", err) | ||
| } | ||
|
|
||
| // set cluster logs to debug | ||
| level := &pb.LogLevel{ | ||
| Systems: map[string]pb.LogLevel_Level{ | ||
| "cluster": pb.LogLevel_DEBUG, | ||
| }, | ||
| } | ||
| err = vars.node1.SetLogLevel(level) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| err = vars.node2.SetLogLevel(level) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| // start nodes | ||
| err = vars.node1.Start() | ||
| if err != nil { | ||
| t.Fatalf("start node1 failed: %s", err) | ||
| } | ||
| <-vars.node1.OnlineCh() | ||
| <-vars.node1.Cluster().Ready() | ||
|
|
||
| // let node1 warm up | ||
| timer := time.NewTimer(time.Second * 5) | ||
| <-timer.C | ||
|
|
||
| err = vars.node2.Start() | ||
| if err != nil { | ||
| t.Fatalf("start node2 failed: %s", err) | ||
| } | ||
| <-vars.node2.OnlineCh() | ||
| <-vars.node2.Cluster().Ready() | ||
|
|
||
| // let node2 warm up | ||
| timer = time.NewTimer(time.Second * 5) | ||
| <-timer.C | ||
|
|
||
| // pin some data to node1 | ||
| cid, err := pinTestData(vars.node1.Ipfs()) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| vars.cid = *cid | ||
| } | ||
|
|
||
| func TestTextileClusterPeers(t *testing.T) { | ||
| ctx, cancel := context.WithTimeout(vars.node1.Ipfs().Context(), time.Minute) | ||
| defer cancel() | ||
|
|
||
| var ok bool | ||
| for _, p := range vars.node1.Cluster().Peers(ctx) { | ||
| if p.ID.Pretty() == vars.node2.Ipfs().Identity.Pretty() { | ||
| ok = true | ||
| break | ||
| } | ||
| } | ||
| if !ok { | ||
| t.Fatal("node2 not found in node1's peers") | ||
| } | ||
| ok = false | ||
| for _, p := range vars.node2.Cluster().Peers(ctx) { | ||
| if p.ID.Pretty() == vars.node1.Ipfs().Identity.Pretty() { | ||
| ok = true | ||
| break | ||
| } | ||
| } | ||
| if !ok { | ||
| t.Fatal("node1 not found in node2's peers") | ||
| } | ||
| } | ||
|
|
||
| func TestTextileClusterSync(t *testing.T) { | ||
| ctx, cancel := context.WithTimeout(vars.node1.Ipfs().Context(), time.Minute) | ||
| defer cancel() | ||
|
|
||
| _, err := vars.node1.Cluster().SyncAll(ctx) | ||
| if err != nil { | ||
| t.Fatalf("sync all failed: %s", err) | ||
| } | ||
|
|
||
| err = vars.node1.Cluster().StateSync(ctx) | ||
| if err != nil { | ||
| t.Fatalf("state sync failed: %s", err) | ||
| } | ||
|
|
||
| info, err := vars.node1.Cluster().Status(ctx, vars.cid) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| fmt.Println(info.String()) | ||
| } | ||
|
|
||
| func TestTextileCluster_Stop(t *testing.T) { | ||
| err := vars.node1.Stop() | ||
| if err != nil { | ||
| t.Fatalf("stop node1 failed: %s", err) | ||
| } | ||
| err = vars.node2.Stop() | ||
| if err != nil { | ||
| t.Fatalf("stop node2 failed: %s", err) | ||
| } | ||
| } | ||
|
|
||
| func TestTextileCluster_Teardown(t *testing.T) { | ||
| vars.node1 = nil | ||
| vars.node2 = nil | ||
| } | ||
|
|
||
| func getPeerAddress(repoPath, swarmPort string) (string, error) { | ||
| r, err := fsrepo.Open(repoPath) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| defer r.Close() | ||
| id, err := r.GetConfigKey("Identity.PeerID") | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| return fmt.Sprintf("/ip4/127.0.0.1/tcp/%s/ipfs/%s", swarmPort, id), nil | ||
| } | ||
|
|
||
| func updateClusterBootstraps(repoPath string, bootstraps []string) error { | ||
| conf, err := config.Read(repoPath) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| conf.Cluster.Bootstraps = bootstraps | ||
| return config.Write(repoPath, conf) | ||
| } | ||
|
|
||
| func pinTestData(node *icore.IpfsNode) (*icid.Cid, error) { | ||
| f, err := os.Open("../mill/testdata/image.jpeg") | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer f.Close() | ||
|
|
||
| return ipfs.AddData(node, f, true, false) | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| package core | ||
|
|
||
| import ( | ||
| "context" | ||
| "time" | ||
|
|
||
| util "github.com/ipfs/go-ipfs-util" | ||
| ipfscluster "github.com/ipfs/ipfs-cluster" | ||
| capi "github.com/ipfs/ipfs-cluster/api" | ||
| "github.com/ipfs/ipfs-cluster/consensus/raft" | ||
| "github.com/ipfs/ipfs-cluster/monitor/pubsubmon" | ||
| "github.com/ipfs/ipfs-cluster/observations" | ||
| peer "github.com/libp2p/go-libp2p-peer" | ||
| "github.com/textileio/go-textile/cluster" | ||
| ) | ||
|
|
||
| func (t *Textile) clusterExists() bool { | ||
| return util.FileExists(cluster.ConfigPath(t.repoPath)) | ||
| } | ||
|
|
||
| // startCluster creates all the necessary things to produce the cluster object | ||
| func (t *Textile) startCluster() error { | ||
| cfgMgr, cfgs, err := cluster.MakeAndLoadConfigs(t.repoPath) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer cfgMgr.Shutdown() | ||
|
|
||
| cfgs.ClusterCfg.LeaveOnShutdown = false | ||
|
|
||
| tracker, err := cluster.SetupPinTracker( | ||
| "map", | ||
| t.node.PeerHost, | ||
| cfgs.MaptrackerCfg, | ||
| cfgs.StatelessTrackerCfg, | ||
| cfgs.ClusterCfg.Peername, | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| informer, alloc, err := cluster.SetupAllocation( | ||
| "disk-freespace", | ||
| cfgs.DiskInfCfg, | ||
| cfgs.NumpinInfCfg, | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| ipfscluster.ReadyTimeout = raft.DefaultWaitForLeaderTimeout + 5*time.Second | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can ignore this (like set to 5 seconds or leave default). Raft is not involved anyways. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
|
||
| cons, err := cluster.SetupConsensus( | ||
| t.node.PeerHost, | ||
| t.node.DHT, | ||
| t.node.PubSub, | ||
| cfgs.CrdtCfg, | ||
| t.node.Repo.Datastore(), | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| tracer, err := observations.SetupTracing(cfgs.TracingCfg) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| var peersF func(context.Context) ([]peer.ID, error) | ||
| mon, err := pubsubmon.New(t.node.Context(), cfgs.PubsubmonCfg, t.node.PubSub, peersF) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| connector, err := cluster.NewConnector(t.node, func(ctx context.Context) []*capi.ID { | ||
| return t.cluster.Peers(ctx) | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| t.cluster, err = ipfscluster.NewCluster( | ||
| t.node.Context(), | ||
| t.node.PeerHost, | ||
| t.node.DHT, | ||
| cfgs.ClusterCfg, | ||
| t.node.Repo.Datastore(), | ||
| cons, | ||
| nil, | ||
| connector, | ||
| tracker, | ||
| mon, | ||
| alloc, | ||
| informer, | ||
| tracer, | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| bootstraps, err := cluster.ParseBootstraps(t.config.Cluster.Bootstraps) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // noop if no bootstraps | ||
| // if bootstrapping fails, consensus will never be ready | ||
| // and timeout. So this can happen in background and we | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not true for CRDT consensus. it will work regardless of bootstrap. Old comment in cluster, we will fix it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| // avoid worrying about error handling here (since Cluster | ||
| // will realize). | ||
| go cluster.Bootstrap(t.node.Context(), t.cluster, cons, bootstraps) | ||
|
|
||
| return nil | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| package core | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| "github.com/ipfs/go-ipfs/core" | ||
| "github.com/ipfs/go-ipfs/core/node/libp2p" | ||
| "github.com/ipfs/go-ipfs/plugin/loader" | ||
| "github.com/ipfs/go-ipfs/repo/fsrepo" | ||
| ) | ||
|
|
||
| // createIPFS creates an IPFS node | ||
| func (t *Textile) createIPFS(plugins *loader.PluginLoader, online bool) error { | ||
| rep, err := fsrepo.Open(t.repoPath) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| routing := libp2p.DHTClientOption | ||
| if t.Server() { | ||
| routing = libp2p.DHTOption | ||
| } | ||
|
|
||
| cctx, _ := context.WithCancel(context.Background()) | ||
| nd, err := core.NewNode(cctx, &core.BuildCfg{ | ||
| Repo: rep, | ||
| Permanent: true, // temporary way to signify that node is permanent | ||
| Online: online, | ||
| ExtraOpts: map[string]bool{ | ||
| "pubsub": true, | ||
| "ipnsps": true, | ||
| "mplex": true, | ||
| }, | ||
| Routing: routing, | ||
| }) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| nd.IsDaemon = true | ||
|
|
||
| if t.node != nil { | ||
| err = t.node.Close() | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| t.node = nd | ||
|
|
||
| return nil | ||
| } |
Large diffs are not rendered by default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
technically cluster won't bind to anything as it re-uses your already existing ipfs peer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, ok. That makes sense. I can remove this flag then.