Skip to content

Commit

Permalink
Stream Blocks Functionality for RPC (#4771)
Browse files Browse the repository at this point in the history
* stream blocks functionality included
* necessary tests for stream blocks and notifier
* gazelle and tests passing
* gazelle and tests passing
* Merge branch 'master' into stream-block
* Update beacon-chain/core/feed/block/events.go
* Merge refs/heads/master into stream-block
* Merge refs/heads/master into stream-block
* Merge refs/heads/master into stream-block
* Merge refs/heads/master into stream-block
* naming
* build
* Merge refs/heads/master into stream-block
* Merge refs/heads/master into stream-block
* Merge refs/heads/master into stream-block
* fix up tests
* Merge branch 'stream-block' of github.com:prysmaticlabs/prysm into stream-block
* Merge refs/heads/master into stream-block
* shay comment
* Merge refs/heads/master into stream-block
* Merge branch 'stream-block' of github.com:prysmaticlabs/prysm into stream-block
* Merge refs/heads/master into stream-block
  • Loading branch information
rauljordan committed Feb 6, 2020
1 parent 9cf3000 commit a9d144a
Show file tree
Hide file tree
Showing 24 changed files with 231 additions and 7 deletions.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/testing/BUILD.bazel
Expand Up @@ -8,6 +8,7 @@ go_library(
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
Expand Down
23 changes: 23 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Expand Up @@ -9,6 +9,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/epoch/precompute"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
Expand All @@ -34,6 +35,7 @@ type ChainService struct {
Fork *pb.Fork
DB db.Database
stateNotifier statefeed.Notifier
blockNotifier blockfeed.Notifier
opNotifier opfeed.Notifier
}

Expand All @@ -45,6 +47,27 @@ func (ms *ChainService) StateNotifier() statefeed.Notifier {
return ms.stateNotifier
}

// BlockNotifier mocks the same method in the chain service.
func (ms *ChainService) BlockNotifier() blockfeed.Notifier {
if ms.blockNotifier == nil {
ms.blockNotifier = &MockBlockNotifier{}
}
return ms.blockNotifier
}

// MockBlockNotifier mocks the block notifier.
type MockBlockNotifier struct {
feed *event.Feed
}

// BlockFeed returns a block feed.
func (msn *MockBlockNotifier) BlockFeed() *event.Feed {
if msn.feed == nil {
msn.feed = new(event.Feed)
}
return msn.feed
}

// MockStateNotifier mocks the state notifier.
type MockStateNotifier struct {
feed *event.Feed
Expand Down
15 changes: 15 additions & 0 deletions beacon-chain/core/feed/block/BUILD.bazel
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = [
"events.go",
"notifier.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//shared/event:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
],
)
15 changes: 15 additions & 0 deletions beacon-chain/core/feed/block/events.go
@@ -0,0 +1,15 @@
package block

import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)

const (
// ReceivedBlock is sent after a block has been received by the beacon node via p2p or RPC.
ReceivedBlock = iota + 1
)

// ReceivedBlockData is the data sent with ReceivedBlock events.
type ReceivedBlockData struct {
SignedBlock *ethpb.SignedBeaconBlock
}
8 changes: 8 additions & 0 deletions beacon-chain/core/feed/block/notifier.go
@@ -0,0 +1,8 @@
package block

import "github.com/prysmaticlabs/prysm/shared/event"

// Notifier interface defines the methods of the service that provides block updates to consumers.
type Notifier interface {
BlockFeed() *event.Feed
}
11 changes: 10 additions & 1 deletion beacon-chain/node/node.go
Expand Up @@ -64,6 +64,7 @@ type BeaconNode struct {
exitPool *voluntaryexits.Pool
depositCache *depositcache.DepositCache
stateFeed *event.Feed
blockFeed *event.Feed
opFeed *event.Feed
forkChoiceStore forkchoice.ForkChoicer
}
Expand Down Expand Up @@ -104,6 +105,7 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
services: registry,
stop: make(chan struct{}),
stateFeed: new(event.Feed),
blockFeed: new(event.Feed),
opFeed: new(event.Feed),
attestationPool: attestations.NewPool(),
exitPool: voluntaryexits.NewPool(),
Expand Down Expand Up @@ -169,6 +171,11 @@ func (b *BeaconNode) StateFeed() *event.Feed {
return b.stateFeed
}

// BlockFeed implements blockfeed.Notifier.
func (b *BeaconNode) BlockFeed() *event.Feed {
return b.blockFeed
}

// OperationFeed implements opfeed.Notifier.
func (b *BeaconNode) OperationFeed() *event.Feed {
return b.opFeed
Expand Down Expand Up @@ -407,6 +414,7 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
Chain: chainService,
InitialSync: initSync,
StateNotifier: b,
BlockNotifier: b,
AttPool: b.attestationPool,
ExitPool: b.exitPool,
})
Expand All @@ -415,7 +423,6 @@ func (b *BeaconNode) registerSyncService(ctx *cli.Context) error {
}

func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error {

var chainService *blockchain.Service
if err := b.services.FetchService(&chainService); err != nil {
return err
Expand All @@ -426,6 +433,7 @@ func (b *BeaconNode) registerInitialSyncService(ctx *cli.Context) error {
Chain: chainService,
P2P: b.fetchP2P(ctx),
StateNotifier: b,
BlockNotifier: b,
})

return b.services.RegisterService(is)
Expand Down Expand Up @@ -495,6 +503,7 @@ func (b *BeaconNode) registerRPCService(ctx *cli.Context) error {
SyncService: syncService,
DepositFetcher: depositFetcher,
PendingDepositFetcher: b.depositCache,
BlockNotifier: b,
StateNotifier: b,
OperationNotifier: b,
SlasherCert: slasherCert,
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/BUILD.bazel
Expand Up @@ -9,6 +9,7 @@ go_library(
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/cache/depositcache:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/db:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/rpc/beacon/BUILD.bazel
Expand Up @@ -17,6 +17,7 @@ go_library(
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
Expand Down Expand Up @@ -57,6 +58,7 @@ go_test(
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/epoch/precompute:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
Expand Down
33 changes: 32 additions & 1 deletion beacon-chain/rpc/beacon/blocks.go
Expand Up @@ -8,6 +8,7 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
Expand Down Expand Up @@ -174,7 +175,37 @@ func (bs *Server) GetChainHead(ctx context.Context, _ *ptypes.Empty) (*ethpb.Cha

// StreamBlocks to clients every single time a block is received by the beacon node.
func (bs *Server) StreamBlocks(_ *ptypes.Empty, stream ethpb.BeaconChain_StreamBlocksServer) error {
return status.Error(codes.Unimplemented, "Unimplemented")
blocksChannel := make(chan *feed.Event, 1)
blockSub := bs.BlockNotifier.BlockFeed().Subscribe(blocksChannel)
defer blockSub.Unsubscribe()
for {
select {
case event := <-blocksChannel:
if event.Type == blockfeed.ReceivedBlock {
data, ok := event.Data.(*blockfeed.ReceivedBlockData)
if !ok {
return status.Errorf(
codes.FailedPrecondition,
"Could not subscribe to block feed, received bad data: %v",
data,
)
}
if data.SignedBlock == nil {
// One nil block shouldn't stop the stream.
continue
}
if err := stream.Send(data.SignedBlock.Block); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
}
case <-blockSub.Err():
return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine")
case <-bs.Ctx.Done():
return status.Error(codes.Canceled, "Context canceled")
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
}
}
}

// StreamChainHead to clients every single time the head block and state of the chain change.
Expand Down
70 changes: 70 additions & 0 deletions beacon-chain/rpc/beacon/blocks_test.go
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/go-ssz"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
Expand Down Expand Up @@ -589,3 +590,72 @@ func TestServer_StreamChainHead_OnHeadUpdated(t *testing.T) {
}
<-exitRoutine
}

func TestServer_StreamBlocks_ContextCanceled(t *testing.T) {
db := dbTest.SetupDB(t)
defer dbTest.TeardownDB(t, db)
ctx := context.Background()

chainService := &mock.ChainService{}
ctx, cancel := context.WithCancel(ctx)
server := &Server{
Ctx: ctx,
BlockNotifier: chainService.BlockNotifier(),
BeaconDB: db,
}

exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mockRPC.NewMockBeaconChain_StreamBlocksServer(ctrl)
mockStream.EXPECT().Context().Return(ctx)
go func(tt *testing.T) {
if err := server.StreamBlocks(&ptypes.Empty{}, mockStream); !strings.Contains(err.Error(), "Context canceled") {
tt.Errorf("Could not call RPC method: %v", err)
}
<-exitRoutine
}(t)
cancel()
exitRoutine <- true
}

func TestServer_StreamBlocks_OnHeadUpdated(t *testing.T) {
db := dbTest.SetupDB(t)
defer dbTest.TeardownDB(t, db)

b := &ethpb.SignedBeaconBlock{
Block: &ethpb.BeaconBlock{
Slot: 1,
},
}

chainService := &mock.ChainService{}
ctx := context.Background()
server := &Server{
Ctx: ctx,
BlockNotifier: chainService.BlockNotifier(),
}
exitRoutine := make(chan bool)
ctrl := gomock.NewController(t)
defer ctrl.Finish()
mockStream := mockRPC.NewMockBeaconChain_StreamBlocksServer(ctrl)
mockStream.EXPECT().Send(b.Block).Do(func(arg0 interface{}) {
exitRoutine <- true
})
mockStream.EXPECT().Context().Return(ctx).AnyTimes()

go func(tt *testing.T) {
if err := server.StreamBlocks(&ptypes.Empty{}, mockStream); err != nil {
tt.Errorf("Could not call RPC method: %v", err)
}
}(t)

// Send in a loop to ensure it is delivered (busy wait for the service to subscribe to the state feed).
for sent := 0; sent == 0; {
sent = server.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: b},
})
}
<-exitRoutine
}
2 changes: 2 additions & 0 deletions beacon-chain/rpc/beacon/server.go
Expand Up @@ -6,6 +6,7 @@ import (

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
Expand All @@ -25,6 +26,7 @@ type Server struct {
FinalizationFetcher blockchain.FinalizationFetcher
ParticipationFetcher blockchain.ParticipationFetcher
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier
Pool attestations.Pool
IncomingAttestation chan *ethpb.Attestation
CanonicalStateChan chan *pbp2p.BeaconState
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/rpc/service.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
opfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
Expand Down Expand Up @@ -81,6 +82,7 @@ type Service struct {
depositFetcher depositcache.DepositFetcher
pendingDepositFetcher depositcache.PendingDepositsFetcher
stateNotifier statefeed.Notifier
blockNotifier blockfeed.Notifier
operationNotifier opfeed.Notifier
slasherConn *grpc.ClientConn
slasherProvider string
Expand Down Expand Up @@ -116,6 +118,7 @@ type Config struct {
SlasherProvider string
SlasherCert string
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier
OperationNotifier opfeed.Notifier
}

Expand Down Expand Up @@ -151,6 +154,7 @@ func NewService(ctx context.Context, cfg *Config) *Service {
canonicalStateChan: make(chan *pbp2p.BeaconState, params.BeaconConfig().DefaultBufferSize),
incomingAttestation: make(chan *ethpb.Attestation, params.BeaconConfig().DefaultBufferSize),
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
operationNotifier: cfg.OperationNotifier,
slasherProvider: cfg.SlasherProvider,
slasherCert: cfg.SlasherCert,
Expand Down Expand Up @@ -217,6 +221,7 @@ func (s *Service) Start() {
Eth1InfoFetcher: s.powChainService,
SyncChecker: s.syncService,
StateNotifier: s.stateNotifier,
BlockNotifier: s.blockNotifier,
OperationNotifier: s.operationNotifier,
P2P: s.p2p,
BlockReceiver: s.blockReceiver,
Expand All @@ -242,6 +247,7 @@ func (s *Service) Start() {
ChainStartFetcher: s.chainStartFetcher,
CanonicalStateChan: s.canonicalStateChan,
StateNotifier: s.stateNotifier,
BlockNotifier: s.blockNotifier,
SlotTicker: ticker,
}
aggregatorServer := &aggregator.Server{
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/validator/BUILD.bazel
Expand Up @@ -18,6 +18,7 @@ go_library(
"//beacon-chain/cache/depositcache:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/rpc/validator/proposer.go
Expand Up @@ -10,6 +10,8 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/feed"
blockfeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/block"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state/interop"
Expand Down Expand Up @@ -105,6 +107,10 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.SignedBeaconBlock
}
log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf(
"Block proposal received via RPC")
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: blockfeed.ReceivedBlockData{SignedBlock: blk},
})
if err := vs.BlockReceiver.ReceiveBlock(ctx, blk); err != nil {
return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err)
}
Expand Down

0 comments on commit a9d144a

Please sign in to comment.