Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
livekit-ffi: patch
---

chore: add LocalTrackRepublished event for FFI clients - #1072 (@davidzhao)
50 changes: 49 additions & 1 deletion livekit-ffi-node-bindings/proto/room_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import type { BinaryReadOptions, FieldList, JsonReadOptions, JsonValue, PartialMessage, PlainMessage } from "@bufbuild/protobuf";
import { Message, proto2 } from "@bufbuild/protobuf";
import type { DisconnectReason, OwnedParticipant, ParticipantInfo, ParticipantPermission } from "./participant_pb.js";
import type { OwnedTrack, OwnedTrackPublication, PacketTrailerFeature, TrackSource } from "./track_pb.js";
import type { OwnedTrack, OwnedTrackPublication, PacketTrailerFeature, TrackPublicationInfo, TrackSource } from "./track_pb.js";
import type { RtcStats } from "./stats_pb.js";
import type { VideoCodec } from "./video_frame_pb.js";
import type { E2eeOptions, EncryptionState } from "./e2ee_pb.js";
Expand Down Expand Up @@ -2317,6 +2317,12 @@ export declare class RoomEvent extends Message<RoomEvent> {
*/
value: DataTrackUnpublished;
case: "dataTrackUnpublished";
} | {
/**
* @generated from field: livekit.proto.LocalTrackRepublished local_track_republished = 45;
*/
value: LocalTrackRepublished;
case: "localTrackRepublished";
} | { case: undefined; value?: undefined };

constructor(data?: PartialMessage<RoomEvent>);
Expand Down Expand Up @@ -2594,6 +2600,48 @@ export declare class LocalTrackUnpublished extends Message<LocalTrackUnpublished
static equals(a: LocalTrackUnpublished | PlainMessage<LocalTrackUnpublished> | undefined, b: LocalTrackUnpublished | PlainMessage<LocalTrackUnpublished> | undefined): boolean;
}

/**
* Fired when the SDK auto-republishes a local track during a full
* reconnect. The FfiPublication handle is preserved across the cycle —
* language bindings should look up the existing publication object by
* `previous_sid` (its old SID), update its TrackPublicationInfo in place
* with `info`, and rekey it under the new SID. Apps holding a cached
* reference to the publication continue to see a valid object whose
* reads/writes hit current state.
*
* @generated from message livekit.proto.LocalTrackRepublished
*/
export declare class LocalTrackRepublished extends Message<LocalTrackRepublished> {
/**
* @generated from field: required uint64 publication_handle = 1;
*/
publicationHandle?: bigint;

/**
* @generated from field: required string previous_sid = 2;
*/
previousSid?: string;

/**
* @generated from field: required livekit.proto.TrackPublicationInfo info = 3;
*/
info?: TrackPublicationInfo;

constructor(data?: PartialMessage<LocalTrackRepublished>);

static readonly runtime: typeof proto2;
static readonly typeName = "livekit.proto.LocalTrackRepublished";
static readonly fields: FieldList;

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): LocalTrackRepublished;

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): LocalTrackRepublished;

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): LocalTrackRepublished;

static equals(a: LocalTrackRepublished | PlainMessage<LocalTrackRepublished> | undefined, b: LocalTrackRepublished | PlainMessage<LocalTrackRepublished> | undefined): boolean;
}

/**
* @generated from message livekit.proto.LocalTrackSubscribed
*/
Expand Down
24 changes: 23 additions & 1 deletion livekit-ffi-node-bindings/proto/room_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Object.defineProperty(exports, "__esModule", { value: true });

const { proto2 } = require("@bufbuild/protobuf");
const { DisconnectReason, OwnedParticipant, ParticipantInfo, ParticipantPermission } = require("./participant_pb.js");
const { OwnedTrack, OwnedTrackPublication, PacketTrailerFeature, TrackSource } = require("./track_pb.js");
const { OwnedTrack, OwnedTrackPublication, PacketTrailerFeature, TrackPublicationInfo, TrackSource } = require("./track_pb.js");
const { RtcStats } = require("./stats_pb.js");
const { VideoCodec } = require("./video_frame_pb.js");
const { E2eeOptions, EncryptionState } = require("./e2ee_pb.js");
Expand Down Expand Up @@ -829,6 +829,7 @@ const RoomEvent = /*@__PURE__*/ proto2.makeMessageType(
{ no: 42, name: "participant_active", kind: "message", T: ParticipantActive, oneof: "message" },
{ no: 43, name: "data_track_published", kind: "message", T: DataTrackPublished, oneof: "message" },
{ no: 44, name: "data_track_unpublished", kind: "message", T: DataTrackUnpublished, oneof: "message" },
{ no: 45, name: "local_track_republished", kind: "message", T: LocalTrackRepublished, oneof: "message" },
],
);

Expand Down Expand Up @@ -925,6 +926,26 @@ const LocalTrackUnpublished = /*@__PURE__*/ proto2.makeMessageType(
],
);

/**
* Fired when the SDK auto-republishes a local track during a full
* reconnect. The FfiPublication handle is preserved across the cycle —
* language bindings should look up the existing publication object by
* `previous_sid` (its old SID), update its TrackPublicationInfo in place
* with `info`, and rekey it under the new SID. Apps holding a cached
* reference to the publication continue to see a valid object whose
* reads/writes hit current state.
*
* @generated from message livekit.proto.LocalTrackRepublished
*/
const LocalTrackRepublished = /*@__PURE__*/ proto2.makeMessageType(
"livekit.proto.LocalTrackRepublished",
() => [
{ no: 1, name: "publication_handle", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true },
{ no: 2, name: "previous_sid", kind: "scalar", T: 9 /* ScalarType.STRING */, req: true },
{ no: 3, name: "info", kind: "message", T: TrackPublicationInfo, req: true },
],
);

/**
* @generated from message livekit.proto.LocalTrackSubscribed
*/
Expand Down Expand Up @@ -1646,6 +1667,7 @@ exports.ParticipantActive = ParticipantActive;
exports.ParticipantDisconnected = ParticipantDisconnected;
exports.LocalTrackPublished = LocalTrackPublished;
exports.LocalTrackUnpublished = LocalTrackUnpublished;
exports.LocalTrackRepublished = LocalTrackRepublished;
exports.LocalTrackSubscribed = LocalTrackSubscribed;
exports.TrackPublished = TrackPublished;
exports.TrackUnpublished = TrackUnpublished;
Expand Down
14 changes: 14 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ message RoomEvent {
ParticipantActive participant_active = 42;
DataTrackPublished data_track_published = 43;
DataTrackUnpublished data_track_unpublished = 44;
LocalTrackRepublished local_track_republished = 45;
}
}

Expand Down Expand Up @@ -486,6 +487,19 @@ message LocalTrackUnpublished {
required string publication_sid = 1;
}

// Fired when the SDK auto-republishes a local track during a full
// reconnect. The FfiPublication handle is preserved across the cycle —
// language bindings should look up the existing publication object by
// `previous_sid` (its old SID), update its TrackPublicationInfo in place
// with `info`, and rekey it under the new SID. Apps holding a cached
// reference to the publication continue to see a valid object whose
// reads/writes hit current state.
message LocalTrackRepublished {
required uint64 publication_handle = 1;
required string previous_sid = 2;
required TrackPublicationInfo info = 3;
}

message LocalTrackSubscribed {
required string track_sid = 2;
}
Expand Down
106 changes: 89 additions & 17 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ pub struct RoomInner {

track_handle_lookup: Arc<Mutex<HashMap<TrackSid, FfiHandleId>>>,

// Maps a local publication's current sid -> its FfiPublication handle.
// Used to preserve the FfiPublication handle across the SDK's
// auto-republish during a full reconnect — the language binding can
// continue to use the same publication handle while the inner
// publication is swapped to the new (re-issued) one.
local_publication_lookup: Arc<Mutex<HashMap<TrackSid, FfiHandleId>>>,

// Used to forward RPC method invocation to the FfiClient and collect their results
rpc_method_invocation_waiters: Mutex<HashMap<u64, oneshot::Sender<Result<String, RpcError>>>>,

Expand Down Expand Up @@ -179,6 +186,7 @@ impl FfiRoom {
pending_published_tracks: Default::default(),
pending_unpublished_tracks: Default::default(),
track_handle_lookup: Default::default(),
local_publication_lookup: Default::default(),
rpc_method_invocation_waiters: Default::default(),
url: connect.url,
});
Expand Down Expand Up @@ -1051,36 +1059,100 @@ async fn forward_event(
}
RoomEvent::LocalTrackPublished { publication, track: _, participant: _ } => {
let sid = publication.sid();
// If we're currently reconnecting, users can't publish tracks, if we receive this
// event it means the RoomEngine is republishing tracks to finish the reconnection
// process. (So we're not waiting for any PublishCallback)
if !present_state.lock().reconnecting {
// Make sure to send the event *after* the async callback of the PublishTrackRequest
// Wait for the PublishTrack callback to be sent (waiting time is really short, so
// it is fine to not spawn a new task)
loop {
if inner.pending_published_tracks.lock().remove(&sid) {
break;
}
log::debug!("waiting for the PublishTrack callback to be sent");
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
let reconnecting = present_state.lock().reconnecting;
if reconnecting {
// The engine is auto-republishing tracks during a full
// reconnect. We defer the FfiPublication swap (and the
// proto event) to the LocalTrackRepublished handler
return;
}

// Make sure to send the event *after* the async callback of the PublishTrackRequest
// Wait for the PublishTrack callback to be sent (waiting time is really short, so
// it is fine to not spawn a new task)
loop {
if inner.pending_published_tracks.lock().remove(&sid) {
break;
}
log::debug!("waiting for the PublishTrack callback to be sent");
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}

let ffi_publication = FfiPublication {
handle: server.next_id(),
publication: TrackPublication::Local(publication),
};
server.store_handle(ffi_publication.handle, ffi_publication);
let handle_id = ffi_publication.handle;
server.store_handle(handle_id, ffi_publication);
inner.local_publication_lookup.lock().insert(sid.clone(), handle_id);

let _ = send_event(proto::LocalTrackPublished { track_sid: sid.to_string() }.into());
}
RoomEvent::LocalTrackUnpublished { publication, participant: _ } => {
let sid = publication.sid();
// During a reconnect, the unpublish is the SDK's internal
// bookkeeping for the auto-republish flow. we will ignore it here and handle it in
// the LocalTrackRepublished handler
if present_state.lock().reconnecting {
return;
}

inner.local_publication_lookup.lock().remove(&sid);
inner.pending_unpublished_tracks.lock().insert(sid.clone());
let _ = send_event(proto::LocalTrackUnpublished { publication_sid: sid.into() }.into());
}
RoomEvent::LocalTrackRepublished {
previous_sid,
publication,
track: _,
participant: _,
} => {
let new_sid = publication.sid();
let mut lookup = inner.local_publication_lookup.lock();
let Some(handle_id) = lookup.remove(&previous_sid) else {
// We never tracked an FfiPublication for this SID —
// shouldn't happen on the auto-republish path, but fall
// back to creating a fresh one so the binding still gets
// a usable handle.
let ffi_publication = FfiPublication {
handle: server.next_id(),
publication: TrackPublication::Local(publication),
};
let new_handle_id = ffi_publication.handle;
let info = proto::TrackPublicationInfo::from(&ffi_publication);
server.store_handle(new_handle_id, ffi_publication);
lookup.insert(new_sid, new_handle_id);
drop(lookup);
let _ = send_event(
proto::LocalTrackRepublished {
publication_handle: new_handle_id,
previous_sid: previous_sid.into(),
info,
}
.into(),
);
return;
};

// Swap the inner publication on the existing FfiPublication
// (handle id preserved); rekey the lookup under the new sid.
let ffi_publication = FfiPublication {
handle: handle_id,
publication: TrackPublication::Local(publication),
};
let info = proto::TrackPublicationInfo::from(&ffi_publication);
server.store_handle(handle_id, ffi_publication);
lookup.insert(new_sid, handle_id);
drop(lookup);

let _ = send_event(
proto::LocalTrackUnpublished { publication_sid: publication.sid().into() }.into(),
proto::LocalTrackRepublished {
publication_handle: handle_id,
previous_sid: previous_sid.into(),
info,
}
.into(),
);

inner.pending_unpublished_tracks.lock().insert(publication.sid());
}
RoomEvent::LocalTrackSubscribed { track } => {
let _ = send_event(
Expand Down
54 changes: 40 additions & 14 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ pub enum RoomEvent {
publication: LocalTrackPublication,
participant: LocalParticipant,
},
/// Fired when the SDK auto-republishes a local track during a full
/// reconnect. The same underlying `Track` (and its bound source) is
/// preserved across the cycle, but the publication and track SIDs are
/// re-issued by the server. Bindings are expected to update the
/// existing publication object in place rather than treating this as
/// an unpublish + publish pair.
LocalTrackRepublished {
previous_sid: TrackSid,
publication: LocalTrackPublication,
track: LocalTrack,
participant: LocalParticipant,
},
LocalTrackSubscribed {
track: LocalTrack,
},
Expand Down Expand Up @@ -1567,21 +1579,35 @@ impl RoomSession {
let track = publication.track().unwrap();

let lp = session.local_participant.clone();
let republish_session = session.clone();
let republish = async move {
// Only "really" used to send LocalTrackUnpublished event (Since we don't
// really need to remove the RtpSender since we know
// we are using a new RtcSession,
// so new PeerConnetions)

let _ = lp.unpublish_track(&publication.sid()).await;
if let Err(err) =
lp.publish_track(track.clone(), publication.publish_options()).await
{
log::error!(
"failed to republish track {} after rtc_engine restarted: {}",
track.name(),
err
)
// The unpublish+publish sequence below regenerates
// server-assigned IDs but preserves the local Track
// Arc (and its bound source). We capture the prior
// SID so the `LocalTrackRepublished` event can carry
// it through to the FFI layer / language bindings,
// which use it to find the existing publication
// object and update it in place.
let previous_sid = publication.sid();
let _ = lp.unpublish_track(&previous_sid).await;
match lp.publish_track(track.clone(), publication.publish_options()).await {
Ok(new_publication) => {
republish_session.dispatcher.dispatch(
&RoomEvent::LocalTrackRepublished {
previous_sid,
publication: new_publication,
track: track.clone(),
participant: lp.clone(),
},
);
}
Err(err) => {
log::error!(
"failed to republish track {} after rtc_engine restarted: {}",
track.name(),
err
)
}
}
};

Expand Down
Loading