Skip to content

Commit

Permalink
Merge pull request #341 from ZSC714725/ws-rtsp
Browse files Browse the repository at this point in the history
[feat] 支持rtsp-over-websocket播放
  • Loading branch information
q191201771 committed Feb 5, 2024
2 parents 8fa60e4 + 7e0c354 commit 0beec39
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 35 deletions.
4 changes: 3 additions & 1 deletion conf/lalserver.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
"auth_enable": false,
"auth_method": 1,
"username": "q191201771",
"password": "pengrl"
"password": "pengrl",
"ws_rtsp_enable": true,
"ws_rtsp_addr": ":5566"
},
"record": {
"enable_flv": false,
Expand Down
2 changes: 1 addition & 1 deletion pkg/base/basic_http_sub_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (session *BasicHttpSubSession) Dispose() error {

func (session *BasicHttpSubSession) WriteHttpResponseHeader(b []byte) {
if session.IsWebSocket {
session.write(UpdateWebSocketHeader(session.WebSocketKey))
session.write(UpdateWebSocketHeader(session.WebSocketKey, ""))
} else {
session.write(b)
}
Expand Down
156 changes: 147 additions & 9 deletions pkg/base/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@
package base

import (
"bufio"
"crypto/sha1"
"encoding/base64"
"encoding/binary"
"fmt"
"io"
"math"

"github.com/q191201771/naza/pkg/bele"
Expand Down Expand Up @@ -140,17 +144,151 @@ func MakeWsFrameHeader(wsHeader WsHeader) (buf []byte) {
}
return buf
}
func UpdateWebSocketHeader(secWebSocketKey string) []byte {

func UpdateWebSocketHeader(secWebSocketKey, protocol string) []byte {
firstLine := "HTTP/1.1 101 Switching Protocol\r\n"
sha1Sum := sha1.Sum([]byte(secWebSocketKey + WsMagicStr))
secWebSocketAccept := base64.StdEncoding.EncodeToString(sha1Sum[:])
webSocketResponseHeaderStr := firstLine +
"Server: " + LalHttpflvSubSessionServer + "\r\n" +
"Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" +
"Keep-Alive: timeout=15, max=100\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: websocket\r\n" +
CorsHeaders +
"\r\n"

var webSocketResponseHeaderStr string
if protocol == "" {
webSocketResponseHeaderStr = firstLine +
"Server: " + LalHttpflvSubSessionServer + "\r\n" +
"Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" +
"Keep-Alive: timeout=15, max=100\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: websocket\r\n" +
CorsHeaders +
"\r\n"
} else {
webSocketResponseHeaderStr = firstLine +
"Server: " + LalHttpflvSubSessionServer + "\r\n" +
"Sec-WebSocket-Accept:" + secWebSocketAccept + "\r\n" +
"Keep-Alive: timeout=15, max=100\r\n" +
"Connection: Upgrade\r\n" +
"Upgrade: websocket\r\n" +
CorsHeaders +
"Sec-WebSocket-Protocol:" + protocol + "\r\n" +
"\r\n"
}
return []byte(webSocketResponseHeaderStr)
}

func ReadWsPayload(r *bufio.Reader) ([]byte, error) {
var h WsHeader

buf := make([]byte, 2)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.Fin = (buf[0] & 0x80) != 0
h.Rsv1 = (buf[0] & 0x40) != 0
h.Rsv2 = (buf[0] & 0x20) != 0
h.Rsv3 = (buf[0] & 0x10) != 0
h.Opcode = buf[0] & 0x0f

if buf[1]&0x80 != 0 {
h.Masked = true
}

length := buf[1] & 0x7f
switch {
case length < 126:
h.PayloadLength = uint64(length)
case length == 126:
buf = make([]byte, 2)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.PayloadLength = uint64(binary.BigEndian.Uint16(buf))
case length == 127:
buf = make([]byte, 8)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.PayloadLength = binary.BigEndian.Uint64(buf)

default:
err = fmt.Errorf("header error: the most significant bit must be 0")
return nil, err
}

if h.Masked {
buf = make([]byte, 4)
_, err := io.ReadFull(r, buf)
if err != nil {
return nil, err
}

h.MaskKey = bele.BeUint32(buf)
}

payload := make([]byte, h.PayloadLength)
_, err = io.ReadFull(r, payload)
if err != nil {
return nil, err
}

if h.Masked {
mask := make([]byte, 4)
binary.BigEndian.PutUint32(mask, h.MaskKey)
cipher(payload, mask, 0)
}

return payload, nil
}

func cipher(payload []byte, mask []byte, offset int) {
n := len(payload)
if n < 8 {
for i := 0; i < n; i++ {
payload[i] ^= mask[(offset+i)%4]
}
return
}

// Calculate position in mask due to previously processed bytes number.
mpos := offset % 4
// Count number of bytes will processed one by one from the beginning of payload.
ln := remain[mpos]
// Count number of bytes will processed one by one from the end of payload.
// This is done to process payload by 8 bytes in each iteration of main loop.
rn := (n - ln) % 8

for i := 0; i < ln; i++ {
payload[i] ^= mask[(mpos+i)%4]
}
for i := n - rn; i < n; i++ {
payload[i] ^= mask[(mpos+i)%4]
}

// NOTE: we use here binary.LittleEndian regardless of what is real
// endianness on machine is. To do so, we have to use binary.LittleEndian in
// the masking loop below as well.
var (
m = binary.LittleEndian.Uint32((mask[:]))
m2 = uint64(m)<<32 | uint64(m)
)
// Skip already processed right part.
// Get number of uint64 parts remaining to process.
n = (n - ln - rn) >> 3
for i := 0; i < n; i++ {
var (
j = ln + (i << 3)
chunk = payload[j : j+8]
)
p := binary.LittleEndian.Uint64(chunk)
p = p ^ m2
binary.LittleEndian.PutUint64(chunk, p)
}
}

// remain maps position in masking key [0,4) to number
// of bytes that need to be processed manually inside Cipher().
var remain = [4]int{0, 3, 2, 1}
2 changes: 2 additions & 0 deletions pkg/logic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ type RtspConfig struct {
RtspsCertFile string `json:"rtsps_cert_file"`
RtspsKeyFile string `json:"rtsps_key_file"`
OutWaitKeyFrameFlag bool `json:"out_wait_key_frame_flag"`
WsRtspEnable bool `json:"ws_rtsp_enable"`
WsRtspAddr string `json:"ws_rtsp_addr"`
rtsp.ServerAuthConfig
}

Expand Down
16 changes: 15 additions & 1 deletion pkg/logic/server_manager__.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ package logic
import (
"flag"
"fmt"
"github.com/q191201771/naza/pkg/taskpool"
"net/http"
_ "net/http/pprof"
"os"
"path/filepath"
"sync"
"time"

"github.com/q191201771/naza/pkg/taskpool"

"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/hls"
"github.com/q191201771/lal/pkg/httpflv"
Expand All @@ -45,6 +46,7 @@ type ServerManager struct {
rtspsServer *rtsp.Server
httpApiServer *HttpApiServer
pprofServer *http.Server
wsrtspServer *rtsp.WebsocketServer
exitChan chan struct{}

mutex sync.Mutex
Expand Down Expand Up @@ -139,6 +141,9 @@ Doc: %s
if sm.config.RtspConfig.RtspsEnable {
sm.rtspsServer = rtsp.NewServer(sm.config.RtspConfig.RtspsAddr, sm, sm.config.RtspConfig.ServerAuthConfig)
}
if sm.config.RtspConfig.WsRtspEnable {
sm.wsrtspServer = rtsp.NewWebsocketServer(sm.config.RtspConfig.WsRtspAddr, sm, sm.config.RtspConfig.ServerAuthConfig)
}
if sm.config.HttpApiConfig.Enable {
sm.httpApiServer = NewHttpApiServer(sm.config.HttpApiConfig.Addr, sm)
}
Expand Down Expand Up @@ -268,6 +273,15 @@ func (sm *ServerManager) RunLoop() error {
}
}

if sm.wsrtspServer != nil {
go func() {
err := sm.wsrtspServer.Listen()
if err != nil {
Log.Error(err)
}
}()
}

if sm.httpApiServer != nil {
if err := sm.httpApiServer.Listen(); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtsp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (s *Server) OnDelRtspSubSession(session *SubSession) {
// ---------------------------------------------------------------------------------------------------------------------

func (s *Server) handleTcpConnect(conn net.Conn) {
session := NewServerCommandSession(s, conn, s.auth)
session := NewServerCommandSession(s, conn, s.auth, false, "")
s.observer.OnNewRtspSessionConnect(session)

err := session.RunLoop()
Expand Down
Loading

0 comments on commit 0beec39

Please sign in to comment.