Skip to content

Commit

Permalink
Work around fsm reflection errors and fix peer diff
Browse files Browse the repository at this point in the history
  • Loading branch information
ZenGround0 committed Nov 7, 2017
1 parent d131bdd commit 3c09093
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions ipfs-cluster-service/main.go
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"errors"
"fmt"
"io/ioutil"
"os"
"os/signal"
"os/user"
Expand Down Expand Up @@ -280,13 +279,10 @@ removal, migrate using this command, and restart every peer.
newState := mapstate.NewMapState()
err = newState.Restore(r)
checkErr("migrating state to alternate version", err)
// Encode state to a reader-closer for fsm.Restore()
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
err = enc.Encode(newState)
// Encode state to save as a snapshot
newStateBytes, err := encodeState(*newState)
checkErr("encoding new state", err)
rc := ioutil.NopCloser(buf)


//Remove raft state
err = cleanupRaft(raftDataPath)
checkErr("cleaningup Raft", err)
Expand All @@ -306,13 +302,10 @@ removal, migrate using this command, and restart every peer.
srvCfg := raft.MakeServerConf(dummyHost.Peerstore().Peers())
sink, err := snapshotStore.Create(raftSnapVersion, raftIndex, raftTerm, srvCfg, configIndex, dummyTransport)
checkErr("Creating a temporary snapshot store", err)
cc := libp2praft.NewConsensus(newState)
err = cc.FSM().Restore(rc)
checkErr("Loading state to FSM", err)
snap, err := cc.FSM().Snapshot()
checkErr("Creating snapshot", err)
err = snap.Persist(sink)
checkErr("Persisting snapshot", err)
_, err = sink.Write(newStateBytes)
checkErr("Writing snapshot to sink", err)
err = sink.Close()
checkErr("Closing sink", err)

return nil
},
Expand Down Expand Up @@ -422,6 +415,13 @@ func cleanupRaft(raftDataDir string) error {
return err
}
err = os.RemoveAll(snapShotDir)

// TODO we need to create the snapshot dir so that it can be writeable by the
// next ipfs-cluster-service process. Ideally it's not globally writeable for
// security (escalation to putting any state in ipfs cluster raft logs) but
// this will need some thought. What does hraft do so that future processes
// can edit the snapshot dir after shutting down a raft peer?

return err
}

Expand Down Expand Up @@ -541,6 +541,12 @@ func makeDummyHost(cfg *ipfscluster.Config) (host.Host, error) {
}
ps.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)
}
pid, decapAddr, err := ipfscluster.MultiaddrSplit(cfg.ID)
if err != nil {
return nil, err
}
ps.AddAddr(pid, decapAddr, peerstore.PermanentAddrTTL)

network, err := swarm.NewNetwork(
context.Background(),
[]ma.Multiaddr{cfg.ListenAddr},
Expand All @@ -556,3 +562,11 @@ func makeDummyHost(cfg *ipfscluster.Config) (host.Host, error) {
return bhost, nil
}

func encodeState(state mapstate.MapState) ([]byte, error) {
buf := new(bytes.Buffer)
enc := msgpack.Multicodec(msgpack.DefaultMsgpackHandle()).Encoder(buf)
if err := enc.Encode(state); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

0 comments on commit 3c09093

Please sign in to comment.