From 23f90997b0525631da45d480c5691720ad4740e3 Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Sat, 1 Feb 2025 01:02:40 -0300 Subject: [PATCH 1/2] Have moq-native return web_transport_quinn. A few folks have been asking. --- moq-native/src/quic.rs | 14 ++++++++------ moq-relay/src/cluster.rs | 4 ++-- moq-relay/src/main.rs | 2 +- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/moq-native/src/quic.rs b/moq-native/src/quic.rs index 9b915e6bd..19e1936e3 100644 --- a/moq-native/src/quic.rs +++ b/moq-native/src/quic.rs @@ -10,6 +10,8 @@ use futures::future::BoxFuture; use futures::stream::{FuturesUnordered, StreamExt}; use futures::FutureExt; +use web_transport::quinn as web_transport_quinn; + #[derive(Parser, Clone)] pub struct Args { /// Listen for UDP packets on the given address. @@ -96,11 +98,11 @@ impl Endpoint { pub struct Server { quic: quinn::Endpoint, - accept: FuturesUnordered>>, + accept: FuturesUnordered>>, } impl Server { - pub async fn accept(&mut self) -> Option { + pub async fn accept(&mut self) -> Option { loop { tokio::select! { res = self.quic.accept() => { @@ -116,7 +118,7 @@ impl Server { } } - async fn accept_session(conn: quinn::Incoming) -> anyhow::Result { + async fn accept_session(conn: quinn::Incoming) -> anyhow::Result { let mut conn = conn.accept()?; let handshake = conn @@ -155,7 +157,7 @@ impl Server { _ => anyhow::bail!("unsupported ALPN: {}", alpn), }; - Ok(session.into()) + Ok(session) } pub fn local_addr(&self) -> anyhow::Result { @@ -171,7 +173,7 @@ pub struct Client { } impl Client { - pub async fn connect(&self, mut url: Url) -> anyhow::Result { + pub async fn connect(&self, mut url: Url) -> anyhow::Result { let mut config = self.config.clone(); let host = url.host().context("invalid DNS name")?.to_string(); @@ -231,6 +233,6 @@ impl Client { _ => unreachable!(), }; - Ok(session.into()) + Ok(session) } } diff --git a/moq-relay/src/cluster.rs b/moq-relay/src/cluster.rs index ea62201bd..13b16b69b 100644 --- a/moq-relay/src/cluster.rs +++ b/moq-relay/src/cluster.rs @@ -101,7 +101,7 @@ impl Cluster { let root = Url::parse(&format!("https://{}", root)).context("invalid root URL")?; let root = self.client.connect(root).await.context("failed to connect to root")?; - let mut root = moq_transfork::Session::connect(root) + let mut root = moq_transfork::Session::connect(root.into()) .await .context("failed to establish root session")?; @@ -191,7 +191,7 @@ impl Cluster { // Connect to the remote node. let conn = self.client.connect(url).await.context("failed to connect to remote")?; - let mut session = moq_transfork::Session::connect(conn) + let mut session = moq_transfork::Session::connect(conn.into()) .await .context("failed to establish session")?; diff --git a/moq-relay/src/main.rs b/moq-relay/src/main.rs index b0905ff3f..9d7746fd3 100644 --- a/moq-relay/src/main.rs +++ b/moq-relay/src/main.rs @@ -73,7 +73,7 @@ async fn main() -> anyhow::Result<()> { let mut conn_id = 0; while let Some(conn) = server.accept().await { - let session = Connection::new(conn_id, conn, cluster.clone()); + let session = Connection::new(conn_id, conn.into(), cluster.clone()); conn_id += 1; tokio::spawn(async move { From c1b3c8d5b314511f16c09cc6ff62e3c9a7b9a7ad Mon Sep 17 00:00:00 2001 From: Luke Curley Date: Mon, 3 Feb 2025 15:46:42 -0800 Subject: [PATCH 2/2] Fix da errors. --- moq-relay/src/cluster.rs | 4 ++-- moq-transfork/src/session/mod.rs | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/moq-relay/src/cluster.rs b/moq-relay/src/cluster.rs index 13b16b69b..ea62201bd 100644 --- a/moq-relay/src/cluster.rs +++ b/moq-relay/src/cluster.rs @@ -101,7 +101,7 @@ impl Cluster { let root = Url::parse(&format!("https://{}", root)).context("invalid root URL")?; let root = self.client.connect(root).await.context("failed to connect to root")?; - let mut root = moq_transfork::Session::connect(root.into()) + let mut root = moq_transfork::Session::connect(root) .await .context("failed to establish root session")?; @@ -191,7 +191,7 @@ impl Cluster { // Connect to the remote node. let conn = self.client.connect(url).await.context("failed to connect to remote")?; - let mut session = moq_transfork::Session::connect(conn.into()) + let mut session = moq_transfork::Session::connect(conn) .await .context("failed to establish session")?; diff --git a/moq-transfork/src/session/mod.rs b/moq-transfork/src/session/mod.rs index e00dbb18e..2000f61af 100644 --- a/moq-transfork/src/session/mod.rs +++ b/moq-transfork/src/session/mod.rs @@ -53,7 +53,8 @@ impl Session { } /// Perform the MoQ handshake as a client. - pub async fn connect(mut session: web_transport::Session) -> Result { + pub async fn connect>(session: T) -> Result { + let mut session = session.into(); let mut stream = Stream::open(&mut session, message::ControlType::Session).await?; Self::connect_setup(&mut stream).await.or_close(&mut stream)?; Ok(Self::new(session, stream)) @@ -74,7 +75,8 @@ impl Session { } /// Perform the MoQ handshake as a server - pub async fn accept(mut session: web_transport::Session) -> Result { + pub async fn accept>(session: T) -> Result { + let mut session = session.into(); let mut stream = Stream::accept(&mut session).await?; let kind = stream.reader.decode().await?;