This repository has been archived by the owner on Jul 13, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 28
/
transportbase.go
137 lines (114 loc) · 3.18 KB
/
transportbase.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package quic
import (
"crypto"
"crypto/x509"
"errors"
"fmt"
"net"
"sync"
"github.com/pion/quic/internal/wrapper"
)
// TransportBase is the base for Transport. Most of the
// functionality of a Transport is in the base class to allow for
// other subclasses (such as a p2p variant) to share the same interface.
type TransportBase struct {
lock sync.RWMutex
onBidirectionalStreamHdlr func(*BidirectionalStream)
session *wrapper.Session
}
// Config is used to hold the configuration of StartBase
type Config struct {
Client bool
Certificate *x509.Certificate
PrivateKey crypto.PrivateKey
}
// StartBase is used to start the TransportBase. Most implementations
// should instead use the methods on quic.Transport or
// webrtc.QUICTransport to setup a Quic connection.
func (b *TransportBase) StartBase(conn net.Conn, config *Config) error {
cfg := config.clone()
cfg.SkipVerify = true // Using self signed certificates; WebRTC will check the fingerprint
var s *wrapper.Session
var err error
if config.Client {
// Assumes the peer offered to be passive and we accepted.
s, err = wrapper.Client(conn, cfg)
} else {
// Assumes we offer to be passive and this is accepted.
var l *wrapper.Listener
l, err = wrapper.Server(conn, cfg)
if err != nil {
return err
}
s, err = l.Accept()
}
if err != nil {
return err
}
return b.startBase(s)
}
func (b *TransportBase) startBase(s *wrapper.Session) error {
b.session = s
go b.acceptStreams()
return nil
}
func (c *Config) clone() *wrapper.Config {
return &wrapper.Config{
Certificate: c.Certificate,
PrivateKey: c.PrivateKey,
}
}
// CreateBidirectionalStream creates an QuicBidirectionalStream object.
func (b *TransportBase) CreateBidirectionalStream() (*BidirectionalStream, error) {
s, err := b.session.OpenStream()
if err != nil {
return nil, err
}
return &BidirectionalStream{
s: s,
}, nil
}
// OnBidirectionalStream allows setting an event handler for that is fired
// when data is received from a BidirectionalStream for the first time.
func (b *TransportBase) OnBidirectionalStream(f func(*BidirectionalStream)) {
b.lock.Lock()
defer b.lock.Unlock()
b.onBidirectionalStreamHdlr = f
}
func (b *TransportBase) onBidirectionalStream(s *BidirectionalStream) {
b.lock.Lock()
f := b.onBidirectionalStreamHdlr
b.lock.Unlock()
if f != nil {
go f(s)
}
}
// GetRemoteCertificates returns the certificate chain in use by the remote side
func (b *TransportBase) GetRemoteCertificates() []*x509.Certificate {
return b.session.GetRemoteCertificates()
}
func (b *TransportBase) acceptStreams() {
for {
s, err := b.session.AcceptStream()
if err != nil {
fmt.Println("Failed to accept stream:", err)
// TODO: Kill TransportBase?
return
}
stream := &BidirectionalStream{s: s}
b.onBidirectionalStream(stream)
}
}
// Stop stops and closes the TransportBase.
func (b *TransportBase) Stop(stopInfo TransportStopInfo) error {
b.lock.Lock()
defer b.lock.Unlock()
if b.session == nil {
return nil
}
if stopInfo.ErrorCode > 0 ||
len(stopInfo.Reason) > 0 {
return b.session.CloseWithError(stopInfo.ErrorCode, errors.New(stopInfo.Reason))
}
return b.session.Close()
}