/
transport.go
112 lines (91 loc) · 2.5 KB
/
transport.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package star
import (
"context"
"fmt"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/transport"
ma "github.com/multiformats/go-multiaddr"
"github.com/pion/webrtc/v2"
"sync"
)
// Transport is the transport implementation of this package. It keeps one
// signal per multiaddr and hands dialing and listening off to that signal.
type Transport struct {
	// signals holds the registered signals, keyed by the string form of the
	// multiaddr they were created for.
	signals map[string]*signal
	// m guards signals.
	m sync.Mutex
	// The remaining fields are passed to newSignal whenever a signal is created.
	addressBook addressBook
	peerID peer.ID
	signalConfiguration SignalConfiguration
	webRTCConfiguration webrtc.Configuration
	multiplexer mux.Multiplexer
}
// Compile-time check that *Transport satisfies transport.Transport. A typed nil
// pointer is used so that no Transport value is allocated at package init.
var _ transport.Transport = (*Transport)(nil)
// Dial opens a connection to peer p via the signal associated with raddr.
//
// The signal for raddr is looked up, or created and registered if none exists
// yet, and the actual dialing is delegated to it. An error from obtaining the
// signal is returned unchanged.
func (t *Transport) Dial(ctx context.Context, raddr ma.Multiaddr, p peer.ID) (transport.CapableConn, error) {
	logger.Debugf("Dial peer (ID: %s, address: %v)", p, raddr)

	sig, err := t.getOrRegisterSignal(raddr)
	if err != nil {
		return nil, err
	}

	return sig.dial(ctx, p)
}
// Listen creates a listener for laddr backed by the signal associated with
// that address.
//
// The signal is looked up, or created and registered if none exists yet. The
// listener also receives t.unregisterSignal as a callback (presumably invoked
// when the listener is closed; confirm in newListener).
func (t *Transport) Listen(laddr ma.Multiaddr) (transport.Listener, error) {
	logger.Debugf("Listen on address: %s", laddr)

	sig, err := t.getOrRegisterSignal(laddr)
	if err != nil {
		return nil, err
	}

	return newListener(laddr, sig, t.unregisterSignal)
}
// getOrRegisterSignal returns the signal registered for addr, creating and
// registering a new one when none exists yet.
//
// Signals are keyed by the string form of addr. The lookup and the
// registration both happen while holding t.m.
//
// A signal is stored in the map only after newSignal has succeeded. A failed
// attempt therefore leaves no entry behind, and a later call for the same
// address tries again instead of returning a nil signal with a nil error.
func (t *Transport) getOrRegisterSignal(addr ma.Multiaddr) (*signal, error) {
	key := addr.String()

	t.m.Lock()
	defer t.m.Unlock()

	if existing, ok := t.signals[key]; ok {
		return existing, nil
	}

	created, err := newSignal(t, addr, t.addressBook, t.peerID, t.signalConfiguration, t.webRTCConfiguration,
		t.multiplexer)
	if err != nil {
		return nil, err
	}

	// Writing to a nil map panics; create it lazily so a Transport that was
	// not built through New can still register signals.
	if t.signals == nil {
		t.signals = map[string]*signal{}
	}
	t.signals[key] = created
	return created, nil
}
// unregisterSignal closes the signal registered for addr and removes it from
// the transport.
//
// It returns an error only when no signal is registered for addr. A failure to
// close the signal is logged and the entry is removed regardless.
func (t *Transport) unregisterSignal(addr ma.Multiaddr) error {
	key := addr.String()

	t.m.Lock()
	defer t.m.Unlock()

	sig, found := t.signals[key]
	if !found {
		return fmt.Errorf(`no signal registered for "%s"`, key)
	}

	if closeErr := sig.close(); closeErr != nil {
		logger.Errorf("Error while closing signal: %v", closeErr)
	}
	delete(t.signals, key)
	return nil
}
// CanDial reports whether addr matches the package-level address format.
func (t *Transport) CanDial(addr ma.Multiaddr) bool {
	matches := format.Matches(addr)
	return matches
}
// Protocols returns the protocol codes handled by this transport: a
// single-element slice holding protocol.Code.
func (t *Transport) Protocols() []int {
	codes := []int{protocol.Code}
	return codes
}
// Proxy reports whether this transport is a proxy transport. It always
// returns false.
func (t *Transport) Proxy() bool {
	const isProxy = false
	return isProxy
}
// New creates a Transport for the given peer ID, address book and stream
// multiplexer, with an empty signal registry.
//
// The signal and WebRTC configurations are left at their zero values; set them
// with WithSignalConfiguration and WithWebRTCConfiguration.
func New(peerID peer.ID, peerstore addressBook, multiplexer mux.Multiplexer) *Transport {
	t := new(Transport)
	t.signals = make(map[string]*signal)
	t.peerID = peerID
	t.addressBook = peerstore
	t.multiplexer = multiplexer
	return t
}
// WithSignalConfiguration sets the signal configuration on t and returns t so
// that calls can be chained. The receiver is modified in place and the write is
// not guarded by the transport's mutex.
func (t *Transport) WithSignalConfiguration(c SignalConfiguration) *Transport {
	t.signalConfiguration = c

	return t
}
// WithWebRTCConfiguration sets the WebRTC configuration on t and returns t so
// that calls can be chained. The receiver is modified in place and the write is
// not guarded by the transport's mutex.
func (t *Transport) WithWebRTCConfiguration(c webrtc.Configuration) *Transport {
	t.webRTCConfiguration = c

	return t
}