Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions moq-native/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use futures::future::BoxFuture;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::FutureExt;

use web_transport::quinn as web_transport_quinn;

#[derive(Parser, Clone)]
pub struct Args {
/// Listen for UDP packets on the given address.
Expand Down Expand Up @@ -96,11 +98,11 @@ impl Endpoint {

pub struct Server {
quic: quinn::Endpoint,
accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<web_transport::Session>>>,
accept: FuturesUnordered<BoxFuture<'static, anyhow::Result<web_transport_quinn::Session>>>,
}

impl Server {
pub async fn accept(&mut self) -> Option<web_transport::Session> {
pub async fn accept(&mut self) -> Option<web_transport_quinn::Session> {
loop {
tokio::select! {
res = self.quic.accept() => {
Expand All @@ -116,7 +118,7 @@ impl Server {
}
}

async fn accept_session(conn: quinn::Incoming) -> anyhow::Result<web_transport::Session> {
async fn accept_session(conn: quinn::Incoming) -> anyhow::Result<web_transport_quinn::Session> {
let mut conn = conn.accept()?;

let handshake = conn
Expand Down Expand Up @@ -155,7 +157,7 @@ impl Server {
_ => anyhow::bail!("unsupported ALPN: {}", alpn),
};

Ok(session.into())
Ok(session)
}

pub fn local_addr(&self) -> anyhow::Result<net::SocketAddr> {
Expand All @@ -171,7 +173,7 @@ pub struct Client {
}

impl Client {
pub async fn connect(&self, mut url: Url) -> anyhow::Result<web_transport::Session> {
pub async fn connect(&self, mut url: Url) -> anyhow::Result<web_transport_quinn::Session> {
let mut config = self.config.clone();

let host = url.host().context("invalid DNS name")?.to_string();
Expand Down Expand Up @@ -231,6 +233,6 @@ impl Client {
_ => unreachable!(),
};

Ok(session.into())
Ok(session)
}
}
2 changes: 1 addition & 1 deletion moq-relay/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() -> anyhow::Result<()> {
let mut conn_id = 0;

while let Some(conn) = server.accept().await {
let session = Connection::new(conn_id, conn, cluster.clone());
let session = Connection::new(conn_id, conn.into(), cluster.clone());
conn_id += 1;

tokio::spawn(async move {
Expand Down
6 changes: 4 additions & 2 deletions moq-transfork/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ impl Session {
}

/// Perform the MoQ handshake as a client.
pub async fn connect(mut session: web_transport::Session) -> Result<Self, Error> {
pub async fn connect<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
let mut session = session.into();
let mut stream = Stream::open(&mut session, message::ControlType::Session).await?;
Self::connect_setup(&mut stream).await.or_close(&mut stream)?;
Ok(Self::new(session, stream))
Expand All @@ -74,7 +75,8 @@ impl Session {
}

/// Perform the MoQ handshake as a server
pub async fn accept(mut session: web_transport::Session) -> Result<Self, Error> {
pub async fn accept<T: Into<web_transport::Session>>(session: T) -> Result<Self, Error> {
let mut session = session.into();
let mut stream = Stream::accept(&mut session).await?;
let kind = stream.reader.decode().await?;

Expand Down