Memory leakage and high cpu usage for each peer #529

Closed
LIMPIX31 opened this issue Jan 22, 2024 · 6 comments
LIMPIX31 commented Jan 22, 2024

I wrote a simple application to transmit H264 over WebRTC. I only need to connect to the server, so the signaling server also handles the peers in the same Rust app.

I have four tracks stored in an Arc<RwLock<HashMap<String, Arc<TrackLocalStaticSample>>>>, although I'm really only using one of them.

I am using axum for the signaling server: the client makes a single POST request with its offer and receives an answer in the response. That's all I need to establish a connection.

The problem is that each negotiation takes 5-10 MB of memory (on top of 25 MB at startup), and sometimes there is a sudden jump to 200 MB. Further connections then become impossible, because the CPU load also keeps increasing, and after the peer disconnects the load does not go back down.

With only one connection, memory consumption grows by 2-3 MB (over 10 minutes) and then stays flat.

I think the problem is in how I set up the connection, and I hope I'm doing something wrong and it's not a webrtc-rs bug.

pub async fn handle_peer(
  config: Arc<Config>,
  offer: RTCSessionDescription,
  track: Arc<BroadcastTrack>,
  candidates: Vec<RTCIceCandidateInit>,
) -> Result<(RTCSessionDescription, Vec<RTCIceCandidateInit>)> {
  let mut m = MediaEngine::default();

  m.register_default_codecs()?;

  let api = APIBuilder::new().with_media_engine(m).build();

  let mut ice_servers = vec![RTCIceServer {
    urls: vec!["stun:stun.l.google.com:19302".to_owned()],
    ..Default::default()
  }];

  ice_servers.extend(config.turns.iter().map(|it| RTCIceServer {
    urls: vec![it.url.clone()],
    username: it.username.clone(),
    credential: it.credential.clone(),
    credential_type: RTCIceCredentialType::Password,
  }));

  let config = RTCConfiguration {
    ice_servers,
    ..Default::default()
  };

  let peer_connection = api.new_peer_connection(config).await?;

  let _ = peer_connection.add_track(track.clone()).await?;

  peer_connection.on_peer_connection_state_change(Box::new(move |s| {
    info!("Peer Connection State has changed: {s}");
    Box::pin(async {})
  }));

  peer_connection.set_remote_description(offer).await?;

  for candidate in candidates {
    peer_connection.add_ice_candidate(candidate).await?;
  }

  let (candidates_tx, mut candidates_rx) = mpsc::unbounded_channel();
  let candidates_tx = Arc::new(candidates_tx);

  peer_connection.on_ice_candidate(Box::new(move |candidate| {
    let candidates_tx = candidates_tx.clone();

    Box::pin(async move {
      candidates_tx
        .send(candidate.map(|it| it.to_json().unwrap()))
        .unwrap();
    })
  }));

  let answer = peer_connection.create_answer(None).await?;
  peer_connection.set_local_description(answer).await?;

  let mut candidates = Vec::with_capacity(16);

  while let Some(Some(candidate)) = candidates_rx.recv().await {
    candidates.push(candidate);
  }

  Ok((
    peer_connection
      .local_description()
      .await
      .context("No answer found")?,
    candidates,
  ))
}
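
As a side note, the candidate-collection loop above relies on on_ice_candidate eventually firing with None. The webrtc-rs examples handle the same step differently: wait for ICE gathering to finish via gathering_complete_promise() and then return the local description, which by then already contains the gathered candidates. A minimal sketch of that variant, reusing the peer_connection from above (not a verified fix for the leak, just the pattern from the upstream examples):

let answer = peer_connection.create_answer(None).await?;

// Channel that resolves once ICE gathering reaches the "complete" state.
let mut gather_complete = peer_connection.gathering_complete_promise().await;

peer_connection.set_local_description(answer).await?;

// Block until all candidates have been gathered.
let _ = gather_complete.recv().await;

// The local description now includes the gathered candidates.
let local_desc = peer_connection
  .local_description()
  .await
  .context("No answer found")?;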

Axum handler

pub async fn offer_handler(
  _claims: Claims,
  State(state): State<Arc<AppState>>,
  Json(offer): Json<Offer>,
) -> impl IntoResponse {
  if let Some(track) = state.tracks.read().await.get(&offer.sid) {
    let (sdp, candidates) = match handle_peer(
      state.config.clone(),
      offer.sdp,
      track.clone(),
      offer.candidates,
    )
    .await
    {
      Ok(it) => it,
      Err(err) => {
        error!(%err, "{}", err.backtrace());
        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
      }
    };

    (StatusCode::OK, Json(Answer { sdp, candidates })).into_response()
  } else {
    StatusCode::BAD_REQUEST.into_response()
  }
}
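
The Offer and Answer types are not shown in the issue. Inferred from the handler above, they are presumably plain serde structs along these lines (field names and exact types are assumptions; the webrtc types themselves are serde-serializable):

use serde::{Deserialize, Serialize};
use webrtc::ice_transport::ice_candidate::RTCIceCandidateInit;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;

// Hypothetical request body: which stream to attach to, the client's SDP offer,
// and the ICE candidates the client has gathered so far.
#[derive(Deserialize)]
pub struct Offer {
  pub sid: String,
  pub sdp: RTCSessionDescription,
  pub candidates: Vec<RTCIceCandidateInit>,
}

// Hypothetical response body: the server's SDP answer plus its ICE candidates.
#[derive(Serialize)]
pub struct Answer {
  pub sdp: RTCSessionDescription,
  pub candidates: Vec<RTCIceCandidateInit>,
}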

Minimal reproduction

use std::sync::Arc;

use anyhow::Result;
use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_H264};
use webrtc::api::APIBuilder;
use webrtc::ice_transport::ice_server::RTCIceServer;
use webrtc::peer_connection::configuration::RTCConfiguration;
use webrtc::peer_connection::RTCPeerConnection;
use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability;
use webrtc::track::track_local::track_local_static_sample::TrackLocalStaticSample;

async fn create_peer() -> Result<RTCPeerConnection> {
  let track = Arc::new(TrackLocalStaticSample::new(
    RTCRtpCodecCapability {
      mime_type: MIME_TYPE_H264.to_owned(),
      ..Default::default()
    },
    "video".to_owned(),
    "restream".to_owned(),
  ));

  let mut m = MediaEngine::default();

  m.register_default_codecs()?;

  let api = APIBuilder::new().with_media_engine(m).build();

  let ice_servers = vec![RTCIceServer {
    urls: vec!["stun:stun.l.google.com:19302".to_owned()],
    ..Default::default()
  }];

  let config = RTCConfiguration {
    ice_servers,
    ..Default::default()
  };

  let peer_connection = api.new_peer_connection(config).await?;
  let _ = peer_connection.add_track(track.clone()).await?;

  Ok(peer_connection)
}

#[tokio::main]
async fn main() -> Result<()> {
  let _ = create_peer().await?;

  Ok(())
}

Report

       Error leaked 75.8 kiB in 1 block
        Info at malloc (vg_replace_malloc.c:442)
             at alloc (alloc.rs:98)
             at alloc::alloc::Global::alloc_impl (alloc.rs:181)
             at allocate (alloc.rs:241)
             at alloc::alloc::exchange_malloc (alloc.rs:330)
             at new<alloc::sync::ArcInner<tokio::sync::mutex::Mutex<alloc::vec::Vec<alloc::sync::Arc<webrtc::rtp_transceiver::RTCRtpTransceiver, alloc::alloc::Global>, alloc::alloc::Global>>>> (boxed.rs:217)
             at alloc::sync::Arc<T>::new (sync.rs:389)
             at webrtc::peer_connection::peer_connection_internal::PeerConnectionInternal::new::{{closure}} (peer_connection_internal.rs:95)
             at webrtc::peer_connection::RTCPeerConnection::new::{{closure}} (mod.rs:236)
             at webrtc::api::API::new_peer_connection::{{closure}} (mod.rs:48)
             at restream_leak::create_peer::{{closure}} (leak.rs:48)
             at restream_leak::main::{{closure}} (leak.rs:56)
             at tokio::runtime::park::CachedParkThread::block_on::{{closure}} (park.rs:282)
             at with_budget<core::task::poll::Poll<core::result::Result<(), anyhow::Error>>, tokio::runtime::park::{impl#4}::block_on::{closure_env#0}<restream_leak::main::{async_block_env#0}>> (coop.rs:107)
             at budget<core::task::poll::Poll<core::result::Result<(), anyhow::Error>>, tokio::runtime::park::{impl#4}::block_on::{closure_env#0}<restream_leak::main::{async_block_env#0}>> (coop.rs:73)
             at tokio::runtime::park::CachedParkThread::block_on (park.rs:282)
             at tokio::runtime::context::blocking::BlockingRegionGuard::block_on (blocking.rs:66)
     Summary Leaked 75.8 kiB total

Arc<Mutex<Vec<Arc<RTCRtpTransceiver>>>> inside PeerConnectionInternal is not deallocated

LIMPIX31 commented Jan 25, 2024

Okay. In the second case (the minimal reproduction) I forgot to close the connection, but in the first case there is still something wrong.

peer_connection.close().await hangs when it is called from inside the on_peer_connection_state_change handler below:

  peer_connection.on_peer_connection_state_change({
    let peer_connection = peer_connection.clone();

    Box::new(move |s| {
      info!("Peer Connection State has changed: {s}");
      let peer_connection = peer_connection.clone();
      Box::pin(async move {
        use RTCPeerConnectionState::*;

        if let Disconnected | Failed = s {
          info!("Attempting to close ended connection");
          if let Err(err) = peer_connection.close().await {
            error!(%err);
          } else {
            info!("Connection closed");
          }
        }
      })
    })
  });
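
One hedged observation about the hang (not verified against the library internals, just a general hazard with callback-driven APIs): close() has async work of its own to do, and awaiting it directly inside on_peer_connection_state_change means the state-change callback does not return until that work finishes, which can deadlock if the library needs the callback to return first. Below is a sketch of the same handler that defers the close to a separate task instead, assuming peer_connection is an Arc<RTCPeerConnection> as in the snippet above:

peer_connection.on_peer_connection_state_change({
  let peer_connection = peer_connection.clone();

  Box::new(move |s| {
    info!("Peer Connection State has changed: {s}");
    let peer_connection = peer_connection.clone();

    if let RTCPeerConnectionState::Disconnected | RTCPeerConnectionState::Failed = s {
      // Close on a separate task so the callback returns immediately and
      // never waits on work that the library itself may have to drive.
      tokio::spawn(async move {
        info!("Attempting to close ended connection");
        if let Err(err) = peer_connection.close().await {
          error!(%err);
        } else {
          info!("Connection closed");
        }
      });
    }

    Box::pin(async {})
  })
});

Also worth noting: capturing an Arc to the peer connection inside one of its own callbacks creates a reference cycle, so if the connection is never closed, that cycle alone can keep it (and its transceivers) alive.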

samguns commented Feb 17, 2024

Interestingly, we have an app that is much the same as yours: it uses HTTP served by Axum for the signaling. Everything works fine, but we observed a similar scenario in our app as well, especially the high CPU usage, even though we made sure the peer connection had been closed explicitly. Did you make any further progress on this issue? @LIMPIX31

LIMPIX31 (author) replied:

Thanks for your reply. Unfortunately, I was not able to find out more. I was in a hurry, so I had to write a broadcast server in Go as a temporary solution.

samguns commented Feb 22, 2024

Some follow-ups to share with you @LIMPIX31 and with all users, especially the devs of this project.
After browsing through the PRs, we found that the recently merged #514 seemed highly related to what we experienced, so we changed the crate source to pull the latest code from the master branch instead of v0.8.0 from crates.io. The issue disappeared. Maybe you could try it as well, and let's wait for a formal new release on crates.io.
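
For anyone who wants to try the same workaround before a release is published, pointing the dependency at the git repository looks roughly like this in Cargo.toml (pinning a specific rev instead of a branch is the more reproducible option):

[dependencies]
# Pull webrtc from the master branch instead of the crates.io release.
webrtc = { git = "https://github.com/webrtc-rs/webrtc", branch = "master" }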

rainliu commented Feb 22, 2024

Thanks for sharing.

I will make a new release soon.

rainliu commented Feb 23, 2024

webrtc v0.10.0 is released.

rainliu closed this as completed Feb 23, 2024