Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE
if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() {
log.Infow("accepting reinvite", "sipCallID", existing.cc.ID(), "content-type", req.ContentType(), "content-length", req.ContentLength())
existing.log().Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq())
cc.AcceptAsKeepAlive()
cc.AcceptAsKeepAlive(existing.cc.OwnSDP())
return nil
}

Expand Down Expand Up @@ -1394,6 +1394,7 @@ type sipInbound struct {
referDone chan error

mu sync.RWMutex
lastSDP []byte
inviteOk *sip.Response
nextRequestCSeq uint32
referCseq uint32
Expand Down Expand Up @@ -1436,11 +1437,18 @@ func (c *sipInbound) drop() {
}

func (c *sipInbound) respond(status sip.StatusCode, reason string) {
c.respondWithData(status, reason, "", nil)
}

func (c *sipInbound) respondWithData(status sip.StatusCode, reason string, contentType string, body []byte) {
if c.inviteTx == nil {
return
}

r := sip.NewResponseFromRequest(c.invite, status, reason, nil)
r := sip.NewResponseFromRequest(c.invite, status, reason, body)
if typ := sip.ContentTypeHeader(contentType); typ != "" {
r.AppendHeader(&typ)
}
r.AppendHeader(sip.NewHeader("Allow", "INVITE, ACK, CANCEL, BYE, NOTIFY, REFER, MESSAGE, OPTIONS, INFO, SUBSCRIBE"))
if status >= 200 {
// For an ACK to error statuses.
Expand Down Expand Up @@ -1583,8 +1591,14 @@ func (c *sipInbound) accepted(inviteOK *sip.Response) {
c.inviteTx = nil
}

func (c *sipInbound) AcceptAsKeepAlive() {
c.RespondAndDrop(sip.StatusOK, "OK")
func (c *sipInbound) AcceptAsKeepAlive(sdp []byte) {
c.respondWithData(sip.StatusOK, "OK", "application/sdp", sdp)
}

func (c *sipInbound) OwnSDP() []byte {
c.mu.RLock()
defer c.mu.RUnlock()
Comment on lines +1599 to +1600
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just here to explicitly create a memory barrier, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the slices is ~24 bytes wide, so it won't be an atomic read.

But in general, in Go it's better to always lock things. Compiler could be aggressive when optimizing and it's safer to assume that the state will be used by other goroutines sooner or later. It may lead to very subtle bugs if we rely on implicit atomic reads.

return c.lastSDP
}

func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[string]string) error {
Expand All @@ -1595,6 +1609,7 @@ func (c *sipInbound) Accept(ctx context.Context, sdpData []byte, headers map[str
if c.inviteTx == nil {
return errors.New("call already rejected")
}
c.lastSDP = sdpData
r := sip.NewResponseFromRequest(c.invite, sip.StatusOK, "OK", sdpData)

// This will effectively redirect future SIP requests to this server instance (if host address is not LB).
Expand Down