Skip to content

Commit

Permalink
Merge 28d5b79 into aced97c
Browse files Browse the repository at this point in the history
  • Loading branch information
ZenGround0 committed Nov 7, 2017
2 parents aced97c + 28d5b79 commit f088c5a
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 0 deletions.
4 changes: 4 additions & 0 deletions consensus/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func makeDataFolder(baseDir, folder string) (string, error) {
return folder, nil
}

// MakeServerConf exposes the private makeServerConf helper so that
// external packages (e.g. the service's migration command) can build
// the raft server configuration for a given set of peer IDs.
func MakeServerConf(peers []peer.ID) hraft.Configuration {
	return makeServerConf(peers)
}

// create Raft servers configuration
func makeServerConf(peers []peer.ID) hraft.Configuration {
sm := make(map[string]struct{})
Expand Down
125 changes: 125 additions & 0 deletions ipfs-cluster-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"errors"
"fmt"
"os"
Expand All @@ -10,11 +11,20 @@ import (
"path/filepath"
"syscall"

"context"

// _ "net/http/pprof"

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli"
libp2praft "github.com/libp2p/go-libp2p-raft"
host "github.com/libp2p/go-libp2p-host"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
peerstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
hraft "github.com/hashicorp/raft"
msgpack "github.com/multiformats/go-multicodec/msgpack"

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
Expand Down Expand Up @@ -112,6 +122,7 @@ var (
configPath string
)


func init() {
// Set the right commit. The only way I could make this work
ipfscluster.Commit = commit
Expand Down Expand Up @@ -231,6 +242,74 @@ configuration.
Usage: "run the IPFS Cluster peer (default)",
Action: daemon,
},
{
	Name:  "migration",
	Usage: "migrate the IPFS Cluster state between versions",
	Description: `
This command migrates the internal state of the ipfs-cluster node to the state
specified in a backup file. The state format is migrated from the version
of the backup file to the version supported by the current cluster version.
To successfully run a migration of an entire cluster, shut down each peer without
removal, migrate using this command, and restart every peer.
`,
	ArgsUsage: "<BACKUP-FILE-PATH> [RAFT-DATA-DIR]",
	Action: func(c *cli.Context) error {
		if c.NArg() < 1 || c.NArg() > 2 {
			return fmt.Errorf("Usage: <BACKUP-FILE-PATH> [RAFT-DATA-DIR]")
		}
		// Load configs. Only the cluster and consensus sections are
		// used below; the remaining components are ignored.
		cfg, clusterCfg, _, _, consensusCfg, _, _, _ := makeConfigs()
		err := cfg.LoadJSONFromFile(configPath)
		checkErr("loading configuration", err)
		backupFilePath := c.Args().First()
		// Raft data dir defaults to the consensus configuration's
		// folder unless explicitly given as the second argument.
		var raftDataPath string
		if c.NArg() == 1 {
			raftDataPath = consensusCfg.DataFolder
		} else {
			raftDataPath = c.Args().Get(1)
		}

		// Migrate backup to new state.
		backup, err := os.Open(backupFilePath)
		checkErr("opening backup file", err)
		defer backup.Close()
		r := bufio.NewReader(backup)
		newState := mapstate.NewMapState()
		err = newState.Restore(r)
		checkErr("migrating state to alternate version", err)
		// Encode state to save as a snapshot.
		newStateBytes, err := encodeState(*newState)
		checkErr("encoding new state", err)

		// Remove existing raft state so the snapshot written below is
		// the only state raft loads on the next start.
		err = cleanupRaft(raftDataPath)
		checkErr("cleaning up raft", err)

		// Write a snapshot holding the migrated state. The dummy host
		// and transport only supply peer/server information to the
		// snapshot store; nothing is dialed.
		snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(raftDataPath, 5, nil)
		checkErr("creating snapshot store", err)
		dummyHost, err := makeDummyHost(clusterCfg)
		checkErr("creating dummy host for snapshot store", err)
		dummyTransport, err := libp2praft.NewLibp2pTransport(dummyHost, consensusCfg.NetworkTimeout)
		checkErr("creating dummy transport for snapshot store", err)
		raftSnapVersion := hraft.SnapshotVersion(1) // As of v1.0.0 this is always 1
		raftIndex := uint64(1)                      // We reset history to the beginning
		raftTerm := uint64(1)                       // We reset history to the beginning
		configIndex := uint64(1)                    // We reset history to the beginning
		srvCfg := raft.MakeServerConf(dummyHost.Peerstore().Peers())
		sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
		checkErr("creating snapshot sink", err)
		_, err = sink.Write(newStateBytes)
		checkErr("writing snapshot to sink", err)
		err = sink.Close()
		checkErr("closing sink", err)

		return nil
	},
},
}

app.Before = func(c *cli.Context) error {
Expand Down Expand Up @@ -328,6 +407,17 @@ func daemon(c *cli.Context) error {
return nil
}

func cleanupRaft(raftDataDir string) error {
raftDB := filepath.Join(raftDataDir, "raft.db")
snapShotDir := filepath.Join(raftDataDir, "snapshots")
err := os.Remove(raftDB)
if err != nil {
return err
}
err = os.RemoveAll(snapShotDir)
return err
}

var facilities = []string{
"service",
"cluster",
Expand Down Expand Up @@ -434,3 +524,38 @@ func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshtt
cfg.RegisterComponent(config.Informer, numpinInfCfg)
return cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, monCfg, diskInfCfg, numpinInfCfg
}

// makeDummyHost builds a minimal libp2p host from the addresses found
// in the cluster configuration. It exists to supply peer information
// (e.g. to the raft snapshot store) without starting a real cluster
// peer.
func makeDummyHost(cfg *ipfscluster.Config) (host.Host, error) {
	store := peerstore.NewPeerstore()
	for _, addr := range cfg.Peers {
		pid, decapAddr, err := ipfscluster.MultiaddrSplit(addr)
		if err != nil {
			return nil, err
		}
		store.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)
	}
	// Register our own listen address as well.
	store.AddAddr(cfg.ID, cfg.ListenAddr, peerstore.PermanentAddrTTL)

	netw, err := swarm.NewNetwork(
		context.Background(),
		[]ma.Multiaddr{cfg.ListenAddr},
		cfg.ID,
		store,
		nil,
	)
	if err != nil {
		return nil, err
	}
	return basichost.New(netw), nil
}

// encodeState serializes a MapState with the msgpack multicodec so the
// resulting bytes can be written into a raft snapshot.
func encodeState(state mapstate.MapState) ([]byte, error) {
	var buffer bytes.Buffer
	enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(&buffer)
	if err := enc.Encode(state); err != nil {
		return nil, err
	}
	return buffer.Bytes(), nil
}
7 changes: 7 additions & 0 deletions sharness/lib/test-lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ cluster_id() {
jq --raw-output ".cluster.id" test-config/service.json
}

# Note: should only be called when the CLUSTER prereq is already true because
# it depends on test-config existing to add the temporary v1State file.
# The file must be valid v1-format JSON (quoted keys, colon-separated),
# otherwise the migration's state Restore cannot parse it.
test_create_v1State() {
    echo '{ "Version": 1, "PinMap": { "QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH": {} } }' > test-config/v1State
    test_set_prereq V1STATE
}

# Cleanup functions
test_clean_ipfs(){
docker kill ipfs
Expand Down
33 changes: 33 additions & 0 deletions sharness/t0050-service-migration.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#! /bin/sh

test_description="Test service migration v1 -> v2 and v2 -> v2"

. lib/test-lib.sh

test_ipfs_init
cleanup test_clean_ipfs
test_cluster_init
test_create_v1State
cleanup test_clean_cluster

# Pin a cid, stop the daemon, migrate the latest backup in place and
# restart: the pin must survive the round trip.
test_expect_success IPFS,CLUSTER "cluster-service state preserved by migration" '
	cid=`docker exec ipfs sh -c "echo test | ipfs add -q"` &&
	ipfs-cluster-ctl pin add "$cid" &> test4 && sleep 2 &&
	kill -1 "$CLUSTER_D_PID" && sleep 2 &&
	backup_file=$(ls test-config/backups/ | head -n 1) &&
	ipfs-cluster-service migration "test-config/backups/$backup_file" "test-config/ipfs-cluster-data" &&
	ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 &
	sleep 2 &&
	ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
	ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED"
'

# Migrate from the hand-written v1 state file: its pin must be visible
# after the daemon restarts.
test_expect_success IPFS,CLUSTER,V1STATE "cluster-service loads v1 state correctly" '
	cid=`docker exec ipfs sh -c "echo test | ipfs add -q"` &&
	kill -1 "$CLUSTER_D_PID" && sleep 2 &&
	ipfs-cluster-service migration "test-config/v1State" "test-config/ipfs-cluster-data" &&
	ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 &
	sleep 2 &&
	ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
	ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED"
'

test_done
4 changes: 4 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func copyEmptyStructToIfaces(in []struct{}) []interface{} {
return ifaces
}

// MultiaddrSplit exposes the private multiaddrSplit helper so that
// external packages can split a cluster multiaddress into its peer ID
// and the remaining (decapsulated) multiaddress.
func MultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
	return multiaddrSplit(addr)
}

func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
pid, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
Expand Down

0 comments on commit f088c5a

Please sign in to comment.