/
stream.go
115 lines (95 loc) · 2.74 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package rtp
// PayloadType describes a single RTP payload type, as provided via SDP.
type PayloadType struct {
// Payload type number (<= 127) assigned by the SDP `rtpmap` attribute.
Number uint8
// Encoding name, from the SDP `rtpmap` attribute (e.g. "H264").
Name string
// Clock rate in Hz, from the SDP `rtpmap` attribute (e.g. 90000).
ClockRate int
// Codec-specific format parameters, from the SDP `fmtp` attribute.
Format string
// Supported feedback RTCP options, from the SDP `rtcp-fb` attributes.
FeedbackOptions []string
}
// StreamOptions holds the parameters used to construct a Stream.
type StreamOptions struct {
// SSRC identifying the local end; passed to the outgoing RTP and RTCP
// writers and placed in the RTCP packets this stream sends.
LocalSSRC uint32
// CNAME of the local end, sent in RTCP source description packets.
LocalCNAME string
// SSRC identifying the remote end; passed to the incoming RTP and RTCP
// readers and used as the source of receiver report entries.
RemoteSSRC uint32
// CNAME of the remote end.
RemoteCNAME string
// sendonly, recvonly, or sendrecv
// The value is compared verbatim against these three strings when the
// stream is created; any other value sets up neither RTP direction.
Direction string
// Negotiated payload types, keyed by 7-bit dynamic payload type number.
PayloadTypes map[byte]PayloadType
// Maximum size of outgoing packets, factoring in MTU and protocol overhead.
MaxPacketSize int
}
// Stream bundles the options a stream was created with and the RTP/RTCP
// reader and writer state associated with it.
type Stream struct {
StreamOptions
// RTP state for outgoing data.
// Nil unless Direction is "sendonly" or "sendrecv"; reset to nil by Close.
rtpOut *rtpWriter
// RTP state for incoming data.
// Nil unless Direction is "recvonly" or "sendrecv"; reset to nil by Close.
rtpIn *rtpReader
// RTCP state for outgoing control packets.
rtcpOut *rtcpWriter
// RTCP state for incoming control packets.
rtcpIn *rtcpReader
}
// newStream builds a Stream for session from opts.
//
// The outgoing RTP writer is created only when opts.Direction is "sendonly"
// or "sendrecv", and the incoming RTP reader only when it is "recvonly" or
// "sendrecv"; any other Direction leaves both nil. The RTCP writer and
// reader are always created.
func newStream(session *Session, opts StreamOptions) *Stream {
	// TODO: Validate options.
	stream := &Stream{StreamOptions: opts}

	var sends, receives bool
	switch opts.Direction {
	case "sendonly":
		sends = true
	case "recvonly":
		receives = true
	case "sendrecv":
		sends, receives = true, true
	}

	if sends {
		stream.rtpOut = newRTPWriter(session.DataConn, opts.LocalSSRC, session.writeContext)
	}
	if receives {
		stream.rtpIn = newRTPReader(opts.RemoteSSRC, session.readContext)
	}

	// Control traffic is set up regardless of the media direction.
	stream.rtcpOut = newRTCPWriter(session.ControlConn, opts.LocalSSRC, session.writeContext)
	stream.rtcpIn = newRTCPReader(opts.RemoteSSRC, session.readContext)
	return stream
}
// Close sends an RTCP Goodbye with the reason "stream closed" and then
// releases the stream's RTP state.
//
// The RTP state is released even when the Goodbye cannot be written; in that
// case the error from sending the Goodbye is returned. The outgoing RTP
// writer is only touched when it exists, so Close does not panic on streams
// that were created without one (e.g. "recvonly") or that were already
// closed.
func (s *Stream) Close() error {
	// Best effort: remember the failure but still tear down local state.
	err := s.sendGoodbye("stream closed")

	// rtpOut is nil for streams that never send, and after a previous Close.
	if s.rtpOut != nil {
		s.rtpOut.cache.Clear()
		s.rtpOut = nil
	}
	s.rtpIn = nil

	return err
}
// sendSenderReport writes an RTCP source description carrying the local SSRC
// and CNAME through the outgoing RTCP writer and returns the writer's error.
//
// NOTE(review): despite the name, no sender report packet is built here —
// only the source description is sent. Confirm whether that is intended.
func (s *Stream) sendSenderReport() error {
	description := rtcpSourceDescription{ssrc: s.LocalSSRC, cname: s.LocalCNAME}
	return s.rtcpOut.writePacket(&description)
}
// sendReceiverReport writes an RTCP receiver report followed by the local
// source description through the outgoing RTCP writer and returns the
// writer's error.
//
// When the stream has incoming RTP state, the report contains one entry for
// the remote SSRC with the reader's last index. When it has none (the stream
// does not receive, or has been closed), the receiver report is sent with no
// entries instead of dereferencing the missing reader.
func (s *Stream) sendReceiverReport() error {
	rr := &rtcpReceiverReport{receiver: s.LocalSSRC}

	// rtpIn is nil for streams that never receive and after Close.
	if s.rtpIn != nil {
		rr.reports = []rtcpReport{{
			Source:       s.RemoteSSRC,
			LastReceived: uint32(s.rtpIn.lastIndex),
			// TODO: Jitter, arrival delay, etc.
		}}
	}

	sdes := &rtcpSourceDescription{
		ssrc:  s.LocalSSRC,
		cname: s.LocalCNAME,
	}
	return s.rtcpOut.writePacket(rr, sdes)
}
// sendGoodbye informs the remote peer that we're leaving. It hands three
// packets to the outgoing RTCP writer in a single call: a receiver report
// with no report entries, the local source description, and a Goodbye
// carrying reason. The writer's error is returned.
func (s *Stream) sendGoodbye(reason string) error {
	ssrc := s.LocalSSRC
	return s.rtcpOut.writePacket(
		&rtcpReceiverReport{receiver: ssrc},
		&rtcpSourceDescription{ssrc: ssrc, cname: s.LocalCNAME},
		&rtcpGoodbye{ssrc: ssrc, reason: reason},
	)
}