/
locusMediaRequest.ts
313 lines (269 loc) · 9.08 KB
/
locusMediaRequest.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
/* eslint-disable valid-jsdoc */
import {defer} from 'lodash';
import {Defer} from '@webex/common';
import {WebexPlugin} from '@webex/webex-core';
import {MEDIA, HTTP_VERBS, ROAP, IP_VERSION} from '../constants';
import LoggerProxy from '../common/logs/logger-proxy';
/** Discriminator values carried in the `type` field of a media request. */
export type MediaRequestType = 'RoapMessage' | 'LocalMute';
/**
 * Value that a media request promise resolves with. It is either whatever the
 * underlying HTTP request resolved with, or an empty object when a LocalMute
 * request is short-circuited before the confluence has been created.
 */
export type RequestResult = any;
/**
 * A /media request that carries a ROAP message to Locus.
 */
export type RoapRequest = {
type: 'RoapMessage';
// base url of the request; the media path segment is appended to it to form the request uri
selfUrl: string;
// sent as the mediaId of the single localMedias entry in the request body
mediaId: string;
// ROAP message to deliver; it is embedded in the JSON-stringified localSdp
roapMessage: any;
// embedded next to roapMessage in the JSON-stringified localSdp
reachability: any;
// copied into the request body only when it is truthy
sequence?: any;
joinCookie: any; // any, because this is opaque to the client, we pass whatever object we got from one backend component (Orpheus) to the other (Locus)
// sent as clientMediaPreferences.ipver in the request body
ipVersion?: IP_VERSION;
};
/**
 * A /media request that only updates the local audio/video mute state in Locus.
 */
export type LocalMuteRequest = {
type: 'LocalMute';
// base url of the request; the media path segment is appended to it to form the request uri
selfUrl: string;
// sent as the mediaId of the single localMedias entry in the request body
mediaId: string;
// copied into the request body only when it is truthy
sequence?: any;
// requested mute values; a value left undefined keeps whatever was requested
// previously for that media type
muteOptions: {
audioMuted?: boolean;
videoMuted?: boolean;
};
};
/** Any request accepted by LocusMediaRequest.send(); the `type` field tells the variants apart. */
export type Request = RoapRequest | LocalMuteRequest;
/**
 * Class representing a single /media request being sent to Locus.
 *
 * It pairs the request payload with every deferred promise that has to be
 * settled once the request completes. Several callers can end up waiting on
 * the same request, which is why more than one deferred may be attached.
 */
class InternalRequestInfo {
  public readonly request: Request;

  private pendingPromises: Defer[];

  private sendRequestFn: (request: Request) => Promise<RequestResult>;

  /**
   * Constructor
   *
   * @param request - the request to be sent
   * @param pendingPromise - deferred to settle with the outcome of the request
   * @param sendRequestFn - function that actually sends the request
   */
  constructor(
    request: Request,
    pendingPromise: Defer,
    sendRequestFn: (request: Request) => Promise<RequestResult>
  ) {
    this.request = request;
    this.pendingPromises = [pendingPromise];
    this.sendRequestFn = sendRequestFn;
  }

  /**
   * Returns the list of pending promises associated with this request
   */
  public getPendingPromises() {
    return this.pendingPromises;
  }

  /**
   * Adds promises to the list of pending promises associated with this request
   */
  public addPendingPromises(pendingPromises: Defer[]) {
    this.pendingPromises.push(...pendingPromises);
  }

  /**
   * Executes the request. Returned promise is resolved once the request
   * is completed (no matter if it succeeded or failed).
   *
   * @returns promise that always resolves (never rejects) after all the
   *          pending promises have been settled
   */
  public execute(): Promise<void> {
    let inFlight: Promise<RequestResult>;

    try {
      inFlight = this.sendRequestFn(this.request);
    } catch (e) {
      // A synchronous throw has to be handled exactly like an asynchronous
      // failure, otherwise the pending promises would never be settled and
      // the caller would wait forever for this request to complete.
      inFlight = Promise.reject(e);
    }

    return inFlight
      .then((result) => {
        // resolve all the pending promises associated with this request
        this.pendingPromises.forEach((d) => d.resolve(result));
      })
      .catch((e) => {
        // reject all the pending promises associated with this request
        this.pendingPromises.forEach((d) => d.reject(e));
      });
  }
}
/**
 * Static configuration of LocusMediaRequest. These values are included in the
 * body of every /media request.
 */
export type Config = {
// sent as-is in the `device` field of the request body
device: {
url: string;
deviceType: string;
countryCode?: string;
regionCode?: string;
};
// sent as-is in the `correlationId` field of the request body
correlationId: string;
// sent as clientMediaPreferences.preferTranscoding in the request body
preferTranscoding: boolean;
};
/**
 * Tells whether the given request makes the server create a confluence.
 * Only a Roap message whose message type is an offer does that.
 */
function isRequestAffectingConfluenceState(request: Request): boolean {
  if (request.type !== 'RoapMessage') {
    return false;
  }

  const {messageType} = request.roapMessage;

  return messageType === ROAP.ROAP_TYPES.OFFER;
}
/**
 * This class manages all /media API requests to Locus. Every call to that
 * Locus API has to go through this class.
 *
 * Requests are queued and sent one at a time. The latest requested mute
 * values are remembered and included in every request that is sent out.
 */
export class LocusMediaRequest extends WebexPlugin {
  private config: Config;

  private latestAudioMuted?: boolean;

  private latestVideoMuted?: boolean;

  private isRequestInProgress: boolean;

  private queuedRequests: InternalRequestInfo[];

  private confluenceState: 'not created' | 'creation in progress' | 'created';

  /**
   * Constructor
   */
  constructor(config: Config, options: any) {
    super({}, options);
    this.isRequestInProgress = false;
    this.queuedRequests = [];
    this.config = config;
    this.confluenceState = 'not created';
  }

  /**
   * Add a request to the internal queue.
   */
  private addToQueue(info: InternalRequestInfo) {
    if (info.request.type === 'LocalMute' && this.queuedRequests.length > 0) {
      // We don't need additional local mute requests in the queue.
      // We only need at most 1 local mute or 1 roap request, because
      // roap requests also include mute state, so whatever request
      // is sent out, it will send the latest local mute state.
      // We only need to store the pendingPromises so that they get resolved
      // when the roap request is sent out.
      this.queuedRequests[0].addPendingPromises(info.getPendingPromises());

      return;
    }

    if (info.request.type === 'RoapMessage' && this.queuedRequests.length > 0) {
      // remove any LocalMute requests from the queue, because this Roap message
      // will also update the mute status in Locus, so they are redundant
      this.queuedRequests = this.queuedRequests.filter((r) => {
        if (r.request.type === 'LocalMute') {
          // we need to keep the pending promises from the local mute request
          // that we're removing from the queue
          info.addPendingPromises(r.getPendingPromises());

          return false;
        }

        return true;
      });
    }

    this.queuedRequests.push(info);
  }

  /**
   * Takes the next request from the queue and executes it. Once that
   * request is completed, the next one will be taken from the queue
   * and executed and this is repeated until the queue is empty.
   */
  private executeNextQueuedRequest(): void {
    if (this.isRequestInProgress) {
      return;
    }

    const nextRequest = this.queuedRequests.shift();

    if (nextRequest) {
      this.isRequestInProgress = true;
      nextRequest.execute().then(() => {
        this.isRequestInProgress = false;
        this.executeNextQueuedRequest();
      });
    }
  }

  /**
   * Returns latest requested audio and video mute values. If they have never been
   * requested, we assume audio/video to be muted.
   */
  private getLatestMuteState() {
    const audioMuted = this.latestAudioMuted !== undefined ? this.latestAudioMuted : true;
    const videoMuted = this.latestVideoMuted !== undefined ? this.latestVideoMuted : true;

    return {audioMuted, videoMuted};
  }

  /**
   * Prepares the uri and body for the media request and sends it to Locus.
   *
   * Any failure, including an exception thrown synchronously while the request
   * is being prepared or handed over to the HTTP layer, is reported through the
   * returned promise. This method never throws, so the request queue cannot get
   * stuck and confluenceState is always rolled back when creation fails.
   *
   * @param request - request to be sent
   * @returns promise settled with the outcome of the HTTP request
   */
  private sendHttpRequest(request: Request) {
    let affectsConfluence = false;
    let pendingResponse: Promise<RequestResult>;

    try {
      affectsConfluence = isRequestAffectingConfluenceState(request);

      const uri = `${request.selfUrl}/${MEDIA}`;

      const {audioMuted, videoMuted} = this.getLatestMuteState();

      // first setup things common to all requests
      const body: any = {
        device: this.config.device,
        correlationId: this.config.correlationId,
        clientMediaPreferences: {
          preferTranscoding: this.config.preferTranscoding,
          ipver: request.type === 'RoapMessage' ? request.ipVersion : undefined,
        },
      };

      const localMedias: any = {
        audioMuted,
        videoMuted,
      };

      // now add things specific to request type
      switch (request.type) {
        case 'LocalMute':
          body.respOnlySdp = true;
          body.usingResource = null;
          break;

        case 'RoapMessage':
          localMedias.roapMessage = request.roapMessage;
          localMedias.reachability = request.reachability;
          body.clientMediaPreferences.joinCookie = request.joinCookie;
          break;
      }

      if (request.sequence) {
        body.sequence = request.sequence;
      }

      body.localMedias = [
        {
          localSdp: JSON.stringify(localMedias), // this part must be JSON stringified, Locus requires this
          mediaId: request.mediaId,
        },
      ];

      LoggerProxy.logger.info(
        `Meeting:LocusMediaRequest#sendHttpRequest --> ${request.type} audioMuted=${audioMuted} videoMuted=${videoMuted}`
      );

      if (affectsConfluence && this.confluenceState === 'not created') {
        this.confluenceState = 'creation in progress';
      }

      // @ts-ignore
      pendingResponse = this.request({
        method: HTTP_VERBS.PUT,
        uri,
        body,
      });
    } catch (e) {
      // turn a synchronous failure into a rejection, so that it goes through
      // the same error handling as a failed HTTP request
      pendingResponse = Promise.reject(e);
    }

    return pendingResponse
      .then((result) => {
        if (affectsConfluence) {
          this.confluenceState = 'created';
        }

        return result;
      })
      .catch((e) => {
        // only roll back a creation that was still in progress; a confluence
        // that has already been created stays created
        if (affectsConfluence && this.confluenceState === 'creation in progress') {
          this.confluenceState = 'not created';
        }
        throw e;
      });
  }

  /**
   * Sends a media request to Locus
   *
   * @param request - request to be sent
   * @returns promise settled with the result of the request that ends up being
   *          sent, or resolved immediately with an empty object for a LocalMute
   *          request made before the confluence has been created
   */
  public send(request: Request): Promise<RequestResult> {
    if (request.type === 'LocalMute') {
      const {audioMuted, videoMuted} = request.muteOptions;

      if (audioMuted !== undefined) {
        this.latestAudioMuted = audioMuted;
      }
      if (videoMuted !== undefined) {
        this.latestVideoMuted = videoMuted;
      }

      if (this.confluenceState === 'not created') {
        // if there is no confluence, there is no point sending out local mute request
        // as it will fail so we just store the latest audio/video muted values
        // and resolve immediately, so that higher layer (MuteState class) doesn't get blocked
        // and can call us again if user mutes/unmutes again before confluence is created
        LoggerProxy.logger.info(
          'Meeting:LocusMediaRequest#send --> called with LocalMute request before confluence creation'
        );

        return Promise.resolve({});
      }
    }

    const pendingPromise = new Defer();

    const newRequest = new InternalRequestInfo(
      request,
      pendingPromise,
      this.sendHttpRequest.bind(this)
    );

    this.addToQueue(newRequest);

    // the queue is processed asynchronously, after the current call stack unwinds
    defer(() => this.executeNextQueuedRequest());

    return pendingPromise.promise;
  }
}