Skip to content

Commit

Permalink
More
Browse files Browse the repository at this point in the history
  • Loading branch information
ibc committed Sep 7, 2023
1 parent 6fca2b3 commit 2f5399d
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 11 deletions.
44 changes: 42 additions & 2 deletions node/src/DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { Event, Notification } from './fbs/notification';
import * as FbsTransport from './fbs/transport';
import * as FbsRequest from './fbs/request';
import * as FbsDataConsumer from './fbs/data-consumer';
import * as utils from './utils';

export type DataConsumerOptions<DataConsumerAppData extends AppData = AppData> =
{
Expand Down Expand Up @@ -260,6 +261,14 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return this.#dataProducerPaused;
}

/**
* Get current subchannels this data consumer is subscribed to.
*/
get subchannels(): number[]
{
return Array.from(this.#subchannels);
}

/**
* App custom data.
*/
Expand Down Expand Up @@ -555,6 +564,38 @@ export class DataConsumer<DataConsumerAppData extends AppData = AppData>
return data.bufferedAmount();
}

/**
* Set subchannels.
*/
async setSubchannels(subchannels: number[]): Promise<void>
{
logger.debug('setSubchannels()');

console.log('TODO');

subchannels = [...new Set(subchannels)];

// /* Build Request. */
// const requestOffset = new FbsTransport.SetSubchannelsRequestT(
// this.#internal.dataConsumerId,
// subchannels
// ).pack(this.#channel.bufferBuilder);

// const response = await this.#channel.request(
// FbsRequest.Method.DATACONSUMER_SET_SUBCHANNELS,
// FbsRequest.Body.FBS_DATA_CONSUMER_SetSubchannelsRequest,,
// requestOffset,
// this.#internal.dataConsumerId
// );

// /* Decode Response. */
// const data = new FbsDataConsumer.SetSubchannelsResponse();

// response.body(data);

// return parseSubchannels(data);
}

private handleWorkerNotifications(): void
{
this.#channel.on(this.#internal.dataConsumerId, (event: Event, data?: Notification) =>
Expand Down Expand Up @@ -690,8 +731,7 @@ export function parseDataConsumerDumpResponse(
protocol : data.protocol()!,
paused : data.paused(),
dataProducerPaused : data.dataProducerPaused(),
subchannels : data.subchannels()

subchannels : utils.parseVector(data, 'subchannels')
};
}

Expand Down
2 changes: 2 additions & 0 deletions worker/include/RTC/DataConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "Channel/ChannelSocket.hpp"
#include "RTC/SctpDictionaries.hpp"
#include "RTC/Shared.hpp"
#include <absl/container/flat_hash_set.h>
#include <string>

namespace RTC
Expand Down Expand Up @@ -120,6 +121,7 @@ namespace RTC
RTC::SctpStreamParameters sctpStreamParameters;
std::string label;
std::string protocol;
absl::flat_hash_set<uint16_t> subchannels;
bool transportConnected{ false };
bool sctpAssociationConnected{ false };
bool paused{ false };
Expand Down
2 changes: 1 addition & 1 deletion worker/include/RTC/Router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
#include "RTC/Transport.hpp"
#include "RTC/WebRtcServer.hpp"
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <string>
#include <unordered_set>

namespace RTC
{
Expand Down
8 changes: 8 additions & 0 deletions worker/src/RTC/DataConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ namespace RTC
// paused is set to false by default.
this->paused = data->paused();

if (flatbuffers::IsFieldPresent(data, FBS::Transport::ConsumeDataRequest::VT_SUBCHANNELS))
{
for (const auto subchannel : *data->subchannels())
{
this->subchannels.insert(subchannel);
}
}

// NOTE: This may throw.
this->shared->channelMessageRegistrator->RegisterHandler(
this->id,
Expand Down
16 changes: 8 additions & 8 deletions worker/src/RTC/RtpDictionaries/Parameters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ namespace RTC

case Value::Type::INTEGER:
{
auto valueOffset = FBS::RtpParameters::CreateInteger(builder, value.integerValue);
auto valueOffset = FBS::RtpParameters::CreateInteger32(builder, value.integerValue);

parameters.emplace_back(FBS::RtpParameters::CreateParameterDirect(
builder, key.c_str(), FBS::RtpParameters::Value::Integer, valueOffset.Union()));
builder, key.c_str(), FBS::RtpParameters::Value::Integer32, valueOffset.Union()));

break;
}
Expand Down Expand Up @@ -68,10 +68,10 @@ namespace RTC
case Value::Type::ARRAY_OF_INTEGERS:
{
auto valueOffset =
FBS::RtpParameters::CreateIntegerArrayDirect(builder, &value.arrayOfIntegers);
FBS::RtpParameters::CreateInteger32ArrayDirect(builder, &value.arrayOfIntegers);

parameters.emplace_back(FBS::RtpParameters::CreateParameterDirect(
builder, key.c_str(), FBS::RtpParameters::Value::IntegerArray, valueOffset.Union()));
builder, key.c_str(), FBS::RtpParameters::Value::Integer32Array, valueOffset.Union()));

break;
}
Expand Down Expand Up @@ -99,9 +99,9 @@ namespace RTC
break;
}

case FBS::RtpParameters::Value::Integer:
case FBS::RtpParameters::Value::Integer32:
{
this->mapKeyValues.emplace(key, Value(parameter->value_as_Integer()->value()));
this->mapKeyValues.emplace(key, Value(parameter->value_as_Integer32()->value()));

break;
}
Expand All @@ -120,9 +120,9 @@ namespace RTC
break;
}

case FBS::RtpParameters::Value::IntegerArray:
case FBS::RtpParameters::Value::Integer32Array:
{
this->mapKeyValues.emplace(key, Value(parameter->value_as_IntegerArray()->value()));
this->mapKeyValues.emplace(key, Value(parameter->value_as_Integer32Array()->value()));

break;
}
Expand Down

0 comments on commit 2f5399d

Please sign in to comment.