diff --git a/Cargo.lock b/Cargo.lock index 43aa7f550..d20a7e528 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -101,6 +101,22 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "core-foundation" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "194a7a9e6de53fa55116934067c844d9d749312f75c6f6d0980e8c252f8c2146" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "cpufeatures" version = "0.2.4" @@ -214,6 +230,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -438,10 +469,13 @@ dependencies = [ "anyhow", "env_logger", "futures-util", + "lazy_static", + "livekit-webrtc", "log", "prost 0.11.0", "prost-build", "prost-types 0.11.1", + "serde_json", "thiserror", "tokio", "tokio-tungstenite", @@ -509,6 +543,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "num_cpus" version = "1.13.1" @@ -525,6 +577,51 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "074864da206b4973b84eb91683020dbefd6a8c3f0f38e054d93954e891935e4e" +[[package]] +name = "openssl" +version = "0.10.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "618febf65336490dfcf20b73f885f5651a0c89c64c2d4a8c3662585a70bf5bd0" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" + +[[package]] +name = "openssl-sys" +version = "0.9.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5f9bd0c2710541a3cda73d6f9ac4f1b240de4ae261065d309dbe73d9dceb42f" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -576,6 +673,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -753,6 +856,22 @@ dependencies = [ "winapi", ] +[[package]] +name = "ryu" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09" + +[[package]] +name = "schannel" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d6731146462ea25d9244b2ed5fd1d716d25c52e4d54aa4fb0f3c4e9854dbe2" +dependencies = [ + "lazy_static", + "windows-sys", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -765,6 +884,46 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" +[[package]] +name = "security-framework" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bc1bb97804af6631813c55739f771071e0f2ed33ee20b68c86ec505d906356c" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0160a13a177a45bfb43ce71c01580998474f556ad854dcbca936dd2841a5c556" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "serde" +version = "1.0.145" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" + +[[package]] +name = "serde_json" +version = "1.0.85" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e55a28e3aaef9d5ce0506d0a14dbba8054ddc7e499ef522dd8b26859ec9d4a44" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "sha-1" version = "0.10.0" @@ -911,6 +1070,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-tungstenite" version = "0.17.2" @@ -919,7 +1088,9 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181" dependencies = [ "futures-util", "log", + "native-tls", "tokio", + "tokio-native-tls", "tungstenite", ] @@ -935,6 +1106,7 @@ dependencies = [ "http", "httparse", "log", + "native-tls", "rand", "sha-1", "thiserror", @@ -993,6 +1165,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" diff --git a/crates/livekit-webrtc/.cargo/config b/crates/.cargo/config similarity index 100% rename from crates/livekit-webrtc/.cargo/config rename to crates/.cargo/config diff --git a/crates/livekit-core/Cargo.toml b/crates/livekit-core/Cargo.toml index 486f2fd7a..8ddd856d0 100644 --- a/crates/livekit-core/Cargo.toml +++ b/crates/livekit-core/Cargo.toml @@ -4,8 +4,9 @@ version = "0.1.0" edition = "2021" [dependencies] +serde_json = "1.0" log = "0.4" -tokio-tungstenite = "0.17.2" +tokio-tungstenite = { version = "0.17.2", features = ["native-tls"] } tokio = { version = "1.20.1", features = ["full"] } url = "2.2.2" futures-util = "0.3.23" @@ -13,6 +14,8 @@ thiserror = "1.0" prost = "0.11.0" prost-types = "0.11.1" anyhow = "1.0.63" +livekit-webrtc = { path = "../livekit-webrtc" } +lazy_static = "1.4.0" [build-dependencies] prost-build = { version = "0.10" } diff --git a/crates/livekit-core/src/lib.rs b/crates/livekit-core/src/lib.rs index 15669d31d..de8d91053 100644 --- a/crates/livekit-core/src/lib.rs +++ b/crates/livekit-core/src/lib.rs @@ -2,6 +2,8 @@ pub mod proto { include!(concat!(env!("OUT_DIR"), "/livekit.rs")); } +mod lk_runtime; +mod pc_transport; mod rtc_engine; mod signal_client; diff --git a/crates/livekit-core/src/lk_runtime.rs b/crates/livekit-core/src/lk_runtime.rs new file mode 100644 index 000000000..041158bfd --- /dev/null +++ b/crates/livekit-core/src/lk_runtime.rs @@ -0,0 +1,25 @@ +use log::trace; + +use livekit_webrtc::peer_connection_factory::PeerConnectionFactory; +use livekit_webrtc::webrtc::RTCRuntime; + +pub struct LKRuntime { + pub rtc_runtime: RTCRuntime, + pub pc_factory: PeerConnectionFactory, +} + +impl LKRuntime { + pub fn new() -> Self { + trace!("LKRuntime::new()"); + Self { + rtc_runtime: RTCRuntime::new(), + pc_factory: PeerConnectionFactory::new(), + } + } +} + +impl Drop for LKRuntime { + fn drop(&mut self) { + trace!("LKRuntime::drop()"); + } +} diff --git a/crates/livekit-core/src/pc_transport.rs b/crates/livekit-core/src/pc_transport.rs new file mode 100644 index 000000000..338cc4d69 --- /dev/null +++ b/crates/livekit-core/src/pc_transport.rs @@ -0,0 +1,122 @@ +use std::sync::Arc; +use std::time::Duration; + +use log::{error, trace}; + +use livekit_webrtc::jsep::{IceCandidate, SessionDescription}; +use livekit_webrtc::peer_connection::{ + PeerConnection, RTCOfferAnswerOptions, SignalingState, +}; +use livekit_webrtc::peer_connection_factory::RTCConfiguration; +use livekit_webrtc::rtc_error::RTCError; + +use crate::lk_runtime::LKRuntime; + +const NEGOTIATION_FREQUENCY: Duration = Duration::from_millis(150); // TODO(theomonnom) + +pub type OnOfferHandler = Box; + +pub struct PCTransport { + peer_connection: PeerConnection, + pending_candidates: Vec, + on_offer_handler: Option, + restarting_ice: bool, + renegotiate: bool, +} + +impl PCTransport { + pub fn new(lk_runtime: Arc, cfg: RTCConfiguration) -> Result { + let peer_connection = lk_runtime.pc_factory.create_peer_connection(cfg)?; + + Ok(Self { + peer_connection, + pending_candidates: Vec::default(), + on_offer_handler: None, + restarting_ice: false, + renegotiate: false, + }) + } + + pub fn peer_connection(&mut self) -> &mut PeerConnection { + &mut self.peer_connection + } + + pub fn on_offer(&mut self, handler: OnOfferHandler) { + self.on_offer_handler = Some(handler); + } + + pub async fn add_ice_candidate(&mut self, ice_candidate: IceCandidate) -> Result<(), RTCError> { + if self.peer_connection.remote_description().is_none() { + self.pending_candidates.push(ice_candidate); + return Ok(()); + } + + self.peer_connection.add_ice_candidate(ice_candidate).await?; + Ok(()) + } + + pub async fn set_remote_description( + &mut self, + remote_description: SessionDescription, + ) -> Result<(), RTCError> { + self.peer_connection + .set_remote_description(remote_description) + .await?; + + for ic in self.pending_candidates.drain(..) { + self.peer_connection.add_ice_candidate(ic).await?; + } + self.restarting_ice = false; + + if self.renegotiate { + self.renegotiate = false; + self.create_and_send_offer(RTCOfferAnswerOptions::default()) + .await?; + } + + Ok(()) + } + + pub async fn negotiate(&mut self) -> Result<(), RTCError> { + // TODO(theomonnom) Debounce here with NEGOTIATION_FREQUENCY + self.create_and_send_offer(RTCOfferAnswerOptions::default()) + .await + } + + async fn create_and_send_offer( + &mut self, + options: RTCOfferAnswerOptions, + ) -> Result<(), RTCError> { + if self.on_offer_handler.is_none() { + return Ok(()); + } + + if options.ice_restart { + trace!("restarting ICE"); + self.restarting_ice = true; + } + + if self.peer_connection.signaling_state() == SignalingState::HaveLocalOffer { + if options.ice_restart { + if let Some(remote_description) = self.peer_connection.remote_description() { + self.peer_connection + .set_remote_description(remote_description) + .await?; + } else { + error!("trying to ice restart when the pc doesn't have remote description"); + } + } else { + self.renegotiate = true; + return Ok(()); + } + } + + let offer = self.peer_connection.create_offer(options).await?; + trace!("created offer {:?}", offer); + self.peer_connection + .set_local_description(offer.clone()) + .await?; + self.on_offer_handler.as_mut().unwrap()(offer); + Ok(()) + } +} diff --git a/crates/livekit-core/src/rtc_engine.rs b/crates/livekit-core/src/rtc_engine.rs index fa61959ef..56b08f1c0 100644 --- a/crates/livekit-core/src/rtc_engine.rs +++ b/crates/livekit-core/src/rtc_engine.rs @@ -1,59 +1,419 @@ -use crate::proto::signal_response; -use crate::signal_client::{SignalClient, SignalClientError}; -use log::error; -use tokio::sync::broadcast; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Duration; -pub struct RTCEngine { - signal_client: SignalClient, +use lazy_static::lazy_static; +use log::{error, trace}; +use prost::Message as ProstMessage; +use thiserror::Error; +use tokio::sync::mpsc; +use tokio::time::sleep; +use tokio_tungstenite::tungstenite::protocol::frame::coding::Data; + +use livekit_webrtc::data_channel::{DataChannel, DataChannelInit}; +use livekit_webrtc::jsep::{IceCandidate, SdpParseError, SessionDescription}; +use livekit_webrtc::peer_connection::{PeerConnectionState, RTCOfferAnswerOptions}; +use livekit_webrtc::peer_connection_factory::{ + ContinualGatheringPolicy, ICEServer, IceTransportsType, RTCConfiguration, +}; +use livekit_webrtc::rtc_error::RTCError; + +use crate::{proto, signal_client}; +use crate::lk_runtime::LKRuntime; +use crate::pc_transport::PCTransport; +use crate::proto::{ + DataPacket, JoinResponse, signal_request, signal_response, SignalTarget, TrickleRequest, +}; +use crate::signal_client::{SignalClient, SignalError}; + +const LOSSY_DC_LABEL: &str = "_lossy"; +const RELIABLE_DC_LABEL: &str = "_reliable"; + +#[derive(Error, Debug)] +pub enum EngineError { + #[error("signal failure")] + Signal(#[from] SignalError), + #[error("internal webrtc failure")] + Rtc(#[from] RTCError), + #[error("failed to parse sdp")] + Parse(#[from] SdpParseError), + #[error("serde error")] + Serde(#[from] serde_json::Error), } -impl RTCEngine { - pub fn new() -> RTCEngine { - Self { - signal_client: SignalClient::new(), +#[derive(PartialEq, Debug, Copy, Clone)] +enum PCState { + New, + Connected, + Disconnected, + Reconnecting, + Closed, +} + +lazy_static! { + // Share one LKRuntime across all RTCEngine instances + static ref LK_RUNTIME: Mutex> = Mutex::new(Weak::new()); +} + +enum EngineMessage {} + +struct PeerInternal { + publisher_pc: PCTransport, + subscriber_pc: PCTransport, + + lossy_dc: DataChannel, + reliable_dc: DataChannel, + + pub_ice_rx: mpsc::Receiver, + sub_ice_rx: mpsc::Receiver, + + pub_offer_rx: mpsc::Receiver, + + primary_connection_state_rx: mpsc::Receiver, + secondary_connection_state_rx: mpsc::Receiver, + + lossy_data_rx: mpsc::Receiver, + reliable_data_rx: mpsc::Receiver, + + sub_dc_rx: mpsc::Receiver, + + pc_state: PCState, +} + +struct RTCInternal { + #[allow(unused)] + lk_runtime: Arc, + signal_client: Arc, + pc_internal: PeerInternal, +} + +impl RTCInternal { + async fn connect(url: &str, token: &str) -> Result { + let mut lk_runtime = None; + { + // Acquire an existing/a new LKRuntime + let mut lk_runtime_ref = LK_RUNTIME.lock().unwrap(); + lk_runtime = lk_runtime_ref.upgrade(); + + if lk_runtime.is_none() { + let new_runtime = Arc::new(LKRuntime::new()); + *lk_runtime_ref = Arc::downgrade(&new_runtime); + lk_runtime = Some(new_runtime); + } + } + let lk_runtime = lk_runtime.unwrap(); + let signal_client = Arc::new(signal_client::connect(url, token).await?); + + trace!("waiting join_response.."); + if let signal_response::Message::Join(join) = signal_client.recv().await? { + trace!("configuring peer_connections: {:?}", join); + let mut pc_internal = Self::configure(lk_runtime.clone(), join.clone())?; + + if !join.subscriber_primary { + pc_internal.publisher_pc.negotiate().await?; + } + + Ok(Self { + lk_runtime, + signal_client, + pc_internal, + }) + } else { + panic!("the first received message isn't a JoinResponse"); } } - pub async fn connect(&mut self, url: &str, token: &str) -> Result<(), SignalClientError> { - self.signal_client.connect(url, token).await?; + fn request_signal(&mut self, msg: signal_request::Message) { + tokio::spawn({ + let sc = self.signal_client.clone(); - tokio::spawn(Self::handle_rtc( - self.signal_client.response_rx.resubscribe(), - )); + async move { + if let Err(err) = sc.send(msg).await { + error!("failed to send signal: {:?}", err); + } + } + }); + } + + async fn handle_signal(&mut self, signal: signal_response::Message) -> Result<(), EngineError> { + match signal { + signal_response::Message::Answer(answer) => { + let sdp = SessionDescription::from(answer.r#type.parse().unwrap(), &answer.sdp)?; + self.pc_internal.publisher_pc.set_remote_description(sdp).await?; + }, + signal_response::Message::Offer(offer) => { + let sdp = SessionDescription::from(offer.r#type.parse().unwrap(), &offer.sdp)?; + self.pc_internal.subscriber_pc.set_remote_description(sdp).await?; + let answer = self.pc_internal.subscriber_pc.peer_connection().create_answer(RTCOfferAnswerOptions::default()).await?; + self.pc_internal.subscriber_pc.peer_connection().set_local_description(answer.clone()).await?; + + self.request_signal(signal_request::Message::Answer(proto::SessionDescription { + r#type: "answer".to_string(), + sdp: answer.to_string(), + })); + }, + signal_response::Message::Trickle(trickle) => { + let json: serde_json::Value = serde_json::from_str(&trickle.candidate_init)?; + let ice = IceCandidate::from( + json["sdpMid"].as_str().unwrap(), + json["sdpMLineIndex"].as_i64().unwrap().try_into().unwrap(), + json["candidate"].as_str().unwrap() + )?; + + if trickle.target == SignalTarget::Publisher as i32 { + self.pc_internal.publisher_pc.add_ice_candidate(ice).await?; + } else { + self.pc_internal.subscriber_pc.add_ice_candidate(ice).await?; + } + } + _ => {}, + } Ok(()) } - pub fn update(&self) {} - - async fn handle_rtc(mut signal_receiver: broadcast::Receiver) { + async fn run(&mut self) { loop { - let msg = match signal_receiver.recv().await { - Ok(msg) => msg, - Err(error) => { - error!("Failed to receive SignalResponse: {:?}", error); - continue; + tokio::select! { + Ok(signal) = self.signal_client.recv() => { + if let Err(err) = self.handle_signal(signal).await { + error!("failed to handle signal: {:?}", err); + } + }, + Some(ice_candidate) = self.pc_internal.pub_ice_rx.recv() => { + self.request_signal(signal_request::Message::Trickle(TrickleRequest { + candidate_init: ice_candidate.to_string(), + target: SignalTarget::Publisher as i32 + })); + }, + Some(ice_candidate) = self.pc_internal.sub_ice_rx.recv() => { + self.request_signal(signal_request::Message::Trickle(TrickleRequest { + candidate_init: ice_candidate.to_string(), + target: SignalTarget::Subscriber as i32 + })); + }, + Some(sdp) = self.pc_internal.pub_offer_rx.recv() => { + trace!("received publisher offer: {:?}", sdp); + self.request_signal(signal_request::Message::Offer(proto::SessionDescription { + r#type: "offer".to_string(), + sdp: sdp.to_string(), + })); + }, + Some(state) = self.pc_internal.primary_connection_state_rx.recv() => { + if state == PeerConnectionState::Connected { + let old_state = self.pc_internal.pc_state; + self.pc_internal.pc_state = PCState::Connected; + + if old_state == PCState::New { + // TODO(theomonnom) OnConnected + } + } else if state == PeerConnectionState::Failed { + self.pc_internal.pc_state = PCState::Disconnected; + // TODO(theomonnom) Handle Disconnect + } + }, + Some(state) = self.pc_internal.secondary_connection_state_rx.recv() => { + if state == PeerConnectionState::Failed { + self.pc_internal.pc_state = PCState::Disconnected; + // TODO(theomonnom) Handle Disconnect + } + }, + Some(data) = self.pc_internal.lossy_data_rx.recv() => { + + }, + Some(data) = self.pc_internal.reliable_data_rx.recv() => { + + }, + Some(mut dc) = self.pc_internal.sub_dc_rx.recv() => { + // Subscriber DataChannels + // Only received when the subscriber_primary is enabled + trace!("using subscriber data channels"); + + let (data_tx, data_rx) = mpsc::channel(8); + Self::configure_dc(&mut dc, data_tx); + + if dc.label() == RELIABLE_DC_LABEL { + self.pc_internal.reliable_dc = dc; + self.pc_internal.reliable_data_rx = data_rx; + } else { + self.pc_internal.lossy_dc = dc; + self.pc_internal.lossy_data_rx = data_rx; + } } - }; - - match msg { - signal_response::Message::Join(join) => {} - signal_response::Message::Trickle(trickle) => {} - signal_response::Message::Answer(answer) => {} - signal_response::Message::Offer(offer) => {} - _ => {} } } } + + fn configure_dc(data_channel: &mut DataChannel, data_tx: mpsc::Sender) { + let label = data_channel.label(); + data_channel.on_message(Box::new(move |data, _| { + if let Ok(data) = DataPacket::decode(data) { + let _ = data_tx.blocking_send(data); + } else { + trace!("{} - failed to decode DataPacket", label); + } + })); + } + + fn configure( + lk_runtime: Arc, + join: JoinResponse, + ) -> Result { + let cfg = RTCConfiguration { + ice_servers: { + let mut servers = vec![]; + for is in join.ice_servers { + servers.push(ICEServer { + urls: is.urls, + username: is.username, + password: is.credential, + }) + } + servers + }, + continual_gathering_policy: ContinualGatheringPolicy::GatherContinually, + ice_transport_type: IceTransportsType::All, + }; + + // Create the PeerConnections + let mut publisher_pc = PCTransport::new(lk_runtime.clone(), cfg.clone())?; + let mut subscriber_pc = PCTransport::new(lk_runtime, cfg)?; + + let (pub_ice_tx, pub_ice_rx) = mpsc::channel(8); + let (sub_ice_tx, sub_ice_rx) = mpsc::channel(8); + let (pub_offer_tx, pub_offer_rx) = mpsc::channel(8); + let (primary_connection_state_tx, primary_connection_state_rx) = mpsc::channel(8); + let (secondary_connection_state_tx, secondary_connection_state_rx) = mpsc::channel(8); + let (lossy_data_tx, lossy_data_rx) = mpsc::channel(8); + let (reliable_data_tx, reliable_data_rx) = mpsc::channel(8); + let (sub_dc_tx, sub_dc_rx) = mpsc::channel(8); + + publisher_pc + .peer_connection() + .on_ice_candidate(Box::new(move |ice_candidate| { + trace!("publisher - on_ice_candidate: {:?}", ice_candidate); + let _ = pub_ice_tx.blocking_send(ice_candidate); + })); + + subscriber_pc + .peer_connection() + .on_ice_candidate(Box::new(move |ice_candidate| { + trace!("subscriber - on_ice_candidate: {:?}", ice_candidate); + let _ = sub_ice_tx.blocking_send(ice_candidate); + })); + + publisher_pc.on_offer(Box::new(move |offer| { + trace!("publisher - on_offer: {:?}", offer); + let _ = pub_offer_tx.blocking_send(offer); // TODO(theomonnom) Don't use blocking_send here + })); + + let mut primary_pc = &mut publisher_pc; + let mut secondary_pc = &mut subscriber_pc; + if join.subscriber_primary { + primary_pc = &mut subscriber_pc; + secondary_pc = &mut publisher_pc; + + primary_pc.peer_connection().on_data_channel(Box::new(move |dc| { + let _ = sub_dc_tx.blocking_send(dc); + })); + } + + primary_pc + .peer_connection() + .on_connection_change(Box::new(move |state| { + let _ = primary_connection_state_tx.blocking_send(state); + })); + + secondary_pc + .peer_connection() + .on_connection_change(Box::new(move |state| { + let _ = secondary_connection_state_tx.blocking_send(state); + })); + + // Note that when subscriber_primary feature is enabled, + // the subscriber uses his own data channels created by the server. + let mut lossy_dc = publisher_pc.peer_connection().create_data_channel( + LOSSY_DC_LABEL, + DataChannelInit { + ordered: true, + max_retransmits: Some(0), + ..DataChannelInit::default() + }, + )?; + + let mut reliable_dc = publisher_pc.peer_connection().create_data_channel( + RELIABLE_DC_LABEL, + DataChannelInit { + ordered: true, + ..DataChannelInit::default() + }, + )?; + + Self::configure_dc(&mut lossy_dc, lossy_data_tx); + Self::configure_dc(&mut reliable_dc, reliable_data_tx); + + Ok(PeerInternal { + publisher_pc, + subscriber_pc, + lossy_dc, + reliable_dc, + pub_ice_rx, + sub_ice_rx, + pub_offer_rx, + primary_connection_state_rx, + secondary_connection_state_rx, + lossy_data_rx, + reliable_data_rx, + sub_dc_rx, + pc_state: PCState::New, + }) + } +} + +pub struct RTCEngine {} + +/// Initialize the SignalClient & the PeerConnections +pub async fn connect(url: &str, token: &str) -> Result { + let mut rtc_internal = RTCInternal::connect(url, token).await?; + tokio::spawn(async move { + rtc_internal.run().await + }); + + Ok(RTCEngine{}) +} + +impl RTCEngine { + async fn rtc_handle() { + loop {} + } } #[tokio::test] async fn test_test() { env_logger::init(); - let mut engine = RTCEngine::new(); - engine.connect("ws://localhost:7880", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NjQ1OTY4MDYsImlzcyI6IkFQSUNrSG04M01oZ2hQeCIsIm5hbWUiOiJ1c2VyMSIsIm5iZiI6MTY2MDk5NjgwNiwic3ViIjoidXNlcjEiLCJ2aWRlbyI6eyJyb29tIjoibXktZmlyc3Qtcm9vbSIsInJvb21Kb2luIjp0cnVlfX0.SWU_LETMK6ZmFOf38pYjVhpur0o7jJc6u61h8BH7g20").await.unwrap(); + let engine = connect("ws://localhost:7880", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NzEyMzk4NjAsImlzcyI6IkFQSXpLYkFTaUNWYWtnSiIsIm5hbWUiOiJ0ZXN0IiwibmJmIjoxNjY0MDM5ODYwLCJzdWIiOiJ0ZXN0IiwidmlkZW8iOnsicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZX19.0Bee2jI2cSZveAbZ8MLc-ADoMYQ4l8IRxcAxpXAS6a8").await.unwrap(); + + + sleep(Duration::from_secs(60)).await; - // Wait before exiting the program - tokio::time::sleep(core::time::Duration::from_millis(1000 * 25)).await; } + +/*sync fn handle_rtc(mut signal_receiver: broadcast::Receiver) { + loop { + let msg = match signal_receiver.recv().await { + Ok(msg) => msg, + Err(error) => { + error!("Failed to receive SignalResponse: {:?}", error); + continue; + } + }; + + match msg { + Message::Join(join) => {} + Message::Trickle(trickle) => {} + Message::Answer(answer) => {} + Message::Offer(offer) => {} + _ => {} + } + } +}*/ diff --git a/crates/livekit-core/src/signal_client.rs b/crates/livekit-core/src/signal_client.rs index 717f94834..5fa87150e 100644 --- a/crates/livekit-core/src/signal_client.rs +++ b/crates/livekit-core/src/signal_client.rs @@ -1,100 +1,186 @@ -use futures_util::SinkExt; -use futures_util::StreamExt; +use futures_util::{SinkExt, StreamExt}; +use futures_util::stream::{SplitSink, SplitStream}; use log::{error, info}; -use prost::Message as ProtoMessage; -use std::borrow::Borrow; +use prost::Message as ProstMessage; use thiserror::Error; use tokio::net::TcpStream; -use tokio::sync::broadcast; +use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; -use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::tungstenite::{ + Error as WsError, + Message, protocol::frame::{CloseFrame, coding::CloseCode}, +}; -use crate::{proto, proto::signal_response}; +use crate::proto::{signal_request, signal_response, SignalRequest, SignalResponse}; + +pub const PROTOCOL_VERSION: u32 = 8; #[derive(Error, Debug)] -pub enum SignalClientError { +pub enum SignalError { #[error("websocket failure")] - WebSocket(#[from] tokio_tungstenite::tungstenite::Error), + WsError(#[from] WsError), #[error("failed to parse the url")] UrlParse(#[from] url::ParseError), - #[error("failed to parse messages from server")] + #[error("failed to decode messages from server")] ProtoParse(#[from] prost::DecodeError), } +type SignalResult = Result; +type WebSocket = WebSocketStream>; + +#[derive(Debug)] +struct RecvMessage { + response_chn: oneshot::Sender>, +} + +#[derive(Debug)] +struct SendMessage { + signal: signal_request::Message, + response_chn: oneshot::Sender>, +} + pub struct SignalClient { - ws_handle: Option>>, - response_tx: broadcast::Sender, - pub response_rx: broadcast::Receiver, + read_sender: mpsc::Sender, + write_sender: mpsc::Sender, + write_shutdown_sender: oneshot::Sender<()>, + read_shutdown_sender: oneshot::Sender<()>, + read_handle: JoinHandle<()>, + write_handle: JoinHandle<()>, } -impl SignalClient { - pub fn new() -> Self { - let (tx, rx) = broadcast::channel(16); +pub async fn connect(url: &str, token: &str) -> SignalResult { + let mut lk_url = url::Url::parse(url)?; + lk_url.set_path("/rtc"); + lk_url + .query_pairs_mut() + .append_pair("access_token", token) + .append_pair("protocol", PROTOCOL_VERSION.to_string().as_str()); - Self { - response_tx: tx, - response_rx: rx, - ws_handle: None, - } - } + let (ws_stream, _) = connect_async(lk_url).await?; + let (ws_writer, ws_reader) = ws_stream.split(); + + let (read_tx, read_rx) = mpsc::channel::(1); + let (write_tx, write_rx) = mpsc::channel::(1); + let (read_shutdown_tx, read_shutdown_rx) = oneshot::channel(); + let (write_shutdown_tx, write_shutdown_rx) = oneshot::channel(); - pub async fn connect(&mut self, url: &str, token: &str) -> Result<(), SignalClientError> { - let mut lk_url = url::Url::parse(url)?; - lk_url.set_path("/rtc"); - lk_url - .query_pairs_mut() - .append_pair("access_token", token) - .append_pair("protocol", "8"); + let read_handle = tokio::spawn(SignalClient::ws_read(read_rx, ws_reader, read_shutdown_rx)); + let write_handle = tokio::spawn(SignalClient::ws_write( + write_rx, + ws_writer, + write_shutdown_rx, + )); - info!("Connecting to {}", lk_url); - let (ws, _) = connect_async(&lk_url).await?; + Ok(SignalClient { + read_sender: read_tx, + write_sender: write_tx, + write_shutdown_sender: write_shutdown_tx, + read_shutdown_sender: read_shutdown_tx, + read_handle, + write_handle, + }) +} - self.ws_handle = Some(tokio::spawn(Self::handle_ws(ws, self.response_tx.clone()))); - Ok(()) +impl SignalClient { + pub async fn close(self) { + let _ = self.write_shutdown_sender.send(()); + let _ = self.write_handle.await; + let _ = self.read_shutdown_sender.send(()); + let _ = self.read_handle.await; } - pub async fn disconnect() { - unimplemented!() + pub async fn recv(&self) -> SignalResult { + let (send, recv) = oneshot::channel(); + let msg = RecvMessage { response_chn: send }; + let _ = self.read_sender.send(msg).await; + recv.await.expect("channel closed") } - async fn handle_ws( - mut ws: WebSocketStream>, - response_tx: broadcast::Sender, - ) -> Result<(), SignalClientError> { + pub async fn send(&self, signal: signal_request::Message) -> SignalResult<()> { + let (send, recv) = oneshot::channel(); + let msg = SendMessage { + signal, + response_chn: send, + }; + let _ = self.write_sender.send(msg).await; + recv.await.expect("channel closed") + } + + async fn ws_write( + mut write_receiver: mpsc::Receiver, + mut ws_writer: SplitSink, + mut shutdown_receiver: oneshot::Receiver<()>, + ) { loop { tokio::select! { - next_msg = ws.next() => { - let ws_msg = match next_msg { - Some(msg) => msg?, - None => break, + Some(msg) = write_receiver.recv() => { + let req = SignalRequest { + message: Some(msg.signal), }; - let data = match ws_msg { - Message::Binary(data) => data, - Message::Ping(data) => { - ws.send(Message::Pong(data)).await?; - continue - }, - Message::Close(_frame) => break, - _ => continue, - }; + let write_res = ws_writer.send(Message::Binary(req.encode_to_vec())).await; + if let Err(err) = write_res { + error!("failed to send message to ws: {:?}", err); + let _ = msg.response_chn.send(Err(err.into())); + break; + } - let proto_msg = proto::SignalResponse::decode(data.borrow())?; - let signal_response = proto_msg.message.unwrap(); + let _ = msg.response_chn.send(Ok(())); + }, + _ = (&mut shutdown_receiver) => { + let _ = ws_writer.send(Message::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: "disconnected by client".into() + }))).await; + let _ = ws_writer.flush().await; + break; + } + } + } + } - match signal_response { - signal_response::Message::Pong(ts) => { + async fn ws_read( + mut write_receiver: mpsc::Receiver, + mut ws_reader: SplitStream, + mut shutdown_receiver: oneshot::Receiver<()>, + ) { + loop { + tokio::select! { + Some(msg) = write_receiver.recv() => { + let read = ws_reader.next().await; + if read.is_none() { + let _ = msg.response_chn.send(Err(SignalError::WsError(WsError::ConnectionClosed))); + break; + } + let read = read.unwrap(); + match read { + Ok(Message::Binary(data)) => { + let res = SignalResponse::decode(data.as_slice()).expect("failed to decode incoming SignalResponse"); - }, + // TODO(theomonnon) Handle Message::Pong + let res = res.message.unwrap(); + let _ = msg.response_chn.send(Ok(res)); + } _ => { - response_tx.send(signal_response).unwrap(); + error!("unhandled websocket message: {:?}", read); + let _ = msg.response_chn.send(Err(SignalError::WsError(WsError::ConnectionClosed))); + break; } } - } + }, + _ = (&mut shutdown_receiver) => break } } - - Ok(()) } } + +#[tokio::test] +async fn test_test() { + env_logger::init(); + let client = connect("ws://localhost:7880", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NzEyMzk4NjAsImlzcyI6IkFQSXpLYkFTaUNWYWtnSiIsIm5hbWUiOiJ0ZXN0IiwibmJmIjoxNjY0MDM5ODYwLCJzdWIiOiJ0ZXN0IiwidmlkZW8iOnsicm9vbUFkbWluIjp0cnVlLCJyb29tQ3JlYXRlIjp0cnVlLCJyb29tSm9pbiI6dHJ1ZX19.0Bee2jI2cSZveAbZ8MLc-ADoMYQ4l8IRxcAxpXAS6a8").await.unwrap(); + let msg = client.recv().await.unwrap(); + + client.close().await; + info!("Received message {:?}", msg); +} diff --git a/crates/livekit-webrtc/libwebrtc-sys/.cargo/config b/crates/livekit-webrtc/libwebrtc-sys/.cargo/config deleted file mode 100644 index 0c17df095..000000000 --- a/crates/livekit-webrtc/libwebrtc-sys/.cargo/config +++ /dev/null @@ -1,2 +0,0 @@ -[target.x86_64-pc-windows-msvc] -rustflags = ["-C", "target-feature=+crt-static"] \ No newline at end of file diff --git a/crates/livekit-webrtc/libwebrtc-sys/include/livekit/jsep.h b/crates/livekit-webrtc/libwebrtc-sys/include/livekit/jsep.h index cf977832c..7c1d04418 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/include/livekit/jsep.h +++ b/crates/livekit-webrtc/libwebrtc-sys/include/livekit/jsep.h @@ -19,12 +19,15 @@ class IceCandidate { explicit IceCandidate( std::unique_ptr ice_candidate); + rust::String stringify() const; std::unique_ptr release(); private: std::unique_ptr ice_candidate_; }; +std::unique_ptr create_ice_candidate(rust::String sdp_mid, int sdp_mline_index, rust::String sdp); + static std::unique_ptr _unique_ice_candidate() { return nullptr; // Ignore } @@ -42,6 +45,8 @@ class SessionDescription { std::unique_ptr session_description_; }; +std::unique_ptr create_session_description(SdpType type, rust::String sdp); + static std::unique_ptr _unique_session_description() { return nullptr; // Ignore } diff --git a/crates/livekit-webrtc/libwebrtc-sys/include/livekit/peer_connection.h b/crates/livekit-webrtc/libwebrtc-sys/include/livekit/peer_connection.h index 7dd3afb18..e0f7e6743 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/include/livekit/peer_connection.h +++ b/crates/livekit-webrtc/libwebrtc-sys/include/livekit/peer_connection.h @@ -34,6 +34,10 @@ class PeerConnection { std::unique_ptr init); void add_ice_candidate(std::unique_ptr candidate, NativeAddIceCandidateObserver& observer); + std::unique_ptr local_description() const; + std::unique_ptr remote_description() const; + SignalingState signaling_state() const; + IceGatheringState ice_gathering_state() const; void close(); private: diff --git a/crates/livekit-webrtc/libwebrtc-sys/include/livekit/rust_types.h b/crates/livekit-webrtc/libwebrtc-sys/include/livekit/rust_types.h index ed61c5495..73a696bf7 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/include/livekit/rust_types.h +++ b/crates/livekit-webrtc/libwebrtc-sys/include/livekit/rust_types.h @@ -17,6 +17,12 @@ struct DataChannelObserverWrapper; struct AddIceCandidateObserverWrapper; // Shared types +enum class PeerConnectionState; +enum class SignalingState; +enum class IceConnectionState; +enum class IceGatheringState; +enum class SdpType; +struct SdpParseError; struct RTCOfferAnswerOptions; struct RTCError; struct DataChannelInit; diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/data_channel.rs b/crates/livekit-webrtc/libwebrtc-sys/src/data_channel.rs index 98b187a3d..26c59916a 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/data_channel.rs +++ b/crates/livekit-webrtc/libwebrtc-sys/src/data_channel.rs @@ -3,7 +3,7 @@ use std::slice; #[cxx::bridge(namespace = "livekit")] pub mod ffi { #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum Priority { VeryLow, Low, @@ -36,7 +36,7 @@ pub mod ffi { } #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum DataState { Connecting, Open, diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/jsep.cpp b/crates/livekit-webrtc/libwebrtc-sys/src/jsep.cpp index e214b42ec..5ed542a00 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/jsep.cpp +++ b/crates/livekit-webrtc/libwebrtc-sys/src/jsep.cpp @@ -4,22 +4,51 @@ #include "livekit/jsep.h" +#include #include #include "libwebrtc-sys/src/jsep.rs.h" #include "livekit/rtc_error.h" -#include "rtc_base/ref_counted_object.h" namespace livekit { +const std::string& serialize_sdp_error(webrtc::SdpParseError error) { + std::stringstream ss; + ss << std::hex << std::setfill('0'); + ss << std::setw(8) << (uint32_t)error.line.length(); + ss << std::dec << std::setw(1) << error.line; + ss << std::dec << std::setw(1) << error.description; + return ss.str(); +} + IceCandidate::IceCandidate( std::unique_ptr ice_candidate) : ice_candidate_(std::move(ice_candidate)) {} +rust::String IceCandidate::stringify() const { + std::string str; + ice_candidate_->ToString(&str); + return rust::String{str}; +} + std::unique_ptr IceCandidate::release() { return std::move(ice_candidate_); } +std::unique_ptr create_ice_candidate(rust::String sdp_mid, + int sdp_mline_index, + rust::String sdp) { + webrtc::SdpParseError error; + auto ice_rtc = webrtc::CreateIceCandidate(sdp_mid.c_str(), sdp_mline_index, + sdp.c_str(), &error); + if (!ice_rtc) { + throw std::runtime_error(serialize_sdp_error(error)); + } + + return std::make_unique( + std::unique_ptr(ice_rtc)); +} + SessionDescription::SessionDescription( std::unique_ptr session_description) : session_description_(std::move(session_description)) {} @@ -39,6 +68,19 @@ SessionDescription::release() { return std::move(session_description_); } +std::unique_ptr create_session_description( + SdpType type, + rust::String sdp) { + webrtc::SdpParseError error; + auto rtc_sdp = webrtc::CreateSessionDescription( + static_cast(type), sdp.c_str(), &error); + if (!rtc_sdp) { + throw std::runtime_error(serialize_sdp_error(error)); + } + + return std::make_unique(std::move(rtc_sdp)); +} + // CreateSdpObserver NativeCreateSdpObserver::NativeCreateSdpObserver( diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/jsep.rs b/crates/livekit-webrtc/libwebrtc-sys/src/jsep.rs index f752f990b..0a1cc4ff7 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/jsep.rs +++ b/crates/livekit-webrtc/libwebrtc-sys/src/jsep.rs @@ -1,4 +1,6 @@ -use std::fmt::{Debug, Formatter}; +use std::error::Error; +use std::fmt::{Debug, Display, Formatter}; +use std::str::FromStr; use cxx::UniquePtr; @@ -6,6 +8,21 @@ use crate::rtc_error::ffi::RTCError; #[cxx::bridge(namespace = "livekit")] pub mod ffi { + #[derive(Debug)] + #[repr(i32)] + pub enum SdpType { + Offer, + PrAnswer, + Answer, + Rollback, + } + + #[derive(Debug)] + pub struct SdpParseError { + pub line: String, + pub description: String, + } + extern "Rust" { type CreateSdpObserverWrapper; fn on_success( @@ -32,6 +49,8 @@ pub mod ffi { type NativeSetLocalSdpObserverHandle; type NativeSetRemoteSdpObserverHandle; + fn stringify(self: &IceCandidate) -> String; + fn stringify(self: &SessionDescription) -> String; fn clone(self: &SessionDescription) -> UniquePtr; @@ -45,8 +64,19 @@ pub mod ffi { observer: Box, ) -> UniquePtr; + fn create_ice_candidate(sdp_mid: String, sdp_mline_index: i32, sdp: String) -> Result>; + fn create_session_description(sdp_type: SdpType, sdp: String) -> Result>; + fn _unique_ice_candidate() -> UniquePtr; // Ignore - fn _unique_session_description() -> UniquePtr; // Ignore + fn _unique_session_description() -> UniquePtr; // Ignore + } +} + +impl Error for ffi::SdpParseError {} + +impl Display for ffi::SdpParseError { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "SdpParseError occurred {}: {}", self.line, self.description) } } @@ -57,14 +87,46 @@ impl Debug for ffi::SessionDescription { } unsafe impl Send for ffi::SessionDescription {} +unsafe impl Sync for ffi::SessionDescription {} impl Debug for ffi::IceCandidate { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - write!(f, "TODO") // TODO(theomonnom) + write!(f, "{}", self.stringify()) } } unsafe impl Send for ffi::IceCandidate {} +unsafe impl Sync for ffi::IceCandidate {} + +impl ffi::SdpParseError { + /// # Safety + /// The value must be correctly encoded + pub unsafe fn from(value: &str) -> Self { + // Parse the hex encoded error from c++ + let line_length = u32::from_str_radix(&value[0..8], 16).unwrap() as usize + 8; + let line = String::from(&value[8..line_length]); + let description = String::from(&value[line_length..]); + + Self { + line, + description, + } + } +} + +impl FromStr for ffi::SdpType { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + "offer" => Ok(ffi::SdpType::Offer), + "pranswer" => Ok(ffi::SdpType::PrAnswer), + "answer" => Ok(ffi::SdpType::Answer), + "rollback" => Ok(ffi::SdpType::Rollback), + _ => Err(()), + } + } +} // CreateSdpObserver @@ -130,3 +192,37 @@ impl SetRemoteSdpObserverWrapper { self.observer.on_set_remote_description_complete(error); } } + +#[cfg(test)] +mod tests { + use log::info; + + use crate::jsep::ffi; + + #[test] + fn throw_error() { + let sdp_string = "v=0 +o=- 6549709950142776241 2 IN IP4 127.0.0.1 +s=- +t=0 0 +======================== ERROR HERE +a=group:BUNDLE 0 +a=extmap-allow-mixed +a=msid-semantic: WMS +m=application 9 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=ice-ufrag:Tw7h +a=ice-pwd:6XOVUD6HpcB4c1M8EB8jXJE9 +a=ice-options:trickle +a=fingerprint:sha-256 4F:EC:23:59:5D:A5:E6:3E:3E:5D:8A:09:B6:FA:04:AA:19:99:49:67:BD:65:93:06:BB:EE:AC:D5:21:0F:57:D6 +a=setup:actpass +a=mid:0 +a=sctp-port:5000 +a=max-message-size:262144 +"; + + let sdp = ffi::create_session_description(ffi::SdpType::Offer, sdp_string.to_string()); + let err = unsafe { ffi::SdpParseError::from(sdp.err().unwrap().what()) }; + info!("parse err: {:?}", err) + } +} diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.cpp b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.cpp index 3b189766f..1d020c8e1 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.cpp +++ b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.cpp @@ -76,10 +76,35 @@ void PeerConnection::add_ice_candidate( [&](const webrtc::RTCError& err) { observer.OnComplete(to_error(err)); }); } +std::unique_ptr PeerConnection::local_description() const { + auto local_description = peer_connection_->local_description(); + if (local_description) + return std::make_unique(local_description->Clone()); + + return std::unique_ptr(); +} + +std::unique_ptr PeerConnection::remote_description() const { + auto remote_description = peer_connection_->remote_description(); + if (remote_description) + return std::make_unique(remote_description->Clone()); + + return std::unique_ptr(); +} + +SignalingState PeerConnection::signaling_state() const { + return static_cast(peer_connection_->signaling_state()); +} + +IceGatheringState PeerConnection::ice_gathering_state() const { + return static_cast(peer_connection_->ice_gathering_state()); +} + void PeerConnection::close() { peer_connection_->Close(); } + // AddIceCandidateObserver NativeAddIceCandidateObserver::NativeAddIceCandidateObserver( diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.rs b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.rs index 2bd844202..4e5f10cef 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.rs +++ b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection.rs @@ -23,7 +23,7 @@ pub mod ffi { } #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum PeerConnectionState { New, Connecting, @@ -34,7 +34,7 @@ pub mod ffi { } #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum SignalingState { Stable, HaveLocalOffer, @@ -45,7 +45,7 @@ pub mod ffi { } #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum IceConnectionState { IceConnectionNew, IceConnectionChecking, @@ -58,7 +58,7 @@ pub mod ffi { } #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum IceGatheringState { IceGatheringNew, IceGatheringGathering, @@ -158,6 +158,14 @@ pub mod ffi { observer: Pin<&mut NativeAddIceCandidateObserver>, ); + fn local_description(self: &PeerConnection) -> UniquePtr; + + fn remote_description(self: &PeerConnection) -> UniquePtr; + + fn signaling_state(self: &PeerConnection) -> SignalingState; + + fn ice_gathering_state(self: &PeerConnection) -> IceGatheringState; + fn close(self: Pin<&mut PeerConnection>); fn create_native_peer_connection_observer( @@ -245,11 +253,23 @@ pub mod ffi { } // https://webrtc.github.io/webrtc-org/native-code/native-apis/ -unsafe impl Sync for ffi::PeerConnection {} - unsafe impl Send for ffi::PeerConnection {} +unsafe impl Sync for ffi::PeerConnection {} unsafe impl Send for ffi::NativePeerConnectionObserver {} +unsafe impl Sync for ffi::NativePeerConnectionObserver {} + +unsafe impl Sync for ffi::NativeAddIceCandidateObserver {} +unsafe impl Send for ffi::NativeAddIceCandidateObserver {} + +unsafe impl Sync for ffi::NativeSetRemoteSdpObserverHandle {} +unsafe impl Send for ffi::NativeSetRemoteSdpObserverHandle {} + +unsafe impl Sync for ffi::NativeSetLocalSdpObserverHandle {} +unsafe impl Send for ffi::NativeSetLocalSdpObserverHandle {} + +unsafe impl Sync for ffi::NativeCreateSdpObserverHandle {} +unsafe impl Send for ffi::NativeCreateSdpObserverHandle {} impl Default for ffi::RTCOfferAnswerOptions { /* @@ -272,17 +292,21 @@ impl Default for ffi::RTCOfferAnswerOptions { } } +pub trait AddIceCandidateObserver: Send { + fn on_complete(&self, error: RTCError); +} + pub struct AddIceCandidateObserverWrapper { - observer: Box, + observer: Box, } impl AddIceCandidateObserverWrapper { - pub fn new(observer: Box) -> Self { + pub fn new(observer: Box) -> Self { Self { observer } } fn on_complete(&self, error: RTCError) { - (self.observer)(error); + self.observer.on_complete(error); } } diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.cpp b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.cpp index 1de4ff4d0..07cb05ca6 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.cpp +++ b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.cpp @@ -86,8 +86,13 @@ std::unique_ptr create_rtc_configuration( for (auto& url : item.urls) { ice_server.urls.emplace_back(url.c_str()); } - rtc->servers.push_back(ice_server); + rtc->continual_gathering_policy = + static_cast( + conf.continual_gathering_policy); + + rtc->type = static_cast( + conf.ice_transport_type); } return rtc; diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.rs b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.rs index 4116eca79..5c45e3112 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.rs +++ b/crates/livekit-webrtc/libwebrtc-sys/src/peer_connection_factory.rs @@ -12,9 +12,27 @@ pub mod ffi { pub password: String, } + #[derive(Debug)] + #[repr(i32)] + pub enum ContinualGatheringPolicy { + GatherOnce, + GatherContinually, + } + + #[derive(Debug)] + #[repr(i32)] + pub enum IceTransportsType { + None, + Relay, + NoHost, + All, + } + #[derive(Debug, Clone)] pub struct RTCConfiguration { pub ice_servers: Vec, + pub continual_gathering_policy: ContinualGatheringPolicy, + pub ice_transport_type: IceTransportsType, } unsafe extern "C++" { @@ -22,7 +40,7 @@ pub mod ffi { type PeerConnection = crate::peer_connection::ffi::PeerConnection; type NativePeerConnectionObserver = - crate::peer_connection::ffi::NativePeerConnectionObserver; + crate::peer_connection::ffi::NativePeerConnectionObserver; type PeerConnectionFactory; type NativeRTCConfiguration; @@ -38,3 +56,6 @@ pub mod ffi { ) -> Result>; } } + +unsafe impl Send for ffi::PeerConnectionFactory {} +unsafe impl Sync for ffi::PeerConnectionFactory {} diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/rtc_error.rs b/crates/livekit-webrtc/libwebrtc-sys/src/rtc_error.rs index 0c2e1bd73..c3839e354 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/rtc_error.rs +++ b/crates/livekit-webrtc/libwebrtc-sys/src/rtc_error.rs @@ -7,7 +7,7 @@ use std::fmt::{Display, Formatter}; #[cxx::bridge(namespace = "livekit")] pub mod ffi { #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum RTCErrorType { None, UnsupportedOperation, @@ -24,7 +24,7 @@ pub mod ffi { } #[derive(Debug)] - #[repr(u32)] + #[repr(i32)] pub enum RTCErrorDetailType { None, DataChannelFailure, diff --git a/crates/livekit-webrtc/libwebrtc-sys/src/webrtc.rs b/crates/livekit-webrtc/libwebrtc-sys/src/webrtc.rs index d890fc9b0..74df99260 100644 --- a/crates/livekit-webrtc/libwebrtc-sys/src/webrtc.rs +++ b/crates/livekit-webrtc/libwebrtc-sys/src/webrtc.rs @@ -10,3 +10,6 @@ pub mod ffi { fn create_rtc_runtime() -> UniquePtr; } } + +unsafe impl Send for ffi::RTCRuntime {} +unsafe impl Sync for ffi::RTCRuntime {} diff --git a/crates/livekit-webrtc/src/data_channel.rs b/crates/livekit-webrtc/src/data_channel.rs index ad5145d86..5a0d0129a 100644 --- a/crates/livekit-webrtc/src/data_channel.rs +++ b/crates/livekit-webrtc/src/data_channel.rs @@ -128,14 +128,14 @@ impl Default for InternalDataChannelObserver { #[derive(Debug)] pub struct DataChannelInit { #[deprecated] - reliable: bool, - ordered: bool, - max_retransmit_time: Option, - max_retransmits: Option, - protocol: String, - negotiated: bool, - id: i32, - priority: Option, + pub reliable: bool, + pub ordered: bool, + pub max_retransmit_time: Option, + pub max_retransmits: Option, + pub protocol: String, + pub negotiated: bool, + pub id: i32, + pub priority: Option, } impl Default for DataChannelInit { diff --git a/crates/livekit-webrtc/src/jsep.rs b/crates/livekit-webrtc/src/jsep.rs index d979b9ae6..75e80edee 100644 --- a/crates/livekit-webrtc/src/jsep.rs +++ b/crates/livekit-webrtc/src/jsep.rs @@ -1,13 +1,24 @@ use cxx::UniquePtr; - use libwebrtc_sys::jsep as sys_jsep; +pub use sys_jsep::ffi::{SdpType, SdpParseError}; + +// TODO Maybe we can replace that by a serialized IceCandidateInit #[derive(Debug)] pub struct IceCandidate { cxx_handle: UniquePtr, } impl IceCandidate { + pub fn from(sdp_mid: &str, sdp_mline_index: i32, sdp: &str) -> Result { + let res = sys_jsep::ffi::create_ice_candidate(sdp_mid.to_string(), sdp_mline_index, sdp.to_string()); + + match res { + Ok(cxx_handle) => Ok(IceCandidate::new(cxx_handle)), + Err(e) => Err(unsafe { SdpParseError::from(e.what()) }), + } + } + pub(crate) fn new(cxx_handle: UniquePtr) -> Self { Self { cxx_handle } } @@ -17,12 +28,27 @@ impl IceCandidate { } } +impl ToString for IceCandidate { + fn to_string(&self) -> String { + self.cxx_handle.stringify() + } +} + #[derive(Debug)] pub struct SessionDescription { cxx_handle: UniquePtr, } impl SessionDescription { + pub fn from(sdp_type: SdpType, description: &str) -> Result { + let res = sys_jsep::ffi::create_session_description(sdp_type, description.to_string()); + + match res { + Ok(cxx_handle) => Ok(SessionDescription::new(cxx_handle)), + Err(e) => Err(unsafe { SdpParseError::from(e.what()) }), + } + } + pub(crate) fn new(cxx_handle: UniquePtr) -> Self { Self { cxx_handle } } @@ -32,6 +58,12 @@ impl SessionDescription { } } +impl ToString for SessionDescription { + fn to_string(&self) -> String { + self.cxx_handle.stringify() + } +} + impl Clone for SessionDescription { fn clone(&self) -> Self { SessionDescription::new(self.cxx_handle.clone()) diff --git a/crates/livekit-webrtc/src/peer_connection.rs b/crates/livekit-webrtc/src/peer_connection.rs index d310435c0..a4860aadd 100644 --- a/crates/livekit-webrtc/src/peer_connection.rs +++ b/crates/livekit-webrtc/src/peer_connection.rs @@ -1,9 +1,7 @@ -use std::fmt::Debug; use std::sync::{Arc, Mutex}; use cxx::UniquePtr; use log::trace; -use thiserror::Error; use tokio::sync::mpsc; use libwebrtc_sys::data_channel as sys_dc; @@ -22,14 +20,6 @@ use crate::rtc_error::RTCError; use crate::rtp_receiver::RtpReceiver; use crate::rtp_transceiver::RtpTransceiver; -#[derive(Error, Debug)] -pub enum SdpError { - #[error("recv failure: {0}")] - RecvError(String), - #[error("internal libwebrtc error")] - RTCError(#[from] RTCError), -} - pub struct PeerConnection { cxx_handle: UniquePtr, observer: Box, @@ -51,7 +41,7 @@ impl PeerConnection { } } - pub async fn create_offer(&mut self) -> Result { + pub async fn create_offer(&mut self, options: RTCOfferAnswerOptions) -> Result { let (tx, mut rx) = mpsc::channel(1); let wrapper = @@ -62,16 +52,13 @@ impl PeerConnection { unsafe { self.cxx_handle .pin_mut() - .create_offer(native_wrapper.pin_mut(), RTCOfferAnswerOptions::default()); + .create_offer(native_wrapper.pin_mut(), options); } - match rx.recv().await { - Some(value) => value.map_err(Into::into), - None => Err(SdpError::RecvError("channel closed".to_string())), - } + rx.recv().await.unwrap() } - pub async fn create_answer(&mut self) -> Result { + pub async fn create_answer(&mut self, options: RTCOfferAnswerOptions) -> Result { let (tx, mut rx) = mpsc::channel(1); let wrapper = @@ -82,19 +69,16 @@ impl PeerConnection { unsafe { self.cxx_handle .pin_mut() - .create_answer(native_wrapper.pin_mut(), RTCOfferAnswerOptions::default()); + .create_answer(native_wrapper.pin_mut(), options); } - match rx.recv().await { - Some(value) => value.map_err(Into::into), - None => Err(SdpError::RecvError("channel closed".to_string())), - } + rx.recv().await.unwrap() } pub async fn set_local_description( &mut self, desc: SessionDescription, - ) -> Result<(), SdpError> { + ) -> Result<(), RTCError> { let (tx, mut rx) = mpsc::channel(1); let wrapper = sys_jsep::SetLocalSdpObserverWrapper::new(Box::new(InternalSetLocalSdpObserver { tx })); @@ -107,16 +91,13 @@ impl PeerConnection { .set_local_description(desc.release(), native_wrapper.pin_mut()); } - match rx.recv().await { - Some(value) => value.map_err(Into::into), - None => Err(SdpError::RecvError("channel closed".to_string())), - } + rx.recv().await.unwrap() } pub async fn set_remote_description( &mut self, desc: SessionDescription, - ) -> Result<(), SdpError> { + ) -> Result<(), RTCError> { let (tx, mut rx) = mpsc::channel(1); let wrapper = sys_jsep::SetRemoteSdpObserverWrapper::new(Box::new(InternalSetRemoteSdpObserver { @@ -131,10 +112,7 @@ impl PeerConnection { .set_remote_description(desc.release(), native_wrapper.pin_mut()); } - match rx.recv().await { - Some(value) => value.map_err(Into::into), - None => Err(SdpError::RecvError("channel closed".to_string())), - } + rx.recv().await.unwrap() } pub fn create_data_channel( @@ -154,10 +132,11 @@ impl PeerConnection { } } - pub async fn add_ice_candidate(&mut self, candidate: IceCandidate) -> Result<(), SdpError> { + // TODO(theomonnom) Use IceCandidateInit instead of IceCandidate + pub async fn add_ice_candidate(&mut self, candidate: IceCandidate) -> Result<(), RTCError> { let (tx, mut rx) = mpsc::channel(1); - let observer = sys_pc::AddIceCandidateObserverWrapper::new(Box::new(move |error| { - tx.blocking_send(error).unwrap(); + let observer = sys_pc::AddIceCandidateObserverWrapper::new(Box::new(InternalAddIceCandidateObserver { + tx, })); let mut native_observer = @@ -166,12 +145,35 @@ impl PeerConnection { .pin_mut() .add_ice_candidate(candidate.release(), native_observer.pin_mut()); - match rx.recv().await { - Some(value) => Ok(()), - None => Err(SdpError::RecvError("channel closed".to_string())), + rx.recv().await.unwrap() + } + + pub fn local_description(&self) -> Option { + let local_description = self.cxx_handle.local_description(); + if local_description.is_null() { + None + } else { + Some(SessionDescription::new(local_description)) + } + } + + pub fn remote_description(&self) -> Option { + let remote_description = self.cxx_handle.remote_description(); + if remote_description.is_null() { + None + } else { + Some(SessionDescription::new(remote_description)) } } + pub fn signaling_state(&self) -> SignalingState { + self.cxx_handle.signaling_state() + } + + pub fn ice_gathering_state(&self) -> IceGatheringState { + self.cxx_handle.ice_gathering_state() + } + pub fn close(&mut self) { self.cxx_handle.pin_mut().close(); } @@ -286,6 +288,20 @@ impl PeerConnection { } } + +// SetLocalSdpObserver + +struct InternalAddIceCandidateObserver { + tx: mpsc::Sender>, +} + +impl sys_pc::AddIceCandidateObserver for InternalAddIceCandidateObserver { + fn on_complete(&self, error: RTCError) { + let res = if error.ok() { Ok(()) } else { Err(error) }; + let _ = self.tx.blocking_send(res); + } +} + // CreateSdpObserver struct InternalCreateSdpObserver { @@ -297,13 +313,11 @@ impl sys_jsep::CreateSdpObserver for InternalCreateSdpObserver { &self, session_description: UniquePtr, ) { - self.tx - .blocking_send(Ok(SessionDescription::new(session_description))) - .unwrap(); + let _ = self.tx.blocking_send(Ok(SessionDescription::new(session_description))); } fn on_failure(&self, error: RTCError) { - self.tx.blocking_send(Err(error)).unwrap(); + let _ = self.tx.blocking_send(Err(error)); } } @@ -316,7 +330,7 @@ struct InternalSetLocalSdpObserver { impl sys_jsep::SetLocalSdpObserver for InternalSetLocalSdpObserver { fn on_set_local_description_complete(&self, error: RTCError) { let res = if error.ok() { Ok(()) } else { Err(error) }; - self.tx.blocking_send(res).unwrap(); + let _ = self.tx.blocking_send(res); } } @@ -329,7 +343,7 @@ struct InternalSetRemoteSdpObserver { impl sys_jsep::SetRemoteSdpObserver for InternalSetRemoteSdpObserver { fn on_set_remote_description_complete(&self, error: RTCError) { let res = if error.ok() { Ok(()) } else { Err(error) }; - self.tx.blocking_send(res).unwrap(); + let _ = self.tx.blocking_send(res); } } @@ -344,16 +358,16 @@ pub type OnRenegotiationNeededHandler = Box; pub type OnNegotiationNeededEventHandler = Box; pub type OnIceConnectionChangeHandler = Box; pub type OnStandardizedIceConnectionChangeHandler = -Box; + Box; pub type OnConnectionChangeHandler = Box; pub type OnIceGatheringChangeHandler = Box; pub type OnIceCandidateHandler = Box; pub type OnIceCandidateErrorHandler = -Box; + Box; pub type OnIceCandidatesRemovedHandler = Box) + Send + Sync>; pub type OnIceConnectionReceivingChangeHandler = Box; pub type OnIceSelectedCandidatePairChangedHandler = -Box; + Box; pub type OnAddTrackHandler = Box) + Send + Sync>; pub type OnTrackHandler = Box; pub type OnRemoveTrackHandler = Box; @@ -368,16 +382,16 @@ pub(crate) struct InternalObserver { on_negotiation_needed_event_handler: Arc>>, on_ice_connection_change_handler: Arc>>, on_standardized_ice_connection_change_handler: - Arc>>, + Arc>>, on_connection_change_handler: Arc>>, on_ice_gathering_change_handler: Arc>>, on_ice_candidate_handler: Arc>>, on_ice_candidate_error_handler: Arc>>, on_ice_candidates_removed_handler: Arc>>, on_ice_connection_receiving_change_handler: - Arc>>, + Arc>>, on_ice_selected_candidate_pair_changed_handler: - Arc>>, + Arc>>, on_add_track_handler: Arc>>, on_track_handler: Arc>>, on_remove_track_handler: Arc>>, @@ -607,6 +621,8 @@ impl sys_pc::PeerConnectionObserver for InternalObserver { mod tests { use log::trace; use tokio::sync::mpsc; + use libwebrtc_sys::peer_connection::ffi::RTCOfferAnswerOptions; + use libwebrtc_sys::peer_connection_factory::ffi::{ContinualGatheringPolicy, IceTransportsType}; use crate::data_channel::{DataChannel, DataChannelInit}; use crate::jsep::IceCandidate; @@ -630,6 +646,8 @@ mod tests { username: "".into(), password: "".into(), }], + continual_gathering_policy: ContinualGatheringPolicy::GatherOnce, + ice_transport_type: IceTransportsType::All }; let mut bob = factory.create_peer_connection(config.clone()).unwrap(); @@ -655,12 +673,12 @@ mod tests { .create_data_channel("test_dc", DataChannelInit::default()) .unwrap(); - let offer = bob.create_offer().await.unwrap(); + let offer = bob.create_offer(RTCOfferAnswerOptions::default()).await.unwrap(); trace!("Bob offer: {:?}", offer); bob.set_local_description(offer.clone()).await.unwrap(); alice.set_remote_description(offer).await.unwrap(); - let answer = alice.create_answer().await.unwrap(); + let answer = alice.create_answer(RTCOfferAnswerOptions::default()).await.unwrap(); trace!("Alice answer: {:?}", answer); alice.set_local_description(answer.clone()).await.unwrap(); bob.set_remote_description(answer).await.unwrap(); diff --git a/crates/livekit-webrtc/src/peer_connection_factory.rs b/crates/livekit-webrtc/src/peer_connection_factory.rs index 6ff7e75f3..5f376129b 100644 --- a/crates/livekit-webrtc/src/peer_connection_factory.rs +++ b/crates/livekit-webrtc/src/peer_connection_factory.rs @@ -2,7 +2,9 @@ use cxx::UniquePtr; use libwebrtc_sys::peer_connection as sys_pc; use libwebrtc_sys::peer_connection_factory as sys_factory; -pub use sys_factory::ffi::{ICEServer, RTCConfiguration}; +pub use sys_factory::ffi::{ + ContinualGatheringPolicy, ICEServer, IceTransportsType, RTCConfiguration, +}; use crate::peer_connection::{InternalObserver, PeerConnection}; use crate::rtc_error::RTCError; diff --git a/crates/livekit-webrtc/src/rtc_error.rs b/crates/livekit-webrtc/src/rtc_error.rs index e054cf4cc..86c81e1e6 100644 --- a/crates/livekit-webrtc/src/rtc_error.rs +++ b/crates/livekit-webrtc/src/rtc_error.rs @@ -1,3 +1,2 @@ // TODO(theomonnom) Wrap the RTCError ffi so we can use Option(u16) pub use libwebrtc_sys::rtc_error::ffi::RTCError; -