diff --git a/cluster.go b/cluster.go index 9ba904acf..26a8cd325 100644 --- a/cluster.go +++ b/cluster.go @@ -3,8 +3,6 @@ package ipfscluster import ( "context" "errors" - "os" - "path/filepath" "strings" "sync" "time" @@ -520,9 +518,11 @@ func (c *Cluster) Shutdown() error { } // Do not save anything if we were not ready - if c.readyB { - c.backupState() - } + // if c.readyB { + // // peers are saved usually on addPeer/rmPeer + // // c.peerManager.savePeers() + // c.config.BackupState(c.state) + //} // We left the cluster or were removed. Destroy the Raft state. if c.removed && c.readyB { @@ -1350,34 +1350,6 @@ func (c *Cluster) allocate(hash *cid.Cid, repl int, blacklist []peer.ID) ([]peer } } -func (c *Cluster) backupState() { - if c.config.BaseDir == "" { - logger.Warning("ClusterConfig BaseDir unset. Skipping backup") - return - } - - folder := filepath.Join(c.config.BaseDir, "backups") - err := os.MkdirAll(folder, 0700) - if err != nil { - logger.Error(err) - logger.Error("skipping backup") - return - } - fname := time.Now().UTC().Format("20060102_15:04:05") - f, err := os.Create(filepath.Join(folder, fname)) - if err != nil { - logger.Error(err) - return - } - defer f.Close() - - err = c.state.Snapshot(f) - if err != nil { - logger.Error(err) - return - } -} - // diffPeers returns the peerIDs added and removed from peers2 in relation to // peers1 func diffPeers(peers1, peers2 []peer.ID) (added, removed []peer.ID) { diff --git a/cluster_config.go b/cluster_config.go index 12a9a2743..ee12c2a6a 100644 --- a/cluster_config.go +++ b/cluster_config.go @@ -392,3 +392,33 @@ func clusterSecretToKey(secret []byte) (string, error) { return key.String(), nil } + +// BackupState backs up a state according to this configuration's options +//func (cfg *Config) BackupState(state state.State) error { +// if cfg.BaseDir == "" { +// msg := "ClusterConfig BaseDir unset. Skipping backup" +// logger.Warning(msg) +// return errors.New(msg) +// } + +// folder := filepath.Join(cfg.BaseDir, "backups") +// err := os.MkdirAll(folder, 0700) +// if err != nil { +// logger.Error(err) +// logger.Error("skipping backup") +// return errors.New("skipping backup") +// } +// fname := time.Now().UTC().Format("20060102_15:04:05") +// f, err := os.Create(filepath.Join(folder, fname)) +// if err != nil { +// logger.Error(err) +// return err +// } +// defer f.Close() +// err = state.Snapshot(f) +// if err != nil { +// logger.Error(err) +// return err +// } +// return nil +// } diff --git a/consensus/raft/consensus_test.go b/consensus/raft/consensus_test.go index 65fb268c4..2f8872549 100644 --- a/consensus/raft/consensus_test.go +++ b/consensus/raft/consensus_test.go @@ -221,3 +221,66 @@ func TestConsensusLeader(t *testing.T) { t.Errorf("expected %s but the leader appears as %s", pID, l) } } + +func TestRaftLatestSnapshot(t *testing.T) { + cc := testingConsensus(t, p2pPort) + defer cleanRaft(p2pPort) + defer cc.Shutdown() + + // Make pin 1 + c1, _ := cid.Decode(test.TestCid1) + err := cc.LogPin(api.Pin{Cid: c1, ReplicationFactor: -1}) + if err != nil { + t.Error("the first pin did not make it to the log:", err) + } + + time.Sleep(250 * time.Millisecond) + err = cc.raft.Snapshot() + if err != nil { + t.Error("the first snapshot was not taken successfully") + } + + // Make pin 2 + c2, _ := cid.Decode(test.TestCid2) + err = cc.LogPin(api.Pin{Cid: c2, ReplicationFactor: -1}) + if err != nil { + t.Error("the second pin did not make it to the log:", err) + } + + time.Sleep(250 * time.Millisecond) + err = cc.raft.Snapshot() + if err != nil { + t.Error("the second snapshot was not taken successfully") + } + + // Make pin 3 + c3, _ := cid.Decode(test.TestCid3) + err = cc.LogPin(api.Pin{Cid: c3, ReplicationFactor: -1}) + if err != nil { + t.Error("the third pin did not make it to the log:", err) + } + + time.Sleep(250 * time.Millisecond) + err = cc.raft.Snapshot() + if err != nil { + t.Error("the third snapshot was not taken successfully") + } + + // Call raft.LastState and ensure we get the correct state + snapState := mapstate.NewMapState() + r, snapExists, err := LastStateRaw(cc.config) + if !snapExists { + t.Fatal("No snapshot found by LastStateRaw") + } + if err != nil { + t.Fatal("Error while taking snapshot", err) + } + err = snapState.Restore(r) + if err != nil { + t.Fatal("Snapshot bytes returned could not restore to state") + } + pins := snapState.List() + if len(pins) != 3 { + t.Fatal("Latest snapshot not read") + } +} diff --git a/consensus/raft/raft.go b/consensus/raft/raft.go index e33a9617a..1cb888a3a 100644 --- a/consensus/raft/raft.go +++ b/consensus/raft/raft.go @@ -3,6 +3,7 @@ package raft import ( "context" "errors" + "io" "os" "path/filepath" "time" @@ -12,6 +13,8 @@ import ( host "github.com/libp2p/go-libp2p-host" peer "github.com/libp2p/go-libp2p-peer" p2praft "github.com/libp2p/go-libp2p-raft" + + "github.com/ipfs/ipfs-cluster/state" ) // errBadRaftState is returned when the consensus component cannot start @@ -332,9 +335,13 @@ func (rw *raftWrapper) Snapshot() error { // Shutdown shutdown Raft and closes the BoltDB. func (rw *raftWrapper) Shutdown() error { - future := rw.raft.Shutdown() - err := future.Error() errMsgs := "" + err := rw.Snapshot() + if err != nil { + errMsgs += "could not snapshot raft: " + err.Error() + ".\n" + } + future := rw.raft.Shutdown() + err = future.Error() if err != nil { errMsgs += "could not shutdown raft: " + err.Error() + ".\n" } @@ -427,9 +434,106 @@ func (rw *raftWrapper) Peers() ([]string, error) { return ids, nil } -// only call when Raft is shutdown -func (rw *raftWrapper) Clean() error { - dbh := newDataBackupHelper(rw.dataFolder) +// latestSnapshot looks for the most recent raft snapshot stored at the +// provided basedir. It returns a boolean indicating if any snapshot is +// readable, the snapshot's metadata, and a reader to the snapshot's bytes +func latestSnapshot(raftDataFolder string) (bool, *hraft.SnapshotMeta, io.ReadCloser, error) { + store, err := hraft.NewFileSnapshotStore(raftDataFolder, RaftMaxSnapshots, nil) + if err != nil { + return false, nil, nil, err + } + snapMetas, err := store.List() + if err != nil { + return false, nil, nil, err + } + if len(snapMetas) == 0 { // no error if snapshot isn't found + return false, nil, nil, nil + } + meta, r, err := store.Open(snapMetas[0].ID) + if err != nil { + return false, nil, nil, err + } + return true, meta, r, nil +} + +// LastStateRaw returns the bytes of the last snapshot stored, its metadata, +// and a flag indicating whether any snapshot was found. +func LastStateRaw(cfg *Config) (io.Reader, bool, error) { + // Read most recent snapshot + dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder) + if err != nil { + return nil, false, err + } + found, _, r, err := latestSnapshot(dataFolder) + if err != nil { + return nil, false, err + } + if !found { + return nil, false, nil + } + return r, true, nil +} + +// SnapshotSave saves the provided state to a snapshot in the +// raft data path. Old raft data is backed up and replaced +// by the new snapshot +func SnapshotSave(cfg *Config, newState state.State, pid peer.ID) error { + newStateBytes, err := p2praft.EncodeSnapshot(newState) + if err != nil { + return err + } + dataFolder, err := makeDataFolder(cfg.BaseDir, cfg.DataFolder) + if err != nil { + return err + } + found, meta, _, err := latestSnapshot(dataFolder) + if err != nil { + return err + } + + // make a new raft snapshot + var raftSnapVersion hraft.SnapshotVersion + raftSnapVersion = 1 // As of hraft v1.0.0 this is always 1 + configIndex := uint64(1) + var raftIndex uint64 + var raftTerm uint64 + var srvCfg hraft.Configuration + if found { + raftIndex = meta.Index + raftTerm = meta.Term + srvCfg = meta.Configuration + cleanupRaft(dataFolder) + } else { + raftIndex = uint64(1) + raftTerm = uint64(1) + srvCfg = makeServerConf([]peer.ID{pid}) + } + + snapshotStore, err := hraft.NewFileSnapshotStoreWithLogger(dataFolder, RaftMaxSnapshots, nil) + if err != nil { + return err + } + _, dummyTransport := hraft.NewInmemTransport("") + + sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport) + if err != nil { + return err + } + + _, err = sink.Write(newStateBytes) + if err != nil { + sink.Cancel() + return err + } + err = sink.Close() + if err != nil { + return err + } + return nil +} + +func cleanupRaft(dataFolder string) error { + dbh := newDataBackupHelper(dataFolder) err := dbh.makeBackup() if err != nil { logger.Warning(err) @@ -439,6 +543,11 @@ func (rw *raftWrapper) Clean() error { return nil } +// only call when Raft is shutdown +func (rw *raftWrapper) Clean() error { + return cleanupRaft(rw.dataFolder) +} + func find(s []string, elem string) bool { for _, selem := range s { if selem == elem { diff --git a/ipfs-cluster-service/main.go b/ipfs-cluster-service/main.go index 1782345e1..94c80b31b 100644 --- a/ipfs-cluster-service/main.go +++ b/ipfs-cluster-service/main.go @@ -14,7 +14,7 @@ import ( logging "github.com/ipfs/go-log" ma "github.com/multiformats/go-multiaddr" - "github.com/urfave/cli" + cli "github.com/urfave/cli" ipfscluster "github.com/ipfs/ipfs-cluster" "github.com/ipfs/ipfs-cluster/allocator/ascendalloc" @@ -239,6 +239,29 @@ configuration. Usage: "run the IPFS Cluster peer (default)", Action: daemon, }, + { + Name: "state", + Usage: "Manage ipfs-cluster-state", + Subcommands: []cli.Command{ + { + Name: "upgrade", + Usage: "upgrade the IPFS Cluster state to the current version", + Description: ` + +This command upgrades the internal state of the ipfs-cluster node +specified in the latest raft snapshot. The state format is migrated from the +version of the snapshot to the version supported by the current cluster version. +To succesfully run an upgrade of an entire cluster, shut down each peer without +removal, upgrade state using this command, and restart every peer. +`, + Action: func(c *cli.Context) error { + err := upgrade() + checkErr("upgrading state", err) + return nil + }, + }, + }, + }, } app.Before = func(c *cli.Context) error { @@ -301,6 +324,10 @@ func daemon(c *cli.Context) error { checkErr("creating IPFS Connector component", err) state := mapstate.NewMapState() + + err = validateVersion(clusterCfg, consensusCfg) + checkErr("validating version", err) + tracker := maptracker.NewMapPinTracker(clusterCfg.ID) mon, err := basic.NewMonitor(monCfg) checkErr("creating Monitor component", err) @@ -342,6 +369,7 @@ var facilities = []string{ "cluster", "restapi", "ipfshttp", + "mapstate", "monitor", "consensus", "pintracker", diff --git a/ipfs-cluster-service/migrate.go b/ipfs-cluster-service/migrate.go new file mode 100644 index 000000000..fab3f1ed4 --- /dev/null +++ b/ipfs-cluster-service/migrate.go @@ -0,0 +1,78 @@ +package main + +import ( + "errors" + "io/ioutil" + + ipfscluster "github.com/ipfs/ipfs-cluster" + "github.com/ipfs/ipfs-cluster/state/mapstate" + "github.com/ipfs/ipfs-cluster/consensus/raft" +) + + +func upgrade() error { + //Load configs + cfg, clusterCfg, _, _, consensusCfg, _, _, _ := makeConfigs() + err := cfg.LoadJSONFromFile(configPath) + if err != nil { + return err + } + + newState := mapstate.NewMapState() + + // Get the last state + r, snapExists, err := raft.LastStateRaw(consensusCfg) + if err != nil { + return err + } + if !snapExists { + logger.Error("No raft state currently exists to upgrade from") + return errors.New("No snapshot could be found") + } + + // Restore the state from snapshot + err = newState.Restore(r) + if err != nil { + return err + } + + // Reset with SnapshotSave + err = raft.SnapshotSave(consensusCfg, newState, clusterCfg.ID) + if err != nil { + return err + } + return nil +} + + +func validateVersion(cfg *ipfscluster.Config, cCfg *raft.Config) error { + state := mapstate.NewMapState() + r, snapExists, err := raft.LastStateRaw(cCfg) + if !snapExists && err != nil { + logger.Error("Error before reading latest snapshot.") + return err + } else if snapExists && err != nil { + logger.Error("Error after reading last snapshot. Snapshot potentially corrupt.") + return err + } else if snapExists && err == nil { + raw, err := ioutil.ReadAll(r) + if err != nil { + return err + } + err = state.Unmarshal(raw) + if err != nil { + logger.Error("Error unmarshalling snapshot. Snapshot potentially corrupt.") + return err + } + if state.GetVersion() != state.Version { + logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + logger.Error("Out of date ipfs-cluster state is saved.") + logger.Error("To migrate to the new version, run ipfs-cluster-service state upgrade.") + logger.Error("To launch a node without this state, rename the consensus data directory.") + logger.Error("Hint, the default is .ipfs-cluster/ipfs-cluster-data.") + logger.Error("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + return errors.New("Outdated state version stored") + } + } // !snapExists && err == nil // no existing state, no check needed + return nil +} diff --git a/logging.go b/logging.go index d895172a5..74a28d64f 100644 --- a/logging.go +++ b/logging.go @@ -9,6 +9,7 @@ var facilities = []string{ "restapi", "ipfshttp", "monitor", + "mapstate", "consensus", "raft", "pintracker", diff --git a/package.json b/package.json index 9581af372..6195f8761 100644 --- a/package.json +++ b/package.json @@ -21,9 +21,9 @@ }, { "author": "hsanjuan", - "hash": "QmcjpDnmS6jGrYrTnuT4RDHUHduF97w2V8JuYs6eynbFg2", + "hash": "QmSzULQiTbMnt36oxzdaUUomNuTHwXa1DvnyM8o7ogJ7Hb", "name": "go-libp2p-raft", - "version": "1.1.0" + "version": "1.2.0" }, { "author": "whyrusleeping", diff --git a/sharness/lib/test-lib.sh b/sharness/lib/test-lib.sh index 4a22438fd..30c8ad9ac 100755 --- a/sharness/lib/test-lib.sh +++ b/sharness/lib/test-lib.sh @@ -6,8 +6,8 @@ SHARNESS_LIB="lib/sharness/sharness.sh" # Daemons output will be redirected to... -IPFS_OUTPUT="/dev/null" # change for debugging -# IPFS_OUTPUT="/dev/stderr" # change for debugging +#IPFS_OUTPUT="/dev/null" # change for debugging +IPFS_OUTPUT="/dev/stderr" # change for debugging . "$SHARNESS_LIB" || { echo >&2 "Cannot source: $SHARNESS_LIB" @@ -88,6 +88,28 @@ cluster_id() { jq --raw-output ".cluster.id" test-config/service.json } +test_confirm_v1State() { + V1_SNAP_PATH="../test_data/v1State" + V1_CRC_PATH="../test_data/v1Crc" + if [ -f $V1_SNAP_PATH ] && [ -f $V1_CRC_PATH ]; then + export V1_CRC=$(cat ../test_data/v1Crc) + cp $V1_SNAP_PATH v1State + test_set_prereq V1STATE + fi +} + +cluster_kill(){ + kill -1 "$CLUSTER_D_PID" + sleep 20 +} + +cluster_start(){ + ipfs-cluster-service --config "test-config" >"$IPFS_OUTPUT" 2>&1 & + export CLUSTER_D_PID=$! + sleep 40 +} + + # Cleanup functions test_clean_ipfs(){ docker kill ipfs diff --git a/sharness/t0050-service-state-upgrade-from-current.sh b/sharness/t0050-service-state-upgrade-from-current.sh new file mode 100755 index 000000000..d37dad8a1 --- /dev/null +++ b/sharness/t0050-service-state-upgrade-from-current.sh @@ -0,0 +1,22 @@ +#! /bin/sh + +test_description="Test service state 'upgrade' from current version" + +. lib/test-lib.sh + +test_ipfs_init +cleanup test_clean_ipfs +test_cluster_init +cleanup test_clean_cluster + +test_expect_success IPFS,CLUSTER "cluster-service state preserved by migration" ' + cid=`docker exec ipfs sh -c "echo testing | ipfs add -q"` && + ipfs-cluster-ctl pin add "$cid" && sleep 30 && + cluster_kill && + ipfs-cluster-service --config "test-config" state upgrade >"$IPFS_OUTPUT" && + cluster_start && + ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" && + ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" +' + +test_done diff --git a/sharness/t0051-service-state-upgrade-from-old.sh b/sharness/t0051-service-state-upgrade-from-old.sh new file mode 100755 index 000000000..2d7c6641e --- /dev/null +++ b/sharness/t0051-service-state-upgrade-from-old.sh @@ -0,0 +1,33 @@ +#! /bin/sh + +test_description="Test service state upgrade v1 -> v2 and v2 -> v2" + +. lib/test-lib.sh + +test_ipfs_init +cleanup test_clean_ipfs +test_cluster_init +test_confirm_v1State +cleanup test_clean_cluster + +# Make a pin and shutdown to force a snapshot. Modify snapshot files to specify +# a snapshot of v1 state pinning "test" (it's easier than taking a new one each +# time with the correct metadata). Upgrade state, check that the correct +# pin cid is in the state +test_expect_success IPFS,CLUSTER,V1STATE,JQ "cluster-service loads v1 state correctly" ' + cid=`docker exec ipfs sh -c "echo test | ipfs add -q"` && + cid2=`docker exec ipfs sh -c "echo testing | ipfs add -q"` && + ipfs-cluster-ctl pin add "$cid2" && sleep 30 && + cluster_kill && + SNAP_DIR="test-config/ipfs-cluster-data/snapshots/" && + SNAP_DIR+="$( ls test-config/ipfs-cluster-data/snapshots/ | head -n 1)" && + cp v1State $SNAP_DIR/state.bin && + cat $SNAP_DIR/meta.json | jq --arg CRC "$V1_CRC" '"'"'.CRC = $CRC'"'"' > tmp.json&& + cp tmp.json $SNAP_DIR/meta.json && + ipfs-cluster-service --config "test-config" state upgrade >"$IPFS_OUTPUT" && + cluster_start && + ipfs-cluster-ctl pin ls "$cid" | grep -q "$cid" && + ipfs-cluster-ctl status "$cid" | grep -q -i "PINNED" +' + +test_done diff --git a/sharness/test_data/v1Crc b/sharness/test_data/v1Crc new file mode 100644 index 000000000..26695bef6 --- /dev/null +++ b/sharness/test_data/v1Crc @@ -0,0 +1 @@ +y8SrOIoXJo4= diff --git a/sharness/test_data/v1State b/sharness/test_data/v1State new file mode 100755 index 000000000..7a81195e8 Binary files /dev/null and b/sharness/test_data/v1State differ diff --git a/state/interface.go b/state/interface.go index dbd894fda..ebeced25d 100644 --- a/state/interface.go +++ b/state/interface.go @@ -25,6 +25,12 @@ type State interface { Get(*cid.Cid) api.Pin // Snapshot writes a snapshot of the state to a writer Snapshot(w io.Writer) error - // Restore restores a snapshot from a reader + // Restore restores an outdated state to the current version Restore(r io.Reader) error + // Return the version of this state + GetVersion() int + // Marshal serializes the state to a byte slice + Marshal() ([]byte, error) + // Unmarshal deserializes the state from marshaled bytes + Unmarshal([]byte) error } diff --git a/state/mapstate/map_state.go b/state/mapstate/map_state.go index 199dcb643..8e1d5d11d 100644 --- a/state/mapstate/map_state.go +++ b/state/mapstate/map_state.go @@ -3,20 +3,25 @@ package mapstate import ( + "bytes" "encoding/json" "io" "io/ioutil" "sync" - "github.com/ipfs/ipfs-cluster/api" + msgpack "github.com/multiformats/go-multicodec/msgpack" cid "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log" + "github.com/ipfs/ipfs-cluster/api" ) // Version is the map state Version. States with old versions should // perform an upgrade before. const Version = 2 +var logger = logging.Logger("mapstate") + // MapState is a very simple database to store the state of the system // using a Go map. It is thread safe. It implements the State interface. type MapState struct { @@ -92,22 +97,69 @@ func (st *MapState) Snapshot(w io.Writer) error { return enc.Encode(st) } -// Restore takes a reader and restores a snapshot. It should migrate -// the format if it is not compatible with the current version. +// Restore restores a snapshot from the state's internal bytes. It should +// migrate the format if it is not compatible with the current version. func (st *MapState) Restore(r io.Reader) error { - snap, err := ioutil.ReadAll(r) + bs, err := ioutil.ReadAll(r) if err != nil { return err } - var vonly struct{ Version int } - err = json.Unmarshal(snap, &vonly) + err = st.Unmarshal(bs) + if st.Version == Version { // Unmarshal restored for us + return nil + } + bytesNoVersion := bs[1:] // Restore is aware of encoding format + err = st.migrateFrom(st.Version, bytesNoVersion) if err != nil { return err } - if vonly.Version == Version { - // we are good - err := json.Unmarshal(snap, st) + st.Version = Version + return nil +} + +// GetVersion returns the current version of this state object. +// It is not necessarily up to date +func (st *MapState) GetVersion() int { + return st.Version +} + +// Marshal encodes the state using msgpack +func (st *MapState) Marshal() ([]byte, error) { + logger.Debugf("Marshal-- Marshalling state of version %d", st.Version) + buf := new(bytes.Buffer) + enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf) + if err := enc.Encode(st); err != nil { + return nil, err + } + // First byte indicates the version (probably should make this a varint + // if we stick to this encoding) + vCodec := make([]byte, 1) + vCodec[0] = byte(st.Version) + ret := append(vCodec, buf.Bytes()...) + logger.Debugf("Marshal-- The final marshaled bytes: %x", ret) + return ret, nil +} + +// Unmarshal decodes the state using msgpack. It first decodes just +// the version number. If this is not the current version the bytes +// are stored within the state's internal reader, which can be migrated +// to the current version in a later call to restore. Note: Out of date +// version is not an error +func (st *MapState) Unmarshal(bs []byte) error { + // Check version byte + logger.Debugf("The incoming bytes to unmarshal: %x", bs) + v := int(bs[0]) + logger.Debugf("The interpreted version: %d", v) + if v != Version { // snapshot is out of date + st.Version = v + return nil + } + + // snapshot is up to date + buf := bytes.NewBuffer(bs[1:]) + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) + if err := dec.Decode(st); err != nil { return err } - return st.migrateFrom(vonly.Version, snap) + return nil } diff --git a/state/mapstate/map_state_test.go b/state/mapstate/map_state_test.go index cff6820e2..3fca31c21 100644 --- a/state/mapstate/map_state_test.go +++ b/state/mapstate/map_state_test.go @@ -2,9 +2,10 @@ package mapstate import ( "bytes" - "fmt" "testing" + msgpack "github.com/multiformats/go-multicodec/msgpack" + cid "github.com/ipfs/go-cid" peer "github.com/libp2p/go-libp2p-peer" @@ -69,19 +70,21 @@ func TestList(t *testing.T) { } } -func TestSnapshotRestore(t *testing.T) { +func TestMarshalUnmarshal(t *testing.T) { ms := NewMapState() ms.Add(c) - var buf bytes.Buffer - err := ms.Snapshot(&buf) + b, err := ms.Marshal() if err != nil { t.Fatal(err) } ms2 := NewMapState() - err = ms2.Restore(&buf) + err = ms2.Unmarshal(b) if err != nil { t.Fatal(err) } + if ms.Version != ms2.Version { + t.Fatal(err) + } get := ms2.Get(c.Cid) if get.Allocations[0] != testPeerID1 { t.Error("expected different peer id") @@ -89,19 +92,36 @@ func TestSnapshotRestore(t *testing.T) { } func TestMigrateFromV1(t *testing.T) { - v1 := []byte(fmt.Sprintf(`{ - "Version": 1, - "PinMap": { - "%s": {} - } -} -`, c.Cid)) - buf := bytes.NewBuffer(v1) - ms := NewMapState() - err := ms.Restore(buf) + // Construct the bytes of a v1 state + var v1State mapStateV1 + v1State.PinMap = map[string]struct{}{ + c.Cid.String(): {}} + v1State.Version = 1 + buf := new(bytes.Buffer) + enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf) + err := enc.Encode(v1State) if err != nil { t.Fatal(err) } + vCodec := make([]byte, 1) + vCodec[0] = byte(v1State.Version) + v1Bytes := append(vCodec, buf.Bytes()...) + + // Unmarshal first to check this is v1 + ms := NewMapState() + err = ms.Unmarshal(v1Bytes) + if err != nil { + t.Error(err) + } + if ms.Version != 1 { + t.Error("unmarshal picked up the wrong version") + } + // Migrate state to current version + r := bytes.NewBuffer(v1Bytes) + err = ms.Restore(r) + if err != nil { + t.Error(err) + } get := ms.Get(c.Cid) if get.ReplicationFactor != -1 || !get.Cid.Equals(c.Cid) { t.Error("expected something different") diff --git a/state/mapstate/migrate.go b/state/mapstate/migrate.go index 44bdb1c19..4b2b09315 100644 --- a/state/mapstate/migrate.go +++ b/state/mapstate/migrate.go @@ -1,9 +1,11 @@ package mapstate import ( - "encoding/json" + "bytes" "errors" + msgpack "github.com/multiformats/go-multicodec/msgpack" + "github.com/ipfs/ipfs-cluster/api" ) @@ -16,10 +18,12 @@ func (st *MapState) migrateFrom(version int, snap []byte) error { switch version { case 1: var mstv1 mapStateV1 - err := json.Unmarshal(snap, &mstv1) - if err != nil { + buf := bytes.NewBuffer(snap) + dec := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Decoder(buf) + if err := dec.Decode(&mstv1); err != nil { return err } + for k := range mstv1.PinMap { st.PinMap[k] = api.PinSerial{ Cid: k, diff --git a/util.go b/util.go index 65b83354c..4c29fe229 100644 --- a/util.go +++ b/util.go @@ -52,6 +52,10 @@ func copyEmptyStructToIfaces(in []struct{}) []interface{} { return ifaces } +func MultiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) { + return multiaddrSplit(addr) +} + func multiaddrSplit(addr ma.Multiaddr) (peer.ID, ma.Multiaddr, error) { pid, err := addr.ValueForProtocol(ma.P_IPFS) if err != nil {