-
Notifications
You must be signed in to change notification settings - Fork 0
/
rtmpproxy.go
172 lines (154 loc) · 3.26 KB
/
rtmpproxy.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package rtmpproxy
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"net"
amf "github.com/zhangpeihao/goamf"
)
type Server struct {
remoteAddr string
playUrl string
appName string
streamName string
}
func (s *Server) handleRtmpCommand(ch *rtmpChunkHeader, payload []byte) ([]byte, bool, error) {
br := bytes.NewReader(payload)
command, err := amf.ReadString(br)
if err != nil {
return nil, false, err
}
transid, err := amf.ReadDouble(br)
if err != nil {
return nil, false, err
}
args := make([]interface{}, 0, 1)
for br.Len() > 0 {
v, err := amf.ReadValue(br)
if err != nil {
return nil, false, err
}
args = append(args, v)
}
usecopy := false
switch command {
case "connect":
obj := args[0].(amf.Object)
obj["app"] = s.appName
obj["swfUrl"] = s.playUrl
obj["tcUrl"] = s.playUrl
case "releaseStream", "FCPublish":
args[1] = s.streamName
case "publish":
args[1] = s.streamName
usecopy = true
}
buf := bytes.NewBuffer(nil)
amf.WriteString(buf, command)
amf.WriteDouble(buf, transid)
for _, arg := range args {
amf.WriteValue(buf, arg)
}
return buf.Bytes(), usecopy, nil
}
func (s *Server) handleMessages(conn net.Conn, conn2 net.Conn) error {
var (
maxChunkSize = 128
usecopy = false
lastch rtmpChunkHeader
payload []byte
nread int
)
for !usecopy {
ch, err := rtmpReadHeader(conn)
if err != nil {
return err
}
if nread != 0 && ch.csid != lastch.csid {
return fmt.Errorf("unsupport multi-chunkstream at a time")
}
switch ch.format {
case 1:
ch.streamid = lastch.streamid
case 2:
ch.length = lastch.length
ch.typeid = lastch.typeid
ch.streamid = lastch.streamid
case 3:
ch.timestamp = lastch.timestamp
ch.length = lastch.length
ch.typeid = lastch.typeid
ch.streamid = lastch.streamid
}
lastch = *ch
if len(payload) != int(ch.length) {
payload = make([]byte, ch.length)
}
n := maxChunkSize
if rem := len(payload) - nread; rem < maxChunkSize {
n = rem
}
_, err = io.ReadFull(conn, payload[nread:nread+n])
if err != nil {
return err
}
nread += n
if nread < len(payload) {
continue
}
switch ch.typeid {
case 1:
if len(payload) != 4 {
return fmt.Errorf("invalid type 0 payload size: %d", len(payload))
}
maxChunkSize = int(binary.BigEndian.Uint32(payload))
if maxChunkSize <= 0 {
return fmt.Errorf("invalid chunk size: %d", maxChunkSize)
}
case 20:
payload, usecopy, err = s.handleRtmpCommand(ch, payload)
if err != nil {
return err
}
}
err = writeRtmpMessage(conn2, ch, payload, maxChunkSize)
if err != nil {
return err
}
payload = nil
nread = 0
}
return nil
}
func (s *Server) Serve(conn net.Conn) error {
defer conn.Close()
conn2, err := net.Dial("tcp", s.remoteAddr)
if err != nil {
return err
}
defer conn2.Close()
err = shadowHandshake(conn, conn2)
if err != nil {
return err
}
go func() {
io.Copy(conn, conn2)
conn.Close()
conn2.Close()
}()
err = s.handleMessages(conn, conn2)
if err != nil {
return err
}
_, err = io.Copy(conn2, conn)
return err
}
func NewServer(remoteAddr string, appName string, playUrl string, streamName string) *Server {
return &Server{
remoteAddr: remoteAddr,
appName: appName,
playUrl: playUrl,
streamName: streamName,
}
}