Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace receive slot with event stream #13563

Merged
merged 107 commits into from Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
29a552b
WIP
james-prysm Jan 30, 2024
86966f5
event stream wip
james-prysm Jan 31, 2024
de96792
returning nil
james-prysm Jan 31, 2024
033448e
temp removing some tests
james-prysm Feb 1, 2024
23ec534
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 1, 2024
91ea02a
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 1, 2024
b074356
wip health checks
james-prysm Feb 5, 2024
b5670b6
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 5, 2024
b76e446
fixing conficts
james-prysm Feb 5, 2024
f3a0534
updating fields based on linting
james-prysm Feb 5, 2024
38bce44
fixing more errors
james-prysm Feb 6, 2024
87c62c9
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 6, 2024
1fb9a3e
fixing mocks
james-prysm Feb 6, 2024
8b0d2ed
fixing more mocks
james-prysm Feb 6, 2024
17eb642
fixing more linting
james-prysm Feb 6, 2024
be92c89
removing white space for lint
james-prysm Feb 6, 2024
0e403d1
fixing log format
james-prysm Feb 6, 2024
34af48f
gaz
james-prysm Feb 6, 2024
aa49ecc
reverting changes on grpc
james-prysm Feb 6, 2024
800ce23
fixing unit tests
james-prysm Feb 6, 2024
048bbcc
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 6, 2024
1595264
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 6, 2024
1859840
adding in tests for health tracker and event stream
james-prysm Feb 7, 2024
73022ae
adding more tests for streaming slot
james-prysm Feb 7, 2024
9be3711
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 7, 2024
47c3f28
gaz
james-prysm Feb 7, 2024
c179901
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 7, 2024
1b6581b
Update api/client/event/event_stream.go
james-prysm Feb 8, 2024
b7cab70
review comments
james-prysm Feb 8, 2024
4ad4a46
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 8, 2024
0982c0b
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 9, 2024
dd0b4d9
Update validator/client/runner.go
james-prysm Feb 9, 2024
7464d9a
Update validator/client/validator.go
james-prysm Feb 9, 2024
0f4467f
Update validator/client/validator.go
james-prysm Feb 9, 2024
6a828bb
Update validator/client/validator.go
james-prysm Feb 9, 2024
ddd8560
Update validator/client/validator.go
james-prysm Feb 9, 2024
68ceefa
Update validator/client/beacon-api/beacon_api_validator_client.go
james-prysm Feb 9, 2024
40c566a
Update validator/client/validator.go
james-prysm Feb 9, 2024
02ba06a
Update validator/client/validator.go
james-prysm Feb 9, 2024
37da651
addressing radek comments
james-prysm Feb 9, 2024
37da93f
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 9, 2024
be240c4
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 9, 2024
371aa4c
Update validator/client/validator.go
james-prysm Feb 12, 2024
1c3e369
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 12, 2024
e802f62
addressing review feedback
james-prysm Feb 12, 2024
44d5e67
moving things to below next slot ticker
james-prysm Feb 12, 2024
4f4b473
fixing tests
james-prysm Feb 12, 2024
2127691
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 13, 2024
472d211
update naming
james-prysm Feb 13, 2024
0eb554c
adding TODO comment
james-prysm Feb 13, 2024
e7776fc
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 13, 2024
1d03da3
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 13, 2024
4d4eafa
Update api/client/beacon/health.go
james-prysm Feb 14, 2024
264282c
addressing comments
james-prysm Feb 14, 2024
1e8901c
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 14, 2024
d698395
merging develop
james-prysm Feb 15, 2024
ba6c524
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 15, 2024
e880031
fixing broken linting
james-prysm Feb 15, 2024
49af08b
fixing more import issues
james-prysm Feb 15, 2024
86f5f48
fixing more import issues
james-prysm Feb 15, 2024
243c833
linting
james-prysm Feb 15, 2024
182a683
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 15, 2024
cc08106
updating based on radek's comments
james-prysm Feb 16, 2024
dde4265
addressing more comments
james-prysm Feb 16, 2024
ab89e47
fixing nogo error
james-prysm Feb 16, 2024
2c63dba
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 16, 2024
32417a3
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 18, 2024
f0c3903
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 21, 2024
263a51b
fixing duplicate import
james-prysm Feb 21, 2024
9d08460
gaz
james-prysm Feb 21, 2024
65b85ae
adding radek's review suggestion
james-prysm Feb 21, 2024
2f80739
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 21, 2024
8e90fbf
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 22, 2024
a35a12b
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 26, 2024
54bfd46
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 26, 2024
6729ae6
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 27, 2024
8c5200b
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 4, 2024
bc8e9e2
Update proto/prysm/v1alpha1/node.proto
james-prysm Mar 4, 2024
45693f2
preston review comments
james-prysm Mar 4, 2024
98aa51e
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 5, 2024
c8bebda
Update api/client/event/event_stream.go
james-prysm Mar 5, 2024
4c4b669
Update validator/client/validator.go
james-prysm Mar 5, 2024
da6fd76
addressing some more preston review items
james-prysm Mar 5, 2024
42356da
fixing tests for linting
james-prysm Mar 5, 2024
79200f4
fixing missed linting
james-prysm Mar 5, 2024
2d77069
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 5, 2024
e3fd6b8
updating based on feedback to simplify
james-prysm Mar 5, 2024
aa144a0
adding interface check at the top
james-prysm Mar 6, 2024
d2f7df4
reverting some comments
james-prysm Mar 6, 2024
ae1720b
cleaning up intatiations
james-prysm Mar 6, 2024
024624f
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 6, 2024
c12e68b
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 8, 2024
be7963f
reworking the health tracker
james-prysm Mar 11, 2024
6a8a9b0
fixing linting
james-prysm Mar 11, 2024
0c7bb37
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 11, 2024
0d21889
fixing more linting to adhear to interface
james-prysm Mar 11, 2024
7141f63
adding interface check at the the top of the file
james-prysm Mar 11, 2024
e249fb9
fixing unit tests
james-prysm Mar 11, 2024
c49d870
attempting to fix dependency cycle
james-prysm Mar 11, 2024
8ae7314
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 11, 2024
fe599f8
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 12, 2024
effc418
addressing radek's comment
james-prysm Mar 12, 2024
414103e
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 12, 2024
92e813a
Update validator/client/beacon-api/beacon_api_validator_client.go
james-prysm Mar 12, 2024
3b92c0b
adding more tests and feedback items
james-prysm Mar 12, 2024
4c5ba7b
fixing TODO comment
james-prysm Mar 12, 2024
db14009
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions api/client/event/BUILD.bazel
@@ -0,0 +1,13 @@
load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["eventStream.go"],
importpath = "github.com/prysmaticlabs/prysm/v4/api/client/event",
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
141 changes: 141 additions & 0 deletions api/client/event/eventStream.go
@@ -0,0 +1,141 @@
package event

import (
"bufio"
"context"
"net/http"
"net/url"
"strings"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/api"
log "github.com/sirupsen/logrus"
)

const (
EventHead = "head"
EventBlock = "block"
EventAttestation = "attestation"
EventVoluntaryExit = "voluntary_exit"
EventBlsToExecutionChange = "bls_to_execution_change"
EventProposerSlashing = "proposer_slashing"
EventAttesterSlashing = "attester_slashing"
EventFinalizedCheckpoint = "finalized_checkpoint"
EventChainReorg = "chain_reorg"
EventContributionAndProof = "contribution_and_proof"
EventLightClientFinalityUpdate = "light_client_finality_update"
EventLightClientOptimisticUpdate = "light_client_optimistic_update"
EventPayloadAttributes = "payload_attributes"
EventBlobSidecar = "blob_sidecar"
EventError = "error"
)

var DefaultEventTopics = []string{EventHead}

type EventStreamClient interface {
Subscribe(eventsChannel chan<- *Event)
}

type Event struct {
EventType string
Data []byte
}

// EventStream is responsible for subscribing to the Beacon API events endpoint
// and dispatching received events to subscribers.
type EventStream struct {
ctx context.Context
httpClient *http.Client
host string
topics []string
}

func NewEventStream(ctx context.Context, httpClient *http.Client, host string, topics []string) (*EventStream, error) {
// Check if the host is a valid URL
_, err := url.ParseRequestURI(host)
if err != nil {
return nil, err
}
if len(topics) == 0 {
return nil, errors.New("no topics provided")
}

return &EventStream{
ctx: ctx,
httpClient: httpClient,
host: host,
topics: topics,
}, nil
}

func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
allTopics := strings.Join(h.topics, ",")
log.Info("Starting listening to Beacon API events on topics: " + allTopics)
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil)
if err != nil {
eventsChannel <- &Event{
EventType: EventError,
Data: []byte(errors.Wrap(err, "Failed to create HTTP request").Error()),
}
}
req.Header.Set("Accept", api.EventStreamMediaType)
req.Header.Set("Connection", api.KeepAlive)
resp, err := h.httpClient.Do(req)
if err != nil {
eventsChannel <- &Event{
EventType: EventError,
Data: []byte(errors.Wrap(err, "Failed to perform HTTP request").Error()),
}
}

defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
log.WithError(closeErr).Error("Failed to close events response body")
close(eventsChannel)
}
}()
// Create a new scanner to read lines from the response body
scanner := bufio.NewScanner(resp.Body)

var eventType, data string // Variables to store event type and data

// Iterate over lines of the event stream
for scanner.Scan() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is a span enough...

select {
case <-h.ctx.Done():
log.Info("Context canceled, stopping event stream")
close(eventsChannel)
return
default:
line := scanner.Text()
// Handle the event based on your specific format
if line == "" {
// Empty line indicates the end of an event
if eventType != "" && data != "" {
// Process the event when both eventType and data are set
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)}
}

// Reset eventType and data for the next event
eventType, data = "", ""
continue
}

if line[0:7] == "event: " {
// Extract event type from the "event" field
eventType = line[7:]
} else if line[0:6] == "data: " {
// Extract data from the "data" field
data = line[6:]
}
}
}

if err := scanner.Err(); err != nil {
eventsChannel <- &Event{
EventType: EventError,
Data: []byte(errors.Wrap(err, "Error reading response body").Error()),
}
}
}
2 changes: 2 additions & 0 deletions validator/client/BUILD.bazel
Expand Up @@ -25,12 +25,14 @@ go_library(
"//validator:__subpackages__",
],
deps = [
"//api/client/event:go_default_library",
"//api/grpc:go_default_library",
"//async:go_default_library",
"//async/event:go_default_library",
"//beacon-chain/builder:go_default_library",
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/rpc/eth/events:go_default_library",
"//cache/lru:go_default_library",
"//cmd:go_default_library",
"//config/features:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions validator/client/beacon-api/BUILD.bazel
Expand Up @@ -15,7 +15,6 @@ go_library(
"domain_data.go",
"doppelganger.go",
"duties.go",
"event_handler.go",
"genesis.go",
"get_beacon_block.go",
"index.go",
Expand All @@ -40,11 +39,11 @@ go_library(
visibility = ["//validator:__subpackages__"],
deps = [
"//api:go_default_library",
"//api/client/event:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/rpc/eth/beacon:go_default_library",
"//beacon-chain/rpc/eth/config:go_default_library",
"//beacon-chain/rpc/eth/events:go_default_library",
"//beacon-chain/rpc/eth/node:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library",
"//beacon-chain/rpc/eth/validator:go_default_library",
Expand All @@ -64,6 +63,7 @@ go_library(
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
],
Expand Down
34 changes: 17 additions & 17 deletions validator/client/beacon-api/beacon_api_validator_client.go
Expand Up @@ -7,28 +7,24 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/api/client/event"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface"
"go.opencensus.io/trace"
)

type ValidatorClientOpt func(*beaconApiValidatorClient)

func WithEventHandler(h *EventHandler) ValidatorClientOpt {
return func(c *beaconApiValidatorClient) {
c.eventHandler = h
}
}

type beaconApiValidatorClient struct {
genesisProvider GenesisProvider
dutiesProvider dutiesProvider
stateValidatorsProvider StateValidatorsProvider
jsonRestHandler JsonRestHandler
eventHandler *EventHandler
beaconBlockConverter BeaconBlockConverter
prysmBeaconChainCLient iface.PrysmBeaconChainClient
isEventStreamRunning bool
}

func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
Expand All @@ -42,6 +38,7 @@ func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...Valida
nodeClient: &beaconApiNodeClient{jsonRestHandler: jsonRestHandler},
jsonRestHandler: jsonRestHandler,
},
isEventStreamRunning: false,
}
for _, o := range opts {
o(c)
Expand Down Expand Up @@ -114,10 +111,6 @@ func (c *beaconApiValidatorClient) ProposeExit(ctx context.Context, in *ethpb.Si
return c.proposeExit(ctx, in)
}

func (c *beaconApiValidatorClient) StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error) {
return c.streamSlots(ctx, in, time.Second), nil
}

func (c *beaconApiValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
return c.streamBlocks(ctx, in, time.Second), nil
}
Expand Down Expand Up @@ -163,15 +156,22 @@ func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *emp
return c.waitForChainStart(ctx)
}

func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error {
if c.eventHandler != nil {
if err := c.eventHandler.get(ctx, []string{"head"}); err != nil {
return errors.Wrapf(err, "could not invoke event handler")
func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *event.Event) {
ctx, span := trace.StartSpan(ctx, "validator.jsonClient.StartEventStream")
james-prysm marked this conversation as resolved.
Show resolved Hide resolved
defer span.End()
eventStream, err := event.NewEventStream(ctx, c.jsonRestHandler.GetHttpClient(), c.jsonRestHandler.GetHost(), topics)
if err != nil {
eventsChannel <- &event.Event{
EventType: event.EventError,
Data: []byte(errors.Wrap(err, "failed to start event stream").Error()),
}
return
}
return nil
c.isEventStreamRunning = true
eventStream.Subscribe(eventsChannel)
c.isEventStreamRunning = false
rkapka marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
return c.eventHandler.running
return c.isEventStreamRunning
}