Feat/backups upgrade path -- WIP #220

Merged · 1 commit · Nov 29, 2017

38 changes: 5 additions & 33 deletions cluster.go
@@ -3,8 +3,6 @@ package ipfscluster
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"
@@ -520,9 +518,11 @@ func (c *Cluster) Shutdown() error {
}

// Do not save anything if we were not ready
if c.readyB {
c.backupState()
}
// if c.readyB {
// // peers are saved usually on addPeer/rmPeer
// // c.peerManager.savePeers()
// c.config.BackupState(c.state)
//}

// We left the cluster or were removed. Destroy the Raft state.
if c.removed && c.readyB {
@@ -1350,34 +1350,6 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer
}
}

func (c *Cluster) backupState() {
if c.config.BaseDir == "" {
logger.Warning("ClusterConfig BaseDir unset. Skipping backup")
return
}

folder := filepath.Join(c.config.BaseDir, "backups")
err := os.MkdirAll(folder, 0700)
if err != nil {
logger.Error(err)
logger.Error("skipping backup")
return
}
fname := time.Now().UTC().Format("20060102_15:04:05")
f, err := os.Create(filepath.Join(folder, fname))
if err != nil {
logger.Error(err)
return
}
defer f.Close()

err = c.state.Snapshot(f)
if err != nil {
logger.Error(err)
return
}
}

// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
30 changes: 30 additions & 0 deletions cluster_config.go
@@ -392,3 +392,33 @@ func clusterSecretToKey(secret []byte) (string, error) {

return key.String(), nil
}

// BackupState backs up a state according to this configuration's options
//func (cfg *Config) BackupState(state state.State) error {
// if cfg.BaseDir == "" {
// msg := "ClusterConfig BaseDir unset. Skipping backup"
// logger.Warning(msg)
// return errors.New(msg)
// }

// folder := filepath.Join(cfg.BaseDir, "backups")
// err := os.MkdirAll(folder, 0700)
// if err != nil {
// logger.Error(err)
// logger.Error("skipping backup")
// return errors.New("skipping backup")
// }
// fname := time.Now().UTC().Format("20060102_15:04:05")
// f, err := os.Create(filepath.Join(folder, fname))
// if err != nil {
// logger.Error(err)
// return err
// }
// defer f.Close()
// err = state.Snapshot(f)
// if err != nil {
// logger.Error(err)
// return err
// }
// return nil
// }
6 changes: 5 additions & 1 deletion cluster_test.go
@@ -3,6 +3,7 @@ package ipfscluster
import (
"errors"
"os"
"path/filepath"
"testing"
"time"

@@ -115,7 +116,10 @@ func testingCluster(t *testing.T) (*Cluster, *mockAPI, *mockConnector, *mapstate
}

func cleanRaft() {
os.RemoveAll("raftFolderFromTests")
raftDirs, _ := filepath.Glob("raftFolderFromTests*")
for _, dir := range raftDirs {
os.RemoveAll(dir)
}
}

func testClusterShutdown(t *testing.T) {
63 changes: 63 additions & 0 deletions consensus/raft/consensus_test.go
@@ -221,3 +221,66 @@ func TestConsensusLeader(t *testing.T) {
t.Errorf("expected %s but the leader appears as %s", pID, l)
}
}

func TestRaftLatestSnapshot(t *testing.T) {
cc := testingConsensus(t, p2pPort)
defer cleanRaft(p2pPort)
defer cc.Shutdown()

// Make pin 1
c1, _ := cid.Decode(test.TestCid1)
err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1})
if err != nil {
t.Error("the first pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the first snapshot was not taken successfully")
}

// Make pin 2
c2, _ := cid.Decode(test.TestCid2)
err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1})
if err != nil {
t.Error("the second pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the second snapshot was not taken successfully")
}

// Make pin 3
c3, _ := cid.Decode(test.TestCid3)
err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1})
if err != nil {
t.Error("the third pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the third snapshot was not taken successfully")
}

// Call raft.LastState and ensure we get the correct state
snapState := mapstate.NewMapState()
r, snapExists, err := LastStateRaw(cc.config)
if !snapExists {
t.Fatal("No snapshot found by LastStateRaw")
}
if err != nil {
t.Fatal("Error while taking snapshot", err)
}
err = snapState.Restore(r)
if err != nil {
t.Fatal("Snapshot bytes returned could not restore to state")
}
pins := snapState.List()
if len(pins) != 3 {
t.Fatal("Latest snapshot not read")
}
}
Collaborator:

Very nice test :)

119 changes: 114 additions & 5 deletions consensus/raft/raft.go
@@ -3,6 +3,7 @@ package raft
import (
"context"
"errors"
"io"
"os"
"path/filepath"
"time"
@@ -12,6 +13,8 @@ import (
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
p2praft "github.com/libp2p/go-libp2p-raft"

"github.com/ipfs/ipfs-cluster/state"
)

// errBadRaftState is returned when the consensus component cannot start
@@ -332,9 +335,13 @@ func (rw *raftWrapper) Snapshot() error {

// Shutdown shuts down Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown() error {
future := rw.raft.Shutdown()
err := future.Error()
errMsgs := ""
err := rw.Snapshot()
if err != nil {
errMsgs += "could not snapshot raft: " + err.Error() + ".\n"
}
future := rw.raft.Shutdown()
err = future.Error()
if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
}
@@ -427,9 +434,106 @@ func (rw *raftWrapper) Peers() ([]string, error) {
return ids, nil
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
dbh := newDataBackupHelper(rw.dataFolder)
// latestSnapshot looks for the most recent raft snapshot stored at the
// provided basedir. It returns the snapshot's metadata and a reader to
// its bytes. When no snapshot is found, both are nil and no error is
// returned.
func latestSnapshot(raftDataFolder string) (*hraft.SnapshotMeta, io.ReadCloser, error) {
store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil)
if err != nil {
return nil, nil, err
}
snapMetas, err := store.List()
if err != nil {
return nil, nil, err
}
if len(snapMetas) == 0 { // no error if snapshot isn't found
return nil, nil, nil
}
meta, r, err := store.Open(snapMetas[0].ID)
if err != nil {
return nil, nil, err
}
return meta, r, nil
}

// LastStateRaw returns a reader to the bytes of the last snapshot stored,
// a flag indicating whether any snapshot was found, and any error.
func LastStateRaw(cfg *Config) (io.Reader, bool, error) {
// Read most recent snapshot
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return nil, false, err
}
meta, r, err := latestSnapshot(dataFolder)
if err != nil {
return nil, false, err
}
if meta == nil { // no snapshots could be read
return nil, false, nil
}
return r, true, nil
}

// SnapshotSave saves the provided state to a snapshot in the
// raft data path. Old raft data is backed up and replaced
// by the new snapshot.
func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error {
newStateBytes, err := p2praft.EncodeSnapshot(newState)
if err != nil {
return err
}
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return err
}
meta, _, err := latestSnapshot(dataFolder)
if err != nil {
return err
}

// make a new raft snapshot
var raftSnapVersion hraft.SnapshotVersion
raftSnapVersion = 1 // As of hraft v1.0.0 this is always 1
configIndex := uint64(1)
var raftIndex uint64
var raftTerm uint64
var srvCfg hraft.Configuration
if meta != nil {
raftIndex = meta.Index
raftTerm = meta.Term
srvCfg = meta.Configuration
cleanupRaft(dataFolder)
} else {
raftIndex = uint64(1)
raftTerm = uint64(1)
srvCfg = makeServerConf([]peer.ID{pid})
}

snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, nil)
if err != nil {
return err
}
_, dummyTransport := hraft.NewInmemTransport("")

sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
if err != nil {
return err
}

_, err = sink.Write(newStateBytes)
if err != nil {
sink.Cancel()
return err
}
err = sink.Close()
if err != nil {
return err
}
return nil
}

func cleanupRaft(dataFolder string) error {
dbh := newDataBackupHelper(dataFolder)
err := dbh.makeBackup()
if err != nil {
logger.Warning(err)
@@ -439,6 +543,11 @@ func (rw *raftWrapper) Clean() error {
return nil
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
return cleanupRaft(rw.dataFolder)
}

func find(s []string, elem string) bool {
for _, selem := range s {
if selem == elem {
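
For reference (not part of this diff), a minimal sketch of how the new LastStateRaw export might be consumed from outside the package, along the lines of TestRaftLatestSnapshot above. The loadLastState helper name and the import aliases and paths are assumptions for illustration only:

package main

import (
	"fmt"

	raft "github.com/ipfs/ipfs-cluster/consensus/raft"
	"github.com/ipfs/ipfs-cluster/state/mapstate"
)

// loadLastState reads the most recent raft snapshot, if any, and restores
// it into a fresh MapState.
func loadLastState(cfg *raft.Config) (*mapstate.MapState, error) {
	st := mapstate.NewMapState()
	r, found, err := raft.LastStateRaw(cfg)
	if err != nil {
		return nil, err
	}
	if !found {
		// No snapshot on disk yet: return an empty state.
		return st, nil
	}
	if err := st.Restore(r); err != nil {
		return nil, err
	}
	fmt.Printf("restored %d pins from the last snapshot\n", len(st.List()))
	return st, nil
}
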
2 changes: 1 addition & 1 deletion coverage.sh
@@ -7,7 +7,7 @@ for dir in $dirs;
do
if ls "$dir"/*.go &> /dev/null;
then
go test -v -coverprofile=profile.out -covermode=count -tags silent "$dir"
go test -v -coverprofile=profile.out -covermode=count -tags silent -timeout 15m "$dir"
if [ $? -ne 0 ];
then
exit 1
2 changes: 1 addition & 1 deletion docs/ipfs-cluster-guide.md
@@ -338,7 +338,7 @@ The safest way to upgrade ipfs-cluster is to stop all cluster peers, update and

As long as the *shared state* format has not changed, there is nothing preventing you from stopping cluster peers separately, updating them, and launching them again.

When the shared state format has changed, a state migration will be required. ipfs-cluster will refuse to start and inform the user in this case, although this feature is yet to be implemented next time the state format changes.
When the shared state format has changed, a state migration will be required. ipfs-cluster will refuse to start and inform the user in this case. Currently, state migrations are supported in one direction only: from old state formats to the format used by the updated ipfs-cluster-service. This is done by stopping the ipfs-cluster-service daemon and running `ipfs-cluster-service state upgrade`. Note that, due to changes in state serialization introduced while implementing state migrations, ipfs-cluster shared state saved before December 2017 cannot be migrated with this method.

The upgrade procedure is actively being worked on and will improve over time.

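For reference (not part of this diff), a minimal sketch of the flow that `ipfs-cluster-service state upgrade` is described to perform, combining the LastStateRaw and SnapshotSave functions added in this PR. The upgradeState helper, the omitted format-migration step, and the way the peer ID is supplied are assumptions for illustration, not the actual command implementation:

package main

import (
	"errors"

	peer "github.com/libp2p/go-libp2p-peer"

	raft "github.com/ipfs/ipfs-cluster/consensus/raft"
	"github.com/ipfs/ipfs-cluster/state/mapstate"
)

// upgradeState outlines an offline state upgrade: read the last snapshot,
// rebuild the state, and write it back with SnapshotSave so the daemon
// starts from the upgraded format.
func upgradeState(cfg *raft.Config, pid peer.ID) error {
	r, found, err := raft.LastStateRaw(cfg)
	if err != nil {
		return err
	}
	if !found {
		return errors.New("no raft snapshot found, nothing to upgrade")
	}

	newState := mapstate.NewMapState()
	if err := newState.Restore(r); err != nil {
		return err
	}

	// A migration from older serialization formats would happen here;
	// it is omitted from this sketch.

	// Back up the old raft data and replace it with a snapshot of the
	// rebuilt state.
	return raft.SnapshotSave(cfg, newState, pid)
}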