Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -557,16 +557,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
// for subscriberPrimary, we negotiate when necessary (lazy)
await negotiate();
}

// Relay to Room
events.emit(event);
})
..on<SignalConnectionStateUpdatedEvent>((event) async {
if (event.newState == ConnectionState.disconnected) {
await _onDisconnected(DisconnectReason.signal);
}
// Relay to Room
events.emit(event);
})
..on<SignalOfferEvent>((event) async {
if (subscriber == null) {
Expand Down Expand Up @@ -611,20 +606,6 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await publisher!.addIceCandidate(event.candidate);
}
})
// relay to Room
..on<SignalParticipantUpdateEvent>((event) => events.emit(event))
// relay to Room
..on<SignalSpeakersChangedEvent>((event) => events.emit(event))
// relay to Room
..on<SignalConnectionQualityUpdateEvent>((event) => events.emit(event))
// relay to Room
..on<SignalStreamStateUpdatedEvent>((event) => events.emit(event))
// relay to Room
..on<SignalSubscribedQualityUpdatedEvent>((event) => events.emit(event))
// relay to Room
..on<SignalSubscriptionPermissionUpdateEvent>((event) => events.emit(event))
// relay to Room
..on<SignalRoomUpdateEvent>((event) => events.emit(event))
..on<SignalTokenUpdatedEvent>((event) {
logger.fine('Server refreshed the token');
token = event.token;
Expand All @@ -636,12 +617,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
return;
}
await cleanUp();
})
..on<SignalMuteTrackEvent>(
(event) => events.emit(EngineRemoteMuteChangedEvent(
sid: event.sid,
muted: event.muted,
)));
});
}

extension EnginePrivateMethods on Engine {
Expand Down
77 changes: 43 additions & 34 deletions lib/src/core/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
final Engine engine;
// suppport for multiple event listeners
late final EventsListener<EngineEvent> _engineListener;
//
late final EventsListener<SignalEvent> _signalListener;

Room({
ConnectOptions? connectOptions,
Expand All @@ -82,7 +84,10 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
_roomOptions = roomOptions,
engine = engine ?? Engine() {
_engineListener = this.engine.createListener();
_setUpListeners();
_setUpEngineListeners();

_signalListener = this.engine.signalClient.createListener();
_setUpSignalListeners();

// Any event emitted will trigger ChangeNotifier
events.listen((event) {
Expand All @@ -97,7 +102,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
await events.dispose();
// dispose local participant
await localParticipant?.dispose();
// dispose all listeners for RTCEngine
// dispose all listeners for SignalClient
await _signalListener.dispose();
// dispose all listeners for Engine
await _engineListener.dispose();
// dispose the engine
await this.engine.dispose();
Expand All @@ -117,27 +124,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
return engine.connect(url, token, this.connectOptions);
}

void _setUpListeners() => _engineListener
..on<EngineConnectionStateUpdatedEvent>((event) async {
if (event.didReconnect) {
events.emit(const RoomReconnectedEvent());
await _handlePostReconnect(false);
} else if (event.newState == ConnectionState.reconnecting) {
events.emit(const RoomReconnectingEvent());
} else if (event.newState == ConnectionState.disconnected) {
await _cleanUp();
events.emit(const RoomDisconnectedEvent());
}
// always notify ChangeNotifier
notifyListeners();
})
..on<SignalConnectionStateUpdatedEvent>((event) {
// during reconnection, need to send sync state upon signal connection.
if (event.didReconnect) {
logger.fine('Sending syncState');
_sendSyncState();
}
})
void _setUpSignalListeners() => _signalListener
..on<SignalJoinResponseEvent>((event) {
_sid = event.response.room.sid;
_name = event.response.room.name;
Expand All @@ -163,23 +150,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
})
..on<SignalParticipantUpdateEvent>(
(event) => _onParticipantUpdateEvent(event.participants))
..on<EngineActiveSpeakersUpdateEvent>(
(event) => _onEngineActiveSpeakersUpdateEvent(event.speakers))
..on<SignalSpeakersChangedEvent>(
(event) => _onSignalSpeakersChangedEvent(event.speakers))
..on<SignalConnectionQualityUpdateEvent>(
(event) => _onSignalConnectionQualityUpdateEvent(event.updates))
..on<SignalStreamStateUpdatedEvent>(
(event) => _onSignalStreamStateUpdateEvent(event.updates))
..on<EngineDataPacketReceivedEvent>(_onDataMessageEvent)
..on<EngineRemoteMuteChangedEvent>((event) async {
final publication = localParticipant?.trackPublications[event.sid];
if (event.muted) {
await publication?.mute();
} else {
await publication?.unmute();
}
})
..on<SignalSubscribedQualityUpdatedEvent>((event) {
// Signal for Dynacast
final options = roomOptions ?? const RoomOptions();
Expand Down Expand Up @@ -221,6 +197,39 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
_metadata = event.room.metadata;
events.emit(RoomMetadataChangedEvent(metadata: event.room.metadata));
})
..on<SignalConnectionStateUpdatedEvent>((event) {
// during reconnection, need to send sync state upon signal connection.
if (event.didReconnect) {
logger.fine('Sending syncState');
_sendSyncState();
}
})
..on<SignalRemoteMuteTrackEvent>((event) async {
final publication = localParticipant?.trackPublications[event.sid];
if (event.muted) {
await publication?.mute();
} else {
await publication?.unmute();
}
});

void _setUpEngineListeners() => _engineListener
..on<EngineConnectionStateUpdatedEvent>((event) async {
if (event.didReconnect) {
events.emit(const RoomReconnectedEvent());
await _handlePostReconnect(false);
} else if (event.newState == ConnectionState.reconnecting) {
events.emit(const RoomReconnectingEvent());
} else if (event.newState == ConnectionState.disconnected) {
await _cleanUp();
events.emit(const RoomDisconnectedEvent());
}
// always notify ChangeNotifier
notifyListeners();
})
..on<EngineActiveSpeakersUpdateEvent>(
(event) => _onEngineActiveSpeakersUpdateEvent(event.speakers))
..on<EngineDataPacketReceivedEvent>(_onDataMessageEvent)
..on<EngineTrackAddedEvent>((event) async {
logger.fine('EngineTrackAddedEvent trackSid:${event.track.id}');

Expand Down
2 changes: 1 addition & 1 deletion lib/src/core/signal_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class SignalClient extends Disposable with EventsEmittable<SignalEvent> {
events.emit(SignalLeaveEvent(canReconnect: msg.leave.canReconnect));
break;
case lk_rtc.SignalResponse_Message.mute:
events.emit(SignalMuteTrackEvent(
events.emit(SignalRemoteMuteTrackEvent(
sid: msg.mute.sid,
muted: msg.mute.muted,
));
Expand Down
37 changes: 11 additions & 26 deletions lib/src/internal/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class InternalTrackMuteUpdatedEvent with TrackEvent, InternalEvent {

@internal
// Received a JoinResponse from the server.
class SignalJoinResponseEvent with SignalEvent, EngineEvent, InternalEvent {
class SignalJoinResponseEvent with SignalEvent, InternalEvent {
final lk_rtc.JoinResponse response;
const SignalJoinResponseEvent({
required this.response,
Expand All @@ -91,7 +91,7 @@ class SignalJoinResponseEvent with SignalEvent, EngineEvent, InternalEvent {

/// Base class for a ConnectionStateUpdated event
@internal
abstract class ConnectionStateUpdatedEvent with EngineEvent, InternalEvent {
abstract class ConnectionStateUpdatedEvent with InternalEvent {
final ConnectionState newState;
final ConnectionState oldState;
final bool didReconnect;
Expand Down Expand Up @@ -168,17 +168,15 @@ class SignalTrickleEvent with SignalEvent, InternalEvent {

@internal
// relayed by Engine
class SignalParticipantUpdateEvent
with SignalEvent, EngineEvent, InternalEvent {
class SignalParticipantUpdateEvent with SignalEvent, InternalEvent {
final List<lk_models.ParticipantInfo> participants;
const SignalParticipantUpdateEvent({
required this.participants,
});
}

@internal
class SignalConnectionQualityUpdateEvent
with SignalEvent, EngineEvent, InternalEvent {
class SignalConnectionQualityUpdateEvent with SignalEvent, InternalEvent {
final List<lk_rtc.ConnectionQualityInfo> updates;
const SignalConnectionQualityUpdateEvent({
required this.updates,
Expand All @@ -197,7 +195,7 @@ class SignalLocalTrackPublishedEvent with SignalEvent, InternalEvent {
}

@internal
class SignalRoomUpdateEvent with SignalEvent, EngineEvent, InternalEvent {
class SignalRoomUpdateEvent with SignalEvent, InternalEvent {
final lk_models.Room room;

const SignalRoomUpdateEvent({required this.room});
Expand All @@ -206,7 +204,7 @@ class SignalRoomUpdateEvent with SignalEvent, EngineEvent, InternalEvent {
@internal
// Speaker update received through websocket
// relayed by Engine
class SignalSpeakersChangedEvent with SignalEvent, EngineEvent, InternalEvent {
class SignalSpeakersChangedEvent with SignalEvent, InternalEvent {
final List<lk_models.SpeakerInfo> speakers;

const SignalSpeakersChangedEvent({
Expand All @@ -232,27 +230,25 @@ class SignalLeaveEvent with SignalEvent, InternalEvent {
}

@internal
class SignalMuteTrackEvent with SignalEvent, InternalEvent {
class SignalRemoteMuteTrackEvent with SignalEvent, InternalEvent {
final String sid;
final bool muted;
const SignalMuteTrackEvent({
const SignalRemoteMuteTrackEvent({
required this.sid,
required this.muted,
});
}

@internal
class SignalStreamStateUpdatedEvent
with SignalEvent, EngineEvent, InternalEvent {
class SignalStreamStateUpdatedEvent with SignalEvent, InternalEvent {
final List<lk_rtc.StreamStateInfo> updates;
const SignalStreamStateUpdatedEvent({
required this.updates,
});
}

@internal
class SignalSubscribedQualityUpdatedEvent
with SignalEvent, EngineEvent, InternalEvent {
class SignalSubscribedQualityUpdatedEvent with SignalEvent, InternalEvent {
final String trackSid;
final List<lk_rtc.SubscribedQuality> updates;
const SignalSubscribedQualityUpdatedEvent({
Expand All @@ -262,8 +258,7 @@ class SignalSubscribedQualityUpdatedEvent
}

@internal
class SignalSubscriptionPermissionUpdateEvent
with SignalEvent, EngineEvent, InternalEvent {
class SignalSubscriptionPermissionUpdateEvent with SignalEvent, InternalEvent {
final String participantSid;
final String trackSid;
final bool allowed;
Expand Down Expand Up @@ -308,16 +303,6 @@ class EngineDataPacketReceivedEvent with EngineEvent, InternalEvent {
});
}

@internal
class EngineRemoteMuteChangedEvent with EngineEvent, InternalEvent {
final String sid;
final bool muted;
const EngineRemoteMuteChangedEvent({
required this.sid,
required this.muted,
});
}

@internal
abstract class DataChannelStateUpdatedEvent with EngineEvent, InternalEvent {
final bool isPrimary;
Expand Down