Skip to content

Commit

Permalink
feat: add MediasoupPipeTransportManager
Browse files Browse the repository at this point in the history
  • Loading branch information
woody146 committed Jun 14, 2023
1 parent 993d192 commit 1f92849
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ DATABASE_LOGGING = 1
# PORT = 3000
SLAVE_FOR = producer
# host in lan network for master call api
SLAVE_INTERNAL_HOST = 'localhost'
SLAVE_INTERNAL_HOST = '127.0.0.1'
SLAVE_MAX_PEER = 1000

MEDIASOUP_LOG_LEVEL = warn
Expand Down
22 changes: 22 additions & 0 deletions src/apis/slave.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
mediasoupConsumerManager,
mediasoupConsumerWebRTCTransportManager,
mediasoupPipeTransportManager,
mediasoupProducerManager,
mediasoupProducerWebRTCTransportManager,
mediasoupRouterManager,
Expand All @@ -22,6 +23,20 @@ export default [
return mediasoupRouterManager.getRtpCapabilities(data);
},
},
{
method: 'POST',
url: '/routers/:routerId/destination_pipe_transports',
handler: (data) => {
return mediasoupPipeTransportManager.createDestination(data);
},
},
{
method: 'POST',
url: '/routers/:routerId/source_pipe_transports',
handler: (data) => {
return mediasoupPipeTransportManager.createSource(data);
},
},
{
method: 'POST',
url: '/routers/:routerId/producer_transports',
Expand Down Expand Up @@ -50,6 +65,13 @@ export default [
return mediasoupConsumerWebRTCTransportManager.connect(data);
},
},
{
method: 'POST',
url: '/pipe_transports/:transportId/consume',
handler: (data) => {
return mediasoupPipeTransportManager.consume(data);
},
},
{
method: 'POST',
url: '/transports/:transportId/producer',
Expand Down
1 change: 1 addition & 0 deletions src/services/mediasoup/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './worker.js';
export * from './router.js';
export * from './webrtc.transport.js';
export * from './pipe.transport.js';
export * from './producer.js';
export * from './consumer.js';
106 changes: 106 additions & 0 deletions src/services/mediasoup/pipe.transport.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { types } from 'mediasoup';
import { ServiceError } from '../base.js';
import { mediasoupRouterManager } from './router.js';
import { fetchApi } from '../../utils/api.js';

class MediasoupPipeTransportManager {
static transports = new Map<string, types.PipeTransport>();

async create(data: { routerId: string }) {
const router = mediasoupRouterManager.get(data.routerId);
if (router) {
const transport = await router.createPipeTransport({
listenIp: process.env.SLAVE_INTERNAL_HOST || '127.0.0.1',
enableSctp: true,
numSctpStreams: { OS: 1024, MIS: 1024 },
});
MediasoupPipeTransportManager.transports.set(transport.id, transport);
return transport;
}
throw new ServiceError(404, 'Router not found');
}

async createDestination(data: {
routerId: string;
sourceHost: string;
sourcePort: string;
sourceRouterId: string;
sourceProducerId: string;
}) {
const transport = await this.create(data);
const sourceResult = await fetchApi({
host: data.sourceHost,
port: data.sourcePort,
path: '/routers/:routerId/source_pipe_transports',
method: 'POST',
data: {
routerId: data.sourceRouterId,
destinationIp: transport.tuple.localIp,
destinationPort: transport.tuple.localPort,
destinationSrtpParameters: transport.srtpParameters,
},
});
await transport.connect({
ip: sourceResult.sourceIp,
port: sourceResult.sourcePort,
srtpParameters: sourceResult.sourceSrtpParameters,
});
const consumerResult = await fetchApi({
host: data.sourceHost,
port: data.sourcePort,
path: '/pipe_transports/:transportId/consume',
method: 'POST',
data: {
transportId: sourceResult.id,
producerId: data.sourceProducerId,
},
});
const pipeDataProducer = await transport.produce({
id: data.sourceProducerId,
kind: consumerResult.kind,
rtpParameters: consumerResult.rtpParameters,
paused: consumerResult.producerPaused,
});
return { id: pipeDataProducer.id };
}

async createSource(data: {
routerId: string;
destinationIp: string;
destinationPort: number;
destinationSrtpParameters: types.SrtpParameters;
}) {
const transport = await this.create(data);
await transport.connect({
ip: data.destinationIp,
port: data.destinationPort,
srtpParameters: data.destinationSrtpParameters,
});
return {
id: transport.id,
sourceIp: transport.tuple.localIp,
sourcePort: transport.tuple.localPort,
sourceSrtpParameters: transport.srtpParameters,
};
}

async consume(data: { transportId: string; producerId: string }) {
const transport = MediasoupPipeTransportManager.transports.get(
data.transportId
);
if (transport) {
const pipeConsumer = await transport.consume({
producerId: data.producerId,
});
return {
kind: pipeConsumer.kind,
rtpParameters: pipeConsumer.rtpParameters,
paused: pipeConsumer.producerPaused,
};
}
throw new ServiceError(404, 'Transport not found');
}
}

export const mediasoupPipeTransportManager =
new MediasoupPipeTransportManager();

0 comments on commit 1f92849

Please sign in to comment.