Skip to content

Commit

Permalink
Send Slashing Objects to Beacon Node via RPC (#4866)
Browse files Browse the repository at this point in the history
* submit slashing objects

* tests complete
  • Loading branch information
rauljordan committed Feb 14, 2020
1 parent 4a44632 commit 38fed73
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 27 deletions.
8 changes: 7 additions & 1 deletion slasher/beaconclient/BUILD.bazel
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"receivers.go",
"service.go",
"submit.go",
],
importpath = "github.com/prysmaticlabs/prysm/slasher/beaconclient",
visibility = ["//slasher:__subpackages__"],
Expand All @@ -25,13 +26,18 @@ go_library(

go_test(
name = "go_default_test",
srcs = ["receivers_test.go"],
srcs = [
"receivers_test.go",
"submit_test.go",
],
embed = [":go_default_library"],
deps = [
"//shared/event:go_default_library",
"//shared/mock:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
44 changes: 28 additions & 16 deletions slasher/beaconclient/service.go
Expand Up @@ -24,32 +24,42 @@ var log = logrus.WithField("prefix", "beaconclient")

// Service struct for the beaconclient service of the slasher.
type Service struct {
ctx context.Context
cancel context.CancelFunc
cert string
conn *grpc.ClientConn
provider string
client ethpb.BeaconChainClient
blockFeed *event.Feed
attestationFeed *event.Feed
ctx context.Context
cancel context.CancelFunc
cert string
conn *grpc.ClientConn
provider string
client ethpb.BeaconChainClient
blockFeed *event.Feed
attestationFeed *event.Feed
proposerSlashingsChan chan *ethpb.ProposerSlashing
attesterSlashingsChan chan *ethpb.AttesterSlashing
attesterSlashingsFeed *event.Feed
proposerSlashingsFeed *event.Feed
}

// Config options for the beaconclient service.
type Config struct {
BeaconProvider string
BeaconCert string
BeaconProvider string
BeaconCert string
ProposerSlashingsFeed *event.Feed
AttesterSlashingsFeed *event.Feed
}

// NewBeaconClient creates a new instance of a beacon client service.
func NewBeaconClient(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
cert: cfg.BeaconCert,
ctx: ctx,
cancel: cancel,
provider: cfg.BeaconProvider,
blockFeed: new(event.Feed),
attestationFeed: new(event.Feed),
cert: cfg.BeaconCert,
ctx: ctx,
cancel: cancel,
provider: cfg.BeaconProvider,
blockFeed: new(event.Feed),
attestationFeed: new(event.Feed),
proposerSlashingsChan: make(chan *ethpb.ProposerSlashing, 1),
attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1),
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
}
}

Expand Down Expand Up @@ -124,4 +134,6 @@ func (bs *Service) Start() {

go bs.receiveBlocks(bs.ctx)
go bs.receiveAttestations(bs.ctx)
go bs.subscribeDetectedProposerSlashings(bs.ctx, bs.proposerSlashingsChan)
go bs.subscribeDetectedAttesterSlashings(bs.ctx, bs.attesterSlashingsChan)
}
59 changes: 59 additions & 0 deletions slasher/beaconclient/submit.go
@@ -0,0 +1,59 @@
package beaconclient

import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

// subscribeDetectedProposerSlashings subscribes to an event feed for
// slashing objects from the slasher runtime. Upon receiving
// a proposer slashing from the feed, we submit the object to the
// connected beacon node via a client RPC.
func (bs *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch chan *ethpb.ProposerSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitProposerSlashing")
defer span.End()
stateSub := bs.proposerSlashingsFeed.Subscribe(ch)
defer stateSub.Unsubscribe()
for {
select {
case slashing := <-ch:
if _, err := bs.client.SubmitProposerSlashing(ctx, slashing); err != nil {
log.Error(err)
}
case <-stateSub.Err():
logrus.Error("Subscriber closed, exiting goroutine")
return
case <-ctx.Done():
logrus.Error("Context canceled")
return
}
}
}

// subscribeDetectedAttesterSlashings subscribes to an event feed for
// slashing objects from the slasher runtime. Upon receiving an
// attester slashing from the feed, we submit the object to the
// connected beacon node via a client RPC.
func (bs *Service) subscribeDetectedAttesterSlashings(ctx context.Context, ch chan *ethpb.AttesterSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitAttesterSlashing")
defer span.End()
stateSub := bs.attesterSlashingsFeed.Subscribe(ch)
defer stateSub.Unsubscribe()
for {
select {
case slashing := <-ch:
if _, err := bs.client.SubmitAttesterSlashing(ctx, slashing); err != nil {
log.Error(err)
}
case <-stateSub.Err():
logrus.Error("Subscriber closed, exiting goroutine")
return
case <-ctx.Done():
logrus.Error("Context canceled")
return
}
}
}
90 changes: 90 additions & 0 deletions slasher/beaconclient/submit_test.go
@@ -0,0 +1,90 @@
package beaconclient

import (
"context"
"testing"

"github.com/golang/mock/gomock"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/mock"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)

func TestService_SubscribeDetectedProposerSlashings(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
client: client,
proposerSlashingsFeed: new(event.Feed),
}

slashing := &ethpb.ProposerSlashing{
ProposerIndex: 5,
Header_1: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: 5,
},
Signature: make([]byte, 96),
},
Header_2: &ethpb.SignedBeaconBlockHeader{
Header: &ethpb.BeaconBlockHeader{
Slot: 5,
},
Signature: make([]byte, 96),
},
}

exitRoutine := make(chan bool)
slashingsChan := make(chan *ethpb.ProposerSlashing)
ctx, cancel := context.WithCancel(context.Background())
client.EXPECT().SubmitProposerSlashing(gomock.Any(), slashing)
go func(tt *testing.T) {
bs.subscribeDetectedProposerSlashings(ctx, slashingsChan)
<-exitRoutine
}(t)
slashingsChan <- slashing
cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Context canceled")
}

func TestService_SubscribeDetectedAttesterSlashings(t *testing.T) {
hook := logTest.NewGlobal()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
client: client,
attesterSlashingsFeed: new(event.Feed),
}

slashing := &ethpb.AttesterSlashing{
Attestation_1: &ethpb.IndexedAttestation{
AttestingIndices: []uint64{1, 2, 3},
Data: nil,
},
Attestation_2: &ethpb.IndexedAttestation{
AttestingIndices: []uint64{3, 4, 5},
Data: nil,
},
}

exitRoutine := make(chan bool)
slashingsChan := make(chan *ethpb.AttesterSlashing)
ctx, cancel := context.WithCancel(context.Background())
client.EXPECT().SubmitAttesterSlashing(gomock.Any(), slashing)
go func(tt *testing.T) {
bs.subscribeDetectedAttesterSlashings(ctx, slashingsChan)
<-exitRoutine
}(t)
slashingsChan <- slashing
cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Context canceled")
}
1 change: 1 addition & 0 deletions slasher/node/BUILD.bazel
Expand Up @@ -9,6 +9,7 @@ go_library(
"//shared:go_default_library",
"//shared/cmd:go_default_library",
"//shared/debug:go_default_library",
"//shared/event:go_default_library",
"//shared/tracing:go_default_library",
"//slasher/beaconclient:go_default_library",
"//slasher/db:go_default_library",
Expand Down
27 changes: 17 additions & 10 deletions slasher/node/node.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/debug"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/tracing"
"github.com/prysmaticlabs/prysm/slasher/beaconclient"
"github.com/prysmaticlabs/prysm/slasher/db"
Expand All @@ -28,11 +29,13 @@ const slasherDBName = "slasherdata"
// for eth2. It handles the lifecycle of the entire system and registers
// services to a service registry.
type SlasherNode struct {
ctx *cli.Context
lock sync.RWMutex
services *shared.ServiceRegistry
stop chan struct{} // Channel to wait for termination notifications.
db db.Database
ctx *cli.Context
lock sync.RWMutex
services *shared.ServiceRegistry
proposerSlashingsFeed *event.Feed
attesterSlashingsFeed *event.Feed
stop chan struct{} // Channel to wait for termination notifications.
db db.Database
}

// NewSlasherNode creates a new node instance, sets up configuration options,
Expand All @@ -50,9 +53,11 @@ func NewSlasherNode(ctx *cli.Context) (*SlasherNode, error) {
registry := shared.NewServiceRegistry()

slasher := &SlasherNode{
ctx: ctx,
services: registry,
stop: make(chan struct{}),
ctx: ctx,
proposerSlashingsFeed: new(event.Feed),
attesterSlashingsFeed: new(event.Feed),
services: registry,
stop: make(chan struct{}),
}

if err := slasher.startDB(ctx); err != nil {
Expand Down Expand Up @@ -149,8 +154,10 @@ func (s *SlasherNode) registerBeaconClientService(ctx *cli.Context) error {
beaconProvider = flags.BeaconRPCProviderFlag.Value
}
bs := beaconclient.NewBeaconClient(context.Background(), &beaconclient.Config{
BeaconCert: beaconCert,
BeaconProvider: beaconProvider,
BeaconCert: beaconCert,
BeaconProvider: beaconProvider,
AttesterSlashingsFeed: s.attesterSlashingsFeed,
ProposerSlashingsFeed: s.proposerSlashingsFeed,
})
return s.services.RegisterService(bs)
}

0 comments on commit 38fed73

Please sign in to comment.