Skip to content

Commit

Permalink
[patch] 重构前次提交,去掉LazyRtmpChunkDivider和LazyRtmpMsg2FlvTag的写法 #201
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Jul 20, 2022
1 parent 8b3c65d commit 06c13d1
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 142 deletions.
7 changes: 5 additions & 2 deletions pkg/gb28181/gb28181.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/q191201771/naza/pkg/nazalog"
)

// TODO(chef): gb28181 package处于开发中阶段,请不使用
// TODO(chef): [opt] rtp排序 202206
// TODO(chef): [feat] http api start_rtp_pub 202207
// TODO(chef): [feat] http api stop_rtp_pub 202207
// TODO(chef): [feat] http api /api/stat/all_rtp_pub,不过这个可以用已有的all_group代替 202207
// TODO(chef): [feat] pub接入group 202207
// TODO(chef): [feat] 超时自动关闭 202207
// TODO(chef): [test] 保存rtp数据,用于回放分析 202206
// TODO(chef): [perf] 优化ps解析,内存块 202207

Expand Down
84 changes: 60 additions & 24 deletions pkg/logic/group__core_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,63 @@ func (group *Group) OnFragmentOpen() {
// @param msg 调用结束后,内部不持有msg.Payload内存块
//
func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// TODO(chef): [refactor] 依赖lazy的out越来越多,可以考虑去掉lazy,将lazy逻辑直接写在开头处,比如if x != nil || len(y) > 0 202207

var (
lcd remux.LazyRtmpChunkDivider
lrm2ft remux.LazyRtmpMsg2FlvTag
)
if msg.Header.MsgLen != uint32(len(msg.Payload)) {
Log.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, msg.Header.MsgLen, len(msg.Payload), msg.Header)
}

if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
m, err := rtmp.ParseMetadata(msg.Payload)
nazalog.Debugf("[%s] metadata. err=%+v, len=%d, value=%s", group.UniqueKey, err, len(m), m.DebugString())
}

var (
rtmpChunks4Sub []byte
rtmpChunks4Push []byte

flv4Sub []byte
)

needRtmp := len(group.rtmpSubSessionSet) != 0 ||
group.config.RelayPushConfig.Enable ||
(group.config.RtmpConfig.Enable && group.config.RtmpConfig.GopNum > 0)

needFlv := len(group.httpflvSubSessionSet) != 0 ||
(group.config.HttpflvConfig.Enable && group.config.HttpflvConfig.GopNum > 0) ||
group.config.RecordConfig.EnableFlv

// 目的有两个:1 rtmp按需切片 2 处理metadata的@setDataFrame
if needRtmp {
// 设置好用于发送的 rtmp 头部信息
currHeader := remux.MakeDefaultRtmpHeader(msg.Header)

if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
metadataWithOutSDF, _ := rtmp.MetadataEnsureWithoutSetDataFrame(msg.Clone().Payload)
rtmpChunks4Sub = rtmp.Message2Chunks(metadataWithOutSDF, &currHeader)

metadataWithSDF, _ := rtmp.MetadataEnsureWithSetDataFrame(msg.Clone().Payload)
rtmpChunks4Push = rtmp.Message2Chunks(metadataWithSDF, &currHeader)
} else {
rtmpChunks4Sub = rtmp.Message2Chunks(msg.Payload, &currHeader)
rtmpChunks4Push = rtmpChunks4Sub
}
}

if needFlv {
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
msg2 := msg.Clone()
msg2.Payload, _ = rtmp.MetadataEnsureWithoutSetDataFrame(msg2.Payload)
flv4Sub = remux.RtmpMsg2FlvTag(msg2).Raw
} else {
flv4Sub = remux.RtmpMsg2FlvTag(msg).Raw
}
}

// # 数据有效性检查
if len(msg.Payload) == 0 {
Log.Warnf("[%s] msg payload length is 0. %+v", group.UniqueKey, msg.Header)
return
}

// TODO(chef): 暂时不打开,因为过滤掉了innertest中rtmp和flv的输出和输入就不完全相同了
//if msg.Header.MsgTypeId == base.RtmpTypeIdAudio {
// if len(msg.Payload) <= 2 {
Expand Down Expand Up @@ -203,16 +243,6 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
group.rtmp2RtspRemuxer.FeedRtmpMsg(msg)
}

// # 设置好用于发送的 rtmp 头部信息
currHeader := remux.MakeDefaultRtmpHeader(msg.Header)
if currHeader.MsgLen != uint32(len(msg.Payload)) {
Log.Errorf("[%s] diff. msgLen=%d, payload len=%d, %+v", group.UniqueKey, currHeader.MsgLen, len(msg.Payload), msg.Header)
}

// # 懒初始化rtmp chunk切片,以及httpflv转换
lcd.Init(msg.Payload, &currHeader)
lrm2ft.Init(msg)

// # 广播。遍历所有 rtmp sub session,转发数据
// ## 如果是新的 sub session,发送已缓存的信息
for session := range group.rtmpSubSessionSet {
Expand Down Expand Up @@ -268,9 +298,9 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// ## 转发本次数据
if len(group.rtmpSubSessionSet) > 0 {
if group.rtmpMergeWriter == nil {
group.write2RtmpSubSessions(lcd.GetEnsureWithoutSetDataFrame())
group.write2RtmpSubSessions(rtmpChunks4Sub)
} else {
group.rtmpMergeWriter.Write(lcd.GetEnsureWithoutSetDataFrame())
group.rtmpMergeWriter.Write(rtmpChunks4Sub)
}
}

Expand Down Expand Up @@ -300,7 +330,7 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
v.pushSession.IsFresh = false
}

_ = v.pushSession.Write(lcd.GetEnsureWithSetDataFrame())
_ = v.pushSession.Write(rtmpChunks4Push)
}
}

Expand Down Expand Up @@ -333,27 +363,33 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// 是否在等待关键帧
if session.ShouldWaitVideoKeyFrame {
if msg.IsVideoKeyNalu() {
session.Write(lrm2ft.GetEnsureWithoutSetDataFrame())
session.Write(flv4Sub)
session.ShouldWaitVideoKeyFrame = false
}
} else {
session.Write(lrm2ft.GetEnsureWithoutSetDataFrame())
session.Write(flv4Sub)
}
}

// # 录制flv文件
if group.recordFlv != nil {
if err := group.recordFlv.WriteRaw(lrm2ft.GetEnsureWithoutSetDataFrame()); err != nil {
if err := group.recordFlv.WriteRaw(flv4Sub); err != nil {
Log.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err)
}
}

// # 缓存关键信息,以及gop
if group.config.RtmpConfig.Enable {
group.rtmpGopCache.Feed(msg, lcd.GetEnsureWithSetDataFrame, lcd.GetEnsureWithoutSetDataFrame)
group.rtmpGopCache.Feed(msg, rtmpChunks4Sub)
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
group.rtmpGopCache.SetMetadata(rtmpChunks4Push, rtmpChunks4Sub)
}
}
if group.config.HttpflvConfig.Enable {
group.httpflvGopCache.Feed(msg, lrm2ft.GetEnsureWithSetDataFrame, lrm2ft.GetEnsureWithoutSetDataFrame)
group.httpflvGopCache.Feed(msg, flv4Sub)
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
group.httpflvGopCache.SetMetadata(flv4Sub, flv4Sub)
}
}

// # 记录stat
Expand Down
22 changes: 14 additions & 8 deletions pkg/remux/gop_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,38 +87,44 @@ func NewGopCache(t string, uniqueKey string, gopNum int) *GopCache {

type LazyGet func() []byte

func (gc *GopCache) SetMetadata(w []byte, wo []byte) {
// TODO(chef): [refactor] 将metadata等缓存逻辑从GopCache中移除 202207

gc.MetadataEnsureWithSetDataFrame = w
gc.MetadataEnsureWithoutSetDataFrame = wo
Log.Debugf("[%s] cache %s metadata. size:%d", gc.uniqueKey, gc.t, len(gc.MetadataEnsureWithSetDataFrame))
}

// Feed
//
// @param lg: 内部可能持有lg返回的内存块
//
func (gc *GopCache) Feed(msg base.RtmpMsg, lg LazyGet, lg2 LazyGet) {
func (gc *GopCache) Feed(msg base.RtmpMsg, b []byte) {
// TODO(chef): [refactor] 重构lg两个参数这种方式 202207

switch msg.Header.MsgTypeId {
case base.RtmpTypeIdMetadata:
gc.MetadataEnsureWithSetDataFrame = lg()
gc.MetadataEnsureWithoutSetDataFrame = lg()
Log.Debugf("[%s] cache %s metadata. size:%d", gc.uniqueKey, gc.t, len(gc.MetadataEnsureWithSetDataFrame))
// noop
return
case base.RtmpTypeIdAudio:
if msg.IsAacSeqHeader() {
gc.AacSeqHeader = lg()
gc.AacSeqHeader = b
Log.Debugf("[%s] cache %s aac seq header. size:%d", gc.uniqueKey, gc.t, len(gc.AacSeqHeader))
return
}
case base.RtmpTypeIdVideo:
if msg.IsVideoKeySeqHeader() {
gc.VideoSeqHeader = lg()
gc.VideoSeqHeader = b
Log.Debugf("[%s] cache %s video seq header. size:%d", gc.uniqueKey, gc.t, len(gc.VideoSeqHeader))
return
}
}

if gc.gopSize > 1 {
if msg.IsVideoKeyNalu() {
gc.feedNewGop(msg, lg())
gc.feedNewGop(msg, b)
} else {
gc.feedLastGop(msg, lg())
gc.feedLastGop(msg, b)
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/remux/gop_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,52 +66,52 @@ func TestGopCache_Feed(t *testing.T) {
assert.Equal(t, nil, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))

nc.Feed(i1, i1f, i1f)
nc.Feed(i1, i1f())
assert.Equal(t, 1, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 1}}, nc.GetGopDataAt(0))
assert.Equal(t, nil, nc.GetGopDataAt(1))
assert.Equal(t, nil, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))
nc.Feed(p1, p1f, p1f)
nc.Feed(p1, p1f())
assert.Equal(t, 1, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 1}, {0, 1}}, nc.GetGopDataAt(0))
assert.Equal(t, nil, nc.GetGopDataAt(1))
assert.Equal(t, nil, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))

nc.Feed(i2, i2f, i2f)
nc.Feed(i2, i2f())
assert.Equal(t, 2, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 1}, {0, 1}}, nc.GetGopDataAt(0))
assert.Equal(t, [][]byte{{1, 2}}, nc.GetGopDataAt(1))
assert.Equal(t, nil, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))
nc.Feed(p2, p2f, p2f)
nc.Feed(p2, p2f())
assert.Equal(t, 2, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 1}, {0, 1}}, nc.GetGopDataAt(0))
assert.Equal(t, [][]byte{{1, 2}, {0, 2}}, nc.GetGopDataAt(1))
assert.Equal(t, nil, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))

nc.Feed(i3, i3f, i3f)
nc.Feed(i3, i3f())
assert.Equal(t, 3, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 1}, {0, 1}}, nc.GetGopDataAt(0))
assert.Equal(t, [][]byte{{1, 2}, {0, 2}}, nc.GetGopDataAt(1))
assert.Equal(t, [][]byte{{1, 3}}, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))
nc.Feed(p3, p3f, p3f)
nc.Feed(p3, p3f())
assert.Equal(t, 3, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 1}, {0, 1}}, nc.GetGopDataAt(0))
assert.Equal(t, [][]byte{{1, 2}, {0, 2}}, nc.GetGopDataAt(1))
assert.Equal(t, [][]byte{{1, 3}, {0, 3}}, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))

nc.Feed(i4, i4f, i4f)
nc.Feed(i4, i4f())
assert.Equal(t, 3, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 2}, {0, 2}}, nc.GetGopDataAt(0))
assert.Equal(t, [][]byte{{1, 3}, {0, 3}}, nc.GetGopDataAt(1))
assert.Equal(t, [][]byte{{1, 4}}, nc.GetGopDataAt(2))
assert.Equal(t, nil, nc.GetGopDataAt(3))
nc.Feed(p4, p4f, p4f)
nc.Feed(p4, p4f())
assert.Equal(t, 3, nc.GetGopCount())
assert.Equal(t, [][]byte{{1, 2}, {0, 2}}, nc.GetGopDataAt(0))
assert.Equal(t, [][]byte{{1, 3}, {0, 3}}, nc.GetGopDataAt(1))
Expand Down
113 changes: 56 additions & 57 deletions pkg/remux/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ package remux
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
)

// MakeDefaultRtmpHeader
Expand All @@ -36,60 +35,60 @@ func MakeDefaultRtmpHeader(in base.RtmpHeader) (out base.RtmpHeader) {
return
}

// ---------------------------------------------------------------------------------------------------------------------

// LazyRtmpChunkDivider 在必要时,有且仅有一次做切分成chunk的操作
//// ---------------------------------------------------------------------------------------------------------------------
//
type LazyRtmpChunkDivider struct {
message []byte
header *base.RtmpHeader
chunks []byte
}

func (lcd *LazyRtmpChunkDivider) Init(message []byte, header *base.RtmpHeader) {
lcd.message = message
lcd.header = header
}

func (lcd *LazyRtmpChunkDivider) GetOriginal() []byte {
if lcd.chunks == nil {
lcd.chunks = rtmp.Message2Chunks(lcd.message, lcd.header)
}
return lcd.chunks
}

func (lcd *LazyRtmpChunkDivider) GetEnsureWithSetDataFrame() []byte {
if lcd.chunks == nil {
var msg []byte
var err error
if lcd.header.MsgTypeId == base.RtmpTypeIdMetadata {
msg, err = rtmp.MetadataEnsureWithSetDataFrame(lcd.message)
if err != nil {
nazalog.Errorf("[%p] rtmp.MetadataEnsureWithSetDataFrame failed. error=%+v", lcd, err)
msg = lcd.message
}
} else {
msg = lcd.message
}
lcd.chunks = rtmp.Message2Chunks(msg, lcd.header)
}
return lcd.chunks
}

func (lcd *LazyRtmpChunkDivider) GetEnsureWithoutSetDataFrame() []byte {
if lcd.chunks == nil {
var msg []byte
var err error
if lcd.header.MsgTypeId == base.RtmpTypeIdMetadata {
msg, err = rtmp.MetadataEnsureWithoutSetDataFrame(lcd.message)
if err != nil {
nazalog.Errorf("[%p] rtmp.MetadataEnsureWithoutSetDataFrame failed. error=%+v", lcd, err)
msg = lcd.message
}
} else {
msg = lcd.message
}
lcd.chunks = rtmp.Message2Chunks(msg, lcd.header)
}
return lcd.chunks
}
//// LazyRtmpChunkDivider 在必要时,有且仅有一次做切分成chunk的操作
////
//type LazyRtmpChunkDivider struct {
// message []byte
// header *base.RtmpHeader
// chunks []byte
//}
//
//func (lcd *LazyRtmpChunkDivider) Init(message []byte, header *base.RtmpHeader) {
// lcd.message = message
// lcd.header = header
//}
//
//func (lcd *LazyRtmpChunkDivider) GetOriginal() []byte {
// if lcd.chunks == nil {
// lcd.chunks = rtmp.Message2Chunks(lcd.message, lcd.header)
// }
// return lcd.chunks
//}
//
//func (lcd *LazyRtmpChunkDivider) GetEnsureWithSetDataFrame() []byte {
// if lcd.chunks == nil {
// var msg []byte
// var err error
// if lcd.header.MsgTypeId == base.RtmpTypeIdMetadata {
// msg, err = rtmp.MetadataEnsureWithSetDataFrame(lcd.message)
// if err != nil {
// nazalog.Errorf("[%p] rtmp.MetadataEnsureWithSetDataFrame failed. error=%+v", lcd, err)
// msg = lcd.message
// }
// } else {
// msg = lcd.message
// }
// lcd.chunks = rtmp.Message2Chunks(msg, lcd.header)
// }
// return lcd.chunks
//}
//
//func (lcd *LazyRtmpChunkDivider) GetEnsureWithoutSetDataFrame() []byte {
// if lcd.chunks == nil {
// var msg []byte
// var err error
// if lcd.header.MsgTypeId == base.RtmpTypeIdMetadata {
// msg, err = rtmp.MetadataEnsureWithoutSetDataFrame(lcd.message)
// if err != nil {
// nazalog.Errorf("[%p] rtmp.MetadataEnsureWithoutSetDataFrame failed. error=%+v", lcd, err)
// msg = lcd.message
// }
// } else {
// msg = lcd.message
// }
// lcd.chunks = rtmp.Message2Chunks(msg, lcd.header)
// }
// return lcd.chunks
//}
Loading

0 comments on commit 06c13d1

Please sign in to comment.