Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only increment participant version after updates #1646

Merged
merged 4 commits into from
Apr 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26
github.com/livekit/protocol v1.5.5-0.20230421192204-0975cb52f603
github.com/livekit/protocol v1.5.5-0.20230422131440-eb3ef0f4bf36
github.com/livekit/psrpc v0.3.0
github.com/mackerelio/go-osstat v0.2.4
github.com/magefile/mage v1.14.0
Expand Down Expand Up @@ -92,7 +92,7 @@ require (
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26 h1:QlQFyMwCDgjyySsrgmrMcVbEBA6KZcyTzvK+z346tUA=
github.com/livekit/mediatransportutil v0.0.0-20230326055817-ed569ca13d26/go.mod h1:eDA41kiySZoG+wy4Etsjb3w0jjLx69i/vAmSjG4bteA=
github.com/livekit/protocol v1.5.5-0.20230421192204-0975cb52f603 h1:1O+cSgXWFWZDF+6cYOmJhu4w2HAsQcpLJI1YJfuixR8=
github.com/livekit/protocol v1.5.5-0.20230421192204-0975cb52f603/go.mod h1:/LpoK6XF8IeJDs/d5BPVmicp8pDPXmgTN1GVGhdpEX0=
github.com/livekit/protocol v1.5.5-0.20230422131440-eb3ef0f4bf36 h1:Qrl0N7dAeR2iLOk6u4WNelFhTh2OetyZzd/h8kbxLSI=
github.com/livekit/protocol v1.5.5-0.20230422131440-eb3ef0f4bf36/go.mod h1:iZ289+6H5xn/9kP2iqpRvVWxuc8GXBMqN0qI7LdN9HI=
github.com/livekit/psrpc v0.3.0 h1:giBZsfM3CWA0oIYXofsMITbVQtyW7u/ES9sQmVspHPM=
github.com/livekit/psrpc v0.3.0/go.mod h1:n6JntEg+zT6Ji8InoyTpV7wusPNwGqqtxmHlkNhDN0U=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
Expand Down Expand Up @@ -281,8 +281,8 @@ golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb h1:rhjz/8Mbfa8xROFiH+MQphmAmgqRM0bOMnytznhWEXk=
golang.org/x/exp v0.0.0-20230420155640-133eef4313cb/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
Expand Down
77 changes: 58 additions & 19 deletions pkg/rtc/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,12 @@ type ParticipantImpl struct {
rttUpdatedAt time.Time
lastRTT uint32

lock utils.RWMutex
once sync.Once
version atomic.Uint32
lock utils.RWMutex
once sync.Once

dirty atomic.Bool
version atomic.Uint32
timedVersion utils.TimedVersion

// callbacks & handlers
onTrackPublished func(types.LocalParticipant, types.MediaTrack)
Expand Down Expand Up @@ -194,6 +197,7 @@ func NewParticipant(params ParticipantParams) (*ParticipantImpl, error) {
supervisor: supervisor.NewParticipantSupervisor(supervisor.ParticipantSupervisorParams{Logger: params.Logger}),
}
p.version.Store(params.InitialVersion)
p.timedVersion.Update(params.VersionGenerator.New())
p.migrateState.Store(types.MigrateStateInit)
p.state.Store(livekit.ParticipantInfo_JOINING)
p.grants = params.Grants
Expand Down Expand Up @@ -291,16 +295,18 @@ func (p *ParticipantImpl) GetBufferFactory() *buffer.Factory {
// SetName attaches name to the participant
func (p *ParticipantImpl) SetName(name string) {
p.lock.Lock()
changed := p.grants.Name != name
if p.grants.Name == name {
p.lock.Unlock()
return
}

p.grants.Name = name
p.dirty.Store(true)

onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
p.lock.Unlock()

if !changed {
return
}

if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
Expand All @@ -312,16 +318,18 @@ func (p *ParticipantImpl) SetName(name string) {
// SetMetadata attaches metadata to the participant
func (p *ParticipantImpl) SetMetadata(metadata string) {
p.lock.Lock()
changed := p.grants.Metadata != metadata
if p.grants.Metadata == metadata {
p.lock.Unlock()
return
}

p.grants.Metadata = metadata
p.dirty.Store(true)

onParticipantUpdate := p.onParticipantUpdate
onClaimsChanged := p.onClaimsChanged
p.lock.Unlock()

if !changed {
return
}

if onParticipantUpdate != nil {
onParticipantUpdate(p)
}
Expand Down Expand Up @@ -350,6 +358,7 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
}

video.UpdateFromPermission(permission)
p.dirty.Store(true)

canPublish := video.GetCanPublish()
canSubscribe := video.GetCanSubscribe()
Expand Down Expand Up @@ -392,24 +401,37 @@ func (p *ParticipantImpl) SetPermission(permission *livekit.ParticipantPermissio
return true
}

func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
func (p *ParticipantImpl) ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion) {
v := p.version.Load()
piv := p.timedVersion.Load()
if p.dirty.Swap(false) {
v = p.version.Inc()
piv = p.params.VersionGenerator.Next()
p.timedVersion.Update(&piv)
}

p.lock.RLock()
info := &livekit.ParticipantInfo{
pi := &livekit.ParticipantInfo{
Sid: string(p.params.SID),
Identity: string(p.params.Identity),
Name: p.grants.Name,
State: p.State(),
JoinedAt: p.ConnectedAt().Unix(),
Version: p.version.Inc(),
Version: v,
Permission: p.grants.Video.ToPermission(),
Metadata: p.grants.Metadata,
Region: p.params.Region,
IsPublisher: p.IsPublisher(),
}
p.lock.RUnlock()
info.Tracks = p.UpTrackManager.ToProto()
pi.Tracks = p.UpTrackManager.ToProto()

return info
return pi, piv
}

func (p *ParticipantImpl) ToProto() *livekit.ParticipantInfo {
pi, _ := p.ToProtoWithVersion()
return pi
}

// callbacks for clients
Expand Down Expand Up @@ -545,6 +567,10 @@ func (p *ParticipantImpl) handleMigrateMutedTrack() {
}
}
p.mutedTrackNotFired = append(p.mutedTrackNotFired, addedTracks...)

if len(addedTracks) != 0 {
p.dirty.Store(true)
}
p.pendingTracksLock.Unlock()

// launch callbacks in goroutine since they could block.
Expand Down Expand Up @@ -754,6 +780,7 @@ func (p *ParticipantImpl) SetMigrateState(s types.MigrateState) {

p.params.Logger.Debugw("SetMigrateState", "state", s)
p.migrateState.Store(s)
p.dirty.Store(true)

processPendingOffer := false
if s == types.MigrateStateSync {
Expand Down Expand Up @@ -1054,6 +1081,7 @@ func (p *ParticipantImpl) setupUpTrackManager() {
})

p.UpTrackManager.OnPublishedTrackUpdated(func(track types.MediaTrack) {
p.dirty.Store(true)
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
p.lock.RUnlock()
Expand Down Expand Up @@ -1084,8 +1112,11 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {
if state == oldState {
return
}
p.state.Store(state)

p.params.Logger.Debugw("updating participant state", "state", state.String())
p.state.Store(state)
p.dirty.Store(true)

p.lock.RLock()
onStateChange := p.onStateChange
p.lock.RUnlock()
Expand All @@ -1103,6 +1134,8 @@ func (p *ParticipantImpl) updateState(state livekit.ParticipantInfo_State) {

func (p *ParticipantImpl) setIsPublisher(isPublisher bool) {
if p.isPublisher.Swap(isPublisher) != isPublisher {
p.dirty.Store(true)

// trigger update as well if participant is already fully connected
if p.State() == livekit.ParticipantInfo_ACTIVE {
p.lock.RLock()
Expand Down Expand Up @@ -1164,6 +1197,8 @@ func (p *ParticipantImpl) onMediaTrack(track *webrtc.TrackRemote, rtpReceiver *w
"mime", track.Codec().MimeType,
)

p.dirty.Store(true)

if !isNewTrack && !publishedTrack.HasPendingCodec() && p.IsReady() {
p.lock.RLock()
onTrackUpdated := p.onTrackUpdated
Expand Down Expand Up @@ -1515,6 +1550,7 @@ func (p *ParticipantImpl) SetTrackMuted(trackID livekit.TrackID, muted bool, fro
}

func (p *ParticipantImpl) setTrackMuted(trackID livekit.TrackID, muted bool) {
p.dirty.Store(true)
p.supervisor.SetPublicationMute(trackID, muted)

track := p.UpTrackManager.SetPublishedTrackMuted(trackID, muted)
Expand Down Expand Up @@ -1579,6 +1615,7 @@ func (p *ParticipantImpl) mediaTrackReceived(track *webrtc.TrackRemote, rtpRecei
ti.MimeType = track.Codec().MimeType
mt = p.addMediaTrack(signalCid, track.ID(), ti)
newTrack = true
p.dirty.Store(true)
}

ssrc := uint32(track.SSRC())
Expand Down Expand Up @@ -1699,6 +1736,8 @@ func (p *ParticipantImpl) addMediaTrack(signalCid string, sdpCid string, ti *liv
}
p.pendingTracksLock.Unlock()

p.dirty.Store(true)

if !p.IsClosed() {
// unpublished events aren't necessary when participant is closed
p.params.Logger.Infow("unpublished track", "trackID", ti.Sid, "trackInfo", ti)
Expand Down
1 change: 1 addition & 0 deletions pkg/rtc/participant_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ func newParticipantForTestWithOpts(identity livekit.ParticipantIdentity, opts *p
ClientInfo: ClientInfo{ClientInfo: opts.clientInfo},
Logger: LoggerWithParticipant(logger.GetLogger(), identity, sid, false),
Telemetry: &telemetryfakes.FakeTelemetryService{},
VersionGenerator: utils.NewDefaultTimedVersionGenerator(),
})
p.isPublisher.Store(opts.publisher)
p.updateState(livekit.ParticipantInfo_ACTIVE)
Expand Down
2 changes: 1 addition & 1 deletion pkg/rtc/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func (r *Room) SyncState(participant types.LocalParticipant, state *livekit.Sync
}

func (r *Room) UpdateSubscriptionPermission(participant types.LocalParticipant, subscriptionPermission *livekit.SubscriptionPermission) error {
if err := participant.UpdateSubscriptionPermission(subscriptionPermission, nil, r.GetParticipant, r.GetParticipantByID); err != nil {
if err := participant.UpdateSubscriptionPermission(subscriptionPermission, utils.TimedVersion{}, r.GetParticipant, r.GetParticipantByID); err != nil {
return err
}
for _, track := range participant.GetPublishedTracks() {
Expand Down
7 changes: 5 additions & 2 deletions pkg/rtc/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"

"github.com/livekit/livekit-server/pkg/routing"
"github.com/livekit/livekit-server/pkg/sfu"
Expand Down Expand Up @@ -195,12 +196,12 @@ type Participant interface {
Start()
Close(sendLeave bool, reason ParticipantCloseReason) error

SubscriptionPermission() (*livekit.SubscriptionPermission, *livekit.TimedVersion)
SubscriptionPermission() (*livekit.SubscriptionPermission, utils.TimedVersion)

// updates from remotes
UpdateSubscriptionPermission(
subscriptionPermission *livekit.SubscriptionPermission,
timedVersion *livekit.TimedVersion,
timedVersion utils.TimedVersion,
resolverByIdentity func(participantIdentity livekit.ParticipantIdentity) LocalParticipant,
resolverBySid func(participantID livekit.ParticipantID) LocalParticipant,
) error
Expand Down Expand Up @@ -229,6 +230,8 @@ type AddTrackParams struct {
type LocalParticipant interface {
Participant

ToProtoWithVersion() (*livekit.ParticipantInfo, utils.TimedVersion)

// getters
GetLogger() logger.Logger
GetAdaptiveStream() bool
Expand Down
Loading