Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Rust DataConsumer #1262

Merged
merged 9 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

* liburing: avoid extra memcpy on RTP ([PR #1258](https://github.com/versatica/mediasoup/pull/1258)).
* libsrtp: use our own fork ([PR #1260](https://github.com/versatica/mediasoup/pull/1260)).
* Fix Rust `DataConsumer` in tests ([PR #1262](https://github.com/versatica/mediasoup/pull/1262)).


### 3.13.10
Expand Down
4 changes: 2 additions & 2 deletions node/src/tests/test-DataConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ test('transport.consumeData() succeeds', async () =>
expect(dataConsumer1.label).toBe('foo');
expect(dataConsumer1.protocol).toBe('bar');
expect(dataConsumer1.paused).toBe(false);
expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.subchannels).toEqual([ 0, 1, 2, 100, 65535 ]);
expect(dataConsumer1.appData).toEqual({ baz: 'LOL' });

const dump = await router.dump();
Expand Down Expand Up @@ -134,7 +134,7 @@ test('dataConsumer.setSubchannels() succeeds', async () =>
{
await dataConsumer1.setSubchannels([ 999, 999, 998, 65536 ]);

expect(dataConsumer1.subchannels.sort((a, b) => a - b)).toEqual([ 0, 998, 999 ]);
expect(dataConsumer1.subchannels).toEqual([ 0, 998, 999 ]);
}, 2000);

test('transport.consumeData() on a DirectTransport succeeds', async () =>
Expand Down
31 changes: 15 additions & 16 deletions rust/src/router/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl fmt::Debug for RegularDataConsumer {
.field("data_producer_id", &self.inner.data_producer_id)
.field("paused", &self.inner.paused)
.field("data_producer_paused", &self.inner.data_producer_paused)
.field("subchannels", &self.inner.subchannels)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
Expand Down Expand Up @@ -411,6 +412,7 @@ impl fmt::Debug for DirectDataConsumer {
.field("data_producer_id", &self.inner.data_producer_id)
.field("paused", &self.inner.paused)
.field("data_producer_paused", &self.inner.data_producer_paused)
.field("subchannels", &self.inner.subchannels)
.field("transport", &self.inner.transport)
.field("closed", &self.inner.closed)
.finish()
Expand Down Expand Up @@ -788,6 +790,19 @@ impl DataConsumer {
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner()
.channel
.request(self.id(), DataConsumerSetSubchannelsRequest { subchannels })
.await?;

*self.inner().subchannels.lock() = response.subchannels;

Ok(())
}

/// Callback is called when a message has been received from the corresponding data producer.
///
/// # Notes on usage
Expand Down Expand Up @@ -918,22 +933,6 @@ impl DirectDataConsumer {
)
.await
}

/// Sets subchannels to the worker DataConsumer.
pub async fn set_subchannels(&self, subchannels: Vec<u16>) -> Result<(), RequestError> {
let response = self
.inner
.channel
.request(
self.inner.id,
DataConsumerSetSubchannelsRequest { subchannels },
)
.await?;

*self.inner.subchannels.lock() = response.subchannels;

Ok(())
}
}

/// [`WeakDataConsumer`] doesn't own data consumer instance on mediasoup-worker and will not prevent
Expand Down
32 changes: 27 additions & 5 deletions rust/tests/integration/data_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,7 @@ fn consume_data_succeeds() {
}
assert_eq!(data_consumer.label().as_str(), "foo");
assert_eq!(data_consumer.protocol().as_str(), "bar");

let mut sorted_subchannels = data_consumer.subchannels();
sorted_subchannels.sort();

assert_eq!(sorted_subchannels, [0, 1, 2, 100, 65535]);
assert_eq!(data_consumer.subchannels(), [0, 1, 2, 100, 65535]);
assert_eq!(
data_consumer
.app_data()
Expand Down Expand Up @@ -321,6 +317,32 @@ fn get_stats_succeeds() {
});
}

#[test]
fn set_subchannels() {
future::block_on(async move {
let (_worker, _router, transport1, data_producer) = init().await;

let data_consumer = transport1
.consume_data({
let options = DataConsumerOptions::new_sctp_unordered_with_life_time(
data_producer.id(),
4000,
);

options
})
.await
.expect("Failed to consume data");

data_consumer
.set_subchannels([999, 999, 998, 0].to_vec())
.await
.expect("Failed to set data consumer subchannels");

assert_eq!(data_consumer.subchannels(), [0, 998, 999]);
});
}

#[test]
fn consume_data_on_direct_transport_succeeds() {
future::block_on(async move {
Expand Down
4 changes: 2 additions & 2 deletions rust/tests/integration/direct_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ fn send_with_subchannels_succeeds() {
}
};

let direct_data_consumer_2 = match &data_consumer_2 {
let _ = match &data_consumer_2 {
DataConsumer::Direct(direct_data_consumer) => direct_data_consumer,
_ => {
panic!("Expected direct data consumer")
Expand Down Expand Up @@ -514,7 +514,7 @@ fn send_with_subchannels_succeeds() {
let mut subchannels = data_consumer_2.subchannels();
subchannels.push(1);

direct_data_consumer_2
data_consumer_2
.set_subchannels(subchannels)
.await
.expect("Failed to set subchannels");
Expand Down
4 changes: 2 additions & 2 deletions worker/include/RTC/DataConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "Channel/ChannelSocket.hpp"
#include "RTC/SctpDictionaries.hpp"
#include "RTC/Shared.hpp"
#include <absl/container/flat_hash_set.h>
#include <absl/container/btree_set.h>
#include <string>

namespace RTC
Expand Down Expand Up @@ -126,7 +126,7 @@ namespace RTC
RTC::SctpStreamParameters sctpStreamParameters;
std::string label;
std::string protocol;
absl::flat_hash_set<uint16_t> subchannels;
absl::btree_set<uint16_t> subchannels;
bool transportConnected{ false };
bool sctpAssociationConnected{ false };
bool paused{ false };
Expand Down