Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataChannel subchannels #1152

Merged
merged 11 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 54 additions & 3 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Event, Notification } from './fbs/notification';
import * as FbsTransport from './fbs/transport';
import * as FbsRequest from './fbs/request';
import * as FbsDataConsumer from './fbs/data-consumer';
import * as utils from './utils';

export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
{
Expand Down Expand Up @@ -45,6 +46,13 @@ export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
*/
paused?: boolean;

/**
* Subchannels this data consumer initially subscribes to.
* Only used in case this data consumer receives messages from a local data
* producer that specifies subchannel(s) when calling send().
*/
subchannels?: number[];

/**
* Custom application data.
*/
Expand Down Expand Up @@ -93,6 +101,7 @@ type DataConsumerDump = DataConsumerData &
id: string;
paused: boolean;
dataProducerPaused: boolean;
subchannels: number[];
};

type DataConsumerInternal = TransportInternal &
Expand Down Expand Up @@ -132,6 +141,9 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
// Associated DataProducer paused flag.
#dataProducerPaused = false;

// Subchannels subscribed to.
#subchannels: number[];

// Custom app data.
#appData: DataConsumerAppData;

Expand All @@ -148,6 +160,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
channel,
paused,
dataProducerPaused,
subchannels,
appData
}:
{
Expand All @@ -156,6 +169,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
channel: Channel;
paused: boolean;
dataProducerPaused: boolean;
subchannels: number[];
appData?: DataConsumerAppData;
}
)
Expand All @@ -169,6 +183,7 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#channel = channel;
this.#paused = paused;
this.#dataProducerPaused = dataProducerPaused;
this.#subchannels = subchannels;
this.#appData = appData || {} as DataConsumerAppData;

this.handleWorkerNotifications();
Expand Down Expand Up @@ -246,6 +261,14 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return this.#dataProducerPaused;
}

/**
* Get current subchannels this data consumer is subscribed to.
*/
get subchannels(): number[]
{
return Array.from(this.#subchannels);
}

/**
* App custom data.
*/
Expand Down Expand Up @@ -541,6 +564,34 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return data.bufferedAmount();
}

/**
* Set subchannels.
*/
async setSubchannels(subchannels: number[]): Promise<void>
{
logger.debug('setSubchannels()');

/* Build Request. */
const requestOffset = new FbsDataConsumer.SetSubchannelsRequestT(
subchannels
).pack(this.#channel.bufferBuilder);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_SET_SUBCHANNELS,
FbsRequest.Body.FBS_DataConsumer_SetSubchannelsRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.SetSubchannelsResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) =>
Expand Down Expand Up @@ -675,14 +726,14 @@ export function parseDataConsumerDumpResponse(
label : data.label()!,
protocol : data.protocol()!,
paused : data.paused(),
dataProducerPaused : data.dataProducerPaused()

dataProducerPaused : data.dataProducerPaused(),
subchannels : utils.parseVector(data, 'subchannels')
};
}

function parseDataConsumerStats(
binary: FbsDataConsumer.GetStatsResponse
):DataConsumerStat
): DataConsumerStat
{
return {
type : 'data-consumer',
Expand Down
15 changes: 13 additions & 2 deletions node/src/DataProducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,12 @@ export class DataProducer<DataProducerAppData extends AppData = AppData>
/**
* Send data (just valid for DataProducers created on a DirectTransport).
*/
send(message: string | Buffer, ppid?: number): void
send(
message: string | Buffer,
ppid?: number,
subchannels?: number[],
requiredSubchannel?: number
): void
{
if (typeof message !== 'string' && !Buffer.isBuffer(message))
{
Expand Down Expand Up @@ -431,6 +436,10 @@ export class DataProducer<DataProducerAppData extends AppData = AppData>

let dataOffset = 0;

const subchannelsOffset = FbsDataProducer.SendNotification.createSubchannelsVector(
builder, subchannels ?? []
);

if (typeof message === 'string')
{
const messageOffset = builder.createString(message);
Expand All @@ -450,7 +459,9 @@ export class DataProducer<DataProducerAppData extends AppData = AppData>
typeof message === 'string' ?
FbsDataProducer.Data.String :
FbsDataProducer.Data.Binary,
dataOffset
dataOffset,
subchannelsOffset,
requiredSubchannel ?? null
);

this.#channel.notify(
Expand Down
24 changes: 10 additions & 14 deletions node/src/RtpParameters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import * as flatbuffers from 'flatbuffers';
import {
Boolean as FbsBoolean,
Double as FbsDouble,
Integer as FbsInteger,
IntegerArray as FbsIntegerArray,
Integer32 as FbsInteger32,
Integer32Array as FbsInteger32Array,
String as FbsString,
Parameter as FbsParameter,
RtcpFeedback as FbsRtcpFeedback,
Expand Down Expand Up @@ -564,16 +564,15 @@ export function serializeParameters(
builder, keyOffset, FbsValue.Boolean, value === true ? 1:0
);
}

else if (typeof value === 'number')
{
// Integer.
if (value % 1 === 0)
{
const valueOffset = FbsInteger.createInteger(builder, value);
const valueOffset = FbsInteger32.createInteger32(builder, value);

parameterOffset = FbsParameter.createParameter(
builder, keyOffset, FbsValue.Integer, valueOffset
builder, keyOffset, FbsValue.Integer32, valueOffset
);
}
// Float.
Expand All @@ -586,7 +585,6 @@ export function serializeParameters(
);
}
}

else if (typeof value === 'string')
{
const valueOffset = FbsString.createString(builder, builder.createString(value));
Expand All @@ -595,16 +593,14 @@ export function serializeParameters(
builder, keyOffset, FbsValue.String, valueOffset
);
}

else if (Array.isArray(value))
{
const valueOffset = FbsIntegerArray.createValueVector(builder, value);
const valueOffset = FbsInteger32Array.createValueVector(builder, value);

parameterOffset = FbsParameter.createParameter(
builder, keyOffset, FbsValue.IntegerArray, valueOffset
builder, keyOffset, FbsValue.Integer32Array, valueOffset
);
}

else
{
throw new Error(`invalid parameter type [key:'${key}', value:${value}]`);
Expand Down Expand Up @@ -645,9 +641,9 @@ export function parseParameters(data: any): any
break;
}

case FbsValue.Integer:
case FbsValue.Integer32:
{
const value = new FbsInteger();
const value = new FbsInteger32();

fbsParameter.value(value);

Expand Down Expand Up @@ -678,9 +674,9 @@ export function parseParameters(data: any): any
break;
}

case FbsValue.IntegerArray:
case FbsValue.Integer32Array:
{
const value = new FbsIntegerArray();
const value = new FbsInteger32Array();

fbsParameter.value(value);

Expand Down
16 changes: 13 additions & 3 deletions node/src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,7 @@ export class Transport
maxPacketLifeTime,
maxRetransmits,
paused = false,
subchannels,
appData
}: DataConsumerOptions<ConsumerAppData>
): Promise<DataConsumer<ConsumerAppData>>
Expand Down Expand Up @@ -1163,7 +1164,8 @@ export class Transport
sctpStreamParameters,
label,
protocol,
paused
paused,
subchannels
});

const response = await this.channel.request(
Expand Down Expand Up @@ -1197,6 +1199,7 @@ export class Transport
},
channel : this.channel,
paused : dump.paused,
subchannels : dump.subchannels,
dataProducerPaused : dump.dataProducerPaused,
appData
});
Expand Down Expand Up @@ -1680,16 +1683,18 @@ function createConsumeDataRequest({
sctpStreamParameters,
label,
protocol,
paused
paused,
subchannels = []
} : {
builder : flatbuffers.Builder;
builder: flatbuffers.Builder;
dataConsumerId: string;
dataProducerId: string;
type: DataConsumerType;
sctpStreamParameters?: SctpStreamParameters;
label: string;
protocol: string;
paused: boolean;
subchannels?: number[];
}): number
{
const dataConsumerIdOffset = builder.createString(dataConsumerId);
Expand All @@ -1707,6 +1712,10 @@ function createConsumeDataRequest({
);
}

const subchannelsOffset = FbsTransport.ConsumeDataRequest.createSubchannelsVector(
builder, subchannels
);

FbsTransport.ConsumeDataRequest.startConsumeDataRequest(builder);
FbsTransport.ConsumeDataRequest.addDataConsumerId(builder, dataConsumerIdOffset);
FbsTransport.ConsumeDataRequest.addDataProducerId(builder, dataProducerIdOffset);
Expand All @@ -1722,6 +1731,7 @@ function createConsumeDataRequest({
FbsTransport.ConsumeDataRequest.addLabel(builder, labelOffset);
FbsTransport.ConsumeDataRequest.addProtocol(builder, protocolOffset);
FbsTransport.ConsumeDataRequest.addPaused(builder, paused);
FbsTransport.ConsumeDataRequest.addSubchannels(builder, subchannelsOffset);

return FbsTransport.ConsumeDataRequest.endConsumeDataRequest(builder);
}
Expand Down
11 changes: 11 additions & 0 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ test('transport.consumeData() succeeds', async () =>
{
dataProducerId : dataProducer.id,
maxPacketLifeTime : 4000,
// Valid values are 0...65535 so others and duplicated ones will be
// discarded.
subchannels : [ 0, 1, 1, 1, 2, 65535, 65536, 65537, 100 ],
appData : { baz: 'LOL' }
});

Expand All @@ -70,6 +73,7 @@ test('transport.consumeData() succeeds', async () =>
expect(dataConsumer1.label).toBe('foo');
expect(dataConsumer1.protocol).toBe('bar');
expect(dataConsumer1.paused).toBe(false);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.appData).toEqual({ baz: 'LOL' });

const dump = await router.dump();
Expand Down Expand Up @@ -128,6 +132,13 @@ test('dataConsumer.getStats() succeeds', async () =>
]);
}, 2000);

test('dataConsumer.setSubchannels() succeeds', async () =>
{
await dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]);

expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
{
const onObserverNewDataConsumer = jest.fn();
Expand Down