forked from libp2p/go-libp2p
-
Notifications
You must be signed in to change notification settings - Fork 0
/
swarm_listen.go
157 lines (136 loc) · 3.43 KB
/
swarm_listen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package swarm
import (
"fmt"
"time"
"github.com/libp2p/go-libp2p/core/canonicallog"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/transport"
ma "github.com/multiformats/go-multiaddr"
)
// Listen starts a listener for each of the given addresses.
//
// Every address is attempted, and each failure is logged as a warning once all
// attempts have been made. An error is returned only when at least one address
// was supplied and none of them could be listened on; partial success (and an
// empty address list) returns nil.
func (s *Swarm) Listen(addrs ...ma.Multiaddr) error {
	// failures is index-aligned with addrs; a nil entry means that address worked.
	failures := make([]error, len(addrs))
	failed := 0
	for idx := range addrs {
		failures[idx] = s.AddListenAddr(addrs[idx])
		if failures[idx] != nil {
			failed++
		}
	}

	for idx, addr := range addrs {
		if failures[idx] == nil {
			continue
		}
		log.Warnw("listening failed", "on", addr, "error", failures[idx])
	}

	if len(addrs) == 0 || failed < len(addrs) {
		return nil
	}
	return fmt.Errorf("failed to listen on any addresses: %s", failures)
}
// ListenClose stops and removes the listeners whose multiaddr matches any of
// the given addresses.
//
// Matching listeners are unregistered while the listeners lock is held and are
// closed only after the lock has been released. cacheEOL is reset to the zero
// time on every call, whether or not anything matched.
func (s *Swarm) ListenClose(addrs ...ma.Multiaddr) {
	var doomed []transport.Listener

	s.listeners.Lock()
	for ln := range s.listeners.m {
		if containsMultiaddr(addrs, ln.Multiaddr()) {
			delete(s.listeners.m, ln)
			doomed = append(doomed, ln)
		}
	}
	s.listeners.cacheEOL = time.Time{}
	s.listeners.Unlock()

	for i := range doomed {
		doomed[i].Close()
	}
}
// AddListenAddr tells the swarm to listen on a single address.
//
// Unlike Listen, which logs individual failures and tolerates them as long as
// one address works, this method reports any failure directly to the caller.
//
// It returns:
//   - ErrSwarmClosed if no transport is found and the swarm's context is done,
//     or if the listeners map is nil by the time the new listener is registered;
//   - ErrNoTransport if no transport is found and the context is still live;
//   - the error from the transport's Listen call, unmodified;
//   - nil once the listener is registered, the notifiees have been told, and
//     the accept goroutine has been started.
func (s *Swarm) AddListenAddr(a ma.Multiaddr) error {
	tpt := s.TransportForListening(a)
	if tpt == nil {
		// TransportForListening will return nil if either:
		// 1. No transport has been registered.
		// 2. We're closed (so we've nulled out the transport map).
		//
		// Distinguish between these two cases to avoid confusing users.
		select {
		case <-s.ctx.Done():
			return ErrSwarmClosed
		default:
			return ErrNoTransport
		}
	}
	list, err := tpt.Listen(a)
	if err != nil {
		return err
	}
	s.listeners.Lock()
	// A nil listeners map is treated as "swarm closed": release the lock and
	// close the listener we just opened so it is not leaked.
	if s.listeners.m == nil {
		s.listeners.Unlock()
		list.Close()
		return ErrSwarmClosed
	}
	// This reference is released by the deferred cleanup of the accept
	// goroutine below. It is taken while the lock is still held.
	s.refs.Add(1)
	s.listeners.m[list] = struct{}{}
	// NOTE(review): resetting cacheEOL to the zero time presumably invalidates
	// a cached view of the listen addresses — confirm against its readers.
	s.listeners.cacheEOL = time.Time{}
	s.listeners.Unlock()
	// Capture the address once; the same value is reported on listen and on
	// listen close.
	maddr := list.Multiaddr()
	// signal to our notifiees on listen.
	s.notifyAll(func(n network.Notifiee) {
		n.Listen(s, maddr)
	})
	// Accept loop: runs until Accept returns an error, then cleans up.
	go func() {
		defer func() {
			s.listeners.Lock()
			_, ok := s.listeners.m[list]
			if ok {
				delete(s.listeners.m, list)
				s.listeners.cacheEOL = time.Time{}
			}
			s.listeners.Unlock()
			// ok means the listener was still registered, i.e. nobody removed
			// it before the accept loop ended (ListenClose deletes the map
			// entry before closing). Close it here and report the failure.
			if ok {
				list.Close()
				log.Errorf("swarm listener unintentionally closed")
			}
			// signal to our notifiees on listen close.
			s.notifyAll(func(n network.Notifiee) {
				n.ListenClose(s, maddr)
			})
			// Release the reference taken when the listener was registered.
			s.refs.Done()
		}()
		for {
			c, err := list.Accept()
			if err != nil {
				// Any Accept error ends the loop; the error itself is not
				// logged or inspected here.
				return
			}
			// NOTE(review): the leading 100 is presumably a sampling rate for
			// the canonical log — confirm against canonicallog.LogPeerStatus.
			canonicallog.LogPeerStatus(100, c.RemotePeer(), c.RemoteMultiaddr(), "connection_status", "established", "dir", "inbound")
			log.Debugf("swarm listener accepted connection: %s <-> %s", c.LocalMultiaddr(), c.RemoteMultiaddr())
			// One reference per in-flight connection setup, released by the
			// goroutine below. Registering the connection is done off the
			// accept loop so the loop can go straight back to Accept.
			s.refs.Add(1)
			go func() {
				defer s.refs.Done()
				_, err := s.addConn(c, network.DirInbound)
				switch err {
				case nil:
					// Connection added; nothing more to do.
				case ErrSwarmClosed:
					// ignore.
					return
				default:
					// The warning is tagged with the requested listen address a.
					log.Warnw("adding connection failed", "to", a, "error", err)
					return
				}
			}()
		}
	}()
	return nil
}
// containsMultiaddr reports whether addrs contains a multiaddr equal to addr.
//
// Equality is decided by Multiaddr.Equal, i.e. by the address value. Comparing
// the values with == depends on the dynamic type: for pointer-backed
// implementations it is an identity comparison, so an equal address that was
// constructed separately would never match, and for non-comparable dynamic
// types it panics at run time.
//
// A nil addr matches only a nil entry in addrs (the same result == gave), and
// nil values are never passed to Equal.
func containsMultiaddr(addrs []ma.Multiaddr, addr ma.Multiaddr) bool {
	for _, a := range addrs {
		if a == nil || addr == nil {
			if a == nil && addr == nil {
				return true
			}
			continue
		}
		if addr.Equal(a) {
			return true
		}
	}
	return false
}