Skip to content

Commit

Permalink
Upgrade quinn to 0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmusgo committed Mar 31, 2023
1 parent 65168fc commit 29310b8
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 29 deletions.
2 changes: 1 addition & 1 deletion hotham-asset-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ version = "0.1.0"
[dependencies]
anyhow = "1.0"
futures-util = {version = "0.3.11", default-features = false}
quinn = {version = "0.8.5", features = ["tls-rustls"]}
quinn = {version = "0.9", features = ["tls-rustls"]}
rustls = {version = "0.20.3", features = ["dangerous_configuration", "quic"]}
tokio = {version = "1.0.1", default-features = false, features = ["sync"]}

Expand Down
17 changes: 5 additions & 12 deletions hotham-asset-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::message::Message;
use crate::AssetUpdatedMessage;
use anyhow::{anyhow, bail, Context, Result};
use futures_util::{StreamExt, TryStreamExt};
use futures_util::TryStreamExt;
use quinn::{ClientConfig, Endpoint};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
Expand All @@ -23,11 +23,7 @@ async fn run_client(
endpoint.set_default_client_config(client_cfg);

// connect to server
let quinn::NewConnection {
connection,
bi_streams,
..
} = endpoint
let connection = endpoint
.connect(server_addr, "hotham_asset_server")?
.await?;
println!("[CLIENT] connected: addr={}", connection.remote_address());
Expand All @@ -42,7 +38,7 @@ async fn run_client(
.try_collect::<Vec<_>>()
.await?;

wait_for_updates(bi_streams, connection.clone(), sender)
wait_for_updates(connection.clone(), sender)
.await
.context("Watching file")?;

Expand All @@ -55,12 +51,11 @@ async fn run_client(
}

async fn wait_for_updates(
mut bi_streams: quinn::IncomingBiStreams,
connection: quinn::Connection,
sender: Sender<AssetUpdatedMessage>,
) -> Result<()> {
while let Some(stream) = bi_streams.next().await {
let stream = match stream {
loop {
let stream = match connection.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
println!("[CLIENT] Connection closed");
return Ok(());
Expand All @@ -72,8 +67,6 @@ async fn wait_for_updates(
};
tokio::spawn(handle_incoming(stream, connection.clone(), sender.clone()));
}

Ok(())
}

async fn ask_for_watch(connection: quinn::Connection, asset_name: String) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion hotham-asset-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ anyhow = "1.0"
futures-util = {version = "0.3.11", default-features = false, features = ["io"]}
hotham-asset-client = {path = "../hotham-asset-client"}
notify-debouncer-mini = {version = "*", default-features = false}
quinn = {version = "0.8.5", features = ["tls-rustls"]}
quinn = {version = "0.9", features = ["tls-rustls"]}
rcgen = "0.10.0"
rustls = {version = "0.20.3", features = ["dangerous_configuration", "quic"]}
tokio = {version = "1.21.2", features = ["full"]}
8 changes: 3 additions & 5 deletions hotham-asset-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::{collections::HashMap, sync::Arc, time::SystemTime};
use tokio::sync::Mutex;

use anyhow::Result;
use futures_util::StreamExt;
/// A simple server that serves assets to localhost or remote targets. It's great and has no flaws.
// TODO:
// 1. Accept connections
Expand All @@ -19,16 +18,15 @@ pub type WatchList = Arc<Mutex<HashMap<String, anyhow::Result<SystemTime>>>>;
#[tokio::main]
async fn main() -> Result<()> {
let addr = "0.0.0.0:5000".parse().unwrap();
let (mut incoming, _server_cert) = make_server_endpoint(addr).unwrap();
let (endpoint, _server_cert) = make_server_endpoint(addr).unwrap();

loop {
let incoming_conn = incoming.next().await.unwrap();
let incoming_conn = endpoint.accept().await.unwrap();
let new_conn = incoming_conn.await.unwrap();
let watch_list = WatchList::default();

let watcher_watch_list = watch_list.clone();
let watcher_connection = new_conn.connection.clone();
tokio::spawn(watch_files(watcher_connection, watcher_watch_list));
tokio::spawn(watch_files(new_conn.clone(), watcher_watch_list));
tokio::spawn(handle_connection(new_conn, watch_list.clone()));
}
}
18 changes: 8 additions & 10 deletions hotham-asset-server/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{bail, Result};
use futures_util::{StreamExt, TryFutureExt};
use futures_util::TryFutureExt;
use hotham_asset_client::message::Message;
/// A simple server that serves assets to localhost or remote targets. It's great and has no flaws.
// TODO:
Expand All @@ -8,7 +8,7 @@ use hotham_asset_client::message::Message;
// 3. Watch for file updates
// 4. Send a "file updated" message back to the client on update
// 5. GOTO 2
use quinn::{Endpoint, Incoming, ServerConfig};
use quinn::{Endpoint, ServerConfig};
use std::{
error::Error,
net::SocketAddr,
Expand All @@ -18,11 +18,10 @@ use std::{

use crate::WatchList;

pub async fn handle_connection(conn: quinn::NewConnection, watch_list: WatchList) -> Result<()> {
pub async fn handle_connection(conn: quinn::Connection, watch_list: WatchList) -> Result<()> {
println!("[SERVER] Connection established!");
let mut bi_streams = conn.bi_streams;
while let Some(stream) = bi_streams.next().await {
let (send, recv) = match stream {
loop {
let (send, recv) = match conn.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
println!("[SERVER] Connection closed");
return Ok(());
Expand All @@ -37,7 +36,6 @@ pub async fn handle_connection(conn: quinn::NewConnection, watch_list: WatchList
.map_err(|e| eprintln!("[SERVER] Error in incoming: {e:?}")),
);
}
Ok(())
}

pub async fn watch_files(connection: quinn::Connection, watch_list: WatchList) {
Expand Down Expand Up @@ -152,10 +150,10 @@ async fn get_last_updated(path: &str) -> anyhow::Result<SystemTime> {
Ok(tokio::fs::metadata(path).await?.modified()?)
}

pub fn make_server_endpoint(bind_addr: SocketAddr) -> Result<(Incoming, Vec<u8>), Box<dyn Error>> {
pub fn make_server_endpoint(bind_addr: SocketAddr) -> Result<(Endpoint, Vec<u8>), Box<dyn Error>> {
let (server_config, server_cert) = configure_server()?;
let (_endpoint, incoming) = Endpoint::server(server_config, bind_addr)?;
Ok((incoming, server_cert))
let endpoint = Endpoint::server(server_config, bind_addr)?;
Ok((endpoint, server_cert))
}

/// Returns default server configuration along with its certificate.
Expand Down

0 comments on commit 29310b8

Please sign in to comment.