Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace receive slot with event stream #13563

Merged
merged 107 commits into from Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 87 commits
Commits
Show all changes
107 commits
Select commit Hold shift + click to select a range
29a552b
WIP
james-prysm Jan 30, 2024
86966f5
event stream wip
james-prysm Jan 31, 2024
de96792
returning nil
james-prysm Jan 31, 2024
033448e
temp removing some tests
james-prysm Feb 1, 2024
23ec534
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 1, 2024
91ea02a
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 1, 2024
b074356
wip health checks
james-prysm Feb 5, 2024
b5670b6
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 5, 2024
b76e446
fixing conficts
james-prysm Feb 5, 2024
f3a0534
updating fields based on linting
james-prysm Feb 5, 2024
38bce44
fixing more errors
james-prysm Feb 6, 2024
87c62c9
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 6, 2024
1fb9a3e
fixing mocks
james-prysm Feb 6, 2024
8b0d2ed
fixing more mocks
james-prysm Feb 6, 2024
17eb642
fixing more linting
james-prysm Feb 6, 2024
be92c89
removing white space for lint
james-prysm Feb 6, 2024
0e403d1
fixing log format
james-prysm Feb 6, 2024
34af48f
gaz
james-prysm Feb 6, 2024
aa49ecc
reverting changes on grpc
james-prysm Feb 6, 2024
800ce23
fixing unit tests
james-prysm Feb 6, 2024
048bbcc
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 6, 2024
1595264
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 6, 2024
1859840
adding in tests for health tracker and event stream
james-prysm Feb 7, 2024
73022ae
adding more tests for streaming slot
james-prysm Feb 7, 2024
9be3711
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 7, 2024
47c3f28
gaz
james-prysm Feb 7, 2024
c179901
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 7, 2024
1b6581b
Update api/client/event/event_stream.go
james-prysm Feb 8, 2024
b7cab70
review comments
james-prysm Feb 8, 2024
4ad4a46
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 8, 2024
0982c0b
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 9, 2024
dd0b4d9
Update validator/client/runner.go
james-prysm Feb 9, 2024
7464d9a
Update validator/client/validator.go
james-prysm Feb 9, 2024
0f4467f
Update validator/client/validator.go
james-prysm Feb 9, 2024
6a828bb
Update validator/client/validator.go
james-prysm Feb 9, 2024
ddd8560
Update validator/client/validator.go
james-prysm Feb 9, 2024
68ceefa
Update validator/client/beacon-api/beacon_api_validator_client.go
james-prysm Feb 9, 2024
40c566a
Update validator/client/validator.go
james-prysm Feb 9, 2024
02ba06a
Update validator/client/validator.go
james-prysm Feb 9, 2024
37da651
addressing radek comments
james-prysm Feb 9, 2024
37da93f
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 9, 2024
be240c4
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 9, 2024
371aa4c
Update validator/client/validator.go
james-prysm Feb 12, 2024
1c3e369
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 12, 2024
e802f62
addressing review feedback
james-prysm Feb 12, 2024
44d5e67
moving things to below next slot ticker
james-prysm Feb 12, 2024
4f4b473
fixing tests
james-prysm Feb 12, 2024
2127691
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 13, 2024
472d211
update naming
james-prysm Feb 13, 2024
0eb554c
adding TODO comment
james-prysm Feb 13, 2024
e7776fc
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 13, 2024
1d03da3
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 13, 2024
4d4eafa
Update api/client/beacon/health.go
james-prysm Feb 14, 2024
264282c
addressing comments
james-prysm Feb 14, 2024
1e8901c
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 14, 2024
d698395
merging develop
james-prysm Feb 15, 2024
ba6c524
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 15, 2024
e880031
fixing broken linting
james-prysm Feb 15, 2024
49af08b
fixing more import issues
james-prysm Feb 15, 2024
86f5f48
fixing more import issues
james-prysm Feb 15, 2024
243c833
linting
james-prysm Feb 15, 2024
182a683
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 15, 2024
cc08106
updating based on radek's comments
james-prysm Feb 16, 2024
dde4265
addressing more comments
james-prysm Feb 16, 2024
ab89e47
fixing nogo error
james-prysm Feb 16, 2024
2c63dba
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 16, 2024
32417a3
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 18, 2024
f0c3903
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 21, 2024
263a51b
fixing duplicate import
james-prysm Feb 21, 2024
9d08460
gaz
james-prysm Feb 21, 2024
65b85ae
adding radek's review suggestion
james-prysm Feb 21, 2024
2f80739
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 21, 2024
8e90fbf
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 22, 2024
a35a12b
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 26, 2024
54bfd46
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 26, 2024
6729ae6
Merge branch 'develop' into remove-stream-slot
james-prysm Feb 27, 2024
8c5200b
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 4, 2024
bc8e9e2
Update proto/prysm/v1alpha1/node.proto
james-prysm Mar 4, 2024
45693f2
preston review comments
james-prysm Mar 4, 2024
98aa51e
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 5, 2024
c8bebda
Update api/client/event/event_stream.go
james-prysm Mar 5, 2024
4c4b669
Update validator/client/validator.go
james-prysm Mar 5, 2024
da6fd76
addressing some more preston review items
james-prysm Mar 5, 2024
42356da
fixing tests for linting
james-prysm Mar 5, 2024
79200f4
fixing missed linting
james-prysm Mar 5, 2024
2d77069
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 5, 2024
e3fd6b8
updating based on feedback to simplify
james-prysm Mar 5, 2024
aa144a0
adding interface check at the top
james-prysm Mar 6, 2024
d2f7df4
reverting some comments
james-prysm Mar 6, 2024
ae1720b
cleaning up intatiations
james-prysm Mar 6, 2024
024624f
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 6, 2024
c12e68b
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 8, 2024
be7963f
reworking the health tracker
james-prysm Mar 11, 2024
6a8a9b0
fixing linting
james-prysm Mar 11, 2024
0c7bb37
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 11, 2024
0d21889
fixing more linting to adhear to interface
james-prysm Mar 11, 2024
7141f63
adding interface check at the the top of the file
james-prysm Mar 11, 2024
e249fb9
fixing unit tests
james-prysm Mar 11, 2024
c49d870
attempting to fix dependency cycle
james-prysm Mar 11, 2024
8ae7314
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 11, 2024
fe599f8
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 12, 2024
effc418
addressing radek's comment
james-prysm Mar 12, 2024
414103e
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 12, 2024
92e813a
Update validator/client/beacon-api/beacon_api_validator_client.go
james-prysm Mar 12, 2024
3b92c0b
adding more tests and feedback items
james-prysm Mar 12, 2024
4c5ba7b
fixing TODO comment
james-prysm Mar 12, 2024
db14009
Merge branch 'develop' into remove-stream-slot
james-prysm Mar 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/client/beacon/BUILD.bazel
Expand Up @@ -6,6 +6,7 @@ go_library(
"checkpoint.go",
"client.go",
"doc.go",
"health.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/api/client/beacon",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -36,6 +37,7 @@ go_test(
srcs = [
"checkpoint_test.go",
"client_test.go",
"health_test.go",
],
embed = [":go_default_library"],
deps = [
Expand Down
47 changes: 47 additions & 0 deletions api/client/beacon/health.go
@@ -0,0 +1,47 @@
package beacon

import (
"sync"
)

type NodeHealth struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why put this under api/client? Seems to be unrelated to making API calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the idea was from the client perspective this Node health object keeps track of the server ( the beacon node) so in this case the validator uses the package api/client to track the beacon node's health. happy to move it if it doesn't make sense.

isHealthy bool
healthChan chan bool
sync.RWMutex
}

func NewNodeHealth(initialStatus bool) *NodeHealth {
return &NodeHealth{
isHealthy: initialStatus,
healthChan: make(chan bool, 1),
}
}

// HealthUpdates provides a read-only channel for health updates.
func (n *NodeHealth) HealthUpdates() <-chan bool {
return n.healthChan
}

func (n *NodeHealth) IsHealthy() bool {
n.RLock()
defer n.RUnlock()
return n.isHealthy
}

func (n *NodeHealth) UpdateNodeHealth(newStatus bool) {
n.RLock()
isStatusChanged := newStatus != n.isHealthy
n.RUnlock()

if isStatusChanged {
n.Lock()
// Double-check the condition to ensure it hasn't changed since the first check.
if newStatus != n.isHealthy {
n.isHealthy = newStatus
n.Unlock() // It's better to unlock as soon as the protected section is over.
n.healthChan <- newStatus
} else {
n.Unlock()
}
}
}
102 changes: 102 additions & 0 deletions api/client/beacon/health_test.go
@@ -0,0 +1,102 @@
package beacon

import (
"sync"
"testing"
)

func TestNodeHealth_IsHealthy(t *testing.T) {
tests := []struct {
name string
isHealthy bool
want bool
}{
{"initially healthy", true, true},
{"initially unhealthy", false, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
n := &NodeHealth{
isHealthy: tt.isHealthy,
healthChan: make(chan bool, 1),
}
if got := n.IsHealthy(); got != tt.want {
t.Errorf("IsHealthy() = %v, want %v", got, tt.want)
}
})
}
}

func TestNodeHealth_UpdateNodeHealth(t *testing.T) {
tests := []struct {
name string
initial bool // Initial health status
newStatus bool // Status to update to
shouldSend bool // Should a message be sent through the channel
}{
{"healthy to unhealthy", true, false, true},
{"unhealthy to healthy", false, true, true},
{"remain healthy", true, true, false},
{"remain unhealthy", false, false, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
n := NewNodeHealth(tt.initial)
n.isHealthy = tt.initial // Set initial health status
n.UpdateNodeHealth(tt.newStatus)

// Check if health status was updated
if n.IsHealthy() != tt.newStatus {
t.Errorf("UpdateNodeHealth() failed to update isHealthy from %v to %v", tt.initial, tt.newStatus)
}

select {
case status := <-n.HealthUpdates():
if !tt.shouldSend {
t.Errorf("UpdateNodeHealth() unexpectedly sent status %v to HealthCh", status)
} else if status != tt.newStatus {
t.Errorf("UpdateNodeHealth() sent wrong status %v, want %v", status, tt.newStatus)
}
default:
if tt.shouldSend {
t.Error("UpdateNodeHealth() did not send any status to HealthCh when expected")
}
}
})
}
}

func TestNodeHealth_Concurrency(t *testing.T) {
n := NewNodeHealth(true)
var wg sync.WaitGroup

// Number of goroutines to spawn for both reading and writing
numGoroutines := 6

go func() {
for range n.HealthUpdates() {
// Consume values to avoid blocking on channel send.
}
}()

wg.Add(numGoroutines * 2) // for readers and writers

// Concurrently update health status
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
n.UpdateNodeHealth(false) // Flip health status
n.UpdateNodeHealth(true) // And flip it back
}()
}

// Concurrently read health status
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
_ = n.IsHealthy() // Just read the value
}()
}

wg.Wait() // Wait for all goroutines to finish
}
3 changes: 3 additions & 0 deletions api/client/errors.go
Expand Up @@ -21,6 +21,9 @@ var ErrNotFound = errors.Wrap(ErrNotOK, "recv 404 NotFound response from API")
// ErrInvalidNodeVersion indicates that the /eth/v1/node/version API response format was not recognized.
var ErrInvalidNodeVersion = errors.New("invalid node version response")

// ErrConnectionIssue represents a connection problem.
var ErrConnectionIssue = errors.New("could not connect")

// Non200Err is a function that parses an HTTP response to handle responses that are not 200 with a formatted error.
func Non200Err(response *http.Response) error {
bodyBytes, err := io.ReadAll(response.Body)
Expand Down
24 changes: 24 additions & 0 deletions api/client/event/BUILD.bazel
@@ -0,0 +1,24 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["event_stream.go"],
importpath = "github.com/prysmaticlabs/prysm/v5/api/client/event",
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/client:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["event_stream_test.go"],
embed = [":go_default_library"],
deps = [
"//testing/require:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
144 changes: 144 additions & 0 deletions api/client/event/event_stream.go
@@ -0,0 +1,144 @@
package event

import (
"bufio"
"context"
"net/http"
"net/url"
"strings"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/api"
"github.com/prysmaticlabs/prysm/v5/api/client"
log "github.com/sirupsen/logrus"
)

const (
EventHead = "head"
EventBlock = "block"
EventAttestation = "attestation"
EventVoluntaryExit = "voluntary_exit"
EventBlsToExecutionChange = "bls_to_execution_change"
EventProposerSlashing = "proposer_slashing"
EventAttesterSlashing = "attester_slashing"
EventFinalizedCheckpoint = "finalized_checkpoint"
EventChainReorg = "chain_reorg"
EventContributionAndProof = "contribution_and_proof"
EventLightClientFinalityUpdate = "light_client_finality_update"
EventLightClientOptimisticUpdate = "light_client_optimistic_update"
EventPayloadAttributes = "payload_attributes"
EventBlobSidecar = "blob_sidecar"
EventError = "error"
EventConnectionError = "connection_error"
)

var DefaultEventTopics = []string{EventHead}

type EventStreamClient interface {
Subscribe(eventsChannel chan<- *Event)
}

type Event struct {
EventType string
Data []byte
}

// EventStream is responsible for subscribing to the Beacon API events endpoint
// and dispatching received events to subscribers.
type EventStream struct {
ctx context.Context
httpClient *http.Client
host string
topics []string
}

func NewEventStream(ctx context.Context, httpClient *http.Client, host string, topics []string) (*EventStream, error) {
// Check if the host is a valid URL
_, err := url.ParseRequestURI(host)
if err != nil {
return nil, err
}
if len(topics) == 0 {
return nil, errors.New("no topics provided")
}

return &EventStream{
ctx: ctx,
httpClient: httpClient,
host: host,
topics: topics,
}, nil
}

func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
allTopics := strings.Join(h.topics, ",")
log.WithField("topics", allTopics).Info("Listening to Beacon API events")
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil)
if err != nil {
eventsChannel <- &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, "failed to create HTTP request").Error()),
}
}
req.Header.Set("Accept", api.EventStreamMediaType)
req.Header.Set("Connection", api.KeepAlive)
resp, err := h.httpClient.Do(req)
if err != nil {
eventsChannel <- &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()),
}
}

defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
log.WithError(closeErr).Error("Failed to close events response body")
}
}()
// Create a new scanner to read lines from the response body
scanner := bufio.NewScanner(resp.Body)

var eventType, data string // Variables to store event type and data

// Iterate over lines of the event stream
for scanner.Scan() {
select {
case <-h.ctx.Done():
log.Info("Context canceled, stopping event stream")
close(eventsChannel)
return
default:
line := scanner.Text() // TODO: scanner does not handle /r and does not fully adhere to https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the TODO item? Should you fix that before merging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an area i need some suggestions on, scanner does not handle /r only /n and /n/r , i don't think any implementations use /r , would it be better to use a sse library to handle this instead of our own implementation

// Handle the event based on your specific format
if line == "" {
// Empty line indicates the end of an event
if eventType != "" && data != "" {
// Process the event when both eventType and data are set
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)}
}

// Reset eventType and data for the next event
eventType, data = "", ""
continue
}
et, ok := strings.CutPrefix(line, "event: ")
if ok {
// Extract event type from the "event" field
eventType = et
}
d, ok := strings.CutPrefix(line, "data: ")
if ok {
// Extract data from the "data" field
data = d
}
}
}

if err := scanner.Err(); err != nil {
eventsChannel <- &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, errors.Wrap(client.ErrConnectionIssue, "scanner failed").Error()).Error()),
}
}
}