Skip to content

Commit

Permalink
Merge pull request #81 from renproject/feat/handle-resyncs
Browse files Browse the repository at this point in the history
Fix handling of Resync messages
  • Loading branch information
loongy committed Apr 15, 2020
2 parents 35655e5 + aac6139 commit f12dec5
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 40 deletions.
43 changes: 26 additions & 17 deletions replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,23 +169,40 @@ func (replica *Replica) HandleMessage(m Message) {
return
}

// Ignore messages from heights that the process has already progressed
// through. Messages at these earlier heights have no effect on consensus,
// and so there is no point wasting time processing them.
// Ignore non-Resync messages from heights that the process has already
// progressed through. Messages at these earlier heights have no effect on
// consensus, and so there is no point wasting time processing them.
if m.Message.Height() < replica.p.CurrentHeight() {
if _, ok := m.Message.(*process.Resync); !ok {
replica.options.Logger.Debugf("ignore message: expected height>=%v, got height=%v", replica.p.CurrentHeight(), m.Message.Height())
return
}
}

// Check that the Message sender is from our Shard (this can be a moderately
// expensive operation, so we cache the result until a new `block.Base` is
// detected)
replica.cache.fillBaseBlock(replica.blockStorage.LatestBaseBlock(replica.shard))
if !replica.cache.signatoryInBaseBlock(m.Message.Signatory()) {
return
}
if err := replica.verifySignedMessage(m); err != nil {
replica.options.Logger.Warnf("bad message: unverified: %v", err)
return
}


// Resync messages can be handled immediately, as long as they are not from
// a future height and their timestamps do not differ greatly from the
// current time.
if m.Message.Type() == process.ResyncMessageType {
if m.Message.Height() > replica.p.CurrentHeight() {
// We cannot respond to resync messages from future heights with
// anything that is useful, so we ignore them.
replica.options.Logger.Debugf("ignore message: resync height=%v compared to current height=%v", m.Message.Height(), replica.p.CurrentHeight())
return
}
// Filter resync messages by timestamp. If they're too old, or too far
// Filter Resync messages by timestamp. If they're too old, or too far
// in the future, then ignore them. The total window of time is 20
// seconds, approximately the latency expected for globally distributed
// message passing.
Expand All @@ -199,21 +216,13 @@ func (replica *Replica) HandleMessage(m Message) {
replica.options.Logger.Debugf("ignore message: resync timestamp=%v compared to now=%v", timestamp, now)
return
}
}

// Check that the Message sender is from our Shard (this can be a moderately
// expensive operation, so we cache the result until a new `block.Base` is
// detected)
replica.cache.fillBaseBlock(replica.blockStorage.LatestBaseBlock(replica.shard))
if !replica.cache.signatoryInBaseBlock(m.Message.Signatory()) {
return
}
if err := replica.verifySignedMessage(m); err != nil {
replica.options.Logger.Warnf("bad message: unverified: %v", err)
replica.p.HandleMessage(m.Message)
return
}

// Make sure that the Process state gets saved.

// Make sure that the Process state gets saved. We do this here
// because Resync cannot cause state changes, so there is no
// reason to save after handling a Resync message.
defer replica.p.Save()

// Messages from the current height can be handled immediately.
Expand Down
50 changes: 27 additions & 23 deletions testutil/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ func (observer *MockObserver) DidReceiveSufficientNilPrevotes(process.Messages,
type MockBroadcaster struct {
min, max int

mu *sync.RWMutex
cons map[id.Signatory]chan []byte
active map[id.Signatory]bool
cons map[id.Signatory]chan []byte
active map[id.Signatory]bool
activeMu *sync.RWMutex

signatories map[id.Signatory]int
}
Expand All @@ -171,21 +171,23 @@ func NewMockBroadcaster(keys []*ecdsa.PrivateKey, min, max int) *MockBroadcaster
min: min,
max: max,

mu: new(sync.RWMutex),
cons: cons,
active: map[id.Signatory]bool{},
activeMu: new(sync.RWMutex),
signatories: signatories,
}
}

func (m *MockBroadcaster) Broadcast(message replica.Message) {
m.mu.RLock()
defer m.mu.RUnlock()
func() {
// If the sender is offline, it cannot send messages to other nodes.
m.activeMu.RLock()
defer m.activeMu.RUnlock()

// If the sender is offline, it cannot send messages to other nodes.
if !m.active[message.Message.Signatory()] {
return
}
if !m.active[message.Message.Signatory()] {
return
}
}()

messageBytes, err := surge.ToBinary(message)
if err != nil {
Expand All @@ -197,13 +199,15 @@ func (m *MockBroadcaster) Broadcast(message replica.Message) {
}

func (m *MockBroadcaster) Cast(to id.Signatory, message replica.Message) {
m.mu.RLock()
defer m.mu.RUnlock()
func() {
// If the sender is offline, it cannot send messages to other nodes.
m.activeMu.RLock()
defer m.activeMu.RUnlock()

// If the sender is offline, it cannot send messages to other nodes.
if !m.active[message.Message.Signatory()] {
return
}
if !m.active[message.Message.Signatory()] {
return
}
}()

messageBytes, err := surge.ToBinary(message)
if err != nil {
Expand All @@ -218,28 +222,28 @@ func (m *MockBroadcaster) sendMessage(receiver id.Signatory, message []byte) {

// If the receiver is offline, it cannot receive any messages from other
// nodes.
m.activeMu.RLock()
defer m.activeMu.RUnlock()

if m.active[receiver] {
go func() { messages <- message }()
}
}

func (m *MockBroadcaster) Messages(sig id.Signatory) chan []byte {
m.mu.RLock()
defer m.mu.RUnlock()

return m.cons[sig]
}

func (m *MockBroadcaster) EnablePeer(sig id.Signatory) {
m.mu.Lock()
defer m.mu.Unlock()
m.activeMu.Lock()
defer m.activeMu.Unlock()

m.active[sig] = true
}

func (m *MockBroadcaster) DisablePeer(sig id.Signatory) {
m.mu.Lock()
defer m.mu.Unlock()
m.activeMu.Lock()
defer m.activeMu.Unlock()

m.active[sig] = false
}

0 comments on commit f12dec5

Please sign in to comment.