Skip to content

Commit

Permalink
relay on-demand discovery ,use proto to enr field mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 27, 2023
1 parent f4fd8aa commit 27a8d46
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 59 deletions.
1 change: 1 addition & 0 deletions waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}

w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager))
w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager))

w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...)
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
Expand Down
95 changes: 52 additions & 43 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/discv5"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
Expand All @@ -33,21 +32,29 @@ type NodeTopicDetails struct {
topic *pubsub.Topic
}

// WakuProtoInfo holds protocol specific info
// To be used at a later stage to set various config such as criteria for peer management specific to each Waku protocols
// This should make peer-manager agnostic to protocol
type WakuProtoInfo struct {
waku2ENRBitField uint8
}

// PeerManager applies various controls and manage connections towards peers.
type PeerManager struct {
peerConnector *PeerConnectionStrategy
maxPeers int
maxRelayPeers int
logger *zap.Logger
InRelayPeersTarget int
OutRelayPeersTarget int
host host.Host
serviceSlots *ServiceSlots
ctx context.Context
sub event.Subscription
topicMutex sync.RWMutex
subRelayTopics map[string]*NodeTopicDetails
discoveryService *discv5.DiscoveryV5
peerConnector *PeerConnectionStrategy
maxPeers int
maxRelayPeers int
logger *zap.Logger
InRelayPeersTarget int
OutRelayPeersTarget int
host host.Host
serviceSlots *ServiceSlots
ctx context.Context
sub event.Subscription
topicMutex sync.RWMutex
subRelayTopics map[string]*NodeTopicDetails
discoveryService *discv5.DiscoveryV5
wakuprotoToENRFieldMap map[protocol.ID]WakuProtoInfo
}

// PeerSelection provides various options based on which Peer is selected from a list of peers.
Expand Down Expand Up @@ -92,13 +99,14 @@ func NewPeerManager(maxConnections int, maxPeers int, logger *zap.Logger) *PeerM
}

pm := &PeerManager{
logger: logger.Named("peer-manager"),
maxRelayPeers: maxRelayPeers,
InRelayPeersTarget: inRelayPeersTarget,
OutRelayPeersTarget: outRelayPeersTarget,
serviceSlots: NewServiceSlot(),
subRelayTopics: make(map[string]*NodeTopicDetails),
maxPeers: maxPeers,
logger: logger.Named("peer-manager"),
maxRelayPeers: maxRelayPeers,
InRelayPeersTarget: inRelayPeersTarget,
OutRelayPeersTarget: outRelayPeersTarget,
serviceSlots: NewServiceSlot(),
subRelayTopics: make(map[string]*NodeTopicDetails),
maxPeers: maxPeers,
wakuprotoToENRFieldMap: map[protocol.ID]WakuProtoInfo{},
}
logger.Info("PeerManager init values", zap.Int("maxConnections", maxConnections),
zap.Int("maxRelayPeers", maxRelayPeers),
Expand All @@ -125,6 +133,11 @@ func (pm *PeerManager) SetPeerConnector(pc *PeerConnectionStrategy) {

// Start starts the processing to be done by peer manager.
func (pm *PeerManager) Start(ctx context.Context) {

var enrField uint8
enrField |= (1 << 0)
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, enrField)

pm.ctx = ctx
if pm.sub != nil {
go pm.peerEventLoop(ctx)
Expand Down Expand Up @@ -211,16 +224,10 @@ func (pm *PeerManager) DiscoverAndConnectToPeers(ctx context.Context, cluster ui
return nil
}

// Convert wakuProtocols to enrBitField
func wakuProtoToENRFlags(protocol protocol.ID) (uint8, error) {
//TODO: figure out where to implement this without causing import loop
/*
switch(protocol){
case string(relay.WakuRelayID_v200):
}
*/
return 0, nil
// RegisterWakuProtocol to be used by Waku protocols that could be used for peer discovery
// Which means protoocl should be as defined in waku2 ENR key in https://rfc.vac.dev/spec/31/.
func (pm *PeerManager) RegisterWakuProtocol(proto protocol.ID, bitField uint8) {
pm.wakuprotoToENRFieldMap[proto] = WakuProtoInfo{waku2ENRBitField: bitField}
}

//type Predicate func(enode.Iterator) enode.Iterator
Expand All @@ -231,14 +238,15 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
shard uint16, wakuProtocol protocol.ID) ([]service.PeerData, error) {
var peers []service.PeerData

wakuENRFlags, err := wakuProtoToENRFlags(wakuProtocol)
if err != nil {
return nil, err
wakuProtoInfo, ok := pm.wakuprotoToENRFieldMap[wakuProtocol]
if !ok {
pm.logger.Error("cannot do on demand discovery for non-waku protocol", zap.String("protocol", string(wakuProtocol)))
return nil, errors.New("cannot do on demand discovery for non-waku protocol")
}

iterator, err := pm.discoveryService.PeerIterator(
discv5.FilterShard(cluster, shard),
discv5.FilterCapabilities(wakuENRFlags))
discv5.FilterCapabilities(wakuProtoInfo.waku2ENRBitField))
if err != nil {
pm.logger.Error("failed to find peers for shard and services", zap.Uint16("cluster", cluster),
zap.Uint16("shard", shard), zap.String("service", string(wakuProtocol)), zap.Error(err))
Expand All @@ -249,7 +257,7 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
defer iterator.Close()

for iterator.Next() {
pInfo, err := enr.EnodeToPeerInfo(iterator.Node())
pInfo, err := wenr.EnodeToPeerInfo(iterator.Node())
if err != nil {
continue
}
Expand All @@ -264,13 +272,13 @@ func (pm *PeerManager) discoverOnDemand(cluster uint16,
return peers, nil
}

func (pm *PeerManager) discoverRelayPeersByPubsub(topic string) {
shardInfo, err := waku_proto.TopicsToRelayShards(topic)
func (pm *PeerManager) discoverPeersByPubsubTopic(pubsubTopic string, proto protocol.ID) {
shardInfo, err := waku_proto.TopicsToRelayShards(pubsubTopic)
if err != nil {
pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", topic), zap.Error(err))
pm.logger.Error("failed to convert pubsub topic to shard", zap.String("topic", pubsubTopic), zap.Error(err))
return
}
pm.DiscoverAndConnectToPeers(pm.ctx, shardInfo[0].Cluster, shardInfo[0].Indices[0], relay.WakuRelayID_v200)
pm.DiscoverAndConnectToPeers(pm.ctx, shardInfo[0].Cluster, shardInfo[0].Indices[0], proto)
}

// ensureMinRelayConnsPerTopic makes sure there are min of D conns per pubsubTopic.
Expand All @@ -289,7 +297,7 @@ func (pm *PeerManager) ensureMinRelayConnsPerTopic() {
//Find not connected peers.
notConnectedPeers := pm.getNotConnectedPers(topicStr)
if notConnectedPeers.Len() == 0 {
pm.discoverRelayPeersByPubsub(topicStr)
pm.discoverPeersByPubsubTopic(topicStr, relay.WakuRelayID_v200)
continue
}
//Connect to eligible peers.
Expand Down Expand Up @@ -581,8 +589,9 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string,
if err == nil {
peerIDPtr = &peerID
} else {
//TODO: Trigger on-demand discovery for this topic.

//TODO:Trigger on-demand discovery for this topic and connect to peer immediately and set peerID.
//TODO: Use context to limit time for connectivity
//pm.discoverPeersByPubsubTopic(pubSubTopic, proto)
pm.logger.Debug("could not select random peer", zap.Error(err))
}
}
Expand Down
7 changes: 7 additions & 0 deletions waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type (
FilterParameters struct {
Timeout time.Duration
MaxSubscribers int
pm *peermanager.PeerManager
}

Option func(*FilterParameters)
Expand Down Expand Up @@ -156,6 +157,12 @@ func WithMaxSubscribers(maxSubscribers int) Option {
}
}

func WithPeerManager(pm *peermanager.PeerManager) Option {
return func(params *FilterParameters) {
params.pm = pm
}
}

func DefaultOptions() []Option {
return []Option{
WithTimeout(24 * time.Hour),
Expand Down
6 changes: 5 additions & 1 deletion waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
wf.metrics = newMetrics(reg)
wf.subscriptions = NewSubscribersMap(params.Timeout)
wf.maxSubscriptions = params.MaxSubscribers

if params.pm != nil {
var enrField uint8
enrField |= (1 << 2)
params.pm.RegisterWakuProtocol(FilterSubscribeID_v20beta1, enrField)
}
return wf
}

Expand Down
6 changes: 6 additions & 0 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
wakuLP.log = log.Named("lightpush")
wakuLP.pm = pm
wakuLP.metrics = newMetrics(reg)

return wakuLP
}

Expand All @@ -69,6 +70,11 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx))
wakuLP.log.Info("Light Push protocol started")

if wakuLP.pm != nil {
var enrField uint8
enrField |= (1 << 3)
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, enrField)
}
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions waku/v2/protocol/store/waku_store_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,10 @@ func NewWakuStore(p MessageProvider, pm *peermanager.PeerManager, timesource tim
wakuStore.pm = pm
wakuStore.metrics = newMetrics(reg)

if pm != nil {
var enrField uint8
enrField |= (1 << 1)
pm.RegisterWakuProtocol(StoreID_v20beta4, enrField)
}
return wakuStore
}
15 changes: 0 additions & 15 deletions waku/v2/protocol/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,3 @@ func PrefixTextMatch(prefix string) func(protocol.ID) bool {
return strings.HasPrefix(string(receivedProtocol), prefix)
}
}

// Convert wakuProtocols to enrBitField
/* func wakuProtoToENRFlags(protocol string) (uint8, error) {
var wakuENRField uint8
switch protocol {
case string(store.StoreID_v20beta4):
case string(filter.FilterSubscribeID_v20beta1):
case string(lightpush.LightPushID_v20beta1):
case string(relay.WakuRelayID_v200):
default:
return 0, errors.New("protocol provided is not a waku protocol")
}
return wakuENRField, nil
}
*/

0 comments on commit 27a8d46

Please sign in to comment.