Skip to content

Commit

Permalink
Batch saving of attestations from stream for slasher (#5041)
Browse files Browse the repository at this point in the history
* Batch saving of attestations from stream for slasher

* Progress on test

* Fixes

* Fix test

* Rename

* Modify logs and timing

* Change

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
  • Loading branch information
0xKiwi and rauljordan committed Mar 9, 2020
1 parent 4c1e2ba commit 8334aac
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 11 deletions.
51 changes: 41 additions & 10 deletions slasher/beaconclient/receivers.go
Expand Up @@ -3,8 +3,10 @@ package beaconclient
import (
"context"
"io"
"time"

ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
Expand Down Expand Up @@ -51,6 +53,8 @@ func (bs *Service) receiveAttestations(ctx context.Context) {
log.WithError(err).Error("Failed to retrieve attestations stream")
return
}

go bs.collectReceivedAttestations(ctx)
for {
res, err := stream.Recv()
// If the stream is closed, we stop the loop.
Expand All @@ -66,15 +70,42 @@ func (bs *Service) receiveAttestations(ctx context.Context) {
log.WithError(err).Error("Could not receive attestations from beacon node")
return
}
log.WithFields(logrus.Fields{
"slot": res.Data.Slot,
"indices": res.AttestingIndices,
}).Debug("Received attestation from beacon node")
if err := bs.slasherDB.SaveIndexedAttestation(ctx, res); err != nil {
log.WithError(err).Error("Could not save indexed attestation")
continue
}
// We send the received attestation over the attestation feed.
bs.attestationFeed.Send(res)
bs.receivedAttestationsBuffer <- res
}
}

func (bs *Service) collectReceivedAttestations(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.collectReceivedAttestations")
defer span.End()

var atts []*ethpb.IndexedAttestation
ticker := time.NewTicker(2*time.Second)
for {
select {
case <-ticker.C:
if len(atts) > 0 {
bs.collectedAttestationsBuffer <- atts
atts = []*ethpb.IndexedAttestation{}
}
case att := <-bs.receivedAttestationsBuffer:
atts = append(atts, att)
case collectedAtts := <-bs.collectedAttestationsBuffer:
if err := bs.slasherDB.SaveIndexedAttestations(ctx, collectedAtts); err != nil {
log.WithError(err).Error("Could not save indexed attestation")
continue
}
log.Infof("%d attestations for slot %d saved to slasher DB", len(collectedAtts), collectedAtts[0].Data.Slot)

// After saving, we send the received attestation over the attestation feed.
for _, att := range collectedAtts {
log.WithFields(logrus.Fields{
"slot": att.Data.Slot,
"indices": att.AttestingIndices,
}).Debug("Sending attestation to detection service")
bs.attestationFeed.Send(att)
}
case <-ctx.Done():
return
}
}
}
54 changes: 54 additions & 0 deletions slasher/beaconclient/receivers_test.go
Expand Up @@ -3,12 +3,14 @@ package beaconclient
import (
"context"
"testing"
"time"

ptypes "github.com/gogo/protobuf/types"
"github.com/golang/mock/gomock"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/mock"
testDB "github.com/prysmaticlabs/prysm/slasher/db/testing"
)

func TestService_ReceiveBlocks(t *testing.T) {
Expand Down Expand Up @@ -44,6 +46,8 @@ func TestService_ReceiveAttestations(t *testing.T) {
bs := Service{
beaconClient: client,
blockFeed: new(event.Feed),
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
}
stream := mock.NewMockBeaconChain_StreamIndexedAttestationsClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -65,3 +69,53 @@ func TestService_ReceiveAttestations(t *testing.T) {
})
bs.receiveAttestations(ctx)
}


func TestService_ReceiveAttestations_Batched(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
blockFeed: new(event.Feed),
slasherDB: testDB.SetupSlasherDB(t, false),
attestationFeed: new(event.Feed),
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
}
stream := mock.NewMockBeaconChain_StreamIndexedAttestationsClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
att := &ethpb.IndexedAttestation{
Data: &ethpb.AttestationData{
Slot: 5,
Target: &ethpb.Checkpoint{
Epoch: 5,
},
},
Signature: []byte{1, 2},
}
client.EXPECT().StreamIndexedAttestations(
gomock.Any(),
&ptypes.Empty{},
).Return(stream, nil)
stream.EXPECT().Context().Return(ctx).AnyTimes()
stream.EXPECT().Recv().Return(
att,
nil,
).Do(func() {
time.Sleep(2*time.Second)
cancel()
})

go bs.receiveAttestations(ctx)
bs.receivedAttestationsBuffer <- att
att.Data.Target.Epoch = 6
bs.receivedAttestationsBuffer <- att
att.Data.Target.Epoch = 8
bs.receivedAttestationsBuffer <- att
atts := <- bs.collectedAttestationsBuffer
if len(atts) != 3 {
t.Fatalf("Expected %d received attestations to be batched", len(atts))
}
}
4 changes: 4 additions & 0 deletions slasher/beaconclient/service.go
Expand Up @@ -54,6 +54,8 @@ type Service struct {
attesterSlashingsChan chan *ethpb.AttesterSlashing
attesterSlashingsFeed *event.Feed
proposerSlashingsFeed *event.Feed
receivedAttestationsBuffer chan *ethpb.IndexedAttestation
collectedAttestationsBuffer chan []*ethpb.IndexedAttestation
}

// Config options for the beaconclient service.
Expand Down Expand Up @@ -81,6 +83,8 @@ func NewBeaconClientService(ctx context.Context, cfg *Config) *Service {
attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1),
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
}
}

Expand Down
1 change: 0 additions & 1 deletion slasher/detection/attestations/spanner_test.go
Expand Up @@ -739,7 +739,6 @@ func TestNewSpanDetector_UpdateSpans(t *testing.T) {
t.Errorf("Wanted and received:\n%v \n%v", tt.want, sm)
}
}

})
}
}

0 comments on commit 8334aac

Please sign in to comment.