Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,9 @@ final roomOptions = RoomOptions(

final room = Room();

// you can use `prepareConnection` to speed up connection.
await room.prepareConnection(url, token);

await room.connect(url, token, roomOptions: roomOptions);

try {
Expand Down
10 changes: 6 additions & 4 deletions example/lib/pages/prejoin.dart
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ class _PreJoinPageState extends State<PreJoinPage> {

try {
//create new room
var cameraEncoding = VideoEncoding(
var cameraEncoding = const VideoEncoding(
maxBitrate: 5 * 1000 * 1000,
maxFramerate: 30,
);

var screenEncoding = VideoEncoding(
var screenEncoding = const VideoEncoding(
maxBitrate: 3 * 1000 * 1000,
maxFramerate: 15,
);
Expand All @@ -189,10 +189,10 @@ class _PreJoinPageState extends State<PreJoinPage> {
defaultAudioPublishOptions: const AudioPublishOptions(
name: 'custom_audio_track_name',
),
defaultCameraCaptureOptions: CameraCaptureOptions(
defaultCameraCaptureOptions: const CameraCaptureOptions(
maxFrameRate: 30,
params: VideoParameters(
dimensions: const VideoDimensions(1280, 720),
dimensions: VideoDimensions(1280, 720),
)),
defaultScreenShareCaptureOptions: const ScreenShareCaptureOptions(
useiOSBroadcastExtension: true,
Expand All @@ -214,6 +214,8 @@ class _PreJoinPageState extends State<PreJoinPage> {
// Create a Listener before connecting
final listener = room.createListener();

await room.prepareConnection(args.url, args.token);

// Try to connect to the room
// This will throw an Exception if it fails for any reason.
await room.connect(
Expand Down
4 changes: 2 additions & 2 deletions example/lib/pages/room.dart
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ class _RoomPageState extends State<RoomPage> {
String decoded = 'Failed to decode';
try {
decoded = utf8.decode(event.data);
} catch (_) {
print('Failed to decode: $_');
} catch (err) {
print('Failed to decode: $err');
}
context.showDataReceivedDialog(decoded);
})
Expand Down
2 changes: 1 addition & 1 deletion example/lib/widgets/controls.dart
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class _ControlsWidgetState extends State<ControlsWidget> {
const androidConfig = FlutterBackgroundAndroidConfig(
notificationTitle: 'Screen Sharing',
notificationText: 'LiveKit Example is sharing the screen.',
notificationImportance: AndroidNotificationImportance.Default,
notificationImportance: AndroidNotificationImportance.normal,
notificationIcon: AndroidResource(
name: 'livekit_ic_launcher', defType: 'mipmap'),
);
Expand Down
145 changes: 95 additions & 50 deletions lib/src/core/engine.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import '../proto/livekit_models.pb.dart' as lk_models;
import '../proto/livekit_rtc.pb.dart' as lk_rtc;
import '../publication/local.dart';
import '../support/disposable.dart';
import '../support/region_url_provider.dart';
import '../support/websocket.dart';
import '../track/local/video.dart';
import '../types/internal.dart';
Expand Down Expand Up @@ -130,6 +131,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {

bool attemptingReconnect = false;

RegionUrlProvider? _regionUrlProvider;

void clearReconnectTimeout() {
if (reconnectTimeout != null) {
reconnectTimeout?.cancel();
Expand Down Expand Up @@ -171,6 +174,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
ConnectOptions? connectOptions,
RoomOptions? roomOptions,
FastConnectOptions? fastConnectOptions,
RegionUrlProvider? regionUrlProvider,
}) async {
this.url = url;
this.token = token;
Expand All @@ -179,6 +183,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
this.roomOptions = roomOptions ?? this.roomOptions;
this.fastConnectOptions = fastConnectOptions;

if (regionUrlProvider != null) {
_regionUrlProvider = regionUrlProvider;
}

try {
// wait for socket to connect rtc server
await signalClient.connect(
Expand All @@ -192,7 +200,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await _signalListener.waitFor<SignalJoinResponseEvent>(
duration: this.connectOptions.timeouts.connection,
onTimeout: () => throw ConnectException(
'Timed out waiting for SignalJoinResponseEvent'),
'Timed out waiting for SignalJoinResponseEvent',
reason: ConnectionErrorReason.Timeout),
);

logger.fine('Waiting for engine to connect...');
Expand Down Expand Up @@ -663,6 +672,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
));

clearReconnectTimeout();
if (token != null && _regionUrlProvider != null) {
// token may have been refreshed, we do not want to recreate the regionUrlProvider
// since the current engine may have inherited a regional url
_regionUrlProvider!.updateToken(token!);
}
logger.fine(
'WebSocket reconnecting in $delay ms, retry times $reconnectAttempts');
reconnectTimeout = Timer(Duration(milliseconds: delay), () async {
Expand Down Expand Up @@ -700,7 +714,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
duration: connectOptions.timeouts.connection * 10,
filter: (event) => !event.state.contains(ConnectivityResult.none),
onTimeout: () => throw ConnectException(
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent'),
'attemptReconnect: Timed out waiting for SignalConnectivityChangedEvent',
reason: ConnectionErrorReason.Timeout),
);
}

Expand Down Expand Up @@ -756,7 +771,8 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await events.waitFor<SignalReconnectedEvent>(
duration: connectOptions.timeouts.connection,
onTimeout: () => throw ConnectException(
'resumeConnection: Timed out waiting for SignalReconnectedEvent'),
'resumeConnection: Timed out waiting for SignalReconnectedEvent',
reason: ConnectionErrorReason.Timeout),
);

logger.fine('resumeConnection: reason: ${reason.name}');
Expand Down Expand Up @@ -789,53 +805,65 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
}

@internal
Future<void> restartConnection([bool signalEvents = false]) async {
Future<void> restartConnection({String? regionUrl}) async {
if (_isClosed) {
return;
}

events.emit(const EngineFullRestartingEvent());
try {
events.emit(const EngineFullRestartingEvent());

if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}
if (signalClient.connectionState == ConnectionState.connected) {
await signalClient.sendLeave();
}

await publisher?.dispose();
publisher = null;
await publisher?.dispose();
publisher = null;

await subscriber?.dispose();
subscriber = null;
await subscriber?.dispose();
subscriber = null;

_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;
_reliableDCSub = null;
_reliableDCPub = null;
_lossyDCSub = null;
_lossyDCPub = null;

await _signalListener.cancelAll();
await _signalListener.cancelAll();

_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();
_signalListener = signalClient.createListener(synchronized: true);
_setUpSignalListeners();

await connect(
url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);

if (_hasPublished) {
await negotiate();
logger.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
await connect(
regionUrl ?? url!,
token!,
roomOptions: roomOptions,
connectOptions: connectOptions,
fastConnectOptions: fastConnectOptions,
);
}

fullReconnectOnNext = false;

events.emit(const EngineRestartedEvent());
if (_hasPublished) {
await negotiate();
logger
.fine('restartConnection: Waiting for publisher to ice-connect...');
await events.waitFor<EnginePublisherPeerStateUpdatedEvent>(
filter: (event) => event.state.isConnected(),
duration: connectOptions.timeouts.peerConnection,
);
}
fullReconnectOnNext = false;
_regionUrlProvider?.resetAttempts();
events.emit(const EngineRestartedEvent());
} catch (error) {
final nextRegionUrl = await _regionUrlProvider?.getNextBestRegionUrl();
if (nextRegionUrl != null) {
await restartConnection(regionUrl: nextRegionUrl);
return;
} else {
// no more regions to try (or we're not on cloud)
_regionUrlProvider?.resetAttempts();
rethrow;
}
}
}

@internal
Expand Down Expand Up @@ -992,19 +1020,32 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
token = event.token;
})
..on<SignalLeaveEvent>((event) async {
if (event.canReconnect) {
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
} else {
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
if (event.regions != null && _regionUrlProvider != null) {
logger.fine('updating regions');
_regionUrlProvider?.setServerReportedRegions(event.regions!);
}
switch (event.action) {
case lk_rtc.LeaveRequest_Action.DISCONNECT:
if (connectionState == ConnectionState.reconnecting) {
logger.warning(
'[Signal] Received Leave while engine is reconnecting, ignoring...');
return;
}
await signalClient.cleanUp();
await cleanUp();
events
.emit(EngineDisconnectedEvent(reason: event.reason.toSDKType()));
break;
case lk_rtc.LeaveRequest_Action.RECONNECT:
fullReconnectOnNext = true;
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
break;
case lk_rtc.LeaveRequest_Action.RESUME:
// reconnect immediately instead of waiting for next attempt
await handleDisconnect(ClientDisconnectReason.leaveReconnect);
default:
break;
}
});

Expand All @@ -1016,6 +1057,10 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
await cleanUp();
}
}

void setRegionUrlProvider(RegionUrlProvider provider) {
_regionUrlProvider = provider;
}
}

extension EnginePrivateMethods on Engine {
Expand Down
Loading
Loading