/
stream.go
162 lines (135 loc) · 3.42 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package ice
import (
"errors"
"io"
"net"
"time"
)
// readTimeoutError is the concrete type behind ErrReadTimeout. It carries the
// sentinel's message and additionally reports itself as a timeout, so callers
// that use the conventional net.Error check on errors returned by Read
// recognise an expired read deadline.
type readTimeoutError struct{ error }

// Timeout reports that the error was caused by an expired deadline.
func (readTimeoutError) Timeout() bool { return true }

// Temporary is required by net.Error. A read timeout clears once a new
// deadline is set, so it is reported as temporary.
func (readTimeoutError) Temporary() bool { return true }

// Compile-time check that the sentinel's type satisfies net.Error.
var _ net.Error = readTimeoutError{}

// ErrReadTimeout is returned by DataStream.Read when the read deadline expires
// before a packet arrives. It can still be compared directly (or with
// errors.Is), and it also implements net.Error with Timeout() == true.
var ErrReadTimeout error = readTimeoutError{errors.New("read timeout")}
// DataStream is the data-level channel between two peers once a candidate
// pair has been selected. The ICE agent filters out and handles ICE control
// packets itself, so reads and writes on a DataStream deal only with the
// data-level protocol.
type DataStream struct {
	// conn is the parent packet connection. Writes pass straight through to
	// it; reads are served from in instead.
	conn net.PacketConn

	// raddr is the remote address that writes are sent to.
	raddr net.Addr

	// in delivers inbound packets, fed by a read loop on the parent
	// connection.
	in <-chan []byte

	// dead is a single-fire channel signalling that the read loop has
	// terminated.
	dead <-chan struct{}

	// cause reports why the read loop died.
	cause func() error

	// timer implements the read deadline; nil means no deadline is set.
	timer *time.Timer

	// notify wakes pending reads whenever the read deadline changes.
	notify chan struct{}
}
// newDataStream builds the DataStream for the selected candidate pair p.
// Writes go through the local candidate's base connection to the pair's remote
// address, while reads are served from dataIn.
func newDataStream(p *CandidatePair, dataIn <-chan []byte) *DataStream {
	parent := p.local.base

	stream := &DataStream{in: dataIn}
	stream.conn = parent
	stream.raddr = p.remote.address.netAddr()
	stream.dead = parent.dead
	// Look the error up on demand so callers see whatever the base holds at
	// the time they ask, not at construction time.
	stream.cause = func() error { return parent.err }
	return stream
}
// update re-points the stream at a newly selected candidate pair p. The parent
// connection, remote address, death signal, and cause function are switched
// over; the inbound channel and any read deadline are left as they were.
func (s *DataStream) update(p *CandidatePair) {
	parent := p.local.base
	// Look the error up on demand so callers see the base's current value.
	reason := func() error { return parent.err }

	s.conn = parent
	s.raddr = p.remote.address.netAddr()
	s.dead = parent.dead
	s.cause = reason
}
// Done returns the channel that fires when the data session is severed, i.e.
// the stream's dead channel.
func (s *DataStream) Done() <-chan struct{} {
	severed := s.dead
	return severed
}
// Err returns whatever the stream's cause function reports, which is the
// reason the read loop died.
func (s *DataStream) Err() error {
	reason := s.cause
	return reason()
}
// Write sends b as a single packet to the stream's remote address through the
// parent connection, returning that connection's byte count and error.
func (s *DataStream) Write(b []byte) (int, error) {
	peer := s.raddr
	return s.conn.WriteTo(b, peer)
}
// Read copies the next inbound data packet into b and returns its length.
//
// It blocks until one of the following happens:
//   - a packet arrives on the inbound channel;
//   - the read deadline expires, in which case ErrReadTimeout is returned;
//   - the stream dies, or the inbound channel is closed, in which case io.EOF
//     is returned.
//
// The stream is packet-oriented, so b must be large enough to hold the whole
// packet. If it is not, the packet is discarded and io.ErrShortBuffer is
// returned. A genuine zero-length packet yields (0, nil).
//
// NOTE(review): a timeout consumes the timer's single tick, so a later Read
// does not time out again until SetReadDeadline re-arms the timer.
func (s *DataStream) Read(b []byte) (int, error) {
	if s.notify == nil {
		s.notify = make(chan struct{})
	}
	for {
		// A nil channel is never ready, so leaving timeout nil disables the
		// deadline case while no timer is armed.
		var timeout <-chan time.Time
		if s.timer != nil {
			timeout = s.timer.C
		}
		select {
		case <-s.notify:
			// The deadline changed; loop to pick up the new timer.
			continue
		case <-s.dead:
			return 0, io.EOF
		case <-timeout:
			return 0, ErrReadTimeout
		case data, ok := <-s.in:
			if !ok {
				// A closed channel yields nil slices forever. Report end of
				// stream instead of an endless run of empty packets.
				return 0, io.EOF
			}
			if len(data) > len(b) {
				// For packet-oriented connections, the destination buffer
				// must be large enough to fit an entire packet.
				return 0, io.ErrShortBuffer
			}
			return copy(b, data), nil
		}
	}
}
// Close closes the parent connection and returns its error.
func (s *DataStream) Close() error {
	parent := s.conn
	return parent.Close()
}
// LocalAddr returns the local address reported by the parent connection.
func (s *DataStream) LocalAddr() net.Addr {
	parent := s.conn
	return parent.LocalAddr()
}
// RemoteAddr returns the remote address that this stream writes to.
func (s *DataStream) RemoteAddr() net.Addr {
	peer := s.raddr
	return peer
}
// SetDeadline applies t as both the read and the write deadline. The read
// deadline is set first; if that fails, its error is returned and the write
// deadline is left untouched.
func (s *DataStream) SetDeadline(t time.Time) error {
	err := s.SetReadDeadline(t)
	if err == nil {
		err = s.SetWriteDeadline(t)
	}
	return err
}
// SetWriteDeadline forwards the write deadline t to the parent connection and
// returns its error.
func (s *DataStream) SetWriteDeadline(t time.Time) error {
	parent := s.conn
	return parent.SetWriteDeadline(t)
}
// SetReadDeadline sets the deadline for pending and future reads; see
// net.Conn#SetReadDeadline for semantics. A zero t removes the deadline.
// Pending reads are woken so that they pick up the new deadline. It always
// returns nil.
func (s *DataStream) SetReadDeadline(t time.Time) error {
	if old := s.timer; old != nil && !old.Stop() {
		// The timer fired before it could be stopped, so drain its channel.
		// This may race with a pending read that is consuming the same tick,
		// hence the non-blocking, best-effort receive.
		select {
		case <-old.C:
		default:
		}
	}

	switch {
	case t.IsZero():
		// Zero deadline means no timeout.
		s.timer = nil
	case s.timer == nil:
		s.timer = time.NewTimer(time.Until(t))
	default:
		s.timer.Reset(time.Until(t))
	}

	// Swap in a fresh notify channel, then close the previous one to wake
	// pending reads so that they see the new deadline.
	prev := s.notify
	s.notify = make(chan struct{})
	if prev != nil {
		close(prev)
	}
	return nil
}