Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor UdpMuxNewAddr to be lock-less #1

Merged
merged 24 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
9b298e2
Dirty PoC for lock-less UdpMuxNewAddr
thomaseizinger Aug 2, 2022
f5483e7
Remove locks from handle
thomaseizinger Aug 2, 2022
a137a07
Merge branch 'anton/webrtc-transport' into rewrite-udp-mux
thomaseizinger Aug 3, 2022
8f522bb
Replace `flume` with `futures::mpsc` channels
thomaseizinger Aug 3, 2022
9d9c9a4
Introduce `req_res_chan` module
thomaseizinger Aug 3, 2022
54444b7
Move locking into `req_res_chan::Sender`
thomaseizinger Aug 3, 2022
ce44836
Fill in more implementations
thomaseizinger Aug 3, 2022
c111c46
Add TODO
thomaseizinger Aug 3, 2022
ac14b9f
Implement send command handling, buffering items if we can't send them
thomaseizinger Aug 18, 2022
3c214cb
Remove tokio::spawn for handling waiting for closed sockets
thomaseizinger Aug 18, 2022
eb35b18
Remove futures::block_on in favor of local task set
thomaseizinger Aug 18, 2022
744ad62
Fix clippy warnings
thomaseizinger Aug 18, 2022
eb976f2
Don't loop inside a loop
thomaseizinger Aug 18, 2022
8a77b6e
Use match instead of `ready!`
thomaseizinger Aug 18, 2022
6d49c5a
Fix webrtc version to rev prior to monorepo merge
thomaseizinger Aug 29, 2022
521da48
Never have more than one write future
thomaseizinger Aug 29, 2022
676d50d
Resolve TODO's for error handling
thomaseizinger Aug 29, 2022
9a32e4f
Make `recv_buf` as short-lived as possible
thomaseizinger Aug 29, 2022
64f8021
Replace AtomicBool with regular bool
thomaseizinger Aug 29, 2022
1d8094b
Remove unused import
thomaseizinger Aug 29, 2022
cc66761
Merge remote-tracking branch 'melekes/anton/webrtc-transport' into re…
thomaseizinger Aug 29, 2022
6bce388
Merge remote-tracking branch 'melekes/anton/webrtc-transport' into re…
thomaseizinger Aug 29, 2022
16fb8e4
Simplify send_buffer handling
thomaseizinger Aug 30, 2022
211baae
Cargo fmt
thomaseizinger Aug 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions transports/webrtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ pub mod transport;

mod fingerprint;
mod in_addr;
mod req_res_chan;
mod sdp;
mod udp_mux;
mod webrtc_connection;
49 changes: 49 additions & 0 deletions transports/webrtc/src/req_res_chan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use futures::channel::mpsc;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing license header. Feel free to ignore.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I? I'd love to get rid of those 😅

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, I meant "Feel free to ignore [for now]".

I think we should be consistent in this pull request.

I am fine getting rid of them in general, though I would need to check back with some legal experts first.

use futures::channel::oneshot;
use futures::SinkExt;
use futures_lite::StreamExt;
use std::io;
use std::task::{Context, Poll};

pub fn new<Req, Res>(capacity: usize) -> (Sender<Req, Res>, Receiver<Req, Res>) {
let (sender, receiver) = mpsc::channel(capacity);

(
Sender {
inner: futures::lock::Mutex::new(sender),
},
Receiver { inner: receiver },
)
}

pub struct Sender<Req, Res> {
inner: futures::lock::Mutex<mpsc::Sender<(Req, oneshot::Sender<Res>)>>,
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
}

impl<Req, Res> Sender<Req, Res> {
pub async fn send(&self, req: Req) -> io::Result<Res> {
let (sender, receiver) = oneshot::channel();

self.inner
.lock()
.await
.send((req, sender))
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let res = receiver
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;

Ok(res)
}
}

pub struct Receiver<Req, Res> {
inner: mpsc::Receiver<(Req, oneshot::Sender<Res>)>,
}

impl<Req, Res> Receiver<Req, Res> {
pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<(Req, oneshot::Sender<Res>)>> {
self.inner.poll_next(cx)
}
}
10 changes: 5 additions & 5 deletions transports/webrtc/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl Transport for WebRTCTransport {
.iter()
.next()
.ok_or(TransportError::Other(Error::NoListeners))?;
let udp_mux = first_listener.udp_mux.clone();
let udp_mux = first_listener.udp_mux.udp_mux_handle();

// [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus
// do the `set_remote_description` call within the [`Future`].
Expand Down Expand Up @@ -259,7 +259,7 @@ pub struct WebRTCListenStream {
config: WebRTCConfiguration,

/// The UDP muxer that manages all ICE connections.
udp_mux: Arc<UDPMuxNewAddr>,
udp_mux: UDPMuxNewAddr,

/// `Keypair` identifying this peer
id_keys: identity::Keypair,
Expand All @@ -277,7 +277,7 @@ impl WebRTCListenStream {
listener_id: ListenerId,
listen_addr: SocketAddr,
config: WebRTCConfiguration,
udp_mux: Arc<UDPMuxNewAddr>,
udp_mux: UDPMuxNewAddr,
id_keys: identity::Keypair,
) -> Self {
let in_addr = InAddr::new(listen_addr.ip());
Expand Down Expand Up @@ -382,13 +382,13 @@ impl Stream for WebRTCListenStream {
}

// Poll UDP muxer for new addresses or incoming data for streams.
match ready!(self.udp_mux.as_ref().poll(cx)) {
match ready!(self.udp_mux.poll(cx)) {
UDPMuxEvent::NewAddr(new_addr) => {
let local_addr = socketaddr_to_multiaddr(&self.listen_addr);
let send_back_addr = socketaddr_to_multiaddr(&new_addr.addr);
let event = TransportEvent::Incoming {
upgrade: Box::pin(upgrade(
self.udp_mux.clone(),
self.udp_mux.udp_mux_handle(),
self.config.clone(),
new_addr.addr,
new_addr.ufrag,
Expand Down