Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
case <-ticker.C:
c.log.Debugw("sending keep-alive")
c.state.ForceFlush(ctx)
c.printStats(c.log)
case <-ctx.Done():
c.closeWithHangup()
return nil
Expand Down Expand Up @@ -1027,18 +1028,21 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
}
}

func (c *inboundCall) printStats(log logger.Logger) {
log.Infow("call statistics", "stats", c.stats.Load())
}

// close should only be called from handleInvite.
func (c *inboundCall) close(error bool, status CallStatus, reason string) {
if !c.done.CompareAndSwap(false, true) {
return
}
c.setStatus(status)
c.mon.CallTerminate(reason)
c.stats.Closed.Store(true)
sipCode, sipStatus := status.SIPStatus()
log := c.log.WithValues("status", sipCode, "reason", reason)
defer func() {
log.Infow("call statistics", "stats", c.stats.Load())
}()
defer c.printStats(log)
c.setStatus(status)
c.mon.CallTerminate(reason)
if error {
log.Warnw("Closing inbound call with error", nil)
} else {
Expand Down
22 changes: 16 additions & 6 deletions pkg/sip/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
"strconv"
"sync/atomic"

msdk "github.com/livekit/media-sdk"
"github.com/pion/interceptor"
prtp "github.com/pion/rtp"

msdk "github.com/livekit/media-sdk"

"github.com/livekit/media-sdk/rtp"

"github.com/livekit/sip/pkg/stats"
Expand All @@ -32,14 +33,16 @@ import (
var _ json.Marshaler = (*Stats)(nil)

type Stats struct {
Port PortStats
Room RoomStats
Port PortStats
Room RoomStats
Closed atomic.Bool
}

type StatsSnapshot struct {
Port PortStatsSnapshot `json:"port"`
Room RoomStatsSnapshot `json:"room"`
Mixer MixerStatsSnapshot `json:"mixer"`
Port PortStatsSnapshot `json:"port"`
Room RoomStatsSnapshot `json:"room"`
Mixer MixerStatsSnapshot `json:"mixer"`
Closed bool `json:"closed"`
}

type PortStatsSnapshot struct {
Expand All @@ -56,6 +59,8 @@ type PortStatsSnapshot struct {

DTMFPackets uint64 `json:"dtmf_packets"`
DTMFBytes uint64 `json:"dtmf_bytes"`

Closed bool `json:"closed"`
}

type RoomStatsSnapshot struct {
Expand All @@ -67,6 +72,8 @@ type RoomStatsSnapshot struct {

OutputSamples uint64 `json:"output_samples"`
OutputFrames uint64 `json:"output_frames"`

Closed bool `json:"closed"`
}

type MixerStatsSnapshot struct {
Expand Down Expand Up @@ -105,6 +112,7 @@ func (s *Stats) Load() StatsSnapshot {
AudioBytes: p.AudioBytes.Load(),
DTMFPackets: p.DTMFPackets.Load(),
DTMFBytes: p.DTMFBytes.Load(),
Closed: p.Closed.Load(),
},
Room: RoomStatsSnapshot{
InputPackets: r.InputPackets.Load(),
Expand All @@ -113,6 +121,7 @@ func (s *Stats) Load() StatsSnapshot {
MixerFrames: r.MixerFrames.Load(),
OutputSamples: r.OutputSamples.Load(),
OutputFrames: r.OutputFrames.Load(),
Closed: r.Closed.Load(),
},
Mixer: MixerStatsSnapshot{
Tracks: m.Tracks.Load(),
Expand All @@ -129,6 +138,7 @@ func (s *Stats) Load() StatsSnapshot {
OutputSamples: m.OutputSamples.Load(),
OutputFrames: m.OutputFrames.Load(),
},
Closed: s.Closed.Load(),
}
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type PortStats struct {

DTMFPackets atomic.Uint64
DTMFBytes atomic.Uint64

Closed atomic.Bool
}

type UDPConn interface {
Expand Down Expand Up @@ -271,6 +273,7 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
ticker.Reset(tick)
startPackets = p.packetCount.Load()
lastTime = time.Now()
p.log.Infow("media timeout reset", "packets", startPackets, "tick", tick)
case <-ticker.C:
curPackets := p.packetCount.Load()
if curPackets != lastPackets {
Expand Down Expand Up @@ -306,7 +309,7 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
}

// Ticker is allowed to fire earlier than the full timeout interval. Skip if it's not a full timeout yet.
if since < timeout {
if since+timeout/10 < timeout {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we doing this ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests were detecting a timeout that triggers too late. It was off by a few ms, hence this change. Not a big deal if we trigger it later (as it is right now), but this makes tests a bit cleaner.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests were detecting a timeout that triggers too late.

Is this dues to go scheduling? What's the average wake delay we're working with?

continue
}
p.log.Infow("triggering media timeout",
Expand All @@ -325,6 +328,8 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {

func (p *MediaPort) Close() {
p.closed.Once(func() {
defer p.stats.Closed.Store(true)

p.mu.Lock()
defer p.mu.Unlock()
if w := p.audioOut.Swap(nil); w != nil {
Expand Down
199 changes: 199 additions & 0 deletions pkg/sip/media_port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,202 @@ func checkPCM(t testing.TB, exp, got msdk.PCM16Sample) {
expHit := int(float64(len(expSamples)) * percHit)
require.True(t, hits >= expHit, "min=%v, max=%v\ngot:\n%v", slices.Min(got), slices.Max(got), got)
}

func newMediaPair(t testing.TB, opt1, opt2 *MediaOptions) (m1, m2 *MediaPort) {
if opt1 == nil {
opt1 = &MediaOptions{}
}
if opt2 == nil {
opt2 = &MediaOptions{}
}
c1, c2 := newUDPPipe()

opt1.IP = newIP("1.1.1.1")
opt1.Ports = rtcconfig.PortRange{Start: 10000}

opt2.IP = newIP("2.2.2.2")
opt2.Ports = rtcconfig.PortRange{Start: 20000}

const rate = 16000

log := logger.GetLogger()

var err error

m1, err = NewMediaPortWith(log.WithName("one"), nil, c1, opt1, rate)
require.NoError(t, err)
t.Cleanup(m1.Close)

m2, err = NewMediaPortWith(log.WithName("two"), nil, c2, opt2, rate)
require.NoError(t, err)
t.Cleanup(m2.Close)

offer, err := m1.NewOffer(sdp.EncryptionNone)
require.NoError(t, err)
offerData, err := offer.SDP.Marshal()
require.NoError(t, err)

answer, mc2, err := m2.SetOffer(offerData, sdp.EncryptionNone)
require.NoError(t, err)
answerData, err := answer.SDP.Marshal()
require.NoError(t, err)

mc1, err := m1.SetAnswer(offer, answerData, sdp.EncryptionNone)
require.NoError(t, err)

err = m1.SetConfig(mc1)
require.NoError(t, err)

err = m2.SetConfig(mc2)
require.NoError(t, err)

w2 := m2.GetAudioWriter()
require.Equal(t, "Switch(16000) -> G722(encode) -> RTP(16000)", w2.String())

return m1, m2
}

func TestMediaTimeout(t *testing.T) {
const (
timeout = time.Second / 4
initial = timeout * 2
dt = timeout / 4
)

t.Run("initial", func(t *testing.T) {
m1, _ := newMediaPair(t, &MediaOptions{
MediaTimeoutInitial: initial,
MediaTimeout: timeout,
}, nil)

m1.EnableTimeout(true)

targ := time.Now().Add(initial)
select {
case <-m1.Timeout():
t.Fatal("initial timeout ignored")
case <-time.After(initial / 2):
}

select {
case <-time.After(time.Until(targ) + dt):
t.Fatal("timeout didn't trigger")
case <-m1.Timeout():
}
})

t.Run("regular", func(t *testing.T) {
m1, m2 := newMediaPair(t, &MediaOptions{
MediaTimeoutInitial: initial,
MediaTimeout: timeout,
}, nil)
m1.EnableTimeout(true)

w2 := m2.GetAudioWriter()
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
require.NoError(t, err)

select {
case <-time.After(dt):
t.Fatal("no media received")
case <-m1.Received():
}

select {
case <-time.After(2*timeout + dt):
t.Fatal("timeout didn't trigger")
case <-m1.Timeout():
}
})

t.Run("no timeout", func(t *testing.T) {
m1, m2 := newMediaPair(t, &MediaOptions{
MediaTimeoutInitial: initial,
MediaTimeout: timeout,
}, nil)
m1.EnableTimeout(true)

w2 := m2.GetAudioWriter()

for i := 0; i < 10; i++ {
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
require.NoError(t, err)

select {
case <-time.After(timeout / 2):
case <-m1.Timeout():
t.Fatal("timeout")
}
}
})

t.Run("reset timeout", func(t *testing.T) {
m1, m2 := newMediaPair(t, &MediaOptions{
MediaTimeoutInitial: initial,
MediaTimeout: timeout,
}, nil)
m1.EnableTimeout(true)

w2 := m2.GetAudioWriter()

for i := 0; i < 5; i++ {
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
require.NoError(t, err)

select {
case <-time.After(timeout / 2):
case <-m1.Timeout():
t.Fatal("timeout")
}
}

m1.SetTimeout(initial, timeout)

targ := time.Now().Add(initial)
select {
case <-m1.Timeout():
t.Fatal("initial timeout ignored")
case <-time.After(initial / 2):
}

select {
case <-time.After(time.Until(targ) + dt):
t.Fatal("timeout didn't trigger")
case <-m1.Timeout():
}
})

t.Run("reset", func(t *testing.T) {
m1, m2 := newMediaPair(t, &MediaOptions{
MediaTimeoutInitial: initial,
MediaTimeout: timeout,
}, nil)
m1.EnableTimeout(true)

w2 := m2.GetAudioWriter()

for i := 0; i < 5; i++ {
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
require.NoError(t, err)

select {
case <-time.After(timeout / 2):
case <-m1.Timeout():
t.Fatal("timeout")
}
}

m1.SetTimeout(initial, timeout)

for i := 0; i < 5; i++ {
err := w2.WriteSample(msdk.PCM16Sample{0, 0})
require.NoError(t, err)

select {
case <-time.After(timeout / 2):
case <-m1.Timeout():
t.Fatal("timeout")
}
}
})
}
10 changes: 8 additions & 2 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ func (c *outboundCall) WaitClose(ctx context.Context) error {
case <-ticker.C:
c.log.Debugw("sending keep-alive")
c.state.ForceFlush(ctx)
c.printStats()
case <-c.Disconnected():
c.CloseWithReason(callDropped, "removed", livekit.DisconnectReason_CLIENT_INITIATED)
return nil
Expand Down Expand Up @@ -269,8 +270,15 @@ func (c *outboundCall) closeWithTimeout() {
c.close(psrpc.NewErrorf(psrpc.DeadlineExceeded, "media-timeout"), callDropped, "media-timeout", livekit.DisconnectReason_UNKNOWN_REASON)
}

func (c *outboundCall) printStats() {
c.log.Infow("call statistics", "stats", c.stats.Load())
}

func (c *outboundCall) close(err error, status CallStatus, description string, reason livekit.DisconnectReason) {
c.stopped.Once(func() {
c.stats.Closed.Store(true)
defer c.printStats()

c.setStatus(status)
if err != nil {
c.log.Warnw("Closing outbound call with error", nil, "reason", description)
Expand All @@ -292,8 +300,6 @@ func (c *outboundCall) close(err error, status CallStatus, description string, r

c.stopSIP(description)

c.log.Infow("call statistics", "stats", c.stats.Load())

c.c.cmu.Lock()
delete(c.c.activeCalls, c.cc.ID())
if tag := c.cc.Tag(); tag != "" {
Expand Down
Loading
Loading