-
Notifications
You must be signed in to change notification settings - Fork 211
/
wrapper.go
85 lines (77 loc) · 2.29 KB
/
wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package pubsub
import (
"context"
"errors"
"fmt"
"sync"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p/metrics"
)
// PubSub is a spacemesh-specific wrapper around gossip protocol.
//
// It owns the set of topics that have been joined through Register and
// publishes messages to them through Publish.
type PubSub struct {
// logger is used by Register and Publish to report fatal conditions via
// Panic, and is handed to DropPeerOnValidationReject on registration.
logger log.Log
// pubsub is the underlying go-libp2p-pubsub instance on which validators
// are registered, topics are joined and peers are listed.
pubsub *pubsub.PubSub
// host is passed to DropPeerOnValidationReject when a handler is registered.
host host.Host
// mu guards topics: Register takes the write lock, Publish the read lock.
mu sync.RWMutex
// topics maps a topic name to the handle returned by Join for that topic.
// Entries are added by Register.
topics map[string]*pubsub.Topic
}
// Register installs handler as the validator for topic, joins the topic and
// enables relaying for it.
//
// The handler is wrapped with DropPeerOnValidationReject before it is
// installed. The error it returns is mapped to a validation result:
//   - an error matching ErrValidationReject -> pubsub.ValidationReject
//   - any other non-nil error               -> pubsub.ValidationIgnore
//   - nil                                   -> pubsub.ValidationAccept
//
// opts are forwarded unchanged to RegisterTopicValidator.
//
// Register calls the logger's Panic if topic was already registered, or if
// installing the validator, joining the topic or enabling relay fails.
func (ps *PubSub) Register(topic string, handler GossipHandler, opts ...ValidatorOpt) {
	ps.mu.Lock()
	defer ps.mu.Unlock()
	if _, exist := ps.topics[topic]; exist {
		ps.logger.Panic("already registered a topic %s", topic)
	}
	// Drop peers on ValidationRejectErr
	handler = DropPeerOnValidationReject(handler, ps.host, ps.logger)
	validator := func(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
		start := time.Now()
		err := handler(log.WithNewRequestID(ctx), pid, msg.Data)
		// The elapsed time.Duration is observed as its raw numeric value
		// (nanoseconds), labelled with the topic and castResult(err).
		metrics.ProcessedMessagesDuration.WithLabelValues(topic, castResult(err)).
			Observe(float64(time.Since(start)))
		switch {
		case errors.Is(err, ErrValidationReject):
			return pubsub.ValidationReject
		case err != nil:
			return pubsub.ValidationIgnore
		default:
			return pubsub.ValidationAccept
		}
	}
	// Do not join the topic without a validator in place: a failure here used
	// to be silently discarded, leaving the topic unvalidated.
	if err := ps.pubsub.RegisterTopicValidator(topic, validator, opts...); err != nil {
		ps.logger.With().Panic("failed to register a validator for topic",
			log.String("topic", topic),
			log.Err(err),
		)
	}
	topich, err := ps.pubsub.Join(topic)
	if err != nil {
		ps.logger.With().Panic("failed to join a topic", log.String("topic", topic), log.Err(err))
	}
	ps.topics[topic] = topich
	// The relay cancel function is intentionally dropped; this wrapper never
	// disables relaying for a registered topic.
	_, err = topich.Relay()
	if err != nil {
		ps.logger.With().Panic("failed to enable relay for topic",
			log.String("topic", topic),
			log.Err(err),
		)
	}
}
// Publish sends msg to topic using the handle stored for it by Register.
//
// The read lock on the topic table is held for the duration of the call.
// If no handle is stored for topic, the logger's Panic is called. An error
// from the underlying publish is returned wrapped with the topic name.
func (ps *PubSub) Publish(ctx context.Context, topic string, msg []byte) error {
	ps.mu.RLock()
	defer ps.mu.RUnlock()

	handle := ps.topics[topic]
	if handle == nil {
		ps.logger.Panic("Publish is called before Register for topic %s", topic)
	}

	err := handle.Publish(ctx, msg)
	if err == nil {
		return nil
	}
	return fmt.Errorf("failed to publish to topic %v: %w", topic, err)
}
// ProtocolPeers returns list of peers that are communicating in a given protocol.
//
// It delegates to the underlying pubsub's ListPeers, passing protocol through
// unchanged, and does not take the PubSub mutex.
func (ps *PubSub) ProtocolPeers(protocol string) []peer.ID {
	peers := ps.pubsub.ListPeers(protocol)
	return peers
}