Skip to content

Commit

Permalink
[feat] 为rtmp pub推流添加静音AAC音频(可动态检测是否需要添加;配置文件中可开启或关闭这个功能) (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
q191201771 committed Aug 28, 2021
1 parent 03c459a commit 2c913f4
Show file tree
Hide file tree
Showing 7 changed files with 364 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
delay.txt
profile.out
coverage.html
*.aac
Expand Down
6 changes: 4 additions & 2 deletions conf/lalserver.conf.json
@@ -1,11 +1,13 @@
{
"# doc of config": "https://pengrl.com/lal/#/ConfigBrief",
"conf_version": "v0.2.2",
"conf_version": "v0.2.3",
"rtmp": {
"enable": true,
"addr": ":1935",
"gop_num": 0,
"merge_write_size": 0
"merge_write_size": 0,
"add_dummy_audio_enable": false,
"add_dummy_audio_wait_audio_ms": 150
},
"default_http": {
"http_listen_addr": ":8080",
Expand Down
2 changes: 1 addition & 1 deletion pkg/aac/seqheader.go
Expand Up @@ -48,8 +48,8 @@ func MakeAudioDataSeqHeaderWithAsc(asc []byte) (out []byte, err error) {
return nil, ErrAac
}

// 注意,前两个字节是SequenceHeaderContext,后面跟着asc
out = make([]byte, 2+len(asc))
// <spec-video_file_format_spec_v10.pdf>, <Audio tags, AUDIODATA>, <page 10/48>
out[0] = 0xaf
out[1] = 0
copy(out[2:], asc)
Expand Down
10 changes: 6 additions & 4 deletions pkg/logic/config.go
Expand Up @@ -50,10 +50,12 @@ type Config struct {
}

type RtmpConfig struct {
Enable bool `json:"enable"`
Addr string `json:"addr"`
GopNum int `json:"gop_num"`
MergeWriteSize int `json:"merge_write_size"`
Enable bool `json:"enable"`
Addr string `json:"addr"`
GopNum int `json:"gop_num"`
MergeWriteSize int `json:"merge_write_size"`
AddDummyAudioEnable bool `json:"add_dummy_audio_enable"`
AddDummyAudioWaitAudioMs int `json:"add_dummy_audio_wait_audio_ms"`
}

type DefaultHttpConfig struct {
Expand Down
211 changes: 211 additions & 0 deletions pkg/logic/dummy_audio_filter.go
@@ -0,0 +1,211 @@
// Copyright 2021, Chef. All rights reserved.
// https://github.com/q191201771/lal
//
// Use of this source code is governed by a MIT-style license
// that can be found in the License file.
//
// Author: Chef (191201771@qq.com)

package logic

import (
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/rtmp"
"github.com/q191201771/naza/pkg/nazalog"
"math"
)

const dummyAudioFilterStageAnalysis = 1
const dummyAudioFilterStageNormal = 2
const dummyAudioFilterStageDummy = 3

type DummyAudioFilter struct {
uk string
waitAudioMs int
onPop rtmp.OnReadRtmpAvMsg

stage int
earlyStageQueue []base.RtmpMsg
firstVideoTs uint32
prevAudioTs uint32

audioCount int
}

// NewDummyAudioFilter 检测输入的rtmp流中是否有音频,如果有,则原样返回;如果没有,则制造静音音频数据叠加在rtmp流里面
//
// @param waitAudioMs 等待音频数据时间,如果超出这个时间还没有接收到音频数据,则开始制造静音数据
// @param onPop 注意,所有回调都发生在输入函数调用中
//
func NewDummyAudioFilter(uk string, waitAudioMs int, onPop rtmp.OnReadRtmpAvMsg) *DummyAudioFilter {
return &DummyAudioFilter{
uk: uk,
waitAudioMs: waitAudioMs,
onPop: onPop,
stage: dummyAudioFilterStageAnalysis,
firstVideoTs: math.MaxUint32,
prevAudioTs: math.MaxUint32,
}
}

func (filter *DummyAudioFilter) OnReadRtmpAvMsg(msg base.RtmpMsg) {
filter.Feed(msg)
}

func (filter *DummyAudioFilter) Feed(msg base.RtmpMsg) {
//nazalog.Debugf("trace_DummyAudioFilter, push. header=%+v, payload=%s, stage=%d", msg.Header, hex.EncodeToString(nazastring.SubSliceSafety(msg.Payload, 8)), filter.stage)
switch filter.stage {
case dummyAudioFilterStageAnalysis:
filter.handleAnalysisStage(msg)
case dummyAudioFilterStageNormal:
filter.handleNormalStage(msg)
case dummyAudioFilterStageDummy:
filter.handleDummyStage(msg)
}
}

// 初始阶段,分析是否存在音频
func (filter *DummyAudioFilter) handleAnalysisStage(msg base.RtmpMsg) {
switch msg.Header.MsgTypeId {
case base.RtmpTypeIdMetadata:
// metadata直接入队列
filter.cache(msg)
case base.RtmpTypeIdAudio:
// 原始流中存在音频,将所有缓存数据出队列,进入normal stage
for i := range filter.earlyStageQueue {
filter.onPopProxy(filter.earlyStageQueue[i])
}
filter.onPopProxy(msg)

filter.stage = dummyAudioFilterStageNormal
case base.RtmpTypeIdVideo:
// 分析视频数据累计时长是否达到阈值

// 注意,为了避免seq header的时间戳和视频帧不是线性的(0或其他特殊的值)我们直接入队列并跳过
if msg.IsVideoKeySeqHeader() {
filter.cache(msg)
return
}

// 记录首个视频帧的时间戳
if filter.firstVideoTs == math.MaxUint32 {
filter.cache(msg)
filter.firstVideoTs = msg.Header.TimestampAbs
return
}

// 没有达到阈值
if msg.Header.TimestampAbs-filter.firstVideoTs < uint32(filter.waitAudioMs) {
filter.cache(msg)
return
}

// 达到阈值
nazalog.Debugf("[%s] start make dummy audio.", filter.uk)
filter.stage = dummyAudioFilterStageDummy
for i := range filter.earlyStageQueue {
filter.handleDummyStage(filter.earlyStageQueue[i])
}
filter.clearCache()
filter.handleDummyStage(msg)
}
}

// 原始流中存在音频
func (filter *DummyAudioFilter) handleNormalStage(msg base.RtmpMsg) {
filter.onPopProxy(msg)
}

// 原始流中不存在音频
func (filter *DummyAudioFilter) handleDummyStage(msg base.RtmpMsg) {
if msg.Header.MsgTypeId == base.RtmpTypeIdAudio {
// 由于我们已经开始制造静音包了,静音包的编码参数可能会和实际音频参数不一致,所以我们只能过滤掉原始的音频数据了
nazalog.Warnf("[%s] recv audio but we are making dummy audio.", filter.uk)
return
}

if msg.Header.MsgTypeId == base.RtmpTypeIdMetadata {
filter.onPopProxy(msg)
return
}

if msg.IsVideoKeySeqHeader() {
// TODO(chef): 这里的时间戳可以考虑减1,但是注意处理一些边界条件
ats := msg.Header.TimestampAbs
amsg := filter.makeAudioSeqHeader(ats)
filter.onPopProxy(amsg)
filter.onPopProxy(msg)
return
}

if filter.prevAudioTs == math.MaxUint32 {
ats := msg.Header.TimestampAbs
amsg := filter.makeOneAudio(ats)
filter.onPopProxy(amsg)
filter.onPopProxy(msg)
filter.prevAudioTs = ats
} else {
for {
ats := filter.prevAudioTs + filter.calcAudioDurationMs()
if ats > msg.Header.TimestampAbs {
break
}
amsg := filter.makeOneAudio(ats)
filter.onPopProxy(amsg)
filter.prevAudioTs = ats
}
filter.onPopProxy(msg)
}
}

func (filter *DummyAudioFilter) cache(msg base.RtmpMsg) {
filter.earlyStageQueue = append(filter.earlyStageQueue, msg.Clone())
}

func (filter *DummyAudioFilter) clearCache() {
filter.earlyStageQueue = nil
}

func (filter *DummyAudioFilter) onPopProxy(msg base.RtmpMsg) {
//nazalog.Debugf("trace_DummyAudioFilter, onPopProxy. header=%+v, payload=%s, stage=%d", msg.Header, hex.EncodeToString(nazastring.SubSliceSafety(msg.Payload, 8)), filter.stage)
if filter.onPop != nil {
filter.onPop(msg)
}
}

func (filter *DummyAudioFilter) makeAudioSeqHeader(ts uint32) base.RtmpMsg {
// aac (LC), 48000 Hz, stereo, fltp
return base.RtmpMsg{
Header: base.RtmpHeader{
Csid: rtmp.CsidAudio,
MsgLen: 4,
MsgTypeId: base.RtmpTypeIdAudio,
MsgStreamId: rtmp.Msid1,
TimestampAbs: ts,
},
Payload: []byte{0xaf, 0x00, 0x11, 0x90},
}
}

func (filter *DummyAudioFilter) makeOneAudio(ts uint32) base.RtmpMsg {
filter.audioCount++
return base.RtmpMsg{
Header: base.RtmpHeader{
Csid: rtmp.CsidAudio,
MsgLen: 8,
MsgTypeId: base.RtmpTypeIdAudio,
MsgStreamId: rtmp.Msid1,
TimestampAbs: ts,
},
// 注意,前面2字节是seq header头部信息,后面6个字节是AAC静音包
Payload: []byte{0xaf, 0x01, 0x21, 0x10, 0x04, 0x60, 0x8c, 0x1c},
}
}

func (filter *DummyAudioFilter) calcAudioDurationMs() uint32 {
v := filter.audioCount % 3
if v == 1 || v == 2 {
return 21
}
return 22
}
129 changes: 129 additions & 0 deletions pkg/logic/dummy_audio_filter_test.go
@@ -0,0 +1,129 @@
package logic_test

import (
"encoding/hex"
"github.com/q191201771/lal/pkg/base"
"github.com/q191201771/lal/pkg/logic"
"github.com/q191201771/naza/pkg/assert"
"github.com/q191201771/naza/pkg/nazalog"
"strconv"
"strings"
"testing"
)

func TestDummyAudioFilter(t *testing.T) {
// case1 一个音视频都有的流
{
in := []base.RtmpMsg{
helperUnpackRtmpMsg("header={Csid:4 MsgLen:378 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}, payload=02000d40"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:48 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0}, payload=17000000"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:7 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af001210"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:26 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af01de04"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:1170 MsgTypeId:9 MsgStreamId:1 TimestampAbs:23}, payload=17010000"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:23}, payload=af012110"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:46}, payload=af012120"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:849 MsgTypeId:8 MsgStreamId:1 TimestampAbs:69}, payload=af01214c"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:372 MsgTypeId:9 MsgStreamId:1 TimestampAbs:90}, payload=27010000"),
}
var out []base.RtmpMsg
filter := logic.NewDummyAudioFilter("test1", 150, func(msg base.RtmpMsg) {
out = append(out, msg)
})
//filter.Feed(helperUnpackRtmpMsg(""))
for i := 0; i <= 1; i++ {
filter.Feed(in[i])
assert.Equal(t, nil, out)
}
for i := 2; i < len(in); i++ {
filter.Feed(in[i])
assert.Equal(t, in[:i+1], out)
}
}

// case2 一个只有视频的流
{
in := []base.RtmpMsg{
helperUnpackRtmpMsg("header={Csid:4 MsgLen:269 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}, payload=02000d4073657444"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:48 MsgTypeId:9 MsgStreamId:1 TimestampAbs:0}, payload=1700000000016400"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:1170 MsgTypeId:9 MsgStreamId:1 TimestampAbs:23}, payload=1701000000000002"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:372 MsgTypeId:9 MsgStreamId:1 TimestampAbs:90}, payload=2701000000000001"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:1226 MsgTypeId:9 MsgStreamId:1 TimestampAbs:156}, payload=2701000000000004"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:1541 MsgTypeId:9 MsgStreamId:1 TimestampAbs:223}, payload=2701000000000005"),
helperUnpackRtmpMsg("header={Csid:6 MsgLen:1931 MsgTypeId:9 MsgStreamId:1 TimestampAbs:290}, payload=2701000000000005"),
}
var out []base.RtmpMsg
filter := logic.NewDummyAudioFilter("test1", 150, func(msg base.RtmpMsg) {
out = append(out, msg)
})
for i := 0; i <= 4; i++ {
filter.Feed(in[i])
assert.Equal(t, nil, out)
}
filter.Feed(in[5])
assert.Equal(t, 17, len(out))
assert.Equal(t, in[0], out[0])
assert.Equal(t, helperUnpackRtmpMsg("header={Csid:6 MsgLen:4 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af001190"), out[1])
assert.Equal(t, in[1], out[2])
assert.Equal(t, helperUnpackRtmpMsg("header={Csid:6 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:215}, payload=af01211004608c1c"), out[15])
assert.Equal(t, in[5], out[16])

filter.Feed(in[6])
assert.Equal(t, 21, len(out))
assert.Equal(t, helperUnpackRtmpMsg("header={Csid:6 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:236}, payload=af01211004608c1c"), out[17])
assert.Equal(t, in[6], out[20])
}

// case3 一个只有音频的流
{
in := []base.RtmpMsg{
helperUnpackRtmpMsg("header={Csid:4 MsgLen:278 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}, payload=02000d4073657444"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:7 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af00121056e500"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:26 MsgTypeId:8 MsgStreamId:1 TimestampAbs:0}, payload=af01de04004c6176"),
helperUnpackRtmpMsg("header={Csid:4 MsgLen:8 MsgTypeId:8 MsgStreamId:1 TimestampAbs:23}, payload=af01211004608c1c"),
}
var out []base.RtmpMsg
filter := logic.NewDummyAudioFilter("test1", 150, func(msg base.RtmpMsg) {
out = append(out, msg)
})
filter.Feed(in[0])
assert.Equal(t, nil, out)
for i := 1; i <= 3; i++ {
filter.Feed(in[i])
assert.Equal(t, in[:i+1], out)
}
}
}

// @param logstr e.g. "header={Csid:4 MsgLen:378 MsgTypeId:18 MsgStreamId:1 TimestampAbs:0}"
///
func helperUnpackRtmpMsg(logstr string) base.RtmpMsg {
var fetchItemFn = func(str string, prefix string, suffix string) string {
b := strings.Index(str, prefix)
if suffix == "" {
return str[b+len(prefix):]
}
e := strings.Index(str[b:], suffix)
return str[b+len(prefix) : b+e]
}
var fetchIntItemFn = func(str string, prefix string, suffix string) int {
ret, err := strconv.Atoi(fetchItemFn(str, prefix, suffix))
nazalog.Assert(nil, err)
return ret
}

var header base.RtmpHeader
header.Csid = fetchIntItemFn(logstr, "Csid:", " ")
header.MsgLen = uint32(fetchIntItemFn(logstr, "MsgLen:", " "))
header.MsgTypeId = uint8(fetchIntItemFn(logstr, "MsgTypeId:", " "))
header.MsgStreamId = fetchIntItemFn(logstr, "MsgStreamId:", " ")
header.TimestampAbs = uint32(fetchIntItemFn(logstr, "TimestampAbs:", "}"))

hexStr := fetchItemFn(logstr, "payload=", "")
payload, err := hex.DecodeString(hexStr)
nazalog.Assert(nil, err)

return base.RtmpMsg{
Header: header,
Payload: payload,
}
}

0 comments on commit 2c913f4

Please sign in to comment.