Skip to content

Commit

Permalink
Merge pull request #6125 from The-K-R-O-K/UlyanaAndrukhiv/6083-flaky-…
Browse files Browse the repository at this point in the history
…blocks-streaming-integration-test

[CI] Fix flaky blocks streaming integration test
  • Loading branch information
Guitarheroua committed Jun 21, 2024
2 parents 125b54e + b39acf6 commit 1788144
Showing 1 changed file with 28 additions and 7 deletions.
35 changes: 28 additions & 7 deletions integration/tests/access/cohort3/grpc_streaming_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@ func (s *GrpcBlocksStreamSuite) TestHappyPath() {
currentFinalized := s.BlockState.HighestFinalizedHeight()
blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized)

// Let the network run for this many blocks
blockCount := uint64(5)
// wait for the requested number of sealed blocks
s.BlockState.WaitForSealedHeight(s.T(), blockA.Header.Height+blockCount)

var startValue interface{}
txCount := 10

Expand Down Expand Up @@ -187,8 +182,7 @@ func (s *GrpcBlocksStreamSuite) TestHappyPath() {
foundANTxCount++
case block := <-observerBlocks:
s.T().Logf("ON block received: height: %d", block.Header.Height)
r.Add(s.T(), block.Header.Height, "observer", block)
foundONTxCount++
s.addObserverBlock(block, r, rpc.name, &foundONTxCount)
}

if foundANTxCount >= txCount && foundONTxCount >= txCount {
Expand All @@ -201,6 +195,33 @@ func (s *GrpcBlocksStreamSuite) TestHappyPath() {
}
}

// addObserverBlock adds a block received from the observer node to the response tracker
// and increments the transaction count for that node.
//
// Parameters:
// - block: The block received from the node.
// - responseTracker: The response tracker to which the block should be added.
// - rpcCallName: The name of the rpc subscription call which is testing.
// - txCount: A pointer to an integer that tracks the number of transactions received from the node.
func (s *GrpcBlocksStreamSuite) addObserverBlock(
block *flow.Block,
responseTracker *ResponseTracker[*flow.Block],
rpcCallName string,
txCount *int,
) {
// the response tracker expects to receive data for the same heights from each node.
// when subscribing to the latest block, the specific start height depends on the node's
// local sealed height, so it may vary.
// check only the responses for ON that are also tracked by AN and compare them
isANResponseExist := len(responseTracker.r[block.Header.Height]) > 0
if rpcCallName == "SubscribeBlocksFromLatest" && !isANResponseExist {
return
}

responseTracker.Add(s.T(), block.Header.Height, "observer", block)
*txCount++
}

func blockResponseHandler(msg *accessproto.SubscribeBlocksResponse) (*flow.Block, error) {
return convert.MessageToBlock(msg.GetBlock())
}
Expand Down

0 comments on commit 1788144

Please sign in to comment.