forked from ipfs/kubo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
122 lines (96 loc) · 2.64 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package coreapi
import (
"context"
"errors"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
peer "github.com/libp2p/go-libp2p-core/peer"
routing "github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
type PubSubAPI CoreAPI
type pubSubSubscription struct {
subscription *pubsub.Subscription
}
type pubSubMessage struct {
msg *pubsub.Message
}
func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
_, err := api.checkNode()
if err != nil {
return nil, err
}
return api.pubSub.GetTopics(), nil
}
func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
_, err := api.checkNode()
if err != nil {
return nil, err
}
settings, err := caopts.PubSubPeersOptions(opts...)
if err != nil {
return nil, err
}
return api.pubSub.ListPeers(settings.Topic), nil
}
func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) error {
_, err := api.checkNode()
if err != nil {
return err
}
//nolint deprecated
return api.pubSub.Publish(topic, data)
}
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
// Parse the options to avoid introducing silent failures for invalid
// options. However, we don't currently have any use for them. The only
// subscription option, discovery, is now a no-op as it's handled by
// pubsub itself.
_, err := caopts.PubSubSubscribeOptions(opts...)
if err != nil {
return nil, err
}
_, err = api.checkNode()
if err != nil {
return nil, err
}
//nolint deprecated
sub, err := api.pubSub.Subscribe(topic)
if err != nil {
return nil, err
}
return &pubSubSubscription{sub}, nil
}
func (api *PubSubAPI) checkNode() (routing.Routing, error) {
if api.pubSub == nil {
return nil, errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
}
err := api.checkOnline(false)
if err != nil {
return nil, err
}
return api.routing, nil
}
func (sub *pubSubSubscription) Close() error {
sub.subscription.Cancel()
return nil
}
func (sub *pubSubSubscription) Next(ctx context.Context) (coreiface.PubSubMessage, error) {
msg, err := sub.subscription.Next(ctx)
if err != nil {
return nil, err
}
return &pubSubMessage{msg}, nil
}
func (msg *pubSubMessage) From() peer.ID {
return peer.ID(msg.msg.From)
}
func (msg *pubSubMessage) Data() []byte {
return msg.msg.Data
}
func (msg *pubSubMessage) Seq() []byte {
return msg.msg.Seqno
}
func (msg *pubSubMessage) Topics() []string {
return msg.msg.TopicIDs
}