Skip to content

Commit

Permalink
Detection Service Creation (#4867)
Browse files Browse the repository at this point in the history
* visibility added

* register in node

* fixed up imports

* include detection listeners for feed

* subscribe to blocks and todos

* tests passing

* todos

* pkg comment
  • Loading branch information
rauljordan committed Feb 14, 2020
1 parent 38fed73 commit 868c8f5
Show file tree
Hide file tree
Showing 15 changed files with 288 additions and 37 deletions.
2 changes: 2 additions & 0 deletions slasher/beaconclient/receivers_test.go
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/prysmaticlabs/prysm/shared/mock"
)

var _ = Notifier(&Service{})

func TestService_ReceiveBlocks(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
11 changes: 9 additions & 2 deletions slasher/beaconclient/service.go
Expand Up @@ -22,6 +22,13 @@ import (

var log = logrus.WithField("prefix", "beaconclient")

// Notifier defines a struct which exposes event feeds
// for beacon blocks and attestations received from a beacon node.
type Notifier interface {
BlockFeed() *event.Feed
AttestationFeed() *event.Feed
}

// Service struct for the beaconclient service of the slasher.
type Service struct {
ctx context.Context
Expand All @@ -46,8 +53,8 @@ type Config struct {
AttesterSlashingsFeed *event.Feed
}

// NewBeaconClient creates a new instance of a beacon client service.
func NewBeaconClient(ctx context.Context, cfg *Config) *Service {
// NewBeaconClientService instantiation.
func NewBeaconClientService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
cert: cfg.BeaconCert,
Expand Down
21 changes: 10 additions & 11 deletions slasher/beaconclient/submit.go
Expand Up @@ -4,7 +4,6 @@ import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

Expand All @@ -15,19 +14,19 @@ import (
func (bs *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch chan *ethpb.ProposerSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitProposerSlashing")
defer span.End()
stateSub := bs.proposerSlashingsFeed.Subscribe(ch)
defer stateSub.Unsubscribe()
sub := bs.proposerSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if _, err := bs.client.SubmitProposerSlashing(ctx, slashing); err != nil {
log.Error(err)
}
case <-stateSub.Err():
logrus.Error("Subscriber closed, exiting goroutine")
case <-sub.Err():
log.Error("Subscriber closed, exiting goroutine")
return
case <-ctx.Done():
logrus.Error("Context canceled")
log.Error("Context canceled")
return
}
}
Expand All @@ -40,19 +39,19 @@ func (bs *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch ch
func (bs *Service) subscribeDetectedAttesterSlashings(ctx context.Context, ch chan *ethpb.AttesterSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitAttesterSlashing")
defer span.End()
stateSub := bs.attesterSlashingsFeed.Subscribe(ch)
defer stateSub.Unsubscribe()
sub := bs.attesterSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if _, err := bs.client.SubmitAttesterSlashing(ctx, slashing); err != nil {
log.Error(err)
}
case <-stateSub.Err():
logrus.Error("Subscriber closed, exiting goroutine")
case <-sub.Err():
log.Error("Subscriber closed, exiting goroutine")
return
case <-ctx.Done():
logrus.Error("Context canceled")
log.Error("Context canceled")
return
}
}
Expand Down
29 changes: 13 additions & 16 deletions slasher/detection/BUILD.bazel
Expand Up @@ -2,32 +2,29 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["attestations.go"],
srcs = [
"listeners.go",
"service.go",
],
importpath = "github.com/prysmaticlabs/prysm/slasher/detection",
visibility = ["//visibility:public"],
visibility = ["//slasher:__subpackages__"],
deps = [
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"//shared/event:go_default_library",
"//slasher/beaconclient:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

go_test(
name = "go_default_test",
testonly = True,
srcs = [
"attestation_test.go",
"attestations_bench_test.go",
],
srcs = ["listeners_test.go"],
embed = [":go_default_library"],
deps = [
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"//slasher/db/testing:go_default_library",
"//slasher/flags:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_urfave_cli//:go_default_library",
"//shared/event:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
32 changes: 32 additions & 0 deletions slasher/detection/attestations/BUILD.bazel
@@ -0,0 +1,32 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["attestations.go"],
importpath = "github.com/prysmaticlabs/prysm/slasher/detection/attestations",
visibility = ["//slasher:__subpackages__"],
deps = [
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = [
"attestation_test.go",
"attestations_bench_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/slashing:go_default_library",
"//shared/params:go_default_library",
"//slasher/db/testing:go_default_library",
"//slasher/flags:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_urfave_cli//:go_default_library",
],
)
@@ -1,4 +1,4 @@
package detection
package attestations

import (
"context"
Expand Down
@@ -1,4 +1,4 @@
package detection
package attestations

import (
"context"
Expand Down
@@ -1,4 +1,4 @@
package detection
package attestations

import (
"context"
Expand Down
63 changes: 63 additions & 0 deletions slasher/detection/listeners.go
@@ -0,0 +1,63 @@
/*
Package detection defines a service that reacts to incoming blocks/attestations
by running slashing detection for double proposals, double votes, and surround votes
according to the eth2 specification. As soon as slashing objects are found, they are
sent over a feed for the beaconclient service to submit to a beacon node via gRPC.
*/
package detection

import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"go.opencensus.io/trace"
)

// detectIncomingBlocks subscribes to an event feed for
// block objects from a notifier interface. Upon receiving
// a signed beacon block from the feed, we run proposer slashing
// detection on the block.
func (ds *Service) detectIncomingBlocks(ctx context.Context, ch chan *ethpb.SignedBeaconBlock) {
ctx, span := trace.StartSpan(ctx, "detection.detectIncomingBlocks")
defer span.End()
sub := ds.notifier.BlockFeed().Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case <-ch:
log.Infof("Running detection on block...")
// TODO(#4836): Run detection function for proposer slashings.
case <-sub.Err():
log.Error("Subscriber closed, exiting goroutine")
return
case <-ctx.Done():
log.Error("Context canceled")
return
}
}
}

// detectIncomingAttestations subscribes to an event feed for
// attestation objects from a notifier interface. Upon receiving
// an attestation from the feed, we run surround vote and double vote
// detection on the attestation.
func (ds *Service) detectIncomingAttestations(ctx context.Context, ch chan *ethpb.Attestation) {
ctx, span := trace.StartSpan(ctx, "detection.detectIncomingAttestations")
defer span.End()
sub := ds.notifier.AttestationFeed().Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case <-ch:
log.Infof("Running detection on attestation...")
// TODO(#4836): Run detection function for attester double voting.
// TODO(#4836): Run detection function for attester surround voting.
case <-sub.Err():
log.Error("Subscriber closed, exiting goroutine")
return
case <-ctx.Done():
log.Error("Context canceled")
return
}
}
}
68 changes: 68 additions & 0 deletions slasher/detection/listeners_test.go
@@ -0,0 +1,68 @@
package detection

import (
"context"
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)

type mockNotifier struct{}

func (m *mockNotifier) BlockFeed() *event.Feed {
return new(event.Feed)
}

func (m *mockNotifier) AttestationFeed() *event.Feed {
return new(event.Feed)
}

func TestService_DetectIncomingBlocks(t *testing.T) {
hook := logTest.NewGlobal()
ds := Service{
notifier: &mockNotifier{},
}
blk := &ethpb.SignedBeaconBlock{
Block: &ethpb.BeaconBlock{Slot: 1},
Signature: make([]byte, 96),
}
exitRoutine := make(chan bool)
blocksChan := make(chan *ethpb.SignedBeaconBlock)
ctx, cancel := context.WithCancel(context.Background())
go func(tt *testing.T) {
ds.detectIncomingBlocks(ctx, blocksChan)
<-exitRoutine
}(t)
blocksChan <- blk
cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Running detection on block")
testutil.AssertLogsContain(t, hook, "Context canceled")
}

func TestService_DetectIncomingAttestations(t *testing.T) {
hook := logTest.NewGlobal()
ds := Service{
notifier: &mockNotifier{},
}
att := &ethpb.Attestation{
Data: &ethpb.AttestationData{
Slot: 1,
},
}
exitRoutine := make(chan bool)
attsChan := make(chan *ethpb.Attestation)
ctx, cancel := context.WithCancel(context.Background())
go func(tt *testing.T) {
ds.detectIncomingAttestations(ctx, attsChan)
<-exitRoutine
}(t)
attsChan <- att
cancel()
exitRoutine <- true
testutil.AssertLogsContain(t, hook, "Running detection on attestation")
testutil.AssertLogsContain(t, hook, "Context canceled")
}
63 changes: 63 additions & 0 deletions slasher/detection/service.go
@@ -0,0 +1,63 @@
package detection

import (
"context"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/slasher/beaconclient"
"github.com/sirupsen/logrus"
)

var log = logrus.WithField("prefix", "detection")

// Service struct for the detection service of the slasher.
type Service struct {
ctx context.Context
cancel context.CancelFunc
blocksChan chan *ethpb.SignedBeaconBlock
attsChan chan *ethpb.Attestation
notifier beaconclient.Notifier
attesterSlashingsFeed *event.Feed
proposerSlashingsFeed *event.Feed
}

// Config options for the detection service.
type Config struct {
Notifier beaconclient.Notifier
AttesterSlashingsFeed *event.Feed
ProposerSlashingsFeed *event.Feed
}

// NewDetectionService instantiation.
func NewDetectionService(ctx context.Context, cfg *Config) *Service {
ctx, cancel := context.WithCancel(ctx)
return &Service{
ctx: ctx,
cancel: cancel,
notifier: cfg.Notifier,
blocksChan: make(chan *ethpb.SignedBeaconBlock, 1),
attsChan: make(chan *ethpb.Attestation, 1),
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
}
}

// Stop the notifier service.
func (ds *Service) Stop() error {
ds.cancel()
log.Info("Stopping service")
return nil
}

// Status returns an error if there exists an error in
// the notifier service.
func (ds *Service) Status() error {
return nil
}

// Start the detection service runtime.
func (ds *Service) Start() {
go ds.detectIncomingBlocks(ds.ctx, ds.blocksChan)
go ds.detectIncomingAttestations(ds.ctx, ds.attsChan)
}
1 change: 1 addition & 0 deletions slasher/node/BUILD.bazel
Expand Up @@ -14,6 +14,7 @@ go_library(
"//slasher/beaconclient:go_default_library",
"//slasher/db:go_default_library",
"//slasher/db/kv:go_default_library",
"//slasher/detection:go_default_library",
"//slasher/flags:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_urfave_cli//:go_default_library",
Expand Down

0 comments on commit 868c8f5

Please sign in to comment.