Skip to content

Commit

Permalink
1 [feat] dispatch: 新增配置max_sub_session_per_ip,可控制单个ip最大拉流session数量 2 …
Browse files Browse the repository at this point in the history
…[feat] dispatch: 新增配置max_sub_duration_sec,可控制单个拉流session的最大时长 3 [feat] dispatch: 新增读取配置文件功能
  • Loading branch information
q191201771 committed May 24, 2024
1 parent 0f4c71d commit da5730f
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 41 deletions.
48 changes: 39 additions & 9 deletions app/demo/dispatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,53 @@

package main

// 服务启动前,设置好一些配置
import "path/filepath"

// Config 服务启动前,设置好一些配置
type Config struct {
// 本服务HTTP监听端口,用于接收各lal节点的HTTP Notify
ListenAddr string
ListenAddr string `json:"listen_addr"`

// 配置向本服务汇报的节点信息
ServerId2Server map[string]Server
// ServerId2Server `json:"server_id2server"`
// 配置向本服务汇报的节点信息。
// 如果没有配置的serverId向本服务汇报,本服务讲认为该汇报信息无效。
ServerId2Server map[string]Server `json:"servers"`

// 级联拉流时,携带该Url参数,使得我们可以区分是级联拉流还是用户拉流
PullSecretParam string
PullSecretParam string `json:"pull_secret_param"`

// 检测lal节点update报活的超时时间
ServerTimeoutSec int
ServerTimeoutSec int `json:"server_timeout_sec"`

// MaxSubSessionPerIp
// 当一个client IP对应的sub session大于这个阈值时,我们认为是恶意行为,踢掉这个IP的所有sub session。
// 如果设置为0或-1,表示不限制。
MaxSubSessionPerIp int `json:"max_sub_session_per_ip"`

// MaxSubDurationSec
// 当一个sub session的持续时间超过这个阈值时,我们认为是恶意行为,踢掉这个sub session。
// 如果设置为0或-1,表示不限制。
MaxSubDurationSec int `json:"max_sub_duration_sec"`
}

// lal节点静态配置信息
// Server lal节点静态配置信息
type Server struct {
RtmpAddr string // 可用于级联拉流的RTMP地址
ApiAddr string // HTTP API接口地址
// 可用于级联拉流的RTMP地址
RtmpAddr string `json:"rtmp_addr"`
// HTTP API接口地址,比如向节点发送kick_session时使用
ApiAddr string `json:"api_addr"`
}

var config Config

var DefaultConfFilenameList = []string{
filepath.FromSlash("dispatch.conf.json"),
filepath.FromSlash("./conf/dispatch.conf.json"),
filepath.FromSlash("../dispatch.conf.json"),
filepath.FromSlash("../conf/dispatch.conf.json"),
filepath.FromSlash("../../dispatch.conf.json"),
filepath.FromSlash("../../conf/dispatch.conf.json"),
filepath.FromSlash("../../../dispatch.conf.json"),
filepath.FromSlash("../../../conf/dispatch.conf.json"),
filepath.FromSlash("lal/conf/dispatch.conf.json"),
}
152 changes: 120 additions & 32 deletions app/demo/dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
package main

import (
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"os"
"strings"
"time"

"github.com/q191201771/lal/app/demo/dispatch/datamanager"
"github.com/q191201771/lal/pkg/base"
Expand All @@ -31,22 +35,6 @@ import (
// 同一路流,推流和拉流可以在不同的节点。
//

var config = Config{
ListenAddr: ":10101",
ServerId2Server: map[string]Server{
"1": {
RtmpAddr: "127.0.0.1:19350",
ApiAddr: "127.0.0.1:8083",
},
"2": {
RtmpAddr: "127.0.0.1:19550",
ApiAddr: "127.0.0.1:8283",
},
},
PullSecretParam: "lal_cluster_inner_pull=1",
ServerTimeoutSec: 30,
}

var dataManager datamanager.DataManger

func OnPubStartHandler(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -100,21 +88,7 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
// 演示通过流名称踢掉session,服务于鉴权等场景
// 业务方真正使用时,可以通过流名称、用户IP、URL参数等信息,来判断是否需要踢掉session
if info.StreamName == "cheftestkick" {
reqServer, exist := config.ServerId2Server[info.ServerId]
if !exist {
nazalog.Errorf("[%s] req server id invalid.", id)
return
}

url := fmt.Sprintf("http://%s/api/ctrl/kick_session", reqServer.ApiAddr)
var b base.ApiCtrlKickSessionReq
b.StreamName = info.StreamName
b.SessionId = info.SessionId

nazalog.Infof("[%s] ctrl kick out session. send to %s with %+v", id, reqServer.ApiAddr, b)
if _, err := nazahttp.PostJson(url, b, nil); err != nil {
nazalog.Errorf("[%s] post json error. err=%+v", id, err)
}
kickSession(info.ServerId, info.StreamName, info.SessionId)
return
}

Expand All @@ -126,7 +100,7 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
}
// 2. 汇报的节点已经存在输入流,不需要触发
if info.HasInSession {
nazalog.Infof("[%s] in not empty, ignore.", id)
nazalog.Infof("[%s] has in session in the same node, ignore.", id)
return
}

Expand All @@ -148,6 +122,7 @@ func OnSubStartHandler(w http.ResponseWriter, r *http.Request) {
nazalog.Assert(true, exist)

// 向汇报节点,发送pull级联拉流的命令,其中包含pub所在节点信息
// TODO(chef): start_relay_pull封装成函数,所有的http请求都应该封装成函数 202405
// TODO(chef): 还没有测试新的接口start_relay_pull,只是保证可以编译通过
url := fmt.Sprintf("http://%s/api/ctrl/start_relay_pull", reqServer.ApiAddr)
var b base.ApiCtrlStartRelayPullReq
Expand Down Expand Up @@ -193,22 +168,135 @@ func OnUpdateHandler(w http.ResponseWriter, r *http.Request) {
}
}
dataManager.UpdatePub(info.ServerId, streamNameList)

if config.MaxSubSessionPerIp > 0 {
ip2SubSessions := make(map[string][]base.StatSub)
sessionId2StreamName := make(map[string]string)
for _, g := range info.Groups {
for _, sub := range g.StatSubs {
host, _, err := net.SplitHostPort(sub.RemoteAddr)
if err != nil {
nazalog.Warnf("split host port failed. remote addr=%s", sub.RemoteAddr)
continue
}
ip2SubSessions[host] = append(ip2SubSessions[host], sub)
sessionId2StreamName[sub.SessionId] = g.StreamName
}
}
for ip, subs := range ip2SubSessions {
if len(subs) > config.MaxSubSessionPerIp {
nazalog.Debugf("close session. ip=%s, session count=%d", ip, len(subs))
for _, sub := range subs {
if sub.Protocol == base.SessionProtocolHlsStr {
host, _, err := net.SplitHostPort(sub.RemoteAddr)
if err != nil {
nazalog.Warnf("split host port failed. remote addr=%s", sub.RemoteAddr)
continue
}
addIpBlacklist(info.ServerId, host, 60)
} else {
kickSession(info.ServerId, sessionId2StreamName[sub.SessionId], sub.SessionId)
}
}
}
}
}

if config.MaxSubDurationSec > 0 {
now := time.Now()
for _, g := range info.Groups {
for _, sub := range g.StatSubs {
st, err := base.ParseReadableTime(sub.StartTime)
if err != nil {
nazalog.Warnf("parse readable time failed. start time=%s, err=%+v", sub.StartTime, err)
continue
}
diff := int(now.Sub(st).Seconds())
if diff > config.MaxSubDurationSec {
nazalog.Infof("close session. sub session start time=%s, diff=%d", sub.StartTime, diff)
if sub.Protocol == base.SessionProtocolHlsStr {
host, _, err := net.SplitHostPort(sub.RemoteAddr)
if err != nil {
nazalog.Warnf("split host port failed. remote addr=%s", sub.RemoteAddr)
continue
}
addIpBlacklist(info.ServerId, host, 60)
} else {
kickSession(info.ServerId, g.StreamName, sub.SessionId)
}
}
}
}
}
}

func logHandler(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body)
nazalog.Infof("r=%+v, body=%s", r, b)
}

func kickSession(serverId, streamName, sessionId string) {
reqServer, exist := config.ServerId2Server[serverId]
if !exist {
nazalog.Errorf("[%s] req server id invalid.", serverId)
return
}

url := fmt.Sprintf("http://%s/api/ctrl/kick_session", reqServer.ApiAddr)
var b base.ApiCtrlKickSessionReq
b.StreamName = streamName
b.SessionId = sessionId

nazalog.Infof("[%s] kickSession. send to %s with %+v", serverId, reqServer.ApiAddr, b)
if _, err := nazahttp.PostJson(url, b, nil); err != nil {
nazalog.Errorf("[%s] post json error. err=%+v", serverId, err)
}
return
}

func addIpBlacklist(serverId, ip string, durationSec int) {
reqServer, exist := config.ServerId2Server[serverId]
if !exist {
nazalog.Errorf("[%s] req server id invalid.", serverId)
return
}

url := fmt.Sprintf("http://%s/api/ctrl/add_ip_blacklist", reqServer.ApiAddr)
var b base.ApiCtrlAddIpBlacklistReq
b.Ip = ip
b.DurationSec = durationSec

nazalog.Infof("[%s] addIpBlacklist. send to %s with %+v", serverId, reqServer.ApiAddr, b)
if _, err := nazahttp.PostJson(url, b, nil); err != nil {
nazalog.Errorf("[%s] post json error. err=%+v", serverId, err)
}
return
}

func parseFlag() string {
cf := flag.String("c", "", "specify conf file")
flag.Parse()
return *cf
}

func main() {
_ = nazalog.Init(func(option *nazalog.Option) {
option.AssertBehavior = nazalog.AssertFatal
})
defer nazalog.Sync()
base.LogoutStartInfo()

confFilename := parseFlag()
rawContent := base.WrapReadConfigFile(confFilename, DefaultConfFilenameList, nil)
if err := json.Unmarshal(rawContent, &config); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "unmarshal conf file failed. raw content=%s err=%+v", rawContent, err)
base.OsExitAndWaitPressIfWindows(1)
}
nazalog.Infof("config=%+v", config)

dataManager = datamanager.NewDataManager(datamanager.DmtMemory, config.ServerTimeoutSec)

nazalog.Infof("> start http server. addr=%s", config.ListenAddr)
l, err := net.Listen("tcp", config.ListenAddr)
nazalog.Assert(nil, err)

Expand Down
17 changes: 17 additions & 0 deletions conf/dispatch.conf.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"listen_addr": ":10101",
"servers": {
"1": {
"rtmp_addr": "127.0.0.1:19350",
"api_addr": "127.0.0.1:8083"
},
"2": {
"rtmp_addr": "127.0.0.1:19550",
"api_addr": "127.0.0.1:8283"
}
},
"pull_secret_param": "lal_cluster_inner_pull=1",
"server_timeout_sec": 30,
"max_sub_session_per_ip": -1,
"max_sub_duration_sec": -1
}

0 comments on commit da5730f

Please sign in to comment.