Skip to content

Commit

Permalink
only increment participant version after updates (#1646)
Browse files Browse the repository at this point in the history
* only increment participant version after updates

* fix test util

* cleanup

* test uptrackmanager permission update version check
  • Loading branch information
paulwe committed Apr 23, 2023
1 parent a77eb2a commit 745410b
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 104 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26
github.com/livekit/protocol v1.5.5-0.20230421192204-0975cb52f603
github.com/livekit/protocol v1.5.5-0.20230422131440-eb3ef0f4bf36
github.com/livekit/psrpc v0.3.0
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.14.0
Expand Down Expand Up @@ -92,7 +92,7 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 h1:QlQFyMwCDgjyySsrgmrMcVbEBA6KZcyTzvK+z346tUA=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26/go.mod h1:eDA41kiySZoG+wy4Etsjb3w0jjLx69i/vAmSjG4bteA=
github.com/livekit/protocol v1.5.5-0.20230421192204-0975cb52f603 h1:1O+cSgXWFWZDF+6cYOmJhu4w2HAsQcpLJI1YJfuixR8=
github.com/livekit/protocol v1.5.5-0.20230421192204-0975cb52f603/go.mod h1:/LpoK6XF8IeJDs/d5BPVmicp8pDPXmgTN1GVGhdpEX0=
github.com/livekit/protocol v1.5.5-0.20230422131440-eb3ef0f4bf36 h1:Qrl0N7dAeR2iLOk6u4WNelFhTh2OetyZzd/h8kbxLSI=
github.com/livekit/protocol v1.5.5-0.20230422131440-eb3ef0f4bf36/go.mod h1:iZ289+6H5xn/9kP2iqpRvVWxuc8GXBMqN0qI7LdN9HI=
github.com/livekit/psrpc v0.3.0 h1:giBZsfM3CWA0oIYXofsMITbVQtyW7u/ES9sQmVspHPM=
github.com/livekit/psrpc v0.3.0/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand Down Expand Up @@ -281,8 +281,8 @@ golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb h1:rhjz/8Mbfa8xROFiH+MQphmAmgqRM0bOMnytznhWEXk=
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
Expand Down
77 changes: 58 additions & 19 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ type ParticipantImpl struct {
rttUpdatedAt time.Time
lastRTT uint32

lock utils.RWMutex
once sync.Once
version atomic.Uint32
lock utils.RWMutex
once sync.Once

dirty atomic.Bool
version atomic.Uint32
timedVersion utils.TimedVersion

// callbacks & handlers
onTrackPublished func(types.LocalParticipant, types.MediaTrack)
Expand Down Expand Up @@ -194,6 +197,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
}
p.version.Store(params.InitialVersion)
p.timedVersion.Update(params.VersionGenerator.New())
p.migrateState.Store(types.MigrateStateInit)
p.state.Store(livekit.ParticipantInfo_JOINING)
p.grants = params.Grants
Expand Down Expand Up @@ -291,16 +295,18 @@ func (p *ParticipantImpl) GetBufferFactory() *buffer.Factory {
// SetName attaches name to the participant
func (p *ParticipantImpl) SetName(name string) {
p.lock.Lock()
changed := p.grants.Name != name
if p.grants.Name == name {
p.lock.Unlock()
return
}

p.grants.Name = name
p.dirty.Store(true)

onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
p.lock.Unlock()

if !changed {
return
}

if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
Expand All @@ -312,16 +318,18 @@ func (p *ParticipantImpl) SetName(name string) {
// SetMetadata attaches metadata to the participant
func (p *ParticipantImpl) SetMetadata(metadata string) {
p.lock.Lock()
changed := p.grants.Metadata != metadata
if p.grants.Metadata == metadata {
p.lock.Unlock()
return
}

p.grants.Metadata = metadata
p.dirty.Store(true)

onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
p.lock.Unlock()

if !changed {
return
}

if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
Expand Down Expand Up @@ -350,6 +358,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
}

video.UpdateFromPermission(permission)
p.dirty.Store(true)

canPublish := video.GetCanPublish()
canSubscribe := video.GetCanSubscribe()
Expand Down Expand Up @@ -392,24 +401,37 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
return true
}

func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
func (p *ParticipantImpl) ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion) {
v := p.version.Load()
piv := p.timedVersion.Load()
if p.dirty.Swap(false) {
v = p.version.Inc()
piv = p.params.VersionGenerator.Next()
p.timedVersion.Update(&piv)
}

p.lock.RLock()
info := &livekit.ParticipantInfo{
pi := &livekit.ParticipantInfo{
Sid: string(p.params.SID),
Identity: string(p.params.Identity),
Name: p.grants.Name,
State: p.State(),
JoinedAt: p.ConnectedAt().Unix(),
Version: p.version.Inc(),
Version: v,
Permission: p.grants.Video.ToPermission(),
Metadata: p.grants.Metadata,
Region: p.params.Region,
IsPublisher: p.IsPublisher(),
}
p.lock.RUnlock()
info.Tracks = p.UpTrackManager.ToProto()
pi.Tracks = p.UpTrackManager.ToProto()

return info
return pi, piv
}

func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
pi, _ := p.ToProtoWithVersion()
return pi
}

// callbacks for clients
Expand Down Expand Up @@ -545,6 +567,10 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() {
}
}
p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...)

if len(addedTracks) != 0 {
p.dirty.Store(true)
}
p.pendingTracksLock.Unlock()

// launch callbacks in goroutine since they could block.
Expand Down Expand Up @@ -754,6 +780,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {

p.params.Logger.Debugw("SetMigrateState", "state", s)
p.migrateState.Store(s)
p.dirty.Store(true)

processPendingOffer := false
if s == types.MigrateStateSync {
Expand Down Expand Up @@ -1054,6 +1081,7 @@ func (p *ParticipantImpl) setupUpTrackManager() {
})

p.UpTrackManager.OnPublishedTrackUpdated(func(track types.MediaTrack) {
p.dirty.Store(true)
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
p.lock.RUnlock()
Expand Down Expand Up @@ -1084,8 +1112,11 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
if state == oldState {
return
}
p.state.Store(state)

p.params.Logger.Debugw("updating participant state", "state", state.String())
p.state.Store(state)
p.dirty.Store(true)

p.lock.RLock()
onStateChange := p.onStateChange
p.lock.RUnlock()
Expand All @@ -1103,6 +1134,8 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {

func (p *ParticipantImpl) setIsPublisher(isPublisher bool) {
if p.isPublisher.Swap(isPublisher) != isPublisher {
p.dirty.Store(true)

// trigger update as well if participant is already fully connected
if p.State() == livekit.ParticipantInfo_ACTIVE {
p.lock.RLock()
Expand Down Expand Up @@ -1164,6 +1197,8 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
"mime", track.Codec().MimeType,
)

p.dirty.Store(true)

if !isNewTrack && !publishedTrack.HasPendingCodec() && p.IsReady() {
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
Expand Down Expand Up @@ -1515,6 +1550,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro
}

func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) {
p.dirty.Store(true)
p.supervisor.SetPublicationMute(trackID, muted)

track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted)
Expand Down Expand Up @@ -1579,6 +1615,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
ti.MimeType = track.Codec().MimeType
mt = p.addMediaTrack(signalCid, track.ID(), ti)
newTrack = true
p.dirty.Store(true)
}

ssrc := uint32(track.SSRC())
Expand Down Expand Up @@ -1699,6 +1736,8 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
}
p.pendingTracksLock.Unlock()

p.dirty.Store(true)

if !p.IsClosed() {
// unpublished events aren't necessary when participant is closed
p.params.Logger.Infow("unpublished track", "trackID", ti.Sid, "trackInfo", ti)
Expand Down
1 change: 1 addition & 0 deletions pkg/rtc/participant_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p
ClientInfo: ClientInfo{ClientInfo: opts.clientInfo},
Logger: LoggerWithParticipant(logger.GetLogger(), identity, sid, false),
Telemetry: &telemetryfakes.FakeTelemetryService{},
VersionGenerator: utils.NewDefaultTimedVersionGenerator(),
})
p.isPublisher.Store(opts.publisher)
p.updateState(livekit.ParticipantInfo_ACTIVE)
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtc/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync
}

func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error {
if err := participant.UpdateSubscriptionPermission(subscriptionPermission, nil, r.GetParticipant, r.GetParticipantByID); err != nil {
if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion{}, r.GetParticipant, r.GetParticipantByID); err != nil {
return err
}
for _, track := range participant.GetPublishedTracks() {
Expand Down
7 changes: 5 additions & 2 deletions pkg/rtc/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"

"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/sfu"
Expand Down Expand Up @@ -195,12 +196,12 @@ type Participant interface {
Start()
Close(sendLeave bool, reason ParticipantCloseReason) error

SubscriptionPermission() (*livekit.SubscriptionPermission, *livekit.TimedVersion)
SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion)

// updates from remotes
UpdateSubscriptionPermission(
subscriptionPermission *livekit.SubscriptionPermission,
timedVersion *livekit.TimedVersion,
timedVersion utils.TimedVersion,
resolverByIdentity func(participantIdentity livekit.ParticipantIdentity) LocalParticipant,
resolverBySid func(participantID livekit.ParticipantID) LocalParticipant,
) error
Expand Down Expand Up @@ -229,6 +230,8 @@ type AddTrackParams struct {
type LocalParticipant interface {
Participant

ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion)

// getters
GetLogger() logger.Logger
GetAdaptiveStream() bool
Expand Down
Loading

0 comments on commit 745410b

Please sign in to comment.