Skip to content

Commit

Permalink
Detach subscriber datachannel to save memory (#2680)
Browse files Browse the repository at this point in the history
Sfu don't read message from subscriber datachannel, detach
it to bypass the readLoop can save 128KB memory and 2 goroutines per
participant.
  • Loading branch information
cnderrauber committed Apr 24, 2024
1 parent dfadb3f commit b55038a
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions pkg/rtc/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import (
sfuutils "github.com/livekit/livekit-server/pkg/sfu/utils"
"github.com/livekit/livekit-server/pkg/telemetry/prometheus"
"github.com/livekit/livekit-server/pkg/utils"
sutils "github.com/livekit/livekit-server/pkg/utils"
)

const (
Expand All @@ -75,8 +74,6 @@ const (
minConnectTimeoutAfterICE = 10 * time.Second
maxConnectTimeoutAfterICE = 20 * time.Second // max duration for waiting pc to connect after ICE is connected

maxICECandidates = 20

shortConnectionThreshold = 90 * time.Second
)

Expand Down Expand Up @@ -185,7 +182,7 @@ type PCTransport struct {
preferTCP atomic.Bool
isClosed atomic.Bool

eventsQueue *sutils.TypedOpsQueue[event]
eventsQueue *utils.TypedOpsQueue[event]

// the following should be accessed only in event processing go routine
cacheLocalCandidates bool
Expand Down Expand Up @@ -305,6 +302,7 @@ func newPeerConnection(params TransportParams, onBandwidthEstimator func(estimat

ir := &interceptor.Registry{}
if params.IsSendSide {
se.DetachDataChannels()
if params.CongestionControlConfig.UseSendSideBWE {
gf, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
return gcc.NewSendSideBWE(
Expand Down Expand Up @@ -390,7 +388,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
params: params,
debouncedNegotiate: debounce.New(negotiationFrequency),
negotiationState: transport.NegotiationStateNone,
eventsQueue: sutils.NewTypedOpsQueue[event](utils.OpsQueueParams{
eventsQueue: utils.NewTypedOpsQueue[event](utils.OpsQueueParams{
Name: "transport",
MinSize: 64,
Logger: params.Logger,
Expand All @@ -402,7 +400,7 @@ func NewPCTransport(params TransportParams) (*PCTransport, error) {
if params.IsSendSide {
t.streamAllocator = streamallocator.NewStreamAllocator(streamallocator.StreamAllocatorParams{
Config: params.CongestionControlConfig,
Logger: params.Logger.WithComponent(sutils.ComponentCongestionControl),
Logger: params.Logger.WithComponent(utils.ComponentCongestionControl),
})
t.streamAllocator.OnStreamStateChange(params.Handler.OnStreamStateChange)
t.streamAllocator.Start()
Expand Down Expand Up @@ -855,8 +853,22 @@ func (t *PCTransport) CreateDataChannel(label string, dci *webrtc.DataChannelIni
defer t.lock.Unlock()
*dcPtr = dc
if t.params.DirectionConfig.StrictACKs {
dc.OnOpen(dcReadyHandler)
dc.OnOpen(func() {
if t.params.IsSendSide {
if _, err := dc.Detach(); err != nil {
t.params.Logger.Warnw("failed to detach data channel", err)
}
}
dcReadyHandler()
})
} else {
dc.OnOpen(func() {
if t.params.IsSendSide {
if _, err := dc.Detach(); err != nil {
t.params.Logger.Warnw("failed to detach data channel", err)
}
}
})
dc.OnDial(dcReadyHandler)
}
dc.OnClose(dcCloseHandler)
Expand Down

0 comments on commit b55038a

Please sign in to comment.