Skip to content

Commit

Permalink
Huge refactoring: replace compaction by snapshotting
Browse files Browse the repository at this point in the history
In recent discussions, we realized that the original motivation for the
compaction implementation (i.e. deleting old IRC messages based on
analyzing their dependencies) is no longer clear. Quite possibly,
compaction was just the first idea that came to mind and we went with
it.

In the last couple of weeks (see the commit history), it became clear
that the compaction approach is _really_ complex and hard to get
right.

So, instead, with this commit, we serialize the IRCServer state into a
single protocol buffer encoded message which is stored as a RobustState
message as the first message of each snapshot. I.e., instead of
re-applying thousands of messages to end up with a certain IRCServer
state, we can now just deserialize the RobustState message.

This is a backwards-compatible change, i.e. old snapshots can be read by
new nodes just fine. However, once your nodes are upgraded and Raft has
taken a new snapshot, you cannot downgrade anymore — the old versions won’t
understand the new RobustState message. Further, you must update all
nodes within a short period of time, ideally updating Followers before
the Leader. That way, the chance of Raft transferring a snapshot from a
new node to an older node is minimized.

With compaction being gone, the behavior of the robustirc-canary tool
changes slightly: instead of comparing all messages in the compaction
window, it now compares all messages outside of the compaction
window. The compaction window is boring to look at, since all messages
are just deleted and replaced by the single snapshot message. The
messages afterwards are more interesting to see if changes in the code
caused a change in behavior.

The robustirc-dump tool and the status page have been updated to display
IRCServer state in text format (stripping privacy-sensitive information
like connection passwords).

We’ve decided to use protocol buffers for serialization instead of JSON
because protocol buffers are way faster and will be used with gRPC
anyway (see issue #132) and likely even before that to speed up node
startup (see issue #131).

fixes #108
  • Loading branch information
stapelberg committed Jun 14, 2016
1 parent b813898 commit 2d3a623
Show file tree
Hide file tree
Showing 29 changed files with 1,552 additions and 4,012 deletions.
66 changes: 32 additions & 34 deletions api.go
Expand Up @@ -36,8 +36,6 @@ var (
GetMessageRequests = make(map[string]GetMessageStats)
getMessagesRequestsMu sync.Mutex

configMu sync.Mutex

// applyMu guards calls to raft.Apply(). We need to lock them because
// otherwise we cannot guarantee that multiple goroutines will write
// strictly monotonically increasing timestamps.
Expand Down Expand Up @@ -171,10 +169,8 @@ func handlePostMessage(w http.ResponseWriter, r *http.Request, ps httprouter.Par
}

// Don’t throttle server-to-server connections (services)
if netConfig.PostMessageCooloff > 0 {
until := ircServer.ThrottleUntil(session, time.Duration(netConfig.PostMessageCooloff))
time.Sleep(until.Sub(time.Now()))
}
until := ircServer.ThrottleUntil(session)
time.Sleep(until.Sub(time.Now()))

type postMessageRequest struct {
Data string
Expand Down Expand Up @@ -624,33 +620,50 @@ func handleQuit(w http.ResponseWriter, r *http.Request) {

func handleGetConfig(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.Header().Set("X-RobustIRC-Config-Revision", strconv.Itoa(netConfig.Revision))
w.Header().Set("X-RobustIRC-Config-Revision", strconv.Itoa(ircServer.Config.Revision))

if err := toml.NewEncoder(w).Encode(&netConfig); err != nil {
ircServer.ConfigMu.RLock()
defer ircServer.ConfigMu.RUnlock()
if err := toml.NewEncoder(w).Encode(&ircServer.Config); err != nil {
log.Printf("Could not send TOML config: %v\n", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}

func handlePostConfig(w http.ResponseWriter, r *http.Request) {
revision, err := strconv.ParseInt(r.Header.Get("X-RobustIRC-Config-Revision"), 0, 64)
// configRevision returns ircServer.Config.Revision, holding
// ircServer.ConfigMu for reading while the field is accessed.
func configRevision() int {
ircServer.ConfigMu.RLock()
defer ircServer.ConfigMu.RUnlock()
return ircServer.Config.Revision
}

func applyConfig(revision int, body string) error {
applyMu.Lock()
defer applyMu.Unlock()
if got, want := revision, configRevision(); got != want {
return fmt.Errorf("Revision mismatch (got %d, want %d). Try again.", got, want)
}

msg := ircServer.NewRobustMessage(types.RobustConfig, types.RobustId{}, body)
msg.Revision = int(revision) + 1
msgbytes, err := json.Marshal(msg)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return fmt.Errorf("Could not store message, cannot encode it as JSON: %v", err)
}

configMu.Lock()
return node.Apply(msgbytes, 10*time.Second).Error()
}

if got, want := int(revision), netConfig.Revision; got != want {
http.Error(w, fmt.Sprintf("Revision mismatch (got %d, want %d). Try again.", got, want), http.StatusBadRequest)
func handlePostConfig(w http.ResponseWriter, r *http.Request) {
revision, err := strconv.ParseInt(r.Header.Get("X-RobustIRC-Config-Revision"), 0, 64)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var unused config.Network
var body bytes.Buffer
if _, err := toml.DecodeReader(io.TeeReader(r.Body, &body), &unused); err != nil {
configMu.Unlock()
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -660,28 +673,13 @@ func handlePostConfig(w http.ResponseWriter, r *http.Request) {
return
}

applyMu.Lock()
msg := ircServer.NewRobustMessage(types.RobustConfig, types.RobustId{}, body.String())
msg.Revision = int(revision) + 1
msgbytes, err := json.Marshal(msg)
if err != nil {
applyMu.Unlock()
configMu.Unlock()
http.Error(w, fmt.Sprintf("Could not store message, cannot encode it as JSON: %v", err),
http.StatusBadRequest)
return
}

f := node.Apply(msgbytes, 10*time.Second)
applyMu.Unlock()
err = f.Error()
configMu.Unlock()
if err != nil {
if err := applyConfig(int(revision), body.String()); err != nil {
if err == raft.ErrNotLeader {
maybeProxyToLeader(w, r, nopCloser{&body})
return
}
http.Error(w, fmt.Sprintf("Apply(): %v", err), http.StatusInternalServerError)

http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
13 changes: 7 additions & 6 deletions canary.go
Expand Up @@ -46,6 +46,8 @@ func canary(fsm raft.FSM, statePath string) {
// just compacted).
log.Printf("Compacting before dumping state\n")

fsm.(*FSM).skipDeletionForCanary = true

snapshot, err := fsm.Snapshot()
if err != nil {
log.Fatalf("fsm.Snapshot(): %v\n", err)
Expand All @@ -56,8 +58,6 @@ func canary(fsm raft.FSM, statePath string) {
log.Fatalf("snapshot is not a robustSnapshot")
}

rs.skipDeletionForCanary = true

sink, err := raft.NewDiscardSnapshotStore().Create(rs.lastIndex, 1, []byte{})
if err != nil {
log.Fatalf("DiscardSnapshotStore.Create(): %v\n", err)
Expand All @@ -67,6 +67,8 @@ func canary(fsm raft.FSM, statePath string) {
log.Fatalf("snapshot.Persist(): %v\n", err)
}

sink.Close()

// Dump the in-memory state into a file, to be read by robustirc-canary.
f, err := os.Create(statePath)
if err != nil {
Expand Down Expand Up @@ -102,8 +104,8 @@ func canary(fsm raft.FSM, statePath string) {
}

nmsg := types.NewRobustMessageFromBytes(nlog.Data)
if time.Unix(0, nmsg.Id.Id).After(rs.compactionEnd) {
break
if time.Unix(0, nmsg.Id.Id).Before(rs.compactionEnd) {
continue
}

// TODO: come up with pseudo-values for createsession/deletesession
Expand All @@ -115,13 +117,12 @@ func canary(fsm raft.FSM, statePath string) {
continue
}
vmsgs, _ := ircServer.Get(nmsg.Id)
_, compacted := rs.del[idx]
cm := canaryMessageState{
Id: idx,
Session: nmsg.Session.Id,
Input: util.PrivacyFilterIrcmsg(ircmsg).String(),
Output: make([]canaryMessageOutput, len(vmsgs)),
Compacted: compacted,
Compacted: false,
}
for idx, vmsg := range vmsgs {
ifc := make(map[string]bool)
Expand Down
1 change: 1 addition & 0 deletions cmd/robustirc-canary/canary.go
Expand Up @@ -97,6 +97,7 @@ func openOrDump(executable, statePath, heapPath string, compactionStart time.Tim
fmt.Sprintf("-dump_canary_state=%s", statePath),
fmt.Sprintf("-dump_heap_profile=%s", heapPath),
fmt.Sprintf("-canary_compaction_start=%d", compactionStart.UnixNano()))
cmd.Stderr = os.Stderr
log.Printf("Dumping canary state: %v\n", cmd.Args)
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("Could not dump canary state: %v", err)
Expand Down
21 changes: 21 additions & 0 deletions cmd/robustirc-dump/dump.go
Expand Up @@ -4,6 +4,7 @@ package main

import (
"bytes"
"encoding/base64"
"encoding/binary"
"encoding/json"
"flag"
Expand All @@ -16,19 +17,24 @@ import (
"strings"
"time"

"github.com/golang/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/robustirc/robustirc/types"
"github.com/robustirc/robustirc/util"
"github.com/syndtr/goleveldb/leveldb"
leveldb_errors "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"

pb "github.com/robustirc/robustirc/proto"
)

var (
path = flag.String("path",
"",
"Path to the database directory to dump.")

// XXX(1.0): delete this flag/functionality, it’s useless since
// snapshots replaced compaction.
onlyCompacted = flag.Bool("only_compacted",
false,
"Display only messages which (should have) undergone at least one compaction cycle because of their age.")
Expand All @@ -51,6 +57,21 @@ func dumpLog(key uint64, rlog *raft.Log) {
rmsg := &unfilteredMsg
if rmsg.Type == types.RobustIRCFromClient {
rmsg = util.PrivacyFilterMsg(rmsg)
} else if rmsg.Type == types.RobustState {
state, err := base64.StdEncoding.DecodeString(rmsg.Data)
if err != nil {
log.Printf("Could not decode robuststate: %v", err)
return
}

var snapshot pb.Snapshot
if err := proto.Unmarshal(state, &snapshot); err != nil {
log.Printf("Could not unmarshal proto: %v", err)
return
}
snapshot = util.PrivacyFilterSnapshot(snapshot)
var marshaler proto.TextMarshaler
rmsg.Data = marshaler.Text(&snapshot)
}
msgtime := time.Unix(0, rmsg.Id.Id)
timepassed := lastModified.Sub(msgtime)
Expand Down

0 comments on commit 2d3a623

Please sign in to comment.