ipfs-cluster-service state upgrade cli command
    ipfs-cluster-service now has a migration subcommand that upgrades
    persistent state snapshots with an out-of-date format version to the
    newest version of the raft state. If all cluster members shut down with
    a consistent state, ipfs-cluster is upgraded, and the state upgrade
    command is run, then the new version of cluster will be compatible with
    the persistent storage. ipfs-cluster now validates its persistent state
    upon loading it and exits with a clear error when the state format
    version is not up to date.
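
    As a rough sketch of how that upgrade flow can be pieced together from
    the snapshot helpers added in this commit (this is not the actual
    command implementation; the config loading, the peer ID lookup, and the
    assumption that mapstate's Restore accepts the older format are all
    glossed over here):

    package statetool // hypothetical helper package, for illustration only

    import (
        "errors"

        raft "github.com/ipfs/ipfs-cluster/consensus/raft"
        "github.com/ipfs/ipfs-cluster/state/mapstate"
        peer "github.com/libp2p/go-libp2p-peer"
    )

    // upgradeState reads the newest raft snapshot, restores it into a fresh
    // mapstate (whose (de)serialization is version-aware), and writes it
    // back as a raft snapshot in the current format.
    func upgradeState(cfg *raft.Config, pid peer.ID) error {
        r, found, err := raft.LastStateRaw(cfg)
        if err != nil {
            return err
        }
        if !found {
            return errors.New("no raft snapshot found to upgrade")
        }

        st := mapstate.NewMapState()
        if err := st.Restore(r); err != nil {
            return err
        }

        // SnapshotSave backs up the old raft data and replaces it with a
        // snapshot holding the migrated state.
        return raft.SnapshotSave(cfg, st, pid)
    }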

    Raft snapshotting is enforced on all shutdowns and the JSON backup is no
    longer run. This commit makes use of recent changes to libp2p-raft that
    allow raft states to implement their own marshaling strategies. Now
    mapstate handles the logic for its (de)serialization. In the interest of
    supporting various potential upgrade formats, the state serialization
    begins with a varint (currently one byte) describing the version.
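
    To illustrate that versioned layout (a standalone toy example, not the
    mapstate code itself), a marshaller can prepend the format version as a
    varint and dispatch on it when decoding:

    package main

    import (
        "bufio"
        "bytes"
        "encoding/binary"
        "encoding/json"
        "fmt"
        "io"
    )

    const formatVersion = 1

    type toyState struct {
        Pins map[string]int `json:"pins"`
    }

    // marshal writes the varint format version followed by the JSON payload.
    func (s *toyState) marshal(w io.Writer) error {
        buf := make([]byte, binary.MaxVarintLen64)
        n := binary.PutUvarint(buf, formatVersion)
        if _, err := w.Write(buf[:n]); err != nil {
            return err
        }
        return json.NewEncoder(w).Encode(s)
    }

    // unmarshal reads the version first; new switch cases can be added as
    // the format evolves, which is the point of the prefix.
    func (s *toyState) unmarshal(r io.Reader) error {
        br := bufio.NewReader(r)
        version, err := binary.ReadUvarint(br)
        if err != nil {
            return err
        }
        switch version {
        case formatVersion:
            return json.NewDecoder(br).Decode(s)
        default:
            return fmt.Errorf("unsupported state format version %d", version)
        }
    }

    func main() {
        s := &toyState{Pins: map[string]int{"QmExample": -1}}
        var buf bytes.Buffer
        if err := s.marshal(&buf); err != nil {
            panic(err)
        }
        out := &toyState{}
        if err := out.unmarshal(&buf); err != nil {
            panic(err)
        }
        fmt.Println(out.Pins)
    }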

    Some Go tests are modified and a Go test is added to cover the new
    ipfs-cluster raft snapshot reading functions. Sharness tests are added to
    cover the state upgrade command.
ZenGround0 committed Nov 29, 2017
1 parent 2bc7aec commit 47b744f
Showing 29 changed files with 553 additions and 90 deletions.
38 changes: 5 additions & 33 deletions cluster.go
@@ -3,8 +3,6 @@ package ipfscluster
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"
@@ -520,9 +518,11 @@ func (c *Cluster) Shutdown() error {
}

// Do not save anything if we were not ready
if c.readyB {
c.backupState()
}
// if c.readyB {
// // peers are saved usually on addPeer/rmPeer
// // c.peerManager.savePeers()
// c.config.BackupState(c.state)
//}

// We left the cluster or were removed. Destroy the Raft state.
if c.removed && c.readyB {
@@ -1350,34 +1350,6 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer
}
}

func (c *Cluster) backupState() {
if c.config.BaseDir == "" {
logger.Warning("ClusterConfig BaseDir unset. Skipping backup")
return
}

folder := filepath.Join(c.config.BaseDir, "backups")
err := os.MkdirAll(folder, 0700)
if err != nil {
logger.Error(err)
logger.Error("skipping backup")
return
}
fname := time.Now().UTC().Format("20060102_15:04:05")
f, err := os.Create(filepath.Join(folder, fname))
if err != nil {
logger.Error(err)
return
}
defer f.Close()

err = c.state.Snapshot(f)
if err != nil {
logger.Error(err)
return
}
}

// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
30 changes: 30 additions & 0 deletions cluster_config.go
@@ -392,3 +392,33 @@ func clusterSecretToKey(secret []byte) (string, error) {

return key.String(), nil
}

// BackupState backs up a state according to this configuration's options
//func (cfg *Config) BackupState(state state.State) error {
// if cfg.BaseDir == "" {
// msg := "ClusterConfig BaseDir unset. Skipping backup"
// logger.Warning(msg)
// return errors.New(msg)
// }

// folder := filepath.Join(cfg.BaseDir, "backups")
// err := os.MkdirAll(folder, 0700)
// if err != nil {
// logger.Error(err)
// logger.Error("skipping backup")
// return errors.New("skipping backup")
// }
// fname := time.Now().UTC().Format("20060102_15:04:05")
// f, err := os.Create(filepath.Join(folder, fname))
// if err != nil {
// logger.Error(err)
// return err
// }
// defer f.Close()
// err = state.Snapshot(f)
// if err != nil {
// logger.Error(err)
// return err
// }
// return nil
// }
6 changes: 5 additions & 1 deletion cluster_test.go
@@ -3,6 +3,7 @@ package ipfscluster
import (
"errors"
"os"
"path/filepath"
"testing"
"time"

@@ -115,7 +116,10 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
}

func cleanRaft() {
os.RemoveAll("raftFolderFromTests")
raftDirs, _ := filepath.Glob("raftFolderFromTests*")
for _, dir := range raftDirs {
os.RemoveAll(dir)
}
}

func testClusterShutdown(t *testing.T) {
63 changes: 63 additions & 0 deletions consensus/raft/consensus_test.go
@@ -221,3 +221,66 @@ func TestConsensusLeader(t *testing.T) {
t.Errorf("expected %s but the leader appears as %s", pID, l)
}
}

func TestRaftLatestSnapshot(t *testing.T) {
cc := testingConsensus(t, p2pPort)
defer cleanRaft(p2pPort)
defer cc.Shutdown()

// Make pin 1
c1, _ := cid.Decode(test.TestCid1)
err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1})
if err != nil {
t.Error("the first pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the first snapshot was not taken successfully")
}

// Make pin 2
c2, _ := cid.Decode(test.TestCid2)
err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1})
if err != nil {
t.Error("the second pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the second snapshot was not taken successfully")
}

// Make pin 3
c3, _ := cid.Decode(test.TestCid3)
err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1})
if err != nil {
t.Error("the third pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the third snapshot was not taken successfully")
}

// Call raft.LastState and ensure we get the correct state
snapState := mapstate.NewMapState()
r, snapExists, err := LastStateRaw(cc.config)
if !snapExists {
t.Fatal("No snapshot found by LastStateRaw")
}
if err != nil {
t.Fatal("Error while taking snapshot", err)
}
err = snapState.Restore(r)
if err != nil {
t.Fatal("Snapshot bytes returned could not restore to state")
}
pins := snapState.List()
if len(pins) != 3 {
t.Fatal("Latest snapshot not read")
}
}
119 changes: 114 additions & 5 deletions consensus/raft/raft.go
@@ -3,6 +3,7 @@ package raft
import (
"context"
"errors"
"io"
"os"
"path/filepath"
"time"
@@ -12,6 +13,8 @@ import (
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
p2praft "github.com/libp2p/go-libp2p-raft"

"github.com/ipfs/ipfs-cluster/state"
)

// errBadRaftState is returned when the consensus component cannot start
@@ -332,9 +335,13 @@ func (rw *raftWrapper) Snapshot() error {

// Shutdown shutdown Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown() error {
future := rw.raft.Shutdown()
err := future.Error()
errMsgs := ""
err := rw.Snapshot()
if err != nil {
errMsgs += "could not snapshot raft: " + err.Error() + ".\n"
}
future := rw.raft.Shutdown()
err = future.Error()
if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
}
@@ -427,9 +434,106 @@ func (rw *raftWrapper) Peers() ([]string, error) {
return ids, nil
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
dbh := newDataBackupHelper(rw.dataFolder)
// latestSnapshot looks for the most recent raft snapshot stored at the
// provided basedir. It returns a boolean indicating if any snapshot is
// readable, the snapshot's metadata, and a reader to the snapshot's bytes
func latestSnapshot(raftDataFolder string) (*hraft.SnapshotMeta, io.ReadCloser, error) {
store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil)
if err != nil {
return nil, nil, err
}
snapMetas, err := store.List()
if err != nil {
return nil, nil, err
}
if len(snapMetas) == 0 { // no error if snapshot isn't found
return nil, nil, nil
}
meta, r, err := store.Open(snapMetas[0].ID)
if err != nil {
return nil, nil, err
}
return meta, r, nil
}

// LastStateRaw returns the bytes of the last snapshot stored, its metadata,
// and a flag indicating whether any snapshot was found.
func LastStateRaw(cfg *Config) (io.Reader, bool, error) {
// Read most recent snapshot
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return nil, false, err
}
meta, r, err := latestSnapshot(dataFolder)
if err != nil {
return nil, false, err
}
if meta == nil { // no snapshots could be read
return nil, false, nil
}
return r, true, nil
}

// SnapshotSave saves the provided state to a snapshot in the
// raft data path. Old raft data is backed up and replaced
// by the new snapshot
func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error {
newStateBytes, err := p2praft.EncodeSnapshot(newState)
if err != nil {
return err
}
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return err
}
meta, _, err := latestSnapshot(dataFolder)
if err != nil {
return err
}

// make a new raft snapshot
var raftSnapVersion hraft.SnapshotVersion
raftSnapVersion = 1 // As of hraft v1.0.0 this is always 1
configIndex := uint64(1)
var raftIndex uint64
var raftTerm uint64
var srvCfg hraft.Configuration
if meta != nil {
raftIndex = meta.Index
raftTerm = meta.Term
srvCfg = meta.Configuration
cleanupRaft(dataFolder)
} else {
raftIndex = uint64(1)
raftTerm = uint64(1)
srvCfg = makeServerConf([]peer.ID{pid})
}

snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, nil)
if err != nil {
return err
}
_, dummyTransport := hraft.NewInmemTransport("")

sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
if err != nil {
return err
}

_, err = sink.Write(newStateBytes)
if err != nil {
sink.Cancel()
return err
}
err = sink.Close()
if err != nil {
return err
}
return nil
}

func cleanupRaft(dataFolder string) error {
dbh := newDataBackupHelper(dataFolder)
err := dbh.makeBackup()
if err != nil {
logger.Warning(err)
@@ -439,6 +543,11 @@ func (rw *raftWrapper) Clean() error {
return nil
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
return cleanupRaft(rw.dataFolder)
}

func find(s []string, elem string) bool {
for _, selem := range s {
if selem == elem {
2 changes: 1 addition & 1 deletion coverage.sh
@@ -7,7 +7,7 @@ for dir in $dirs;
do
if ls "$dir"/*.go &> /dev/null;
then
go test -v -coverprofile=profile.out -covermode=count -tags silent "$dir"
go test -v -coverprofile=profile.out -covermode=count -tags silent -timeout 15m "$dir"
if [ $? -ne 0 ];
then
exit 1
2 changes: 1 addition & 1 deletion docs/ipfs-cluster-guide.md
@@ -338,7 +338,7 @@ The safest way to upgrade ipfs-cluster is to stop all cluster peers, update and

As long as the *shared state* format has not changed, there is nothing preventing from stopping cluster peers separately, updating and launching them.

When the shared state format has changed, a state migration will be required. ipfs-cluster will refuse to start and inform the user in this case, although this feature is yet to be implemented next time the state format changes.
When the shared state format has changed, a state migration will be required. ipfs-cluster will refuse to start and inform the user in this case. Currently, state migrations are supported in one direction: from old state formats to the format used by the updated ipfs-cluster-service. This is accomplished by stopping the ipfs-cluster-service daemon and running `ipfs-cluster-service state upgrade`. Note that, due to changes in state serialization introduced while implementing state migrations, ipfs-cluster shared state saved before December 2017 cannot be migrated with this method.

The upgrading procedure is something which is actively worked on and will improve over time.

