Merge 1fdbaea into 435012a
ZenGround0 committed Nov 10, 2017
2 parents 435012a + 1fdbaea commit 3dda525
Showing 11 changed files with 314 additions and 35 deletions.
32 changes: 1 addition & 31 deletions cluster.go
@@ -4,8 +4,6 @@ import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"time"
@@ -462,7 +460,7 @@ func (c *Cluster) Shutdown() error {
if c.readyB {
// peers are saved usually on addPeer/rmPeer
// c.peerManager.savePeers()
c.backupState()
BackupState(c.config, c.state)
}

// We left the cluster or were removed. Destroy the Raft state.
@@ -1215,31 +1213,3 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer
return append(validAllocations, candidateAllocs[0:needed]...), nil
}
}

func (c *Cluster) backupState() {
if c.config.BaseDir == "" {
logger.Warning("ClusterConfig BaseDir unset. Skipping backup")
return
}

folder := filepath.Join(c.config.BaseDir, "backups")
err := os.MkdirAll(folder, 0700)
if err != nil {
logger.Error(err)
logger.Error("skipping backup")
return
}
fname := time.Now().UTC().Format("20060102_15:04:05")
f, err := os.Create(filepath.Join(folder, fname))
if err != nil {
logger.Error(err)
return
}
defer f.Close()

err = c.state.Snapshot(f)
if err != nil {
logger.Error(err)
return
}
}
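
The call in Shutdown above (and later in ipfs-cluster-service/migrate.go) now goes through an exported BackupState(cfg, state) helper whose body is not among the files shown in this diff. A minimal sketch, assuming it keeps the behavior of the removed method and reports errors to the caller instead of only logging them:

// Sketch only: assumed to mirror the removed (*Cluster).backupState, writing a
// timestamped snapshot of the shared state under <BaseDir>/backups.
func BackupState(cfg *Config, st state.State) error {
    if cfg.BaseDir == "" {
        logger.Warning("ClusterConfig BaseDir unset. Skipping backup")
        return nil
    }
    folder := filepath.Join(cfg.BaseDir, "backups")
    if err := os.MkdirAll(folder, 0700); err != nil {
        return err
    }
    fname := time.Now().UTC().Format("20060102_15:04:05")
    f, err := os.Create(filepath.Join(folder, fname))
    if err != nil {
        return err
    }
    defer f.Close()
    return st.Snapshot(f)
}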
1 change: 1 addition & 0 deletions consensus/raft/consensus.go
@@ -407,3 +407,4 @@ func parsePIDFromMultiaddr(addr ma.Multiaddr) string {
}
return pidstr
}

88 changes: 87 additions & 1 deletion consensus/raft/raft.go
@@ -1,8 +1,12 @@
package raft

import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"io/ioutil"
"os"
"path/filepath"
"time"
@@ -12,6 +16,9 @@ import (
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
p2praft "github.com/libp2p/go-libp2p-raft"
p2pconsensus "github.com/libp2p/go-libp2p-consensus"

"github.com/ipfs/ipfs-cluster/state"
)

// errBadRaftState is returned when the consensus component cannot start
@@ -427,9 +434,88 @@ func (rw *raftWrapper) Peers() ([]string, error) {
return ids, nil
}

// ExistingStateReader does a best-effort search for existing raft state files
// and returns a reader to the bytes of the latest raft snapshot.
func ExistingStateReader(cfg *Config) (io.Reader, error) {
// Read most recent snapshot
snapShotDir := filepath.Join(cfg.BaseDir, cfg.DataFolder)
store, err := hraft.NewFileSnapshotStore(snapShotDir, RaftMaxSnapshots, nil)
if err != nil {
return nil, err
}
snapMetas, err := store.List()
if err != nil {
return nil, err
}
if len(snapMetas) == 0 {
return nil, errors.New("no snapshot files found")
}
_, recentSnapReader, err := store.Open(snapMetas[0].ID)
if err != nil {
return nil, err
}
rawBytes, err := ioutil.ReadAll(recentSnapReader)
if err != nil {
return nil, err
}

// Package state from snapshot as json in a reader
var state p2pconsensus.State
err = p2praft.DecodeState(rawBytes, &state)
if err != nil {
return nil, err
}
stateBytes, err := json.Marshal(state)
if err != nil {
return nil, err
}
r := bytes.NewReader(stateBytes)
return r, nil
}
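
Later in this diff, needsUpdate in ipfs-cluster-service/migrate.go consumes this reader to detect an out-of-date persisted state. A trimmed, caller-side sketch of that pattern (the helper name stateIsCurrent is made up for illustration; the usual raft and mapstate imports are assumed):

// Hypothetical helper: check the version of the persisted state without
// starting Raft, using the snapshot reader returned above.
func stateIsCurrent(cCfg *raft.Config) bool {
    r, err := raft.ExistingStateReader(cCfg)
    if err != nil {
        return true // no snapshot on disk, nothing to check
    }
    st := mapstate.NewMapState()
    storedV, err := st.GetVersion(r)
    return err == nil && storedV == st.Version
}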

// SnapshotReset saves a raft snapshot containing newState so that it is
// loaded on restart. Only call this when Raft is shut down.
func SnapshotReset(newState state.State, cfg *Config, raftDataPath string, peers []peer.ID) error {
err := cleanupRaft(raftDataPath)
if err != nil {
return err
}
snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(raftDataPath, RaftMaxSnapshots, nil)
if err != nil {
return err
}

serverAddr := hraft.ServerAddress(peer.IDB58Encode(peers[len(peers)-1]))
_, dummyTransport := hraft.NewInmemTransport(serverAddr)
var raftSnapVersion hraft.SnapshotVersion
raftSnapVersion = 1 // As of v1.0.0 this is always 1
raftIndex := uint64(1) // We reset history to the beginning
raftTerm := uint64(1) // We reset history to the beginning
configIndex := uint64(1) // We reset history to the beginning
srvCfg := makeServerConf(peers)
sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
if err != nil {
return err
}
newStateBytes, err := p2praft.EncodeState(newState)
if err != nil {
return err
}
_, err = sink.Write(newStateBytes)
if err != nil {
return err
}
err = sink.Close()
if err != nil {
return err
}
return nil
}

func cleanupRaft(raftDataDir string) error {
return os.RemoveAll(raftDataDir)
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
return os.RemoveAll(rw.dataFolder)
return cleanupRaft(rw.dataFolder)
}

func find(s []string, elem string) bool {
30 changes: 30 additions & 0 deletions ipfs-cluster-service/main.go
@@ -112,6 +112,7 @@ var (
configPath string
)


func init() {
// Set the right commit. The only way I could make this work
ipfscluster.Commit = commit
@@ -231,6 +232,30 @@ configuration.
Usage: "run the IPFS Cluster peer (default)",
Action: daemon,
},
{
Name: "state",
Usage: "Manage ipfs-cluster-state",
Subcommands: []cli.Command {
{
Name: "upgrade",
Usage: "upgrade the IPFS Cluster state to the current version",
Description: `
This command replaces the internal state of the ipfs-cluster node with the state
specified in a backup file. The state format is migrated from the version
of the backup file to the version supported by the running cluster build.
To successfully run an upgrade of an entire cluster, shut down each peer without
removal, upgrade the state using this command, and restart every peer.
`,
ArgsUsage: "backup-file-path [raft-data-dir]",
Action: func(c *cli.Context) error {
err := upgrade(c)
checkErr("upgrading state", err)
return nil
},
},
},
},
}

app.Before = func(c *cli.Context) error {
@@ -290,6 +315,11 @@ func daemon(c *cli.Context) error {
checkErr("creating IPFS Connector component", err)

state := mapstate.NewMapState()

if needsUpdate(clusterCfg, consensusCfg) {
return errors.New("unsupported state version")
}

tracker := maptracker.NewMapPinTracker(clusterCfg.ID)
mon, err := basic.NewMonitor(monCfg)
checkErr("creating Monitor component", err)
91 changes: 91 additions & 0 deletions ipfs-cluster-service/migrate.go
@@ -0,0 +1,91 @@
package main

import (
"bufio"
"fmt"
"os"

"github.com/urfave/cli"

peer "github.com/libp2p/go-libp2p-peer"

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/consensus/raft"
"github.com/ipfs/ipfs-cluster/state/mapstate"
)

func upgrade(c *cli.Context) error {
if c.NArg() < 1 || c.NArg() > 2 {
return fmt.Errorf("Usage: <BACKUP-FILE-PATH> [RAFT-DATA-DIR]")
}
// Load configs
cfg, clusterCfg, _, _, consensusCfg, _, _, _ := makeConfigs()
err := cfg.LoadJSONFromFile(configPath)
if err != nil {
return err
}
backupFilePath := c.Args().First()
var raftDataPath string
if c.NArg() == 1 {
raftDataPath = consensusCfg.DataFolder
} else {
raftDataPath = c.Args().Get(1)
}

// Migrate backup to new state
backup, err := os.Open(backupFilePath)
if err != nil {
return err
}

defer backup.Close()
r := bufio.NewReader(backup)
newState := mapstate.NewMapState()
err = newState.Restore(r)
if err != nil {
return err
}
// Record peers of cluster
var peers []peer.ID
for _, m := range clusterCfg.Peers {
pid, _, err := ipfscluster.MultiaddrSplit(m)
if err != nil {
return err
}
peers = append(peers, pid)
}
peers = append(peers, clusterCfg.ID)
// Reset raft state to a snapshot of the new migrated state
err = raft.SnapshotReset(newState, consensusCfg, raftDataPath, peers)
if err != nil {
return err
}
return nil
}


func needsUpdate(cfg *ipfscluster.Config, cCfg *raft.Config) bool {
state := mapstate.NewMapState()
r, err := raft.ExistingStateReader(cCfg) // Note direct dependence on raft here
if err == nil { // err != nil means no snapshots were found, so skip the check
storedV, err := state.GetVersion(r)
if storedV != state.Version || err != nil {
logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger.Error("Raft state is in a non-supported version")
err = state.Restore(r)
if err == nil {
err = ipfscluster.BackupState(cfg, state)
if err == nil {
logger.Error("An updated backup of this state has been saved")
logger.Error("to baseDir/backups. To setup state for use")
logger.Error("run ipfs-cluster-service migration on the latest backup")
}
logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
return true
}
}
}
return false
}
4 changes: 2 additions & 2 deletions package.json
@@ -21,9 +21,9 @@
},
{
"author": "hsanjuan",
"hash": "QmcjpDnmS6jGrYrTnuT4RDHUHduF97w2V8JuYs6eynbFg2",
"hash": "QmSrgyB4gDbaQnTCSaE9taRxhJkCUc9p5pYFqtXGyTaTbR",
"name": "go-libp2p-raft",
"version": "1.1.0"
"version": "1.1.1"
},
{
"author": "whyrusleeping",
7 changes: 7 additions & 0 deletions sharness/lib/test-lib.sh
@@ -88,6 +88,13 @@ cluster_id() {
jq --raw-output ".cluster.id" test-config/service.json
}

# Note: should only be called when the CLUSTER prereq is already true because
# it depends on test-config existing to add the temporary v1State file.
test_create_v1State() {
echo '{ "Version": 1, "PinMap": { "QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH": {} }}' > test-config/v1State
test_set_prereq V1STATE
}
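
For reference, the fixture above decodes into something like the following Go shape (illustrative only; these are not the actual mapstate v1 types):

// Illustrative decode of the v1 fixture written by test_create_v1State.
var v1 struct {
    Version int
    PinMap  map[string]struct{}
}
fixture := `{ "Version": 1, "PinMap": { "QmeomffUNfmQy76CQGy9NdmqEnnHU9soCexBnGU3ezPHVH": {} }}`
err := json.Unmarshal([]byte(fixture), &v1) // v1.Version == 1 on success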

# Cleanup functions
test_clean_ipfs(){
docker kill ipfs
39 changes: 39 additions & 0 deletions sharness/t0050-service-migration.sh
@@ -0,0 +1,39 @@
#! /bin/sh

test_description="Test service migration v1 -> v2 and v2 -> v2"

. lib/test-lib.sh

test_ipfs_init
cleanup test_clean_ipfs
test_cluster_init
test_create_v1State
cleanup test_clean_cluster

test_expect_success IPFS,CLUSTER "cluster-service state preserved by migration" '
cid=`docker exec ipfs sh -c "echo test | ipfs add -q"` &&
ipfs-cluster-ctl pin add "$cid" &> test4 && sleep 2 &&
kill -1 $CLUSTER_D_PID && sleep 30 &&
export BACKUP_FILE=$( ls test-config/backups/ | head -n 1) &&
ipfs-cluster-service --config "test-config" state upgrade "test-config/backups/""$BACKUP_FILE" "test-config/ipfs-cluster-data" &&
echo "successful state upgrade" &&
ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 &
export CLUSTER_D_PID=$! &&
sleep 30 &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED"
'

test_expect_success IPFS,CLUSTER,V1STATE "cluster-service loads v1 state correctly" '
cid=`docker exec ipfs sh -c "echo test | ipfs add -q"` &&
kill -1 $CLUSTER_D_PID && sleep 30 &&
ipfs-cluster-service --config "test-config" state upgrade "test-config/v1State" "test-config/ipfs-cluster-data" &&
ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 &
export CLUSTER_D_PID=$! &&
sleep 30 &&
ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" &&
ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" &&
kill -1 $CLUSTER_D_PID
'

test_done
2 changes: 2 additions & 0 deletions state/interface.go
@@ -27,4 +27,6 @@ type State interface {
Snapshot(w io.Writer) error
// Restore restores a snapshot from a reader
Restore(r io.Reader) error
// GetVersion returns the version of the state stored in the reader
GetVersion(r io.Reader) (int, error)
}
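
The mapstate implementation of GetVersion is not among the files shown here. A minimal sketch of how a JSON-serialized state could satisfy the new interface method (receiver and field names are assumptions):

// Hypothetical implementation: decode only the Version field from the reader.
func (st *MapState) GetVersion(r io.Reader) (int, error) {
    var versioned struct {
        Version int
    }
    if err := json.NewDecoder(r).Decode(&versioned); err != nil {
        return -1, err
    }
    return versioned.Version, nil
}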