Commit

Merge c7a288c into 2bc7aec
ZenGround0 committed Nov 22, 2017
2 parents 2bc7aec + c7a288c commit 1af9aa0
Showing 18 changed files with 517 additions and 72 deletions.
38 changes: 5 additions & 33 deletions cluster.go
@@ -3,8 +3,6 @@ package ipfscluster
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"time"
@@ -520,9 +518,11 @@ func (c *Cluster) Shutdown() error {
}

// Do not save anything if we were not ready
if c.readyB {
c.backupState()
}
// if c.readyB {
// // peers are saved usually on addPeer/rmPeer
// // c.peerManager.savePeers()
// c.config.BackupState(c.state)
//}

// We left the cluster or were removed. Destroy the Raft state.
if c.removed && c.readyB {
@@ -1350,34 +1350,6 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer
}
}

func (c *Cluster) backupState() {
if c.config.BaseDir == "" {
logger.Warning("ClusterConfig BaseDir unset. Skipping backup")
return
}

folder := filepath.Join(c.config.BaseDir, "backups")
err := os.MkdirAll(folder, 0700)
if err != nil {
logger.Error(err)
logger.Error("skipping backup")
return
}
fname := time.Now().UTC().Format("20060102_15:04:05")
f, err := os.Create(filepath.Join(folder, fname))
if err != nil {
logger.Error(err)
return
}
defer f.Close()

err = c.state.Snapshot(f)
if err != nil {
logger.Error(err)
return
}
}

// diffPeers returns the peerIDs added and removed from peers2 in relation to
// peers1
func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) {
30 changes: 30 additions & 0 deletions cluster_config.go
@@ -392,3 +392,33 @@ func clusterSecretToKey(secret []byte) (string, error) {

return key.String(), nil
}

// BackupState backs up a state according to this configuration's options
//func (cfg *Config) BackupState(state state.State) error {
// if cfg.BaseDir == "" {
// msg := "ClusterConfig BaseDir unset. Skipping backup"
// logger.Warning(msg)
// return errors.New(msg)
// }

// folder := filepath.Join(cfg.BaseDir, "backups")
// err := os.MkdirAll(folder, 0700)
// if err != nil {
// logger.Error(err)
// logger.Error("skipping backup")
// return errors.New("skipping backup")
// }
// fname := time.Now().UTC().Format("20060102_15:04:05")
// f, err := os.Create(filepath.Join(folder, fname))
// if err != nil {
// logger.Error(err)
// return err
// }
// defer f.Close()
// err = state.Snapshot(f)
// if err != nil {
// logger.Error(err)
// return err
// }
// return nil
// }
63 changes: 63 additions & 0 deletions consensus/raft/consensus_test.go
@@ -221,3 +221,66 @@ func TestConsensusLeader(t *testing.T) {
t.Errorf("expected %s but the leader appears as %s", pID, l)
}
}

func TestRaftLatestSnapshot(t *testing.T) {
cc := testingConsensus(t, p2pPort)
defer cleanRaft(p2pPort)
defer cc.Shutdown()

// Make pin 1
c1, _ := cid.Decode(test.TestCid1)
err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1})
if err != nil {
t.Error("the first pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the first snapshot was not taken successfully")
}

// Make pin 2
c2, _ := cid.Decode(test.TestCid2)
err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1})
if err != nil {
t.Error("the second pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the second snapshot was not taken successfully")
}

// Make pin 3
c3, _ := cid.Decode(test.TestCid3)
err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1})
if err != nil {
t.Error("the third pin did not make it to the log:", err)
}

time.Sleep(250 * time.Millisecond)
err = cc.raft.Snapshot()
if err != nil {
t.Error("the third snapshot was not taken successfully")
}

// Call raft.LastState and ensure we get the correct state
snapState := mapstate.NewMapState()
r, snapExists, err := LastStateRaw(cc.config)
if !snapExists {
t.Fatal("No snapshot found by LastStateRaw")
}
if err != nil {
t.Fatal("Error while taking snapshot", err)
}
err = snapState.Restore(r)
if err != nil {
t.Fatal("Snapshot bytes returned could not restore to state")
}
pins := snapState.List()
if len(pins) != 3 {
t.Fatal("Latest snapshot not read")
}
}
119 changes: 114 additions & 5 deletions consensus/raft/raft.go
@@ -3,6 +3,7 @@ package raft
import (
"context"
"errors"
"io"
"os"
"path/filepath"
"time"
@@ -12,6 +13,8 @@ import (
host "github.com/libp2p/go-libp2p-host"
peer "github.com/libp2p/go-libp2p-peer"
p2praft "github.com/libp2p/go-libp2p-raft"

"github.com/ipfs/ipfs-cluster/state"
)

// errBadRaftState is returned when the consensus component cannot start
@@ -332,9 +335,13 @@ func (rw *raftWrapper) Snapshot() error {

// Shutdown shuts down Raft and closes the BoltDB.
func (rw *raftWrapper) Shutdown() error {
future := rw.raft.Shutdown()
err := future.Error()
errMsgs := ""
err := rw.Snapshot()
if err != nil {
errMsgs += "could not snapshot raft: " + err.Error() + ".\n"
}
future := rw.raft.Shutdown()
err = future.Error()
if err != nil {
errMsgs += "could not shutdown raft: " + err.Error() + ".\n"
}
@@ -427,9 +434,106 @@ func (rw *raftWrapper) Peers() ([]string, error) {
return ids, nil
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
dbh := newDataBackupHelper(rw.dataFolder)
// latestSnapshot looks for the most recent raft snapshot stored at the
// provided basedir. It returns a boolean indicating if any snapshot is
// readable, the snapshot's metadata, and a reader to the snapshot's bytes
func latestSnapshot(raftDataFolder string) (bool, *hraft.SnapshotMeta, io.ReadCloser, error) {
store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil)
if err != nil {
return false, nil, nil, err
}
snapMetas, err := store.List()
if err != nil {
return false, nil, nil, err
}
if len(snapMetas) == 0 { // no error if snapshot isn't found
return false, nil, nil, nil
}
meta, r, err := store.Open(snapMetas[0].ID)
if err != nil {
return false, nil, nil, err
}
return true, meta, r, nil
}

// LastStateRaw returns a reader for the bytes of the last snapshot stored
// and a flag indicating whether any snapshot was found.
func LastStateRaw(cfg *Config) (io.Reader, bool, error) {
// Read most recent snapshot
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return nil, false, err
}
found, _, r, err := latestSnapshot(dataFolder)
if err != nil {
return nil, false, err
}
if !found {
return nil, false, nil
}
return r, true, nil
}

// SnapshotSave saves the provided state to a snapshot in the
// raft data path. Old raft data is backed up and replaced
// by the new snapshot
func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error {
newStateBytes, err := p2praft.EncodeSnapshot(newState)
if err != nil {
return err
}
dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder)
if err != nil {
return err
}
found, meta, _, err := latestSnapshot(dataFolder)
if err != nil {
return err
}

// make a new raft snapshot
var raftSnapVersion hraft.SnapshotVersion
raftSnapVersion = 1 // As of hraft v1.0.0 this is always 1
configIndex := uint64(1)
var raftIndex uint64
var raftTerm uint64
var srvCfg hraft.Configuration
if found {
raftIndex = meta.Index
raftTerm = meta.Term
srvCfg = meta.Configuration
cleanupRaft(dataFolder)
} else {
raftIndex = uint64(1)
raftTerm = uint64(1)
srvCfg = makeServerConf([]peer.ID{pid})
}

snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, nil)
if err != nil {
return err
}
_, dummyTransport := hraft.NewInmemTransport("")

sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
if err != nil {
return err
}

_, err = sink.Write(newStateBytes)
if err != nil {
sink.Cancel()
return err
}
err = sink.Close()
if err != nil {
return err
}
return nil
}

func cleanupRaft(dataFolder string) error {
dbh := newDataBackupHelper(dataFolder)
err := dbh.makeBackup()
if err != nil {
logger.Warning(err)
@@ -439,6 +543,11 @@ func (rw *raftWrapper) Clean() error {
return nil
}

// only call when Raft is shutdown
func (rw *raftWrapper) Clean() error {
return cleanupRaft(rw.dataFolder)
}

func find(s []string, elem string) bool {
for _, selem := range s {
if selem == elem {
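
The three helpers added above (latestSnapshot, LastStateRaw and SnapshotSave) give callers read and write access to the newest raft snapshot without a running raft node. Below is a minimal read-side sketch, not part of this commit, showing how a snapshot can be restored into a MapState and inspected, much as TestRaftLatestSnapshot does; the import paths are assumptions inferred from how these packages are used elsewhere in this diff:

package example

import (
    "fmt"

    raft "github.com/ipfs/ipfs-cluster/consensus/raft"
    "github.com/ipfs/ipfs-cluster/state/mapstate"
)

// inspectSnapshot prints the pins recorded in the newest raft snapshot
// found under the given consensus configuration's data folder.
func inspectSnapshot(cfg *raft.Config) error {
    r, found, err := raft.LastStateRaw(cfg)
    if err != nil {
        return err
    }
    if !found {
        return fmt.Errorf("no raft snapshot found")
    }
    st := mapstate.NewMapState()
    if err := st.Restore(r); err != nil {
        return err
    }
    for _, pin := range st.List() {
        fmt.Println(pin.Cid)
    }
    return nil
}
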
30 changes: 29 additions & 1 deletion ipfs-cluster-service/main.go
@@ -14,7 +14,7 @@ import (

logging "github.com/ipfs/go-log"
ma "github.com/multiformats/go-multiaddr"
"github.com/urfave/cli"
cli "github.com/urfave/cli"

ipfscluster "github.com/ipfs/ipfs-cluster"
"github.com/ipfs/ipfs-cluster/allocator/ascendalloc"
@@ -239,6 +239,29 @@ configuration.
Usage: "run the IPFS Cluster peer (default)",
Action: daemon,
},
{
Name: "state",
Usage: "Manage ipfs-cluster-state",
Subcommands: []cli.Command{
{
Name: "upgrade",
Usage: "upgrade the IPFS Cluster state to the current version",
Description: `
This command upgrades the internal state of the ipfs-cluster node
specified in the latest raft snapshot. The state format is migrated from the
version of the snapshot to the version supported by the current cluster version.
To successfully run an upgrade of an entire cluster, shut down each peer without
removal, upgrade state using this command, and restart every peer.
`,
Action: func(c *cli.Context) error {
err := upgrade()
checkErr("upgrading state", err)
return nil
},
},
},
},
}

app.Before = func(c *cli.Context) error {
@@ -301,6 +324,10 @@ func daemon(c *cli.Context) error {
checkErr("creating IPFS Connector component", err)

state := mapstate.NewMapState()

err = validateVersion(clusterCfg, consensusCfg)
checkErr("validating version", err)

tracker := maptracker.NewMapPinTracker(clusterCfg.ID)
mon, err := basic.NewMonitor(monCfg)
checkErr("creating Monitor component", err)
@@ -342,6 +369,7 @@ var facilities = []string{
"cluster",
"restapi",
"ipfshttp",
"mapstate",
"monitor",
"consensus",
"pintracker",
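
The Action added above calls upgrade(), which is defined outside the hunks shown on this page. Below is a rough sketch of the flow it presumably drives, combining the LastStateRaw and SnapshotSave helpers introduced in consensus/raft. It is only a sketch under stated assumptions: the real upgrade() takes no arguments and loads its own configuration, while here the consensus config and peer ID are passed in to keep the example self-contained, and MapState.Restore is assumed to handle older serialized formats after this change.

package example

import (
    "errors"

    peer "github.com/libp2p/go-libp2p-peer"

    raft "github.com/ipfs/ipfs-cluster/consensus/raft"
    "github.com/ipfs/ipfs-cluster/state/mapstate"
)

// upgradeState reads the newest raft snapshot, restores it with the current
// state implementation, and writes it back as a fresh snapshot; the previous
// raft data is backed up and replaced by SnapshotSave.
func upgradeState(consensusCfg *raft.Config, pid peer.ID) error {
    r, found, err := raft.LastStateRaw(consensusCfg)
    if err != nil {
        return err
    }
    if !found {
        return errors.New("no raft snapshot found: nothing to upgrade")
    }

    newState := mapstate.NewMapState()
    if err := newState.Restore(r); err != nil {
        return err
    }

    return raft.SnapshotSave(consensusCfg, newState, pid)
}

Per the help text above, an operator would stop every peer without removing it from the cluster, run the state upgrade on each stopped peer, and only then restart the peers.
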
