From 105890cf901d75641761824af49c77020042c966 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Thu, 9 Oct 2025 18:13:01 +0300 Subject: [PATCH 1/2] Add tests for the media timeout logic. --- pkg/sip/media_port.go | 3 +- pkg/sip/media_port_test.go | 199 +++++++++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+), 1 deletion(-) diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 92cb7e92..8d4c916c 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -271,6 +271,7 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) { ticker.Reset(tick) startPackets = p.packetCount.Load() lastTime = time.Now() + p.log.Infow("media timeout reset", "packets", startPackets, "tick", tick) case <-ticker.C: curPackets := p.packetCount.Load() if curPackets != lastPackets { @@ -306,7 +307,7 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) { } // Ticker is allowed to fire earlier than the full timeout interval. Skip if it's not a full timeout yet. - if since < timeout { + if since+timeout/10 < timeout { continue } p.log.Infow("triggering media timeout", diff --git a/pkg/sip/media_port_test.go b/pkg/sip/media_port_test.go index 56ac9503..78671e60 100644 --- a/pkg/sip/media_port_test.go +++ b/pkg/sip/media_port_test.go @@ -326,3 +326,202 @@ func checkPCM(t testing.TB, exp, got msdk.PCM16Sample) { expHit := int(float64(len(expSamples)) * percHit) require.True(t, hits >= expHit, "min=%v, max=%v\ngot:\n%v", slices.Min(got), slices.Max(got), got) } + +func newMediaPair(t testing.TB, opt1, opt2 *MediaOptions) (m1, m2 *MediaPort) { + if opt1 == nil { + opt1 = &MediaOptions{} + } + if opt2 == nil { + opt2 = &MediaOptions{} + } + c1, c2 := newUDPPipe() + + opt1.IP = newIP("1.1.1.1") + opt1.Ports = rtcconfig.PortRange{Start: 10000} + + opt2.IP = newIP("2.2.2.2") + opt2.Ports = rtcconfig.PortRange{Start: 20000} + + const rate = 16000 + + log := logger.GetLogger() + + var err error + + m1, err = NewMediaPortWith(log.WithName("one"), nil, c1, opt1, rate) + require.NoError(t, err) + t.Cleanup(m1.Close) + + m2, err = NewMediaPortWith(log.WithName("two"), nil, c2, opt2, rate) + require.NoError(t, err) + t.Cleanup(m2.Close) + + offer, err := m1.NewOffer(sdp.EncryptionNone) + require.NoError(t, err) + offerData, err := offer.SDP.Marshal() + require.NoError(t, err) + + answer, mc2, err := m2.SetOffer(offerData, sdp.EncryptionNone) + require.NoError(t, err) + answerData, err := answer.SDP.Marshal() + require.NoError(t, err) + + mc1, err := m1.SetAnswer(offer, answerData, sdp.EncryptionNone) + require.NoError(t, err) + + err = m1.SetConfig(mc1) + require.NoError(t, err) + + err = m2.SetConfig(mc2) + require.NoError(t, err) + + w2 := m2.GetAudioWriter() + require.Equal(t, "Switch(16000) -> G722(encode) -> RTP(16000)", w2.String()) + + return m1, m2 +} + +func TestMediaTimeout(t *testing.T) { + const ( + timeout = time.Second / 4 + initial = timeout * 2 + dt = timeout / 4 + ) + + t.Run("initial", func(t *testing.T) { + m1, _ := newMediaPair(t, &MediaOptions{ + MediaTimeoutInitial: initial, + MediaTimeout: timeout, + }, nil) + + m1.EnableTimeout(true) + + targ := time.Now().Add(initial) + select { + case <-m1.Timeout(): + t.Fatal("initial timeout ignored") + case <-time.After(initial / 2): + } + + select { + case <-time.After(time.Until(targ) + dt): + t.Fatal("timeout didn't trigger") + case <-m1.Timeout(): + } + }) + + t.Run("regular", func(t *testing.T) { + m1, m2 := newMediaPair(t, &MediaOptions{ + MediaTimeoutInitial: initial, + MediaTimeout: timeout, + }, nil) + m1.EnableTimeout(true) + + w2 := m2.GetAudioWriter() + err := w2.WriteSample(msdk.PCM16Sample{0, 0}) + require.NoError(t, err) + + select { + case <-time.After(dt): + t.Fatal("no media received") + case <-m1.Received(): + } + + select { + case <-time.After(2*timeout + dt): + t.Fatal("timeout didn't trigger") + case <-m1.Timeout(): + } + }) + + t.Run("no timeout", func(t *testing.T) { + m1, m2 := newMediaPair(t, &MediaOptions{ + MediaTimeoutInitial: initial, + MediaTimeout: timeout, + }, nil) + m1.EnableTimeout(true) + + w2 := m2.GetAudioWriter() + + for i := 0; i < 10; i++ { + err := w2.WriteSample(msdk.PCM16Sample{0, 0}) + require.NoError(t, err) + + select { + case <-time.After(timeout / 2): + case <-m1.Timeout(): + t.Fatal("timeout") + } + } + }) + + t.Run("reset timeout", func(t *testing.T) { + m1, m2 := newMediaPair(t, &MediaOptions{ + MediaTimeoutInitial: initial, + MediaTimeout: timeout, + }, nil) + m1.EnableTimeout(true) + + w2 := m2.GetAudioWriter() + + for i := 0; i < 5; i++ { + err := w2.WriteSample(msdk.PCM16Sample{0, 0}) + require.NoError(t, err) + + select { + case <-time.After(timeout / 2): + case <-m1.Timeout(): + t.Fatal("timeout") + } + } + + m1.SetTimeout(initial, timeout) + + targ := time.Now().Add(initial) + select { + case <-m1.Timeout(): + t.Fatal("initial timeout ignored") + case <-time.After(initial / 2): + } + + select { + case <-time.After(time.Until(targ) + dt): + t.Fatal("timeout didn't trigger") + case <-m1.Timeout(): + } + }) + + t.Run("reset", func(t *testing.T) { + m1, m2 := newMediaPair(t, &MediaOptions{ + MediaTimeoutInitial: initial, + MediaTimeout: timeout, + }, nil) + m1.EnableTimeout(true) + + w2 := m2.GetAudioWriter() + + for i := 0; i < 5; i++ { + err := w2.WriteSample(msdk.PCM16Sample{0, 0}) + require.NoError(t, err) + + select { + case <-time.After(timeout / 2): + case <-m1.Timeout(): + t.Fatal("timeout") + } + } + + m1.SetTimeout(initial, timeout) + + for i := 0; i < 5; i++ { + err := w2.WriteSample(msdk.PCM16Sample{0, 0}) + require.NoError(t, err) + + select { + case <-time.After(timeout / 2): + case <-m1.Timeout(): + t.Fatal("timeout") + } + } + }) +} From d7f42a22156d435a18b47325e4a4fefcc63795a4 Mon Sep 17 00:00:00 2001 From: Denys Smirnov Date: Fri, 10 Oct 2025 11:05:29 +0300 Subject: [PATCH 2/2] Print call stats during keep-alives. Track closed states. --- pkg/sip/inbound.go | 14 +++++++++----- pkg/sip/media.go | 22 ++++++++++++++++------ pkg/sip/media_port.go | 4 ++++ pkg/sip/outbound.go | 10 ++++++++-- pkg/sip/room.go | 29 +++++++++++++++++------------ 5 files changed, 54 insertions(+), 25 deletions(-) diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index c82f3834..d3fef82d 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -815,6 +815,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI case <-ticker.C: c.log.Debugw("sending keep-alive") c.state.ForceFlush(ctx) + c.printStats(c.log) case <-ctx.Done(): c.closeWithHangup() return nil @@ -1027,18 +1028,21 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD } } +func (c *inboundCall) printStats(log logger.Logger) { + log.Infow("call statistics", "stats", c.stats.Load()) +} + // close should only be called from handleInvite. func (c *inboundCall) close(error bool, status CallStatus, reason string) { if !c.done.CompareAndSwap(false, true) { return } - c.setStatus(status) - c.mon.CallTerminate(reason) + c.stats.Closed.Store(true) sipCode, sipStatus := status.SIPStatus() log := c.log.WithValues("status", sipCode, "reason", reason) - defer func() { - log.Infow("call statistics", "stats", c.stats.Load()) - }() + defer c.printStats(log) + c.setStatus(status) + c.mon.CallTerminate(reason) if error { log.Warnw("Closing inbound call with error", nil) } else { diff --git a/pkg/sip/media.go b/pkg/sip/media.go index 314c249f..90aedd77 100644 --- a/pkg/sip/media.go +++ b/pkg/sip/media.go @@ -20,10 +20,11 @@ import ( "strconv" "sync/atomic" - msdk "github.com/livekit/media-sdk" "github.com/pion/interceptor" prtp "github.com/pion/rtp" + msdk "github.com/livekit/media-sdk" + "github.com/livekit/media-sdk/rtp" "github.com/livekit/sip/pkg/stats" @@ -32,14 +33,16 @@ import ( var _ json.Marshaler = (*Stats)(nil) type Stats struct { - Port PortStats - Room RoomStats + Port PortStats + Room RoomStats + Closed atomic.Bool } type StatsSnapshot struct { - Port PortStatsSnapshot `json:"port"` - Room RoomStatsSnapshot `json:"room"` - Mixer MixerStatsSnapshot `json:"mixer"` + Port PortStatsSnapshot `json:"port"` + Room RoomStatsSnapshot `json:"room"` + Mixer MixerStatsSnapshot `json:"mixer"` + Closed bool `json:"closed"` } type PortStatsSnapshot struct { @@ -56,6 +59,8 @@ type PortStatsSnapshot struct { DTMFPackets uint64 `json:"dtmf_packets"` DTMFBytes uint64 `json:"dtmf_bytes"` + + Closed bool `json:"closed"` } type RoomStatsSnapshot struct { @@ -67,6 +72,8 @@ type RoomStatsSnapshot struct { OutputSamples uint64 `json:"output_samples"` OutputFrames uint64 `json:"output_frames"` + + Closed bool `json:"closed"` } type MixerStatsSnapshot struct { @@ -105,6 +112,7 @@ func (s *Stats) Load() StatsSnapshot { AudioBytes: p.AudioBytes.Load(), DTMFPackets: p.DTMFPackets.Load(), DTMFBytes: p.DTMFBytes.Load(), + Closed: p.Closed.Load(), }, Room: RoomStatsSnapshot{ InputPackets: r.InputPackets.Load(), @@ -113,6 +121,7 @@ func (s *Stats) Load() StatsSnapshot { MixerFrames: r.MixerFrames.Load(), OutputSamples: r.OutputSamples.Load(), OutputFrames: r.OutputFrames.Load(), + Closed: r.Closed.Load(), }, Mixer: MixerStatsSnapshot{ Tracks: m.Tracks.Load(), @@ -129,6 +138,7 @@ func (s *Stats) Load() StatsSnapshot { OutputSamples: m.OutputSamples.Load(), OutputFrames: m.OutputFrames.Load(), }, + Closed: s.Closed.Load(), } } diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index 8d4c916c..0d623d3b 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -58,6 +58,8 @@ type PortStats struct { DTMFPackets atomic.Uint64 DTMFBytes atomic.Uint64 + + Closed atomic.Bool } type UDPConn interface { @@ -326,6 +328,8 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) { func (p *MediaPort) Close() { p.closed.Once(func() { + defer p.stats.Closed.Store(true) + p.mu.Lock() defer p.mu.Unlock() if w := p.audioOut.Swap(nil); w != nil { diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index fc89206e..98fe2914 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -217,6 +217,7 @@ func (c *outboundCall) WaitClose(ctx context.Context) error { case <-ticker.C: c.log.Debugw("sending keep-alive") c.state.ForceFlush(ctx) + c.printStats() case <-c.Disconnected(): c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED) return nil @@ -269,8 +270,15 @@ func (c *outboundCall) closeWithTimeout() { c.close(psrpc.NewErrorf(psrpc.DeadlineExceeded, "media-timeout"), callDropped, "media-timeout", livekit.DisconnectReason_UNKNOWN_REASON) } +func (c *outboundCall) printStats() { + c.log.Infow("call statistics", "stats", c.stats.Load()) +} + func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) { c.stopped.Once(func() { + c.stats.Closed.Store(true) + defer c.printStats() + c.setStatus(status) if err != nil { c.log.Warnw("Closing outbound call with error", nil, "reason", description) @@ -292,8 +300,6 @@ func (c *outboundCall) close(err error, status CallStatus, description string, r c.stopSIP(description) - c.log.Infow("call statistics", "stats", c.stats.Load()) - c.c.cmu.Lock() delete(c.c.activeCalls, c.cc.ID()) if tag := c.cc.Tag(); tag != "" { diff --git a/pkg/sip/room.go b/pkg/sip/room.go index 599f8212..8e003828 100644 --- a/pkg/sip/room.go +++ b/pkg/sip/room.go @@ -48,6 +48,8 @@ type RoomStats struct { OutputFrames atomic.Uint64 OutputSamples atomic.Uint64 + + Closed atomic.Bool } type ParticipantInfo struct { @@ -381,18 +383,21 @@ func (r *Room) CloseWithReason(reason livekit.DisconnectReason) error { if r == nil { return nil } - - r.closed.Break() - r.subscribe.Store(false) - err := r.CloseOutput() - r.SetDTMFOutput(nil) - if r.room != nil { - r.room.DisconnectWithReason(reason) - r.room = nil - } - if r.mix != nil { - r.mix.Stop() - } + var err error + r.closed.Once(func() { + defer r.stats.Closed.Store(true) + + r.subscribe.Store(false) + err = r.CloseOutput() + r.SetDTMFOutput(nil) + if r.room != nil { + r.room.DisconnectWithReason(reason) + r.room = nil + } + if r.mix != nil { + r.mix.Stop() + } + }) return err }