diff --git a/integration/tests/access/cohort3/grpc_streaming_blocks_test.go b/integration/tests/access/cohort3/grpc_streaming_blocks_test.go index 96c7655406a..a41896e11db 100644 --- a/integration/tests/access/cohort3/grpc_streaming_blocks_test.go +++ b/integration/tests/access/cohort3/grpc_streaming_blocks_test.go @@ -146,11 +146,6 @@ func (s *GrpcBlocksStreamSuite) TestHappyPath() { currentFinalized := s.BlockState.HighestFinalizedHeight() blockA := s.BlockState.WaitForHighestFinalizedProgress(s.T(), currentFinalized) - // Let the network run for this many blocks - blockCount := uint64(5) - // wait for the requested number of sealed blocks - s.BlockState.WaitForSealedHeight(s.T(), blockA.Header.Height+blockCount) - var startValue interface{} txCount := 10 @@ -187,8 +182,7 @@ func (s *GrpcBlocksStreamSuite) TestHappyPath() { foundANTxCount++ case block := <-observerBlocks: s.T().Logf("ON block received: height: %d", block.Header.Height) - r.Add(s.T(), block.Header.Height, "observer", block) - foundONTxCount++ + s.addObserverBlock(block, r, rpc.name, &foundONTxCount) } if foundANTxCount >= txCount && foundONTxCount >= txCount { @@ -201,6 +195,33 @@ func (s *GrpcBlocksStreamSuite) TestHappyPath() { } } +// addObserverBlock adds a block received from the observer node to the response tracker +// and increments the transaction count for that node. +// +// Parameters: +// - block: The block received from the node. +// - responseTracker: The response tracker to which the block should be added. +// - rpcCallName: The name of the rpc subscription call which is testing. +// - txCount: A pointer to an integer that tracks the number of transactions received from the node. +func (s *GrpcBlocksStreamSuite) addObserverBlock( + block *flow.Block, + responseTracker *ResponseTracker[*flow.Block], + rpcCallName string, + txCount *int, +) { + // the response tracker expects to receive data for the same heights from each node. + // when subscribing to the latest block, the specific start height depends on the node's + // local sealed height, so it may vary. + // check only the responses for ON that are also tracked by AN and compare them + isANResponseExist := len(responseTracker.r[block.Header.Height]) > 0 + if rpcCallName == "SubscribeBlocksFromLatest" && !isANResponseExist { + return + } + + responseTracker.Add(s.T(), block.Header.Height, "observer", block) + *txCount++ +} + func blockResponseHandler(msg *accessproto.SubscribeBlocksResponse) (*flow.Block, error) { return convert.MessageToBlock(msg.GetBlock()) }