Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional sctpStreamId to SCTP-based data consumers #1378

Draft
wants to merge 2 commits into
base: v3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
*/
maxRetransmits?: number;

/**
* Just if consuming over SCTP.
* SCTP stream id. If not provided defaults to an arbitrary available stream ID.
*/
sctpStreamId?: number;

/**
* Whether the data consumer must start in paused mode. Default false.
*/
Expand Down
44 changes: 25 additions & 19 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ export class Transport<
ordered,
maxPacketLifeTime,
maxRetransmits,
sctpStreamId,
paused = false,
subchannels,
appData,
Expand All @@ -1113,7 +1114,6 @@ export class Transport<

let type: DataConsumerType;
let sctpStreamParameters: SctpStreamParameters | undefined;
let sctpStreamId: number;

// If this is not a DirectTransport, use sctpStreamParameters from the
// DataProducer (if type 'sctp') unless they are given in method parameters.
Expand All @@ -1139,10 +1139,9 @@ export class Transport<
}

// This may throw.
sctpStreamId = this.getNextSctpStreamId();

this.#sctpStreamIds![sctpStreamId] = 1;
sctpStreamParameters.streamId = sctpStreamId;
sctpStreamParameters.streamId = this.getNextSctpStreamId(sctpStreamId);
ortc.validateSctpStreamParameters(sctpStreamParameters);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this call to ortc.validateSctpStreamParameters()?

Copy link
Author

@threema-lenny threema-lenny Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seemed like an oversight as this call is also present for validating the SCTP stream parameters for the producer (https://github.com/versatica/mediasoup/pull/1378/files#diff-3253c176609c972f1279e2a6c6ca4c52cbfac8423efcc8b2891b9d8dbd92c358R1016).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove it. There is no need for this. These sctpStreamParameters were already validated in transport.produceData() and they are literally a copy of them.

Copy link
Author

@threema-lenny threema-lenny Apr 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, only streamId is now user input but its value (besides type) is not validated in validateSctpStreamParameters anyways. I'll remove it.

this.#sctpStreamIds![sctpStreamParameters.streamId] = 1;
}
// If this is a DirectTransport, sctpStreamParameters must not be used.
else {
Expand All @@ -1151,10 +1150,11 @@ export class Transport<
if (
ordered !== undefined ||
maxPacketLifeTime !== undefined ||
maxRetransmits !== undefined
maxRetransmits !== undefined ||
sctpStreamId !== undefined
) {
logger.warn(
'consumeData() | ordered, maxPacketLifeTime and maxRetransmits are ignored when consuming data on a DirectTransport'
'consumeData() | ordered, maxPacketLifeTime, maxRetransmits and sctpStreamId are ignored when consuming data on a DirectTransport'
);
}
}
Expand Down Expand Up @@ -1213,14 +1213,14 @@ export class Transport<
this.dataConsumers.delete(dataConsumer.id);

if (this.#sctpStreamIds) {
this.#sctpStreamIds[sctpStreamId] = 0;
this.#sctpStreamIds[sctpStreamParameters!.streamId] = 0;
}
});
dataConsumer.on('@dataproducerclose', () => {
this.dataConsumers.delete(dataConsumer.id);

if (this.#sctpStreamIds) {
this.#sctpStreamIds[sctpStreamId] = 0;
this.#sctpStreamIds[sctpStreamParameters!.streamId] = 0;
}
});

Expand Down Expand Up @@ -1266,7 +1266,7 @@ export class Transport<
);
}

private getNextSctpStreamId(): number {
private getNextSctpStreamId(sctpStreamId?: number): number {
if (
!this.#data.sctpParameters ||
typeof this.#data.sctpParameters.MIS !== 'number'
Expand All @@ -1280,20 +1280,26 @@ export class Transport<
this.#sctpStreamIds = Buffer.alloc(numStreams, 0);
}

let sctpStreamId;

for (let idx = 0; idx < this.#sctpStreamIds.length; ++idx) {
sctpStreamId =
(this.#nextSctpStreamId + idx) % this.#sctpStreamIds.length;
if (sctpStreamId === undefined) {
for (let idx = 0; idx < this.#sctpStreamIds.length; ++idx) {
sctpStreamId =
(this.#nextSctpStreamId + idx) % this.#sctpStreamIds.length;

if (!this.#sctpStreamIds[sctpStreamId]) {
this.#nextSctpStreamId = sctpStreamId + 1;
if (!this.#sctpStreamIds[sctpStreamId]) {
this.#nextSctpStreamId = sctpStreamId + 1;

return sctpStreamId;
return sctpStreamId;
}
}

throw new Error('no sctpStreamId available');
} else if (sctpStreamId >= this.#sctpStreamIds.length) {
throw new Error('invalid sctpStreamId');
} else if (this.#sctpStreamIds[sctpStreamId]) {
throw new Error('sctpStreamId already assigned');
}

throw new Error('no sctpStreamId available');
return sctpStreamId;
}
}

Expand Down
14 changes: 14 additions & 0 deletions node/src/test/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ test('transport.consumeData() succeeds', async () => {
});
}, 2000);

test('transport.consumeData() with already used sctpStreamId rejects with Error', async () => {
await ctx.webRtcTransport2!.consumeData({
dataProducerId: ctx.dataProducer!.id,
sctpStreamId: 123,
});
Comment on lines +111 to +114
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, create const dataConsumer here and assert that its sctpStreamParameters.streamId matches the given sctpStreamId.


await expect(
ctx.webRtcTransport2!.consumeData({
dataProducerId: ctx.dataProducer!.id,
sctpStreamId: 123,
})
).rejects.toThrow(Error);
}, 2000);

test('dataConsumer.dump() succeeds', async () => {
const dataConsumer = await ctx.webRtcTransport2!.consumeData({
dataProducerId: ctx.dataProducer!.id,
Expand Down
Loading