Skip to content

Commit

Permalink
DataConsumer: Add addSubchannel() and `removeSubchannel()
Browse files Browse the repository at this point in the history
**TODO:** Rust, but I won't do it until #1262 is done and merged.
  • Loading branch information
ibc committed Dec 12, 2023
1 parent 042b7f5 commit d5d04a6
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

* liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)).
* `DataConsumer`: Add `addSubchannel()` and `removeSubchannel()` methods ([PR #XXXX](https://github.com/versatica/mediasoup/pull/XXXX)).


### 3.13.10
Expand Down
55 changes: 55 additions & 0 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,61 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
this.#subchannels = utils.parseVector(data, 'subchannels');
}

/**
* Add a subchannel.
*/
async addSubchannel(subchannel: number): Promise<void>
{
logger.debug('addSubchannel()');

/* Build Request. */
const requestOffset =
FbsDataConsumer.AddSubchannelRequest.createAddSubchannelRequest(
this.#channel.bufferBuilder, subchannel);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_ADD_SUBCHANNEL,
FbsRequest.Body.DataConsumer_AddSubchannelRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.AddSubchannelResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

/**
* Remove a subchannel.
*/
async removeSubchannel(subchannel: number): Promise<void>
{
logger.debug('removeSubchannel()');

/* Build Request. */
const requestOffset = FbsDataConsumer.RemoveSubchannelRequest.
createRemoveSubchannelRequest(this.#channel.bufferBuilder, subchannel);

const response = await this.#channel.request(
FbsRequest.Method.DATACONSUMER_REMOVE_SUBCHANNEL,
FbsRequest.Body.DataConsumer_RemoveSubchannelRequest,
requestOffset,
this.#internal.dataConsumerId
);

/* Decode Response. */
const data = new FbsDataConsumer.RemoveSubchannelResponse();

response.body(data);

// Update subchannels.
this.#subchannels = utils.parseVector(data, 'subchannels');
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) =>
Expand Down
24 changes: 24 additions & 0 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,30 @@ test('dataConsumer.setSubchannels() succeeds', async () =>
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]);
}, 2000);

test('dataConsumer.addSubchannel() and .removeSubchannel() succeed', async () =>
{
await dataConsumer1.setSubchannels([ ]);
expect(dataConsumer1.subchannels).toEqual([ ]);

await dataConsumer1.addSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5 ]);

await dataConsumer1.addSubchannel(10);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.addSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.removeSubchannel(666);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 5, 10 ]);

await dataConsumer1.removeSubchannel(5);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 10 ]);

await dataConsumer1.setSubchannels([ ]);
expect(dataConsumer1.subchannels).toEqual([ ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
{
const onObserverNewDataConsumer = jest.fn();
Expand Down
16 changes: 16 additions & 0 deletions worker/fbs/dataConsumer.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ table SetSubchannelsResponse {
subchannels: [uint16] (required);
}

table AddSubchannelRequest {
subchannel: uint16;
}

table AddSubchannelResponse {
subchannels: [uint16] (required);
}

table RemoveSubchannelRequest {
subchannel: uint16;
}

table RemoveSubchannelResponse {
subchannels: [uint16] (required);
}

// Notifications from Worker.

table BufferedAmountLowNotification {
Expand Down
4 changes: 4 additions & 0 deletions worker/fbs/request.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ enum Method: uint8 {
DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD,
DATACONSUMER_SEND,
DATACONSUMER_SET_SUBCHANNELS,
DATACONSUMER_ADD_SUBCHANNEL,
DATACONSUMER_REMOVE_SUBCHANNEL,
RTPOBSERVER_PAUSE,
RTPOBSERVER_RESUME,
RTPOBSERVER_ADD_PRODUCER,
Expand Down Expand Up @@ -114,6 +116,8 @@ union Body {
DataConsumer_SetBufferedAmountLowThresholdRequest: FBS.DataConsumer.SetBufferedAmountLowThresholdRequest,
DataConsumer_SendRequest: FBS.DataConsumer.SendRequest,
DataConsumer_SetSubchannelsRequest: FBS.DataConsumer.SetSubchannelsRequest,
DataConsumer_AddSubchannelRequest: FBS.DataConsumer.AddSubchannelRequest,
DataConsumer_RemoveSubchannelRequest: FBS.DataConsumer.RemoveSubchannelRequest,
RtpObserver_AddProducerRequest: FBS.RtpObserver.AddProducerRequest,
RtpObserver_RemoveProducerRequest: FBS.RtpObserver.RemoveProducerRequest,
}
Expand Down
2 changes: 2 additions & 0 deletions worker/fbs/response.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ union Body {
DataConsumer_DumpResponse: FBS.DataConsumer.DumpResponse,
DataConsumer_GetStatsResponse: FBS.DataConsumer.GetStatsResponse,
DataConsumer_SetSubchannelsResponse: FBS.DataConsumer.SetSubchannelsResponse,
DataConsumer_AddSubchannelResponse: FBS.DataConsumer.AddSubchannelResponse,
DataConsumer_RemoveSubchannelResponse: FBS.DataConsumer.RemoveSubchannelResponse
}

table Response {
Expand Down
2 changes: 2 additions & 0 deletions worker/src/Channel/ChannelRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ namespace Channel
{ FBS::Request::Method::DATACONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD, "dataConsumer.setBufferedAmountLowThreshold" },
{ FBS::Request::Method::DATACONSUMER_SEND, "dataConsumer.send" },
{ FBS::Request::Method::DATACONSUMER_SET_SUBCHANNELS, "dataConsumer.setSubchannels" },
{ FBS::Request::Method::DATACONSUMER_ADD_SUBCHANNEL, "dataConsumer.addSubchannel" },
{ FBS::Request::Method::DATACONSUMER_REMOVE_SUBCHANNEL, "dataConsumer.removeSubchannel" },
{ FBS::Request::Method::RTPOBSERVER_PAUSE, "rtpObserver.pause" },
{ FBS::Request::Method::RTPOBSERVER_RESUME, "rtpObserver.resume" },
{ FBS::Request::Method::RTPOBSERVER_ADD_PRODUCER, "rtpObserver.addProducer" },
Expand Down
48 changes: 48 additions & 0 deletions worker/src/RTC/DataConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,54 @@ namespace RTC
break;
}

case Channel::ChannelRequest::Method::DATACONSUMER_ADD_SUBCHANNEL:
{
const auto* body = request->data->body_as<FBS::DataConsumer::AddSubchannelRequest>();

this->subchannels.insert(body->subchannel());

std::vector<uint16_t> subchannels;

subchannels.reserve(this->subchannels.size());

for (auto subchannel : this->subchannels)
{
subchannels.emplace_back(subchannel);
}

// Create response.
auto responseOffset = FBS::DataConsumer::CreateAddSubchannelResponseDirect(
request->GetBufferBuilder(), std::addressof(subchannels));

request->Accept(FBS::Response::Body::DataConsumer_AddSubchannelResponse, responseOffset);

break;
}

case Channel::ChannelRequest::Method::DATACONSUMER_REMOVE_SUBCHANNEL:
{
const auto* body = request->data->body_as<FBS::DataConsumer::RemoveSubchannelRequest>();

this->subchannels.erase(body->subchannel());

std::vector<uint16_t> subchannels;

subchannels.reserve(this->subchannels.size());

for (auto subchannel : this->subchannels)
{
subchannels.emplace_back(subchannel);
}

// Create response.
auto responseOffset = FBS::DataConsumer::CreateRemoveSubchannelResponseDirect(
request->GetBufferBuilder(), std::addressof(subchannels));

request->Accept(FBS::Response::Body::DataConsumer_RemoveSubchannelResponse, responseOffset);

break;
}

default:
{
MS_THROW_ERROR("unknown method '%s'", request->methodCStr);
Expand Down

0 comments on commit d5d04a6

Please sign in to comment.