diff --git a/example/android/app/src/main/AndroidManifest.xml b/example/android/app/src/main/AndroidManifest.xml index e439f3fbf..d69062758 100644 --- a/example/android/app/src/main/AndroidManifest.xml +++ b/example/android/app/src/main/AndroidManifest.xml @@ -10,7 +10,7 @@ + android:icon="@mipmap/livekit_ic_launcher"> - - - - + + - - CFBundleInfoDictionaryVersion 6.0 CFBundleName - livekit_example + LiveKit CFBundlePackageType APPL CFBundleShortVersionString diff --git a/example/lib/exts.dart b/example/lib/exts.dart index 45a362f74..5068044d4 100644 --- a/example/lib/exts.dart +++ b/example/lib/exts.dart @@ -20,6 +20,24 @@ extension LKExampleExt on BuildContext { ), ); + Future showUnPublishDialog() => showDialog( + context: this, + builder: (ctx) => AlertDialog( + title: const Text('UnPublish'), + content: const Text('Would you like to un-publish your Camera & Mic ?'), + actions: [ + TextButton( + onPressed: () => Navigator.pop(ctx, false), + child: const Text('NO'), + ), + TextButton( + onPressed: () => Navigator.pop(ctx, true), + child: const Text('YES'), + ), + ], + ), + ); + Future showErrorDialog(dynamic exception) => showDialog( context: this, builder: (ctx) => AlertDialog( diff --git a/example/lib/main.dart b/example/lib/main.dart index 54ba4ef55..e5816b6a9 100644 --- a/example/lib/main.dart +++ b/example/lib/main.dart @@ -1,16 +1,14 @@ import 'package:flutter/material.dart'; -import 'package:livekit_client/livekit_client.dart'; import 'package:livekit_example/theme.dart'; import 'package:logging/logging.dart'; import 'pages/connect.dart'; void main() { - print('This is a test for ${SignalTrickleEvent} test.'); // configure logs for debugging Logger.root.level = Level.FINE; Logger.root.onRecord.listen((record) { - print('${record.level.name}: ${record.time}: ${record.message}'); + print('${record.level.name}: ${record.message}'); }); WidgetsFlutterBinding.ensureInitialized(); diff --git a/example/lib/pages/room.dart b/example/lib/pages/room.dart index b1616e89d..53b418131 100644 --- a/example/lib/pages/room.dart +++ b/example/lib/pages/room.dart @@ -1,6 +1,7 @@ import 'dart:convert'; import 'dart:math' as math; +import 'package:flutter/cupertino.dart'; import 'package:flutter/material.dart'; import 'package:flutter/widgets.dart'; import 'package:livekit_client/livekit_client.dart'; @@ -25,7 +26,7 @@ class RoomPage extends StatefulWidget { class _RoomPageState extends State { // List participants = []; - late final _listener = EventsListener(widget.room.events); + late final EventsListener _listener = widget.room.createListener(); @override void initState() { @@ -48,7 +49,9 @@ class _RoomPageState extends State { } void _setUpListeners() => _listener - ..on((_) => Navigator.pop(context)) + ..on((_) async { + WidgetsBinding.instance?.addPostFrameCallback((timeStamp) => Navigator.pop(context)); + }) ..on((event) { String decoded = 'Failed to decode'; try { diff --git a/example/lib/widgets/controls.dart b/example/lib/widgets/controls.dart index 0ba975dac..3be30cab8 100644 --- a/example/lib/widgets/controls.dart +++ b/example/lib/widgets/controls.dart @@ -1,9 +1,10 @@ import 'dart:convert'; +import 'package:collection/collection.dart'; import 'package:eva_icons_flutter/eva_icons_flutter.dart'; import 'package:flutter/material.dart'; import 'package:livekit_client/livekit_client.dart'; -import 'package:collection/collection.dart'; + import '../exts.dart'; class ControlsWidget extends StatefulWidget { @@ -44,6 +45,15 @@ class _ControlsWidgetState extends State { setState(() {}); } + void _unpublishAll() async { + final result = await context.showUnPublishDialog(); + if (result == true) { + await participant.unpublishAllTracks(); + // Force to update UI for now + participant.notifyListeners(); + } + } + void _muteAudio() { if (participant.hasAudio) { final audioPub = participant.audioTracks.first; @@ -146,6 +156,10 @@ class _ControlsWidgetState extends State { return Row( mainAxisAlignment: MainAxisAlignment.center, children: [ + IconButton( + onPressed: _unpublishAll, + icon: const Icon(EvaIcons.closeCircleOutline), + ), if (canMute) IconButton( onPressed: _muteAudio, diff --git a/example/pubspec.lock b/example/pubspec.lock index 68cbe27f1..c04b21609 100644 --- a/example/pubspec.lock +++ b/example/pubspec.lock @@ -111,11 +111,11 @@ packages: dependency: transitive description: path: "." - ref: use-custom-webrtc-build - resolved-ref: "4942e7faec2e5775d35c42e22c2929ca6ca53769" - url: "https://github.com/livekit/flutter-webrtc" + ref: master + resolved-ref: "32d83d85fa2faebcd37a19534f8a84273b12cbce" + url: "https://github.com/flutter-webrtc/flutter-webrtc" source: git - version: "0.6.7" + version: "0.6.8" google_fonts: dependency: "direct main" description: diff --git a/example/web/favicon.png b/example/web/favicon.png index 8aaa46ac1..7437724ee 100644 Binary files a/example/web/favicon.png and b/example/web/favicon.png differ diff --git a/example/web/icons/Icon-192.png b/example/web/icons/Icon-192.png index b749bfef0..ba48301fe 100644 Binary files a/example/web/icons/Icon-192.png and b/example/web/icons/Icon-192.png differ diff --git a/example/web/icons/Icon-512.png b/example/web/icons/Icon-512.png index 88cfd48df..606087760 100644 Binary files a/example/web/icons/Icon-512.png and b/example/web/icons/Icon-512.png differ diff --git a/ios/.gitignore b/ios/.gitignore new file mode 100644 index 000000000..0c885071e --- /dev/null +++ b/ios/.gitignore @@ -0,0 +1,38 @@ +.idea/ +.vagrant/ +.sconsign.dblite +.svn/ + +.DS_Store +*.swp +profile + +DerivedData/ +build/ +GeneratedPluginRegistrant.h +GeneratedPluginRegistrant.m + +.generated/ + +*.pbxuser +*.mode1v3 +*.mode2v3 +*.perspectivev3 + +!default.pbxuser +!default.mode1v3 +!default.mode2v3 +!default.perspectivev3 + +xcuserdata + +*.moved-aside + +*.pyc +*sync/ +Icon? +.tags* + +/Flutter/Generated.xcconfig +/Flutter/ephemeral/ +/Flutter/flutter_export_environment.sh \ No newline at end of file diff --git a/ios/Assets/.gitkeep b/ios/Assets/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/ios/Classes/LivekitClientPlugin.h b/ios/Classes/LivekitClientPlugin.h new file mode 100644 index 000000000..1219c0d84 --- /dev/null +++ b/ios/Classes/LivekitClientPlugin.h @@ -0,0 +1,4 @@ +#import + +@interface LiveKitClientPlugin : NSObject +@end diff --git a/ios/Classes/LivekitClientPlugin.m b/ios/Classes/LivekitClientPlugin.m new file mode 100644 index 000000000..670eb810c --- /dev/null +++ b/ios/Classes/LivekitClientPlugin.m @@ -0,0 +1,15 @@ +#import "LiveKitClientPlugin.h" +#if __has_include() +#import +#else +// Support project import fallback if the generated compatibility header +// is not copied when this plugin is created as a library. +// https://forums.swift.org/t/swift-static-libraries-dont-copy-generated-objective-c-header/19816 +#import "livekit_client-Swift.h" +#endif + +@implementation LiveKitClientPlugin ++ (void)registerWithRegistrar:(NSObject*)registrar { + [SwiftLiveKitClientPlugin registerWithRegistrar:registrar]; +} +@end diff --git a/ios/Classes/SwiftLiveKitClientPlugin.swift b/ios/Classes/SwiftLiveKitClientPlugin.swift new file mode 100644 index 000000000..14aa2bea1 --- /dev/null +++ b/ios/Classes/SwiftLiveKitClientPlugin.swift @@ -0,0 +1,114 @@ +import Flutter +import UIKit +import WebRTC + +public class SwiftLiveKitClientPlugin: NSObject, FlutterPlugin { + public static func register(with registrar: FlutterPluginRegistrar) { + let channel = FlutterMethodChannel(name: "livekit_client", binaryMessenger: registrar.messenger()) + let instance = SwiftLiveKitClientPlugin() + registrar.addMethodCallDelegate(instance, channel: channel) + } + + // https://developer.apple.com/documentation/avfaudio/avaudiosession/category + let categoryMap: [String: AVAudioSession.Category] = [ + "ambient": .ambient, + "multiRoute": .multiRoute, + "playAndRecord": .playAndRecord, + "playback": .playback, + "record": .record, + "soloAmbient": .soloAmbient, + ] + + // https://developer.apple.com/documentation/avfaudio/avaudiosession/categoryoptions + let categoryOptionsMap: [String: AVAudioSession.CategoryOptions] = [ + "mixWithOthers": .mixWithOthers, + "duckOthers": .duckOthers, + "interruptSpokenAudioAndMixWithOthers": .interruptSpokenAudioAndMixWithOthers, + "allowBluetooth": .allowBluetooth, + "allowBluetoothA2DP": .allowBluetoothA2DP, + "allowAirPlay": .allowAirPlay, + "defaultToSpeaker": .defaultToSpeaker, +// @available(iOS 14.5, *) +// "overrideMutedMicrophoneInterruption": .overrideMutedMicrophoneInterruption, + ] + + // https://developer.apple.com/documentation/avfaudio/avaudiosession/mode + let modeMap: [String: AVAudioSession.Mode] = [ + "default": .default, + "gameChat": .gameChat, + "measurement": .measurement, + "moviePlayback": .moviePlayback, + "spokenAudio": .spokenAudio, + "videoChat": .videoChat, + "videoRecording": .videoRecording, + "voiceChat": .voiceChat, + "voicePrompt": .voicePrompt, + ] + + private func categoryOptions(fromFlutter options: [String]) -> AVAudioSession.CategoryOptions { + var result: AVAudioSession.CategoryOptions = [] + for option in categoryOptionsMap { + if (options.contains(option.key)) { + result.insert(option.value) + } + } + return result + } + + public func handleConfigureNativeAudio(args: [String: Any?], result: @escaping FlutterResult) { + + let configuration = RTCAudioSessionConfiguration.webRTC() + + // Category + if let string = args["appleAudioCategory"] as? String, + let category = categoryMap[string] { + configuration.category = category.rawValue + print("[LiveKit] Configuring category: ", configuration.category) + } + + // CategoryOptions + if let strings = args["appleAudioCategoryOptions"] as? [String] { + configuration.categoryOptions = categoryOptions(fromFlutter: strings) + print("[LiveKit] Configuring categoryOptions: ", strings) + } + + // Mode + if let string = args["appleAudioMode"] as? String, + let mode = modeMap[string] { + configuration.mode = mode.rawValue + print("[LiveKit] Configuring mode: ", configuration.mode) + } + + let session = RTCAudioSession.sharedInstance() + session.lockForConfiguration() + defer { + session.unlockForConfiguration() + } + + do { + try session.setConfiguration(configuration, active: true) + print("[LiveKit] Configure success") + result(true) + } catch let error { + print("[LiveKit] Configure audio error: ", error) + result(FlutterError(code: "configure", message: error.localizedDescription, details: nil)) + } + } + + public func handle(_ call: FlutterMethodCall, result: @escaping FlutterResult) { + + guard let args = call.arguments as? [String: Any?] else { + print("[LiveKit] arguments must be a dictionary") + result(FlutterMethodNotImplemented) + return + } + + switch call.method { + case "configureNativeAudio": + handleConfigureNativeAudio(args: args, result: result) + default: + print("[LiveKit] method not found: ", call.method) + result(FlutterMethodNotImplemented) + } + } +} diff --git a/ios/livekit_client.podspec b/ios/livekit_client.podspec new file mode 100644 index 000000000..ddb698c39 --- /dev/null +++ b/ios/livekit_client.podspec @@ -0,0 +1,26 @@ +# +# LiveKit +# https://livekit.io/ +# https://github.com/livekit +# + +Pod::Spec.new do |s| + s.name = 'livekit_client' + s.version = '0.0.1' + s.summary = 'Open source platform for real-time audio and video.' + s.description = 'Open source platform for real-time audio and video.' + s.homepage = 'https://livekit.io/' + s.license = { :file => '../LICENSE' } + s.author = { 'LiveKit' => 'contact@livekit.io' } + s.source = { :path => '.' } + s.source_files = 'Classes/**/*' + s.public_header_files = 'Classes/**/*.h' + s.platform = :ios, '12.1' + # Flutter.framework does not contain a i386 slice. + s.pod_target_xcconfig = { 'DEFINES_MODULE' => 'YES', 'EXCLUDED_ARCHS[sdk=iphonesimulator*]' => 'i386' } + s.swift_version = '5.0' + s.static_framework = true + + s.dependency 'Flutter' + s.dependency 'WebRTC-SDK', '92.4515.07' +end diff --git a/lib/livekit_client.dart b/lib/livekit_client.dart index 7c53d2bc9..3f51f7bd5 100644 --- a/lib/livekit_client.dart +++ b/lib/livekit_client.dart @@ -1,7 +1,7 @@ /// Flutter Client SDK to LiveKit. library livekit_client; -export 'src/errors.dart'; +export 'src/exceptions.dart'; export 'src/events.dart'; export 'src/livekit.dart'; export 'src/managers/event.dart'; diff --git a/lib/src/classes/change_notifier.dart b/lib/src/classes/change_notifier.dart deleted file mode 100644 index b8b4f5396..000000000 --- a/lib/src/classes/change_notifier.dart +++ /dev/null @@ -1,32 +0,0 @@ -import 'package:flutter/material.dart'; -import 'package:livekit_client/src/logger.dart'; - -// dispose safe change notifier -abstract class LKChangeNotifier extends ChangeNotifier { - bool _disposed = false; - bool get isDisposed => _disposed; - - @override - void dispose() { - _disposed = true; - super.dispose(); - } - - @override - void addListener(VoidCallback listener) { - if (_disposed) { - logger.warning('calling addListener on a disposed ChangeNotifier'); - return; - } - super.addListener(listener); - } - - @override - void removeListener(VoidCallback listener) { - if (_disposed) { - logger.warning('calling removeListener on a disposed ChangeNotifier'); - return; - } - super.removeListener(listener); - } -} diff --git a/lib/src/events.dart b/lib/src/events.dart index a6510a492..fb0cb3d1e 100644 --- a/lib/src/events.dart +++ b/lib/src/events.dart @@ -1,6 +1,4 @@ import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; -import 'package:livekit_client/livekit_client.dart'; -import 'package:meta/meta.dart'; import 'participant/participant.dart'; import 'participant/remote_participant.dart'; @@ -8,6 +6,7 @@ import 'proto/livekit_models.pb.dart' as lk_models; import 'proto/livekit_rtc.pb.dart' as lk_rtc; import 'track/remote_track_publication.dart'; import 'track/track.dart'; +import 'track/track_publication.dart'; import 'types.dart'; abstract class LiveKitEvent {} @@ -110,14 +109,6 @@ class TrackSubscribedEvent with RoomEvent, ParticipantEvent { }); } -@internal -class ParticipantInfoUpdatedEvent with ParticipantEvent { - final RemoteParticipant participant; - const ParticipantInfoUpdatedEvent({ - required this.participant, - }); -} - /// An error has occured during track subscription. /// Emitted by [Room] and [RemoteParticipant]. class TrackSubscriptionExceptionEvent with RoomEvent, ParticipantEvent { @@ -226,12 +217,12 @@ class EngineReconnectedEvent with EngineEvent { const EngineReconnectedEvent(); } -class EngineParticipantUpdateEvent with EngineEvent { - final List participants; - const EngineParticipantUpdateEvent({ - required this.participants, - }); -} +// class EngineParticipantUpdateEvent with EngineEvent { +// final List participants; +// const EngineParticipantUpdateEvent({ +// required this.participants, +// }); +// } class EngineTrackAddedEvent with EngineEvent { final rtc.MediaStreamTrack track; @@ -244,13 +235,6 @@ class EngineTrackAddedEvent with EngineEvent { }); } -class EngineSpeakersUpdateEvent with EngineEvent { - final List speakers; - const EngineSpeakersUpdateEvent({ - required this.speakers, - }); -} - class EngineDataPacketReceivedEvent with EngineEvent { final lk_models.UserPacket packet; final lk_models.DataPacket_Kind kind; @@ -363,10 +347,11 @@ class SignalTrickleEvent with SignalEvent { }); } -class SignalParticipantUpdateEvent with SignalEvent { - final List updates; +// relayed by Engine +class SignalParticipantUpdateEvent with SignalEvent, EngineEvent { + final List participants; const SignalParticipantUpdateEvent({ - required this.updates, + required this.participants, }); } @@ -379,9 +364,19 @@ class SignalLocalTrackPublishedEvent with SignalEvent { }); } -class SignalActiveSpeakersChangedEvent with SignalEvent { +// speaker update received through websocket +// relayed by Engine +class SignalSpeakersChangedEvent with SignalEvent, EngineEvent { + final List speakers; + const SignalSpeakersChangedEvent({ + required this.speakers, + }); +} + +// Event received through data channel +class EngineActiveSpeakersUpdateEvent with EngineEvent { final List speakers; - const SignalActiveSpeakersChangedEvent({ + const EngineActiveSpeakersUpdateEvent({ required this.speakers, }); } diff --git a/lib/src/errors.dart b/lib/src/exceptions.dart similarity index 99% rename from lib/src/errors.dart rename to lib/src/exceptions.dart index 382ac9ce6..ac312fb28 100644 --- a/lib/src/errors.dart +++ b/lib/src/exceptions.dart @@ -1,6 +1,3 @@ -// -// -// class LiveKitException implements Exception { final String message; const LiveKitException._(this.message); diff --git a/lib/src/managers/delay.dart b/lib/src/managers/delay.dart index 8fe30cecc..f5026d860 100644 --- a/lib/src/managers/delay.dart +++ b/lib/src/managers/delay.dart @@ -1,9 +1,14 @@ import 'package:async/async.dart'; -class CancelableDelayManager { +import '../support/disposable.dart'; + +class CancelableDelayManager extends Disposable { // final _delays = >[]; + CancelableDelayManager() { + onDispose(() async => await cancelAll()); + } // delay but cancelable Future waitFor( Duration wait, { @@ -19,7 +24,16 @@ class CancelableDelayManager { if (!op.isCanceled) ifNotCancelled?.call(); } - Future dispose() async { + // @override + // Future dispose() async { + // final didDispose = await super.dispose(); + // if (didDispose) { + // await cancelAll(); + // } + // return didDispose; + // } + + Future cancelAll() async { // cancel all delays if (_delays.isEmpty) return; // make a copy so we don't mutate while iterating diff --git a/lib/src/managers/event.dart b/lib/src/managers/event.dart index 022a7159b..7bc0c7eb5 100644 --- a/lib/src/managers/event.dart +++ b/lib/src/managers/event.dart @@ -1,13 +1,19 @@ import 'dart:async'; -import 'package:flutter/material.dart'; import 'package:synchronized/synchronized.dart' as sync; -import '../errors.dart'; +import '../exceptions.dart'; import '../extensions.dart'; import '../logger.dart'; +import '../support/disposable.dart'; import '../types.dart'; +mixin EventsEmittable { + final events = EventsEmitter(); + EventsListener createListener({bool synchronized = false}) => + EventsListener(events, synchronized: synchronized); +} + // Type-safe, multi-listenable, dispose safe event handling // TODO: Move to a separate package @@ -17,7 +23,9 @@ class EventsEmitter extends EventsListenable { EventsEmitter({ bool listenSynchronized = false, - }) : super(synchronized: listenSynchronized); + }) : super(synchronized: listenSynchronized) { + onDispose(() async => await streamCtrl.close()); + } @override EventsEmitter get emitter => this; @@ -28,13 +36,6 @@ class EventsEmitter extends EventsListenable { // emit the event streamCtrl.add(event); } - - @override - @mustCallSuper - Future dispose() async { - await streamCtrl.close(); - await super.dispose(); - } } // for listening only @@ -51,7 +52,7 @@ class EventsListener extends EventsListenable { } // ensures all listeners will close on dispose -abstract class EventsListenable { +abstract class EventsListenable extends Disposable { // the emitter to listen to EventsEmitter get emitter; @@ -62,17 +63,34 @@ abstract class EventsListenable { EventsListenable({ required this.synchronized, - }); - - @mustCallSuper - Future dispose() async { - // Stop listening to all events - logger.fine('${objectId} dispose() cancelling ${_listeners.length} event(s)'); - for (final listener in _listeners) { - await listener.cancel(); - } + }) { + onDispose(() async { + if (_listeners.isNotEmpty) { + // Stop listening to all events + logger.fine('${objectId} cancelling ${_listeners.length} listeners(s)'); + for (final listener in _listeners) { + await listener.cancel(); + } + } + }); } + // @override + // @mustCallSuper + // Future dispose() async { + // // mark as disposed + // final didDispose = await super.dispose(); + // if (didDispose && _listeners.isNotEmpty) { + // // Stop listening to all events + // logger.fine('${objectId} dispose() cancelling ${_listeners.length} event(s)'); + // for (final listener in _listeners) { + // await listener.cancel(); + // } + // } + + // return didDispose; + // } + // listens to all events, guaranteed to be cancelled on dispose CancelListenFunc listen(FutureOr Function(T) onEvent) { // diff --git a/lib/src/participant/local_participant.dart b/lib/src/participant/local_participant.dart index 86896de11..0d096ba51 100644 --- a/lib/src/participant/local_participant.dart +++ b/lib/src/participant/local_participant.dart @@ -1,8 +1,9 @@ import 'package:flutter/foundation.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; +import 'package:meta/meta.dart'; -import '../errors.dart'; import '../events.dart'; +import '../exceptions.dart'; import '../extensions.dart'; import '../logger.dart'; import '../managers/event.dart'; @@ -19,16 +20,17 @@ import 'participant.dart'; /// Represents the current participant in the room. class LocalParticipant extends Participant { - final RTCEngine _engine; + @internal + final RTCEngine engine; + @internal final TrackPublishOptions? defaultPublishOptions; LocalParticipant({ - required RTCEngine engine, + required this.engine, required lk_models.ParticipantInfo info, this.defaultPublishOptions, required EventsEmitter roomEvents, - }) : _engine = engine, - super( + }) : super( info.sid, info.identity, roomEvents: roomEvents, @@ -36,39 +38,43 @@ class LocalParticipant extends Participant { updateFromInfo(info); } - /// for internal use - /// {@nodoc} - RTCEngine get engine => _engine; - /// publish an audio track to the room Future publishAudioTrack(LocalAudioTrack track) async { if (audioTracks.any((e) => e.track?.mediaStreamTrack.id == track.mediaStreamTrack.id)) { throw TrackPublishException('track already exists'); } - // try { - final trackInfo = await _engine.addTrack( - cid: track.getCid(), - name: track.name, - kind: track.kind, - ); - - final transceiverInit = rtc.RTCRtpTransceiverInit( - direction: rtc.TransceiverDirection.SendOnly, - ); - // addTransceiver cannot pass in a kind parameter due to a bug in flutter-webrtc (web) - track.transceiver = await _engine.publisher?.pc.addTransceiver( - track: track.mediaStreamTrack, - kind: rtc.RTCRtpMediaType.RTCRtpMediaTypeAudio, - init: transceiverInit, - ); - await _engine.negotiate(); - - final pub = LocalTrackPublication(trackInfo, track, this); - addTrackPublication(pub); - notifyListeners(); - - return pub; + // await AudioManager().incrementPublishCounter(); + + try { + final trackInfo = await engine.addTrack( + cid: track.getCid(), + name: track.name, + kind: track.kind, + ); + + await track.start(); + + final transceiverInit = rtc.RTCRtpTransceiverInit( + direction: rtc.TransceiverDirection.SendOnly, + ); + // addTransceiver cannot pass in a kind parameter due to a bug in flutter-webrtc (web) + track.transceiver = await engine.publisher?.pc.addTransceiver( + track: track.mediaStreamTrack, + kind: rtc.RTCRtpMediaType.RTCRtpMediaTypeAudio, + init: transceiverInit, + ); + await engine.negotiate(); + + final pub = LocalTrackPublication(trackInfo, track, this); + addTrackPublication(pub); + notifyListeners(); + return pub; + } catch (e) { + // In any case there was an exception, revert the count. + // await AudioManager().decrementPublishCounter(); + rethrow; + } } /// Publish a video track to the room @@ -83,13 +89,16 @@ class LocalParticipant extends Participant { // Use default options from `ConnectOptions` if options is null options = options ?? defaultPublishOptions; - final trackInfo = await _engine.addTrack( + final trackInfo = await engine.addTrack( cid: track.getCid(), name: track.name, kind: track.kind, ); + logger.fine('publishVideoTrack addTrack response: ${trackInfo}'); + await track.start(); + // Video encodings and simulcasts // use constraints passed to getUserMedia by default @@ -124,14 +133,14 @@ class LocalParticipant extends Participant { streams: [track.mediaStream], ); - logger.fine('publishVideoTrack publisher: ${_engine.publisher}'); + logger.fine('publishVideoTrack publisher: ${engine.publisher}'); - track.transceiver = await _engine.publisher?.pc.addTransceiver( + track.transceiver = await engine.publisher?.pc.addTransceiver( track: track.mediaStreamTrack, kind: rtc.RTCRtpMediaType.RTCRtpMediaTypeVideo, init: transceiverInit, ); - await _engine.negotiate(); + await engine.negotiate(); final pub = LocalTrackPublication(trackInfo, track, this); addTrackPublication(pub); @@ -156,8 +165,17 @@ class LocalParticipant extends Participant { final sender = track.transceiver?.sender; if (sender != null) { - await engine.publisher?.pc.removeTrack(sender); - await engine.negotiate(); + try { + await engine.publisher?.pc.removeTrack(sender); + } catch (_) { + logger.warning('[$objectId] rtc.removeTrack() did throw ${_}'); + } + + // doesn't make sense to negotiate if already disposed + if (!isDisposed) { + // manual negotiation since track changed + await engine.negotiate(); + } } } } diff --git a/lib/src/participant/participant.dart b/lib/src/participant/participant.dart index 32ed8be66..1ede18c01 100644 --- a/lib/src/participant/participant.dart +++ b/lib/src/participant/participant.dart @@ -1,13 +1,12 @@ import 'package:collection/collection.dart'; -import 'package:flutter/foundation.dart'; import 'package:meta/meta.dart'; -import '../classes/change_notifier.dart'; import '../events.dart'; import '../extensions.dart'; import '../logger.dart'; import '../managers/event.dart'; import '../proto/livekit_models.pb.dart' as lk_models; +import '../support/disposable.dart'; import '../track/track_publication.dart'; import 'remote_participant.dart'; @@ -21,7 +20,7 @@ import 'remote_participant.dart'; /// Base for [RemoteParticipant] and [LocalParticipant], /// can not be instantiated directly. -abstract class Participant extends LKChangeNotifier { +abstract class Participant extends DisposableChangeNotifier with EventsEmittable { /// map of track sid => published track final trackPublications = {}; @@ -44,7 +43,6 @@ abstract class Participant extends LKChangeNotifier { bool _isSpeaking = false; // suppport for multiple event listeners - final events = EventsEmitter(); final EventsEmitter roomEvents; /// when the participant joined the room @@ -85,14 +83,11 @@ abstract class Participant extends LKChangeNotifier { logger.fine('[ParticipantEvent] $event, will notifyListeners()'); notifyListeners(); }); - } - @override - @mustCallSuper - Future dispose() async { - logger.fine('$objectId dispose()'); - await events.dispose(); - super.dispose(); + onDispose(() async { + await events.dispose(); + await unpublishAllTracks(); + }); } /// for internal use @@ -106,7 +101,7 @@ abstract class Participant extends LKChangeNotifier { lastSpokeAt = DateTime.now(); } - [events, roomEvents].emit(SpeakingChangedEvent( + events.emit(SpeakingChangedEvent( participant: this, speaking: speaking, )); @@ -145,15 +140,17 @@ abstract class Participant extends LKChangeNotifier { // Must implement Future unpublishTrack(String trackSid, {bool notify = false}); - Future unpublishAllTracks() async { - final _ = List.from(trackPublications.values); - for (final track in _) { - await unpublishTrack(track.sid); + Future unpublishAllTracks({bool notify = false}) async { + final trackSids = trackPublications.keys.toSet(); + for (final trackid in trackSids) { + await unpublishTrack(trackid, notify: notify); } } + // // Equality operators // Object is considered equal when sid is equal + // @override int get hashCode => sid.hashCode; diff --git a/lib/src/participant/remote_participant.dart b/lib/src/participant/remote_participant.dart index fdcf8a158..177a7eb7f 100644 --- a/lib/src/participant/remote_participant.dart +++ b/lib/src/participant/remote_participant.dart @@ -90,8 +90,10 @@ class RemoteParticipant extends Participant { final Track track; if (pub.kind == lk_models.TrackType.AUDIO) { // audio track + // await AudioManager().incrementSubscriptionCounter(); + final audioTrack = AudioTrack(pub.name, mediaTrack, stream); - audioTrack.start(); + await audioTrack.start(); track = audioTrack; } else { // video track @@ -143,8 +145,7 @@ class RemoteParticipant extends Participant { // unpublish any track that is not in the info final validSids = info.tracks.map((e) => e.sid); - final removeSids = - trackPublications.values.where((e) => !validSids.contains(e.sid)).map((e) => e.sid); + final removeSids = trackPublications.keys.where((e) => !validSids.contains(e)).toSet(); for (final sid in removeSids) { await unpublishTrack(sid, notify: true); } @@ -165,6 +166,10 @@ class RemoteParticipant extends Participant { track: track, publication: pub, )); + + // if (track is AudioTrack) { + // await AudioManager().decrementSubscriptionCounter(); + // } } if (notify) { diff --git a/lib/src/proto/livekit_models.pb.dart b/lib/src/proto/livekit_models.pb.dart index 09f934cde..0d4251ff7 100644 --- a/lib/src/proto/livekit_models.pb.dart +++ b/lib/src/proto/livekit_models.pb.dart @@ -37,6 +37,7 @@ class Room extends $pb.GeneratedMessage { const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'enabledCodecs', $pb.PbFieldType.PM, subBuilder: Codec.create) + ..aOS(8, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'metadata') ..hasRequiredFields = false; Room._() : super(); @@ -48,6 +49,7 @@ class Room extends $pb.GeneratedMessage { $fixnum.Int64? creationTime, $core.String? turnPassword, $core.Iterable? enabledCodecs, + $core.String? metadata, }) { final _result = create(); if (sid != null) { @@ -71,6 +73,9 @@ class Room extends $pb.GeneratedMessage { if (enabledCodecs != null) { _result.enabledCodecs.addAll(enabledCodecs); } + if (metadata != null) { + _result.metadata = metadata; + } return _result; } factory Room.fromBuffer($core.List<$core.int> i, @@ -171,6 +176,18 @@ class Room extends $pb.GeneratedMessage { @$pb.TagNumber(7) $core.List get enabledCodecs => $_getList(6); + + @$pb.TagNumber(8) + $core.String get metadata => $_getSZ(7); + @$pb.TagNumber(8) + set metadata($core.String v) { + $_setString(7, v); + } + + @$pb.TagNumber(8) + $core.bool hasMetadata() => $_has(7); + @$pb.TagNumber(8) + void clearMetadata() => clearField(8); } class Codec extends $pb.GeneratedMessage { @@ -922,3 +939,112 @@ class UserPacket extends $pb.GeneratedMessage { @$pb.TagNumber(3) $core.List<$core.String> get destinationSids => $_getList(2); } + +class RecordingResult extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo( + const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RecordingResult', + package: const $pb.PackageName( + const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'livekit'), + createEmptyInstance: create) + ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id') + ..aOS(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'error') + ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'duration') + ..aOS(4, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'location') + ..hasRequiredFields = false; + + RecordingResult._() : super(); + factory RecordingResult({ + $core.String? id, + $core.String? error, + $fixnum.Int64? duration, + $core.String? location, + }) { + final _result = create(); + if (id != null) { + _result.id = id; + } + if (error != null) { + _result.error = error; + } + if (duration != null) { + _result.duration = duration; + } + if (location != null) { + _result.location = location; + } + return _result; + } + factory RecordingResult.fromBuffer($core.List<$core.int> i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromBuffer(i, r); + factory RecordingResult.fromJson($core.String i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromJson(i, r); + @$core.Deprecated('Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + RecordingResult clone() => RecordingResult()..mergeFromMessage(this); + @$core.Deprecated('Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + RecordingResult copyWith(void Function(RecordingResult) updates) => + super.copyWith((message) => updates(message as RecordingResult)) + as RecordingResult; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static RecordingResult create() => RecordingResult._(); + RecordingResult createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static RecordingResult getDefault() => + _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static RecordingResult? _defaultInstance; + + @$pb.TagNumber(1) + $core.String get id => $_getSZ(0); + @$pb.TagNumber(1) + set id($core.String v) { + $_setString(0, v); + } + + @$pb.TagNumber(1) + $core.bool hasId() => $_has(0); + @$pb.TagNumber(1) + void clearId() => clearField(1); + + @$pb.TagNumber(2) + $core.String get error => $_getSZ(1); + @$pb.TagNumber(2) + set error($core.String v) { + $_setString(1, v); + } + + @$pb.TagNumber(2) + $core.bool hasError() => $_has(1); + @$pb.TagNumber(2) + void clearError() => clearField(2); + + @$pb.TagNumber(3) + $fixnum.Int64 get duration => $_getI64(2); + @$pb.TagNumber(3) + set duration($fixnum.Int64 v) { + $_setInt64(2, v); + } + + @$pb.TagNumber(3) + $core.bool hasDuration() => $_has(2); + @$pb.TagNumber(3) + void clearDuration() => clearField(3); + + @$pb.TagNumber(4) + $core.String get location => $_getSZ(3); + @$pb.TagNumber(4) + set location($core.String v) { + $_setString(3, v); + } + + @$pb.TagNumber(4) + $core.bool hasLocation() => $_has(3); + @$pb.TagNumber(4) + void clearLocation() => clearField(4); +} diff --git a/lib/src/proto/livekit_models.pbjson.dart b/lib/src/proto/livekit_models.pbjson.dart index c03167f61..0ca1adf1f 100644 --- a/lib/src/proto/livekit_models.pbjson.dart +++ b/lib/src/proto/livekit_models.pbjson.dart @@ -40,12 +40,13 @@ const Room$json = const { '6': '.livekit.Codec', '10': 'enabledCodecs' }, + const {'1': 'metadata', '3': 8, '4': 1, '5': 9, '10': 'metadata'}, ], }; /// Descriptor for `Room`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List roomDescriptor = $convert.base64Decode( - 'CgRSb29tEhAKA3NpZBgBIAEoCVIDc2lkEhIKBG5hbWUYAiABKAlSBG5hbWUSIwoNZW1wdHlfdGltZW91dBgDIAEoDVIMZW1wdHlUaW1lb3V0EikKEG1heF9wYXJ0aWNpcGFudHMYBCABKA1SD21heFBhcnRpY2lwYW50cxIjCg1jcmVhdGlvbl90aW1lGAUgASgDUgxjcmVhdGlvblRpbWUSIwoNdHVybl9wYXNzd29yZBgGIAEoCVIMdHVyblBhc3N3b3JkEjUKDmVuYWJsZWRfY29kZWNzGAcgAygLMg4ubGl2ZWtpdC5Db2RlY1INZW5hYmxlZENvZGVjcw=='); + 'CgRSb29tEhAKA3NpZBgBIAEoCVIDc2lkEhIKBG5hbWUYAiABKAlSBG5hbWUSIwoNZW1wdHlfdGltZW91dBgDIAEoDVIMZW1wdHlUaW1lb3V0EikKEG1heF9wYXJ0aWNpcGFudHMYBCABKA1SD21heFBhcnRpY2lwYW50cxIjCg1jcmVhdGlvbl90aW1lGAUgASgDUgxjcmVhdGlvblRpbWUSIwoNdHVybl9wYXNzd29yZBgGIAEoCVIMdHVyblBhc3N3b3JkEjUKDmVuYWJsZWRfY29kZWNzGAcgAygLMg4ubGl2ZWtpdC5Db2RlY1INZW5hYmxlZENvZGVjcxIaCghtZXRhZGF0YRgIIAEoCVIIbWV0YWRhdGE='); @$core.Deprecated('Use codecDescriptor instead') const Codec$json = const { '1': 'Codec', @@ -182,3 +183,17 @@ const UserPacket$json = const { /// Descriptor for `UserPacket`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List userPacketDescriptor = $convert.base64Decode( 'CgpVc2VyUGFja2V0EicKD3BhcnRpY2lwYW50X3NpZBgBIAEoCVIOcGFydGljaXBhbnRTaWQSGAoHcGF5bG9hZBgCIAEoDFIHcGF5bG9hZBIpChBkZXN0aW5hdGlvbl9zaWRzGAMgAygJUg9kZXN0aW5hdGlvblNpZHM='); +@$core.Deprecated('Use recordingResultDescriptor instead') +const RecordingResult$json = const { + '1': 'RecordingResult', + '2': const [ + const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'}, + const {'1': 'error', '3': 2, '4': 1, '5': 9, '10': 'error'}, + const {'1': 'duration', '3': 3, '4': 1, '5': 3, '10': 'duration'}, + const {'1': 'location', '3': 4, '4': 1, '5': 9, '10': 'location'}, + ], +}; + +/// Descriptor for `RecordingResult`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List recordingResultDescriptor = $convert.base64Decode( + 'Cg9SZWNvcmRpbmdSZXN1bHQSDgoCaWQYASABKAlSAmlkEhQKBWVycm9yGAIgASgJUgVlcnJvchIaCghkdXJhdGlvbhgDIAEoA1IIZHVyYXRpb24SGgoIbG9jYXRpb24YBCABKAlSCGxvY2F0aW9u'); diff --git a/lib/src/proto/livekit_rtc.pb.dart b/lib/src/proto/livekit_rtc.pb.dart index 42dab15bd..d6ceb95a7 100644 --- a/lib/src/proto/livekit_rtc.pb.dart +++ b/lib/src/proto/livekit_rtc.pb.dart @@ -282,9 +282,10 @@ enum SignalResponse_Message { trickle, update, trackPublished, - speaker, leave, mute, + speakersChanged, + roomUpdate, notSet } @@ -296,9 +297,10 @@ class SignalResponse extends $pb.GeneratedMessage { 4: SignalResponse_Message.trickle, 5: SignalResponse_Message.update, 6: SignalResponse_Message.trackPublished, - 7: SignalResponse_Message.speaker, 8: SignalResponse_Message.leave, 9: SignalResponse_Message.mute, + 10: SignalResponse_Message.speakersChanged, + 11: SignalResponse_Message.roomUpdate, 0: SignalResponse_Message.notSet }; static final $pb.BuilderInfo _i = $pb.BuilderInfo( @@ -306,7 +308,7 @@ class SignalResponse extends $pb.GeneratedMessage { package: const $pb.PackageName( const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'livekit'), createEmptyInstance: create) - ..oo(0, [1, 2, 3, 4, 5, 6, 7, 8, 9]) + ..oo(0, [1, 2, 3, 4, 5, 6, 8, 9, 10, 11]) ..aOM( 1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'join', subBuilder: JoinResponse.create) @@ -325,15 +327,18 @@ class SignalResponse extends $pb.GeneratedMessage { ..aOM( 6, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'trackPublished', subBuilder: TrackPublishedResponse.create) - ..aOM<$0.ActiveSpeakerUpdate>( - 7, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'speaker', - subBuilder: $0.ActiveSpeakerUpdate.create) ..aOM( 8, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'leave', subBuilder: LeaveRequest.create) ..aOM( 9, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'mute', subBuilder: MuteTrackRequest.create) + ..aOM( + 10, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'speakersChanged', + subBuilder: SpeakersChanged.create) + ..aOM( + 11, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'roomUpdate', + subBuilder: RoomUpdate.create) ..hasRequiredFields = false; SignalResponse._() : super(); @@ -344,9 +349,10 @@ class SignalResponse extends $pb.GeneratedMessage { TrickleRequest? trickle, ParticipantUpdate? update, TrackPublishedResponse? trackPublished, - $0.ActiveSpeakerUpdate? speaker, LeaveRequest? leave, MuteTrackRequest? mute, + SpeakersChanged? speakersChanged, + RoomUpdate? roomUpdate, }) { final _result = create(); if (join != null) { @@ -367,15 +373,18 @@ class SignalResponse extends $pb.GeneratedMessage { if (trackPublished != null) { _result.trackPublished = trackPublished; } - if (speaker != null) { - _result.speaker = speaker; - } if (leave != null) { _result.leave = leave; } if (mute != null) { _result.mute = mute; } + if (speakersChanged != null) { + _result.speakersChanged = speakersChanged; + } + if (roomUpdate != null) { + _result.roomUpdate = roomUpdate; + } return _result; } factory SignalResponse.fromBuffer($core.List<$core.int> i, @@ -491,47 +500,61 @@ class SignalResponse extends $pb.GeneratedMessage { @$pb.TagNumber(6) TrackPublishedResponse ensureTrackPublished() => $_ensure(5); - @$pb.TagNumber(7) - $0.ActiveSpeakerUpdate get speaker => $_getN(6); - @$pb.TagNumber(7) - set speaker($0.ActiveSpeakerUpdate v) { - setField(7, v); - } - - @$pb.TagNumber(7) - $core.bool hasSpeaker() => $_has(6); - @$pb.TagNumber(7) - void clearSpeaker() => clearField(7); - @$pb.TagNumber(7) - $0.ActiveSpeakerUpdate ensureSpeaker() => $_ensure(6); - @$pb.TagNumber(8) - LeaveRequest get leave => $_getN(7); + LeaveRequest get leave => $_getN(6); @$pb.TagNumber(8) set leave(LeaveRequest v) { setField(8, v); } @$pb.TagNumber(8) - $core.bool hasLeave() => $_has(7); + $core.bool hasLeave() => $_has(6); @$pb.TagNumber(8) void clearLeave() => clearField(8); @$pb.TagNumber(8) - LeaveRequest ensureLeave() => $_ensure(7); + LeaveRequest ensureLeave() => $_ensure(6); @$pb.TagNumber(9) - MuteTrackRequest get mute => $_getN(8); + MuteTrackRequest get mute => $_getN(7); @$pb.TagNumber(9) set mute(MuteTrackRequest v) { setField(9, v); } @$pb.TagNumber(9) - $core.bool hasMute() => $_has(8); + $core.bool hasMute() => $_has(7); @$pb.TagNumber(9) void clearMute() => clearField(9); @$pb.TagNumber(9) - MuteTrackRequest ensureMute() => $_ensure(8); + MuteTrackRequest ensureMute() => $_ensure(7); + + @$pb.TagNumber(10) + SpeakersChanged get speakersChanged => $_getN(8); + @$pb.TagNumber(10) + set speakersChanged(SpeakersChanged v) { + setField(10, v); + } + + @$pb.TagNumber(10) + $core.bool hasSpeakersChanged() => $_has(8); + @$pb.TagNumber(10) + void clearSpeakersChanged() => clearField(10); + @$pb.TagNumber(10) + SpeakersChanged ensureSpeakersChanged() => $_ensure(8); + + @$pb.TagNumber(11) + RoomUpdate get roomUpdate => $_getN(9); + @$pb.TagNumber(11) + set roomUpdate(RoomUpdate v) { + setField(11, v); + } + + @$pb.TagNumber(11) + $core.bool hasRoomUpdate() => $_has(9); + @$pb.TagNumber(11) + void clearRoomUpdate() => clearField(11); + @$pb.TagNumber(11) + RoomUpdate ensureRoomUpdate() => $_ensure(9); } class AddTrackRequest extends $pb.GeneratedMessage { @@ -1559,3 +1582,117 @@ class ICEServer extends $pb.GeneratedMessage { @$pb.TagNumber(3) void clearCredential() => clearField(3); } + +class SpeakersChanged extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo( + const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'SpeakersChanged', + package: const $pb.PackageName( + const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'livekit'), + createEmptyInstance: create) + ..pc<$0.SpeakerInfo>( + 1, + const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'speakers', + $pb.PbFieldType.PM, + subBuilder: $0.SpeakerInfo.create) + ..hasRequiredFields = false; + + SpeakersChanged._() : super(); + factory SpeakersChanged({ + $core.Iterable<$0.SpeakerInfo>? speakers, + }) { + final _result = create(); + if (speakers != null) { + _result.speakers.addAll(speakers); + } + return _result; + } + factory SpeakersChanged.fromBuffer($core.List<$core.int> i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromBuffer(i, r); + factory SpeakersChanged.fromJson($core.String i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromJson(i, r); + @$core.Deprecated('Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + SpeakersChanged clone() => SpeakersChanged()..mergeFromMessage(this); + @$core.Deprecated('Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + SpeakersChanged copyWith(void Function(SpeakersChanged) updates) => + super.copyWith((message) => updates(message as SpeakersChanged)) + as SpeakersChanged; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static SpeakersChanged create() => SpeakersChanged._(); + SpeakersChanged createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static SpeakersChanged getDefault() => + _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static SpeakersChanged? _defaultInstance; + + @$pb.TagNumber(1) + $core.List<$0.SpeakerInfo> get speakers => $_getList(0); +} + +class RoomUpdate extends $pb.GeneratedMessage { + static final $pb.BuilderInfo _i = $pb.BuilderInfo( + const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RoomUpdate', + package: const $pb.PackageName( + const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'livekit'), + createEmptyInstance: create) + ..aOM<$0.Room>(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'room', + subBuilder: $0.Room.create) + ..hasRequiredFields = false; + + RoomUpdate._() : super(); + factory RoomUpdate({ + $0.Room? room, + }) { + final _result = create(); + if (room != null) { + _result.room = room; + } + return _result; + } + factory RoomUpdate.fromBuffer($core.List<$core.int> i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromBuffer(i, r); + factory RoomUpdate.fromJson($core.String i, + [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => + create()..mergeFromJson(i, r); + @$core.Deprecated('Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.deepCopy] instead. ' + 'Will be removed in next major version') + RoomUpdate clone() => RoomUpdate()..mergeFromMessage(this); + @$core.Deprecated('Using this can add significant overhead to your binary. ' + 'Use [GeneratedMessageGenericExtensions.rebuild] instead. ' + 'Will be removed in next major version') + RoomUpdate copyWith(void Function(RoomUpdate) updates) => + super.copyWith((message) => updates(message as RoomUpdate)) + as RoomUpdate; // ignore: deprecated_member_use + $pb.BuilderInfo get info_ => _i; + @$core.pragma('dart2js:noInline') + static RoomUpdate create() => RoomUpdate._(); + RoomUpdate createEmptyInstance() => create(); + static $pb.PbList createRepeated() => $pb.PbList(); + @$core.pragma('dart2js:noInline') + static RoomUpdate getDefault() => + _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor(create); + static RoomUpdate? _defaultInstance; + + @$pb.TagNumber(1) + $0.Room get room => $_getN(0); + @$pb.TagNumber(1) + set room($0.Room v) { + setField(1, v); + } + + @$pb.TagNumber(1) + $core.bool hasRoom() => $_has(0); + @$pb.TagNumber(1) + void clearRoom() => clearField(1); + @$pb.TagNumber(1) + $0.Room ensureRoom() => $_ensure(0); +} diff --git a/lib/src/proto/livekit_rtc.pbjson.dart b/lib/src/proto/livekit_rtc.pbjson.dart index 36b572d90..4833792c8 100644 --- a/lib/src/proto/livekit_rtc.pbjson.dart +++ b/lib/src/proto/livekit_rtc.pbjson.dart @@ -186,15 +186,6 @@ const SignalResponse$json = const { '9': 0, '10': 'trackPublished' }, - const { - '1': 'speaker', - '3': 7, - '4': 1, - '5': 11, - '6': '.livekit.ActiveSpeakerUpdate', - '9': 0, - '10': 'speaker' - }, const { '1': 'leave', '3': 8, @@ -213,6 +204,24 @@ const SignalResponse$json = const { '9': 0, '10': 'mute' }, + const { + '1': 'speakers_changed', + '3': 10, + '4': 1, + '5': 11, + '6': '.livekit.SpeakersChanged', + '9': 0, + '10': 'speakersChanged' + }, + const { + '1': 'room_update', + '3': 11, + '4': 1, + '5': 11, + '6': '.livekit.RoomUpdate', + '9': 0, + '10': 'roomUpdate' + }, ], '8': const [ const {'1': 'message'}, @@ -221,7 +230,7 @@ const SignalResponse$json = const { /// Descriptor for `SignalResponse`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List signalResponseDescriptor = $convert.base64Decode( - 'Cg5TaWduYWxSZXNwb25zZRIrCgRqb2luGAEgASgLMhUubGl2ZWtpdC5Kb2luUmVzcG9uc2VIAFIEam9pbhI1CgZhbnN3ZXIYAiABKAsyGy5saXZla2l0LlNlc3Npb25EZXNjcmlwdGlvbkgAUgZhbnN3ZXISMwoFb2ZmZXIYAyABKAsyGy5saXZla2l0LlNlc3Npb25EZXNjcmlwdGlvbkgAUgVvZmZlchIzCgd0cmlja2xlGAQgASgLMhcubGl2ZWtpdC5Ucmlja2xlUmVxdWVzdEgAUgd0cmlja2xlEjQKBnVwZGF0ZRgFIAEoCzIaLmxpdmVraXQuUGFydGljaXBhbnRVcGRhdGVIAFIGdXBkYXRlEkoKD3RyYWNrX3B1Ymxpc2hlZBgGIAEoCzIfLmxpdmVraXQuVHJhY2tQdWJsaXNoZWRSZXNwb25zZUgAUg50cmFja1B1Ymxpc2hlZBI4CgdzcGVha2VyGAcgASgLMhwubGl2ZWtpdC5BY3RpdmVTcGVha2VyVXBkYXRlSABSB3NwZWFrZXISLQoFbGVhdmUYCCABKAsyFS5saXZla2l0LkxlYXZlUmVxdWVzdEgAUgVsZWF2ZRIvCgRtdXRlGAkgASgLMhkubGl2ZWtpdC5NdXRlVHJhY2tSZXF1ZXN0SABSBG11dGVCCQoHbWVzc2FnZQ=='); + 'Cg5TaWduYWxSZXNwb25zZRIrCgRqb2luGAEgASgLMhUubGl2ZWtpdC5Kb2luUmVzcG9uc2VIAFIEam9pbhI1CgZhbnN3ZXIYAiABKAsyGy5saXZla2l0LlNlc3Npb25EZXNjcmlwdGlvbkgAUgZhbnN3ZXISMwoFb2ZmZXIYAyABKAsyGy5saXZla2l0LlNlc3Npb25EZXNjcmlwdGlvbkgAUgVvZmZlchIzCgd0cmlja2xlGAQgASgLMhcubGl2ZWtpdC5Ucmlja2xlUmVxdWVzdEgAUgd0cmlja2xlEjQKBnVwZGF0ZRgFIAEoCzIaLmxpdmVraXQuUGFydGljaXBhbnRVcGRhdGVIAFIGdXBkYXRlEkoKD3RyYWNrX3B1Ymxpc2hlZBgGIAEoCzIfLmxpdmVraXQuVHJhY2tQdWJsaXNoZWRSZXNwb25zZUgAUg50cmFja1B1Ymxpc2hlZBItCgVsZWF2ZRgIIAEoCzIVLmxpdmVraXQuTGVhdmVSZXF1ZXN0SABSBWxlYXZlEi8KBG11dGUYCSABKAsyGS5saXZla2l0Lk11dGVUcmFja1JlcXVlc3RIAFIEbXV0ZRJFChBzcGVha2Vyc19jaGFuZ2VkGAogASgLMhgubGl2ZWtpdC5TcGVha2Vyc0NoYW5nZWRIAFIPc3BlYWtlcnNDaGFuZ2VkEjYKC3Jvb21fdXBkYXRlGAsgASgLMhMubGl2ZWtpdC5Sb29tVXBkYXRlSABSCnJvb21VcGRhdGVCCQoHbWVzc2FnZQ=='); @$core.Deprecated('Use addTrackRequestDescriptor instead') const AddTrackRequest$json = const { '1': 'AddTrackRequest', @@ -402,3 +411,25 @@ const ICEServer$json = const { /// Descriptor for `ICEServer`. Decode as a `google.protobuf.DescriptorProto`. final $typed_data.Uint8List iCEServerDescriptor = $convert.base64Decode( 'CglJQ0VTZXJ2ZXISEgoEdXJscxgBIAMoCVIEdXJscxIaCgh1c2VybmFtZRgCIAEoCVIIdXNlcm5hbWUSHgoKY3JlZGVudGlhbBgDIAEoCVIKY3JlZGVudGlhbA=='); +@$core.Deprecated('Use speakersChangedDescriptor instead') +const SpeakersChanged$json = const { + '1': 'SpeakersChanged', + '2': const [ + const {'1': 'speakers', '3': 1, '4': 3, '5': 11, '6': '.livekit.SpeakerInfo', '10': 'speakers'}, + ], +}; + +/// Descriptor for `SpeakersChanged`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List speakersChangedDescriptor = $convert.base64Decode( + 'Cg9TcGVha2Vyc0NoYW5nZWQSMAoIc3BlYWtlcnMYASADKAsyFC5saXZla2l0LlNwZWFrZXJJbmZvUghzcGVha2Vycw=='); +@$core.Deprecated('Use roomUpdateDescriptor instead') +const RoomUpdate$json = const { + '1': 'RoomUpdate', + '2': const [ + const {'1': 'room', '3': 1, '4': 1, '5': 11, '6': '.livekit.Room', '10': 'room'}, + ], +}; + +/// Descriptor for `RoomUpdate`. Decode as a `google.protobuf.DescriptorProto`. +final $typed_data.Uint8List roomUpdateDescriptor = + $convert.base64Decode('CgpSb29tVXBkYXRlEiEKBHJvb20YASABKAsyDS5saXZla2l0LlJvb21SBHJvb20='); diff --git a/lib/src/room.dart b/lib/src/room.dart index 5a0852e4b..5672b3fcb 100644 --- a/lib/src/room.dart +++ b/lib/src/room.dart @@ -1,10 +1,11 @@ -import 'dart:async'; import 'dart:collection'; -import 'classes/change_notifier.dart'; +import 'package:collection/collection.dart'; +import 'package:flutter/foundation.dart'; + import 'constants.dart'; -import 'errors.dart'; import 'events.dart'; +import 'exceptions.dart'; import 'extensions.dart'; import 'logger.dart'; import 'managers/event.dart'; @@ -15,9 +16,8 @@ import 'participant/remote_participant.dart'; import 'proto/livekit_models.pb.dart' as lk_models; import 'proto/livekit_rtc.pb.dart' as lk_rtc; import 'rtc_engine.dart'; -import 'signal_client.dart'; +import 'support/disposable.dart'; import 'track/track.dart'; -import 'track/track_publication.dart'; import 'types.dart'; /// Room is the primary construct for LiveKit conferences. It contains a @@ -29,14 +29,14 @@ import 'types.dart'; /// * participant membership changes /// * active speakers are different /// {@category Room} -class Room extends LKChangeNotifier { +class Room extends DisposableChangeNotifier with EventsEmittable { // Room is only instantiated if connected, so defaults to connected. ConnectionState _connectionState = ConnectionState.connected; /// connection state of the room ConnectionState get connectionState => _connectionState; - final Map _participants = {}; + final _participants = {}; /// map of SID to RemoteParticipant UnmodifiableMapView get participants => @@ -46,10 +46,10 @@ class Room extends LKChangeNotifier { late final LocalParticipant localParticipant; /// name of the room - late final String name; + final String name; /// sid of the room - late final String sid; + final String sid; List _activeSpeakers = []; @@ -60,8 +60,7 @@ class Room extends LKChangeNotifier { final RTCEngine engine; // suppport for multiple event listeners - final events = EventsEmitter(); - late final _engineListener = EventsListener(engine.events); + late final _engineListener = engine.createListener(); /// internal use /// {@nodoc} @@ -69,7 +68,8 @@ class Room extends LKChangeNotifier { required this.engine, required lk_rtc.JoinResponse joinResponse, ConnectOptions? connectOptions, - }) { + }) : sid = joinResponse.room.sid, + name = joinResponse.room.name { // _setUpListeners(); @@ -80,9 +80,6 @@ class Room extends LKChangeNotifier { roomEvents: events, ); - sid = joinResponse.room.sid; - name = joinResponse.room.name; - for (final info in joinResponse.otherParticipants) { _getOrCreateRemoteParticipant(info.sid, info); } @@ -92,20 +89,17 @@ class Room extends LKChangeNotifier { logger.fine('[RoomEvent] $event, will notifyListeners()'); notifyListeners(); }); - } - @override - Future dispose() async { - // dispose local participant - await localParticipant.dispose(); - // dispose Room's events emitter - await events.dispose(); - // dispose all listeners for RTCEngine - await _engineListener.dispose(); - // dispose the engine - await engine.dispose(); - - super.dispose(); + onDispose(() async { + // dispose events + await events.dispose(); + // dispose local participant + await localParticipant.dispose(); + // dispose all listeners for RTCEngine + await _engineListener.dispose(); + // dispose the engine + await engine.dispose(); + }); } static Future connect( @@ -116,7 +110,6 @@ class Room extends LKChangeNotifier { }) async { // final engine = RTCEngine( - SignalClient(), rtcConfig, ); @@ -167,16 +160,16 @@ class Room extends LKChangeNotifier { ..on((event) async { _connectionState = ConnectionState.connected; events.emit(const RoomReconnectedEvent()); - notifyListeners(); }) ..on((event) async { _connectionState = ConnectionState.reconnecting; events.emit(const RoomReconnectingEvent()); - notifyListeners(); }) - ..on((event) => _onDisconnectedEvent()) - ..on((event) => _onParticipantUpdateEvent(event.participants)) - ..on((event) => _onSpeakerUpdateEvent(event.speakers)) + ..on((event) => _handleClose()) + ..on((event) => _onParticipantUpdateEvent(event.participants)) + ..on( + (event) => _onEngineActiveSpeakersUpdateEvent(event.speakers)) + ..on((event) => _onSignalSpeakersChangedEvent(event.speakers)) ..on(_onDataMessageEvent) ..on((event) async { final track = localParticipant.trackPublications[event.sid]; @@ -210,8 +203,10 @@ class Room extends LKChangeNotifier { /// Disconnects from the room, notifying server of disconnection. Future disconnect() async { - engine.signalClient.sendLeave(); - await _onDisconnectedEvent(); + if (_connectionState != ConnectionState.disconnected) { + engine.signalClient.sendLeave(); + } + await _handleClose(); } Future reconnect() async { @@ -244,42 +239,36 @@ class Room extends LKChangeNotifier { return participant; } - Future _onDisconnectedEvent() async { + // there should be no problem calling this method multiple times + Future _handleClose() async { + logger.fine('[$objectId] _handleClose()'); if (_connectionState == ConnectionState.disconnected) { - logger.fine('$objectId: _handleDisconnect() already disconnected'); - return; + logger.warning('[$objectId]: close() already disconnected'); } - // we need to flag room as disconnected immediately to avoid - // this method firing multiple times since the following code - // is being awaited - _connectionState = ConnectionState.disconnected; // clean up RemoteParticipants - for (final _ in _participants.values) { + for (final _ in _participants.values.toList()) { // RemoteParticipant is responsible for disposing resources - await _.unpublishAllTracks(); await _.dispose(); } _participants.clear(); // clean up LocalParticipant - // for (final pub in localParticipant.tracks.values) { - // await pub.track?.stop(); - // } await localParticipant.unpublishAllTracks(); - // await localParticipant.dispose(); - // localParticipant = null; - + // clean up engine await engine.close(); _activeSpeakers.clear(); - notifyListeners(); - events.emit(const RoomDisconnectedEvent()); + // only notify if was not disconnected + if (_connectionState != ConnectionState.disconnected) { + _connectionState = ConnectionState.disconnected; + events.emit(const RoomDisconnectedEvent()); + } } - void _onParticipantUpdateEvent(List updates) async { + Future _onParticipantUpdateEvent(List updates) async { // trigger change notifier only if list of participants membership is changed var hasChanged = false; for (final info in updates) { @@ -290,7 +279,7 @@ class Room extends LKChangeNotifier { if (info.state == lk_models.ParticipantInfo_State.DISCONNECTED) { hasChanged = true; - _handleParticipantDisconnect(info.sid); + await _handleParticipantDisconnect(info.sid); continue; } @@ -310,43 +299,63 @@ class Room extends LKChangeNotifier { } } - void _onSpeakerUpdateEvent(List speakers) { - final seenSids = {}; - List newSpeakers = []; - for (final info in speakers) { - seenSids.add(info.sid); - - if (info.sid == localParticipant.sid) { - localParticipant.audioLevel = info.level; - localParticipant.isSpeaking = true; - newSpeakers.add(localParticipant); - continue; + void _onSignalSpeakersChangedEvent(List speakers) { + // + final lastSpeakers = { + for (final p in _activeSpeakers) p.sid: p, + }; + + for (final speaker in speakers) { + Participant? p = _participants[speaker.sid]; + if (speaker.sid == localParticipant.sid) p = localParticipant; + if (p == null) continue; + + p.audioLevel = speaker.level; + p.isSpeaking = speaker.active; + if (speaker.active) { + lastSpeakers[speaker.sid] = p; + } else { + lastSpeakers.remove(speaker.sid); } + } - final participant = participants[info.sid]; - if (participant != null) { - participant.audioLevel = info.level; - participant.isSpeaking = true; - newSpeakers.add(participant); + final activeSpeakers = lastSpeakers.values.toList(); + activeSpeakers.sort((a, b) => b.audioLevel.compareTo(a.audioLevel)); + _activeSpeakers = activeSpeakers; + events.emit(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); + } + + // from data channel + // updates are sent only when there's a change to speaker ordering + void _onEngineActiveSpeakersUpdateEvent(List speakers) { + List activeSpeakers = []; + + // localParticipant & remote participants + final allParticipants = { + localParticipant.sid: localParticipant, + ..._participants, + }; + + for (final speaker in speakers) { + final p = allParticipants[speaker.sid]; + if (p != null) { + p.audioLevel = speaker.level; + p.isSpeaking = true; + activeSpeakers.add(p); } } - // clear previous speakers - if (seenSids.contains(localParticipant.sid)) { - localParticipant.audioLevel = 0; - localParticipant.isSpeaking = false; - } - for (final participant in _participants.values) { - if (!seenSids.contains(participant.sid)) { - participant.audioLevel = 0; - participant.isSpeaking = false; + // clear if not in the speakers list + final speakerSids = speakers.map((e) => e.sid).toSet(); + for (final p in allParticipants.values) { + if (!speakerSids.contains(p.sid)) { + p.audioLevel = 0; + p.isSpeaking = false; } } - events.emit(ActiveSpeakersChangedEvent(speakers: newSpeakers)); - - _activeSpeakers = newSpeakers; - notifyListeners(); + _activeSpeakers = activeSpeakers; + events.emit(ActiveSpeakersChangedEvent(speakers: activeSpeakers)); } void _onDataMessageEvent(EngineDataPacketReceivedEvent dataPacketEvent) { @@ -368,16 +377,13 @@ class Room extends LKChangeNotifier { events.emit(event); } - void _handleParticipantDisconnect(String sid) { + Future _handleParticipantDisconnect(String sid) async { final participant = _participants.remove(sid); if (participant == null) { return; } - final toRemove = List.from(participant.trackPublications.values); - for (final track in toRemove) { - participant.unpublishTrack(track.sid, notify: true); - } + await participant.unpublishAllTracks(notify: true); events.emit(ParticipantDisconnectedEvent(participant: participant)); } diff --git a/lib/src/rtc_engine.dart b/lib/src/rtc_engine.dart index 53adb1ad2..882935ca9 100644 --- a/lib/src/rtc_engine.dart +++ b/lib/src/rtc_engine.dart @@ -3,10 +3,11 @@ import 'dart:async'; import 'package:collection/collection.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; +import 'package:meta/meta.dart'; import 'constants.dart'; -import 'errors.dart'; import 'events.dart'; +import 'exceptions.dart'; import 'extensions.dart'; import 'logger.dart'; import 'managers/delay.dart'; @@ -15,10 +16,11 @@ import 'options.dart'; import 'proto/livekit_models.pb.dart' as lk_models; import 'proto/livekit_rtc.pb.dart' as lk_rtc; import 'signal_client.dart'; +import 'support/disposable.dart'; import 'transport.dart'; import 'types.dart'; -class RTCEngine { +class RTCEngine extends Disposable with EventsEmittable { static const _lossyDCLabel = '_lossy'; static const _reliableDCLabel = '_reliable'; static const _maxReconnectAttempts = 5; @@ -27,19 +29,25 @@ class RTCEngine { // config for RTCPeerConnection final RTCConfiguration? rtcConfig; + @internal PCTransport? publisher; + + @internal PCTransport? subscriber; - PCTransport? get primary => _subscriberPrimary ? subscriber : publisher; - // used for ice state notifications - CancelListenFunc? _primaryIceStateListener; + @internal + PCTransport? get primary => _subscriberPrimary ? subscriber : publisher; // data channels for packets - rtc.RTCDataChannel? reliableDC; - rtc.RTCDataChannel? lossyDC; - bool iceConnected = false; - bool isReconnecting = false; - bool isClosed = true; + rtc.RTCDataChannel? _reliableDC; + rtc.RTCDataChannel? _lossyDC; + bool _iceConnected = false; + + ConnectionState _connectionState = ConnectionState.disconnected; + + /// connection state of the room + ConnectionState get connectionState => _connectionState; + // true if publisher connection has already been established. // this is helpful to know if we need to restart ICE on the publisher connection bool _hasPublished = false; @@ -55,41 +63,29 @@ class RTCEngine { // internal int _reconnectAttempts = 0; - final events = EventsEmitter(); - late final _signalListener = EventsListener(signalClient.events, synchronized: true); + late final _signalListener = signalClient.createListener(synchronized: true); final delays = CancelableDelayManager(); - // late final Timer _statsTimer; - RTCEngine( - this.signalClient, - this.rtcConfig, - ) { + this.rtcConfig, { + SignalClient? signalClient, + }) : signalClient = signalClient ?? SignalClient() { if (kDebugMode) { // log all EngineEvents events.listen((event) => logger.fine('[EngineEvent] $objectId ${event.runtimeType}')); } _setUpListeners(); - // _statsTimer = Timer.periodic(const Duration(seconds: 1), _onStatTimer); - } - Future dispose() async { - await events.dispose(); - await _signalListener.dispose(); + onDispose(() async { + await events.dispose(); + await delays.dispose(); + await close(); + await _signalListener.dispose(); + }); } - // void _onStatTimer(Timer _) async { - // // - // final stats = await publisher?.pc.getStats(); - // if (stats == null || stats.isEmpty) return; - - // for (final s in stats) { - // logger.fine('STATS ${s.values}'); - // } - // } - Future join( String url, String token, { @@ -110,22 +106,15 @@ class RTCEngine { return event.response; } + // there is no side-effect calling this method multiple times Future close() async { - logger.fine('${objectId} close()'); - if (isClosed) { - logger.fine('${objectId} close() already closed'); - return; + logger.fine('[$objectId] close()'); + if (_connectionState == ConnectionState.disconnected) { + logger.warning('[$objectId]: close() already disconnected'); } - isClosed = true; - // _statsTimer.cancel(); - - // cancel events - await _primaryIceStateListener?.call(); - _primaryIceStateListener = null; - // cancel all ongoing delays - await delays.dispose(); + await delays.cancelAll(); // PCTransport is responsible for disposing RTCPeerConnection await publisher?.dispose(); @@ -134,7 +123,10 @@ class RTCEngine { await subscriber?.dispose(); subscriber = null; - signalClient.close(); + await signalClient.close(); + + _connectionState = ConnectionState.disconnected; + // notifyListeners(); } Future addTrack({ @@ -174,10 +166,10 @@ class RTCEngine { final dcMessage = rtc.RTCDataChannelMessage.fromBinary(packet.writeToBuffer()); - if (packet.kind == lk_models.DataPacket_Kind.LOSSY && lossyDC != null) { - await lossyDC?.send(dcMessage); - } else if (packet.kind == lk_models.DataPacket_Kind.RELIABLE && reliableDC != null) { - await reliableDC?.send(dcMessage); + if (packet.kind == lk_models.DataPacket_Kind.LOSSY && _lossyDC != null) { + await _lossyDC?.send(dcMessage); + } else if (packet.kind == lk_models.DataPacket_Kind.RELIABLE && _reliableDC != null) { + await _reliableDC?.send(dcMessage); } } @@ -188,7 +180,7 @@ class RTCEngine { } if (publisher?.pc.iceConnectionState?.isConnected() == true) { - logger.warning('publisher is already connected'); + logger.warning('[$objectId] publisher is already connected'); return; } @@ -207,7 +199,7 @@ class RTCEngine { } Future reconnect() async { - if (isClosed) { + if (_connectionState == ConnectionState.disconnected) { logger.fine('$objectId reconnect() already closed'); return; } @@ -225,7 +217,8 @@ class RTCEngine { _reconnectAttempts++; try { - isReconnecting = true; + // isReconnecting = true; + _connectionState = ConnectionState.reconnecting; await signalClient.reconnect(url, token); if (publisher == null || subscriber == null) { @@ -256,7 +249,8 @@ class RTCEngine { // don't catch and pass up any exception } finally { // always set reconnecting to false - isReconnecting = false; + // isReconnecting = false; + _connectionState = ConnectionState.disconnected; } } @@ -296,31 +290,25 @@ class RTCEngine { subscriber?.pc.onDataChannel = _onDataChannel; } - // logger.fine('subscriber.pc: ${subscriber?.pc}'); - subscriber?.pc.onIceConnectionState = (state) { - // - events.emit(EngineSubscriberIceStateUpdatedEvent( - state: state, - isPrimary: _subscriberPrimary, - )); - }; + subscriber?.pc.onIceConnectionState = + (state) => events.emit(EngineSubscriberIceStateUpdatedEvent( + state: state, + isPrimary: _subscriberPrimary, + )); - publisher?.pc.onIceConnectionState = (state) { - // - events.emit(EnginePublisherIceStateUpdatedEvent( - state: state, - isPrimary: !_subscriberPrimary, - )); - }; + publisher?.pc.onIceConnectionState = (state) => events.emit(EnginePublisherIceStateUpdatedEvent( + state: state, + isPrimary: !_subscriberPrimary, + )); - _primaryIceStateListener ??= events.on((event) { + events.on((event) { // only listen to primary ice events if (!event.isPrimary) return; if (event.iceState == rtc.RTCIceConnectionState.RTCIceConnectionStateConnected) { - if (!iceConnected) { - iceConnected = true; - if (isReconnecting) { + if (!_iceConnected) { + _iceConnected = true; + if (_connectionState == ConnectionState.reconnecting) { events.emit(const EngineReconnectedEvent()); } else { events.emit(const EngineConnectedEvent()); @@ -328,8 +316,8 @@ class RTCEngine { } } else if (event.iceState == rtc.RTCIceConnectionState.RTCIceConnectionStateFailed) { // trigger reconnect sequence - if (iceConnected) { - iceConnected = false; + if (_iceConnected) { + _iceConnected = false; _onDisconnected('peerconnection'); } } @@ -350,34 +338,40 @@ class RTCEngine { )); }; - // data channels - final lossyInit = rtc.RTCDataChannelInit() - ..binaryType = 'binary' - ..ordered = true - ..maxRetransmits = 0; - lossyDC = await publisher?.pc.createDataChannel(_lossyDCLabel, lossyInit); - - final reliableInit = rtc.RTCDataChannelInit() - ..binaryType = 'binary' - ..ordered = true; - reliableDC = await publisher?.pc.createDataChannel(_reliableDCLabel, reliableInit); - // also handle messages over the pub channel, for backwards compatibility - lossyDC?.onMessage = _onDCMessage; - reliableDC?.onMessage = _onDCMessage; + try { + final lossyInit = rtc.RTCDataChannelInit() + ..binaryType = 'binary' + ..ordered = true + ..maxRetransmits = 0; + _lossyDC = await publisher?.pc.createDataChannel(_lossyDCLabel, lossyInit); + _lossyDC?.onMessage = _onDCMessage; + } catch (_) { + logger.severe('[$objectId] createDataChannel() did throw $_'); + } + + try { + final reliableInit = rtc.RTCDataChannelInit() + ..binaryType = 'binary' + ..ordered = true; + _reliableDC = await publisher?.pc.createDataChannel(_reliableDCLabel, reliableInit); + _reliableDC?.onMessage = _onDCMessage; + } catch (_) { + logger.severe('[$objectId] createDataChannel() did throw $_'); + } } void _onDataChannel(rtc.RTCDataChannel dc) { switch (dc.label) { case _reliableDCLabel: logger.fine('Server opened DC label: ${dc.label}'); - reliableDC = dc; - reliableDC?.onMessage = _onDCMessage; + _reliableDC = dc; + _reliableDC?.onMessage = _onDCMessage; break; case _lossyDCLabel: logger.fine('Server opened DC label: ${dc.label}'); - lossyDC = dc; - lossyDC?.onMessage = _onDCMessage; + _lossyDC = dc; + _lossyDC?.onMessage = _onDCMessage; break; default: logger.warning('Unknown DC label: ${dc.label}'); @@ -395,7 +389,7 @@ class RTCEngine { final dp = lk_models.DataPacket.fromBuffer(message.binary); if (dp.whichValue() == lk_models.DataPacket_Value.speaker) { // Speaker packet - events.emit(EngineSpeakersUpdateEvent(speakers: dp.speaker.speakers)); + events.emit(EngineActiveSpeakersUpdateEvent(speakers: dp.speaker.speakers)); } else if (dp.whichValue() == lk_models.DataPacket_Value.user) { // User packet events.emit(EngineDataPacketReceivedEvent( @@ -406,11 +400,15 @@ class RTCEngine { } Future _onDisconnected(String reason) async { - if (isClosed) return; + if (_connectionState == ConnectionState.disconnected) { + logger.fine('[$objectId] Already disconnected $reason'); + return; + } + + logger.fine('[$objectId] Disconnected $reason'); - logger.fine('disconnected $reason'); if (_reconnectAttempts >= _maxReconnectAttempts) { - logger.info('could not connect after $_reconnectAttempts, giving up'); + logger.info('[$objectId] Could not connect after ${_reconnectAttempts} attempts, giving up'); await close(); events.emit(const EngineDisconnectedEvent()); return; @@ -432,12 +430,10 @@ class RTCEngine { }); } - //------------------ SignalClient Delegate methods -------------------------// - void _setUpListeners() => _signalListener ..on((event) async { // create peer connections - isClosed = false; + _connectionState = ConnectionState.connected; _subscriberPrimary = event.response.subscriberPrimary; _providedIceServers = event.response.iceServers; @@ -451,28 +447,30 @@ class RTCEngine { // for subscriberPrimary, we negotiate when necessary (lazy) await negotiate(); } - - // _joinCompleter?.complete(Future.value(event.response)); - // _joinCompleter = null; }) ..on((_) async { await _onDisconnected('signal'); }) ..on((event) async { if (subscriber == null) { + logger.warning('[$objectId] subscriber is null'); return; } - logger.fine('received server offer(type: ${event.sd.type}, ' + logger.fine('[$objectId] Received server offer(type: ${event.sd.type}, ' '${subscriber!.pc.signalingState})'); await subscriber!.setRemoteDescription(event.sd); - final answer = await subscriber!.pc.createAnswer(); - logger.fine('Created answer'); - logger.finer('sdp: ${answer.sdp}'); - await subscriber!.pc.setLocalDescription(answer); - signalClient.sendAnswer(answer); + try { + final answer = await subscriber!.pc.createAnswer(); + logger.fine('Created answer'); + logger.finer('sdp: ${answer.sdp}'); + await subscriber!.pc.setLocalDescription(answer); + signalClient.sendAnswer(answer); + } catch (_) { + logger.severe('[$objectId] Failed to createAnswer()'); + } }) ..on((event) async { if (publisher == null) { @@ -494,16 +492,10 @@ class RTCEngine { await publisher!.addIceCandidate(event.candidate); } }) - ..on((event) async { - events.emit(EngineParticipantUpdateEvent(participants: event.updates)); - }) - // ..on((event) async { - // final completer = _pendingTrackResolvers.remove(event.cid); - // completer?.complete(event.track); - // }) - ..on((event) async { - events.emit(EngineSpeakersUpdateEvent(speakers: event.speakers)); - }) + // relay + ..on((event) => events.emit(event)) + // relay + ..on((event) => events.emit(event)) ..on((event) async { await close(); events.emit(const EngineDisconnectedEvent()); diff --git a/lib/src/signal_client.dart b/lib/src/signal_client.dart index 2740219e6..d7e6fc2c5 100644 --- a/lib/src/signal_client.dart +++ b/lib/src/signal_client.dart @@ -1,23 +1,23 @@ import 'dart:async'; -import 'dart:convert'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'package:http/http.dart' as http; -import 'errors.dart'; import 'events.dart'; +import 'exceptions.dart'; import 'extensions.dart'; import 'logger.dart'; import 'managers/event.dart'; import 'options.dart'; import 'proto/livekit_models.pb.dart' as lk_models; import 'proto/livekit_rtc.pb.dart' as lk_rtc; +import 'support/disposable.dart'; +import 'support/websocket.dart'; import 'types.dart'; import 'utils.dart'; -import 'ws/interface.dart'; -class SignalClient { - final events = EventsEmitter(); +class SignalClient extends Disposable with EventsEmittable { + // final ProtocolVersion protocol; bool _connected = false; @@ -29,6 +29,11 @@ class SignalClient { events.listen((event) { logger.fine('[SignalEvent] $event'); }); + + onDispose(() async { + await events.dispose(); + await close(); + }); } bool get connected => _connected; @@ -51,7 +56,7 @@ class SignalClient { try { _ws = await LiveKitWebSocket.connect( rtcUri, - WebSocketOptions( + WebSocketEventHandlers( onData: _onSocketData, onDispose: _onSocketDone, onError: _handleError, @@ -71,8 +76,7 @@ class SignalClient { // Attempt Validation try { final validateResponse = await http.get(validateUri); - if (validateResponse.statusCode != 200) - throw ConnectException(validateResponse.body); + if (validateResponse.statusCode != 200) throw ConnectException(validateResponse.body); throw ConnectException(); } catch (error) { // Pass it up if it's already a `ConnectError` @@ -88,7 +92,7 @@ class SignalClient { String token, ) async { _connected = false; - _ws?.dispose(); + await _ws?.dispose(); _ws = null; final rtcUri = Utils.buildUri( @@ -100,7 +104,7 @@ class SignalClient { _ws = await LiveKitWebSocket.connect( rtcUri, - WebSocketOptions( + WebSocketEventHandlers( onData: _onSocketData, onDispose: _onSocketDone, onError: _handleError, @@ -110,24 +114,27 @@ class SignalClient { _connected = true; } - void close() { + Future close() async { _connected = false; - _ws?.dispose(); + await _ws?.dispose(); } - void sendOffer(rtc.RTCSessionDescription offer) => - _sendRequest(lk_rtc.SignalRequest( + // @override + // Future dispose() async { + // super.dispose(); + // await events.dispose(); + // await close(); + // } + + void sendOffer(rtc.RTCSessionDescription offer) => _sendRequest(lk_rtc.SignalRequest( offer: offer.toSDKType(), )); - void sendAnswer(rtc.RTCSessionDescription answer) => - _sendRequest(lk_rtc.SignalRequest( + void sendAnswer(rtc.RTCSessionDescription answer) => _sendRequest(lk_rtc.SignalRequest( answer: answer.toSDKType(), )); - void sendIceCandidate( - rtc.RTCIceCandidate candidate, lk_rtc.SignalTarget target) => - _sendRequest( + void sendIceCandidate(rtc.RTCIceCandidate candidate, lk_rtc.SignalTarget target) => _sendRequest( lk_rtc.SignalRequest( trickle: lk_rtc.TrickleRequest( candidateInit: candidate.toJson(), @@ -136,8 +143,7 @@ class SignalClient { ), ); - void sendMuteTrack(String trackSid, bool muted) => - _sendRequest(lk_rtc.SignalRequest( + void sendMuteTrack(String trackSid, bool muted) => _sendRequest(lk_rtc.SignalRequest( mute: lk_rtc.MuteTrackRequest( sid: trackSid, muted: muted, @@ -174,8 +180,7 @@ class SignalClient { subscription: subscription, )); - void sendSetSimulcastLayers( - String trackSid, List layers) => + void sendSetSimulcastLayers(String trackSid, List layers) => _sendRequest(lk_rtc.SignalRequest( simulcast: lk_rtc.SetSimulcastLayers( trackSid: trackSid, @@ -188,8 +193,8 @@ class SignalClient { )); void _sendRequest(lk_rtc.SignalRequest req) { - if (_ws == null) { - logger.warning('could not send message, not connected'); + if (_ws == null || isDisposed) { + logger.warning('[$objectId] Could not send message, not connected or already disposed'); return; } @@ -221,8 +226,7 @@ class SignalClient { )); break; case lk_rtc.SignalResponse_Message.update: - events.emit( - SignalParticipantUpdateEvent(updates: msg.update.participants)); + events.emit(SignalParticipantUpdateEvent(participants: msg.update.participants)); break; case lk_rtc.SignalResponse_Message.trackPublished: events.emit(SignalLocalTrackPublishedEvent( @@ -230,9 +234,8 @@ class SignalClient { track: msg.trackPublished.track, )); break; - case lk_rtc.SignalResponse_Message.speaker: - events.emit( - SignalActiveSpeakersChangedEvent(speakers: msg.speaker.speakers)); + case lk_rtc.SignalResponse_Message.speakersChanged: + events.emit(SignalSpeakersChangedEvent(speakers: msg.speakersChanged.speakers)); break; case lk_rtc.SignalResponse_Message.leave: events.emit(SignalLeaveEvent(canReconnect: msg.leave.canReconnect)); diff --git a/lib/src/support/disposable.dart b/lib/src/support/disposable.dart new file mode 100644 index 000000000..2795e3ec5 --- /dev/null +++ b/lib/src/support/disposable.dart @@ -0,0 +1,90 @@ +import 'dart:async'; + +import 'package:flutter/foundation.dart'; +import 'package:meta/meta.dart'; + +import '../extensions.dart'; +import '../logger.dart'; + +typedef OnDisposeFunc = Future Function(); + +mixin _Disposer { + // + final _disposeFuncs = []; + bool _isDisposed = false; + bool get isDisposed => _isDisposed; + + // last added func will be called first when disposing + void onDispose(OnDisposeFunc func) => _disposeFuncs.add(func); + + Future _dispose() async { + if (!_isDisposed) { + logger.fine('[${objectId}] dispose()'); + _isDisposed = true; + if (_disposeFuncs.isNotEmpty) { + logger.fine('[$objectId] running ${_disposeFuncs.length} dispose funcs...'); + // call dispose funcs in reverse order + for (final _func in _disposeFuncs.reversed) { + await _func(); + } + _disposeFuncs.clear(); + logger.fine('[$objectId] dispose complete.'); + } + return true; + } else { + logger.warning('[$objectId] unnecessary dispose() called.'); + return false; + } + } +} + +abstract class Disposable with _Disposer { + @mustCallSuper + Future dispose() async { + return await _dispose(); + } +} + +abstract class DisposableChangeNotifier extends ChangeNotifier with _Disposer { + @override + Future dispose() async { + if (!isDisposed) super.dispose(); + return await super._dispose(); + } + + @override + bool get hasListeners { + if (isDisposed) { + logger.warning('called hasListeners on a disposed ChangeNotifier'); + return false; + } + return super.hasListeners; + } + + @override + void addListener(VoidCallback listener) { + if (isDisposed) { + logger.warning('called addListener() on a disposed ChangeNotifier'); + return; + } + super.addListener(listener); + } + + @override + void notifyListeners() { + if (isDisposed) { + logger.warning('called notifyListeners() on a disposed ChangeNotifier'); + return; + } + super.notifyListeners(); + } + + @override + void removeListener(VoidCallback listener) { + if (isDisposed) { + logger.warning('called removeListener() on a disposed ChangeNotifier'); + return; + } + super.removeListener(listener); + } +} diff --git a/lib/src/support/native_audio.dart b/lib/src/support/native_audio.dart new file mode 100644 index 000000000..0af42246c --- /dev/null +++ b/lib/src/support/native_audio.dart @@ -0,0 +1,122 @@ +// https://developer.apple.com/documentation/avfaudio/avaudiosession/category +import 'package:flutter/services.dart'; + +import '../logger.dart'; + +enum AppleAudioCategory { + soloAmbient, + playback, + record, + playAndRecord, + multiRoute, +} + +// https://developer.apple.com/documentation/avfaudio/avaudiosession/categoryoptions +enum AppleAudioCategoryOption { + mixWithOthers, // Only playAndRecord, playback, or multiRoute. + duckOthers, // Only playAndRecord, playback, or multiRoute. + interruptSpokenAudioAndMixWithOthers, + allowBluetooth, // Only playAndRecord or record. + allowBluetoothA2DP, + allowAirPlay, + defaultToSpeaker, +} + +// https://developer.apple.com/documentation/avfaudio/avaudiosession/mode +enum AppleAudioMode { + default_, + gameChat, + measurement, + moviePlayback, + spokenAudio, + videoChat, + videoRecording, + voiceChat, + voicePrompt, +} + +extension AppleAudioCategoryExt on AppleAudioCategory { + String toStringValue() => { + AppleAudioCategory.soloAmbient: 'soloAmbient', + AppleAudioCategory.playback: 'playback', + AppleAudioCategory.record: 'record', + AppleAudioCategory.playAndRecord: 'playAndRecord', + AppleAudioCategory.multiRoute: 'multiRoute', + }[this]!; +} + +extension AppleAudioCategoryOptionExt on AppleAudioCategoryOption { + String toStringValue() => { + AppleAudioCategoryOption.mixWithOthers: 'mixWithOthers', + AppleAudioCategoryOption.duckOthers: 'duckOthers', + AppleAudioCategoryOption.interruptSpokenAudioAndMixWithOthers: + 'interruptSpokenAudioAndMixWithOthers', + AppleAudioCategoryOption.allowBluetooth: 'allowBluetooth', + AppleAudioCategoryOption.allowBluetoothA2DP: 'allowBluetoothA2DP', + AppleAudioCategoryOption.allowAirPlay: 'allowAirPlay', + AppleAudioCategoryOption.defaultToSpeaker: 'defaultToSpeaker', + }[this]!; +} + +extension AppleAudioModeExt on AppleAudioMode { + String toStringValue() => { + AppleAudioMode.default_: 'default', + AppleAudioMode.gameChat: 'gameChat', + AppleAudioMode.measurement: 'measurement', + AppleAudioMode.moviePlayback: 'moviePlayback', + AppleAudioMode.spokenAudio: 'spokenAudio', + AppleAudioMode.videoChat: 'videoChat', + AppleAudioMode.videoRecording: 'videoRecording', + AppleAudioMode.voiceChat: 'voiceChat', + AppleAudioMode.voicePrompt: 'voicePrompt', + }[this]!; +} + +class NativeAudioConfiguration { + final AppleAudioCategory? appleAudioCategory; + final Set? appleAudioCategoryOptions; + final AppleAudioMode? appleAudioMode; + + NativeAudioConfiguration({ + // for iOS / Mac + this.appleAudioCategory, + this.appleAudioCategoryOptions, + this.appleAudioMode, + // Android options + // ... + }); + + Map toMap() => { + if (appleAudioCategory != null) 'appleAudioCategory': appleAudioCategory!.toStringValue(), + if (appleAudioCategoryOptions != null) + 'appleAudioCategoryOptions': + appleAudioCategoryOptions!.map((e) => e.toStringValue()).toList(), + if (appleAudioMode != null) 'appleAudioMode': appleAudioMode!.toStringValue(), + }; + + NativeAudioConfiguration copyWith({ + AppleAudioCategory? appleAudioCategory, + Set? appleAudioCategoryOptions, + AppleAudioMode? appleAudioMode, + }) => + NativeAudioConfiguration( + appleAudioCategory: appleAudioCategory ?? this.appleAudioCategory, + appleAudioCategoryOptions: appleAudioCategoryOptions ?? this.appleAudioCategoryOptions, + appleAudioMode: appleAudioMode ?? this.appleAudioMode, + ); +} + +const _lkMethodChannel = MethodChannel('livekit_client'); + +Future configureNativeAudio(NativeAudioConfiguration configuration) async { + try { + final result = await _lkMethodChannel.invokeMethod( + 'configureNativeAudio', + configuration.toMap(), + ); + return result == true; + } catch (_) { + logger.warning('configureAudioSession did throw $_'); + return false; + } +} diff --git a/lib/src/ws/platform/io.dart b/lib/src/support/platforms/io.dart similarity index 54% rename from lib/src/ws/platform/io.dart rename to lib/src/support/platforms/io.dart index 417569e4c..7d6e7c946 100644 --- a/lib/src/ws/platform/io.dart +++ b/lib/src/support/platforms/io.dart @@ -2,17 +2,18 @@ import 'dart:async'; import 'dart:io' as io; import '../../logger.dart'; -import '../interface.dart'; +import '../websocket.dart'; +import '../../extensions.dart'; Future lkWebSocketConnect( Uri uri, [ - WebSocketOptions? options, + WebSocketEventHandlers? options, ]) => LiveKitWebSocketIO.connect(uri, options); class LiveKitWebSocketIO implements LiveKitWebSocket { final io.WebSocket _ws; - final WebSocketOptions? options; + final WebSocketEventHandlers? options; late final StreamSubscription _subscription; LiveKitWebSocketIO._( @@ -26,38 +27,38 @@ class LiveKitWebSocketIO implements LiveKitWebSocket { } @override - void dispose() { + Future dispose() async { + await _subscription.cancel(); + await _ws.close(); options?.onDispose?.call(); - _subscription.cancel(); - _ws.close(); } @override void send(List data) { - // 0 CONNECTING - // 1 OPEN - // 2 CLOSING - // 3 CLOSED - if (_ws.readyState == 1) { - try { - _ws.add(data); - } catch (e) { - // - } + // 0 CONNECTING, 1 OPEN, 2 CLOSING, 3 CLOSED + if (_ws.readyState != 1) { + logger.fine('[$objectId] Tried to send data (readyState: ${_ws.readyState})'); + return; + } + + try { + _ws.add(data); + } catch (_) { + logger.fine('[$objectId] send did throw ${_}'); } } static Future connect( Uri uri, [ - WebSocketOptions? options, + WebSocketEventHandlers? options, ]) async { - logger.fine('WebSocketIO connect (uri: ${uri.toString()})'); + logger.fine('[WebSocketIO] Connecting(uri: ${uri.toString()})...'); try { final ws = await io.WebSocket.connect(uri.toString()); - logger.fine('WebSocketIO connected'); + logger.fine('[WebSocketIO] Connected'); return LiveKitWebSocketIO._(ws, options); } catch (_) { - logger.severe('WebSocketIO error ${_}'); + logger.severe('[WebSocketIO] did throw ${_}'); throw WebSocketException.connect(); } } diff --git a/lib/src/ws/platform/web.dart b/lib/src/support/platforms/web.dart similarity index 83% rename from lib/src/ws/platform/web.dart rename to lib/src/support/platforms/web.dart index b1da3d7ef..953c8e764 100644 --- a/lib/src/ws/platform/web.dart +++ b/lib/src/support/platforms/web.dart @@ -4,17 +4,17 @@ import 'dart:async'; import 'dart:html' as html; import 'dart:typed_data'; -import '../interface.dart'; +import '../websocket.dart'; Future lkWebSocketConnect( Uri uri, [ - WebSocketOptions? options, + WebSocketEventHandlers? options, ]) => LiveKitWebSocketWeb.connect(uri, options); class LiveKitWebSocketWeb implements LiveKitWebSocket { final html.WebSocket _ws; - final WebSocketOptions? options; + final WebSocketEventHandlers? options; late final StreamSubscription _messageSubscription; late final StreamSubscription _closeSubscription; @@ -34,16 +34,16 @@ class LiveKitWebSocketWeb implements LiveKitWebSocket { void send(List data) => _ws.send(data); @override - void dispose() { + Future dispose() async { options?.onDispose?.call(); - _messageSubscription.cancel(); - _closeSubscription.cancel(); + await _messageSubscription.cancel(); + await _closeSubscription.cancel(); _ws.close(); } static Future connect( Uri uri, [ - WebSocketOptions? options, + WebSocketEventHandlers? options, ]) async { final completer = Completer(); final ws = html.WebSocket(uri.toString()); diff --git a/lib/src/ws/interface.dart b/lib/src/support/websocket.dart similarity index 82% rename from lib/src/ws/interface.dart rename to lib/src/support/websocket.dart index f82edf8c4..8021abf42 100644 --- a/lib/src/ws/interface.dart +++ b/lib/src/support/websocket.dart @@ -1,4 +1,4 @@ -import 'platform/io.dart' if (dart.library.html) 'platform/web.dart'; +import 'platforms/io.dart' if (dart.library.html) 'platforms/web.dart'; class WebSocketException implements Exception { final int code; @@ -18,11 +18,11 @@ typedef WebSocketOnData = Function(dynamic data); typedef WebSocketOnError = Function(dynamic error); typedef WebSocketOnDispose = Function(); -class WebSocketOptions { +class WebSocketEventHandlers { final WebSocketOnData? onData; final WebSocketOnError? onError; final WebSocketOnDispose? onDispose; - const WebSocketOptions({ + const WebSocketEventHandlers({ this.onData, this.onError, this.onDispose, @@ -31,11 +31,11 @@ class WebSocketOptions { abstract class LiveKitWebSocket { void send(List data); - void dispose(); + Future dispose(); static Future connect( Uri uri, [ - WebSocketOptions? options, + WebSocketEventHandlers? options, ]) => lkWebSocketConnect(uri, options); } diff --git a/lib/src/track/audio_track.dart b/lib/src/track/audio_track.dart index 057b3d6e5..68fc5ff56 100644 --- a/lib/src/track/audio_track.dart +++ b/lib/src/track/audio_track.dart @@ -1,29 +1,174 @@ +// import 'package:audio_session/audio_session.dart' as _as; +import 'dart:io'; + +import 'package:flutter/foundation.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; +import 'package:synchronized/synchronized.dart' as sync; +import '../logger.dart'; import '../proto/livekit_models.pb.dart' as lk_models; +import '../support/native_audio.dart'; import '_audio_api.dart' if (dart.library.html) '_audio_html.dart' as audio; import 'local_audio_track.dart'; import 'track.dart'; +enum AudioTrackState { + none, + remoteOnly, + localOnly, + localAndRemote, +} + +typedef ConfigureNativeAudioFunc = Future Function(AudioTrackState state); + class AudioTrack extends Track { + // it's possible to set custom function here to customize audio session configuration + static ConfigureNativeAudioFunc nativeAudioConfigurationForAudioTrackState = + defaultNativeAudioConfigurationFunc; + + static final _trackCounterLock = sync.Lock(); + static AudioTrackState audioTrackState = AudioTrackState.none; + static int _localTrackCount = 0; + static int _remoteTrackCount = 0; + rtc.MediaStream? mediaStream; - AudioTrack(String name, rtc.MediaStreamTrack track, this.mediaStream) - : super(lk_models.TrackType.AUDIO, name, track); + AudioTrack( + String name, + rtc.MediaStreamTrack track, + this.mediaStream, + ) : super( + lk_models.TrackType.AUDIO, + name, + track, + ); /// Start playing audio track. On web platform, create an audio element and /// start playback - void start() { - if (this is! LocalAudioTrack) { - audio.startAudio(getCid(), mediaStreamTrack); + @override + Future start() async { + final didStart = await super.start(); + if (didStart) { + if (this is! LocalAudioTrack) { + audio.startAudio(getCid(), mediaStreamTrack); + } + + // update counter + await _trackCounterLock.synchronized(() async { + if (this is LocalAudioTrack) { + _localTrackCount++; + } else if (this is! LocalAudioTrack) { + _remoteTrackCount++; + } + await _onAudioTrackCountDidChange(); + }); } + + return didStart; } @override - Future stop() async { - await mediaStream?.dispose(); - mediaStream = null; - audio.stopAudio(getCid()); - await super.stop(); + Future stop() async { + final didStop = await super.stop(); + if (didStop) { + await mediaStream?.dispose(); + mediaStream = null; + audio.stopAudio(getCid()); + + // update counter + await _trackCounterLock.synchronized(() async { + if (this is LocalAudioTrack) { + _localTrackCount--; + } else if (this is! LocalAudioTrack) { + _remoteTrackCount--; + } + await _onAudioTrackCountDidChange(); + }); + } + + return didStop; } + + Future _onAudioTrackCountDidChange() async { + logger.fine('[$runtimeType] onAudioTrackCountDidChange: ' + 'local: $_localTrackCount, remote: $_remoteTrackCount'); + + final newState = _computeAudioTrackState(); + + if (audioTrackState != newState) { + audioTrackState = newState; + logger.fine('[$runtimeType] didUpdateSate: $audioTrackState'); + + NativeAudioConfiguration? config; + if (!kIsWeb && Platform.isIOS) { + // Only iOS for now... + config = await nativeAudioConfigurationForAudioTrackState.call(audioTrackState); + } + + if (config != null) { + logger.fine('[$runtimeType] configuring for ${audioTrackState} using ${config}...'); + try { + await configureNativeAudio(config); + } catch (error) { + logger.warning('[$runtimeType] Failed to configure ${error}'); + } + } + } + } + + static AudioTrackState _computeAudioTrackState() { + if (_localTrackCount > 0 && _remoteTrackCount == 0) { + return AudioTrackState.localOnly; + } else if (_localTrackCount == 0 && _remoteTrackCount > 0) { + return AudioTrackState.remoteOnly; + } else if (_localTrackCount > 0 && _remoteTrackCount > 0) { + return AudioTrackState.localAndRemote; + } + // Default + return AudioTrackState.none; + } +} + +Future defaultNativeAudioConfigurationFunc(AudioTrackState state) async { + // + if (state == AudioTrackState.remoteOnly) { + return NativeAudioConfiguration( + appleAudioCategory: AppleAudioCategory.playback, + appleAudioCategoryOptions: { + AppleAudioCategoryOption.mixWithOthers, + // IosAudioCategoryOption.duckOthers, + }, + appleAudioMode: AppleAudioMode.spokenAudio, + ); + } else if ([ + AudioTrackState.localOnly, + AudioTrackState.localAndRemote, + ].contains(state)) { + return NativeAudioConfiguration( + appleAudioCategory: AppleAudioCategory.playAndRecord, + appleAudioCategoryOptions: { + AppleAudioCategoryOption.allowBluetooth, + AppleAudioCategoryOption.mixWithOthers, + // IosAudioCategoryOption.duckOthers, + }, + appleAudioMode: AppleAudioMode.voiceChat, + ); + } + + // TODO: .record category causes exception in WebRTC lib for unknown reason + // if (this == AudioTrackState.localOnly) { + // return NativeAudioConfiguration( + // iosCategory: IosAudioCategory.record, + // iosCategoryOptions: { + // // IosAudioCategoryOption.allowBluetooth, + // }, + // iosMode: IosAudioMode.spokenAudio, + // ); + // } + + return NativeAudioConfiguration( + appleAudioCategory: AppleAudioCategory.soloAmbient, + appleAudioCategoryOptions: {}, + appleAudioMode: AppleAudioMode.default_, + ); } diff --git a/lib/src/track/local_audio_track.dart b/lib/src/track/local_audio_track.dart index 830090c47..75b80b01d 100644 --- a/lib/src/track/local_audio_track.dart +++ b/lib/src/track/local_audio_track.dart @@ -2,7 +2,7 @@ import 'dart:async'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; -import '../errors.dart'; +import '../exceptions.dart'; import 'audio_track.dart'; import 'options.dart'; @@ -15,8 +15,12 @@ class LocalAudioTrack extends AudioTrack { /// Creates a new audio track from the default audio input device. static Future create([LocalAudioTrackOptions? options]) async { - // try { + // TODO: have back up incase the options fail final stream = await rtc.navigator.mediaDevices.getUserMedia({ + // 'audio': { + // 'echoCancellation': true, + // 'noiseSuppression': true, + // }, 'audio': true, 'video': false, }); diff --git a/lib/src/track/local_video_track.dart b/lib/src/track/local_video_track.dart index ae8e10609..c7197f69d 100644 --- a/lib/src/track/local_video_track.dart +++ b/lib/src/track/local_video_track.dart @@ -1,6 +1,6 @@ import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; -import '../errors.dart'; +import '../exceptions.dart'; import '../logger.dart'; import 'options.dart'; import 'track.dart'; diff --git a/lib/src/track/remote_track_publication.dart b/lib/src/track/remote_track_publication.dart index bd0792e50..fef7ba69f 100644 --- a/lib/src/track/remote_track_publication.dart +++ b/lib/src/track/remote_track_publication.dart @@ -1,11 +1,9 @@ -import 'package:livekit_client/livekit_client.dart'; - +import '../events.dart'; +import '../extensions.dart'; import '../participant/remote_participant.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../proto/livekit_rtc.pb.dart' as lk_rtc; import 'track.dart'; -import '../extensions.dart'; - import 'track_publication.dart'; /// Represents a track publication from a RemoteParticipant. Provides methods to diff --git a/lib/src/track/track.dart b/lib/src/track/track.dart index af7ea5b30..ceddb3ece 100644 --- a/lib/src/track/track.dart +++ b/lib/src/track/track.dart @@ -1,13 +1,17 @@ +import 'package:flutter/material.dart'; import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; -import 'package:livekit_client/src/classes/change_notifier.dart'; +import 'package:meta/meta.dart'; import 'package:uuid/uuid.dart'; +import '../extensions.dart'; +import '../logger.dart'; import '../proto/livekit_models.pb.dart' as lk_models; +import '../support/disposable.dart'; /// Wrapper around a MediaStreamTrack with additional metadata. /// Base for [AudioTrack] and [VideoTrack], /// can not be instantiated directly. -abstract class Track extends LKChangeNotifier { +abstract class Track extends DisposableChangeNotifier { static const cameraName = 'camera'; static const screenShareName = 'screen'; @@ -19,6 +23,10 @@ abstract class Track extends LKChangeNotifier { rtc.RTCRtpTransceiver? transceiver; String? _cid; + // started / stopped + bool _active = false; + bool get isActive => _active; + Track( this.kind, this.name, @@ -50,7 +58,33 @@ abstract class Track extends LKChangeNotifier { return cid; } - Future stop() async { - await mediaStreamTrack.stop(); + // returns true if started, false if already started + @mustCallSuper + Future start() async { + if (_active) { + // already started + return false; + } + + _active = true; + return true; + } + + // returns true if stopped, false if already stopped + @mustCallSuper + Future stop() async { + if (!_active) { + // already stopped + return false; + } + + try { + await mediaStreamTrack.stop(); + } catch (_) { + logger.warning('[$objectId] rtc.mediaStreamTrack.stop() did throw ${_}'); + } + + _active = false; + return true; } } diff --git a/lib/src/track/track_publication.dart b/lib/src/track/track_publication.dart index 4b518381d..a476aad31 100644 --- a/lib/src/track/track_publication.dart +++ b/lib/src/track/track_publication.dart @@ -1,3 +1,4 @@ +import '../support/disposable.dart'; import '../proto/livekit_models.pb.dart' as lk_models; import '../types.dart'; import 'track.dart'; @@ -8,9 +9,9 @@ import 'track.dart'; /// Base for [RemoteTrackPublication] and [LocalTrackPublication], /// can not be instantiated directly. -abstract class TrackPublication { - final String name; +abstract class TrackPublication extends Disposable { final String sid; + final String name; final lk_models.TrackType kind; Track? track; diff --git a/lib/src/track/video_track.dart b/lib/src/track/video_track.dart index 5da706bfe..12fe00350 100644 --- a/lib/src/track/video_track.dart +++ b/lib/src/track/video_track.dart @@ -5,6 +5,7 @@ import 'track.dart'; /// A video track will notify when its mediaTrack has changed. class VideoTrack extends Track { + // rtc.MediaStream _mediaStream; VideoTrack( @@ -27,9 +28,12 @@ class VideoTrack extends Track { } @override - Future stop() async { - await super.stop(); - await _mediaStream.dispose(); + Future stop() async { + final didStop = await super.stop(); + if (didStop) { + await _mediaStream.dispose(); + } // _mediaStream = null; + return didStop; } } diff --git a/lib/src/transport.dart b/lib/src/transport.dart index e31f79d52..f21cad475 100644 --- a/lib/src/transport.dart +++ b/lib/src/transport.dart @@ -5,13 +5,14 @@ import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc; import 'constants.dart'; import 'extensions.dart'; import 'logger.dart'; +import 'support/disposable.dart'; import 'types.dart'; import 'utils.dart'; typedef PCTransportOnOffer = void Function(rtc.RTCSessionDescription offer); /// a wrapper around PeerConnection -class PCTransport { +class PCTransport extends Disposable { final rtc.RTCPeerConnection pc; final List _pendingCandidates = []; bool restartingIce = false; @@ -20,11 +21,42 @@ class PCTransport { Function? _cancelDebounce; // private constructor - PCTransport._(this.pc); + PCTransport._(this.pc) { + // + onDispose(() async { + _cancelDebounce?.call(); + _cancelDebounce = null; + + // Ensure callbacks won't fire any more + pc.onRenegotiationNeeded = null; + pc.onIceCandidate = null; + pc.onIceConnectionState = null; + pc.onTrack = null; + + // Remove all senders + List senders = []; + try { + senders = await pc.getSenders(); + } catch (_) { + logger.warning('getSenders() failed with error: $_'); + } + + for (final e in senders) { + try { + await pc.removeTrack(e); + } catch (_) { + logger.warning('removeTrack() failed with error: $_'); + } + } + + await pc.close(); + await pc.dispose(); + }); + } static Future create([RTCConfiguration? rtcConfig]) async { rtcConfig ??= const RTCConfiguration(); - logger.fine('PCTransport creating ${rtcConfig.toMap()}'); + logger.fine('[PCTransport] creating ${rtcConfig.toMap()}'); final _ = await rtc.createPeerConnection(rtcConfig.toMap()); return PCTransport._(_); } @@ -35,39 +67,19 @@ class PCTransport { wait: Timeouts.debounce, ); - Future dispose() async { - logger.fine('${objectId} dispose()'); - // Ensure debounce won't fire - _cancelDebounce?.call(); - _cancelDebounce = null; + // @override + // Future dispose() async { + // super.dispose(); + // // Ensure debounce won't fire - // Ensure callbacks won't fire any more - pc.onRenegotiationNeeded = null; - pc.onIceCandidate = null; - pc.onIceConnectionState = null; - pc.onTrack = null; + // } - // Remove all senders - List senders = []; - try { - senders = await pc.getSenders(); - } catch (_) { - logger.warning('getSenders() failed with error: $_'); - } - - for (final e in senders) { - try { - await pc.removeTrack(e); - } catch (_) { - logger.warning('removeTrack() failed with error: $_'); - } + Future setRemoteDescription(rtc.RTCSessionDescription sd) async { + if (isDisposed) { + logger.warning('[$objectId] setRemoteDescription() already disposed'); + return; } - await pc.close(); - await pc.dispose(); - } - - Future setRemoteDescription(rtc.RTCSessionDescription sd) async { await pc.setRemoteDescription(sd); for (final candidate in _pendingCandidates) { @@ -84,6 +96,11 @@ class PCTransport { } Future createAndSendOffer([RTCOfferOptions? options]) async { + if (isDisposed) { + logger.warning('[$objectId] createAndSendOffer() already disposed'); + return; + } + if (onOffer == null) { logger.warning('onOffer is null'); return; @@ -116,6 +133,11 @@ class PCTransport { } Future addIceCandidate(rtc.RTCIceCandidate candidate) async { + if (isDisposed) { + logger.warning('[$objectId] addIceCandidate() already disposed'); + return; + } + final desc = await getRemoteDescription(); if (desc != null && !restartingIce) { @@ -127,6 +149,11 @@ class PCTransport { } Future getRemoteDescription() async { + if (isDisposed) { + logger.warning('[$objectId] getRemoteDescription() already disposed'); + return null; + } + // Checking agains null doesn't work as intended // if (pc.iceConnectionState == null) return null; diff --git a/pubspec.lock b/pubspec.lock index 0d7a85544..d1a0e23b6 100644 --- a/pubspec.lock +++ b/pubspec.lock @@ -99,11 +99,11 @@ packages: dependency: "direct main" description: path: "." - ref: use-custom-webrtc-build - resolved-ref: "4942e7faec2e5775d35c42e22c2929ca6ca53769" - url: "https://github.com/livekit/flutter-webrtc" + ref: master + resolved-ref: "32d83d85fa2faebcd37a19534f8a84273b12cbce" + url: "https://github.com/flutter-webrtc/flutter-webrtc" source: git - version: "0.6.7" + version: "0.6.8" http: dependency: "direct main" description: diff --git a/pubspec.yaml b/pubspec.yaml index 5eb9b4a14..0ace21913 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -16,28 +16,21 @@ dependencies: uuid: ^3.0.4 synchronized: ^3.0.0 protobuf: ^2.0.0 - - flutter_webrtc: - git: - url: https://github.com/livekit/flutter-webrtc - ref: use-custom-webrtc-build - - # ^0.6.7 - # path: ../../repos_livekit/flutter-webrtc/ + + flutter_webrtc: # This will use custom webrtc build from # https://github.com/webrtc-sdk/Specs/releases - - # protobuf: - # git: - # url: https://github.com/google/protobuf.dart.git - # ref: master - # path: protobuf/ - - # WebSocketChannel has design flaws - # https://github.com/dart-lang/web_socket_channel/issues/25 - # web_socket_channel: ^2.1.0 + git: + url: https://github.com/flutter-webrtc/flutter-webrtc + ref: master dev_dependencies: flutter_test: sdk: flutter flutter_lints: ^1.0.4 + +flutter: + plugin: + platforms: + ios: + pluginClass: LiveKitClientPlugin