
Fix usrsctp usage in Rust #1353 (Draft)

ibc wants to merge 13 commits into base: v3
Conversation

@ibc (Member) commented Mar 5, 2024

**WIP**

Fixes #1352

### Details

- Basically as described in the ticket, but not everything is done yet.
- Also, I'm testing this in Node by using UV async stuff (which doesn't make sense in mediasoup for Node, but anyway).

### TODO

- None of these changes should take effect in Node, so we need to pass (or NOT pass) some `define` from Rust only, to enable this code in C++. We don't want to deal with UV async stuff in Node because it's not needed there at all, so let's see how to do it. A possible compile-time gating approach is sketched after this list.

- Missing: thread X to initialize usrsctp and run the `Checker` singleton. And many other things.
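For the first TODO item, one possible shape of the compile-time gating (a sketch only; the `MS_RUST_BUILD` define name and the helper functions are hypothetical, nothing here is decided by this PR):

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical helpers; the real ones would live in DepUsrSCTP.
void EnqueueAndSignalAsync(const uint8_t* data, size_t len);
void SendDirectly(const uint8_t* data, size_t len);

void OnUsrSctpSendData(const uint8_t* data, size_t len)
{
#ifdef MS_RUST_BUILD
	// Rust build: multiple Worker threads, so hand the data over to the
	// owning thread's uv loop via the UV async machinery.
	EnqueueAndSignalAsync(data, len);
#else
	// Node build: single Worker thread, send directly as today.
	SendDirectly(data, len);
#endif
}
```

Only the Rust build would define that flag (via its build scripts), so the Node build never even compiles the UV async path.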

ibc added 7 commits March 5, 2024 20:21
The commit message also flags a crash when a `SctpAssociation` is closed. I think it's because the `onAsync` callback is (of course) invoked asynchronously, so by the time it calls `sctpAssociation->OnUsrSctpSendSctpData()`, that `SctpAssociation` has already been freed. Not sure how to resolve it. Here are the logs (one possible mitigation is sketched after them):
  ```
  mediasoup:Transport close() +18s
  mediasoup:Channel request() [method:ROUTER_CLOSE_TRANSPORT] +8s
  mediasoup:Producer transportClosed() +19s
  mediasoup:DataProducer transportClosed() +18s
  mediasoup:DataProducer transportClosed() +0ms
  mediasoup:Transport close() +1ms
  mediasoup:Channel request() [method:ROUTER_CLOSE_TRANSPORT] +1ms
  mediasoup:Consumer transportClosed() +19s
  mediasoup:DataConsumer transportClosed() +18s
  mediasoup:DataConsumer transportClosed() +1ms
  mediasoup:Channel [pid:98040] RTC::SctpAssociation::ResetSctpStream() | SCTP_RESET_STREAMS sent [streamId:1] +1ms
  mediasoup:Channel request succeeded [method:ROUTER_CLOSE_TRANSPORT, id:39] +0ms
  DepUsrSCTP::onAsync() | ---------- onAsync!!
  DepUsrSCTP::onAsync() | ---------- onAsync, sending SCTP data!!
  mediasoup:Channel Producer Channel ended by the worker process +1ms
  mediasoup:ERROR:Worker worker process died unexpectedly [pid:98040, code:null, signal:SIGSEGV] +0ms
  ```
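One common way to avoid this kind of use-after-free, sketched here under assumptions (the id-keyed registry and all names are hypothetical; this is not necessarily what the PR will do), is to stop holding raw pointers across threads and instead look the association up, under the shared mutex, right before using it:

```cpp
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Minimal stand-in for RTC::SctpAssociation.
class SctpAssociation
{
public:
	// Stub; the real method hands the data to the transport.
	void OnUsrSctpSendSctpData(const uint8_t* /*data*/, size_t /*len*/) {}
};

static std::mutex AssociationsMutex;
static std::unordered_map<uint32_t, SctpAssociation*> LiveAssociations;

// Worker thread: register on creation, unregister on close/destruction.
void RegisterAssociation(uint32_t id, SctpAssociation* association)
{
	const std::lock_guard<std::mutex> lock(AssociationsMutex);

	LiveAssociations[id] = association;
}

void UnregisterAssociation(uint32_t id)
{
	const std::lock_guard<std::mutex> lock(AssociationsMutex);

	LiveAssociations.erase(id);
}

// uv async callback: send only if the association is still alive. Calling
// it under the lock means destruction cannot race with the send (a real
// implementation would keep this critical section as small as possible).
bool SendIfAlive(uint32_t id, const uint8_t* data, size_t len)
{
	const std::lock_guard<std::mutex> lock(AssociationsMutex);

	auto it = LiveAssociations.find(id);

	if (it == LiveAssociations.end())
	{
		// Association already freed: drop the pending send.
		return false;
	}

	it->second->OnUsrSctpSendSctpData(data, len);

	return true;
}
```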
@ibc (Member, Author) commented Mar 6, 2024

Issue 1: test-node-sctp.ts fails

I've added some console logs: 069f78e

The test fails because the first message sent is never received by the data consumer:

```
npx jest --testPathPattern node/src/test/test-node-sctp.ts
  console.log
    TODO: Revert numMessages to 200

      at node/src/test/test-node-sctp.ts:116:10

  console.log
    ---- sending id 1

      at node/src/test/test-node-sctp.ts:143:12

  console.log
    ---- sending id 2

      at node/src/test/test-node-sctp.ts:143:12

  console.log
    ---- sending id 3

      at node/src/test/test-node-sctp.ts:143:12

  console.log
    ---- received id 2

      at SCTPStreamReadable.<anonymous> (node/src/test/test-node-sctp.ts:175:13)

 FAIL  test/test-node-sctp.ts (7.075 s)
  ✕ ordered DataProducer delivers all SCTP messages to the DataConsumer (4267 ms)

  ● ordered DataProducer delivers all SCTP messages to the DataConsumer

    id 2 in message should match numReceivedMessages 1

      177 | 				if (id !== numReceivedMessages) {
      178 | 					reject(
    > 179 | 						new Error(
          | 						^
      180 | 							`id ${id} in message should match numReceivedMessages ${numReceivedMessages}`
      181 | 						)
      182 | 					);

      at SCTPStreamReadable.<anonymous> (node/src/test/test-node-sctp.ts:179:7)
```

UPDATE: Issue found. When the usrsctp send callback is called, we store the outgoing data in a map and then invoke a uv async (whose callback runs some time later and reads from that storage to send the data). The problem is that, if two sequential messages must be sent to a peer, the second one overwrites the first one in the storage, so when the uv async callback executes it only reads the second one. Here is the issue (see how onSendSctpData() is called twice sequentially):

```
---- test | sending id 1
---- test | sending id 2
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'1']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'1']
RTC::Router::OnTransportDataProducerMessageReceived() | ------ [ppid:53, msg:'1']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'1']
RTC::Transport::OnDataConsumerSendMessage() | ------ [ppid:53, msg:'1']
RTC::PlainTransport::SendMessage() | ------ [ppid:53, msg:'1']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'1']
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x0 [len:0]
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x600001a88040 [len:32]
DepUsrSCTP::onAsync() | ****
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'2']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'2']
RTC::Router::OnTransportDataProducerMessageReceived() | ------ [ppid:53, msg:'2']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'2']
RTC::Transport::OnDataConsumerSendMessage() | ------ [ppid:53, msg:'2']
RTC::PlainTransport::SendMessage() | ------ [ppid:53, msg:'2']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'2']
DepUsrSCTP::onSendSctpData() | ****
DepUsrSCTP::SendSctpData() | **** previous store.data: 0x0 [len:0]
DepUsrSCTP::onAsync() | ****
---- test | received id 2

 RUNS  test/test-node-sctp.ts
 FAIL  test/test-node-sctp.ts
  ✕ ordered DataProducer delivers all SCTP messages to the DataConsumer (1645 ms)

  ● ordered DataProducer delivers all SCTP messages to the DataConsumer

    id 2 in message should match numReceivedMessages 1
```

Solution: store all pending messages to be sent in a container, rather than in a single slot that each new message overwrites. A minimal sketch of the pattern follows.
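For reference, a minimal sketch of that pattern (hypothetical names, not the actual code in 1a25bed): producers append to a container under a mutex and then signal the async; the async callback drains the whole container, so coalesced wake-ups cannot lose messages.

```cpp
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <utility>
#include <vector>

struct PendingMessage
{
	std::vector<uint8_t> data;
};

static std::mutex PendingMutex;
static std::vector<PendingMessage> PendingMessages;

// Called from the usrsctp send callback (any thread).
void EnqueueSctpData(const uint8_t* data, size_t len)
{
	{
		const std::lock_guard<std::mutex> lock(PendingMutex);

		PendingMessage message;

		message.data.assign(data, data + len);
		PendingMessages.push_back(std::move(message));
	}

	// uv_async_send() would be called here to wake up the uv loop; even if
	// several signals coalesce into a single callback, nothing is lost.
}

// Called on the uv loop thread by the uv async callback.
void DrainPendingSctpData()
{
	std::vector<PendingMessage> batch;

	{
		// Move everything out under the lock, then send without holding it.
		const std::lock_guard<std::mutex> lock(PendingMutex);

		batch.swap(PendingMessages);
	}

	for (const auto& message : batch)
	{
		// Hand message.data to the corresponding transport here.
		(void)message;
	}
}
```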

@jmillan (Member) commented Mar 6, 2024

Maybe related to this?

https://docs.libuv.org/en/v1.x/async.html

> libuv will coalesce calls to uv_async_send(), that is, not every call to it will yield an execution of the callback. For example: if uv_async_send() is called 5 times in a row before the callback is called, the callback will only be called once. If uv_async_send() is called again after the callback was called, it will be called again.
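A self-contained illustration of that coalescing behavior (plain libuv, nothing mediasoup-specific; assumes libuv headers and library are available):

```cpp
#include <cstdio>
#include <uv.h>

static int callbackRuns{ 0 };

static void onAsync(uv_async_t* /*handle*/)
{
	// Several uv_async_send() calls made before the loop processes the
	// handle are coalesced into (typically) a single invocation.
	callbackRuns++;
}

int main()
{
	uv_loop_t* loop = uv_default_loop();
	uv_async_t async;

	uv_async_init(loop, &async, onAsync);

	// Five back-to-back signals before the loop runs...
	for (int i{ 0 }; i < 5; ++i)
	{
		uv_async_send(&async);
	}

	uv_run(loop, UV_RUN_NOWAIT);

	// ...typically yield one callback run, not five.
	std::printf("callback ran %d time(s) after 5 sends\n", callbackRuns);

	uv_close(reinterpret_cast<uv_handle_t*>(&async), nullptr);
	uv_run(loop, UV_RUN_NOWAIT);

	return 0;
}
```

This is exactly why a single-slot store loses messages: one callback run has to account for any number of prior sends.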

@ibc (Member, Author) commented Mar 6, 2024

Ok, so I've solved the problem as follows: 1a25bed

The test now works (for simplicity it's temporarily reduced to just 2 messages) and all the logs are good:

```
DepUsrSCTP::SendSctpDataStore() | ---------- store constructor [fooId:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:1, items.size:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:1]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:1, data:0x6000014fc620]
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:2, items.size:2]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:2]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():2]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:2, data:0x7f78ff705690]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:1, data:0x6000014fc620]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:3, items.size:1]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:1]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():1]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:3, data:0x600003df8090]
TODO: Revert numMessages to 200
---- test | sending id 1
---- test | sending id 2
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'1']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'1']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'1']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'1']
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:4, items.size:1]
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:1]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:5, items.size:2]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:2]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():2]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:5, data:0x600003fe0c00]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:4, data:0x600003fe08e0]
---- test | received id 1
RTC::Transport::OnSctpAssociationMessageReceived() | ------ [streamId:123, ppid:53, msg:'2']
RTC::DataProducer::ReceiveMessage() | ------ [msg:'2']
RTC::DataConsumer::SendMessage() | ------ [ppid:53, msg:'2']
RTC::SctpAssociation::SendSctpMessage() | ------ [ppid:53, msg:'2']
DepUsrSCTP::onSendSctpData() | **** onSendSctpData() called
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back()... [items.size:0]
DepUsrSCTP::SendSctpDataItem() | ---------- item constructor
DepUsrSCTP::SendSctpData() | -------- store.items.emplace_back() DONE [item.fooId:6, items.size:1]
DepUsrSCTP::onAsync() | **** onAsync() called
DepUsrSCTP::onAsync() | ------------- sending pening messages [items.size:1]
DepUsrSCTP::ClearItems() | ---------- store ClearItems() [fooId:1, items.size():1]
DepUsrSCTP::~SendSctpDataItem() | ---------- item destructor [fooId:6, data:0x600003ffeec0]
---- test | received id 2

 PASS  test/test-node-sctp.ts (5.395 s)
  ✓ ordered DataProducer delivers all SCTP messages to the DataConsumer (1608 ms)
```

@ibc (Member, Author) commented Mar 6, 2024

And OF COURSE there's a double-free bug that only happens in Rust, in CI, on Ubuntu (and not on macOS), making me spend yet another 2 extra days on this: https://github.com/versatica/mediasoup/actions/runs/8172985292/job/22344414084?pr=1353

@jmillan (Member) commented Mar 6, 2024

Are you making sure that threadA is not writing to a `SendSctpDataStore` at the same time the uv main thread is reading it?

@ibc (Member, Author) commented Mar 6, 2024

> Are you making sure that threadA is not writing to a `SendSctpDataStore` at the same time the uv main thread is reading it?

The `ClassDestroy()` method should also use the mutex, since it clears the storage map, so every store is deallocated and all items in its items array are deleted. It may be that.

@ibc (Member, Author) commented Mar 6, 2024

Also take into account that this PR doesn't yet implement the thing about running the Checker in a separate thread.

@ibc (Member, Author) commented Mar 6, 2024

> Are you making sure that threadA is not writing to a `SendSctpDataStore` at the same time the uv main thread is reading it?

> The `ClassDestroy()` method should also use the mutex, since it clears the storage map, so every store is deallocated and all items in its items array are deleted. It may be that.

No, false alarm: it already calls `const std::lock_guard<std::mutex> lock(GlobalSyncMutex);`.

…the Worker is closed) and more things

Also make checker static rather than thread_local static.

Remove many debugging logs and stuff.

TODO: Rust tests do not finish.
@ibc (Member, Author) commented Mar 6, 2024

I've made many changes in ca5f222:

  1. Move the creation of DepUsrSCTP::Checker from the Worker constructor/destructor to DepUsrSCTP::ClassInit() and DepUsrSCTP::ClassDestroy(). This comes with an issue (see Issue 2 in the next comment).
  2. Make the DepUsrSCTP::Checker singleton static rather than thread_local static. Why? Because there must be only one, not one per thread (in Rust). See the illustration below.
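For illustration, the difference in one line (`Checker` stands in for `DepUsrSCTP::Checker`; this is not the actual declaration):

```cpp
class Checker
{
	// Runs the periodic usrsctp checks.
};

// Before: one instance per thread, so in Rust every Worker thread got its
// own Checker, but there must be exactly one.
// thread_local static Checker* checker{ nullptr };

// After: a single instance shared by all Worker threads in the process.
static Checker* checker{ nullptr };
```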

@ibc (Member, Author) commented Mar 6, 2024

Issue 2

When running any test in Node:

```
npx jest --testPathPattern node/src/test/test-node-sctp.ts
      mediasoup:ERROR:Worker (stderr) DepLibUV::onWalk() | alive UV handle found (this shouldn't happen) [type:timer, active:0, closing:1, has_ref:1] +0ms

      492 | 			for (const line of buffer.toString('utf8').split('\n')) {
      493 | 				if (line) {
    > 494 | 					workerLogger.error(`(stderr) ${line}`);
          | 					             ^
      495 | 				}
      496 | 			}
      497 | 		});
```

This is because DepUsrSCTP::CloseChecker() is now called from DepUsrSCTP::ClassDestroy() rather than from the Worker::Close() method. Honestly, I'm not sure why it happens or what the difference is. I've added logs to a file in /tmp/ms_log.txt and the problem is clear:

```
---- Timer created
---- Timer created
---- onCloseTimer
---- WORKER CLOSED
---- DepUsrSCTP::ClassDestroy()
---- DepUsrSCTP::ClassDestroy() | calling mapAsyncHandlerSendSctpData.clear()
---- onWalk
---- onCloseTimer
```

As you can see, when onWalk() runs (it's defined in DepLibUV.cpp and called by lib.cpp when the Worker instance ends), the timer of the Checker singleton hasn't been freed yet; that only happens later.
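For context, an illustrative version of that walk check (not the actual code in DepLibUV.cpp): uv_walk() visits every handle the loop still knows about, and a handle that was uv_close()d but whose close callback hasn't run yet still shows up with closing:1, exactly as in the error above.

```cpp
#include <cstdio>
#include <uv.h>

// Report any handle still registered with the loop.
static void onWalk(uv_handle_t* handle, void* /*arg*/)
{
	std::printf(
	  "alive UV handle found [type:%s, active:%d, closing:%d, has_ref:%d]\n",
	  uv_handle_type_name(uv_handle_get_type(handle)),
	  uv_is_active(handle),
	  uv_is_closing(handle),
	  uv_has_ref(handle));
}

// Called during loop teardown to detect handles that were not closed
// (or not yet fully closed) before the walk.
void CheckForAliveHandles(uv_loop_t* loop)
{
	uv_walk(loop, onWalk, nullptr);
}
```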

@ibc (Member, Author) commented Mar 7, 2024

After a meeting with Jose we have decided that, as originally planned, the timer of the usrsctp Checker singleton must run in a separate thread X, and that when the first SctpAssociation is created or the last one is destroyed (in any Worker thread), DepUsrSctp must use uv_async_send to tell the Checker singleton to start, stop or restart the timer. The same applies when DepUsrSctp::HandleUsrSctpTimers() is called, if PR #1351 is merged.
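A rough sketch of that design (hypothetical names; uv_async_init()/uv_timer_init() on the Checker thread are omitted). Worker threads only touch an atomic counter and uv_async_send(), which libuv documents as safe to call from another thread:

```cpp
#include <atomic>
#include <uv.h>

// Owned by the dedicated Checker thread and its private uv loop.
static uv_async_t CheckerAsync;
static uv_timer_t CheckerTimer;
static std::atomic<int> NumSctpAssociations{ 0 };

static void OnCheckerTimer(uv_timer_t* /*timer*/)
{
	// The periodic usrsctp timer work runs here, on the Checker thread.
}

// Runs on the Checker thread whenever some Worker thread signals the async.
static void OnCheckerAsync(uv_async_t* /*handle*/)
{
	if (NumSctpAssociations.load() > 0)
	{
		// First association created (or some still alive): (re)start.
		uv_timer_start(&CheckerTimer, OnCheckerTimer, 10u, 10u);
	}
	else
	{
		// Last association destroyed: stop.
		uv_timer_stop(&CheckerTimer);
	}
}

// Called from any Worker thread when a SctpAssociation is created (+1)
// or destroyed (-1).
void NotifySctpAssociationCountChanged(int delta)
{
	NumSctpAssociations.fetch_add(delta);
	uv_async_send(&CheckerAsync);
}
```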

@satoren (Contributor) commented Mar 8, 2024

> Are you making sure that threadA is not writing to a `SendSctpDataStore` at the same time the uv main thread is reading it?

Running a test with ThreadSanitizer enabled may help confirm that.

Merging this pull request may close: "Multi-thread bug when using usrsctp in N Worker threads in Rust" (#1352).