Skip to content

Commit

Permalink
Merge #1377
Browse files Browse the repository at this point in the history
1377: [consensus, synchronization] Fix forks preventing reception of blocks at same height r=huitseeker a=huitseeker

I highly suggest looking at individual commits (code AND messages) to review.

When receiving a block (`HandleBlock`), we test its status in existing tables to see if it was already received. 

If a `Received` status is in the height table, we reject the block, without checking if the block we are receiving is different than the one we've already received at the same height.

Moreover, this logic applies even if all blocks involved are requested and received by blockID only.

This:
- makes sure we report a request status upon block reception (`getRequestStatus`) favoring requests by ID,
- adds basic property testing, which exercises the bug from on a forest of blocks which includes forks,
- does not change anything to the pattern or volume of requested heights.

Fixes [#5884](https://github.com/dapperlabs/flow-go/issues/5884)

Co-authored-by: François Garillot <francois.garillot@dapperlabs.com>
  • Loading branch information
bors[bot] and huitseeker committed Sep 29, 2021
2 parents a9f4b27 + 0633b23 commit 7925ced
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 3 deletions.
8 changes: 5 additions & 3 deletions module/synchronization/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,14 @@ func (c *Core) getRequestStatus(height uint64, blockID flow.Identifier) *Status
heightStatus := c.heights[height]
idStatus := c.blockIDs[blockID]

if heightStatus.WasQueued() {
return heightStatus
}
if idStatus.WasQueued() {
return idStatus
}
// Only return the height status if there is no matching status for the ID
if heightStatus.WasQueued() {
return heightStatus
}

return nil
}

Expand Down
120 changes: 120 additions & 0 deletions module/synchronization/core_rapid_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package synchronization

import (
"io/ioutil"
"testing"

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"pgregory.net/rapid"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)

// TODO: remove this restriction and write a proper parametrizable generator (à la SliceOfN) once we have
// a FlatMap combinator in rapid
const NUM_BLOCKS int = 100

// This returns a forest of blocks, some of which are in a parent relationship
// It should include forks
func populatedBlockStore(t *rapid.T) []flow.Header {
store := []flow.Header{unittest.BlockHeaderFixture()}
for i := 1; i < NUM_BLOCKS; i++ {
b := rapid.OneOf(rapid.Just(unittest.BlockHeaderFixture()), rapid.SampledFrom(store)).Draw(t, "parent").(flow.Header)
store = append(store, unittest.BlockHeaderWithParentFixture(&b))
}
return store
}

type rapidSync struct {
store []flow.Header
core *Core
idRequests map[flow.Identifier]int // pushdown automaton to track ID requests
heightRequests map[uint64]int // pushdown automaton to track height requests
}

// Init is an action for initializing a rapidSync instance.
func (r *rapidSync) Init(t *rapid.T) {
var err error

r.core, err = New(zerolog.New(ioutil.Discard), DefaultConfig())
require.NoError(t, err)

r.store = populatedBlockStore(t)
r.idRequests = make(map[flow.Identifier]int)
r.heightRequests = make(map[uint64]int)
}

// RequestByID is an action that requests a block by its ID.
func (r *rapidSync) RequestByID(t *rapid.T) {
b := rapid.SampledFrom(r.store).Draw(t, "id_request").(flow.Header)
r.core.RequestBlock(b.ID())
// Re-queueing by ID should always succeed
r.idRequests[b.ID()] = 1
// Re-qeueuing by ID "forgets" a past height request
r.heightRequests[b.Height] = 0
}

// RequestByHeight is an action that requests a specific height
func (r *rapidSync) RequestByHeight(t *rapid.T) {
b := rapid.SampledFrom(r.store).Draw(t, "id_request").(flow.Header)
r.core.RequestHeight(b.Height)
// Re-queueing by height should always succeed
r.heightRequests[b.Height] = 1
}

// HandleByID is an action that provides a block header to the sync engine
func (r *rapidSync) HandleByID(t *rapid.T) {
b := rapid.SampledFrom(r.store).Draw(t, "id_handling").(flow.Header)
success := r.core.HandleBlock(&b)
assert.True(t, success || r.idRequests[b.ID()] == 0)

// we decrease the pending requests iff we have already requested this block
// and we have not received it since
if r.idRequests[b.ID()] == 1 {
r.idRequests[b.ID()] = 0
}
// we eagerly remove height requests
r.heightRequests[b.Height] = 0
}

// Check runs after every action and verifies that all required invariants hold.
func (r *rapidSync) Check(t *rapid.T) {
for k, v := range r.idRequests {
if v == 1 {
s, ok := r.core.blockIDs[k]
require.True(t, ok)
assert.True(t, s.WasQueued(), "ID %v was expected to be Queued and is %v", k, s.StatusString())
} else if v == 0 {
s, ok := r.core.blockIDs[k]
// if a block is known with 0 pendings, it's because it was received
if ok {
assert.True(t, s.WasReceived(), "ID %v was expected to be Received and is %v", k, s.StatusString())
}
} else {
t.Fatalf("incorrect management of idRequests in the tests")
}
}
heights, blockIDs := r.core.getRequestableItems()
// the queueing logic queues intervals, while our r.heightRequests only queues specific requests
var activeHeights []uint64
for k, v := range r.heightRequests {
if v == 1 {
activeHeights = append(activeHeights, k)
}
}
assert.Subset(t, heights, activeHeights, "sync engine's height request tracking lost heights")

for _, bID := range blockIDs {
v, ok := r.idRequests[bID]
require.True(t, ok)
assert.Equal(t, 1, v, "blockID %v is supposed to be pending but is not", bID)
}
}

func TestRapidSync(t *testing.T) {
rapid.Check(t, rapid.Run(&rapidSync{}))
}
12 changes: 12 additions & 0 deletions module/synchronization/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ func (s *Status) WasReceived() bool {
return !s.Received.IsZero()
}

func (s *Status) StatusString() string {
if s.WasReceived() {
return "Received"
} else if s.WasRequested() {
return "Requested"
} else if s.WasQueued() {
return "Queued"
} else {
return "Unknown"
}
}

func NewQueuedStatus() *Status {
return &Status{
Queued: time.Now(),
Expand Down

0 comments on commit 7925ced

Please sign in to comment.