From c8d02824c7827ed43b022c4074bd6a66ce7c6d02 Mon Sep 17 00:00:00 2001 From: Jonathan Baudanza Date: Tue, 6 Oct 2020 17:43:39 -0700 Subject: [PATCH] Each room gets its own router --- single-page/dist/server.js | 59 +++++++++++----------------- single-page/src/server.ts | 79 +++++++++++++++----------------------- 2 files changed, 53 insertions(+), 85 deletions(-) diff --git a/single-page/dist/server.js b/single-page/dist/server.js index 3ac677a..57dd34a 100644 --- a/single-page/dist/server.js +++ b/single-page/dist/server.js @@ -36,9 +36,8 @@ expressApp.disable("x-powered-by"); // The Sentry handler must be the first middleware in the stack expressApp.use(Sentry.Handlers.requestHandler()); const log = debugModule('demo-app'); -// one mediasoup worker and router -// -let worker, router; +// One worker. If we scale this to multiple CPUs, we should create more workers +let worker; const emptyRoom = Object.freeze({ // external peers: {}, @@ -49,8 +48,6 @@ const emptyRoom = Object.freeze({ }); const roomState = {}; // -// we also send information about the active speaker, as tracked by -// our audioLevelObserver. // // internally, we keep lists of transports, producers, and // consumers. whenever we create a transport, producer, or consumer, @@ -58,12 +55,6 @@ const roomState = {}; // and consumers we also keep track of the client-side "media tag", to // correlate tracks. // -// -// our http server needs to send 'index.html' and 'client-bundle.js'. -// might as well just send everything in this directory ... -// -const dir = __dirname.replace(/dist$/, ""); -expressApp.use(express_1.default.static(dir)); function createHttpsServer() { const https = require('https'); const tls = { @@ -96,7 +87,7 @@ function withAsyncHandler(handler) { async function main() { // start mediasoup console.log('starting mediasoup'); - ({ worker, router } = await startMediasoup()); + worker = await startMediasoup(); // start https server, falling back to http if https fails console.log('starting express'); let server; @@ -135,12 +126,6 @@ function setSocketHandlers(socket) { socket.on('disconnect', () => { logSocket(`socketio disconnect`); }); - // --> /signaling/router-capabilities - // - // - socket.on('router-capabilities', withAsyncSocketHandler(async function () { - return { routerRtpCapabilities: router.rtpCapabilities }; - })); // --> /signaling/join-as-new-peer // // adds the peer to the roomState data structure and creates a @@ -152,14 +137,18 @@ function setSocketHandlers(socket) { const { peerId, roomId } = request; console.log('join-as-new-peer', peerId, roomId); if (!(roomId in roomState)) { - roomState[roomId] = Object.assign({}, emptyRoom); + // Create a new room with a new router + const mediaCodecs = config.mediasoup.router.mediaCodecs; + const router = await worker.createRouter({ mediaCodecs }); + roomState[roomId] = Object.assign({ router }, emptyRoom); } - roomState[roomId].peers[peerId] = { + const room = roomState[roomId]; + room.peers[peerId] = { joinTs: Date.now(), media: {}, consumerLayers: {}, stats: { producers: {}, consumers: {} } }; const response = { - routerRtpCapabilities: router.rtpCapabilities + routerRtpCapabilities: room.router.rtpCapabilities }; updatePeers(roomId); setSocketHandlersForPeer(socket, peerId, roomId); @@ -195,13 +184,12 @@ function setSocketHandlersForPeer(socket, peerId, roomId) { socket.on('create-transport', withAsyncSocketHandler(async (data) => { const request = protocol.createTransportRequest(data); log('create-transport', peerId, request.direction); - let transport = await createWebRtcTransport({ peerId, direction: request.direction }); - if (roomId in roomState) { - roomState[roomId].transports[transport.id] = transport; - } - else { - console.warn(`create-transport unable to find room ${roomId}`); + if (!(roomId in roomState)) { + throw new Error(`No room ${roomId}`); } + const room = roomState[roomId]; + let transport = await createWebRtcTransport({ router: room.router, peerId, direction: request.direction }); + room.transports[transport.id] = transport; let { id, iceParameters, iceCandidates, dtlsParameters } = transport; const response = { transportOptions: { id, iceParameters, iceCandidates, dtlsParameters } @@ -312,17 +300,18 @@ function setSocketHandlersForPeer(socket, peerId, roomId) { if (!(roomId in roomState)) { throw new Error(`No room ${roomId}`); } - const producer = roomState[roomId].producers.find((p) => p.appData.mediaTag === mediaTag && + const room = roomState[roomId]; + const producer = room.producers.find((p) => p.appData.mediaTag === mediaTag && p.appData.peerId === mediaPeerId); if (!producer) { throw new Error('server-side producer for ' + `${mediaPeerId}:${mediaTag} not found`); } - if (!router.canConsume({ producerId: producer.id, + if (!room.router.canConsume({ producerId: producer.id, // @ts-ignore rtpCapabilities })) { throw new Error(`client cannot consume ${mediaPeerId}:${mediaTag}`); } - const transport = Object.values(roomState[roomId].transports).find((t) => t.appData.peerId === peerId && t.appData.clientDirection === 'recv'); + const transport = Object.values(room.transports).find((t) => t.appData.peerId === peerId && t.appData.clientDirection === 'recv'); if (!transport) { throw new Error(`server-side recv transport for ${peerId} not found`); } @@ -347,8 +336,8 @@ function setSocketHandlersForPeer(socket, peerId, roomId) { // stick this consumer in our list of consumers to keep track of, // and create a data structure to track the client-relevant state // of this consumer - roomState[roomId].consumers.push(consumer); - roomState[roomId].peers[peerId].consumerLayers[consumer.id] = { + room.consumers.push(consumer); + room.peers[peerId].consumerLayers[consumer.id] = { currentLayer: null, clientSelectedLayer: null }; @@ -515,9 +504,7 @@ async function startMediasoup() { console.error('mediasoup worker died (this should never happen)'); process.exit(1); }); - const mediaCodecs = config.mediasoup.router.mediaCodecs; - const router = await worker.createRouter({ mediaCodecs }); - return { worker, router }; + return worker; } // // -- our minimal signaling is just http polling -- @@ -591,7 +578,7 @@ async function closeConsumer(roomId, consumer) { } } async function createWebRtcTransport(params) { - const { peerId, direction } = params; + const { peerId, direction, router } = params; const { listenIps, initialAvailableOutgoingBitrate } = config.mediasoup.webRtcTransport; const transport = await router.createWebRtcTransport({ listenIps: listenIps, diff --git a/single-page/src/server.ts b/single-page/src/server.ts index 7c5212c..2bc0e00 100644 --- a/single-page/src/server.ts +++ b/single-page/src/server.ts @@ -30,15 +30,16 @@ import type { Worker } from "mediasoup/lib/Worker"; import type { Socket } from "socket.io"; -// one mediasoup worker and router -// -let worker: Worker, router: Router; + +// One worker. If we scale this to multiple CPUs, we should create more workers +let worker: Worker; type Room = { peers: { [key: string]: Peer }, transports: {[key: string]: Transport }, producers: Array, - consumers: Array + consumers: Array, + router: Router }; type Media = { @@ -73,14 +74,8 @@ const roomState: { [roomId: string]: Room } = {}; // // for each peer that connects, we keep a table of peers and what -// tracks are being sent and received. we also need to know the last -// time we saw the peer, so that we can disconnect clients that have -// network issues. +// tracks are being sent and received. // -// for this simple demo, each client polls the server at 1hz, and we -// just send this room.peers data structure as our answer to each -// poll request. - type Peer = { joinTs: number, // TODO: I think this can be completed derived from the producers lists @@ -93,8 +88,6 @@ type Peer = { }; // -// we also send information about the active speaker, as tracked by -// our audioLevelObserver. // // internally, we keep lists of transports, producers, and // consumers. whenever we create a transport, producer, or consumer, @@ -103,14 +96,6 @@ type Peer = { // correlate tracks. // -// -// our http server needs to send 'index.html' and 'client-bundle.js'. -// might as well just send everything in this directory ... -// - -const dir = __dirname.replace(/dist$/, ""); -expressApp.use(express.static(dir)); - function createHttpsServer() { const https = require('https'); @@ -150,7 +135,7 @@ function withAsyncHandler(handler: express.Handler): express.Handler { async function main() { // start mediasoup console.log('starting mediasoup'); - ({ worker, router } = await startMediasoup()); + worker = await startMediasoup(); // start https server, falling back to http if https fails console.log('starting express'); @@ -198,14 +183,6 @@ function setSocketHandlers(socket: SocketIO.Socket) { logSocket(`socketio disconnect`); }); - // --> /signaling/router-capabilities - // - // - socket.on('router-capabilities', withAsyncSocketHandler(async function() { - return { routerRtpCapabilities: router.rtpCapabilities }; - })); - - // --> /signaling/join-as-new-peer // // adds the peer to the roomState data structure and creates a @@ -218,16 +195,21 @@ function setSocketHandlers(socket: SocketIO.Socket) { console.log('join-as-new-peer', peerId, roomId); if (!(roomId in roomState)) { - roomState[roomId] = Object.assign({}, emptyRoom); + // Create a new room with a new router + const mediaCodecs = config.mediasoup.router.mediaCodecs; + const router = await worker.createRouter({ mediaCodecs }); + + roomState[roomId] = Object.assign({ router }, emptyRoom); } + const room = roomState[roomId]; - roomState[roomId].peers[peerId] = { + room.peers[peerId] = { joinTs: Date.now(), media: {}, consumerLayers: {}, stats: { producers: {}, consumers: {}} }; const response: GuardType = { - routerRtpCapabilities: router.rtpCapabilities + routerRtpCapabilities: room.router.rtpCapabilities }; updatePeers(roomId); @@ -274,12 +256,13 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI const request = protocol.createTransportRequest(data); log('create-transport', peerId, request.direction); - let transport = await createWebRtcTransport({ peerId, direction: request.direction }); - if (roomId in roomState) { - roomState[roomId].transports[transport.id] = transport; - } else { - console.warn(`create-transport unable to find room ${roomId}`); + if (!(roomId in roomState)) { + throw new Error(`No room ${roomId}`); } + const room = roomState[roomId]; + + let transport = await createWebRtcTransport({ router: room.router, peerId, direction: request.direction }); + room.transports[transport.id] = transport; let { id, iceParameters, iceCandidates, dtlsParameters } = transport; @@ -427,8 +410,9 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI if (!(roomId in roomState)) { throw new Error(`No room ${roomId}`); } + const room = roomState[roomId]; - const producer = roomState[roomId].producers.find( + const producer = room.producers.find( (p) => p.appData.mediaTag === mediaTag && p.appData.peerId === mediaPeerId ); @@ -437,13 +421,13 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI throw new Error('server-side producer for ' + `${mediaPeerId}:${mediaTag} not found`); } - if (!router.canConsume({ producerId: producer.id, + if (!room.router.canConsume({ producerId: producer.id, // @ts-ignore rtpCapabilities })) { throw new Error(`client cannot consume ${mediaPeerId}:${mediaTag}`); } - const transport = Object.values(roomState[roomId].transports).find((t) => + const transport = Object.values(room.transports).find((t) => t.appData.peerId === peerId && t.appData.clientDirection === 'recv' ); @@ -474,8 +458,8 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI // stick this consumer in our list of consumers to keep track of, // and create a data structure to track the client-relevant state // of this consumer - roomState[roomId].consumers.push(consumer); - roomState[roomId].peers[peerId].consumerLayers[consumer.id] = { + room.consumers.push(consumer); + room.peers[peerId].consumerLayers[consumer.id] = { currentLayer: null, clientSelectedLayer: null }; @@ -696,10 +680,7 @@ async function startMediasoup() { process.exit(1); }); - const mediaCodecs = config.mediasoup.router.mediaCodecs; - const router = await worker.createRouter({ mediaCodecs }); - - return { worker, router }; + return worker; } // @@ -781,8 +762,8 @@ async function closeConsumer(roomId: string, consumer: Consumer) { } } -async function createWebRtcTransport(params: { peerId: string, direction: string }) { - const { peerId, direction } = params; +async function createWebRtcTransport(params: { router: Router, peerId: string, direction: string }) { + const { peerId, direction, router } = params; const { listenIps, initialAvailableOutgoingBitrate