-
-
Notifications
You must be signed in to change notification settings - Fork 3k
/
providers.go
154 lines (132 loc) · 3.11 KB
/
providers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package dht
import (
"time"
key "github.com/ipfs/go-ipfs/blocks/key"
goprocess "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
goprocessctx "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess/context"
peer "gx/ipfs/QmUBogf4nUefBjmYjn6jfsfPJRkmDGSeMhNj4usRKq69f4/go-libp2p/p2p/peer"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)
type ProviderManager struct {
// all non channel fields are meant to be accessed only within
// the run method
providers map[key.Key]*providerSet
local map[key.Key]struct{}
lpeer peer.ID
getlocal chan chan []key.Key
newprovs chan *addProv
getprovs chan *getProv
period time.Duration
proc goprocess.Process
}
type providerSet struct {
providers []peer.ID
set map[peer.ID]time.Time
}
type addProv struct {
k key.Key
val peer.ID
}
type getProv struct {
k key.Key
resp chan []peer.ID
}
func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
pm := new(ProviderManager)
pm.getprovs = make(chan *getProv)
pm.newprovs = make(chan *addProv)
pm.providers = make(map[key.Key]*providerSet)
pm.getlocal = make(chan chan []key.Key)
pm.local = make(map[key.Key]struct{})
pm.proc = goprocessctx.WithContext(ctx)
pm.proc.Go(func(p goprocess.Process) { pm.run() })
return pm
}
func (pm *ProviderManager) run() {
tick := time.NewTicker(time.Hour)
for {
select {
case np := <-pm.newprovs:
if np.val == pm.lpeer {
pm.local[np.k] = struct{}{}
}
provs, ok := pm.providers[np.k]
if !ok {
provs = newProviderSet()
pm.providers[np.k] = provs
}
provs.Add(np.val)
case gp := <-pm.getprovs:
var parr []peer.ID
provs, ok := pm.providers[gp.k]
if ok {
parr = provs.providers
}
gp.resp <- parr
case lc := <-pm.getlocal:
var keys []key.Key
for k := range pm.local {
keys = append(keys, k)
}
lc <- keys
case <-tick.C:
for _, provs := range pm.providers {
var filtered []peer.ID
for p, t := range provs.set {
if time.Now().Sub(t) > time.Hour*24 {
delete(provs.set, p)
} else {
filtered = append(filtered, p)
}
}
provs.providers = filtered
}
case <-pm.proc.Closing():
return
}
}
}
func (pm *ProviderManager) AddProvider(ctx context.Context, k key.Key, val peer.ID) {
prov := &addProv{
k: k,
val: val,
}
select {
case pm.newprovs <- prov:
case <-ctx.Done():
}
}
func (pm *ProviderManager) GetProviders(ctx context.Context, k key.Key) []peer.ID {
gp := &getProv{
k: k,
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
}
select {
case <-ctx.Done():
return nil
case pm.getprovs <- gp:
}
select {
case <-ctx.Done():
return nil
case peers := <-gp.resp:
return peers
}
}
func (pm *ProviderManager) GetLocal() []key.Key {
resp := make(chan []key.Key)
pm.getlocal <- resp
return <-resp
}
func newProviderSet() *providerSet {
return &providerSet{
set: make(map[peer.ID]time.Time),
}
}
func (ps *providerSet) Add(p peer.ID) {
_, found := ps.set[p]
if !found {
ps.providers = append(ps.providers, p)
}
ps.set[p] = time.Now()
}