Skip to content

Commit

Permalink
Migration cli command
Browse files Browse the repository at this point in the history
ipfs-cluster-service now has a migration subcommand that will
handle loading a json encoded state backup file into ipfs-cluster's
raft state.  If all cluster members shutdown, perform this operation
on the same state file and restart, then they will see this (possibly
different) state upon restart.  Migrations automatically update any version
of state in the backup file to the most recent version of the state, and
therefore should be run cluster-wide during a software update.

Cli command and snapshotting:
ipfs-cluster-service now has a migration subcommand that will
handle migrating a state to a new version, and upgrading all
raft state to refer to the new version of state.

migration cli command and snapshotting first draft
Remaining: evaluate and set up a proper migration framework so
changing the state is enforced to be backwards compatible and requires
little new code (2 migration functions between most recent state and update)

adding version to migration file

Fixing errors

more errors

more errors

Compile errors fixed

Fix argparse and uninit fsm errors

Work around fsm reflection errors and fix peer diff
  • Loading branch information
ZenGround0 committed Nov 7, 2017
1 parent aced97c commit eed5e3c
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 0 deletions.
4 changes: 4 additions & 0 deletions consensus/raft/raft.go
Expand Up @@ -182,6 +182,10 @@ func makeDataFolder(baseDir, folder string) (string, error) {
return folder, nil
}

func MakeServerConf(peers []peer.ID) hraft.Configuration {
return makeServerConf(peers)
}

// create Raft servers configuration
func makeServerConf(peers []peer.ID) hraft.Configuration {
sm := make(map[string]struct{})
Expand Down
136 changes: 136 additions & 0 deletions ipfs-cluster-service/main.go
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"errors"
"fmt"
"os"
Expand All @@ -10,11 +11,20 @@ import (
"path/filepath"
"syscall"

"context"

// _ "net/http/pprof"

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli"
libp2praft "github.com/libp2p/go-libp2p-raft"
host "github.com/libp2p/go-libp2p-host"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
peerstore "github.com/libp2p/go-libp2p-peerstore"
swarm "github.com/libp2p/go-libp2p-swarm"
hraft "github.com/hashicorp/raft"
msgpack "github.com/multiformats/go-multicodec/msgpack"

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
Expand Down Expand Up @@ -112,6 +122,7 @@ var (
configPath string
)


func init() {
// Set the right commit. The only way I could make this work
ipfscluster.Commit = commit
Expand Down Expand Up @@ -231,6 +242,74 @@ configuration.
Usage: "run the IPFS Cluster peer (default)",
Action: daemon,
},
{
Name: "migration",
Usage: "migrate the IPFS Cluster state between versions",
Description: `
This command migrates the internal state of the ipfs-cluster node to the state
specified in a backup file. The state format is migrated from the version
of the backup file to the version supported by the current cluster version.
To succesfully run a migration of an entire cluster, shut down each peer without
removal, migrate using this command, and restart every peer.
`,
ArgsUsage: "<BACKUP-FILE-PATH> [RAFT-DATA-DIR]",
Action: func(c *cli.Context) error {
if c.NArg() < 1 || c.NArg() > 2 {
return fmt.Errorf("Usage: <BACKUP-FILE-PATH> [RAFT-DATA-DIR]")

}
//Load configs
cfg, clusterCfg, _, _, consensusCfg, _, _, _ := makeConfigs()
err := cfg.LoadJSONFromFile(configPath)
checkErr("loading configuration", err)
backupFilePath := c.Args().First()
var raftDataPath string
if c.NArg() == 1 {
raftDataPath = consensusCfg.DataFolder
} else {
raftDataPath = c.Args().Get(1)
}

//Migrate backup to new state
backup, err := os.Open(backupFilePath)
checkErr("opening backup file", err)
defer backup.Close()
r := bufio.NewReader(backup)
newState := mapstate.NewMapState()
err = newState.Restore(r)
checkErr("migrating state to alternate version", err)
// Encode state to save as a snapshot
newStateBytes, err := encodeState(*newState)
checkErr("encoding new state", err)

//Remove raft state
err = cleanupRaft(raftDataPath)
checkErr("cleaningup Raft", err)

//Write snapshot of the migrated state
snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(raftDataPath, 5, nil)
checkErr("creating snapshot store", err)
dummyHost, err := makeDummyHost(clusterCfg)
checkErr("creating dummy host for snapshot store", err)
dummyTransport, err := libp2praft.NewLibp2pTransport(dummyHost, consensusCfg.NetworkTimeout)
checkErr("creating dummy transport for snapshot store", err)
var raftSnapVersion hraft.SnapshotVersion
raftSnapVersion = 1 // As of v1.0.0 this is always 1
raftIndex := uint64(1) // We reset history to the beginning
raftTerm := uint64(1) // We reset history to the beginning
configIndex := uint64(1) // We reset history to the beginning
srvCfg := raft.MakeServerConf(dummyHost.Peerstore().Peers())
sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
checkErr("Creating a temporary snapshot store", err)
_, err = sink.Write(newStateBytes)
checkErr("Writing snapshot to sink", err)
err = sink.Close()
checkErr("Closing sink", err)

return nil
},
},
}

app.Before = func(c *cli.Context) error {
Expand Down Expand Up @@ -328,6 +407,24 @@ func daemon(c *cli.Context) error {
return nil
}

func cleanupRaft(raftDataDir string) error {
raftDB := filepath.Join(raftDataDir, "raft.db")
snapShotDir := filepath.Join(raftDataDir, "snapshots")
err := os.Remove(raftDB)
if err != nil {
return err
}
err = os.RemoveAll(snapShotDir)

// TODO we need to create the snapshot dir so that it can be writeable by the
// next ipfs-cluster-service process. Ideally it's not globally writeable for
// security (escalation to putting any state in ipfs cluster raft logs) but
// this will need some thought. What does hraft do so that future processes
// can edit the snapshot dir after shutting down a raft peer?

return err
}

var facilities = []string{
"service",
"cluster",
Expand Down Expand Up @@ -434,3 +531,42 @@ func makeConfigs() (*config.Manager, *ipfscluster.Config, *rest.Config, *ipfshtt
cfg.RegisterComponent(config.Informer, numpinInfCfg)
return cfg, clusterCfg, apiCfg, ipfshttpCfg, consensusCfg, monCfg, diskInfCfg, numpinInfCfg
}

func makeDummyHost(cfg *ipfscluster.Config) (host.Host, error) {
ps := peerstore.NewPeerstore()
for _, m := range cfg.Peers {
pid, decapAddr, err := ipfscluster.MultiaddrSplit(m)
if err != nil {
return nil, err
}
ps.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)
}
pid, decapAddr, err := ipfscluster.MultiaddrSplit(cfg.ID)
if err != nil {
return nil, err
}
ps.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)

network, err := swarm.NewNetwork(
context.Background(),
[]ma.Multiaddr{cfg.ListenAddr},
cfg.ID,
ps,
nil,
)
if err != nil {
return nil, err
}

bhost := basichost.New(network)
return bhost, nil
}

func encodeState(state mapstate.MapState) ([]byte, error) {
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
if err := enc.Encode(state); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
5 changes: 5 additions & 0 deletions state/mapstate/migrate.go
Expand Up @@ -12,7 +12,12 @@ type mapStateV1 struct {
PinMap map[string]struct{}
}


const currentV int = 1


func (st *MapState) migrateFrom(version int, snap []byte) error {

switch version {
case 1:
var mstv1 mapStateV1
Expand Down
4 changes: 4 additions & 0 deletions util.go
Expand Up @@ -52,6 +52,10 @@ func copyEmptyStructToIfaces(in []struct{}) []interface{} {
return ifaces
}

func MultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
return multiaddrSplit(addr)
}

func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) {
pid, err := addr.ValueForProtocol(ma.P_IPFS)
if err != nil {
Expand Down

0 comments on commit eed5e3c

Please sign in to comment.