Skip to content

Commit

Permalink
merge changes from #864
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Nov 6, 2023
1 parent 7549987 commit 9093816
Showing 1 changed file with 14 additions and 13 deletions.
27 changes: 14 additions & 13 deletions waku/v2/peermanager/peer_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID
// - which topics they track
// - latency?

if peerID := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...); peerID != nil {
return *peerID, nil
peerID, err := pm.selectServicePeer(criteria.Proto, criteria.PubsubTopic, criteria.Ctx, criteria.SpecificPeers...)
if err == nil {
return peerID, nil
} else if !errors.Is(err, ErrNoPeersAvailable) {
pm.logger.Debug("could not retrieve random peer from slot", zap.String("protocol", string(criteria.Proto)),
zap.String("pubsubTopic", criteria.PubsubTopic), zap.Error(err))
return "", err
}

// if not found in serviceSlots or proto == WakuRelayIDv200
Expand All @@ -55,27 +60,23 @@ func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID
return selectRandomPeer(filteredPeers, pm.logger)
}

func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, ctx context.Context, specificPeers ...peer.ID) (peerIDPtr *peer.ID) {
peerIDPtr = nil
func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, ctx context.Context, specificPeers ...peer.ID) (peer.ID, error) {
var peerID peer.ID
var err error
for tries := 0; tries <= 1; tries++ {
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if pubSubTopic == "" {
if peerID, err := slot.getRandom(); err == nil {
peerIDPtr = &peerID
}
break
return slot.getRandom()
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...)
peerID, err := selectRandomPeer(selectedPeers, pm.logger)
peerID, err = selectRandomPeer(selectedPeers, pm.logger)
if err == nil {
peerIDPtr = &peerID
break
return peerID, nil
} else {
//Trigger on-demand discovery for this topic and connect to peer immediately.
//For now discover atleast 1 peer for the criteria
Expand All @@ -86,10 +87,10 @@ func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string,
}
}
}
if peerIDPtr == nil {
if peerID == "" {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
}
return
return "", ErrNoPeersAvailable
}

// PeerSelectionCriteria is the selection Criteria that is used by PeerManager to select peers.
Expand Down

0 comments on commit 9093816

Please sign in to comment.