diff --git a/src/apis/master.ts b/src/apis/master.ts index f005e60..1e302dd 100644 --- a/src/apis/master.ts +++ b/src/apis/master.ts @@ -3,6 +3,7 @@ import { RoomService, RouterService, UserService, + ProducerService, } from '../services/index.js'; import { RouteConfig, getDataSource } from '../utils/index.js'; @@ -95,7 +96,21 @@ export default [ method: 'POST', url: '/producer_transports/:transportId/produce', handler: (data) => { - return new TransportService(getDataSource()).produce(data); + return new ProducerService(getDataSource()).create(data); + }, + }, + { + method: 'POST', + url: '/producers/:producerId/resume', + handler: (data) => { + return new ProducerService(getDataSource()).resume(data); + }, + }, + { + method: 'POST', + url: '/producers/:producerId/pause', + handler: (data) => { + return new ProducerService(getDataSource()).pause(data); }, }, { diff --git a/src/apis/slave.ts b/src/apis/slave.ts index b1f2386..624d434 100644 --- a/src/apis/slave.ts +++ b/src/apis/slave.ts @@ -107,6 +107,20 @@ export default [ return mediasoupConsumerManager.create(data); }, }, + { + method: 'POST', + url: '/producers/:producerId/resume', + handler: (data) => { + return mediasoupProducerManager.resume(data); + }, + }, + { + method: 'POST', + url: '/producers/:producerId/pause', + handler: (data) => { + return mediasoupProducerManager.pause(data); + }, + }, { method: 'POST', url: '/consumers/:consumerId/resume', diff --git a/src/services/index.ts b/src/services/index.ts index 79f96a1..2190dd3 100644 --- a/src/services/index.ts +++ b/src/services/index.ts @@ -3,4 +3,5 @@ export * from './room.js'; export * from './transport.js'; export * from './router.js'; export * from './user.js'; +export * from './producer.js'; export * from './mediasoup/index.js'; diff --git a/src/services/mediasoup/producer.ts b/src/services/mediasoup/producer.ts index 284ded8..49a9074 100644 --- a/src/services/mediasoup/producer.ts +++ b/src/services/mediasoup/producer.ts @@ -1,5 +1,6 @@ import { types } from 'mediasoup'; import { mediasoupProducerWebRTCTransportManager } from './webrtc.transport.js'; +import { ServiceError } from '../base.js'; class MediasoupProducerManager { static producers = new Map(); @@ -18,6 +19,26 @@ class MediasoupProducerManager { return { id: producer.id }; } + + async pause(data: { producerId: string }) { + const producer = this.get(data); + await producer.pause(); + return {}; + } + + async resume(data: { producerId: string }) { + const producer = this.get(data); + await producer.resume(); + return {}; + } + + get(data: { producerId: string }) { + const producer = MediasoupProducerManager.producers.get(data.producerId); + if (producer) { + return producer; + } + throw new ServiceError(404, 'Producer not found'); + } } export const mediasoupProducerManager = new MediasoupProducerManager(); diff --git a/src/services/producer.ts b/src/services/producer.ts new file mode 100644 index 0000000..95ed79a --- /dev/null +++ b/src/services/producer.ts @@ -0,0 +1,85 @@ +import { constants } from '../constants.js'; +import { MediaProducer } from '../entities/index.js'; +import { fetchApi } from '../utils/api.js'; +import { BaseService, ServiceError } from './base.js'; +import { TransportService } from './transport.js'; + +export class ProducerService extends BaseService { + async create(data: { + transportId: string; + kind: any; + rtpParameters: any; + }): Promise<{ + /** + * Producer id + */ + id: string; + }> { + const transport = await this.createService(TransportService).get({ + transportId: data.transportId, + }); + if (transport.type === constants.PRODUCER) { + const result = await fetchApi({ + host: transport.worker.apiHost, + port: transport.worker.apiPort, + path: '/transports/:transportId/producer', + method: 'POST', + data: { + transportId: transport.id, + kind: data.kind, + rtpParameters: data.rtpParameters, + }, + }); + + const producer = new MediaProducer(); + producer.id = result.id; + producer.kind = data.kind; + producer.transportId = transport.id; + await this.dataSource.getRepository(MediaProducer).save(producer); + return result; + } + throw new ServiceError(400, 'Invalid transport'); + } + + async pause(data: { producerId: string }) { + const producer = await this.get(data); + const transport = await this.createService(TransportService).get({ + transportId: producer.transportId, + }); + await fetchApi({ + host: transport.worker.apiHost, + port: transport.worker.apiPort, + path: '/producers/:producerId/pause', + method: 'POST', + data: { producerId: data.producerId }, + }); + return {}; + } + + async resume(data: { producerId: string }) { + const producer = await this.get(data); + const transport = await this.createService(TransportService).get({ + transportId: producer.transportId, + }); + await fetchApi({ + host: transport.worker.apiHost, + port: transport.worker.apiPort, + path: '/producers/:producerId/resume', + method: 'POST', + data: { producerId: data.producerId }, + }); + return {}; + } + + async get(data: { producerId: string }) { + const producer = await this.dataSource + .getRepository(MediaProducer) + .findOne({ + where: { id: data.producerId }, + }); + if (producer) { + return producer; + } + throw new ServiceError(404, 'Producer not found'); + } +} diff --git a/src/services/transport.ts b/src/services/transport.ts index a422dfd..bb84594 100644 --- a/src/services/transport.ts +++ b/src/services/transport.ts @@ -10,7 +10,6 @@ import { fetchApi } from '../utils/api.js'; import { BaseService, ServiceError } from './base.js'; import { RoomService } from './room.js'; import { RouterService } from './router.js'; -import { MediaProducer } from '../entities/media.producer.js'; export class TransportService extends BaseService { async createProducer(data: { @@ -48,40 +47,6 @@ export class TransportService extends BaseService { return result; } - async produce(data: { - transportId: string; - kind: any; - rtpParameters: any; - }): Promise<{ - /** - * Producer id - */ - id: string; - }> { - const transport = await this.get({ transportId: data.transportId }); - if (transport.type === constants.PRODUCER) { - const result = await fetchApi({ - host: transport.worker.apiHost, - port: transport.worker.apiPort, - path: '/transports/:transportId/producer', - method: 'POST', - data: { - transportId: transport.id, - kind: data.kind, - rtpParameters: data.rtpParameters, - }, - }); - - const producer = new MediaProducer(); - producer.id = result.id; - producer.kind = data.kind; - producer.transportId = transport.id; - await this.dataSource.getRepository(MediaProducer).save(producer); - return result; - } - throw new ServiceError(400, 'Invalid transport'); - } - async createConsumer(data: { routerId: string; userId?: string }): Promise<{ id: string; iceParameters: types.IceParameters;