Skip to content

Commit

Permalink
Each room gets its own router
Browse files Browse the repository at this point in the history
  • Loading branch information
jbaudanza committed Oct 7, 2020
1 parent ef8fa61 commit c8d0282
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 85 deletions.
59 changes: 23 additions & 36 deletions single-page/dist/server.js
Expand Up @@ -36,9 +36,8 @@ expressApp.disable("x-powered-by");
// The Sentry handler must be the first middleware in the stack
expressApp.use(Sentry.Handlers.requestHandler());
const log = debugModule('demo-app');
// one mediasoup worker and router
//
let worker, router;
// One worker. If we scale this to multiple CPUs, we should create more workers
let worker;
const emptyRoom = Object.freeze({
// external
peers: {},
Expand All @@ -49,21 +48,13 @@ const emptyRoom = Object.freeze({
});
const roomState = {};
//
// we also send information about the active speaker, as tracked by
// our audioLevelObserver.
//
// internally, we keep lists of transports, producers, and
// consumers. whenever we create a transport, producer, or consumer,
// we save the remote peerId in the object's `appData`. for producers
// and consumers we also keep track of the client-side "media tag", to
// correlate tracks.
//
//
// our http server needs to send 'index.html' and 'client-bundle.js'.
// might as well just send everything in this directory ...
//
const dir = __dirname.replace(/dist$/, "");
expressApp.use(express_1.default.static(dir));
function createHttpsServer() {
const https = require('https');
const tls = {
Expand Down Expand Up @@ -96,7 +87,7 @@ function withAsyncHandler(handler) {
async function main() {
// start mediasoup
console.log('starting mediasoup');
({ worker, router } = await startMediasoup());
worker = await startMediasoup();
// start https server, falling back to http if https fails
console.log('starting express');
let server;
Expand Down Expand Up @@ -135,12 +126,6 @@ function setSocketHandlers(socket) {
socket.on('disconnect', () => {
logSocket(`socketio disconnect`);
});
// --> /signaling/router-capabilities
//
//
socket.on('router-capabilities', withAsyncSocketHandler(async function () {
return { routerRtpCapabilities: router.rtpCapabilities };
}));
// --> /signaling/join-as-new-peer
//
// adds the peer to the roomState data structure and creates a
Expand All @@ -152,14 +137,18 @@ function setSocketHandlers(socket) {
const { peerId, roomId } = request;
console.log('join-as-new-peer', peerId, roomId);
if (!(roomId in roomState)) {
roomState[roomId] = Object.assign({}, emptyRoom);
// Create a new room with a new router
const mediaCodecs = config.mediasoup.router.mediaCodecs;
const router = await worker.createRouter({ mediaCodecs });
roomState[roomId] = Object.assign({ router }, emptyRoom);
}
roomState[roomId].peers[peerId] = {
const room = roomState[roomId];
room.peers[peerId] = {
joinTs: Date.now(),
media: {}, consumerLayers: {}, stats: { producers: {}, consumers: {} }
};
const response = {
routerRtpCapabilities: router.rtpCapabilities
routerRtpCapabilities: room.router.rtpCapabilities
};
updatePeers(roomId);
setSocketHandlersForPeer(socket, peerId, roomId);
Expand Down Expand Up @@ -195,13 +184,12 @@ function setSocketHandlersForPeer(socket, peerId, roomId) {
socket.on('create-transport', withAsyncSocketHandler(async (data) => {
const request = protocol.createTransportRequest(data);
log('create-transport', peerId, request.direction);
let transport = await createWebRtcTransport({ peerId, direction: request.direction });
if (roomId in roomState) {
roomState[roomId].transports[transport.id] = transport;
}
else {
console.warn(`create-transport unable to find room ${roomId}`);
if (!(roomId in roomState)) {
throw new Error(`No room ${roomId}`);
}
const room = roomState[roomId];
let transport = await createWebRtcTransport({ router: room.router, peerId, direction: request.direction });
room.transports[transport.id] = transport;
let { id, iceParameters, iceCandidates, dtlsParameters } = transport;
const response = {
transportOptions: { id, iceParameters, iceCandidates, dtlsParameters }
Expand Down Expand Up @@ -312,17 +300,18 @@ function setSocketHandlersForPeer(socket, peerId, roomId) {
if (!(roomId in roomState)) {
throw new Error(`No room ${roomId}`);
}
const producer = roomState[roomId].producers.find((p) => p.appData.mediaTag === mediaTag &&
const room = roomState[roomId];
const producer = room.producers.find((p) => p.appData.mediaTag === mediaTag &&
p.appData.peerId === mediaPeerId);
if (!producer) {
throw new Error('server-side producer for ' + `${mediaPeerId}:${mediaTag} not found`);
}
if (!router.canConsume({ producerId: producer.id,
if (!room.router.canConsume({ producerId: producer.id,
// @ts-ignore
rtpCapabilities })) {
throw new Error(`client cannot consume ${mediaPeerId}:${mediaTag}`);
}
const transport = Object.values(roomState[roomId].transports).find((t) => t.appData.peerId === peerId && t.appData.clientDirection === 'recv');
const transport = Object.values(room.transports).find((t) => t.appData.peerId === peerId && t.appData.clientDirection === 'recv');
if (!transport) {
throw new Error(`server-side recv transport for ${peerId} not found`);
}
Expand All @@ -347,8 +336,8 @@ function setSocketHandlersForPeer(socket, peerId, roomId) {
// stick this consumer in our list of consumers to keep track of,
// and create a data structure to track the client-relevant state
// of this consumer
roomState[roomId].consumers.push(consumer);
roomState[roomId].peers[peerId].consumerLayers[consumer.id] = {
room.consumers.push(consumer);
room.peers[peerId].consumerLayers[consumer.id] = {
currentLayer: null,
clientSelectedLayer: null
};
Expand Down Expand Up @@ -515,9 +504,7 @@ async function startMediasoup() {
console.error('mediasoup worker died (this should never happen)');
process.exit(1);
});
const mediaCodecs = config.mediasoup.router.mediaCodecs;
const router = await worker.createRouter({ mediaCodecs });
return { worker, router };
return worker;
}
//
// -- our minimal signaling is just http polling --
Expand Down Expand Up @@ -591,7 +578,7 @@ async function closeConsumer(roomId, consumer) {
}
}
async function createWebRtcTransport(params) {
const { peerId, direction } = params;
const { peerId, direction, router } = params;
const { listenIps, initialAvailableOutgoingBitrate } = config.mediasoup.webRtcTransport;
const transport = await router.createWebRtcTransport({
listenIps: listenIps,
Expand Down
79 changes: 30 additions & 49 deletions single-page/src/server.ts
Expand Up @@ -30,15 +30,16 @@ import type { Worker } from "mediasoup/lib/Worker";

import type { Socket } from "socket.io";

// one mediasoup worker and router
//
let worker: Worker, router: Router;

// One worker. If we scale this to multiple CPUs, we should create more workers
let worker: Worker;

type Room = {
peers: { [key: string]: Peer },
transports: {[key: string]: Transport },
producers: Array<Producer>,
consumers: Array<Consumer>
consumers: Array<Consumer>,
router: Router
};

type Media = {
Expand Down Expand Up @@ -73,14 +74,8 @@ const roomState: { [roomId: string]: Room } = {};

//
// for each peer that connects, we keep a table of peers and what
// tracks are being sent and received. we also need to know the last
// time we saw the peer, so that we can disconnect clients that have
// network issues.
// tracks are being sent and received.
//
// for this simple demo, each client polls the server at 1hz, and we
// just send this room.peers data structure as our answer to each
// poll request.

type Peer = {
joinTs: number,
// TODO: I think this can be completed derived from the producers lists
Expand All @@ -93,8 +88,6 @@ type Peer = {
};

//
// we also send information about the active speaker, as tracked by
// our audioLevelObserver.
//
// internally, we keep lists of transports, producers, and
// consumers. whenever we create a transport, producer, or consumer,
Expand All @@ -103,14 +96,6 @@ type Peer = {
// correlate tracks.
//

//
// our http server needs to send 'index.html' and 'client-bundle.js'.
// might as well just send everything in this directory ...
//

const dir = __dirname.replace(/dist$/, "");
expressApp.use(express.static(dir));

function createHttpsServer() {
const https = require('https');

Expand Down Expand Up @@ -150,7 +135,7 @@ function withAsyncHandler(handler: express.Handler): express.Handler {
async function main() {
// start mediasoup
console.log('starting mediasoup');
({ worker, router } = await startMediasoup());
worker = await startMediasoup();

// start https server, falling back to http if https fails
console.log('starting express');
Expand Down Expand Up @@ -198,14 +183,6 @@ function setSocketHandlers(socket: SocketIO.Socket) {
logSocket(`socketio disconnect`);
});

// --> /signaling/router-capabilities
//
//
socket.on('router-capabilities', withAsyncSocketHandler(async function() {
return { routerRtpCapabilities: router.rtpCapabilities };
}));


// --> /signaling/join-as-new-peer
//
// adds the peer to the roomState data structure and creates a
Expand All @@ -218,16 +195,21 @@ function setSocketHandlers(socket: SocketIO.Socket) {
console.log('join-as-new-peer', peerId, roomId);

if (!(roomId in roomState)) {
roomState[roomId] = Object.assign({}, emptyRoom);
// Create a new room with a new router
const mediaCodecs = config.mediasoup.router.mediaCodecs;
const router = await worker.createRouter({ mediaCodecs });

roomState[roomId] = Object.assign({ router }, emptyRoom);
}
const room = roomState[roomId];

roomState[roomId].peers[peerId] = {
room.peers[peerId] = {
joinTs: Date.now(),
media: {}, consumerLayers: {}, stats: { producers: {}, consumers: {}}
};

const response: GuardType<typeof protocol.joinAsNewPeerResponse> = {
routerRtpCapabilities: router.rtpCapabilities
routerRtpCapabilities: room.router.rtpCapabilities
};

updatePeers(roomId);
Expand Down Expand Up @@ -274,12 +256,13 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI
const request = protocol.createTransportRequest(data);
log('create-transport', peerId, request.direction);

let transport = await createWebRtcTransport({ peerId, direction: request.direction });
if (roomId in roomState) {
roomState[roomId].transports[transport.id] = transport;
} else {
console.warn(`create-transport unable to find room ${roomId}`);
if (!(roomId in roomState)) {
throw new Error(`No room ${roomId}`);
}
const room = roomState[roomId];

let transport = await createWebRtcTransport({ router: room.router, peerId, direction: request.direction });
room.transports[transport.id] = transport;

let { id, iceParameters, iceCandidates, dtlsParameters } = transport;

Expand Down Expand Up @@ -427,8 +410,9 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI
if (!(roomId in roomState)) {
throw new Error(`No room ${roomId}`);
}
const room = roomState[roomId];

const producer = roomState[roomId].producers.find(
const producer = room.producers.find(
(p) => p.appData.mediaTag === mediaTag &&
p.appData.peerId === mediaPeerId
);
Expand All @@ -437,13 +421,13 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI
throw new Error('server-side producer for ' + `${mediaPeerId}:${mediaTag} not found`);
}

if (!router.canConsume({ producerId: producer.id,
if (!room.router.canConsume({ producerId: producer.id,
// @ts-ignore
rtpCapabilities })) {
throw new Error(`client cannot consume ${mediaPeerId}:${mediaTag}`);
}

const transport = Object.values(roomState[roomId].transports).find((t) =>
const transport = Object.values(room.transports).find((t) =>
t.appData.peerId === peerId && t.appData.clientDirection === 'recv'
);

Expand Down Expand Up @@ -474,8 +458,8 @@ function setSocketHandlersForPeer(socket: SocketIO.Socket, peerId: string, roomI
// stick this consumer in our list of consumers to keep track of,
// and create a data structure to track the client-relevant state
// of this consumer
roomState[roomId].consumers.push(consumer);
roomState[roomId].peers[peerId].consumerLayers[consumer.id] = {
room.consumers.push(consumer);
room.peers[peerId].consumerLayers[consumer.id] = {
currentLayer: null,
clientSelectedLayer: null
};
Expand Down Expand Up @@ -696,10 +680,7 @@ async function startMediasoup() {
process.exit(1);
});

const mediaCodecs = config.mediasoup.router.mediaCodecs;
const router = await worker.createRouter({ mediaCodecs });

return { worker, router };
return worker;
}

//
Expand Down Expand Up @@ -781,8 +762,8 @@ async function closeConsumer(roomId: string, consumer: Consumer) {
}
}

async function createWebRtcTransport(params: { peerId: string, direction: string }) {
const { peerId, direction } = params;
async function createWebRtcTransport(params: { router: Router, peerId: string, direction: string }) {
const { peerId, direction, router } = params;
const {
listenIps,
initialAvailableOutgoingBitrate
Expand Down

0 comments on commit c8d0282

Please sign in to comment.