From 34dcdcd157f8d6966d293e5b61272d1845688711 Mon Sep 17 00:00:00 2001 From: SteveMcFarlin Date: Thu, 29 Jul 2021 10:04:41 -0700 Subject: [PATCH] Dominant Speaker Event (#603) Implementation of Dominant Speaker Identification for Multipoint Videoconferencing by Ilana Volfin and Israel Cohen. This implementation uses the RTP Audio Level extension from RFC-6464 for the input signal --- lib/ActiveSpeakerObserver.d.ts | 28 + lib/ActiveSpeakerObserver.d.ts.map | 1 + lib/ActiveSpeakerObserver.js | 40 ++ lib/Router.d.ts | 5 + lib/Router.d.ts.map | 2 +- lib/Router.js | 26 + lib/types.d.ts | 1 + lib/types.d.ts.map | 2 +- lib/types.js | 1 + rust/src/lib.rs | 6 + rust/src/messages.rs | 25 + rust/src/router.rs | 79 ++- rust/src/router/active_speaker_observer.rs | 417 +++++++++++++ .../router/active_speaker_observer/tests.rs | 60 ++ rust/src/router/rtp_observer.rs | 1 + .../integration/active_speaker_observer.rs | 198 ++++++ rust/tests/integration/main.rs | 1 + rust/tests/integration/smoke.rs | 19 + src/ActiveSpeakerObserver.ts | 68 +++ src/Router.ts | 44 ++ src/types.ts | 1 + test/test-ActiveSpeakerObserver.js | 123 ++++ worker/include/Channel/ChannelRequest.hpp | 1 + worker/include/RTC/ActiveSpeakerObserver.hpp | 96 +++ worker/mediasoup-worker.gyp | 2 + worker/src/Channel/ChannelRequest.cpp | 1 + worker/src/RTC/ActiveSpeakerObserver.cpp | 564 ++++++++++++++++++ worker/src/RTC/Router.cpp | 20 + 28 files changed, 1824 insertions(+), 8 deletions(-) create mode 100644 lib/ActiveSpeakerObserver.d.ts create mode 100644 lib/ActiveSpeakerObserver.d.ts.map create mode 100644 lib/ActiveSpeakerObserver.js create mode 100644 rust/src/router/active_speaker_observer.rs create mode 100644 rust/src/router/active_speaker_observer/tests.rs create mode 100644 rust/tests/integration/active_speaker_observer.rs create mode 100644 src/ActiveSpeakerObserver.ts create mode 100644 test/test-ActiveSpeakerObserver.js create mode 100644 worker/include/RTC/ActiveSpeakerObserver.hpp create mode 100644 worker/src/RTC/ActiveSpeakerObserver.cpp diff --git a/lib/ActiveSpeakerObserver.d.ts b/lib/ActiveSpeakerObserver.d.ts new file mode 100644 index 0000000000..d60f8effa7 --- /dev/null +++ b/lib/ActiveSpeakerObserver.d.ts @@ -0,0 +1,28 @@ +import { EnhancedEventEmitter } from './EnhancedEventEmitter'; +import { RtpObserver } from './RtpObserver'; +import { Producer } from './Producer'; +export interface ActiveSpeakerObserverOptions { + interval?: number; + /** + * Custom application data. + */ + appData?: any; +} +export interface ActiveSpeakerObserverActivity { + /** + * The producer instance. + */ + producer: Producer; +} +export declare class ActiveSpeakerObserver extends RtpObserver { + /** + * @private + */ + constructor(params: any); + /** + * Observer. + */ + get observer(): EnhancedEventEmitter; + private _handleWorkerNotifications; +} +//# sourceMappingURL=ActiveSpeakerObserver.d.ts.map \ No newline at end of file diff --git a/lib/ActiveSpeakerObserver.d.ts.map b/lib/ActiveSpeakerObserver.d.ts.map new file mode 100644 index 0000000000..205673cc98 --- /dev/null +++ b/lib/ActiveSpeakerObserver.d.ts.map @@ -0,0 +1 @@ +{"version":3,"file":"ActiveSpeakerObserver.d.ts","sourceRoot":"","sources":["../src/ActiveSpeakerObserver.ts"],"names":[],"mappings":"AACA,OAAO,EAAE,oBAAoB,EAAE,MAAM,wBAAwB,CAAC;AAC9D,OAAO,EAAE,WAAW,EAAE,MAAM,eAAe,CAAC;AAC5C,OAAO,EAAE,QAAQ,EAAE,MAAM,YAAY,CAAC;AAEtC,MAAM,WAAW,4BAA4B;IAC5C,QAAQ,CAAC,EAAE,MAAM,CAAC;IAClB;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,CAAC;CACd;AAED,MAAM,WAAW,6BAA6B;IAC7C;;OAEG;IACH,QAAQ,EAAE,QAAQ,CAAC;CACnB;AAID,qBAAa,qBAAsB,SAAQ,WAAW;IAErD;;OAEG;gBACS,MAAM,EAAE,GAAG;IAOvB;;OAEG;IACH,IAAI,QAAQ,IAAI,oBAAoB,CAGnC;IAED,OAAO,CAAC,0BAA0B;CAyBlC"} \ No newline at end of file diff --git a/lib/ActiveSpeakerObserver.js b/lib/ActiveSpeakerObserver.js new file mode 100644 index 0000000000..f60feb846b --- /dev/null +++ b/lib/ActiveSpeakerObserver.js @@ -0,0 +1,40 @@ +"use strict"; +Object.defineProperty(exports, "__esModule", { value: true }); +const Logger_1 = require("./Logger"); +const RtpObserver_1 = require("./RtpObserver"); +const logger = new Logger_1.Logger('ActiveSpeakerObserver'); +class ActiveSpeakerObserver extends RtpObserver_1.RtpObserver { + /** + * @private + */ + constructor(params) { + super(params); + this._handleWorkerNotifications(); + } + /** + * Observer. + */ + get observer() { + return this._observer; + } + _handleWorkerNotifications() { + this._channel.on(this._internal.rtpObserverId, (event, data) => { + switch (event) { + case 'dominantspeaker': + { + const dominantSpeaker = { + producer: this._getProducerById(data.producerId) + }; + this.safeEmit('dominantspeaker', dominantSpeaker); + this._observer.safeEmit('dominantspeaker', dominantSpeaker); + break; + } + default: + { + logger.error('ignoring unknown event "%s"', event); + } + } + }); + } +} +exports.ActiveSpeakerObserver = ActiveSpeakerObserver; diff --git a/lib/Router.d.ts b/lib/Router.d.ts index e4df55b57b..91946faf33 100644 --- a/lib/Router.d.ts +++ b/lib/Router.d.ts @@ -10,6 +10,7 @@ import { Producer } from './Producer'; import { Consumer } from './Consumer'; import { DataProducer } from './DataProducer'; import { DataConsumer } from './DataConsumer'; +import { ActiveSpeakerObserver, ActiveSpeakerObserverOptions } from './ActiveSpeakerObserver'; import { AudioLevelObserver, AudioLevelObserverOptions } from './AudioLevelObserver'; import { RtpCapabilities, RtpCodecCapability } from './RtpParameters'; import { NumSctpStreams } from './SctpParameters'; @@ -167,6 +168,10 @@ export declare class Router extends EnhancedEventEmitter { * Pipes the given Producer or DataProducer into another Router in same host. */ pipeToRouter({ producerId, dataProducerId, router, listenIp, enableSctp, numSctpStreams, enableRtx, enableSrtp }: PipeToRouterOptions): Promise; + /** + * Create an ActiveSpeakerObserver + */ + createActiveSpeakerObserver({ interval, appData }?: ActiveSpeakerObserverOptions): Promise; /** * Create an AudioLevelObserver. */ diff --git a/lib/Router.d.ts.map b/lib/Router.d.ts.map index 1998a2c51d..76fb497940 100644 --- a/lib/Router.d.ts.map +++ b/lib/Router.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"Router.d.ts","sourceRoot":"","sources":["../src/Router.ts"],"names":[],"mappings":"AAGA,OAAO,EAAE,oBAAoB,EAAE,MAAM,wBAAwB,CAAC;AAG9D,OAAO,EAAE,OAAO,EAAE,MAAM,WAAW,CAAC;AACpC,OAAO,EAAE,cAAc,EAAE,MAAM,kBAAkB,CAAC;AAClD,OAAO,EAAa,iBAAiB,EAAE,MAAM,aAAa,CAAC;AAC3D,OAAO,EAAE,eAAe,EAAE,sBAAsB,EAAE,MAAM,mBAAmB,CAAC;AAC5E,OAAO,EAAE,cAAc,EAAE,qBAAqB,EAAE,MAAM,kBAAkB,CAAC;AACzE,OAAO,EAAE,aAAa,EAAE,oBAAoB,EAAE,MAAM,iBAAiB,CAAC;AACtE,OAAO,EAAE,eAAe,EAAE,sBAAsB,EAAE,MAAM,mBAAmB,CAAC;AAC5E,OAAO,EAAE,QAAQ,EAAE,MAAM,YAAY,CAAC;AACtC,OAAO,EAAE,QAAQ,EAAE,MAAM,YAAY,CAAC;AACtC,OAAO,EAAE,YAAY,EAAE,MAAM,gBAAgB,CAAC;AAC9C,OAAO,EAAE,YAAY,EAAE,MAAM,gBAAgB,CAAC;AAE9C,OAAO,EAAE,kBAAkB,EAAE,yBAAyB,EAAE,MAAM,sBAAsB,CAAC;AACrF,OAAO,EAAE,eAAe,EAAE,kBAAkB,EAAE,MAAM,iBAAiB,CAAC;AACtE,OAAO,EAAE,cAAc,EAAE,MAAM,kBAAkB,CAAC;AAElD,oBAAY,aAAa,GACzB;IACC;;OAEG;IACH,WAAW,CAAC,EAAE,kBAAkB,EAAE,CAAC;IAEnC;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,CAAC;CACd,CAAA;AAED,oBAAY,mBAAmB,GAC/B;IACC;;OAEG;IACH,UAAU,CAAC,EAAE,MAAM,CAAC;IAEpB;;OAEG;IACH,cAAc,CAAC,EAAE,MAAM,CAAC;IAExB;;OAEG;IACH,MAAM,EAAE,MAAM,CAAC;IAEf;;OAEG;IACH,QAAQ,CAAC,EAAE,iBAAiB,GAAG,MAAM,CAAC;IAEtC;;OAEG;IACH,UAAU,CAAC,EAAE,OAAO,CAAC;IAErB;;OAEG;IACH,cAAc,CAAC,EAAE,cAAc,CAAC;IAEhC;;OAEG;IACH,SAAS,CAAC,EAAE,OAAO,CAAC;IAEpB;;OAEG;IACH,UAAU,CAAC,EAAE,OAAO,CAAC;CACrB,CAAA;AAED,oBAAY,kBAAkB,GAC9B;IACC;;OAEG;IACH,YAAY,CAAC,EAAE,QAAQ,CAAC;IAExB;;OAEG;IACH,YAAY,CAAC,EAAE,QAAQ,CAAC;IAExB;;OAEG;IACH,gBAAgB,CAAC,EAAE,YAAY,CAAC;IAEhC;;OAEG;IACH,gBAAgB,CAAC,EAAE,YAAY,CAAC;CAChC,CAAA;AAID,qBAAa,MAAO,SAAQ,oBAAoB;IAG/C,OAAO,CAAC,QAAQ,CAAC,SAAS,CAGxB;IAGF,OAAO,CAAC,QAAQ,CAAC,KAAK,CAGrB;IAGD,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAU;IAGnC,OAAO,CAAC,QAAQ,CAAC,eAAe,CAAiB;IAGjD,OAAO,CAAC,OAAO,CAAS;IAGxB,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAC,CAAM;IAGhC,OAAO,CAAC,QAAQ,CAAC,WAAW,CAAqC;IAGjE,OAAO,CAAC,QAAQ,CAAC,UAAU,CAAoC;IAG/D,OAAO,CAAC,QAAQ,CAAC,aAAa,CAAuC;IAGrE,OAAO,CAAC,QAAQ,CAAC,cAAc,CAAwC;IAGvE,OAAO,CAAC,QAAQ,CAAC,wBAAwB,CAA2C;IAGpF,OAAO,CAAC,QAAQ,CAAC,kBAAkB,CACsB;IAGzD,OAAO,CAAC,QAAQ,CAAC,SAAS,CAA8B;IAExD;;;;OAIG;gBAEF,EACC,QAAQ,EACR,IAAI,EACJ,OAAO,EACP,cAAc,EACd,OAAO,EACP,EACD;QACC,QAAQ,EAAE,GAAG,CAAC;QACd,IAAI,EAAE,GAAG,CAAC;QACV,OAAO,EAAE,OAAO,CAAC;QACjB,cAAc,EAAE,cAAc,CAAC;QAC/B,OAAO,CAAC,EAAE,GAAG,CAAC;KACd;IAcF;;OAEG;IACH,IAAI,EAAE,IAAI,MAAM,CAGf;IAED;;OAEG;IACH,IAAI,MAAM,IAAI,OAAO,CAGpB;IAED;;OAEG;IACH,IAAI,eAAe,IAAI,eAAe,CAGrC;IAED;;OAEG;IACH,IAAI,OAAO,IAAI,GAAG,CAGjB;IAED;;OAEG;IACH,IAAI,OAAO,CAAC,OAAO,EAAE,GAAG,EAGvB;IAED;;;;;;OAMG;IACH,IAAI,QAAQ,IAAI,oBAAoB,CAGnC;IAED;;OAEG;IACH,KAAK,IAAI,IAAI;IA4Cb;;;;OAIG;IACH,YAAY,IAAI,IAAI;IAsCpB;;OAEG;IACG,IAAI,IAAI,OAAO,CAAC,GAAG,CAAC;IAO1B;;OAEG;IACG,qBAAqB,CAC1B,EACC,SAAS,EACT,IAAI,EACJ,SAAgB,EAChB,SAAiB,EACjB,SAAiB,EACjB,SAAiB,EACjB,+BAAwC,EACxC,UAAkB,EAClB,cAAwC,EACxC,kBAA2B,EAC3B,kBAA2B,EAC3B,OAAY,EACZ,EAAE,sBAAsB,GACvB,OAAO,CAAC,eAAe,CAAC;IAgF3B;;OAEG;IACG,oBAAoB,CACzB,EACC,QAAQ,EACR,IAAI,EACJ,OAAc,EACd,OAAe,EACf,UAAkB,EAClB,cAAwC,EACxC,kBAA2B,EAC3B,kBAA2B,EAC3B,UAAkB,EAClB,eAA2C,EAC3C,OAAY,EACZ,EAAE,qBAAqB,GACtB,OAAO,CAAC,cAAc,CAAC;IA6E1B;;OAEG;IACG,uBAAuB,CAC5B,OAAO,EAAE,qBAAqB,GAC5B,OAAO,CAAC,cAAc,CAAC;IAQ1B;;OAEG;IACG,mBAAmB,CACxB,EACC,QAAQ,EACR,IAAI,EACJ,UAAkB,EAClB,cAAwC,EACxC,kBAA8B,EAC9B,kBAA8B,EAC9B,SAAiB,EACjB,UAAkB,EAClB,OAAY,EACZ,EAAE,oBAAoB,GACrB,OAAO,CAAC,aAAa,CAAC;IA2EzB;;OAEG;IACG,qBAAqB,CAC1B,EACC,cAAuB,EACvB,OAAY,EACZ,GAAE,sBAGF,GACC,OAAO,CAAC,eAAe,CAAC;IA2C3B;;OAEG;IACG,YAAY,CACjB,EACC,UAAU,EACV,cAAc,EACd,MAAM,EACN,QAAsB,EACtB,UAAiB,EACjB,cAAwC,EACxC,SAAiB,EACjB,UAAkB,EAClB,EAAE,mBAAmB,GACpB,OAAO,CAAC,kBAAkB,CAAC;IAmO9B;;OAEG;IACG,wBAAwB,CAC7B,EACC,UAAc,EACd,SAAe,EACf,QAAe,EACf,OAAY,EACZ,GAAE,yBAA8B,GAC/B,OAAO,CAAC,kBAAkB,CAAC;IAmC9B;;OAEG;IACH,UAAU,CACT,EACC,UAAU,EACV,eAAe,EACf,EACD;QACC,UAAU,EAAE,MAAM,CAAC;QACnB,eAAe,EAAE,eAAe,CAAC;KACjC,GACC,OAAO;CAuBV"} \ No newline at end of file +{"version":3,"file":"Router.d.ts","sourceRoot":"","sources":["../src/Router.ts"],"names":[],"mappings":"AAGA,OAAO,EAAE,oBAAoB,EAAE,MAAM,wBAAwB,CAAC;AAG9D,OAAO,EAAE,OAAO,EAAE,MAAM,WAAW,CAAC;AACpC,OAAO,EAAE,cAAc,EAAE,MAAM,kBAAkB,CAAC;AAClD,OAAO,EAAa,iBAAiB,EAAE,MAAM,aAAa,CAAC;AAC3D,OAAO,EAAE,eAAe,EAAE,sBAAsB,EAAE,MAAM,mBAAmB,CAAC;AAC5E,OAAO,EAAE,cAAc,EAAE,qBAAqB,EAAE,MAAM,kBAAkB,CAAC;AACzE,OAAO,EAAE,aAAa,EAAE,oBAAoB,EAAE,MAAM,iBAAiB,CAAC;AACtE,OAAO,EAAE,eAAe,EAAE,sBAAsB,EAAE,MAAM,mBAAmB,CAAC;AAC5E,OAAO,EAAE,QAAQ,EAAE,MAAM,YAAY,CAAC;AACtC,OAAO,EAAE,QAAQ,EAAE,MAAM,YAAY,CAAC;AACtC,OAAO,EAAE,YAAY,EAAE,MAAM,gBAAgB,CAAC;AAC9C,OAAO,EAAE,YAAY,EAAE,MAAM,gBAAgB,CAAC;AAE9C,OAAO,EAAE,qBAAqB,EAAE,4BAA4B,EAAE,MAAM,yBAAyB,CAAC;AAC9F,OAAO,EAAE,kBAAkB,EAAE,yBAAyB,EAAE,MAAM,sBAAsB,CAAC;AACrF,OAAO,EAAE,eAAe,EAAE,kBAAkB,EAAE,MAAM,iBAAiB,CAAC;AACtE,OAAO,EAAE,cAAc,EAAE,MAAM,kBAAkB,CAAC;AAElD,oBAAY,aAAa,GACzB;IACC;;OAEG;IACH,WAAW,CAAC,EAAE,kBAAkB,EAAE,CAAC;IAEnC;;OAEG;IACH,OAAO,CAAC,EAAE,GAAG,CAAC;CACd,CAAA;AAED,oBAAY,mBAAmB,GAC/B;IACC;;OAEG;IACH,UAAU,CAAC,EAAE,MAAM,CAAC;IAEpB;;OAEG;IACH,cAAc,CAAC,EAAE,MAAM,CAAC;IAExB;;OAEG;IACH,MAAM,EAAE,MAAM,CAAC;IAEf;;OAEG;IACH,QAAQ,CAAC,EAAE,iBAAiB,GAAG,MAAM,CAAC;IAEtC;;OAEG;IACH,UAAU,CAAC,EAAE,OAAO,CAAC;IAErB;;OAEG;IACH,cAAc,CAAC,EAAE,cAAc,CAAC;IAEhC;;OAEG;IACH,SAAS,CAAC,EAAE,OAAO,CAAC;IAEpB;;OAEG;IACH,UAAU,CAAC,EAAE,OAAO,CAAC;CACrB,CAAA;AAED,oBAAY,kBAAkB,GAC9B;IACC;;OAEG;IACH,YAAY,CAAC,EAAE,QAAQ,CAAC;IAExB;;OAEG;IACH,YAAY,CAAC,EAAE,QAAQ,CAAC;IAExB;;OAEG;IACH,gBAAgB,CAAC,EAAE,YAAY,CAAC;IAEhC;;OAEG;IACH,gBAAgB,CAAC,EAAE,YAAY,CAAC;CAChC,CAAA;AAID,qBAAa,MAAO,SAAQ,oBAAoB;IAG/C,OAAO,CAAC,QAAQ,CAAC,SAAS,CAGxB;IAGF,OAAO,CAAC,QAAQ,CAAC,KAAK,CAGrB;IAGD,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAU;IAGnC,OAAO,CAAC,QAAQ,CAAC,eAAe,CAAiB;IAGjD,OAAO,CAAC,OAAO,CAAS;IAGxB,OAAO,CAAC,QAAQ,CAAC,QAAQ,CAAC,CAAM;IAGhC,OAAO,CAAC,QAAQ,CAAC,WAAW,CAAqC;IAGjE,OAAO,CAAC,QAAQ,CAAC,UAAU,CAAoC;IAG/D,OAAO,CAAC,QAAQ,CAAC,aAAa,CAAuC;IAGrE,OAAO,CAAC,QAAQ,CAAC,cAAc,CAAwC;IAGvE,OAAO,CAAC,QAAQ,CAAC,wBAAwB,CAA2C;IAGpF,OAAO,CAAC,QAAQ,CAAC,kBAAkB,CACsB;IAGzD,OAAO,CAAC,QAAQ,CAAC,SAAS,CAA8B;IAExD;;;;OAIG;gBAEF,EACC,QAAQ,EACR,IAAI,EACJ,OAAO,EACP,cAAc,EACd,OAAO,EACP,EACD;QACC,QAAQ,EAAE,GAAG,CAAC;QACd,IAAI,EAAE,GAAG,CAAC;QACV,OAAO,EAAE,OAAO,CAAC;QACjB,cAAc,EAAE,cAAc,CAAC;QAC/B,OAAO,CAAC,EAAE,GAAG,CAAC;KACd;IAcF;;OAEG;IACH,IAAI,EAAE,IAAI,MAAM,CAGf;IAED;;OAEG;IACH,IAAI,MAAM,IAAI,OAAO,CAGpB;IAED;;OAEG;IACH,IAAI,eAAe,IAAI,eAAe,CAGrC;IAED;;OAEG;IACH,IAAI,OAAO,IAAI,GAAG,CAGjB;IAED;;OAEG;IACH,IAAI,OAAO,CAAC,OAAO,EAAE,GAAG,EAGvB;IAED;;;;;;OAMG;IACH,IAAI,QAAQ,IAAI,oBAAoB,CAGnC;IAED;;OAEG;IACH,KAAK,IAAI,IAAI;IA4Cb;;;;OAIG;IACH,YAAY,IAAI,IAAI;IAsCpB;;OAEG;IACG,IAAI,IAAI,OAAO,CAAC,GAAG,CAAC;IAO1B;;OAEG;IACG,qBAAqB,CAC1B,EACC,SAAS,EACT,IAAI,EACJ,SAAgB,EAChB,SAAiB,EACjB,SAAiB,EACjB,SAAiB,EACjB,+BAAwC,EACxC,UAAkB,EAClB,cAAwC,EACxC,kBAA2B,EAC3B,kBAA2B,EAC3B,OAAY,EACZ,EAAE,sBAAsB,GACvB,OAAO,CAAC,eAAe,CAAC;IAgF3B;;OAEG;IACG,oBAAoB,CACzB,EACC,QAAQ,EACR,IAAI,EACJ,OAAc,EACd,OAAe,EACf,UAAkB,EAClB,cAAwC,EACxC,kBAA2B,EAC3B,kBAA2B,EAC3B,UAAkB,EAClB,eAA2C,EAC3C,OAAY,EACZ,EAAE,qBAAqB,GACtB,OAAO,CAAC,cAAc,CAAC;IA6E1B;;OAEG;IACG,uBAAuB,CAC5B,OAAO,EAAE,qBAAqB,GAC5B,OAAO,CAAC,cAAc,CAAC;IAQ1B;;OAEG;IACG,mBAAmB,CACxB,EACC,QAAQ,EACR,IAAI,EACJ,UAAkB,EAClB,cAAwC,EACxC,kBAA8B,EAC9B,kBAA8B,EAC9B,SAAiB,EACjB,UAAkB,EAClB,OAAY,EACZ,EAAE,oBAAoB,GACrB,OAAO,CAAC,aAAa,CAAC;IA2EzB;;OAEG;IACG,qBAAqB,CAC1B,EACC,cAAuB,EACvB,OAAY,EACZ,GAAE,sBAGF,GACC,OAAO,CAAC,eAAe,CAAC;IA2C3B;;OAEG;IACG,YAAY,CACjB,EACC,UAAU,EACV,cAAc,EACd,MAAM,EACN,QAAsB,EACtB,UAAiB,EACjB,cAAwC,EACxC,SAAiB,EACjB,UAAkB,EAClB,EAAE,mBAAmB,GACpB,OAAO,CAAC,kBAAkB,CAAC;IAmO9B;;OAEG;IACG,2BAA2B,CAChC,EACC,QAAc,EACd,OAAY,EACZ,GAAE,4BAAiC,GAClC,OAAO,CAAC,qBAAqB,CAAC;IAmCjC;;OAEG;IACG,wBAAwB,CAC7B,EACC,UAAc,EACd,SAAe,EACf,QAAe,EACf,OAAY,EACZ,GAAE,yBAA8B,GAC/B,OAAO,CAAC,kBAAkB,CAAC;IAmC9B;;OAEG;IACH,UAAU,CACT,EACC,UAAU,EACV,eAAe,EACf,EACD;QACC,UAAU,EAAE,MAAM,CAAC;QACnB,eAAe,EAAE,eAAe,CAAC;KACjC,GACC,OAAO;CAuBV"} \ No newline at end of file diff --git a/lib/Router.js b/lib/Router.js index 23229ee41e..489bab3739 100644 --- a/lib/Router.js +++ b/lib/Router.js @@ -10,6 +10,7 @@ const WebRtcTransport_1 = require("./WebRtcTransport"); const PlainTransport_1 = require("./PlainTransport"); const PipeTransport_1 = require("./PipeTransport"); const DirectTransport_1 = require("./DirectTransport"); +const ActiveSpeakerObserver_1 = require("./ActiveSpeakerObserver"); const AudioLevelObserver_1 = require("./AudioLevelObserver"); const logger = new Logger_1.Logger('Router'); class Router extends EnhancedEventEmitter_1.EnhancedEventEmitter { @@ -516,6 +517,31 @@ class Router extends EnhancedEventEmitter_1.EnhancedEventEmitter { throw new Error('internal error'); } } + /** + * Create an ActiveSpeakerObserver + */ + async createActiveSpeakerObserver({ interval = 300, appData = {} } = {}) { + logger.debug('createActiveSpeakerObserver()'); + if (appData && typeof appData !== 'object') + throw new TypeError('if given, appData must be an object'); + const internal = { ...this._internal, rtpObserverId: uuid_1.v4() }; + const reqData = { interval }; + await this._channel.request('router.createActiveSpeakerObserver', internal, reqData); + const activeSpeakerObserver = new ActiveSpeakerObserver_1.ActiveSpeakerObserver({ + internal, + channel: this._channel, + payloadChannel: this._payloadChannel, + appData, + getProducerById: (producerId) => (this._producers.get(producerId)) + }); + this._rtpObservers.set(activeSpeakerObserver.id, activeSpeakerObserver); + activeSpeakerObserver.on('@close', () => { + this._rtpObservers.delete(activeSpeakerObserver.id); + }); + // Emit observer event. + this._observer.safeEmit('newrtpobserver', activeSpeakerObserver); + return activeSpeakerObserver; + } /** * Create an AudioLevelObserver. */ diff --git a/lib/types.d.ts b/lib/types.d.ts index 384c281bb9..e6357b6c3d 100644 --- a/lib/types.d.ts +++ b/lib/types.d.ts @@ -10,6 +10,7 @@ export * from './Consumer'; export * from './DataProducer'; export * from './DataConsumer'; export * from './RtpObserver'; +export * from './ActiveSpeakerObserver'; export * from './AudioLevelObserver'; export * from './RtpParameters'; export * from './SctpParameters'; diff --git a/lib/types.d.ts.map b/lib/types.d.ts.map index 75b647b6df..88f029fe0f 100644 --- a/lib/types.d.ts.map +++ b/lib/types.d.ts.map @@ -1 +1 @@ -{"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AAAA,cAAc,UAAU,CAAC;AACzB,cAAc,UAAU,CAAC;AACzB,cAAc,aAAa,CAAC;AAC5B,cAAc,mBAAmB,CAAC;AAClC,cAAc,kBAAkB,CAAC;AACjC,cAAc,iBAAiB,CAAC;AAChC,cAAc,mBAAmB,CAAC;AAClC,cAAc,YAAY,CAAC;AAC3B,cAAc,YAAY,CAAC;AAC3B,cAAc,gBAAgB,CAAC;AAC/B,cAAc,gBAAgB,CAAC;AAC/B,cAAc,eAAe,CAAC;AAC9B,cAAc,sBAAsB,CAAC;AACrC,cAAc,iBAAiB,CAAC;AAChC,cAAc,kBAAkB,CAAC;AACjC,cAAc,kBAAkB,CAAC;AACjC,cAAc,UAAU,CAAC;AACzB,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAC"} \ No newline at end of file +{"version":3,"file":"types.d.ts","sourceRoot":"","sources":["../src/types.ts"],"names":[],"mappings":"AAAA,cAAc,UAAU,CAAC;AACzB,cAAc,UAAU,CAAC;AACzB,cAAc,aAAa,CAAC;AAC5B,cAAc,mBAAmB,CAAC;AAClC,cAAc,kBAAkB,CAAC;AACjC,cAAc,iBAAiB,CAAC;AAChC,cAAc,mBAAmB,CAAC;AAClC,cAAc,YAAY,CAAC;AAC3B,cAAc,YAAY,CAAC;AAC3B,cAAc,gBAAgB,CAAC;AAC/B,cAAc,gBAAgB,CAAC;AAC/B,cAAc,eAAe,CAAC;AAC9B,cAAc,yBAAyB,CAAC;AACxC,cAAc,sBAAsB,CAAC;AACrC,cAAc,iBAAiB,CAAC;AAChC,cAAc,kBAAkB,CAAC;AACjC,cAAc,kBAAkB,CAAC;AACjC,cAAc,UAAU,CAAC;AACzB,OAAO,EAAE,eAAe,EAAE,MAAM,oBAAoB,CAAC"} \ No newline at end of file diff --git a/lib/types.js b/lib/types.js index 42e52aa418..11d8ae16d6 100644 --- a/lib/types.js +++ b/lib/types.js @@ -15,5 +15,6 @@ __export(require("./Consumer")); __export(require("./DataProducer")); __export(require("./DataConsumer")); __export(require("./RtpObserver")); +__export(require("./ActiveSpeakerObserver")); __export(require("./AudioLevelObserver")); __export(require("./errors")); diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 60da574ab6..08febb4766 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -68,6 +68,12 @@ pub mod audio_level_observer { pub use crate::router::audio_level_observer::*; } +pub mod active_speaker_observer { + //! An active speaker observer monitors the speaking activity of the selected audio producers. + + pub use crate::router::active_speaker_observer::*; +} + pub mod consumer { //! A consumer represents an audio or video source being forwarded from a mediasoup router to an //! endpoint. It's created on top of a transport that defines how the media packets are carried. diff --git a/rust/src/messages.rs b/rust/src/messages.rs index 211d5569a6..fb67bf00a5 100644 --- a/rust/src/messages.rs +++ b/rust/src/messages.rs @@ -1,3 +1,4 @@ +use crate::active_speaker_observer::ActiveSpeakerObserverOptions; use crate::audio_level_observer::AudioLevelObserverOptions; use crate::consumer::{ ConsumerDump, ConsumerId, ConsumerLayers, ConsumerScore, ConsumerStats, ConsumerTraceEventType, @@ -414,6 +415,30 @@ request_response!( }, ); +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub(crate) struct RouterCreateActiveSpeakerObserverData { + pub(crate) interval: u16, +} + +impl RouterCreateActiveSpeakerObserverData { + pub(crate) fn from_options( + active_speaker_observer_options: &ActiveSpeakerObserverOptions, + ) -> Self { + Self { + interval: active_speaker_observer_options.interval, + } + } +} + +request_response!( + "router.createActiveSpeakerObserver", + RouterCreateActiveSpeakerObserverRequest { + internal: RtpObserverInternal, + data: RouterCreateActiveSpeakerObserverData, + }, +); + request_response!( "transport.close", TransportCloseRequest { diff --git a/rust/src/router.rs b/rust/src/router.rs index f9643e3fd0..9ae2fa616c 100644 --- a/rust/src/router.rs +++ b/rust/src/router.rs @@ -6,6 +6,7 @@ //! high level use cases (for instance, a "multi-party conference room" could involve various //! mediasoup routers, even in different physicals hosts). +pub(super) mod active_speaker_observer; pub(super) mod audio_level_observer; pub(super) mod consumer; pub(super) mod data_consumer; @@ -20,6 +21,7 @@ mod tests; pub(super) mod transport; pub(super) mod webrtc_transport; +use crate::active_speaker_observer::{ActiveSpeakerObserver, ActiveSpeakerObserverOptions}; use crate::audio_level_observer::{AudioLevelObserver, AudioLevelObserverOptions}; use crate::consumer::{Consumer, ConsumerId, ConsumerOptions}; use crate::data_consumer::{DataConsumer, DataConsumerId, DataConsumerOptions}; @@ -29,12 +31,14 @@ use crate::data_producer::{ use crate::data_structures::{AppData, TransportListenIp}; use crate::direct_transport::{DirectTransport, DirectTransportOptions}; use crate::messages::{ - RouterCloseRequest, RouterCreateAudioLevelObserverData, RouterCreateAudioLevelObserverRequest, - RouterCreateDirectTransportData, RouterCreateDirectTransportRequest, - RouterCreatePipeTransportData, RouterCreatePipeTransportRequest, - RouterCreatePlainTransportData, RouterCreatePlainTransportRequest, - RouterCreateWebrtcTransportData, RouterCreateWebrtcTransportRequest, RouterDumpRequest, - RouterInternal, RtpObserverInternal, TransportInternal, + RouterCloseRequest, RouterCreateActiveSpeakerObserverData, + RouterCreateActiveSpeakerObserverRequest, RouterCreateAudioLevelObserverData, + RouterCreateAudioLevelObserverRequest, RouterCreateDirectTransportData, + RouterCreateDirectTransportRequest, RouterCreatePipeTransportData, + RouterCreatePipeTransportRequest, RouterCreatePlainTransportData, + RouterCreatePlainTransportRequest, RouterCreateWebrtcTransportData, + RouterCreateWebrtcTransportRequest, RouterDumpRequest, RouterInternal, RtpObserverInternal, + TransportInternal, }; use crate::pipe_transport::{ PipeTransport, PipeTransportOptions, PipeTransportRemoteParameters, WeakPipeTransport, @@ -306,6 +310,8 @@ impl<'a> Deref for NewTransport<'a> { pub enum NewRtpObserver<'a> { /// Audio level observer AudioLevel(&'a AudioLevelObserver), + /// Active speaker observer + ActiveSpeaker(&'a ActiveSpeakerObserver), } impl<'a> Deref for NewRtpObserver<'a> { @@ -314,6 +320,7 @@ impl<'a> Deref for NewRtpObserver<'a> { fn deref(&self) -> &Self::Target { match self { Self::AudioLevel(observer) => *observer as &Self::Target, + Self::ActiveSpeaker(observer) => *observer as &Self::Target, } } } @@ -828,6 +835,66 @@ impl Router { Ok(audio_level_observer) } + /// Create an [`ActiveSpeakerObserver`]. + /// + /// Router will be kept alive as long as at least one observer instance is alive. + /// + /// # Example + /// ```rust + /// use mediasoup::active_speaker_observer::ActiveSpeakerObserverOptions; + /// + /// # async fn f(router: mediasoup::router::Router) -> Result<(), Box> { + /// let observer = router + /// .create_active_speaker_observer({ + /// let mut options = ActiveSpeakerObserverOptions::default(); + /// options.interval = 300; + /// options + /// }) + /// .await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn create_active_speaker_observer( + &self, + active_speaker_observer_options: ActiveSpeakerObserverOptions, + ) -> Result { + debug!("create_active_speaker_observer()"); + + let rtp_observer_id = RtpObserverId::new(); + + let _buffer_guard = self + .inner + .channel + .buffer_messages_for(rtp_observer_id.into()); + + self.inner + .channel + .request(RouterCreateActiveSpeakerObserverRequest { + internal: RtpObserverInternal { + router_id: self.inner.id, + rtp_observer_id, + }, + data: RouterCreateActiveSpeakerObserverData::from_options( + &active_speaker_observer_options, + ), + }) + .await?; + + let active_speaker_observer = ActiveSpeakerObserver::new( + rtp_observer_id, + Arc::clone(&self.inner.executor), + self.inner.channel.clone(), + active_speaker_observer_options.app_data, + self.clone(), + ); + + self.inner.handlers.new_rtp_observer.call(|callback| { + callback(NewRtpObserver::ActiveSpeaker(&active_speaker_observer)); + }); + + Ok(active_speaker_observer) + } + /// Pipes [`Producer`] with the given `producer_id` into another [`Router`] on same host. /// /// # Example diff --git a/rust/src/router/active_speaker_observer.rs b/rust/src/router/active_speaker_observer.rs new file mode 100644 index 0000000000..7df64fc197 --- /dev/null +++ b/rust/src/router/active_speaker_observer.rs @@ -0,0 +1,417 @@ +#[cfg(test)] +mod tests; + +use crate::data_structures::AppData; +use crate::messages::{ + RtpObserverAddProducerRequest, RtpObserverAddRemoveProducerRequestData, + RtpObserverCloseRequest, RtpObserverInternal, RtpObserverPauseRequest, + RtpObserverRemoveProducerRequest, RtpObserverResumeRequest, +}; +use crate::producer::{Producer, ProducerId}; +use crate::router::Router; +use crate::rtp_observer::{RtpObserver, RtpObserverAddProducerOptions, RtpObserverId}; +use crate::worker::{Channel, RequestError, SubscriptionHandler}; +use async_executor::Executor; +use async_trait::async_trait; +use event_listener_primitives::{Bag, BagOnce, HandlerId}; +use log::{debug, error}; +use parking_lot::Mutex; +use serde::Deserialize; +use std::fmt; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Weak}; + +/// [`ActiveSpeakerObserver`] options +#[derive(Debug, Clone)] +#[non_exhaustive] +pub struct ActiveSpeakerObserverOptions { + /// Interval in ms for checking audio volumes. + /// Default 300. + pub interval: u16, + /// Custom application data. + pub app_data: AppData, +} + +impl Default for ActiveSpeakerObserverOptions { + fn default() -> Self { + Self { + interval: 300, + app_data: AppData::default(), + } + } +} + +/// Represents volume of one audio producer. +#[derive(Debug, Clone)] +pub struct ActiveSpeakerObserverDominantSpeaker { + /// The audio producer instance. + pub producer: Producer, +} + +#[derive(Default)] +struct Handlers { + dominant_speaker: Bag>, + pause: Bag>, + resume: Bag>, + add_producer: Bag>, + remove_producer: Bag>, + router_close: BagOnce>, + close: BagOnce>, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct DominantSpeakerNotification { + producer_id: ProducerId, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "event", rename_all = "lowercase", content = "data")] +enum Notification { + DominantSpeaker(DominantSpeakerNotification), +} + +struct Inner { + id: RtpObserverId, + executor: Arc>, + channel: Channel, + handlers: Arc, + paused: AtomicBool, + app_data: AppData, + // Make sure router is not dropped until this audio level observer is not dropped + router: Router, + closed: AtomicBool, + // Drop subscription to audio level observer-specific notifications when observer itself is + // dropped + _subscription_handler: Option, + _on_router_close_handler: Mutex, +} + +impl Drop for Inner { + fn drop(&mut self) { + debug!("drop()"); + + self.close(true); + } +} + +impl Inner { + fn close(&self, close_request: bool) { + if !self.closed.swap(true, Ordering::SeqCst) { + debug!("close()"); + + self.handlers.close.call_simple(); + + if close_request { + let channel = self.channel.clone(); + let request = RtpObserverCloseRequest { + internal: RtpObserverInternal { + router_id: self.router.id(), + rtp_observer_id: self.id, + }, + }; + let router = self.router.clone(); + self.executor + .spawn(async move { + if let Err(error) = channel.request(request).await { + error!("active speaker observer closing failed on drop: {}", error); + } + + drop(router); + }) + .detach(); + } + } + } +} + +/// An active speaker observer monitors the volume of the selected audio producers. +/// +/// It just handles audio producers (if [`AudioLevelObserver::add_producer()`] is called with a +/// video producer it will fail). +/// +/// Audio levels are read from an RTP header extension. No decoding of audio data is done. See +/// [RFC6464](https://tools.ietf.org/html/rfc6464) for more information. +#[derive(Clone)] +#[must_use = "Audio level observer will be closed on drop, make sure to keep it around for as long as needed"] +pub struct ActiveSpeakerObserver { + inner: Arc, +} + +impl fmt::Debug for ActiveSpeakerObserver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ActiveSpeakerObserver") + .field("id", &self.inner.id) + .field("paused", &self.inner.paused) + .field("router", &self.inner.router) + .field("closed", &self.inner.closed) + .finish() + } +} + +#[async_trait(?Send)] +impl RtpObserver for ActiveSpeakerObserver { + fn id(&self) -> RtpObserverId { + self.inner.id + } + + fn paused(&self) -> bool { + self.inner.paused.load(Ordering::SeqCst) + } + + fn app_data(&self) -> &AppData { + &self.inner.app_data + } + + fn closed(&self) -> bool { + self.inner.closed.load(Ordering::SeqCst) + } + + async fn pause(&self) -> Result<(), RequestError> { + debug!("pause()"); + + self.inner + .channel + .request(RtpObserverPauseRequest { + internal: self.get_internal(), + }) + .await?; + + let was_paused = self.inner.paused.swap(true, Ordering::SeqCst); + + if !was_paused { + self.inner.handlers.pause.call_simple(); + } + + Ok(()) + } + + async fn resume(&self) -> Result<(), RequestError> { + debug!("resume()"); + + self.inner + .channel + .request(RtpObserverResumeRequest { + internal: self.get_internal(), + }) + .await?; + + let was_paused = self.inner.paused.swap(false, Ordering::SeqCst); + + if !was_paused { + self.inner.handlers.resume.call_simple(); + } + + Ok(()) + } + + async fn add_producer( + &self, + RtpObserverAddProducerOptions { producer_id }: RtpObserverAddProducerOptions, + ) -> Result<(), RequestError> { + let producer = match self.inner.router.get_producer(&producer_id) { + Some(producer) => producer, + None => { + return Ok(()); + } + }; + self.inner + .channel + .request(RtpObserverAddProducerRequest { + internal: self.get_internal(), + data: RtpObserverAddRemoveProducerRequestData { producer_id }, + }) + .await?; + + self.inner.handlers.add_producer.call(|callback| { + callback(&producer); + }); + + Ok(()) + } + + async fn remove_producer(&self, producer_id: ProducerId) -> Result<(), RequestError> { + let producer = match self.inner.router.get_producer(&producer_id) { + Some(producer) => producer, + None => { + return Ok(()); + } + }; + self.inner + .channel + .request(RtpObserverRemoveProducerRequest { + internal: self.get_internal(), + data: RtpObserverAddRemoveProducerRequestData { producer_id }, + }) + .await?; + + self.inner.handlers.remove_producer.call(|callback| { + callback(&producer); + }); + + Ok(()) + } + + fn on_pause(&self, callback: Box) -> HandlerId { + self.inner.handlers.pause.add(Box::new(callback)) + } + + fn on_resume(&self, callback: Box) -> HandlerId { + self.inner.handlers.resume.add(Box::new(callback)) + } + + fn on_add_producer( + &self, + callback: Box, + ) -> HandlerId { + self.inner.handlers.add_producer.add(Box::new(callback)) + } + + fn on_remove_producer( + &self, + callback: Box, + ) -> HandlerId { + self.inner.handlers.remove_producer.add(Box::new(callback)) + } + + fn on_router_close(&self, callback: Box) -> HandlerId { + self.inner.handlers.router_close.add(Box::new(callback)) + } + + fn on_close(&self, callback: Box) -> HandlerId { + let handler_id = self.inner.handlers.close.add(Box::new(callback)); + if self.inner.closed.load(Ordering::Relaxed) { + self.inner.handlers.close.call_simple(); + } + handler_id + } +} + +impl ActiveSpeakerObserver { + pub(super) fn new( + id: RtpObserverId, + executor: Arc>, + channel: Channel, + app_data: AppData, + router: Router, + ) -> Self { + debug!("new()"); + + let handlers = Arc::::default(); + let paused = AtomicBool::new(false); + + let subscription_handler = { + let router = router.clone(); + let handlers = Arc::clone(&handlers); + + channel.subscribe_to_notifications(id.into(), move |notification| { + match serde_json::from_value::(notification) { + Ok(notification) => match notification { + Notification::DominantSpeaker(dominant_speaker) => { + let DominantSpeakerNotification { producer_id } = dominant_speaker; + match router.get_producer(&producer_id) { + Some(producer) => { + let dominant_speaker = + ActiveSpeakerObserverDominantSpeaker { producer }; + + handlers.dominant_speaker.call(|callback| { + callback(&dominant_speaker); + }); + } + None => { + error!( + "Producer for dominant speaker event not found: {}", + producer_id + ); + } + }; + } + }, + Err(error) => { + error!("Failed to parse notification: {}", error); + } + } + }) + }; + + let inner_weak = Arc::>>>::default(); + let on_router_close_handler = router.on_close({ + let inner_weak = Arc::clone(&inner_weak); + + move || { + if let Some(inner) = inner_weak.lock().as_ref().and_then(Weak::upgrade) { + inner.handlers.router_close.call_simple(); + inner.close(false); + } + } + }); + let inner = Arc::new(Inner { + id, + executor, + channel, + handlers, + paused, + app_data, + router, + closed: AtomicBool::new(false), + _subscription_handler: subscription_handler, + _on_router_close_handler: Mutex::new(on_router_close_handler), + }); + + inner_weak.lock().replace(Arc::downgrade(&inner)); + + Self { inner } + } + + /// Callback is called at most every interval (see [`ActiveSpeakerObserverOptions`]). + pub fn on_dominant_speaker< + F: Fn(&ActiveSpeakerObserverDominantSpeaker) + Send + Sync + 'static, + >( + &self, + callback: F, + ) -> HandlerId { + self.inner.handlers.dominant_speaker.add(Box::new(callback)) + } + + /// Downgrade `ActiveSpeakerObserver` to [`WeakActiveSpeakerObserver`] instance. + #[must_use] + pub fn downgrade(&self) -> WeakActiveSpeakerObserver { + WeakActiveSpeakerObserver { + inner: Arc::downgrade(&self.inner), + } + } + + fn get_internal(&self) -> RtpObserverInternal { + RtpObserverInternal { + router_id: self.inner.router.id(), + rtp_observer_id: self.inner.id, + } + } +} + +/// [`WeakActiveSpeakerObserver`] doesn't own audio level observer instance on mediasoup-worker and +/// will not prevent one from being destroyed once last instance of regular [`ActiveSpeakerObserver`] +/// is dropped. +/// +/// [`WeakActiveSpeakerObserver`] vs [`ActiveSpeakerObserver`] is similar to [`Weak`] vs [`Arc`]. +#[derive(Clone)] +pub struct WeakActiveSpeakerObserver { + inner: Weak, +} + +impl fmt::Debug for WeakActiveSpeakerObserver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WeakActiveSpeakerObserver").finish() + } +} + +impl WeakActiveSpeakerObserver { + /// Attempts to upgrade `WeakActiveSpeakerObserver` to [`ActiveSpeakerObserver`] if last instance of one wasn't + /// dropped yet. + #[must_use] + pub fn upgrade(&self) -> Option { + let inner = self.inner.upgrade()?; + + Some(ActiveSpeakerObserver { inner }) + } +} diff --git a/rust/src/router/active_speaker_observer/tests.rs b/rust/src/router/active_speaker_observer/tests.rs new file mode 100644 index 0000000000..cf3ab23d3e --- /dev/null +++ b/rust/src/router/active_speaker_observer/tests.rs @@ -0,0 +1,60 @@ +use crate::active_speaker_observer::ActiveSpeakerObserverOptions; +use crate::router::RouterOptions; +use crate::rtp_observer::RtpObserver; +use crate::worker::{Worker, WorkerSettings}; +use crate::worker_manager::WorkerManager; +use futures_lite::future; +use std::env; + +async fn init() -> Worker { + { + let mut builder = env_logger::builder(); + if env::var(env_logger::DEFAULT_FILTER_ENV).is_err() { + builder.filter_level(log::LevelFilter::Off); + } + let _ = builder.is_test(true).try_init(); + } + + let worker_manager = WorkerManager::new(); + + worker_manager + .create_worker(WorkerSettings::default()) + .await + .expect("Failed to create worker") +} + +#[test] +fn router_close_event() { + future::block_on(async move { + let worker = init().await; + + let router = worker + .create_router(RouterOptions::default()) + .await + .expect("Failed to create router"); + + let active_speaker_observer = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .expect("Failed to create ActiveSpeakerObserver"); + + let (mut close_tx, close_rx) = async_oneshot::oneshot::<()>(); + let _handler = active_speaker_observer.on_close(Box::new(move || { + let _ = close_tx.send(()); + })); + + let (mut router_close_tx, router_close_rx) = async_oneshot::oneshot::<()>(); + let _handler = active_speaker_observer.on_router_close(Box::new(move || { + let _ = router_close_tx.send(()); + })); + + router.close(); + + router_close_rx + .await + .expect("Failed to receive router_close event"); + close_rx.await.expect("Failed to receive close event"); + + assert_eq!(active_speaker_observer.closed(), true); + }); +} diff --git a/rust/src/router/rtp_observer.rs b/rust/src/router/rtp_observer.rs index c3b2b202e4..26599b00d7 100644 --- a/rust/src/router/rtp_observer.rs +++ b/rust/src/router/rtp_observer.rs @@ -30,6 +30,7 @@ impl RtpObserverAddProducerOptions { /// /// mediasoup implements the following RTP observers: /// * [`AudioLevelObserver`](crate::audio_level_observer::AudioLevelObserver) +/// * [`ActiveSpeakerObserver`](crate::active_speaker_observer::ActiveSpeakerObserver) #[async_trait(?Send)] pub trait RtpObserver { /// RtpObserver id. diff --git a/rust/tests/integration/active_speaker_observer.rs b/rust/tests/integration/active_speaker_observer.rs new file mode 100644 index 0000000000..d63be258f3 --- /dev/null +++ b/rust/tests/integration/active_speaker_observer.rs @@ -0,0 +1,198 @@ +use async_io::Timer; +use futures_lite::future; +use mediasoup::active_speaker_observer::ActiveSpeakerObserverOptions; +use mediasoup::router::RouterOptions; +use mediasoup::rtp_observer::RtpObserver; +use mediasoup::rtp_parameters::{MimeTypeAudio, RtpCodecCapability, RtpCodecParametersParameters}; +use mediasoup::worker::{Worker, WorkerSettings}; +use mediasoup::worker_manager::WorkerManager; +use std::env; +use std::num::{NonZeroU32, NonZeroU8}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +fn media_codecs() -> Vec { + vec![RtpCodecCapability::Audio { + mime_type: MimeTypeAudio::Opus, + preferred_payload_type: None, + clock_rate: NonZeroU32::new(48000).unwrap(), + channels: NonZeroU8::new(2).unwrap(), + parameters: RtpCodecParametersParameters::from([ + ("useinbandfec", 1_u32.into()), + ("foo", "bar".into()), + ]), + rtcp_feedback: vec![], + }] +} + +async fn init() -> Worker { + { + let mut builder = env_logger::builder(); + if env::var(env_logger::DEFAULT_FILTER_ENV).is_err() { + builder.filter_level(log::LevelFilter::Off); + } + let _ = builder.is_test(true).try_init(); + } + + let worker_manager = WorkerManager::new(); + + worker_manager + .create_worker(WorkerSettings::default()) + .await + .expect("Failed to create worker") +} + +#[test] +fn create() { + future::block_on(async move { + let worker = init().await; + + let router = worker + .create_router(RouterOptions::new(media_codecs())) + .await + .expect("Failed to create router"); + + let new_observer_count = Arc::new(AtomicUsize::new(0)); + + router + .on_new_rtp_observer({ + let new_observer_count = Arc::clone(&new_observer_count); + + move |_new_rtp_observer| { + new_observer_count.fetch_add(1, Ordering::SeqCst); + } + }) + .detach(); + + let active_speaker_observer = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .expect("Failed to create ActiveSpeakerObserver"); + + assert_eq!(new_observer_count.load(Ordering::SeqCst), 1); + assert_eq!(active_speaker_observer.closed(), false); + assert_eq!(active_speaker_observer.paused(), false); + + let dump = router.dump().await.expect("Failed to get router dump"); + + assert_eq!( + dump.rtp_observer_ids.into_iter().collect::>(), + vec![active_speaker_observer.id()] + ); + }); +} + +#[test] +fn weak() { + future::block_on(async move { + let worker = init().await; + + let router = worker + .create_router(RouterOptions::new(media_codecs())) + .await + .expect("Failed to create router"); + + let active_speaker_observer = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .expect("Failed to create ActiveSpeakerObserver"); + + let weak_active_speaker_observer = active_speaker_observer.downgrade(); + + assert!(weak_active_speaker_observer.upgrade().is_some()); + + drop(active_speaker_observer); + + assert!(weak_active_speaker_observer.upgrade().is_none()); + }); +} + +#[test] +fn pause_resume() { + future::block_on(async move { + let worker = init().await; + + let router = worker + .create_router(RouterOptions::new(media_codecs())) + .await + .expect("Failed to create router"); + + let active_speaker_observer = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .expect("Failed to create ActiveSpeakerObserver"); + + active_speaker_observer + .pause() + .await + .expect("Failed to pause"); + assert_eq!(active_speaker_observer.paused(), true); + + active_speaker_observer + .resume() + .await + .expect("Failed to resume"); + assert_eq!(active_speaker_observer.paused(), false); + }); +} + +#[test] +fn close_event() { + future::block_on(async move { + let worker = init().await; + + let router = worker + .create_router(RouterOptions::new(media_codecs())) + .await + .expect("Failed to create router"); + + let active_speaker_observer = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .expect("Failed to create ActiveSpeakerObserver"); + + let (mut tx, rx) = async_oneshot::oneshot::<()>(); + let _handler = active_speaker_observer.on_close(Box::new(move || { + let _ = tx.send(()); + })); + drop(active_speaker_observer); + + rx.await.expect("Failed to receive close event"); + }); +} + +#[test] +fn drop_test() { + future::block_on(async move { + let worker = init().await; + + let router = worker + .create_router(RouterOptions::new(media_codecs())) + .await + .expect("Failed to create router"); + + let _active_speaker_observer = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .expect("Failed to create ActiveSpeakerObserver"); + + let active_speaker_observer_2 = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .expect("Failed to create ActiveSpeakerObserver"); + + let dump = router.dump().await.expect("Failed to get router dump"); + + assert_eq!(dump.rtp_observer_ids.len(), 2); + + drop(active_speaker_observer_2); + + // Drop is async, give it a bit of time to finish + Timer::after(Duration::from_millis(200)).await; + + let dump = router.dump().await.expect("Failed to get router dump"); + + assert_eq!(dump.rtp_observer_ids.len(), 1); + }); +} diff --git a/rust/tests/integration/main.rs b/rust/tests/integration/main.rs index 69974644f6..59b151c8b7 100644 --- a/rust/tests/integration/main.rs +++ b/rust/tests/integration/main.rs @@ -1,3 +1,4 @@ +mod active_speaker_observer; mod audio_level_observer; mod consumer; mod data_consumer; diff --git a/rust/tests/integration/smoke.rs b/rust/tests/integration/smoke.rs index b0df824e56..4b1597468d 100644 --- a/rust/tests/integration/smoke.rs +++ b/rust/tests/integration/smoke.rs @@ -1,4 +1,5 @@ use futures_lite::future; +use mediasoup::active_speaker_observer::ActiveSpeakerObserverOptions; use mediasoup::audio_level_observer::AudioLevelObserverOptions; use mediasoup::consumer::{ConsumerLayers, ConsumerOptions, ConsumerTraceEventType}; use mediasoup::data_consumer::DataConsumerOptions; @@ -326,6 +327,24 @@ fn smoke() { .await .unwrap() ); + + let active_speaker_observer = router + .create_active_speaker_observer(ActiveSpeakerObserverOptions::default()) + .await + .unwrap(); + + println!( + "Active speaker observer: {:#?}", + active_speaker_observer.id() + ); + println!( + "Add producer to active speaker observer: {:#?}", + active_speaker_observer + .add_producer(RtpObserverAddProducerOptions::new(producer.id())) + .await + .unwrap() + ); + println!("Router dump: {:#?}", router.dump().await.unwrap()); println!( "Remove producer from audio level observer: {:#?}", diff --git a/src/ActiveSpeakerObserver.ts b/src/ActiveSpeakerObserver.ts new file mode 100644 index 0000000000..ad05bfe544 --- /dev/null +++ b/src/ActiveSpeakerObserver.ts @@ -0,0 +1,68 @@ +import { Logger } from './Logger'; +import { EnhancedEventEmitter } from './EnhancedEventEmitter'; +import { RtpObserver } from './RtpObserver'; +import { Producer } from './Producer'; + +export interface ActiveSpeakerObserverOptions { + interval?: number; + /** + * Custom application data. + */ + appData?: any; +} + +export interface ActiveSpeakerObserverActivity { + /** + * The producer instance. + */ + producer: Producer; +} + +const logger = new Logger('ActiveSpeakerObserver'); + +export class ActiveSpeakerObserver extends RtpObserver +{ + /** + * @private + */ + constructor(params: any) + { + super(params); + + this._handleWorkerNotifications(); + } + + /** + * Observer. + */ + get observer(): EnhancedEventEmitter + { + return this._observer; + } + + private _handleWorkerNotifications(): void + { + this._channel.on(this._internal.rtpObserverId, (event: string, data?: any) => + { + switch (event) + { + case 'dominantspeaker': + { + const dominantSpeaker = { + producer : this._getProducerById(data.producerId) + }; + + this.safeEmit('dominantspeaker', dominantSpeaker); + this._observer.safeEmit('dominantspeaker', dominantSpeaker); + + break; + } + + default: + { + logger.error('ignoring unknown event "%s"', event); + } + } + }); + } +} diff --git a/src/Router.ts b/src/Router.ts index 8f72bf364c..04973c1a18 100644 --- a/src/Router.ts +++ b/src/Router.ts @@ -16,6 +16,7 @@ import { Consumer } from './Consumer'; import { DataProducer } from './DataProducer'; import { DataConsumer } from './DataConsumer'; import { RtpObserver } from './RtpObserver'; +import { ActiveSpeakerObserver, ActiveSpeakerObserverOptions } from './ActiveSpeakerObserver'; import { AudioLevelObserver, AudioLevelObserverOptions } from './AudioLevelObserver'; import { RtpCapabilities, RtpCodecCapability } from './RtpParameters'; import { NumSctpStreams } from './SctpParameters'; @@ -924,6 +925,49 @@ export class Router extends EnhancedEventEmitter } } + /** + * Create an ActiveSpeakerObserver + */ + async createActiveSpeakerObserver( + { + interval = 300, + appData = {} + }: ActiveSpeakerObserverOptions = {} + ): Promise + { + logger.debug('createActiveSpeakerObserver()'); + + if (appData && typeof appData !== 'object') + throw new TypeError('if given, appData must be an object'); + + const internal = { ...this._internal, rtpObserverId: uuidv4() }; + const reqData = { interval }; + + await this._channel.request('router.createActiveSpeakerObserver', internal, reqData); + + const activeSpeakerObserver = new ActiveSpeakerObserver( + { + internal, + channel : this._channel, + payloadChannel : this._payloadChannel, + appData, + getProducerById : (producerId: string): Producer | undefined => ( + this._producers.get(producerId) + ) + }); + + this._rtpObservers.set(activeSpeakerObserver.id, activeSpeakerObserver); + activeSpeakerObserver.on('@close', () => + { + this._rtpObservers.delete(activeSpeakerObserver.id); + }); + + // Emit observer event. + this._observer.safeEmit('newrtpobserver', activeSpeakerObserver); + + return activeSpeakerObserver; + } + /** * Create an AudioLevelObserver. */ diff --git a/src/types.ts b/src/types.ts index d5676e65e3..f6187b7675 100644 --- a/src/types.ts +++ b/src/types.ts @@ -10,6 +10,7 @@ export * from './Consumer'; export * from './DataProducer'; export * from './DataConsumer'; export * from './RtpObserver'; +export * from './ActiveSpeakerObserver'; export * from './AudioLevelObserver'; export * from './RtpParameters'; export * from './SctpParameters'; diff --git a/test/test-ActiveSpeakerObserver.js b/test/test-ActiveSpeakerObserver.js new file mode 100644 index 0000000000..841a603a2d --- /dev/null +++ b/test/test-ActiveSpeakerObserver.js @@ -0,0 +1,123 @@ +const { toBeType } = require('jest-tobetype'); +const mediasoup = require('../'); +const { createWorker } = mediasoup; + +expect.extend({ toBeType }); + +let worker; +let router; +let activeSpeakerObserver; + +const mediaCodecs = +[ + { + kind : 'audio', + mimeType : 'audio/opus', + clockRate : 48000, + channels : 2, + parameters : + { + useinbandfec : 1, + foo : 'bar' + } + } +]; + +beforeAll(async () => +{ + worker = await createWorker(); + router = await worker.createRouter({ mediaCodecs }); +}); + +afterAll(() => worker.close()); + +test('router.createActiveSpeakerObserver() succeeds', async () => +{ + const onObserverNewRtpObserver = jest.fn(); + + router.observer.once('newrtpobserver', onObserverNewRtpObserver); + + activeSpeakerObserver = await router.createActiveSpeakerObserver(); + + expect(onObserverNewRtpObserver).toHaveBeenCalledTimes(1); + expect(onObserverNewRtpObserver).toHaveBeenCalledWith(activeSpeakerObserver); + expect(activeSpeakerObserver.id).toBeType('string'); + expect(activeSpeakerObserver.closed).toBe(false); + expect(activeSpeakerObserver.paused).toBe(false); + expect(activeSpeakerObserver.appData).toEqual({}); + + await expect(router.dump()) + .resolves + .toMatchObject( + { + rtpObserverIds : [ activeSpeakerObserver.id ] + }); +}, 2000); + +test('router.createActiveSpeakerObserver() with wrong arguments rejects with TypeError', async () => +{ + await expect(router.createActiveSpeakerObserver({ interval: false })) + .rejects + .toThrow(TypeError); + + await expect(router.createActiveSpeakerObserver({ appData: 'NOT-AN-OBJECT' })) + .rejects + .toThrow(TypeError); +}, 2000); + +test('activeSpeakerObserver.pause() and resume() succeed', async () => +{ + await activeSpeakerObserver.pause(); + + expect(activeSpeakerObserver.paused).toBe(true); + + await activeSpeakerObserver.resume(); + + expect(activeSpeakerObserver.paused).toBe(false); +}, 2000); + +test('activeSpeakerObserver.close() succeeds', async () => +{ + // We need different a AudioLevelObserver instance here. + const activeSpeakerObserver2 = + await router.createAudioLevelObserver({ maxEntries: 8 }); + + let dump = await router.dump(); + + expect(dump.rtpObserverIds.length).toBe(2); + + activeSpeakerObserver2.close(); + + expect(activeSpeakerObserver2.closed).toBe(true); + + dump = await router.dump(); + + expect(dump.rtpObserverIds.length).toBe(1); + +}, 2000); + +test('ActiveSpeakerObserver emits "routerclose" if Router is closed', async () => +{ + // We need different Router and AudioLevelObserver instances here. + const router2 = await worker.createRouter({ mediaCodecs }); + const activeSpeakerObserver2 = await router2.createAudioLevelObserver(); + + await new Promise((resolve) => + { + activeSpeakerObserver2.on('routerclose', resolve); + router2.close(); + }); + + expect(activeSpeakerObserver2.closed).toBe(true); +}, 2000); + +test('ActiveSpeakerObserver emits "routerclose" if Worker is closed', async () => +{ + await new Promise((resolve) => + { + activeSpeakerObserver.on('routerclose', resolve); + worker.close(); + }); + + expect(activeSpeakerObserver.closed).toBe(true); +}, 2000); diff --git a/worker/include/Channel/ChannelRequest.hpp b/worker/include/Channel/ChannelRequest.hpp index 5176751c69..6e421418e6 100644 --- a/worker/include/Channel/ChannelRequest.hpp +++ b/worker/include/Channel/ChannelRequest.hpp @@ -30,6 +30,7 @@ namespace Channel ROUTER_CREATE_PLAIN_TRANSPORT, ROUTER_CREATE_PIPE_TRANSPORT, ROUTER_CREATE_DIRECT_TRANSPORT, + ROUTER_CREATE_ACTIVE_SPEAKER_OBSERVER, ROUTER_CREATE_AUDIO_LEVEL_OBSERVER, TRANSPORT_CLOSE, TRANSPORT_DUMP, diff --git a/worker/include/RTC/ActiveSpeakerObserver.hpp b/worker/include/RTC/ActiveSpeakerObserver.hpp new file mode 100644 index 0000000000..72fc83bccd --- /dev/null +++ b/worker/include/RTC/ActiveSpeakerObserver.hpp @@ -0,0 +1,96 @@ +#ifndef MS_RTC_ACTIVE_SPEAKER_OBSERVER_HPP +#define MS_RTC_ACTIVE_SPEAKER_OBSERVER_HPP + +#include "RTC/RtpObserver.hpp" +#include "handles/Timer.hpp" +#include +#include +#include +#include + +// Implementation of Dominant Speaker Identification for Multipoint +// Videoconferencing by Ilana Volfin and Israel Cohen. This +// implementation uses the RTP Audio Level extension from RFC-6464 +// for the input signal. +// This has been ported from DominantSpeakerIdentification.java in Jitsi. +// https://github.com/jitsi/jitsi-utils/blob/master/src/main/java/org/jitsi/utils/dsi/DominantSpeakerIdentification.java +namespace RTC +{ + class ActiveSpeakerObserver : public RTC::RtpObserver, public Timer::Listener + { + private: + class Speaker + { + public: + Speaker(); + void EvalActivityScores(); + double GetActivityScore(int32_t interval); + void LevelChanged(uint32_t level, uint64_t time); + void LevelTimedOut(); + + private: + bool ComputeImmediates(); + bool ComputeLongs(); + bool ComputeMediums(); + void EvalImmediateActivityScore(); + void EvalMediumActivityScore(); + void EvalLongActivityScore(); + void UpdateMinLevel(int8_t level); + + public: + bool paused{ false }; + double immediateActivityScore; + double mediumActivityScore; + double longActivityScore; + uint64_t lastLevelChangeTime{ 0 }; + + private: + uint8_t minLevel; + uint8_t nextMinLevel; + uint32_t nextMinLevelWindowLen{ 0 }; + std::vector immediates; + std::vector mediums; + std::vector longs; + std::vector levels; + }; + + struct ProducerSpeaker + { + RTC::Producer* producer; + Speaker* speaker; + }; + + public: + ActiveSpeakerObserver(const std::string& id, json& data); + ~ActiveSpeakerObserver() override; + + public: + void AddProducer(RTC::Producer* producer) override; + void RemoveProducer(RTC::Producer* producer) override; + void ReceiveRtpPacket(RTC::Producer* producer, RTC::RtpPacket* packet) override; + void ProducerPaused(RTC::Producer* producer) override; + void ProducerResumed(RTC::Producer* producer) override; + + private: + void Paused() override; + void Resumed() override; + void Update(); + bool CalculateActiveSpeaker(); + void TimeoutIdleLevels(uint64_t now); + + /* Pure virtual methods inherited from Timer. */ + protected: + void OnTimer(Timer* timer) override; + + private: + static constexpr int relativeSpeachActivitiesLen{ 3 }; + double relativeSpeachActivities[relativeSpeachActivitiesLen]; + std::string dominantId{ "" }; + Timer* periodicTimer{ nullptr }; + uint16_t interval{ 300u }; + std::unordered_map mapProducerSpeaker; + uint64_t lastLevelIdleTime{ 0 }; + }; +} // namespace RTC + +#endif diff --git a/worker/mediasoup-worker.gyp b/worker/mediasoup-worker.gyp index 7ed6f69672..f58f5a09a1 100644 --- a/worker/mediasoup-worker.gyp +++ b/worker/mediasoup-worker.gyp @@ -43,6 +43,7 @@ 'src/PayloadChannel/PayloadChannelNotifier.cpp', 'src/PayloadChannel/PayloadChannelRequest.cpp', 'src/PayloadChannel/PayloadChannelSocket.cpp', + 'src/RTC/ActiveSpeakerObserver.cpp', 'src/RTC/AudioLevelObserver.cpp', 'src/RTC/Consumer.cpp', 'src/RTC/DataConsumer.cpp', @@ -156,6 +157,7 @@ 'include/PayloadChannel/PayloadChannelRequest.hpp', 'include/PayloadChannel/PayloadChannelSocket.hpp', 'include/RTC/BweType.hpp', + 'include/RTC/ActiveSpeakerObserver.hpp', 'include/RTC/AudioLevelObserver.hpp', 'include/RTC/Consumer.hpp', 'include/RTC/DataConsumer.hpp', diff --git a/worker/src/Channel/ChannelRequest.cpp b/worker/src/Channel/ChannelRequest.cpp index ec46d2bb70..1e40aab5d2 100644 --- a/worker/src/Channel/ChannelRequest.cpp +++ b/worker/src/Channel/ChannelRequest.cpp @@ -24,6 +24,7 @@ namespace Channel { "router.createPlainTransport", ChannelRequest::MethodId::ROUTER_CREATE_PLAIN_TRANSPORT }, { "router.createPipeTransport", ChannelRequest::MethodId::ROUTER_CREATE_PIPE_TRANSPORT }, { "router.createDirectTransport", ChannelRequest::MethodId::ROUTER_CREATE_DIRECT_TRANSPORT }, + { "router.createActiveSpeakerObserver", ChannelRequest::MethodId::ROUTER_CREATE_ACTIVE_SPEAKER_OBSERVER }, { "router.createAudioLevelObserver", ChannelRequest::MethodId::ROUTER_CREATE_AUDIO_LEVEL_OBSERVER }, { "transport.close", ChannelRequest::MethodId::TRANSPORT_CLOSE }, { "transport.dump", ChannelRequest::MethodId::TRANSPORT_DUMP }, diff --git a/worker/src/RTC/ActiveSpeakerObserver.cpp b/worker/src/RTC/ActiveSpeakerObserver.cpp new file mode 100644 index 0000000000..e0e4541275 --- /dev/null +++ b/worker/src/RTC/ActiveSpeakerObserver.cpp @@ -0,0 +1,564 @@ +#define MS_CLASS "RTC::ActiveSpeakerObserver" + +#include "RTC/ActiveSpeakerObserver.hpp" +#include "Logger.hpp" +#include "MediaSoupErrors.hpp" +#include "Utils.hpp" +#include "Channel/ChannelNotifier.hpp" +#include "RTC/RtpDictionaries.hpp" + +namespace RTC +{ + constexpr uint32_t C1{ 3 }; + constexpr uint32_t C2{ 2 }; + constexpr uint32_t C3{ 0 }; + constexpr uint32_t N1{ 13 }; + constexpr uint32_t N2{ 5 }; + constexpr uint32_t N3{ 10 }; + constexpr uint32_t LongCount{ 1 }; + constexpr uint32_t LevelIdleTimeout{ 40 }; + constexpr uint64_t SpeakerIdleTimeout{ 60 * 60 * 1000 }; + constexpr uint32_t LongThreashold{ 4 }; + constexpr uint32_t MaxLevel{ 127 }; + constexpr uint32_t MinLevel{ 0 }; + constexpr uint32_t MinLevelWindowLen{ 15 * 1000 / 20 }; + constexpr uint32_t MediumThreshold{ 7 }; + constexpr uint32_t SubunitLengthN1{ (MaxLevel - MinLevel + N1 - 1) / N1 }; + constexpr uint32_t ImmediateBuffLen{ LongCount * N3 * N2 }; + constexpr uint32_t MediumsBuffLen{ LongCount * N3 }; + constexpr uint32_t LongsBuffLen{ LongCount }; + constexpr uint32_t LevelsBuffLen{ LongCount * N3 * N2 }; + constexpr double MinActivityScore{ 0.0000000001 }; + + inline int64_t BinomialCoefficient(int32_t n, int32_t r) + { + int32_t m = n - r; + + if (r < m) + { + r = m; + } + + int64_t t = 1; + for (int64_t i = n, j = 1; i > r; i--, ++j) + { + t = t * i / j; + } + + return t; + } + + inline double ComputeActivityScore( + const uint8_t vL, const uint32_t nR, const double p, const double lambda) + { + double activityScore = std::log(BinomialCoefficient(nR, vL)) + vL * std::log(p) + + (nR - vL) * std::log(1 - p) - std::log(lambda) + lambda * vL; + + if (activityScore < MinActivityScore) + { + activityScore = MinActivityScore; + } + + return activityScore; + } + + inline bool ComputeBigs( + const std::vector& littles, std::vector& bigs, uint8_t threashold) + { + uint32_t littleLen = littles.size(); + uint32_t bigLen = bigs.size(); + uint32_t littleLenPerBig = littleLen / bigLen; + bool changed = false; + + for (uint32_t b = 0, l = 0; b < bigLen; b++) + { + uint8_t sum = 0; + + for (uint32_t lEnd = l + littleLenPerBig; l < lEnd; ++l) + { + if (littles[l] > threashold) + { + ++sum; + } + } + + if (bigs[b] != sum) + { + bigs[b] = sum; + changed = true; + } + } + + return changed; + } + + ActiveSpeakerObserver::ActiveSpeakerObserver(const std::string& id, json& data) + : RTC::RtpObserver(id) + { + MS_TRACE(); + + auto jsonIntervalIt = data.find("interval"); + + if (jsonIntervalIt == data.end() || !jsonIntervalIt->is_number()) + MS_THROW_TYPE_ERROR("missing interval"); + + this->interval = jsonIntervalIt->get(); + + if (this->interval < 100) + this->interval = 100; + else if (this->interval > 5000) + this->interval = 5000; + + this->periodicTimer = new Timer(this); + + this->periodicTimer->Start(interval, interval); + } + + ActiveSpeakerObserver::~ActiveSpeakerObserver() + { + MS_TRACE(); + + delete this->periodicTimer; + } + + void ActiveSpeakerObserver::AddProducer(RTC::Producer* producer) + { + MS_TRACE(); + + if (producer->GetKind() != RTC::Media::Kind::AUDIO) + MS_THROW_TYPE_ERROR("not an audio Producer"); + + if (this->mapProducerSpeaker.find(producer->id) != this->mapProducerSpeaker.end()) + MS_THROW_ERROR("Producer already in map"); + + this->mapProducerSpeaker[producer->id].producer = producer; + this->mapProducerSpeaker[producer->id].speaker = new Speaker(); + } + + void ActiveSpeakerObserver::RemoveProducer(RTC::Producer* producer) + { + MS_TRACE(); + + auto it = this->mapProducerSpeaker.find(producer->id); + + if (it == this->mapProducerSpeaker.end()) + { + return; + } + + if (it->second.speaker != nullptr) + { + delete it->second.speaker; + } + + this->mapProducerSpeaker.erase(producer->id); + + if (producer->id == this->dominantId) + { + Update(); + } + } + + void ActiveSpeakerObserver::ProducerResumed(RTC::Producer* producer) + { + MS_TRACE(); + + auto it = this->mapProducerSpeaker.find(producer->id); + + if (it != this->mapProducerSpeaker.end()) + { + auto& rtpObserver = it->second; + + rtpObserver.speaker->paused = false; + } + } + + void ActiveSpeakerObserver::ProducerPaused(RTC::Producer* producer) + { + MS_TRACE(); + + auto it = this->mapProducerSpeaker.find(producer->id); + + if (it != this->mapProducerSpeaker.end()) + { + auto& rtpObserver = it->second; + + rtpObserver.speaker->paused = true; + } + } + + void ActiveSpeakerObserver::ReceiveRtpPacket(RTC::Producer* producer, RTC::RtpPacket* packet) + { + MS_TRACE(); + + if (IsPaused()) + return; + + uint8_t volume; + bool voice; + + if (!packet->ReadSsrcAudioLevel(volume, voice)) + return; + + auto it = this->mapProducerSpeaker.find(producer->id); + + if (it != this->mapProducerSpeaker.end()) + { + auto& rtpObserver = it->second; + uint64_t now = DepLibUV::GetTimeMs(); + + rtpObserver.speaker->LevelChanged(volume, now); + } + } + + void ActiveSpeakerObserver::Paused() + { + MS_TRACE(); + + this->periodicTimer->Stop(); + } + + void ActiveSpeakerObserver::Resumed() + { + MS_TRACE(); + + this->periodicTimer->Restart(); + } + + void ActiveSpeakerObserver::OnTimer(Timer* timer) + { + MS_TRACE(); + + Update(); + } + + void ActiveSpeakerObserver::Update() + { + MS_TRACE(); + + uint64_t now = DepLibUV::GetTimeMs(); + int64_t levelIdleTimeout = LevelIdleTimeout - (now - this->lastLevelIdleTime); + + if (levelIdleTimeout <= 0) + { + if (this->lastLevelIdleTime != 0) + { + TimeoutIdleLevels(now); + } + this->lastLevelIdleTime = now; + } + + if (!this->mapProducerSpeaker.empty() && CalculateActiveSpeaker()) + { + json data = json::object(); + data["producerId"] = this->dominantId; + + Channel::ChannelNotifier::Emit(this->id, "dominantspeaker", data); + } + } + + bool ActiveSpeakerObserver::CalculateActiveSpeaker() + { + MS_TRACE(); + + std::string newDominantId; + int32_t speakerCount = this->mapProducerSpeaker.size(); + + if (speakerCount == 0) + { + newDominantId = ""; + } + else if (speakerCount == 1) + { + auto it = this->mapProducerSpeaker.begin(); + + newDominantId = it->second.producer->id; + } + else + { + Speaker* dominantSpeaker = + (this->dominantId.empty()) ? nullptr : this->mapProducerSpeaker[this->dominantId].speaker; + + if (dominantSpeaker == nullptr) + { + auto item = this->mapProducerSpeaker.begin(); + newDominantId = item->first; + dominantSpeaker = item->second.speaker; + } + else + { + newDominantId = ""; + } + + dominantSpeaker->EvalActivityScores(); + double newDominantC2 = C2; + + for (auto it = this->mapProducerSpeaker.begin(); it != this->mapProducerSpeaker.end(); ++it) + { + Speaker* speaker = it->second.speaker; + const std::string& id = it->second.producer->id; + + if (id == this->dominantId || speaker->paused) + { + continue; + } + + speaker->EvalActivityScores(); + + for (int interval = 0; interval < this->relativeSpeachActivitiesLen; ++interval) + { + this->relativeSpeachActivities[interval] = std::log( + dominantSpeaker->GetActivityScore(interval) / speaker->GetActivityScore(interval)); + } + + double c1 = this->relativeSpeachActivities[0]; + double c2 = this->relativeSpeachActivities[1]; + double c3 = this->relativeSpeachActivities[2]; + + if ((c1 > C1) && (c2 > C2) && (c3 > C3) && (c2 > newDominantC2)) + { + newDominantC2 = c2; + newDominantId = id; + } + } + } + + if (!newDominantId.empty() && newDominantId != this->dominantId) + { + this->dominantId = newDominantId; + + return true; + } + + return false; + } + + void ActiveSpeakerObserver::TimeoutIdleLevels(uint64_t now) + { + MS_TRACE(); + + for (auto it = this->mapProducerSpeaker.begin(); it != this->mapProducerSpeaker.end(); ++it) + { + Speaker* speaker = it->second.speaker; + const std::string& id = it->second.producer->id; + uint64_t idle = now - speaker->lastLevelChangeTime; + + if (SpeakerIdleTimeout < idle && (this->dominantId.empty() || id != this->dominantId)) + { + speaker->paused = true; + } + else if (LevelIdleTimeout < idle) + { + speaker->LevelTimedOut(); + } + } + } + + ActiveSpeakerObserver::Speaker::Speaker() + { + MS_TRACE(); + + this->minLevel = MinLevel; + this->nextMinLevel = MinLevel; + this->immediateActivityScore = MinActivityScore; + this->mediumActivityScore = MinActivityScore; + this->longActivityScore = MinActivityScore; + + this->immediates.resize(ImmediateBuffLen); + this->mediums.resize(MediumsBuffLen); + this->longs.resize(LongsBuffLen); + this->levels.resize(LevelsBuffLen); + + this->lastLevelChangeTime = DepLibUV::GetTimeMs(); + } + + void ActiveSpeakerObserver::Speaker::EvalActivityScores() + { + MS_TRACE(); + + if (ComputeImmediates()) + { + EvalImmediateActivityScore(); + + if (ComputeMediums()) + { + EvalMediumActivityScore(); + + if (ComputeLongs()) + { + EvalLongActivityScore(); + } + } + } + } + + double ActiveSpeakerObserver::Speaker::GetActivityScore(int32_t interval) + { + MS_TRACE(); + + switch (interval) + { + case 0: + return this->immediateActivityScore; + case 1: + return this->mediumActivityScore; + case 2: + return this->longActivityScore; + default: + MS_ABORT("interval is invalid"); + } + + return 0; + } + + void ActiveSpeakerObserver::Speaker::LevelChanged(uint32_t level, uint64_t time) + { + if (this->lastLevelChangeTime <= time) + { + this->lastLevelChangeTime = time; + + int8_t b = 0; + + if (level < MinLevel) + { + b = MinLevel; + } + else if (level > MaxLevel) + { + b = MaxLevel; + } + else + { + b = level; + } + + std::copy(this->levels.begin(), this->levels.end() - 1, this->levels.begin() + 1); + + this->levels[0] = b; + UpdateMinLevel(b); + this->paused = false; + } + } + + void ActiveSpeakerObserver::Speaker::LevelTimedOut() + { + MS_TRACE(); + + LevelChanged(MinLevel, this->lastLevelChangeTime); + } + + bool ActiveSpeakerObserver::Speaker::ComputeImmediates() + { + MS_TRACE(); + + int8_t minLevel = this->minLevel + SubunitLengthN1; + bool changed = false; + + for (uint32_t i = 0; i < ImmediateBuffLen; ++i) + { + uint8_t level = this->levels[i]; + + if (level < minLevel) + { + level = MinLevel; + } + + uint8_t immediate = (level / SubunitLengthN1); + + if (this->immediates[i] != immediate) + { + this->immediates[i] = immediate; + changed = true; + } + } + + return changed; + } + + bool ActiveSpeakerObserver::Speaker::ComputeMediums() + { + MS_TRACE(); + + return ComputeBigs(this->immediates, this->mediums, MediumThreshold); + } + + bool ActiveSpeakerObserver::Speaker::ComputeLongs() + { + MS_TRACE(); + + return ComputeBigs(this->mediums, this->longs, LongThreashold); + } + + void ActiveSpeakerObserver::Speaker::EvalImmediateActivityScore() + { + MS_TRACE(); + + this->immediateActivityScore = ComputeActivityScore(this->immediates[0], N1, 0.5, 0.78); + } + + void ActiveSpeakerObserver::Speaker::EvalMediumActivityScore() + { + MS_TRACE(); + + this->mediumActivityScore = ComputeActivityScore(this->mediums[0], N2, 0.5, 24); + } + + void ActiveSpeakerObserver::Speaker::EvalLongActivityScore() + { + MS_TRACE(); + + this->longActivityScore = ComputeActivityScore(this->longs[0], N3, 0.5, 47); + } + + void ActiveSpeakerObserver::Speaker::UpdateMinLevel(int8_t level) + { + MS_TRACE(); + + if (level == MinLevel) + { + return; + } + + if ((this->minLevel == MinLevel) || (this->minLevel > level)) + { + this->minLevel = level; + this->nextMinLevel = MinLevel; + this->nextMinLevelWindowLen = 0; + } + else + { + if (this->nextMinLevel == MinLevel) + { + this->nextMinLevel = level; + this->nextMinLevelWindowLen = 1; + } + else + { + if (this->nextMinLevel > level) + { + this->nextMinLevel = level; + } + this->nextMinLevelWindowLen++; + + if (this->nextMinLevelWindowLen >= MinLevelWindowLen) + { + double newMinLevel = std::sqrt(static_cast(this->minLevel * this->nextMinLevel)); + + if (newMinLevel < MinLevel) + { + newMinLevel = MinLevel; + } + else if (newMinLevel > MaxLevel) + { + newMinLevel = MaxLevel; + } + + this->minLevel = static_cast(newMinLevel); + + this->nextMinLevel = MinLevel; + this->nextMinLevelWindowLen = 0; + } + } + } + } +} // namespace RTC diff --git a/worker/src/RTC/Router.cpp b/worker/src/RTC/Router.cpp index 383973ea97..17eac9e9e0 100644 --- a/worker/src/RTC/Router.cpp +++ b/worker/src/RTC/Router.cpp @@ -5,6 +5,7 @@ #include "Logger.hpp" #include "MediaSoupErrors.hpp" #include "Utils.hpp" +#include "RTC/ActiveSpeakerObserver.hpp" #include "RTC/AudioLevelObserver.hpp" #include "RTC/DirectTransport.hpp" #include "RTC/PipeTransport.hpp" @@ -270,6 +271,25 @@ namespace RTC break; } + case Channel::ChannelRequest::MethodId::ROUTER_CREATE_ACTIVE_SPEAKER_OBSERVER: + { + std::string rtpObserverId; + + // This may throw. + SetNewRtpObserverIdFromInternal(request->internal, rtpObserverId); + + auto* activeSpeakerObserver = new RTC::ActiveSpeakerObserver(rtpObserverId, request->data); + + // Insert into the map. + this->mapRtpObservers[rtpObserverId] = activeSpeakerObserver; + + MS_DEBUG_DEV("ActiveSpeakerObserver created [rtpObserverId:%s]", rtpObserverId.c_str()); + + request->Accept(); + + break; + } + case Channel::ChannelRequest::MethodId::ROUTER_CREATE_AUDIO_LEVEL_OBSERVER: { std::string rtpObserverId;