diff --git a/.changeset/tidy-goats-marry.md b/.changeset/tidy-goats-marry.md new file mode 100644 index 000000000..5b742edb5 --- /dev/null +++ b/.changeset/tidy-goats-marry.md @@ -0,0 +1,5 @@ +--- +"@livekit/protocol": patch +--- + +add duration seconds reporting diff --git a/observability/agentsobs/gen_reporter.go b/observability/agentsobs/gen_reporter.go index 6b9587e65..314786d85 100644 --- a/observability/agentsobs/gen_reporter.go +++ b/observability/agentsobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_66BRGG8 = true +const Version_IIJ8EL8 = true type KeyResolver interface { Resolve(string) @@ -105,6 +105,7 @@ type jobReporter interface { ReportWorkerKind(v WorkerKind) ReportStatus(v JobStatus) ReportDuration(v uint32) + ReportDurationSeconds(v uint32) ReportDurationMinutes(v uint8) ReportStartTime(v time.Time) ReportEndTime(v time.Time) diff --git a/observability/agentsobs/gen_reporter_noop.go b/observability/agentsobs/gen_reporter_noop.go index 832a13829..efdf08802 100644 --- a/observability/agentsobs/gen_reporter_noop.go +++ b/observability/agentsobs/gen_reporter_noop.go @@ -166,6 +166,7 @@ func (r *noopJobReporter) ReportKind(v JobKind) {} func (r *noopJobReporter) ReportWorkerKind(v WorkerKind) {} func (r *noopJobReporter) ReportStatus(v JobStatus) {} func (r *noopJobReporter) ReportDuration(v uint32) {} +func (r *noopJobReporter) ReportDurationSeconds(v uint32) {} func (r *noopJobReporter) ReportDurationMinutes(v uint8) {} func (r *noopJobReporter) ReportStartTime(v time.Time) {} func (r *noopJobReporter) ReportEndTime(v time.Time) {} @@ -177,12 +178,13 @@ func (t *noopJobTx) Worker() WorkerTx { return &noopWorkerTx{} } -func (t *noopJobTx) ReportRoomSessionID(v string) {} -func (t *noopJobTx) ReportKind(v JobKind) {} -func (t *noopJobTx) ReportWorkerKind(v WorkerKind) {} -func (t *noopJobTx) ReportStatus(v JobStatus) {} -func (t *noopJobTx) ReportDuration(v uint32) {} -func (t *noopJobTx) ReportDurationMinutes(v uint8) {} -func (t *noopJobTx) ReportStartTime(v time.Time) {} -func (t *noopJobTx) ReportEndTime(v time.Time) {} -func (t *noopJobTx) ReportJoinLatency(v uint32) {} +func (t *noopJobTx) ReportRoomSessionID(v string) {} +func (t *noopJobTx) ReportKind(v JobKind) {} +func (t *noopJobTx) ReportWorkerKind(v WorkerKind) {} +func (t *noopJobTx) ReportStatus(v JobStatus) {} +func (t *noopJobTx) ReportDuration(v uint32) {} +func (t *noopJobTx) ReportDurationSeconds(v uint32) {} +func (t *noopJobTx) ReportDurationMinutes(v uint8) {} +func (t *noopJobTx) ReportStartTime(v time.Time) {} +func (t *noopJobTx) ReportEndTime(v time.Time) {} +func (t *noopJobTx) ReportJoinLatency(v uint32) {} diff --git a/observability/agentsv2obs/gen_reporter.go b/observability/agentsv2obs/gen_reporter.go index 5989a6192..3407ab7a4 100644 --- a/observability/agentsv2obs/gen_reporter.go +++ b/observability/agentsv2obs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_4FC3258 = true +const Version_Q2H41LG = true type KeyResolver interface { Resolve(string) @@ -122,6 +122,7 @@ type jobReporter interface { ReportWorkerKind(v WorkerKind) ReportStatus(v JobStatus) ReportDuration(v uint32) + ReportDurationSeconds(v uint32) ReportDurationMinutes(v uint8) ReportStartTime(v time.Time) ReportEndTime(v time.Time) diff --git a/observability/agentsv2obs/gen_reporter_noop.go b/observability/agentsv2obs/gen_reporter_noop.go index e8324e47d..5694bd54c 100644 --- a/observability/agentsv2obs/gen_reporter_noop.go +++ b/observability/agentsv2obs/gen_reporter_noop.go @@ -190,6 +190,7 @@ func (r *noopJobReporter) ReportKind(v JobKind) {} func (r *noopJobReporter) ReportWorkerKind(v WorkerKind) {} func (r *noopJobReporter) ReportStatus(v JobStatus) {} func (r *noopJobReporter) ReportDuration(v uint32) {} +func (r *noopJobReporter) ReportDurationSeconds(v uint32) {} func (r *noopJobReporter) ReportDurationMinutes(v uint8) {} func (r *noopJobReporter) ReportStartTime(v time.Time) {} func (r *noopJobReporter) ReportEndTime(v time.Time) {} @@ -201,12 +202,13 @@ func (t *noopJobTx) Worker() WorkerTx { return &noopWorkerTx{} } -func (t *noopJobTx) ReportRoomSessionID(v string) {} -func (t *noopJobTx) ReportKind(v JobKind) {} -func (t *noopJobTx) ReportWorkerKind(v WorkerKind) {} -func (t *noopJobTx) ReportStatus(v JobStatus) {} -func (t *noopJobTx) ReportDuration(v uint32) {} -func (t *noopJobTx) ReportDurationMinutes(v uint8) {} -func (t *noopJobTx) ReportStartTime(v time.Time) {} -func (t *noopJobTx) ReportEndTime(v time.Time) {} -func (t *noopJobTx) ReportJoinLatency(v uint32) {} +func (t *noopJobTx) ReportRoomSessionID(v string) {} +func (t *noopJobTx) ReportKind(v JobKind) {} +func (t *noopJobTx) ReportWorkerKind(v WorkerKind) {} +func (t *noopJobTx) ReportStatus(v JobStatus) {} +func (t *noopJobTx) ReportDuration(v uint32) {} +func (t *noopJobTx) ReportDurationSeconds(v uint32) {} +func (t *noopJobTx) ReportDurationMinutes(v uint8) {} +func (t *noopJobTx) ReportStartTime(v time.Time) {} +func (t *noopJobTx) ReportEndTime(v time.Time) {} +func (t *noopJobTx) ReportJoinLatency(v uint32) {} diff --git a/observability/corecallobs/gen_reporter.go b/observability/corecallobs/gen_reporter.go index 92fa41df2..447f12632 100644 --- a/observability/corecallobs/gen_reporter.go +++ b/observability/corecallobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_857KEC0 = true +const Version_LA3TQS0 = true type KeyResolver interface { Resolve(string) @@ -38,6 +38,7 @@ type callReporter interface { ReportStartTime(v time.Time) ReportEndTime(v time.Time) ReportDuration(v uint64) + ReportDurationSeconds(v uint64) ReportDurationMinutes(v uint16) ReportDirection(v CallDirection) ReportCallType(v CallCallType) diff --git a/observability/corecallobs/gen_reporter_noop.go b/observability/corecallobs/gen_reporter_noop.go index 24d2a54f9..36ce18871 100644 --- a/observability/corecallobs/gen_reporter_noop.go +++ b/observability/corecallobs/gen_reporter_noop.go @@ -63,6 +63,7 @@ func (r *noopCallReporter) TxAt(ts time.Time, f func(CallTx)) {} func (r *noopCallReporter) ReportStartTime(v time.Time) {} func (r *noopCallReporter) ReportEndTime(v time.Time) {} func (r *noopCallReporter) ReportDuration(v uint64) {} +func (r *noopCallReporter) ReportDurationSeconds(v uint64) {} func (r *noopCallReporter) ReportDurationMinutes(v uint16) {} func (r *noopCallReporter) ReportDirection(v CallDirection) {} func (r *noopCallReporter) ReportCallType(v CallCallType) {} @@ -83,6 +84,7 @@ func (t *noopCallTx) Project() ProjectTx { func (t *noopCallTx) ReportStartTime(v time.Time) {} func (t *noopCallTx) ReportEndTime(v time.Time) {} func (t *noopCallTx) ReportDuration(v uint64) {} +func (t *noopCallTx) ReportDurationSeconds(v uint64) {} func (t *noopCallTx) ReportDurationMinutes(v uint16) {} func (t *noopCallTx) ReportDirection(v CallDirection) {} func (t *noopCallTx) ReportCallType(v CallCallType) {} diff --git a/observability/roomobs/gen_reporter.go b/observability/roomobs/gen_reporter.go index 7efe7eb8f..3c147cf7d 100644 --- a/observability/roomobs/gen_reporter.go +++ b/observability/roomobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_G1QTVIG = true +const Version_G73HPJG = true type KeyResolver interface { Resolve(string) @@ -105,6 +105,7 @@ type participantSessionReporter interface { ReportStartTime(v time.Time) ReportEndTime(v time.Time) ReportDuration(v uint16) + ReportDurationSeconds(v uint16) ReportDurationMinutes(v uint8) ReportKind(v string) ReportName(v string) diff --git a/observability/roomobs/gen_reporter_noop.go b/observability/roomobs/gen_reporter_noop.go index b35833e01..0aecbb865 100644 --- a/observability/roomobs/gen_reporter_noop.go +++ b/observability/roomobs/gen_reporter_noop.go @@ -161,6 +161,7 @@ func (r *noopParticipantSessionReporter) ReportIspAsn(v uint32) func (r *noopParticipantSessionReporter) ReportStartTime(v time.Time) {} func (r *noopParticipantSessionReporter) ReportEndTime(v time.Time) {} func (r *noopParticipantSessionReporter) ReportDuration(v uint16) {} +func (r *noopParticipantSessionReporter) ReportDurationSeconds(v uint16) {} func (r *noopParticipantSessionReporter) ReportDurationMinutes(v uint8) {} func (r *noopParticipantSessionReporter) ReportKind(v string) {} func (r *noopParticipantSessionReporter) ReportName(v string) {} @@ -191,6 +192,7 @@ func (t *noopParticipantSessionTx) ReportIspAsn(v uint32) {} func (t *noopParticipantSessionTx) ReportStartTime(v time.Time) {} func (t *noopParticipantSessionTx) ReportEndTime(v time.Time) {} func (t *noopParticipantSessionTx) ReportDuration(v uint16) {} +func (t *noopParticipantSessionTx) ReportDurationSeconds(v uint16) {} func (t *noopParticipantSessionTx) ReportDurationMinutes(v uint8) {} func (t *noopParticipantSessionTx) ReportKind(v string) {} func (t *noopParticipantSessionTx) ReportName(v string) {} diff --git a/observability/sessiontimer.go b/observability/sessiontimer.go index 77b38f1c0..8338839b5 100644 --- a/observability/sessiontimer.go +++ b/observability/sessiontimer.go @@ -1,27 +1,53 @@ package observability -import "time" +import ( + "time" + + "github.com/livekit/protocol/utils/options" +) type SessionTimer struct { lastMilli int64 + lastSec int64 lastMin int64 + minSecs int64 + minMins int64 +} + +type SessionTimerOption func(*SessionTimer) + +// WithMinSeconds ensures the first Advance that produces a non-zero secs +// return reports at least n seconds. Subsequent advances behave normally. +func WithMinSeconds(n int64) SessionTimerOption { + return func(h *SessionTimer) { h.minSecs = n } } -func NewSessionTimer(startTime time.Time) *SessionTimer { +// WithMinMinutes ensures the first Advance that produces a non-zero mins +// return reports at least n minutes. Subsequent advances behave normally. +func WithMinMinutes(n int64) SessionTimerOption { + return func(h *SessionTimer) { h.minMins = n } +} + +func NewSessionTimer(startTime time.Time, opts ...SessionTimerOption) *SessionTimer { ts := startTime.UnixMilli() - return &SessionTimer{ts, ts} + return options.Apply(&SessionTimer{lastMilli: ts, lastSec: ts, lastMin: ts}, opts) } -func (h *SessionTimer) Advance(now time.Time) (millis, mins int64) { +func (h *SessionTimer) Advance(now time.Time) (millis, secs, mins int64) { ts := now.UnixMilli() if ts > h.lastMilli { millis = ts - h.lastMilli h.lastMilli = ts } + if ts > h.lastSec { + secs = max((ts-h.lastSec+999)/1000, h.minSecs) + h.minSecs = 0 + h.lastSec += secs * 1000 + } if ts > h.lastMin { - n := (ts - h.lastMin + 59999) / 60000 - mins += n - h.lastMin += n * 60000 + mins = max((ts-h.lastMin+59999)/60000, h.minMins) + h.minMins = 0 + h.lastMin += mins * 60000 } return } diff --git a/observability/sessiontimer_test.go b/observability/sessiontimer_test.go index 26ea0e772..05b318d04 100644 --- a/observability/sessiontimer_test.go +++ b/observability/sessiontimer_test.go @@ -12,12 +12,14 @@ func TestSessionTimer(t *testing.T) { ts := time.Now() st := NewSessionTimer(ts) - millis, mins := st.Advance(ts.Add(100 * time.Millisecond)) + millis, secs, mins := st.Advance(ts.Add(100 * time.Millisecond)) require.EqualValues(t, 100, millis) + require.EqualValues(t, 1, secs) require.EqualValues(t, 1, mins) - millis, mins = st.Advance(ts.Add(200 * time.Millisecond)) + millis, secs, mins = st.Advance(ts.Add(200 * time.Millisecond)) require.EqualValues(t, 100, millis) + require.EqualValues(t, 0, secs) require.EqualValues(t, 0, mins) }) @@ -25,8 +27,65 @@ func TestSessionTimer(t *testing.T) { ts := time.Now() st := NewSessionTimer(ts) - millis, mins := st.Advance(ts.Add(150 * time.Second)) + millis, secs, mins := st.Advance(ts.Add(150 * time.Second)) require.EqualValues(t, 150000, millis) + require.EqualValues(t, 150, secs) + require.EqualValues(t, 3, mins) + }) + + t.Run("WithMinSeconds floors first advance and consumes watermark", func(t *testing.T) { + ts := time.Now() + st := NewSessionTimer(ts, WithMinSeconds(60)) + + _, secs, _ := st.Advance(ts.Add(time.Second)) + require.EqualValues(t, 60, secs) + + _, secs, _ = st.Advance(ts.Add(30 * time.Second)) + require.EqualValues(t, 0, secs) + + _, secs, _ = st.Advance(ts.Add(62 * time.Second)) + require.EqualValues(t, 2, secs) + }) + + t.Run("WithMinSeconds does not lower larger natural value", func(t *testing.T) { + ts := time.Now() + st := NewSessionTimer(ts, WithMinSeconds(60)) + + _, secs, _ := st.Advance(ts.Add(120 * time.Second)) + require.EqualValues(t, 120, secs) + }) + + t.Run("WithMinSeconds does not trigger on no-op advance", func(t *testing.T) { + ts := time.Now() + st := NewSessionTimer(ts, WithMinSeconds(60)) + + _, secs, _ := st.Advance(ts) + require.EqualValues(t, 0, secs) + + _, secs, _ = st.Advance(ts.Add(time.Second)) + require.EqualValues(t, 60, secs) + }) + + t.Run("WithMinMinutes floors first advance", func(t *testing.T) { + ts := time.Now() + st := NewSessionTimer(ts, WithMinMinutes(5)) + + _, _, mins := st.Advance(ts.Add(time.Second)) + require.EqualValues(t, 5, mins) + + _, _, mins = st.Advance(ts.Add(2 * time.Minute)) + require.EqualValues(t, 0, mins) + + _, _, mins = st.Advance(ts.Add(7 * time.Minute)) + require.EqualValues(t, 2, mins) + }) + + t.Run("WithMinSeconds and WithMinMinutes apply independently", func(t *testing.T) { + ts := time.Now() + st := NewSessionTimer(ts, WithMinSeconds(45), WithMinMinutes(3)) + + _, secs, mins := st.Advance(ts.Add(time.Second)) + require.EqualValues(t, 45, secs) require.EqualValues(t, 3, mins) }) } diff --git a/observability/sipcallobs/gen_reporter.go b/observability/sipcallobs/gen_reporter.go index b616b44db..004ab3f12 100644 --- a/observability/sipcallobs/gen_reporter.go +++ b/observability/sipcallobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_GQ98NIG = true +const Version_OUSNKC0 = true type KeyResolver interface { Resolve(string) @@ -38,6 +38,7 @@ type callReporter interface { ReportStartTime(v time.Time) ReportEndTime(v time.Time) ReportDuration(v uint64) + ReportDurationSeconds(v uint64) ReportDurationMinutes(v uint16) ReportTrunkID(v string) ReportTrunkType(v CallTrunkType) diff --git a/observability/sipcallobs/gen_reporter_noop.go b/observability/sipcallobs/gen_reporter_noop.go index c5e983a48..39a137d91 100644 --- a/observability/sipcallobs/gen_reporter_noop.go +++ b/observability/sipcallobs/gen_reporter_noop.go @@ -63,6 +63,7 @@ func (r *noopCallReporter) TxAt(ts time.Time, f func(CallTx)) func (r *noopCallReporter) ReportStartTime(v time.Time) {} func (r *noopCallReporter) ReportEndTime(v time.Time) {} func (r *noopCallReporter) ReportDuration(v uint64) {} +func (r *noopCallReporter) ReportDurationSeconds(v uint64) {} func (r *noopCallReporter) ReportDurationMinutes(v uint16) {} func (r *noopCallReporter) ReportTrunkID(v string) {} func (r *noopCallReporter) ReportTrunkType(v CallTrunkType) {} @@ -108,6 +109,7 @@ func (t *noopCallTx) Project() ProjectTx { func (t *noopCallTx) ReportStartTime(v time.Time) {} func (t *noopCallTx) ReportEndTime(v time.Time) {} func (t *noopCallTx) ReportDuration(v uint64) {} +func (t *noopCallTx) ReportDurationSeconds(v uint64) {} func (t *noopCallTx) ReportDurationMinutes(v uint16) {} func (t *noopCallTx) ReportTrunkID(v string) {} func (t *noopCallTx) ReportTrunkType(v CallTrunkType) {} diff --git a/observability/telephonyobs/gen_reporter.go b/observability/telephonyobs/gen_reporter.go index 0c068a45d..8178d8e79 100644 --- a/observability/telephonyobs/gen_reporter.go +++ b/observability/telephonyobs/gen_reporter.go @@ -6,7 +6,7 @@ import ( "time" ) -const Version_AAHNL9O = true +const Version_94HQ8HG = true type KeyResolver interface { Resolve(string) @@ -93,6 +93,7 @@ type callReporter interface { ReportCountryCode(v string) ReportPhoneNumber(v string) ReportDuration(v uint32) + ReportDurationSeconds(v uint32) ReportDurationMinutes(v uint16) ReportStartTime(v time.Time) ReportEndTime(v time.Time) diff --git a/observability/telephonyobs/gen_reporter_noop.go b/observability/telephonyobs/gen_reporter_noop.go index 280bf01d9..0122906b6 100644 --- a/observability/telephonyobs/gen_reporter_noop.go +++ b/observability/telephonyobs/gen_reporter_noop.go @@ -139,6 +139,7 @@ func (r *noopCallReporter) ReportTrunkType(v TrunkType) {} func (r *noopCallReporter) ReportCountryCode(v string) {} func (r *noopCallReporter) ReportPhoneNumber(v string) {} func (r *noopCallReporter) ReportDuration(v uint32) {} +func (r *noopCallReporter) ReportDurationSeconds(v uint32) {} func (r *noopCallReporter) ReportDurationMinutes(v uint16) {} func (r *noopCallReporter) ReportStartTime(v time.Time) {} func (r *noopCallReporter) ReportEndTime(v time.Time) {} @@ -156,6 +157,7 @@ func (t *noopCallTx) ReportTrunkType(v TrunkType) {} func (t *noopCallTx) ReportCountryCode(v string) {} func (t *noopCallTx) ReportPhoneNumber(v string) {} func (t *noopCallTx) ReportDuration(v uint32) {} +func (t *noopCallTx) ReportDurationSeconds(v uint32) {} func (t *noopCallTx) ReportDurationMinutes(v uint16) {} func (t *noopCallTx) ReportStartTime(v time.Time) {} func (t *noopCallTx) ReportEndTime(v time.Time) {}