Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded
github.com/livekit/protocol v1.42.3-0.20251028133809-672acf9e6a48
github.com/livekit/protocol v1.42.3-0.20251106135512-8c21d2d3727e
github.com/livekit/psrpc v0.7.1-0.20251021235041-bdebea7dacf4
github.com/livekit/server-sdk-go/v2 v2.11.4-0.20251016050252-63f7c8381817
github.com/livekit/sipgo v0.13.2-0.20250601220430-a77cc3f220fb
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b h1:dPf9i0JPyf0Sg
github.com/livekit/media-sdk v0.0.0-20250927154350-bd99739b439b/go.mod h1:7ssWiG+U4xnbvLih9WiZbhQP6zIKMjgXdUtIE1bm/E8=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded h1:ylZPdnlX1RW9Z15SD4mp87vT2D2shsk0hpLJwSPcq3g=
github.com/livekit/mediatransportutil v0.0.0-20250519131108-fb90f5acfded/go.mod h1:mSNtYzSf6iY9xM3UX42VEI+STHvMgHmrYzEHPcdhB8A=
github.com/livekit/protocol v1.42.3-0.20251028133809-672acf9e6a48 h1:CuXGJx4Yuo9cIjiwO454u43sml9KV06ruzJxjVLPSog=
github.com/livekit/protocol v1.42.3-0.20251028133809-672acf9e6a48/go.mod h1:TpqU2qCI1ES4Lk7PAWSgYO4RaexfVXb54ZO2hXv0Bmc=
github.com/livekit/protocol v1.42.3-0.20251106135512-8c21d2d3727e h1:NcT7Fb1G4aV14y3X+w89QoK9Jl8pq9byrA12f/kq5Uo=
github.com/livekit/protocol v1.42.3-0.20251106135512-8c21d2d3727e/go.mod h1:TpqU2qCI1ES4Lk7PAWSgYO4RaexfVXb54ZO2hXv0Bmc=
github.com/livekit/psrpc v0.7.1-0.20251021235041-bdebea7dacf4 h1:YA5HMfNW9IPPdTSkwsyCtK9ZcNW8QpPNlkuD7UUToZE=
github.com/livekit/psrpc v0.7.1-0.20251021235041-bdebea7dacf4/go.mod h1:AuDC5uOoEjQJEc69v4Li3t77Ocz0e0NdjQEuFfO+vfk=
github.com/livekit/server-sdk-go/v2 v2.11.4-0.20251016050252-63f7c8381817 h1:y5YYe37lcGMOZMw4rvreljGjLj60eDSce/N6Asp7pw8=
Expand Down
5 changes: 4 additions & 1 deletion pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils/traceid"
"github.com/livekit/psrpc"
"github.com/livekit/sipgo"
"github.com/livekit/sipgo/sip"
Expand Down Expand Up @@ -165,8 +166,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
if err != nil {
return nil, err
}
tid := traceid.FromGUID(req.SipCallId)
log = log.WithValues(
"callID", req.SipCallId,
"traceID", tid.String(),
"room", req.RoomName,
"participant", req.ParticipantIdentity,
"participantName", req.ParticipantName,
Expand Down Expand Up @@ -224,7 +227,7 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
displayName: req.DisplayName,
}
log.Infow("Creating SIP participant")
call, err := c.newCall(ctx, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, state, req.ProjectId)
call, err := c.newCall(ctx, tid, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, state, req.ProjectId)
if err != nil {
return nil, err
}
Expand Down
44 changes: 27 additions & 17 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ import (
"sync/atomic"
"time"

msdk "github.com/livekit/media-sdk"
"github.com/livekit/protocol/rpc"

"github.com/frostbyte73/core"
"github.com/icholy/digest"
"github.com/pkg/errors"

msdk "github.com/livekit/media-sdk"
"github.com/livekit/media-sdk/dtmf"
"github.com/livekit/media-sdk/rtp"
"github.com/livekit/media-sdk/sdp"
"github.com/livekit/media-sdk/tones"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
lksip "github.com/livekit/protocol/sip"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils/traceid"
"github.com/livekit/psrpc"
lksdk "github.com/livekit/server-sdk-go/v2"
"github.com/livekit/sipgo/sip"
Expand Down Expand Up @@ -150,7 +150,7 @@ func (s *Server) getInvite(sipCallID string) *inProgressInvite {
return is
}

func (s *Server) handleInviteAuth(log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
func (s *Server) handleInviteAuth(tid traceid.ID, log logger.Logger, req *sip.Request, tx sip.ServerTransaction, from, username, password string) (ok bool) {
log = log.WithValues(
"username", username,
"passwordHash", hashPassword(password),
Expand Down Expand Up @@ -306,10 +306,12 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
return psrpc.NewError(psrpc.MalformedRequest, errors.Wrap(err, "cannot parse source IP"))
}
callID := lksip.NewCallID()
tid := traceid.FromGUID(callID)
tr := callTransportFromReq(req)
legTr := legTransportFromReq(req)
log := s.log.WithValues(
"callID", callID,
"traceID", tid.String(),
"fromIP", src.Addr(),
"toIP", req.Destination(),
"transport", tr,
Expand Down Expand Up @@ -446,7 +448,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
cc.Processing()
}
s.getCallInfo(cc.SIPCallID()).countInvite(log, req)
if !s.handleInviteAuth(log, req, tx, from.User, r.Username, r.Password) {
if !s.handleInviteAuth(tid, log, req, tx, from.User, r.Username, r.Password) {
cmon.InviteErrorShort("unauthorized")
// handleInviteAuth will generate the SIP Response as needed
return psrpc.NewErrorf(psrpc.PermissionDenied, "invalid credentials were provided")
Expand All @@ -457,9 +459,9 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
// ok
}

call = s.newInboundCall(log, cmon, cc, callInfo, state, nil)
call = s.newInboundCall(tid, log, cmon, cc, callInfo, state, start, nil)
call.joinDur = joinDur
return call.handleInvite(call.ctx, req, r.TrunkID, s.conf)
return call.handleInvite(call.ctx, tid, req, r.TrunkID, s.conf)
}

func (s *Server) onOptions(log *slog.Logger, req *sip.Request, tx sip.ServerTransaction) {
Expand Down Expand Up @@ -576,10 +578,12 @@ func (s *Server) onNotify(log *slog.Logger, req *sip.Request, tx sip.ServerTrans

type inboundCall struct {
s *Server
tid traceid.ID
log logger.Logger
cc *sipInbound
mon *stats.CallMonitor
state *CallState
callStart time.Time
extraAttrs map[string]string
attrsToHdr map[string]string
ctx context.Context
Expand All @@ -600,18 +604,22 @@ type inboundCall struct {
}

func (s *Server) newInboundCall(
tid traceid.ID,
log logger.Logger,
mon *stats.CallMonitor,
cc *sipInbound,
call *rpc.SIPCall,
state *CallState,
callStart time.Time,
extra map[string]string,
) *inboundCall {
// Map known headers immediately on join. The rest of the mapping will be available later.
extra = HeadersToAttrs(extra, nil, 0, cc, nil)
c := &inboundCall{
s: s,
log: log,
tid: tid,
callStart: callStart,
mon: mon,
cc: cc,
call: call,
Expand All @@ -633,7 +641,7 @@ func (s *Server) newInboundCall(
return c
}

func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkID string, conf *config.Config) error {
func (c *inboundCall) handleInvite(ctx context.Context, tid traceid.ID, req *sip.Request, trunkID string, conf *config.Config) error {
c.mon.InviteAccept()
c.mon.CallStart()
defer c.mon.CallEnd()
Expand Down Expand Up @@ -697,7 +705,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}

runMedia := func(enc livekit.SIPMediaEncryption) ([]byte, error) {
answerData, err := c.runMediaConn(req.Body(), enc, conf, disp.EnabledFeatures)
answerData, err := c.runMediaConn(tid, req.Body(), enc, conf, disp.EnabledFeatures)
if err != nil {
isError := true
status, reason := callDropped, "media-failed"
Expand Down Expand Up @@ -874,7 +882,7 @@ func (c *inboundCall) handleInvite(ctx context.Context, req *sip.Request, trunkI
}
}

func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncryption, conf *config.Config, features []livekit.SIPFeature) (answerData []byte, _ error) {
func (c *inboundCall) runMediaConn(tid traceid.ID, offerData []byte, enc livekit.SIPMediaEncryption, conf *config.Config, features []livekit.SIPFeature) (answerData []byte, _ error) {
c.mon.SDPSize(len(offerData), true)
c.log.Debugw("SDP offer", "sdp", string(offerData))
e, err := sdpEncryption(enc)
Expand All @@ -883,7 +891,7 @@ func (c *inboundCall) runMediaConn(offerData []byte, enc livekit.SIPMediaEncrypt
return nil, err
}

mp, err := NewMediaPort(c.log, c.mon, &MediaOptions{
mp, err := NewMediaPort(tid, c.log, c.mon, &MediaOptions{
IP: c.s.sconf.MediaIP,
Ports: conf.RTPPort,
MediaTimeoutInitial: c.s.conf.MediaTimeoutInitial,
Expand Down Expand Up @@ -1057,7 +1065,7 @@ func (c *inboundCall) pinPrompt(ctx context.Context, trunkID string) (disp CallD
}

func (c *inboundCall) printStats(log logger.Logger) {
log.Infow("call statistics", "stats", c.stats.Load())
log.Infow("call statistics", "stats", c.stats.Load(), "durMin", int(time.Since(c.callStart).Minutes()))
}

// close should only be called from handleInvite.
Expand Down Expand Up @@ -1095,11 +1103,13 @@ func (c *inboundCall) close(error bool, status CallStatus, reason string) {

// Call the handler asynchronously to avoid blocking
if c.s.handler != nil {
go c.s.handler.OnSessionEnd(context.Background(), &CallIdentifier{
ProjectID: c.projectID,
CallID: c.call.LkCallId,
SipCallID: c.call.SipCallId,
}, c.state.callInfo, reason)
go func(tid traceid.ID) {
c.s.handler.OnSessionEnd(context.Background(), &CallIdentifier{
ProjectID: c.projectID,
CallID: c.call.LkCallId,
SipCallID: c.call.SipCallId,
}, c.state.callInfo, reason)
}(c.tid)
}

c.cancel()
Expand Down
62 changes: 50 additions & 12 deletions pkg/sip/media_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/livekit/media-sdk/srtp"
"github.com/livekit/mediatransportutil/pkg/rtcconfig"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils/traceid"

"github.com/livekit/sip/pkg/stats"
)
Expand Down Expand Up @@ -132,11 +133,11 @@ type MediaOptions struct {
EnableJitterBuffer bool
}

func NewMediaPort(log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
return NewMediaPortWith(log, mon, nil, opts, sampleRate)
func NewMediaPort(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
return NewMediaPortWith(tid, log, mon, nil, opts, sampleRate)
}

func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
func NewMediaPortWith(tid traceid.ID, log logger.Logger, mon *stats.CallMonitor, conn UDPConn, opts *MediaOptions, sampleRate int) (*MediaPort, error) {
if opts == nil {
opts = &MediaOptions{}
}
Expand All @@ -158,6 +159,7 @@ func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, o
}
mediaTimeout := make(chan struct{})
p := &MediaPort{
tid: tid,
log: log,
opts: opts,
mon: mon,
Expand All @@ -172,7 +174,7 @@ func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, o
}
p.timeoutInitial.Store(&opts.MediaTimeoutInitial)
p.timeoutGeneral.Store(&opts.MediaTimeout)
go p.timeoutLoop(func() {
go p.timeoutLoop(tid, func() {
close(mediaTimeout)
})
p.log.Debugw("listening for media on UDP", "port", p.Port())
Expand All @@ -181,6 +183,7 @@ func NewMediaPortWith(log logger.Logger, mon *stats.CallMonitor, conn UDPConn, o

// MediaPort combines all functionality related to sending and accepting SIP media.
type MediaPort struct {
tid traceid.ID
log logger.Logger
opts *MediaOptions
mon *stats.CallMonitor
Expand Down Expand Up @@ -221,6 +224,7 @@ func (p *MediaPort) EnableOut() {
}

func (p *MediaPort) disableTimeout() {
p.log.Infow("media timeout disabled")
p.timeoutStart.Store(nil)
}

Expand Down Expand Up @@ -256,14 +260,16 @@ func (p *MediaPort) SetTimeout(initial, general time.Duration) {
p.enableTimeout(initial, general)
}

func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
func (p *MediaPort) timeoutLoop(tid traceid.ID, timeoutCallback func()) {
defer p.log.Infow("media timeout loop stopped")
ticker := time.NewTicker(p.opts.MediaTimeout)
defer ticker.Stop()

var (
lastPackets uint64
startPackets uint64
lastTime time.Time
lastLog = time.Now()
)
for {
select {
Expand All @@ -273,16 +279,45 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {
ticker.Reset(tick)
startPackets = p.packetCount.Load()
lastTime = time.Now()
lastLog = lastTime
p.log.Infow("media timeout reset", "packets", startPackets, "tick", tick)
case <-ticker.C:
log := p.log
curPackets := p.packetCount.Load()
startPtr := p.timeoutStart.Load()
var startTime time.Time
if startPtr != nil {
startTime = *startPtr
}
verbose := false
if now := time.Now(); now.Sub(lastLog) > time.Hour {
verbose = true
lastLog = now
log = log.WithValues(
"startPackets", startPackets,
"packets", curPackets,
"lastPackets", lastPackets,
"sinceLast", time.Since(lastTime),
"sinceStart", time.Since(startTime),
)
if curPackets == startPackets {
log.Warnw("media timout is idle for a long time", nil)
} else {
log.Infow("media timeout stats")
}
}
if curPackets != lastPackets {
lastPackets = curPackets
lastTime = time.Now()
if verbose {
log.Infow("got a new packet")
}
continue // wait for the next tick
}
startPtr := p.timeoutStart.Load()
if startPtr == nil {
if verbose {
log.Infow("timeout is disabled")
}
continue // timeout disabled
}
isInitial := lastPackets == startPackets
Expand Down Expand Up @@ -310,6 +345,9 @@ func (p *MediaPort) timeoutLoop(timeoutCallback func()) {

// Ticker is allowed to fire earlier than the full timeout interval. Skip if it's not a full timeout yet.
if since+timeout/10 < timeout {
if verbose {
log.Infow("too early to trigger", "since", since, "timeout", timeout)
}
continue
}
p.log.Infow("triggering media timeout",
Expand Down Expand Up @@ -456,14 +494,14 @@ func (p *MediaPort) SetConfig(c *MediaConf) error {
p.conf = c
p.sess = sess

if err = p.setupOutput(); err != nil {
if err = p.setupOutput(p.tid); err != nil {
return err
}
p.setupInput()
return nil
}

func (p *MediaPort) rtpLoop(sess rtp.Session) {
func (p *MediaPort) rtpLoop(tid traceid.ID, sess rtp.Session) {
// Need a loop to process all incoming packets.
for {
r, ssrc, err := sess.AcceptStream()
Expand All @@ -477,11 +515,11 @@ func (p *MediaPort) rtpLoop(sess rtp.Session) {
p.mediaReceived.Break()
log := p.log.WithValues("ssrc", ssrc)
log.Infow("accepting RTP stream")
go p.rtpReadLoop(log, r)
go p.rtpReadLoop(tid, log, r)
}
}

func (p *MediaPort) rtpReadLoop(log logger.Logger, r rtp.ReadStream) {
func (p *MediaPort) rtpReadLoop(tid traceid.ID, log logger.Logger, r rtp.ReadStream) {
const maxErrors = 50 // 1 sec, given 20 ms frames
buf := make([]byte, rtp.MTUSize+1)
overflow := false
Expand Down Expand Up @@ -546,11 +584,11 @@ func (p *MediaPort) rtpReadLoop(log logger.Logger, r rtp.ReadStream) {
}

// Must be called holding the lock
func (p *MediaPort) setupOutput() error {
func (p *MediaPort) setupOutput(tid traceid.ID) error {
if p.closed.IsBroken() {
return errors.New("media is already closed")
}
go p.rtpLoop(p.sess)
go p.rtpLoop(tid, p.sess)
w, err := p.sess.OpenWriteStream()
if err != nil {
return err
Expand Down
Loading