Skip to content

Commit

Permalink
[feat] httpts支持gop缓冲,提高秒开 #129
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Mar 19, 2022
1 parent aa480aa commit 9330270
Show file tree
Hide file tree
Showing 15 changed files with 236 additions and 23 deletions.
5 changes: 3 additions & 2 deletions conf/ConfigBrief.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
```
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief", //. 配置文件对应的文档说明链接,在程序中没实际用途
"conf_version": "0.2.8", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本
"conf_version": "0.2.9", //. 配置文件版本号,业务方不应该手动修改,程序中会检查该版本
// 号是否与代码中声明的一致
"rtmp": {
"enable": true, //. 是否开启rtmp服务的监听
Expand Down Expand Up @@ -85,8 +85,9 @@
"httpts": {
"enable": true, //. 是否开启HTTP-TS服务的监听。注意,这并不是HLS中的TS,而是在一条HTTP长连接上持续性传输TS流
"enable_https": true, //. 是否开启HTTPS-TS监听
"url_pattern": "/" //. 拉流url路由路径地址。默认值为`/`,表示不受限制,路由地址可以为任意路径地址。
"url_pattern": "/", //. 拉流url路由路径地址。默认值为`/`,表示不受限制,路由地址可以为任意路径地址。
// 如果设置为`/live/`,则只能从`/live/`路径下拉流,比如`/live/test110.ts`
"gop_num": 0 //. 见rtmp.gop_num
},
"rtsp": {
"enable": true, //. 是否开启rtsp服务的监听,目前只支持rtsp推流
Expand Down
5 changes: 3 additions & 2 deletions conf/lalserver.conf.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.8",
"conf_version": "v0.2.9",
"rtmp": {
"enable": true,
"addr": ":1935",
Expand Down Expand Up @@ -35,7 +35,8 @@
"httpts": {
"enable": true,
"enable_https": true,
"url_pattern": "/"
"url_pattern": "/",
"gop_num": 0
},
"rtsp": {
"enable": true,
Expand Down
5 changes: 3 additions & 2 deletions conf/lalserver.conf.json.tmpl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.8",
"conf_version": "v0.2.9",
"rtmp": {
"enable": true,
"addr": ":1935",
Expand Down Expand Up @@ -35,7 +35,8 @@
"httpts": {
"enable": true,
"enable_https": true,
"url_pattern": "/"
"url_pattern": "/",
"gop_num": 0
},
"rtsp": {
"enable": true,
Expand Down
2 changes: 2 additions & 0 deletions pkg/base/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"strings"
)

// TODO(chef): [refactor] 整理 subsession 接口部分 IsFresh 和 ShouldWaitVideoKeyFrame

// group中,session Dispose表现记录
//
// Dispose结束后回调OnDel:
Expand Down
10 changes: 6 additions & 4 deletions pkg/httpts/server_sub_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ import (
var tsHttpResponseHeader []byte

type SubSession struct {
core *base.HttpSubSession
IsFresh bool
core *base.HttpSubSession
IsFresh bool
ShouldWaitBoundary bool
}

func NewSubSession(conn net.Conn, urlCtx base.UrlContext, isWebSocket bool, websocketKey string) *SubSession {
uk := base.GenUkTsSubSession()
s := &SubSession{
base.NewHttpSubSession(base.HttpSubSessionOption{
core: base.NewHttpSubSession(base.HttpSubSessionOption{
Conn: conn,
ConnModOption: func(option *connection.Option) {
option.WriteChanSize = SubSessionWriteChanSize
Expand All @@ -37,7 +38,8 @@ func NewSubSession(conn net.Conn, urlCtx base.UrlContext, isWebSocket bool, webs
IsWebSocket: isWebSocket,
WebSocketKey: websocketKey,
}),
true,
IsFresh: true,
ShouldWaitBoundary: true,
}
Log.Infof("[%s] lifecycle new httpts SubSession. session=%p, remote addr=%s", uk, s, conn.RemoteAddr().String())
return s
Expand Down
2 changes: 2 additions & 0 deletions pkg/innertest/innertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func Entry(tt *testing.T) {
}

func entry() {
Log.Debugf("> innertest")

if _, err := os.Lstat(confFile); err != nil {
Log.Warnf("lstat %s error. err=%+v", confFile, err)
return
Expand Down
4 changes: 3 additions & 1 deletion pkg/logic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)

const ConfVersion = "v0.2.8"
const ConfVersion = "v0.2.9"

const (
defaultHlsCleanupMode = hls.CleanupModeInTheEnd
Expand Down Expand Up @@ -72,6 +72,8 @@ type HttpflvConfig struct {

type HttptsConfig struct {
CommonHttpServerConfig

GopNum int `json:"gop_num"`
}

type HlsConfig struct {
Expand Down
8 changes: 6 additions & 2 deletions pkg/logic/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ type Group struct {
pullProxy *pullProxy
// rtmp pub使用
dummyAudioFilter *remux.DummyAudioFilter
// rtmp pub/pull使用
rtmpGopCache *remux.GopCache
// rtmp使用
rtmpGopCache *remux.GopCache
// httpflv使用
httpflvGopCache *remux.GopCache
// httpts使用
httptsGopCache *remux.GopCacheMpegts
// rtsp使用
sdpCtx *sdp.LogicContext
// mpegts使用
Expand Down Expand Up @@ -95,6 +98,7 @@ func NewGroup(appName string, streamName string, config *Config, observer GroupO
rtspSubSessionSet: make(map[*rtsp.SubSession]struct{}),
rtmpGopCache: remux.NewGopCache("rtmp", uk, config.RtmpConfig.GopNum),
httpflvGopCache: remux.NewGopCache("httpflv", uk, config.HttpflvConfig.GopNum),
httptsGopCache: remux.NewGopCacheMpegts(uk, config.HttptsConfig.GopNum),
pullProxy: &pullProxy{},
}

Expand Down
37 changes: 33 additions & 4 deletions pkg/logic/group__core_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
}
session.ShouldWaitVideoKeyFrame = false
}
}
} // for loop iterate rtmpSubSessionSet

// ## 转发本次数据
if len(group.rtmpSubSessionSet) > 0 {
if group.rtmpMergeWriter == nil {
Expand Down Expand Up @@ -400,23 +401,51 @@ func (group *Group) feedTsPackets(tsPackets []byte, frame *mpegts.Frame, boundar
group.hlsMuxer.FeedMpegts(tsPackets, frame, boundary)
}

// # 遍历 httpts sub session
for session := range group.httptsSubSessionSet {
if session.IsFresh {
// ## 如果是新加入者

// 发送头
session.Write(group.patpmt)

// 如果有缓存,发送缓存
// 并且设置标志,后续都实时转发就行了
gopCount := group.httptsGopCache.GetGopCount()
for i := 0; i < gopCount; i++ {
for _, item := range group.httptsGopCache.GetGopDataAt(i) {
session.Write(item)
}
}
if gopCount > 0 {
session.ShouldWaitBoundary = false
}

// 新加入逻辑只用走一次
session.IsFresh = false
}

// ## 转发本次数据
if session.ShouldWaitBoundary {
if boundary {
session.Write(group.patpmt)
session.Write(tsPackets)
session.IsFresh = false

session.ShouldWaitBoundary = false
} else {
// 需要继续等
}
} else {
session.Write(tsPackets)
}
}
} // for loop iterate httptsSubSessionSet

if group.recordMpegts != nil {
if err := group.recordMpegts.Write(tsPackets); err != nil {
Log.Errorf("[%s] record mpegts write error. err=%+v", group.UniqueKey, err)
}
}

group.httptsGopCache.Feed(tsPackets, boundary)
}

// ---------------------------------------------------------------------------------------------------------------------
Expand Down
4 changes: 3 additions & 1 deletion pkg/logic/group__in.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ func (group *Group) delIn() {
group.rtsp2RtmpRemuxer = nil
group.rtmp2RtspRemuxer = nil
group.dummyAudioFilter = nil

group.rtmpGopCache.Clear()
group.httpflvGopCache.Clear()
group.patpmt = nil
group.httptsGopCache.Clear()
group.sdpCtx = nil
group.patpmt = nil
}
11 changes: 10 additions & 1 deletion pkg/remux/avpacket2rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,16 @@ func (r *AvPacket2RtmpRemuxer) InitWithAvConfig(asc, vps, sps, pps []byte) {
}
}

// FeedAvPacket @param pkt: 内部不持有该内存块
// FeedAvPacket
//
// 输入 base.AvPacket 数据
//
// @param pkt:
//
// - 如果是aac,格式是裸数据,不需要adts头
// - 如果是h264,格式是avcc
//
// 内部不持有该内存块
//
func (r *AvPacket2RtmpRemuxer) FeedAvPacket(pkt base.AvPacket) {
switch pkt.PayloadType {
Expand Down
25 changes: 22 additions & 3 deletions pkg/remux/gop_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,13 @@ type GopCache struct {
gopSize int
}

// NewGopCache @param gopNum: gop缓存大小
// 如果为0,则不缓存音频数据,也即GOP缓存功能不生效
// 如果>0,则缓存<gopNum>个完整GOP,另外还可能有半个最近不完整的GOP
// NewGopCache
//
// @param gopNum:
// gop缓存大小
//
// - 如果为0,则不缓存音频数据,也即GOP缓存功能不生效
// - 如果>0,则缓存[0, gopNum]个GOP,最多缓存 gopNum 个GOP。注意,最后一个GOP可能是不完整的
//
func NewGopCache(t string, uniqueKey string, gopNum int) *GopCache {
return &GopCache{
Expand All @@ -82,6 +86,10 @@ func NewGopCache(t string, uniqueKey string, gopNum int) *GopCache {

type LazyGet func() []byte

// Feed
//
// @param lg: 内部可能持有lg返回的内存块
//
func (gc *GopCache) Feed(msg base.RtmpMsg, lg LazyGet) {
switch msg.Header.MsgTypeId {
case base.RtmpTypeIdMetadata:
Expand Down Expand Up @@ -112,6 +120,7 @@ func (gc *GopCache) Feed(msg base.RtmpMsg, lg LazyGet) {
}

// GetGopCount 获取GOP数量,注意,最后一个可能是不完整的
//
func (gc *GopCache) GetGopCount() int {
return (gc.gopRingLast + gc.gopSize - gc.gopRingFirst) % gc.gopSize
}
Expand All @@ -133,15 +142,21 @@ func (gc *GopCache) Clear() {

// ---------------------------------------------------------------------------------------------------------------------

// feedLastGop
//
// 往最后一个GOP元素追加一个msg
// 注意,如果GopCache为空,则不缓存msg
//
func (gc *GopCache) feedLastGop(msg base.RtmpMsg, b []byte) {
if !gc.isGopRingEmpty() {
gc.gopRing[(gc.gopRingLast-1+gc.gopSize)%gc.gopSize].Feed(msg, b)
}
}

// feedNewGop
//
// 生成一个最新的GOP元素,并往里追加一个msg
//
func (gc *GopCache) feedNewGop(msg base.RtmpMsg, b []byte) {
if gc.isGopRingFull() {
gc.gopRingFirst = (gc.gopRingFirst + 1) % gc.gopSize
Expand All @@ -165,6 +180,10 @@ type Gop struct {
data [][]byte
}

// Feed
//
// @param b: 内部持有`b`内存块
//
func (g *Gop) Feed(msg base.RtmpMsg, b []byte) {
g.data = append(g.data, b)
}
Expand Down
Loading

0 comments on commit 9330270

Please sign in to comment.