/
replay.go
executable file
·137 lines (122 loc) · 4.36 KB
/
replay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package consensus
import (
"fmt"
"hash/crc32"
"io"
"reflect"
"time"
"github.com/lianxiangcloud/linkchain/types"
)
var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Functionality to replay messages on recovery from a crash during consensus.
// Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running.
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
// Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
// for logging
switch m := msg.Msg.(type) {
case types.EventDataRoundState:
cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
// these are playback checks
ticker := time.After(time.Second * 2)
if newStepCh != nil {
select {
case mi := <-newStepCh:
m2 := mi.(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
}
case <-ticker:
return fmt.Errorf("Failed to read off newStepCh")
}
}
case msgInfo:
peerID := m.PeerID
if peerID == "" {
peerID = "local"
}
switch msg := m.Msg.(type) {
case *ProposalMessage:
p := msg.Proposal
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerID)
case *BlockPartMessage:
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerID)
case *VoteMessage:
v := msg.Vote
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
"blockID", v.BlockID, "peer", peerID)
}
cs.handleMsg(m)
case timeoutInfo:
cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
cs.handleTimeout(m, cs.RoundState)
default:
return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
}
return nil
}
// Replay only those messages since the last block. `timeoutRoutine` should
// run concurrently to read off tickChan.
func (cs *ConsensusState) catchupReplay(csHeight uint64) error {
// Set replayMode to true so we don't log signing errors.
cs.replayMode = true
defer func() { cs.replayMode = false }()
// Ensure that #ENDHEIGHT for this height doesn't exist.
// NOTE: This is just a sanity check. As far as we know things work fine
// without it, and Handshake could reuse ConsensusState if it weren't for
// this check (since we can crash after writing #ENDHEIGHT).
//
// Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err != nil {
return err
}
if gr != nil {
if err := gr.Close(); err != nil {
return err
}
}
if found {
return fmt.Errorf("WAL should not contain #ENDHEIGHT %d", csHeight)
}
// Search for last height marker.
//
// Ignore data corruption errors in previous heights because we only care about last height
gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err == io.EOF {
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
} else if err != nil {
return err
}
if !found {
return fmt.Errorf("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d", csHeight, csHeight-1)
}
defer gr.Close() // nolint: errcheck
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
var msg *TimedWALMessage
dec := WALDecoder{gr}
for {
msg, err = dec.Decode()
if err == io.EOF {
break
} else if IsDataCorruptionError(err) {
cs.Logger.Debug("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
panic(fmt.Sprintf("data has been corrupted (%v) in last height %d of consensus WAL", err, csHeight))
} else if err != nil {
return err
}
// NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step
if err := cs.readReplayMessage(msg, nil); err != nil {
return err
}
}
cs.Logger.Info("Replay: Done")
return nil
}