-
Notifications
You must be signed in to change notification settings - Fork 9
/
name.go
139 lines (111 loc) · 3.52 KB
/
name.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package name
import (
"context"
"path"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
discovery "github.com/libp2p/go-libp2p-discovery"
pubsub "github.com/libp2p/go-libp2p-pubsub"
namesys "github.com/libp2p/go-libp2p-pubsub-router"
)
// Namespace is the pubsub topic namespace.
const Namespace = "multiverse"
// System performs name resolution.
type System struct {
values *namesys.PubsubValueStore
}
// TopicForPeerID returns the topic name for the given peer id.
func TopicForPeerID(id peer.ID) string {
return path.Join("/", Namespace, peer.Encode(id))
}
// NewNameSystem returns a new name system.
func NewSystem(ctx context.Context, host host.Host, router routing.Routing, dstore datastore.Datastore) (*System, error) {
dis := discovery.NewRoutingDiscovery(router)
// TODO use datastore to persist values
sub, err := pubsub.NewGossipSub(ctx, host, pubsub.WithDiscovery(dis))
if err != nil {
return nil, err
}
values, err := namesys.NewPubsubValueStore(ctx, host, sub, Validator{})
if err != nil {
return nil, err
}
return &System{
values: values,
}, nil
}
// GetValue returns the latest value for the topic with the given peer id.
func (s *System) GetValue(ctx context.Context, id peer.ID) (*Record, error) {
val, err := s.values.GetValue(ctx, TopicForPeerID(id))
if err != nil {
return nil, err
}
return RecordFromCBOR(val)
}
// PutValue publishes the value under the topic of the given peer id.
func (s *System) PutValue(ctx context.Context, id peer.ID, rec *Record) error {
val, err := rec.Bytes()
if err != nil {
return err
}
return s.values.PutValue(ctx, TopicForPeerID(id), val)
}
// Search searches for the the latest value from the topic with the given peer ID.
func (s *System) SearchValue(ctx context.Context, id peer.ID) (*Record, error) {
out, err := s.values.SearchValue(ctx, TopicForPeerID(id))
if err != nil {
return nil, err
}
val, ok := <-out
if !ok {
return nil, routing.ErrNotFound
}
return RecordFromCBOR(val)
}
// Subscribe creates a subscription to the topic of the given peer ID.
func (s *System) Subscribe(id peer.ID) error {
return s.values.Subscribe(TopicForPeerID(id))
}
// Unsubscribe cancels a subscription to the topic of the given peer ID.
func (s *System) Unsubscribe(id peer.ID) (bool, error) {
return s.values.Cancel(TopicForPeerID(id))
}
// Publish advertises the given id to the topic of the peer ID from the private key.
func (s *System) Publish(ctx context.Context, key crypto.PrivKey, id cid.Cid) error {
peerID, err := peer.IDFromPrivateKey(key)
if err != nil {
return err
}
val, err := s.GetValue(ctx, peerID)
if err != nil && err != routing.ErrNotFound {
return err
}
rec := NewRecord(id.Bytes())
if val != nil {
rec.Sequence = val.Sequence + 1
}
if err := rec.Sign(key); err != nil {
return err
}
return s.PutValue(ctx, peerID, rec)
}
// Resolve returns the latest value from the topic with the given peer ID.
func (s *System) Resolve(ctx context.Context, id peer.ID) (cid.Cid, error) {
rec, err := s.GetValue(ctx, id)
if err != nil {
return cid.Cid{}, err
}
return cid.Cast(rec.Value)
}
// Search searches for the the latest value from the topic with the given peer ID.
func (s *System) Search(ctx context.Context, id peer.ID) (cid.Cid, error) {
rec, err := s.SearchValue(ctx, id)
if err != nil {
return cid.Cid{}, err
}
return cid.Cast(rec.Value)
}