Skip to content

Commit

Permalink
Prometheus metrics for recording feature:
Browse files Browse the repository at this point in the history
- recording_save_errors: number of errors during save to the recording OS
- recording_saved_segments: number of segments saved to the recording OS
- recording_save_latency: time takes to save segment to OS
- also marked "segment_transcoded_appeared_total" and  "segment_source_appeared_total" metrics with with "segment type" tag which
  can be "regular" (streams with recoding disabled) and "rec" (streams with recording enabled)

Fixes #1709
  • Loading branch information
darkdarkdragon committed Dec 20, 2020
1 parent 00385f6 commit 9703cec
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 15 deletions.
14 changes: 12 additions & 2 deletions core/playlistmanager.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/golang/glog"
"github.com/livepeer/go-livepeer/common"
"github.com/livepeer/go-livepeer/drivers"
"github.com/livepeer/go-livepeer/monitor"
ffmpeg "github.com/livepeer/lpms/ffmpeg"
"github.com/livepeer/m3u8"
)
Expand Down Expand Up @@ -253,8 +254,17 @@ func (mgr *BasicPlaylistManager) FlushRecord() {
go func(name string, data []byte) {
now := time.Now()
_, err := mgr.recordSession.SaveData(name, b, nil)
glog.V(common.VERBOSE).Infof("Saving json playlist name=%s size=%d bytes took=%s err=%v", name,
len(b), time.Since(now), err)
took := time.Since(now)
if err != nil {
glog.Errorf("Error saving json playlist name=%s bytes=%d took=%s err=%v", name,
len(b), took, err)
} else {
glog.V(common.VERBOSE).Infof("Saving json playlist name=%s bytes=%d took=%s err=%v", name,
len(b), took, err)
}
if monitor.Enabled {
monitor.RecordingPlaylistSaved(took, err)
}
}(mgr.jsonList.name, b)
if mgr.jsonList.DurationMs > jsonPlaylistRotationInterval {
mgr.jsonList = NewJSONPlaylist()
Expand Down
84 changes: 76 additions & 8 deletions monitor/census.go
Expand Up @@ -60,6 +60,9 @@ const (
Broadcaster NodeType = "bctr"
Transcoder NodeType = "trcr"
Redeemer NodeType = "rdmr"

segTypeRegular = "regular"
segTypeRec = "recorded" // segment in the stream for which recording is enabled
)

// Enabled true if metrics was enabled in command line
Expand All @@ -85,6 +88,7 @@ type (
kSender tag.Key
kRecipient tag.Key
kManifestID tag.Key
kSegmentType tag.Key
mSegmentSourceAppeared *stats.Int64Measure
mSegmentEmerged *stats.Int64Measure
mSegmentEmergedUnprocessed *stats.Int64Measure
Expand Down Expand Up @@ -124,6 +128,9 @@ type (
mRealtimeHalf *stats.Int64Measure
mRealtimeSlow *stats.Int64Measure
mTranscodeScore *stats.Float64Measure
mRecordingSaveLatency *stats.Float64Measure
mRecordingSaveErrors *stats.Int64Measure
mRecordingSavedSegments *stats.Int64Measure

// Metrics for sending payments
mTicketValueSent *stats.Float64Measure
Expand Down Expand Up @@ -197,6 +204,7 @@ func InitCensus(nodeType NodeType, version string) {
census.kSender = tag.MustNewKey("sender")
census.kRecipient = tag.MustNewKey("recipient")
census.kManifestID = tag.MustNewKey("manifestID")
census.kSegmentType = tag.MustNewKey("seg_type")
census.ctx, err = tag.New(ctx, tag.Insert(census.kNodeType, string(nodeType)), tag.Insert(census.kNodeID, NodeID))
if err != nil {
glog.Fatal("Error creating context", err)
Expand Down Expand Up @@ -242,6 +250,10 @@ func InitCensus(nodeType NodeType, version string) {
census.mAuthWebhookTime = stats.Float64("auth_webhook_time_milliseconds", "Authentication webhook execution time", "ms")
census.mSourceSegmentDuration = stats.Float64("source_segment_duration_seconds", "Source segment's duration", "sec")
census.mTranscodeScore = stats.Float64("transcode_score", "Ratio of source segment duration vs. transcode time", "rat")
census.mRecordingSaveLatency = stats.Float64("recording_save_latency",
"How long it takes to save segment to the OS", "sec")
census.mRecordingSaveErrors = stats.Int64("recording_save_errors", "Number of errors during save to the recording OS", "tot")
census.mRecordingSavedSegments = stats.Int64("recording_saved_segments", "Number of segments saved to the recording OS", "tot")

// Metrics for sending payments
census.mTicketValueSent = stats.Float64("ticket_value_sent", "TicketValueSent", "gwei")
Expand Down Expand Up @@ -372,7 +384,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "segment_source_appeared_total",
Measure: census.mSegmentSourceAppeared,
Description: "SegmentSourceAppeared",
TagKeys: append([]tag.Key{census.kProfile}, baseTags...),
TagKeys: append([]tag.Key{census.kProfile, census.kSegmentType}, baseTags...),
Aggregation: view.Count(),
},
{
Expand Down Expand Up @@ -435,7 +447,7 @@ func InitCensus(nodeType NodeType, version string) {
Name: "segment_transcoded_appeared_total",
Measure: census.mSegmentTranscodedAppeared,
Description: "SegmentTranscodedAppeared",
TagKeys: append([]tag.Key{census.kProfile}, baseTags...),
TagKeys: append([]tag.Key{census.kProfile, census.kSegmentType}, baseTags...),
Aggregation: view.Count(),
},
{
Expand Down Expand Up @@ -480,6 +492,27 @@ func InitCensus(nodeType NodeType, version string) {
TagKeys: append([]tag.Key{census.kProfiles}, baseTags...),
Aggregation: view.Distribution(0, .5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5, 10, 15, 20, 40),
},
{
Name: "recording_save_latency",
Measure: census.mRecordingSaveLatency,
Description: "How long it takes to save segment to the OS",
TagKeys: baseTags,
Aggregation: view.Distribution(0, .500, .75, 1.000, 1.500, 2.000, 2.500, 3.000, 3.500, 4.000, 4.500, 5.000, 10.000, 30.000),
},
{
Name: "recording_save_errors",
Measure: census.mRecordingSaveErrors,
Description: "Number of errors during save to the recording OS",
TagKeys: baseTags,
Aggregation: view.Count(),
},
{
Name: "recording_saved_segments",
Measure: census.mRecordingSavedSegments,
Description: "Number of segments saved to the recording OS",
TagKeys: baseTags,
Aggregation: view.Count(),
},
{
Name: "upload_time_seconds",
Measure: census.mUploadTime,
Expand Down Expand Up @@ -921,20 +954,29 @@ func (cen *censusMetricsCounter) segmentEmerged(nonce, seqNo uint64, profilesNum
stats.Record(cen.ctx, cen.mSegmentEmergedUnprocessed.M(1))
}

func SourceSegmentAppeared(nonce, seqNo uint64, manifestID, profile string) {
func SourceSegmentAppeared(nonce, seqNo uint64, manifestID, profile string, recordingEnabled bool) {
glog.V(logLevel).Infof("Logging SourceSegmentAppeared... nonce=%d manifestID=%s seqNo=%d profile=%s", nonce,
manifestID, seqNo, profile)
census.segmentSourceAppeared(nonce, seqNo, profile)
census.segmentSourceAppeared(nonce, seqNo, profile, recordingEnabled)
}

func (cen *censusMetricsCounter) segmentSourceAppeared(nonce, seqNo uint64, profile string) {
func (cen *censusMetricsCounter) segmentSourceAppeared(nonce, seqNo uint64, profile string, recordingEnabled bool) {
cen.lock.Lock()
defer cen.lock.Unlock()
ctx, err := tag.New(cen.ctx, tag.Insert(census.kProfile, profile))
if err != nil {
glog.Error("Error creating context", err)
return
}
segType := segTypeRegular
if recordingEnabled {
segType = segTypeRec
}
ctx, err = tag.New(ctx, tag.Insert(cen.kSegmentType, segType))
if err != nil {
glog.Error("Error creating context", err)
return
}
stats.Record(ctx, cen.mSegmentSourceAppeared.M(1))
}

Expand Down Expand Up @@ -1105,12 +1147,29 @@ func SegmentFullyTranscoded(nonce, seqNo uint64, profiles string, errCode Segmen
census.sendSuccess()
}

func TranscodedSegmentAppeared(nonce, seqNo uint64, profile string) {
func RecordingPlaylistSaved(dur time.Duration, err error) {
if err != nil {
stats.Record(census.ctx, census.mRecordingSaveErrors.M(1))
} else {
stats.Record(census.ctx, census.mRecordingSaveLatency.M(dur.Seconds()))
}
}

func RecordingSegmentSaved(dur time.Duration, err error) {
if err != nil {
stats.Record(census.ctx, census.mRecordingSaveErrors.M(1))
} else {
stats.Record(census.ctx, census.mRecordingSaveLatency.M(dur.Seconds()))
stats.Record(census.ctx, census.mRecordingSavedSegments.M(1))
}
}

func TranscodedSegmentAppeared(nonce, seqNo uint64, profile string, recordingEnabled bool) {
glog.V(logLevel).Infof("Logging LogTranscodedSegmentAppeared... nonce=%d seqNo=%d profile=%s", nonce, seqNo, profile)
census.segmentTranscodedAppeared(nonce, seqNo, profile)
census.segmentTranscodedAppeared(nonce, seqNo, profile, recordingEnabled)
}

func (cen *censusMetricsCounter) segmentTranscodedAppeared(nonce, seqNo uint64, profile string) {
func (cen *censusMetricsCounter) segmentTranscodedAppeared(nonce, seqNo uint64, profile string, recordingEnabled bool) {
cen.lock.Lock()
defer cen.lock.Unlock()
ctx, err := tag.New(cen.ctx, tag.Insert(cen.kProfile, profile))
Expand All @@ -1126,6 +1185,15 @@ func (cen *censusMetricsCounter) segmentTranscodedAppeared(nonce, seqNo uint64,
stats.Record(ctx, cen.mTranscodeLatency.M(latency.Seconds()))
}

segType := segTypeRegular
if recordingEnabled {
segType = segTypeRec
}
ctx, err = tag.New(ctx, tag.Insert(cen.kSegmentType, segType))
if err != nil {
glog.Error("Error creating context", err)
return
}
stats.Record(ctx, cen.mSegmentTranscodedAppeared.M(1))
}

Expand Down
19 changes: 14 additions & 5 deletions server/broadcast.go
Expand Up @@ -306,12 +306,17 @@ func processSegment(cxn *rtmpConnection, seg *stream.HLSSegment) ([]string, erro
go func() {
now := time.Now()
uri, err := drivers.SaveRetried(ros, name, seg.Data, map[string]string{"duration": segDurMs}, 2)
took := time.Since(now)
if err != nil {
glog.Errorf("Error saving nonce=%d manifestID=%s name=%s to record store err=%v", nonce, mid, name, err)
glog.Errorf("Error saving nonce=%d manifestID=%s name=%s bytes=%d to record store err=%v",
nonce, mid, name, len(seg.Data), err)
} else {
cpl.InsertHLSSegmentJSON(vProfile, seg.SeqNo, uri, seg.Duration)
glog.Infof("Successfully saved nonce=%d manifestID=%s name=%s bytes=%d to record store took=%s",
nonce, mid, name, len(seg.Data), time.Since(now))
nonce, mid, name, len(seg.Data), took)
}
if monitor.Enabled {
monitor.RecordingSegmentSaved(took, err)
}
}()
}
Expand All @@ -328,7 +333,7 @@ func processSegment(cxn *rtmpConnection, seg *stream.HLSSegment) ([]string, erro
}
err = cpl.InsertHLSSegment(vProfile, seg.SeqNo, uri, seg.Duration)
if monitor.Enabled {
monitor.SourceSegmentAppeared(nonce, seg.SeqNo, string(mid), vProfile.Name)
monitor.SourceSegmentAppeared(nonce, seg.SeqNo, string(mid), vProfile.Name, ros != nil)
}
if err != nil {
glog.Errorf("Error inserting segment nonce=%d seqNo=%d: %v", nonce, seg.SeqNo, err)
Expand Down Expand Up @@ -495,12 +500,16 @@ func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string,
segDurMs := getSegDurMsString(seg)
now := time.Now()
uri, err := drivers.SaveRetried(bros, name, data, map[string]string{"duration": segDurMs}, 2)
took := time.Since(now)
if err != nil {
glog.Errorf("Error saving nonce=%d manifestID=%s name=%s to record store err=%v", nonce, cxn.mid, name, err)
} else {
cpl.InsertHLSSegmentJSON(&profile, seg.SeqNo, uri, seg.Duration)
glog.Infof("Successfully saved nonce=%d manifestID=%s name=%s size=%d bytes to record store took=%s",
nonce, cxn.mid, name, len(data), time.Since(now))
nonce, cxn.mid, name, len(data), took)
}
if monitor.Enabled {
monitor.RecordingSegmentSaved(took, err)
}
}()
}
Expand Down Expand Up @@ -536,7 +545,7 @@ func transcodeSegment(cxn *rtmpConnection, seg *stream.HLSSegment, name string,
segLock.Unlock()

if monitor.Enabled {
monitor.TranscodedSegmentAppeared(nonce, seg.SeqNo, profile.Name)
monitor.TranscodedSegmentAppeared(nonce, seg.SeqNo, profile.Name, bros != nil)
}
}

Expand Down

0 comments on commit 9703cec

Please sign in to comment.