Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use private netFD.Writev. #27

Merged
merged 15 commits into from
Dec 18, 2015
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ Why rewrite the SRS to go-oryx:
1. v0.1.7 Use agent(source+channel+sink) to build complex stream river.
1. v0.1.8 Supports Publish and Play VP6 RTMP stream.
1. v0.1.9 Supports Delivery VP6/H.264 and Speex/AAC/MP3/Nellymoser codec.
1. v0.1.10 Supports 10k(8CPUs) play clients over RTMP.
1. v0.1.10 Supports 10k(8CPUs) for RTMP players.
1. v0.1.11 Supports 10k(4CPUs) for RTMP players.
1. v0.1.12 Supports 10k(3CPUs) for RTMP players.
1. v0.1.13 Supports 10k(2CPUs) for RTMP players.
1. [dev] Supports gop-cache and drop frame strategy.
1. [dev] Supports Connection Oriented Traceable log.

Expand Down
2 changes: 1 addition & 1 deletion agent/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@ var AgentControlRepublishError = errors.New("agent republish")
// whether control error object.
func IsControlError(err error) bool {
return err == AgentControlRepublishError
}
}
19 changes: 18 additions & 1 deletion agent/rtmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ func (v *Rtmp) cycle(conn *protocol.RtmpConnection) (err error) {
// set chunk size to larger.
// set the chunk size before any larger response greater than 128,
// to make OBS happy, @see https://github.com/ossrs/srs/issues/454
// TODO: FIXME: support set chunk size.
if err = conn.SetChunkSize(core.Conf.ChunkSize); err != nil {
if !core.IsNormalQuit(err) {
core.Error.Println("rtmp set chunk size failed. err is", err)
}
return
}
core.Info.Println("set chunk size to", core.Conf.ChunkSize)

// response the client connect ok and onBWDone.
if err = conn.ResponseConnectApp(); err != nil {
Expand Down Expand Up @@ -235,6 +241,10 @@ func (v *Rtmp) cycle(conn *protocol.RtmpConnection) (err error) {
core.Error.Println("reparse request failed. err is", err)
return
}
if err = conn.OnUrlParsed(); err != nil {
core.Error.Println("notify url parsed failed. err is", err)
return
}

// security check
// TODO: FIXME: implements it.
Expand All @@ -259,6 +269,9 @@ func (v *Rtmp) cycle(conn *protocol.RtmpConnection) (err error) {
r.Vhost = vhost.Name
}

// set chunk_size on vhost level.
// TODO: FIXME: support set chunk size.

var agent core.Agent
if conn.Req.Type.IsPlay() {
if agent, err = Manager.NewRtmpPlayAgent(conn, v.wc); err != nil {
Expand Down Expand Up @@ -309,6 +322,10 @@ func (v *Rtmp) OnReloadGlobal(scope int, cc, pc *core.Config) (err error) {
return
}

func (v *Rtmp) OnReloadVhost(vhost string, scope int, cc, pc *core.Config) (err error) {
return
}

// rtmp play agent, to serve the player or edge.
type RtmpPlayAgent struct {
conn *protocol.RtmpConnection
Expand Down
25 changes: 25 additions & 0 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,27 @@ func (s *Server) initializeRuntime() (err error) {
return
}

// show gc trace.
go func() {
stat := &debug.GCStats{}

for {
if core.Conf.Go.GcTrace > 0 {
pgc := stat.NumGC
debug.ReadGCStats(stat)
if len(stat.Pause) > 3 {
stat.Pause = append([]time.Duration{}, stat.Pause[:3]...)
}
if pgc < stat.NumGC {
core.Trace.Println("gc", stat.NumGC, stat.PauseTotal, stat.Pause, stat.PauseQuantiles)
}
time.Sleep(time.Duration(core.Conf.Go.GcTrace) * time.Second)
} else {
time.Sleep(3 * time.Second)
}
}
}()

return
}

Expand Down Expand Up @@ -401,3 +422,7 @@ func (s *Server) OnReloadGlobal(scope int, cc, pc *core.Config) (err error) {

return
}

func (v *Server) OnReloadVhost(vhost string, scope int, cc, pc *core.Config) (err error) {
return
}
24 changes: 23 additions & 1 deletion conf/full.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
// @remark: donot support reload.
// default: true
"daemon": true,
// the default chunk size is 128, max is 65536,
// some client does not support chunk size change,
// however, most clients supports it and it can improve
// performance about 10%.
// default: 60000
"chunk_size": 60000,
// go runtime section.
"go": {
// the interval for gc, in seconds. 0 to not set.
Expand All @@ -18,6 +24,12 @@
// the percent for gc. 0 to not set.
// default: 0
"gc_percent": 0,
// the interval for gc trace, in seconds. 0 to disable.
// default: 0
"gc_trace": 0,
// whether use private writev.
// default: false
"writev": false,
// the cpu profile filename, for example
// go tool pprof oryx cpu.prof
// default or empty string to ignore.
Expand Down Expand Up @@ -94,7 +106,17 @@
// for example, user use ip to access the stream: rtmp://192.168.1.2/live/livestream.
// for which cannot identify the required vhost.
{
"name": "__defaultVhost__"
"name": "__defaultVhost__",
// for play client, both RTMP and other stream clients,
// for instance, the HTTP FLV stream clients.
"play": {
// set the MW(merged-write) latency in ms.
// SRS always set mw on, so we just set the latency value.
// the latency of stream >= mw_latency + mr_latency
// the value recomment is [300, 1800]
// default: 350
"mw_latency": 350
}
}
]
}
68 changes: 65 additions & 3 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ import (

// the scope for reload.
const (
// global specified.
ReloadWorkers = iota
ReloadLog
ReloadListen
ReloadCpuProfile
ReloadGcPercent
// vhost specified.
ReloadMwLatency
)

// merged write latency, the group messages to send.
const defaultMwLatency = 350

// the reload handler,
// the client which care about the reload event,
// must implements this interface and then register itself
Expand All @@ -49,7 +55,13 @@ type ReloadHandler interface {
// @param scope defined in const ReloadXXX.
// @param cc the current loaded config, GsConfig.
// @param pc the previous old config.
OnReloadGlobal(scope int, cc, pc *Config) error
OnReloadGlobal(scope int, cc, pc *Config) (err error)
// when reload the vhost scopes,
// for example, the Vhost.Play.MwLatency
// @param scope defined in const ReloadXXX.
// @param cc the current loaded config, GsConfig.
// @param pc the previous old config.
OnReloadVhost(vhost string, scope int, cc, pc *Config) (err error)
}

// the reader support c++-style comment,
Expand Down Expand Up @@ -222,6 +234,11 @@ func (v *Reader) Read(p []byte) (n int, err error) {
// the vhost section in config.
type Vhost struct {
Name string `json:"name"`
Play *Play `json:"play"`
}

type Play struct {
MwLatency int `json:"mw_latency`
}

// the config for this application,
Expand All @@ -233,11 +250,14 @@ type Config struct {
Workers int `json:"workers"` // the number of cpus to use

// the rtmp global section.
Listen int `json:"listen"` // the system service RTMP listen port
Daemon bool `json:"daemon"` // whether enabled the daemon for unix-like os
Listen int `json:"listen"` // the system service RTMP listen port
Daemon bool `json:"daemon"` // whether enabled the daemon for unix-like os
ChunkSize int `json:"chunk_size"` // the output chunk size. [128, 65535].

// the go section.
Go struct {
Writev bool `json:"writev"` // whether use private writev.
GcTrace int `json:"gc_trace"` // the gc trace interval in seconds.
GcInterval int `json:"gc_interval"` // the gc interval in seconds.
GcPercent int `json:"gc_percent"` // the gc percent.
CpuProfile string `json:"cpu_profile"` // the cpu profile file.
Expand Down Expand Up @@ -287,6 +307,7 @@ func NewConfig() *Config {
c.Listen = RtmpListen
c.Workers = 0
c.Daemon = true
c.ChunkSize = 60000
c.Go.GcInterval = 0

c.Heartbeat.Enabled = false
Expand Down Expand Up @@ -357,6 +378,18 @@ func (c *Config) reparse() (err error) {
c.Go.GcPercent = 100
}

// default values for vhosts.
for _, v := range c.Vhosts {
if v.Play != nil {
if v.Play.MwLatency == 0 {
// how many messages send in a group.
// one message is about 14ms for RTMP audio and video.
// @remark 0 to disable group messages to send one by one.
v.Play.MwLatency = defaultMwLatency
}
}
}

return
}

Expand All @@ -376,6 +409,9 @@ func (c *Config) Validate() error {
if c.Listen <= 0 || c.Listen > 65535 {
return fmt.Errorf("listen must in (0, 65535], actual is %v", c.Listen)
}
if c.ChunkSize < 128 || c.ChunkSize > 65535 {
return fmt.Errorf("chunk_size must in [128, 65535], actual is %v", c.ChunkSize)
}

if c.Go.GcInterval < 0 || c.Go.GcInterval > 24*3600 {
return fmt.Errorf("go gc_interval must in [0, 24*3600], actual is %v", c.Go.GcInterval)
Expand Down Expand Up @@ -511,6 +547,20 @@ func (pc *Config) Reload(cc *Config) (err error) {
Info.Println("reload ignore gc percent")
}

// vhost specified.
for k, cv := range cc.vhosts {
if pv := pc.vhosts[k]; cv.Play != nil && pv.Play != nil && cv.Play.MwLatency != pv.Play.MwLatency {
for _, h := range cc.reloadHandlers {
if err = h.OnReloadVhost(k, ReloadMwLatency, cc, pc); err != nil {
return
}
}
Trace.Println("reload apply vhost.play.mw-latency ok")
} else {
Info.Println("reload ignore vhost.play.mw-latency")
}
}

return
}

Expand All @@ -525,3 +575,15 @@ func (c *Config) Vhost(name string) (*Vhost, error) {

return nil, VhostNotFoundError
}

func (c *Config) VhostGroupMessages(vhost string) (n int, err error) {
var v *Vhost
if v, err = c.Vhost(vhost); err != nil {
return
}

if v.Play == nil {
return defaultMwLatency / 14, nil
}
return v.Play.MwLatency / 14, nil
}
2 changes: 1 addition & 1 deletion core/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ package core

import (
"errors"
"net"
"io"
"net"
)

// the quit error, used for goroutine to return.
Expand Down
2 changes: 1 addition & 1 deletion core/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import "fmt"
const (
major = 0
minor = 1
reversion = 10
reversion = 13
)

func Version() string {
Expand Down
5 changes: 0 additions & 5 deletions protocol/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,4 @@ const (

// the input cache, to read from network and put in it.
RtmpInCache = 16

// how many messages send in a group.
// one message is about 15ms for RTMP audio and video.
// @remark 0 to disable group messages to send one by one.
RtmpGroupMessageCount = 8
)
Loading