Skip to content

Commit

Permalink
[patch] 修复前次提交导致rtmp无法播放的bug #201
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Jul 21, 2022
1 parent 06c13d1 commit 9474cf2
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 157 deletions.
64 changes: 16 additions & 48 deletions pkg/logic/group__core_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,46 +161,13 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
}

var (
rtmpChunks4Sub []byte
rtmpChunks4Push []byte

flv4Sub []byte
lazyRtmpChunkDivider remux.LazyRtmpChunkDivider
lazyRtmpMsg2FlvTag remux.LazyRtmpMsg2FlvTag
)

needRtmp := len(group.rtmpSubSessionSet) != 0 ||
group.config.RelayPushConfig.Enable ||
(group.config.RtmpConfig.Enable && group.config.RtmpConfig.GopNum > 0)

needFlv := len(group.httpflvSubSessionSet) != 0 ||
(group.config.HttpflvConfig.Enable && group.config.HttpflvConfig.GopNum > 0) ||
group.config.RecordConfig.EnableFlv

// 目的有两个:1 rtmp按需切片 2 处理metadata的@setDataFrame
if needRtmp {
// 设置好用于发送的 rtmp 头部信息
currHeader := remux.MakeDefaultRtmpHeader(msg.Header)

if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
metadataWithOutSDF, _ := rtmp.MetadataEnsureWithoutSetDataFrame(msg.Clone().Payload)
rtmpChunks4Sub = rtmp.Message2Chunks(metadataWithOutSDF, &currHeader)

metadataWithSDF, _ := rtmp.MetadataEnsureWithSetDataFrame(msg.Clone().Payload)
rtmpChunks4Push = rtmp.Message2Chunks(metadataWithSDF, &currHeader)
} else {
rtmpChunks4Sub = rtmp.Message2Chunks(msg.Payload, &currHeader)
rtmpChunks4Push = rtmpChunks4Sub
}
}

if needFlv {
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
msg2 := msg.Clone()
msg2.Payload, _ = rtmp.MetadataEnsureWithoutSetDataFrame(msg2.Payload)
flv4Sub = remux.RtmpMsg2FlvTag(msg2).Raw
} else {
flv4Sub = remux.RtmpMsg2FlvTag(msg).Raw
}
}
// 设置好用于发送的 rtmp 头部信息
lazyRtmpChunkDivider.Init(msg)
lazyRtmpMsg2FlvTag.Init(msg)

// # 数据有效性检查
if len(msg.Payload) == 0 {
Expand Down Expand Up @@ -298,9 +265,9 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// ## 转发本次数据
if len(group.rtmpSubSessionSet) > 0 {
if group.rtmpMergeWriter == nil {
group.write2RtmpSubSessions(rtmpChunks4Sub)
group.write2RtmpSubSessions(lazyRtmpChunkDivider.GetEnsureWithoutSdf())
} else {
group.rtmpMergeWriter.Write(rtmpChunks4Sub)
group.rtmpMergeWriter.Write(lazyRtmpChunkDivider.GetEnsureWithoutSdf())
}
}

Expand Down Expand Up @@ -330,7 +297,7 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
v.pushSession.IsFresh = false
}

_ = v.pushSession.Write(rtmpChunks4Push)
_ = v.pushSession.Write(lazyRtmpChunkDivider.GetEnsureWithSdf())
}
}

Expand Down Expand Up @@ -363,32 +330,33 @@ func (group *Group) broadcastByRtmpMsg(msg base.RtmpMsg) {
// 是否在等待关键帧
if session.ShouldWaitVideoKeyFrame {
if msg.IsVideoKeyNalu() {
session.Write(flv4Sub)
session.Write(lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf())
session.ShouldWaitVideoKeyFrame = false
}
} else {
session.Write(flv4Sub)
session.Write(lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf())
}
}

// # 录制flv文件
if group.recordFlv != nil {
if err := group.recordFlv.WriteRaw(flv4Sub); err != nil {
if err := group.recordFlv.WriteRaw(lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf()); err != nil {
Log.Errorf("[%s] record flv write error. err=%+v", group.UniqueKey, err)
}
}

// # 缓存关键信息,以及gop
if group.config.RtmpConfig.Enable {
group.rtmpGopCache.Feed(msg, rtmpChunks4Sub)
group.rtmpGopCache.Feed(msg, lazyRtmpChunkDivider.GetEnsureWithoutSdf())
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
group.rtmpGopCache.SetMetadata(rtmpChunks4Push, rtmpChunks4Sub)
group.rtmpGopCache.SetMetadata(lazyRtmpChunkDivider.GetEnsureWithSdf(), lazyRtmpChunkDivider.GetEnsureWithoutSdf())
}
}
if group.config.HttpflvConfig.Enable {
group.httpflvGopCache.Feed(msg, flv4Sub)
group.httpflvGopCache.Feed(msg, lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf())
if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
group.httpflvGopCache.SetMetadata(flv4Sub, flv4Sub)
// 注意,因为withSdf实际上用不上,而且我们也没实现,所以全部用without了
group.httpflvGopCache.SetMetadata(lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf(), lazyRtmpMsg2FlvTag.GetEnsureWithoutSdf())
}
}

Expand Down
104 changes: 48 additions & 56 deletions pkg/remux/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,60 +35,52 @@ func MakeDefaultRtmpHeader(in base.RtmpHeader) (out base.RtmpHeader) {
return
}

//// ---------------------------------------------------------------------------------------------------------------------
//
//// LazyRtmpChunkDivider 在必要时,有且仅有一次做切分成chunk的操作
////
//type LazyRtmpChunkDivider struct {
// message []byte
// header *base.RtmpHeader
// chunks []byte
//}
//
//func (lcd *LazyRtmpChunkDivider) Init(message []byte, header *base.RtmpHeader) {
// lcd.message = message
// lcd.header = header
//}
//
//func (lcd *LazyRtmpChunkDivider) GetOriginal() []byte {
// if lcd.chunks == nil {
// lcd.chunks = rtmp.Message2Chunks(lcd.message, lcd.header)
// }
// return lcd.chunks
//}
//
//func (lcd *LazyRtmpChunkDivider) GetEnsureWithSetDataFrame() []byte {
// if lcd.chunks == nil {
// var msg []byte
// var err error
// if lcd.header.MsgTypeId == base.RtmpTypeIdMetadata {
// msg, err = rtmp.MetadataEnsureWithSetDataFrame(lcd.message)
// if err != nil {
// nazalog.Errorf("[%p] rtmp.MetadataEnsureWithSetDataFrame failed. error=%+v", lcd, err)
// msg = lcd.message
// }
// } else {
// msg = lcd.message
// }
// lcd.chunks = rtmp.Message2Chunks(msg, lcd.header)
// }
// return lcd.chunks
//}
// ---------------------------------------------------------------------------------------------------------------------

// LazyRtmpChunkDivider 在必要时,有且仅有一次做切分成chunk的操作
//
//func (lcd *LazyRtmpChunkDivider) GetEnsureWithoutSetDataFrame() []byte {
// if lcd.chunks == nil {
// var msg []byte
// var err error
// if lcd.header.MsgTypeId == base.RtmpTypeIdMetadata {
// msg, err = rtmp.MetadataEnsureWithoutSetDataFrame(lcd.message)
// if err != nil {
// nazalog.Errorf("[%p] rtmp.MetadataEnsureWithoutSetDataFrame failed. error=%+v", lcd, err)
// msg = lcd.message
// }
// } else {
// msg = lcd.message
// }
// lcd.chunks = rtmp.Message2Chunks(msg, lcd.header)
// }
// return lcd.chunks
//}
type LazyRtmpChunkDivider struct {
msg base.RtmpMsg
chunksWithSdf []byte
chunksWithoutSdf []byte
}

func (lcd *LazyRtmpChunkDivider) Init(msg base.RtmpMsg) {
lcd.msg = msg
}

func (lcd *LazyRtmpChunkDivider) GetEnsureWithSdf() []byte {
if lcd.chunksWithSdf == nil {
var msg []byte
if lcd.msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
msg2 := lcd.msg.Clone()
msg2.Payload, _ = rtmp.MetadataEnsureWithSdf(msg2.Payload)
msg2.Header.MsgLen = uint32(len(msg2.Payload))
msg2.Header = MakeDefaultRtmpHeader(msg2.Header)
lcd.chunksWithSdf = rtmp.Message2Chunks(msg2.Payload, &msg2.Header)
} else {
msg = lcd.msg.Payload
h := MakeDefaultRtmpHeader(lcd.msg.Header)
lcd.chunksWithSdf = rtmp.Message2Chunks(msg, &h)
}
}
return lcd.chunksWithSdf
}

func (lcd *LazyRtmpChunkDivider) GetEnsureWithoutSdf() []byte {
if lcd.chunksWithoutSdf == nil {
var msg []byte
if lcd.msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
msg2 := lcd.msg.Clone()
msg2.Payload, _ = rtmp.MetadataEnsureWithoutSdf(msg2.Payload)
msg2.Header.MsgLen = uint32(len(msg2.Payload))
msg2.Header = MakeDefaultRtmpHeader(msg2.Header)
lcd.chunksWithoutSdf = rtmp.Message2Chunks(msg2.Payload, &msg2.Header)
} else {
msg = lcd.msg.Payload
h := MakeDefaultRtmpHeader(lcd.msg.Header)
lcd.chunksWithoutSdf = rtmp.Message2Chunks(msg, &h)
}
}
return lcd.chunksWithoutSdf
}
69 changes: 33 additions & 36 deletions pkg/remux/rtmp2flv.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package remux
import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/httpflv"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
)

// RtmpMsg2FlvTag @return 返回的内存块为新申请的独立内存块
Expand All @@ -23,40 +25,35 @@ func RtmpMsg2FlvTag(msg base.RtmpMsg) *httpflv.Tag {
return &tag
}

//// ---------------------------------------------------------------------------------------------------------------------
//
//// LazyRtmpMsg2FlvTag 在必要时,有且仅有一次做转换操作
////
//type LazyRtmpMsg2FlvTag struct {
// msg base.RtmpMsg
// tag []byte
//}
//
//func (l *LazyRtmpMsg2FlvTag) Init(msg base.RtmpMsg) {
// l.msg = msg
//}
//
//func (l *LazyRtmpMsg2FlvTag) GetOriginal() []byte {
// if l.tag == nil {
// l.tag = RtmpMsg2FlvTag(l.msg).Raw
// }
// return l.tag
//}
//
//func (l *LazyRtmpMsg2FlvTag) GetEnsureWithSetDataFrame() []byte {
// // TODO(chef): [refactor] 这个函数实际上用不上 202207
// //nazalog.Errorf("LazyRtmpMsg2FlvTag::GetEnsureWithSetDataFrame() is not implemented")
// return l.GetOriginal()
//}
// -------------------------------------------------------------------------------------------------------------------

// LazyRtmpMsg2FlvTag 在必要时,有且仅有一次做转换操作
//
//func (l *LazyRtmpMsg2FlvTag) GetEnsureWithoutSetDataFrame() []byte {
// if l.tag == nil {
// b, err := rtmp.MetadataEnsureWithoutSetDataFrame(l.msg.Payload)
// if err != nil {
// b = l.msg.Payload
// }
// l.msg.Payload = b
// l.tag = RtmpMsg2FlvTag(l.msg).Raw
// }
// return l.tag
//}
type LazyRtmpMsg2FlvTag struct {
msg base.RtmpMsg
//tagWithSdf []byte
tagWithoutSdf []byte
}

func (l *LazyRtmpMsg2FlvTag) Init(msg base.RtmpMsg) {
l.msg = msg
}

func (l *LazyRtmpMsg2FlvTag) GetEnsureWithSdf() []byte {
// TODO(chef): [refactor] 这个函数目前没有实际用途 202207
nazalog.Errorf("LazyRtmpMsg2FlvTag::GetEnsureWithSdf() is not implemented")
return l.GetEnsureWithoutSdf()
}

func (l *LazyRtmpMsg2FlvTag) GetEnsureWithoutSdf() []byte {
if l.tagWithoutSdf == nil {
if l.msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
msg2 := l.msg.Clone()
msg2.Payload, _ = rtmp.MetadataEnsureWithoutSdf(msg2.Payload)
l.tagWithoutSdf = RtmpMsg2FlvTag(msg2).Raw
} else {
l.tagWithoutSdf = RtmpMsg2FlvTag(l.msg).Raw
}
}
return l.tagWithoutSdf
}
38 changes: 23 additions & 15 deletions pkg/rtmp/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package rtmp
import (
"bytes"
"github.com/q191201771/naza/pkg/nazabytes"
"github.com/q191201771/naza/pkg/nazalog"

"github.com/q191201771/lal/pkg/base"
)
Expand All @@ -35,47 +36,54 @@ func ParseMetadata(b []byte) (ObjectPairArray, error) {

// TODO(chef): [test] MetadataEnsureWithSetDataFrame 这两个函数增加单元测试 202207

// MetadataEnsureWithSetDataFrame
// MetadataEnsureWithSdf
//
// 确保metadata中包含@setDataFrame
//
// 注意,返回的内存块可能是参数`b`的内存块,也可能是新申请的独立内存块
// @return 返回的内存块为内部独立申请
//
func MetadataEnsureWithSetDataFrame(b []byte) ([]byte, error) {
func MetadataEnsureWithSdf(b []byte) ([]byte, error) {
var ret []byte
v, _, err := Amf0.ReadString(b)
if err != nil {
return b, err
nazalog.Errorf("%+v", err)
return append(ret, b...), err
}

// 已经有了
if v == "@setDataFrame" {
return b, nil
return append(ret, b...), nil
}

buf := nazabytes.NewBuffer(16 + len(b)) // 16=1+2+13 @setDataFrame
if err = Amf0.WriteString(buf, "@setDataFrame"); err != nil {
return b, err
nazalog.Errorf("%+v", err)
return append(ret, b...), err
}
_, err = buf.Write(b)
return buf.Bytes(), err
}

// MetadataEnsureWithoutSetDataFrame
// MetadataEnsureWithoutSdf
//
// 确保metadata中不包含@setDataFrame
//
// 注意,返回的内存块可能是参数`b`的内存块,也可能是新申请的独立内存块
// @return 返回的内存块为内部独立申请
//
func MetadataEnsureWithoutSetDataFrame(b []byte) ([]byte, error) {
pos := 0
v, l, err := Amf0.ReadString(b[pos:])
func MetadataEnsureWithoutSdf(b []byte) ([]byte, error) {
var ret []byte
v, l, err := Amf0.ReadString(b)
if err != nil {
return b, err
nazalog.Errorf("%+v", err)
return append(ret, b...), err
}
pos += l

// 本来就不包含
if v != "@setDataFrame" {
return b, nil
return append(ret, b...), nil
}

return b[pos:], nil
return append(ret, b[l:]...), nil
}

// BuildMetadata spec-video_file_format_spec_v10.pdf
Expand Down
Loading

0 comments on commit 9474cf2

Please sign in to comment.