Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 75 additions & 56 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
PCTransport? get primary => _subscriberPrimary ? subscriber : publisher;

// data channels for packets
rtc.RTCDataChannel? _reliableDC;
rtc.RTCDataChannel? _lossyDC;
rtc.RTCDataChannel? _reliableDCPub;
rtc.RTCDataChannel? _lossyDCPub;
rtc.RTCDataChannel? _reliableDCSub;
rtc.RTCDataChannel? _lossyDCSub;

rtc.RTCDataChannelState get reliableDataChannelState =>
_reliableDC?.state ?? rtc.RTCDataChannelState.RTCDataChannelClosed;

rtc.RTCDataChannelState get lossyDataChannelState =>
_lossyDC?.state ?? rtc.RTCDataChannelState.RTCDataChannelClosed;
bool _iceConnected = false;

ConnectionState _connectionState = ConnectionState.disconnected;
Expand Down Expand Up @@ -192,49 +187,61 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
Future<void> sendDataPacket(
lk_models.DataPacket packet,
) async {
// make sure we do have a data connection
await _ensurePublisherConnected();
//
rtc.RTCDataChannel? publisherDataChannel(Reliability reliability) =>
reliability == Reliability.reliable ? _reliableDCPub : _lossyDCPub;

rtc.RTCDataChannelState publisherDataChannelState(
Reliability reliability) =>
publisherDataChannel(reliability)?.state ??
rtc.RTCDataChannelState.RTCDataChannelClosed;

final reliability = packet.kind.toSDKType();

// construct the data channel message
final message =
rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer());

// chose data channel
final rtc.RTCDataChannel? channel =
packet.kind == lk_models.DataPacket_Kind.LOSSY ? _lossyDC : _reliableDC;
if (_subscriberPrimary) {
// make sure publisher transport is connected

// send if channel exists
if (channel == null) {
throw UnexpectedStateException('Data channel is not ready');
}
if (publisher?.pc.iceConnectionState?.isConnected() != true) {
logger.fine('Publisher is not connected...');
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if there are a bunch of data channel messages sent by the app in this state? Are all of them becoming promises and get sent (promises resolved) when the data channel connects? Or are the messages just dropped on the floor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

promises should be pending until waitFor condition clears (received notification) or timeouts.
actually maybe there should be more checks to prevent negotiate() being called multiple times.. 🤔
(not sure how immediate the state changes to RTCIceConnectionStateChecking)


logger.fine('sendDataPacket(label:${channel.label})');
await channel.send(message);
}
// start negotiation
if (publisher?.pc.iceConnectionState !=
rtc.RTCIceConnectionState.RTCIceConnectionStateChecking) {
await negotiate();
}

Future<void> _ensurePublisherConnected() async {
logger.fine('ensurePublisherConnected()');
if (!_subscriberPrimary) {
return;
}
logger.fine('Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherIceStateUpdatedEvent>(
filter: (event) => event.iceState.isConnected(),
duration: Timeouts.iceConnection,
);
}

if (publisher?.pc.iceConnectionState?.isConnected() == true) {
logger.warning('[$objectId] publisher is already connected');
return;
// wait for data channel to open (if not already)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where it waits for open state of the data channel.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there some callback to indicate data channel open failure if it fails for some reason?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitFor will throw a TimeoutException if it timeouts.

if (publisherDataChannelState(packet.kind.toSDKType()) !=
rtc.RTCDataChannelState.RTCDataChannelOpen) {
logger.fine('Waiting for data channel ${reliability} to open...');
await events.waitFor<PublisherDataChannelStateUpdatedEvent>(
filter: (event) => event.type == reliability,
duration: Timeouts.connection,
);
}
}

// start negotiation
await negotiate();

logger.fine('[PUBLISHER] waiting for to ice-connect '
'(current: ${publisher?.pc.iceConnectionState})');
// chose data channel
final rtc.RTCDataChannel? channel = publisherDataChannel(reliability);

await events.waitFor<EnginePublisherIceStateUpdatedEvent>(
filter: (event) => event.iceState.isConnected(),
duration: Timeouts.iceConnection,
);
if (channel == null) {
throw UnexpectedStateException(
'Data channel for ${packet.kind.toSDKType()} is null');
}

logger.fine('[PUBLISHER] connected');
logger.fine('sendDataPacket(label:${channel.label})');
await channel.send(message);
}

@internal
Expand Down Expand Up @@ -421,11 +428,16 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
..binaryType = 'binary'
..ordered = true
..maxRetransmits = 0;
_lossyDC =
_lossyDCPub =
await publisher?.pc.createDataChannel(_lossyDCLabel, lossyInit);
_lossyDC?.onMessage = _onDCMessage;
_lossyDC?.stateChangeStream
.listen((state) => _onDCStateUpdated(Reliability.lossy, state));
_lossyDCPub?.onMessage = _onDCMessage;
_lossyDCPub?.stateChangeStream
.listen((state) => events.emit(PublisherDataChannelStateUpdatedEvent(
isPrimary: !_subscriberPrimary,
state: state,
type: Reliability.lossy,
)));
// _onDCStateUpdated(Reliability.lossy, state)
} catch (_) {
logger.severe('[$objectId] createDataChannel() did throw $_');
}
Expand All @@ -434,11 +446,15 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
final reliableInit = rtc.RTCDataChannelInit()
..binaryType = 'binary'
..ordered = true;
_reliableDC =
_reliableDCPub =
await publisher?.pc.createDataChannel(_reliableDCLabel, reliableInit);
_reliableDC?.onMessage = _onDCMessage;
_reliableDC?.stateChangeStream
.listen((state) => _onDCStateUpdated(Reliability.reliable, state));
_reliableDCPub?.onMessage = _onDCMessage;
_reliableDCPub?.stateChangeStream
.listen((state) => events.emit(PublisherDataChannelStateUpdatedEvent(
isPrimary: !_subscriberPrimary,
state: state,
type: Reliability.reliable,
)));
} catch (_) {
logger.severe('[$objectId] createDataChannel() did throw $_');
}
Expand All @@ -450,29 +466,32 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
logger.fine('Server opened DC label: ${dc.label}');
_reliableDCSub = dc;
_reliableDCSub?.onMessage = _onDCMessage;
_reliableDCSub?.stateChangeStream
.listen((state) => _onDCStateUpdated(Reliability.reliable, state));
_reliableDCSub?.stateChangeStream.listen((state) =>
_reliableDCPub?.stateChangeStream.listen(
(state) => events.emit(SubscriberDataChannelStateUpdatedEvent(
isPrimary: _subscriberPrimary,
state: state,
type: Reliability.reliable,
))));
break;
case _lossyDCLabel:
logger.fine('Server opened DC label: ${dc.label}');
_lossyDCSub = dc;
_lossyDCSub?.onMessage = _onDCMessage;
_lossyDCSub?.stateChangeStream
.listen((event) => _onDCStateUpdated(Reliability.lossy, event));
_lossyDCSub?.stateChangeStream.listen((event) =>
_reliableDCPub?.stateChangeStream.listen(
(state) => events.emit(SubscriberDataChannelStateUpdatedEvent(
isPrimary: _subscriberPrimary,
state: state,
type: Reliability.lossy,
))));
break;
default:
logger.warning('Unknown DC label: ${dc.label}');
break;
}
}

void _onDCStateUpdated(
Reliability channel,
rtc.RTCDataChannelState state,
) {
logger.fine('Data channel state updated ${channel} ${state}');
}

void _onDCMessage(rtc.RTCDataChannelMessage message) {
// always expect binary
if (!message.isBinary) {
Expand Down
48 changes: 48 additions & 0 deletions lib/src/internal/events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -210,22 +210,28 @@ class SignalSubscribedQualityUpdatedEvent
// ----------------------------------------------------------------------
// Engine events
// ----------------------------------------------------------------------

@internal
class EngineConnectedEvent with EngineEvent, InternalEvent {
const EngineConnectedEvent();
}

@internal
class EngineDisconnectedEvent with EngineEvent, InternalEvent {
const EngineDisconnectedEvent();
}

@internal
class EngineReconnectingEvent with EngineEvent, InternalEvent {
const EngineReconnectingEvent();
}

@internal
class EngineReconnectedEvent with EngineEvent, InternalEvent {
const EngineReconnectedEvent();
}

@internal
class EngineTrackAddedEvent with EngineEvent, InternalEvent {
final rtc.MediaStreamTrack track;
final rtc.MediaStream stream;
Expand All @@ -237,6 +243,7 @@ class EngineTrackAddedEvent with EngineEvent, InternalEvent {
});
}

@internal
class EngineDataPacketReceivedEvent with EngineEvent, InternalEvent {
final lk_models.UserPacket packet;
final lk_models.DataPacket_Kind kind;
Expand All @@ -246,6 +253,7 @@ class EngineDataPacketReceivedEvent with EngineEvent, InternalEvent {
});
}

@internal
class EngineRemoteMuteChangedEvent with EngineEvent, InternalEvent {
final String sid;
final bool muted;
Expand All @@ -254,3 +262,43 @@ class EngineRemoteMuteChangedEvent with EngineEvent, InternalEvent {
required this.muted,
});
}

@internal
abstract class DataChannelStateUpdatedEvent with EngineEvent, InternalEvent {
final bool isPrimary;
final Reliability type;
final rtc.RTCDataChannelState state;
const DataChannelStateUpdatedEvent({
required this.isPrimary,
required this.type,
required this.state,
});
}

@internal
class PublisherDataChannelStateUpdatedEvent
extends DataChannelStateUpdatedEvent {
PublisherDataChannelStateUpdatedEvent({
required bool isPrimary,
required Reliability type,
required rtc.RTCDataChannelState state,
}) : super(
isPrimary: isPrimary,
type: type,
state: state,
);
}

@internal
class SubscriberDataChannelStateUpdatedEvent
extends DataChannelStateUpdatedEvent {
SubscriberDataChannelStateUpdatedEvent({
required bool isPrimary,
required Reliability type,
required rtc.RTCDataChannelState state,
}) : super(
isPrimary: isPrimary,
type: type,
state: state,
);
}