Skip to content

Commit

Permalink
add missing Prometheus exports (bluenviron#2620, bluenviron#2619):
Browse files Browse the repository at this point in the history
paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent
  • Loading branch information
rse committed Nov 2, 2023
1 parent 15f1c73 commit 4401b6a
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 2 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1433,6 +1433,7 @@ Obtaining:
# metrics of every path
paths{name="[path_name]",state="[state]"} 1
paths_bytes_received{name="[path_name]",state="[state]"} 1234
paths_bytes_sent{name="[path_name]",state="[state]"} 1234

# metrics of every HLS muxer
hls_muxers{name="[name]"} 1
Expand Down Expand Up @@ -1463,6 +1464,11 @@ rtmp_conns{id="[id]",state="[state]"} 1
rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234
rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187

# metrics of every SRT connection
srt_conns{id="[id]",state="[state]"} 1
srt_conns_bytes_received{id="[id]",state="[state]"} 1234
srt_conns_bytes_sent{id="[id]",state="[state]"} 187

# metrics of every WebRTC session
webrtc_sessions{id="[id]"} 1
webrtc_sessions_bytes_received{id="[id]",state="[state]"} 1234
Expand Down
3 changes: 3 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,9 @@ components:
bytesReceived:
type: integer
format: int64
bytesSent:
type: integer
format: int64
readers:
type: array
items:
Expand Down
2 changes: 2 additions & 0 deletions internal/core/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func TestAPIPathsList(t *testing.T) {
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

type pathList struct {
Expand Down Expand Up @@ -625,6 +626,7 @@ func TestAPIPathsGet(t *testing.T) {
Ready bool `json:"Ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

var pathName string
Expand Down
1 change: 1 addition & 0 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
Expand Down
25 changes: 25 additions & 0 deletions internal/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type metrics struct {
rtspServer apiRTSPServer
rtspsServer apiRTSPServer
rtmpServer apiRTMPServer
srtServer apiSRTServer
hlsManager apiHLSManager
webRTCManager apiWebRTCManager
}
Expand Down Expand Up @@ -96,6 +97,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
tags := "{name=\"" + i.Name + "\",state=\"" + state + "\"}"
out += metric("paths", tags, 1)
out += metric("paths_bytes_received", tags, int64(i.BytesReceived))
out += metric("paths_bytes_sent", tags, int64(i.BytesSent))
}
} else {
out += metric("paths", "", 0)
Expand Down Expand Up @@ -199,6 +201,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
}

if !interfaceIsEmpty(m.srtServer) {
data, err := m.srtServer.apiConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("srt_conns", tags, 1)
out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived))
out += metric("srt_conns_bytes_sent", tags, int64(i.BytesSent))
}
} else {
out += metric("srt_conns", "", 0)
out += metric("srt_conns_bytes_received", "", 0)
out += metric("srt_conns_bytes_sent", "", 0)
}
}

if !interfaceIsEmpty(m.webRTCManager) {
data, err := m.webRTCManager.apiSessionsList()
if err == nil && len(data.Items) != 0 {
Expand Down Expand Up @@ -254,6 +272,13 @@ func (m *metrics) rtmpServerSet(s apiRTMPServer) {
m.rtmpServer = s
}

// srtServerSet is called by srtServer.
func (m *metrics) srtServerSet(s apiSRTServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.srtServer = s
}

// webRTCManagerSet is called by webRTCManager.
func (m *metrics) webRTCManagerSet(s apiWebRTCManager) {
m.mutex.Lock()
Expand Down
6 changes: 6 additions & 0 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
}
return pa.stream.BytesReceived()
}(),
BytesSent: func() uint64 {
if pa.stream == nil {
return 0
}
return pa.stream.BytesSent()
}(),
Readers: func() []defs.APIPathSourceOrReader {
ret := []defs.APIPathSourceOrReader{}
for r := range pa.readers {
Expand Down
7 changes: 7 additions & 0 deletions internal/core/srt_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type srtServer struct {
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent srtServerParent

Expand Down Expand Up @@ -96,6 +97,7 @@ func newSRTServer(
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
pathManager *pathManager,
parent srtServerParent,
) (*srtServer, error) {
Expand All @@ -120,6 +122,7 @@ func newSRTServer(
runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
Expand All @@ -136,6 +139,10 @@ func newSRTServer(

s.Log(logger.Info, "listener opened on "+address+" (UDP)")

if s.metrics != nil {
s.metrics.srtServerSet(s)
}

newSRTListener(
s.ln,
&s.wg,
Expand Down
3 changes: 1 addition & 2 deletions internal/core/static_source_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ func newStaticSourceHandler(
case strings.HasPrefix(cnf.Source, "http://") ||
strings.HasPrefix(cnf.Source, "https://"):
s.instance = &hlssource.Source{
ReadTimeout: readTimeout,
Parent: s,
Parent: s,
}

case strings.HasPrefix(cnf.Source, "udp://"):
Expand Down
1 change: 1 addition & 0 deletions internal/defs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type APIPath struct {
ReadyTime *time.Time `json:"readyTime"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
Readers []APIPathSourceOrReader `json:"readers"`
}

Expand Down
7 changes: 7 additions & 0 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Stream struct {
desc *description.Session

bytesReceived *uint64
bytesSent *uint64
smedias map[*description.Media]*streamMedia
mutex sync.RWMutex
rtspStream *gortsplib.ServerStream
Expand All @@ -40,6 +41,7 @@ func New(
s := &Stream{
desc: desc,
bytesReceived: new(uint64),
bytesSent: new(uint64),
}

s.smedias = make(map[*description.Media]*streamMedia)
Expand Down Expand Up @@ -75,6 +77,11 @@ func (s *Stream) BytesReceived() uint64 {
return atomic.LoadUint64(s.bytesReceived)
}

// BytesSent returns sent bytes.
func (s *Stream) BytesSent() uint64 {
return atomic.LoadUint64(s.bytesSent)
}

// RTSPStream returns the RTSP stream.
func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
s.mutex.Lock()
Expand Down
3 changes: 3 additions & 0 deletions internal/stream/stream_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,19 +89,22 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni

if s.rtspStream != nil {
for _, pkt := range u.GetRTPPackets() {
atomic.AddUint64(s.bytesSent, unitSize(u))
s.rtspStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
}
}

if s.rtspsStream != nil {
for _, pkt := range u.GetRTPPackets() {
atomic.AddUint64(s.bytesSent, unitSize(u))
s.rtspsStream.WritePacketRTPWithNTP(medi, pkt, u.GetNTP()) //nolint:errcheck
}
}

for writer, cb := range sf.readers {
ccb := cb
writer.Push(func() error {
atomic.AddUint64(s.bytesSent, unitSize(u))
return ccb(u)
})
}
Expand Down
Binary file added mediamtx
Binary file not shown.

0 comments on commit 4401b6a

Please sign in to comment.