Skip to content

Commit 18b3203

Browse files
prestonvanloon, nisdas, and prylabs-bulldozer[bot]
authored
Wait for subnet peers before broadcasting onto attestation subnet topic (#6893)
* Initial pass * Add metric to measure success * Use a subnet RWLock to prevent duplicate requests, give up after 3 attempts * push latest commented code * try with non-blocking broadcast * Add feature flag, ignore parent deadline if any * Add slot as metadata * add tests * gaz Co-authored-by: nisdas <nishdas93@gmail.com> Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
1 parent 33105f1 commit 18b3203

File tree

15 files changed

+432
-18
lines changed

15 files changed

+432
-18
lines changed

beacon-chain/p2p/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ go_test(
118118
"//beacon-chain/p2p/testing:go_default_library",
119119
"//proto/beacon/p2p/v1:go_default_library",
120120
"//proto/testing:go_default_library",
121+
"//shared/featureconfig:go_default_library",
121122
"//shared/iputils:go_default_library",
122123
"//shared/p2putils:go_default_library",
123124
"//shared/params:go_default_library",

beacon-chain/p2p/broadcaster.go

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import (
55
"context"
66
"fmt"
77
"reflect"
8+
"time"
89

910
"github.com/gogo/protobuf/proto"
1011
"github.com/pkg/errors"
1112
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
13+
"github.com/prysmaticlabs/prysm/shared/featureconfig"
1214
"github.com/prysmaticlabs/prysm/shared/hashutil"
15+
"github.com/prysmaticlabs/prysm/shared/params"
1316
"github.com/prysmaticlabs/prysm/shared/traceutil"
1417
"go.opencensus.io/trace"
1518
)
@@ -18,10 +21,18 @@ import (
1821
// GossipTypeMapping.
1922
var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub topic")
2023

24+
// maxSubnetDiscoveryAttempts is the max number of attempts to search the
// network for a specific subnet. It bounds the discovery loop in
// broadcastAttestation, which proceeds to broadcast once the attempts are used up.
25+
const maxSubnetDiscoveryAttempts = 1
26+
2127
// Broadcast a message to the p2p network.
2228
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
2329
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
2430
defer span.End()
31+
32+
twoSlots := time.Duration(2*params.BeaconConfig().SecondsPerSlot) * time.Second
33+
ctx, cancel := context.WithTimeout(ctx, twoSlots)
34+
defer cancel()
35+
2536
forkDigest, err := s.forkDigest()
2637
if err != nil {
2738
err := errors.Wrap(err, "could not retrieve fork digest")
@@ -47,7 +58,67 @@ func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *
4758
traceutil.AnnotateError(span, err)
4859
return err
4960
}
50-
return s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest))
61+
62+
if featureconfig.Get().EnableAttBroadcastDiscoveryAttempts {
63+
// Non-blocking broadcast.
64+
go s.broadcastAttestation(ctx, subnet, att, forkDigest)
65+
} else {
66+
return s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest))
67+
}
68+
69+
return nil
70+
}
71+
72+
func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *eth.Attestation, forkDigest [4]byte) {
73+
ctx, span := trace.StartSpan(ctx, "p2p.broadcastAttestation")
74+
defer span.End()
75+
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
76+
77+
oneEpoch := time.Duration(1*params.BeaconConfig().SlotsPerEpoch*params.BeaconConfig().SecondsPerSlot) * time.Second
78+
ctx, cancel := context.WithTimeout(ctx, oneEpoch)
79+
defer cancel()
80+
81+
// Ensure we have peers with this subnet.
82+
s.subnetLocker(subnet).RLock()
83+
hasPeer := s.hasPeerWithSubnet(subnet)
84+
s.subnetLocker(subnet).RUnlock()
85+
86+
span.AddAttributes(
87+
trace.BoolAttribute("hasPeer", hasPeer),
88+
trace.Int64Attribute("slot", int64(att.Data.Slot)),
89+
trace.Int64Attribute("subnet", int64(subnet)),
90+
)
91+
92+
attestationBroadcastAttempts.Inc()
93+
94+
if !hasPeer {
95+
if err := func() error {
96+
s.subnetLocker(subnet).Lock()
97+
defer s.subnetLocker(subnet).Unlock()
98+
for i := 0; i < maxSubnetDiscoveryAttempts; i++ {
99+
if err := ctx.Err(); err != nil {
100+
return err
101+
}
102+
ok, err := s.FindPeersWithSubnet(ctx, subnet)
103+
if err != nil {
104+
return err
105+
}
106+
if ok {
107+
savedAttestationBroadcasts.Inc()
108+
break
109+
}
110+
}
111+
return nil
112+
}(); err != nil {
113+
log.WithError(err).Error("Failed to find peers")
114+
traceutil.AnnotateError(span, err)
115+
}
116+
}
117+
118+
if err := s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest)); err != nil {
119+
log.WithError(err).Error("Failed to broadcast attestation")
120+
traceutil.AnnotateError(span, err)
121+
}
51122
}
52123

53124
// method to broadcast messages to other peers in our gossip mesh.

beacon-chain/p2p/broadcaster_test.go

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,25 @@ package p2p
33
import (
44
"context"
55
"fmt"
6+
"net"
67
"reflect"
78
"sync"
89
"testing"
910
"time"
1011

12+
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
13+
14+
"github.com/ethereum/go-ethereum/p2p/discover"
1115
"github.com/gogo/protobuf/proto"
16+
"github.com/libp2p/go-libp2p-core/host"
1217
pubsub "github.com/libp2p/go-libp2p-pubsub"
1318
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
19+
"github.com/prysmaticlabs/go-bitfield"
1420
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
21+
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
1522
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
1623
testpb "github.com/prysmaticlabs/prysm/proto/testing"
24+
"github.com/prysmaticlabs/prysm/shared/featureconfig"
1725
"github.com/prysmaticlabs/prysm/shared/testutil"
1826
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
1927
"github.com/prysmaticlabs/prysm/shared/testutil/require"
@@ -129,3 +137,261 @@ func TestService_Attestation_Subnet(t *testing.T) {
129137
assert.Equal(t, tt.topic, attestationToTopic(subnet, [4]byte{} /* fork digest */), "Wrong topic")
130138
}
131139
}
140+
141+
func TestService_BroadcastAttestation(t *testing.T) {
142+
p1 := p2ptest.NewTestP2P(t)
143+
p2 := p2ptest.NewTestP2P(t)
144+
p1.Connect(p2)
145+
if len(p1.BHost.Network().Peers()) == 0 {
146+
t.Fatal("No peers")
147+
}
148+
149+
p := &Service{
150+
host: p1.BHost,
151+
pubsub: p1.PubSub(),
152+
joinedTopics: map[string]*pubsub.Topic{},
153+
cfg: &Config{},
154+
genesisTime: time.Now(),
155+
genesisValidatorsRoot: []byte{'A'},
156+
subnetsLock: make(map[uint64]*sync.RWMutex),
157+
subnetsLockLock: sync.Mutex{},
158+
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
159+
ScorerParams: &peers.PeerScorerConfig{},
160+
}),
161+
}
162+
163+
msg := &eth.Attestation{
164+
AggregationBits: bitfield.NewBitlist(7),
165+
Data: &eth.AttestationData{
166+
Slot: 0,
167+
CommitteeIndex: 0,
168+
BeaconBlockRoot: make([]byte, 32),
169+
Source: &eth.Checkpoint{
170+
Epoch: 0,
171+
Root: make([]byte, 32),
172+
},
173+
Target: &eth.Checkpoint{
174+
Epoch: 0,
175+
Root: make([]byte, 32),
176+
},
177+
},
178+
Signature: make([]byte, 96),
179+
}
180+
181+
subnet := uint64(5)
182+
183+
topic := AttestationSubnetTopicFormat
184+
GossipTypeMapping[reflect.TypeOf(msg)] = topic
185+
digest, err := p.forkDigest()
186+
require.NoError(t, err)
187+
topic = fmt.Sprintf(topic, digest, subnet)
188+
189+
// External peer subscribes to the topic.
190+
topic += p.Encoding().ProtocolSuffix()
191+
sub, err := p2.SubscribeToTopic(topic)
192+
require.NoError(t, err)
193+
194+
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
195+
196+
// Async listen for the pubsub, must be before the broadcast.
197+
var wg sync.WaitGroup
198+
wg.Add(1)
199+
go func(tt *testing.T) {
200+
defer wg.Done()
201+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
202+
defer cancel()
203+
204+
incomingMessage, err := sub.Next(ctx)
205+
require.NoError(t, err)
206+
207+
result := &eth.Attestation{}
208+
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
209+
if !proto.Equal(result, msg) {
210+
tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
211+
}
212+
}(t)
213+
214+
// Broadcast to peers and wait.
215+
require.NoError(t, p.BroadcastAttestation(context.Background(), subnet, msg))
216+
if testutil.WaitTimeout(&wg, 1*time.Second) {
217+
t.Error("Failed to receive pubsub within 1s")
218+
}
219+
}
220+
221+
func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
222+
// Setup bootnode.
223+
cfg := &Config{}
224+
port := 2000
225+
cfg.UDPPort = uint(port)
226+
_, pkey := createAddrAndPrivKey(t)
227+
ipAddr := net.ParseIP("127.0.0.1")
228+
genesisTime := time.Now()
229+
genesisValidatorsRoot := make([]byte, 32)
230+
s := &Service{
231+
cfg: cfg,
232+
genesisTime: genesisTime,
233+
genesisValidatorsRoot: genesisValidatorsRoot,
234+
}
235+
bootListener, err := s.createListener(ipAddr, pkey)
236+
require.NoError(t, err)
237+
defer bootListener.Close()
238+
239+
// Use shorter period for testing.
240+
currentPeriod := pollingPeriod
241+
pollingPeriod = 1 * time.Second
242+
defer func() {
243+
pollingPeriod = currentPeriod
244+
}()
245+
246+
bootNode := bootListener.Self()
247+
subnet := uint64(5)
248+
249+
var listeners []*discover.UDPv5
250+
var hosts []host.Host
251+
// setup other nodes.
252+
cfg = &Config{
253+
BootstrapNodeAddr: []string{bootNode.String()},
254+
Discv5BootStrapAddr: []string{bootNode.String()},
255+
MaxPeers: 30,
256+
}
257+
// Setup 2 different hosts
258+
for i := 1; i <= 2; i++ {
259+
h, pkey, ipAddr := createHost(t, port+i)
260+
cfg.UDPPort = uint(port + i)
261+
cfg.TCPPort = uint(port + i)
262+
s := &Service{
263+
cfg: cfg,
264+
genesisTime: genesisTime,
265+
genesisValidatorsRoot: genesisValidatorsRoot,
266+
}
267+
listener, err := s.startDiscoveryV5(ipAddr, pkey)
268+
// Set for 2nd peer
269+
if i == 2 {
270+
s.dv5Listener = listener
271+
s.metaData = new(pb.MetaData)
272+
bitV := bitfield.NewBitvector64()
273+
bitV.SetBitAt(subnet, true)
274+
s.updateSubnetRecordWithMetadata(bitV)
275+
}
276+
assert.NoError(t, err, "Could not start discovery for node")
277+
listeners = append(listeners, listener)
278+
hosts = append(hosts, h)
279+
}
280+
defer func() {
281+
// Close down all peers.
282+
for _, listener := range listeners {
283+
listener.Close()
284+
}
285+
}()
286+
287+
// close peers upon exit of test
288+
defer func() {
289+
for _, h := range hosts {
290+
if err := h.Close(); err != nil {
291+
t.Log(err)
292+
}
293+
}
294+
}()
295+
296+
f := featureconfig.Get()
297+
f.EnableAttBroadcastDiscoveryAttempts = true
298+
rst := featureconfig.InitWithReset(f)
299+
defer rst()
300+
301+
ps1, err := pubsub.NewFloodSub(context.Background(), hosts[0],
302+
pubsub.WithMessageSigning(false),
303+
pubsub.WithStrictSignatureVerification(false),
304+
)
305+
require.NoError(t, err)
306+
307+
ps2, err := pubsub.NewFloodSub(context.Background(), hosts[1],
308+
pubsub.WithMessageSigning(false),
309+
pubsub.WithStrictSignatureVerification(false),
310+
)
311+
require.NoError(t, err)
312+
313+
p := &Service{
314+
host: hosts[0],
315+
pubsub: ps1,
316+
dv5Listener: listeners[0],
317+
joinedTopics: map[string]*pubsub.Topic{},
318+
cfg: &Config{},
319+
genesisTime: time.Now(),
320+
genesisValidatorsRoot: []byte{'A'},
321+
subnetsLock: make(map[uint64]*sync.RWMutex),
322+
subnetsLockLock: sync.Mutex{},
323+
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
324+
ScorerParams: &peers.PeerScorerConfig{},
325+
}),
326+
}
327+
328+
p2 := &Service{
329+
host: hosts[1],
330+
pubsub: ps2,
331+
dv5Listener: listeners[1],
332+
joinedTopics: map[string]*pubsub.Topic{},
333+
cfg: &Config{},
334+
genesisTime: time.Now(),
335+
genesisValidatorsRoot: []byte{'A'},
336+
subnetsLock: make(map[uint64]*sync.RWMutex),
337+
subnetsLockLock: sync.Mutex{},
338+
peers: peers.NewStatus(context.Background(), &peers.StatusConfig{
339+
ScorerParams: &peers.PeerScorerConfig{},
340+
}),
341+
}
342+
343+
msg := &eth.Attestation{
344+
AggregationBits: bitfield.NewBitlist(7),
345+
Data: &eth.AttestationData{
346+
Slot: 0,
347+
CommitteeIndex: 0,
348+
BeaconBlockRoot: make([]byte, 32),
349+
Source: &eth.Checkpoint{
350+
Epoch: 0,
351+
Root: make([]byte, 32),
352+
},
353+
Target: &eth.Checkpoint{
354+
Epoch: 0,
355+
Root: make([]byte, 32),
356+
},
357+
},
358+
Signature: make([]byte, 96),
359+
}
360+
361+
topic := AttestationSubnetTopicFormat
362+
GossipTypeMapping[reflect.TypeOf(msg)] = topic
363+
digest, err := p.forkDigest()
364+
require.NoError(t, err)
365+
topic = fmt.Sprintf(topic, digest, subnet)
366+
367+
// External peer subscribes to the topic.
368+
topic += p.Encoding().ProtocolSuffix()
369+
sub, err := p2.SubscribeToTopic(topic)
370+
require.NoError(t, err)
371+
372+
time.Sleep(50 * time.Millisecond) // libp2p fails without this delay...
373+
374+
// Async listen for the pubsub, must be before the broadcast.
375+
var wg sync.WaitGroup
376+
wg.Add(1)
377+
go func(tt *testing.T) {
378+
defer wg.Done()
379+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
380+
defer cancel()
381+
382+
incomingMessage, err := sub.Next(ctx)
383+
require.NoError(t, err)
384+
385+
result := &eth.Attestation{}
386+
require.NoError(t, p.Encoding().DecodeGossip(incomingMessage.Data, result))
387+
if !proto.Equal(result, msg) {
388+
tt.Errorf("Did not receive expected message, got %+v, wanted %+v", result, msg)
389+
}
390+
}(t)
391+
392+
// Broadcast to peers and wait.
393+
require.NoError(t, p.BroadcastAttestation(context.Background(), subnet, msg))
394+
if testutil.WaitTimeout(&wg, 1*time.Second) {
395+
t.Error("Failed to receive pubsub within 5s")
396+
}
397+
}

0 commit comments

Comments
 (0)