diff --git a/lib/src/core/engine.dart b/lib/src/core/engine.dart index 455ef7785..b536af96b 100644 --- a/lib/src/core/engine.dart +++ b/lib/src/core/engine.dart @@ -43,16 +43,11 @@ class Engine extends Disposable with EventsEmittable { PCTransport? get primary => _subscriberPrimary ? subscriber : publisher; // data channels for packets - rtc.RTCDataChannel? _reliableDC; - rtc.RTCDataChannel? _lossyDC; + rtc.RTCDataChannel? _reliableDCPub; + rtc.RTCDataChannel? _lossyDCPub; rtc.RTCDataChannel? _reliableDCSub; rtc.RTCDataChannel? _lossyDCSub; - rtc.RTCDataChannelState get reliableDataChannelState => - _reliableDC?.state ?? rtc.RTCDataChannelState.RTCDataChannelClosed; - - rtc.RTCDataChannelState get lossyDataChannelState => - _lossyDC?.state ?? rtc.RTCDataChannelState.RTCDataChannelClosed; bool _iceConnected = false; ConnectionState _connectionState = ConnectionState.disconnected; @@ -192,49 +187,61 @@ class Engine extends Disposable with EventsEmittable { Future sendDataPacket( lk_models.DataPacket packet, ) async { - // make sure we do have a data connection - await _ensurePublisherConnected(); + // + rtc.RTCDataChannel? publisherDataChannel(Reliability reliability) => + reliability == Reliability.reliable ? _reliableDCPub : _lossyDCPub; + + rtc.RTCDataChannelState publisherDataChannelState( + Reliability reliability) => + publisherDataChannel(reliability)?.state ?? + rtc.RTCDataChannelState.RTCDataChannelClosed; + + final reliability = packet.kind.toSDKType(); // construct the data channel message final message = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer()); - // chose data channel - final rtc.RTCDataChannel? channel = - packet.kind == lk_models.DataPacket_Kind.LOSSY ? _lossyDC : _reliableDC; + if (_subscriberPrimary) { + // make sure publisher transport is connected - // send if channel exists - if (channel == null) { - throw UnexpectedStateException('Data channel is not ready'); - } + if (publisher?.pc.iceConnectionState?.isConnected() != true) { + logger.fine('Publisher is not connected...'); - logger.fine('sendDataPacket(label:${channel.label})'); - await channel.send(message); - } + // start negotiation + if (publisher?.pc.iceConnectionState != + rtc.RTCIceConnectionState.RTCIceConnectionStateChecking) { + await negotiate(); + } - Future _ensurePublisherConnected() async { - logger.fine('ensurePublisherConnected()'); - if (!_subscriberPrimary) { - return; - } + logger.fine('Waiting for publisher to ice-connect...'); + await events.waitFor( + filter: (event) => event.iceState.isConnected(), + duration: Timeouts.iceConnection, + ); + } - if (publisher?.pc.iceConnectionState?.isConnected() == true) { - logger.warning('[$objectId] publisher is already connected'); - return; + // wait for data channel to open (if not already) + if (publisherDataChannelState(packet.kind.toSDKType()) != + rtc.RTCDataChannelState.RTCDataChannelOpen) { + logger.fine('Waiting for data channel ${reliability} to open...'); + await events.waitFor( + filter: (event) => event.type == reliability, + duration: Timeouts.connection, + ); + } } - // start negotiation - await negotiate(); - - logger.fine('[PUBLISHER] waiting for to ice-connect ' - '(current: ${publisher?.pc.iceConnectionState})'); + // chose data channel + final rtc.RTCDataChannel? channel = publisherDataChannel(reliability); - await events.waitFor( - filter: (event) => event.iceState.isConnected(), - duration: Timeouts.iceConnection, - ); + if (channel == null) { + throw UnexpectedStateException( + 'Data channel for ${packet.kind.toSDKType()} is null'); + } - logger.fine('[PUBLISHER] connected'); + logger.fine('sendDataPacket(label:${channel.label})'); + await channel.send(message); } @internal @@ -421,11 +428,16 @@ class Engine extends Disposable with EventsEmittable { ..binaryType = 'binary' ..ordered = true ..maxRetransmits = 0; - _lossyDC = + _lossyDCPub = await publisher?.pc.createDataChannel(_lossyDCLabel, lossyInit); - _lossyDC?.onMessage = _onDCMessage; - _lossyDC?.stateChangeStream - .listen((state) => _onDCStateUpdated(Reliability.lossy, state)); + _lossyDCPub?.onMessage = _onDCMessage; + _lossyDCPub?.stateChangeStream + .listen((state) => events.emit(PublisherDataChannelStateUpdatedEvent( + isPrimary: !_subscriberPrimary, + state: state, + type: Reliability.lossy, + ))); + // _onDCStateUpdated(Reliability.lossy, state) } catch (_) { logger.severe('[$objectId] createDataChannel() did throw $_'); } @@ -434,11 +446,15 @@ class Engine extends Disposable with EventsEmittable { final reliableInit = rtc.RTCDataChannelInit() ..binaryType = 'binary' ..ordered = true; - _reliableDC = + _reliableDCPub = await publisher?.pc.createDataChannel(_reliableDCLabel, reliableInit); - _reliableDC?.onMessage = _onDCMessage; - _reliableDC?.stateChangeStream - .listen((state) => _onDCStateUpdated(Reliability.reliable, state)); + _reliableDCPub?.onMessage = _onDCMessage; + _reliableDCPub?.stateChangeStream + .listen((state) => events.emit(PublisherDataChannelStateUpdatedEvent( + isPrimary: !_subscriberPrimary, + state: state, + type: Reliability.reliable, + ))); } catch (_) { logger.severe('[$objectId] createDataChannel() did throw $_'); } @@ -450,15 +466,25 @@ class Engine extends Disposable with EventsEmittable { logger.fine('Server opened DC label: ${dc.label}'); _reliableDCSub = dc; _reliableDCSub?.onMessage = _onDCMessage; - _reliableDCSub?.stateChangeStream - .listen((state) => _onDCStateUpdated(Reliability.reliable, state)); + _reliableDCSub?.stateChangeStream.listen((state) => + _reliableDCPub?.stateChangeStream.listen( + (state) => events.emit(SubscriberDataChannelStateUpdatedEvent( + isPrimary: _subscriberPrimary, + state: state, + type: Reliability.reliable, + )))); break; case _lossyDCLabel: logger.fine('Server opened DC label: ${dc.label}'); _lossyDCSub = dc; _lossyDCSub?.onMessage = _onDCMessage; - _lossyDCSub?.stateChangeStream - .listen((event) => _onDCStateUpdated(Reliability.lossy, event)); + _lossyDCSub?.stateChangeStream.listen((event) => + _reliableDCPub?.stateChangeStream.listen( + (state) => events.emit(SubscriberDataChannelStateUpdatedEvent( + isPrimary: _subscriberPrimary, + state: state, + type: Reliability.lossy, + )))); break; default: logger.warning('Unknown DC label: ${dc.label}'); @@ -466,13 +492,6 @@ class Engine extends Disposable with EventsEmittable { } } - void _onDCStateUpdated( - Reliability channel, - rtc.RTCDataChannelState state, - ) { - logger.fine('Data channel state updated ${channel} ${state}'); - } - void _onDCMessage(rtc.RTCDataChannelMessage message) { // always expect binary if (!message.isBinary) { diff --git a/lib/src/internal/events.dart b/lib/src/internal/events.dart index 4443a22fb..fcd61d18e 100644 --- a/lib/src/internal/events.dart +++ b/lib/src/internal/events.dart @@ -210,22 +210,28 @@ class SignalSubscribedQualityUpdatedEvent // ---------------------------------------------------------------------- // Engine events // ---------------------------------------------------------------------- + +@internal class EngineConnectedEvent with EngineEvent, InternalEvent { const EngineConnectedEvent(); } +@internal class EngineDisconnectedEvent with EngineEvent, InternalEvent { const EngineDisconnectedEvent(); } +@internal class EngineReconnectingEvent with EngineEvent, InternalEvent { const EngineReconnectingEvent(); } +@internal class EngineReconnectedEvent with EngineEvent, InternalEvent { const EngineReconnectedEvent(); } +@internal class EngineTrackAddedEvent with EngineEvent, InternalEvent { final rtc.MediaStreamTrack track; final rtc.MediaStream stream; @@ -237,6 +243,7 @@ class EngineTrackAddedEvent with EngineEvent, InternalEvent { }); } +@internal class EngineDataPacketReceivedEvent with EngineEvent, InternalEvent { final lk_models.UserPacket packet; final lk_models.DataPacket_Kind kind; @@ -246,6 +253,7 @@ class EngineDataPacketReceivedEvent with EngineEvent, InternalEvent { }); } +@internal class EngineRemoteMuteChangedEvent with EngineEvent, InternalEvent { final String sid; final bool muted; @@ -254,3 +262,43 @@ class EngineRemoteMuteChangedEvent with EngineEvent, InternalEvent { required this.muted, }); } + +@internal +abstract class DataChannelStateUpdatedEvent with EngineEvent, InternalEvent { + final bool isPrimary; + final Reliability type; + final rtc.RTCDataChannelState state; + const DataChannelStateUpdatedEvent({ + required this.isPrimary, + required this.type, + required this.state, + }); +} + +@internal +class PublisherDataChannelStateUpdatedEvent + extends DataChannelStateUpdatedEvent { + PublisherDataChannelStateUpdatedEvent({ + required bool isPrimary, + required Reliability type, + required rtc.RTCDataChannelState state, + }) : super( + isPrimary: isPrimary, + type: type, + state: state, + ); +} + +@internal +class SubscriberDataChannelStateUpdatedEvent + extends DataChannelStateUpdatedEvent { + SubscriberDataChannelStateUpdatedEvent({ + required bool isPrimary, + required Reliability type, + required rtc.RTCDataChannelState state, + }) : super( + isPrimary: isPrimary, + type: type, + state: state, + ); +}