Skip to content

Commit

Permalink
feat: create MediaConsumer and MediaProducer
Browse files Browse the repository at this point in the history
  • Loading branch information
woody146 committed Jun 20, 2023
1 parent 61a8055 commit ff1e8da
Show file tree
Hide file tree
Showing 8 changed files with 131 additions and 52 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ Then, navigate to https://localhost:4430
<td>
<ul>
<li>id: peer id</li>
<li>producerId: producer id</li>
<li>producers: Array<{ id: string; kind: string }></li>
</ul>
</td>
</tr>
Expand Down
75 changes: 44 additions & 31 deletions examples/rooms/src/components/Consumer.tsx
Original file line number Diff line number Diff line change
@@ -1,51 +1,64 @@
import { types } from 'mediasoup-client';
import { useEffect, useMemo, useRef, useState } from 'react';
import { useEffect, useRef, useState } from 'react';

import { fetchApi } from '../services/api';

export function Consumer({
device,
producerId,
producers,
transport,
}: {
device: types.Device;
producerId: string;
producers: Record<string, any>;
transport: types.Transport;
}) {
const [stream, setStream] = useState<MediaStream>();
const ref = useRef<HTMLVideoElement>(null);

const subscribe = useMemo(() => {
return async () => {
const { rtpCapabilities } = device;
const { id, kind, rtpParameters } = await fetchApi({
path: `/api/consumer_peers/${transport.id}/consume`,
method: 'POST',
data: { rtpCapabilities, producerId },
});
const subscribe = async (producerId: string) => {
const { rtpCapabilities } = device;
const { id, kind, rtpParameters } = await fetchApi({
path: `/api/consumer_peers/${transport.id}/consume`,
method: 'POST',
data: { rtpCapabilities, producerId },
});

const consumer = await transport.consume({
id,
producerId,
kind,
rtpParameters,
});
const stream = new MediaStream();
const consumer = await transport.consume({
id,
producerId,
kind,
rtpParameters,
});

let mStream;
if (!stream) {
mStream = new MediaStream();
mStream.addTrack(consumer.track);
setStream(mStream);
} else {
stream.addTrack(consumer.track);
}
if (ref.current) {
ref.current.srcObject = (stream || mStream) as any;
await fetchApi({
path: `/api/consumer_peers/${transport.id}/resume`,
method: 'POST',
data: { consumerId: id },
});
}
};

if (ref.current) {
ref.current.srcObject = stream;
fetchApi({
path: `/api/consumer_peers/${transport.id}/resume`,
method: 'POST',
data: { consumerId: id },
});
}
};
}, [device, producerId, transport]);
useEffect(() => {
const mediaStream = new MediaStream();
setStream(mediaStream);
if (ref.current) {
ref.current.srcObject = mediaStream;
}
}, []);

useEffect(() => {
subscribe();
}, [subscribe]);
producers.map((item: any) => subscribe(item.id));
}, []);

return (
<div className="flex flex-col">
Expand Down Expand Up @@ -137,7 +150,7 @@ export function Consumers({
<Consumer
device={device}
transport={transport}
producerId={item.producerId}
producers={item.producers}
/>
</div>
))}
Expand Down
4 changes: 2 additions & 2 deletions examples/rooms/src/components/Producer.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ export function Producer({
video: useVideo,
audio: useAudio,
});
const track = stream.getVideoTracks()[0];
await transport.produce({ track });
await transport.produce({ track: stream.getVideoTracks()[0] });
await transport.produce({ track: stream.getAudioTracks()[0] });
} catch (err: any) {
setLog(err.toString());
throw err;
Expand Down
2 changes: 2 additions & 0 deletions src/entities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ export * from './media.worker.js';
export * from './media.room.js';
export * from './media.peer.js';
export * from './media.router.js';
export * from './media.consumer.js';
export * from './media.producer.js';
29 changes: 29 additions & 0 deletions src/entities/media.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {
Entity,
BaseEntity,
PrimaryColumn,
Column,
CreateDateColumn,
ManyToOne,
} from 'typeorm';
import { MediaPeer } from './media.peer.js';

@Entity()
export class MediaConsumer extends BaseEntity {
@PrimaryColumn('uuid')
id!: string;

@Column('uuid')
producerId!: string;

@Column('uuid')
peerId!: string;

@ManyToOne(() => MediaPeer, (peer) => peer.consumers, {
onDelete: 'CASCADE',
})
peer!: MediaPeer;

@CreateDateColumn()
createDate!: Date;
}
11 changes: 7 additions & 4 deletions src/entities/media.peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import {
PrimaryColumn,
ManyToOne,
Index,
OneToMany,
} from 'typeorm';
import { MediaWorker } from './media.worker.js';
import { MediaRoom } from './media.room.js';
import { MediaConsumer } from './media.consumer.js';
import { MediaProducer } from './media.producer.js';

@Entity()
export class MediaPeer extends BaseEntity {
Expand Down Expand Up @@ -36,11 +39,11 @@ export class MediaPeer extends BaseEntity {
@Column('text', { nullable: true })
userId?: string;

@Column('uuid', { nullable: true })
producerId?: string;
@OneToMany(() => MediaConsumer, (consumer) => consumer.peer)
consumers!: MediaConsumer[];

@Column({ type: 'jsonb', default: {} })
consumers!: any;
@OneToMany(() => MediaProducer, (producer) => producer.peer)
producers!: MediaProducer[];

@Column('text')
type!: string; // consumer | producer
Expand Down
29 changes: 29 additions & 0 deletions src/entities/media.producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {
Entity,
BaseEntity,
PrimaryColumn,
Column,
CreateDateColumn,
ManyToOne,
} from 'typeorm';
import { MediaPeer } from './media.peer.js';

@Entity()
export class MediaProducer extends BaseEntity {
@PrimaryColumn('uuid')
id!: string;

@Column('text')
kind!: string;

@Column('uuid')
peerId!: string;

@ManyToOne(() => MediaPeer, (peer) => peer.producers, {
onDelete: 'CASCADE',
})
peer!: MediaPeer;

@CreateDateColumn()
createDate!: Date;
}
31 changes: 17 additions & 14 deletions src/services/peer.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { types } from 'mediasoup';
import { IsNull, Not } from 'typeorm';

import { constants } from '../constants.js';
import { MediaPeer, MediaWorker } from '../entities/index.js';
import { MediaConsumer, MediaPeer, MediaWorker } from '../entities/index.js';
import { fetchApi } from '../utils/api.js';
import { BaseService, ServiceError } from './base.js';
import { RoomService } from './room.js';
import { RouterService } from './router.js';
import { MediaProducer } from '../entities/media.producer.js';

export class PeerService extends BaseService {
async createProducer(data: {
Expand Down Expand Up @@ -68,8 +68,11 @@ export class PeerService extends BaseService {
},
});

peer.producerId = result.id;
await this.dataSource.getRepository(MediaPeer).save(peer);
const producer = new MediaProducer();
producer.id = result.id;
producer.kind = data.kind;
producer.peerId = peer.id;
await this.dataSource.getRepository(MediaProducer).save(producer);
return result;
}
throw new ServiceError(400, 'Invalid peer');
Expand Down Expand Up @@ -153,7 +156,7 @@ export class PeerService extends BaseService {
id: string;
}> {
const peer = await this.get({ peerId: data.peerId });
if (peer.type === constants.CONSUMER && !peer.consumers[data.producerId]) {
if (peer.type === constants.CONSUMER) {
await this.createService(RouterService).checkToPipe({
routerId: peer.routerId,
producerId: data.producerId,
Expand All @@ -172,8 +175,11 @@ export class PeerService extends BaseService {
},
});

peer.consumers[data.producerId] = result.id;
await this.dataSource.getRepository(MediaPeer).save(peer);
const consumer = new MediaConsumer();
consumer.id = result.id;
consumer.producerId = data.producerId;
consumer.peerId = peer.id;
await this.dataSource.getRepository(MediaConsumer).save(consumer);
return result;
}
throw new ServiceError(400, 'Invalid type peer');
Expand Down Expand Up @@ -217,10 +223,7 @@ export class PeerService extends BaseService {

async resume(data: { peerId: string; consumerId: string }) {
const peer = await this.get({ peerId: data.peerId });
if (
peer.type === constants.CONSUMER &&
Object.values(peer.consumers).includes(data.consumerId)
) {
if (peer.type === constants.CONSUMER) {
await fetchApi({
host: peer.worker.internalHost,
port: peer.worker.apiPort,
Expand All @@ -247,14 +250,14 @@ export class PeerService extends BaseService {
async getProducers(data: { roomId: string }): Promise<
Array<{
id: string;
producerId: string;
producers: Array<{ id: string; kind: string }>;
}>
> {
return this.dataSource.getRepository(MediaPeer).find({
select: ['id', 'producerId'],
relations: { producers: true },
select: ['id', 'producers'],
where: {
roomId: data.roomId,
producerId: Not(IsNull()),
type: constants.PRODUCER,
},
}) as any;
Expand Down

0 comments on commit ff1e8da

Please sign in to comment.