From ff1e8da5ceed9efb98aad2945a78e9318a624731 Mon Sep 17 00:00:00 2001 From: woody Date: Tue, 20 Jun 2023 15:49:22 +0700 Subject: [PATCH] feat: create MediaConsumer and MediaProducer --- README.md | 2 +- examples/rooms/src/components/Consumer.tsx | 75 +++++++++++++--------- examples/rooms/src/components/Producer.tsx | 4 +- src/entities/index.ts | 2 + src/entities/media.consumer.ts | 29 +++++++++ src/entities/media.peer.ts | 11 ++-- src/entities/media.producer.ts | 29 +++++++++ src/services/peer.ts | 31 +++++---- 8 files changed, 131 insertions(+), 52 deletions(-) create mode 100644 src/entities/media.consumer.ts create mode 100644 src/entities/media.producer.ts diff --git a/README.md b/README.md index 017bce8..6003f65 100644 --- a/README.md +++ b/README.md @@ -163,7 +163,7 @@ Then, navigate to https://localhost:4430 diff --git a/examples/rooms/src/components/Consumer.tsx b/examples/rooms/src/components/Consumer.tsx index a8b7867..b82d980 100644 --- a/examples/rooms/src/components/Consumer.tsx +++ b/examples/rooms/src/components/Consumer.tsx @@ -1,51 +1,64 @@ import { types } from 'mediasoup-client'; -import { useEffect, useMemo, useRef, useState } from 'react'; +import { useEffect, useRef, useState } from 'react'; import { fetchApi } from '../services/api'; export function Consumer({ device, - producerId, + producers, transport, }: { device: types.Device; - producerId: string; + producers: Record; transport: types.Transport; }) { + const [stream, setStream] = useState(); const ref = useRef(null); - const subscribe = useMemo(() => { - return async () => { - const { rtpCapabilities } = device; - const { id, kind, rtpParameters } = await fetchApi({ - path: `/api/consumer_peers/${transport.id}/consume`, - method: 'POST', - data: { rtpCapabilities, producerId }, - }); + const subscribe = async (producerId: string) => { + const { rtpCapabilities } = device; + const { id, kind, rtpParameters } = await fetchApi({ + path: `/api/consumer_peers/${transport.id}/consume`, + method: 'POST', + data: { rtpCapabilities, producerId }, + }); - const consumer = await transport.consume({ - id, - producerId, - kind, - rtpParameters, - }); - const stream = new MediaStream(); + const consumer = await transport.consume({ + id, + producerId, + kind, + rtpParameters, + }); + + let mStream; + if (!stream) { + mStream = new MediaStream(); + mStream.addTrack(consumer.track); + setStream(mStream); + } else { stream.addTrack(consumer.track); + } + if (ref.current) { + ref.current.srcObject = (stream || mStream) as any; + await fetchApi({ + path: `/api/consumer_peers/${transport.id}/resume`, + method: 'POST', + data: { consumerId: id }, + }); + } + }; - if (ref.current) { - ref.current.srcObject = stream; - fetchApi({ - path: `/api/consumer_peers/${transport.id}/resume`, - method: 'POST', - data: { consumerId: id }, - }); - } - }; - }, [device, producerId, transport]); + useEffect(() => { + const mediaStream = new MediaStream(); + setStream(mediaStream); + if (ref.current) { + ref.current.srcObject = mediaStream; + } + }, []); useEffect(() => { - subscribe(); - }, [subscribe]); + producers.map((item: any) => subscribe(item.id)); + }, []); return (
@@ -137,7 +150,7 @@ export function Consumers({
))} diff --git a/examples/rooms/src/components/Producer.tsx b/examples/rooms/src/components/Producer.tsx index 8307be0..a6845ce 100644 --- a/examples/rooms/src/components/Producer.tsx +++ b/examples/rooms/src/components/Producer.tsx @@ -81,8 +81,8 @@ export function Producer({ video: useVideo, audio: useAudio, }); - const track = stream.getVideoTracks()[0]; - await transport.produce({ track }); + await transport.produce({ track: stream.getVideoTracks()[0] }); + await transport.produce({ track: stream.getAudioTracks()[0] }); } catch (err: any) { setLog(err.toString()); throw err; diff --git a/src/entities/index.ts b/src/entities/index.ts index 552938c..d65b97a 100644 --- a/src/entities/index.ts +++ b/src/entities/index.ts @@ -2,3 +2,5 @@ export * from './media.worker.js'; export * from './media.room.js'; export * from './media.peer.js'; export * from './media.router.js'; +export * from './media.consumer.js'; +export * from './media.producer.js'; diff --git a/src/entities/media.consumer.ts b/src/entities/media.consumer.ts new file mode 100644 index 0000000..b767977 --- /dev/null +++ b/src/entities/media.consumer.ts @@ -0,0 +1,29 @@ +import { + Entity, + BaseEntity, + PrimaryColumn, + Column, + CreateDateColumn, + ManyToOne, +} from 'typeorm'; +import { MediaPeer } from './media.peer.js'; + +@Entity() +export class MediaConsumer extends BaseEntity { + @PrimaryColumn('uuid') + id!: string; + + @Column('uuid') + producerId!: string; + + @Column('uuid') + peerId!: string; + + @ManyToOne(() => MediaPeer, (peer) => peer.consumers, { + onDelete: 'CASCADE', + }) + peer!: MediaPeer; + + @CreateDateColumn() + createDate!: Date; +} diff --git a/src/entities/media.peer.ts b/src/entities/media.peer.ts index e65e17f..cbdf371 100644 --- a/src/entities/media.peer.ts +++ b/src/entities/media.peer.ts @@ -6,9 +6,12 @@ import { PrimaryColumn, ManyToOne, Index, + OneToMany, } from 'typeorm'; import { MediaWorker } from './media.worker.js'; import { MediaRoom } from './media.room.js'; +import { MediaConsumer } from './media.consumer.js'; +import { MediaProducer } from './media.producer.js'; @Entity() export class MediaPeer extends BaseEntity { @@ -36,11 +39,11 @@ export class MediaPeer extends BaseEntity { @Column('text', { nullable: true }) userId?: string; - @Column('uuid', { nullable: true }) - producerId?: string; + @OneToMany(() => MediaConsumer, (consumer) => consumer.peer) + consumers!: MediaConsumer[]; - @Column({ type: 'jsonb', default: {} }) - consumers!: any; + @OneToMany(() => MediaProducer, (producer) => producer.peer) + producers!: MediaProducer[]; @Column('text') type!: string; // consumer | producer diff --git a/src/entities/media.producer.ts b/src/entities/media.producer.ts new file mode 100644 index 0000000..815bf62 --- /dev/null +++ b/src/entities/media.producer.ts @@ -0,0 +1,29 @@ +import { + Entity, + BaseEntity, + PrimaryColumn, + Column, + CreateDateColumn, + ManyToOne, +} from 'typeorm'; +import { MediaPeer } from './media.peer.js'; + +@Entity() +export class MediaProducer extends BaseEntity { + @PrimaryColumn('uuid') + id!: string; + + @Column('text') + kind!: string; + + @Column('uuid') + peerId!: string; + + @ManyToOne(() => MediaPeer, (peer) => peer.producers, { + onDelete: 'CASCADE', + }) + peer!: MediaPeer; + + @CreateDateColumn() + createDate!: Date; +} diff --git a/src/services/peer.ts b/src/services/peer.ts index 2b4e2c1..ed7bf04 100644 --- a/src/services/peer.ts +++ b/src/services/peer.ts @@ -1,12 +1,12 @@ import { types } from 'mediasoup'; -import { IsNull, Not } from 'typeorm'; import { constants } from '../constants.js'; -import { MediaPeer, MediaWorker } from '../entities/index.js'; +import { MediaConsumer, MediaPeer, MediaWorker } from '../entities/index.js'; import { fetchApi } from '../utils/api.js'; import { BaseService, ServiceError } from './base.js'; import { RoomService } from './room.js'; import { RouterService } from './router.js'; +import { MediaProducer } from '../entities/media.producer.js'; export class PeerService extends BaseService { async createProducer(data: { @@ -68,8 +68,11 @@ export class PeerService extends BaseService { }, }); - peer.producerId = result.id; - await this.dataSource.getRepository(MediaPeer).save(peer); + const producer = new MediaProducer(); + producer.id = result.id; + producer.kind = data.kind; + producer.peerId = peer.id; + await this.dataSource.getRepository(MediaProducer).save(producer); return result; } throw new ServiceError(400, 'Invalid peer'); @@ -153,7 +156,7 @@ export class PeerService extends BaseService { id: string; }> { const peer = await this.get({ peerId: data.peerId }); - if (peer.type === constants.CONSUMER && !peer.consumers[data.producerId]) { + if (peer.type === constants.CONSUMER) { await this.createService(RouterService).checkToPipe({ routerId: peer.routerId, producerId: data.producerId, @@ -172,8 +175,11 @@ export class PeerService extends BaseService { }, }); - peer.consumers[data.producerId] = result.id; - await this.dataSource.getRepository(MediaPeer).save(peer); + const consumer = new MediaConsumer(); + consumer.id = result.id; + consumer.producerId = data.producerId; + consumer.peerId = peer.id; + await this.dataSource.getRepository(MediaConsumer).save(consumer); return result; } throw new ServiceError(400, 'Invalid type peer'); @@ -217,10 +223,7 @@ export class PeerService extends BaseService { async resume(data: { peerId: string; consumerId: string }) { const peer = await this.get({ peerId: data.peerId }); - if ( - peer.type === constants.CONSUMER && - Object.values(peer.consumers).includes(data.consumerId) - ) { + if (peer.type === constants.CONSUMER) { await fetchApi({ host: peer.worker.internalHost, port: peer.worker.apiPort, @@ -247,14 +250,14 @@ export class PeerService extends BaseService { async getProducers(data: { roomId: string }): Promise< Array<{ id: string; - producerId: string; + producers: Array<{ id: string; kind: string }>; }> > { return this.dataSource.getRepository(MediaPeer).find({ - select: ['id', 'producerId'], + relations: { producers: true }, + select: ['id', 'producers'], where: { roomId: data.roomId, - producerId: Not(IsNull()), type: constants.PRODUCER, }, }) as any;