Skip to content

Commit

Permalink
Update this crate to use the new polling breaking changes (#142)
Browse files Browse the repository at this point in the history
Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Sep 10, 2023
1 parent 63f3e14 commit 1b1466a
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 152 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Expand Up @@ -29,16 +29,13 @@ concurrent-queue = "2.2.0"
futures-io = { version = "0.3.28", default-features = false, features = ["std"] }
futures-lite = { version = "1.11.0", default-features = false }
parking = "2.0.0"
polling = "2.6.0"
polling = "3.0.0"
rustix = { version = "0.38.2", default-features = false, features = ["std", "fs"] }
slab = "0.4.2"
socket2 = { version = "0.5.3", features = ["all"] }
tracing = { version = "0.1.37", default-features = false }
waker-fn = "1.1.0"

[build-dependencies]
autocfg = "1"

[dev-dependencies]
async-channel = "1"
async-net = "1"
Expand All @@ -54,3 +51,6 @@ timerfd = "1"

[target.'cfg(windows)'.dev-dependencies]
uds_windows = "1"

[patch.crates-io]
async-io = { path = "." }
16 changes: 0 additions & 16 deletions build.rs

This file was deleted.

14 changes: 9 additions & 5 deletions examples/linux-inotify.rs
Expand Up @@ -37,17 +37,21 @@ fn main() -> std::io::Result<()> {
future::block_on(async {
// Watch events in the current directory.
let mut inotify = Async::new(Inotify::init()?)?;
inotify
.get_mut()
.watches()
.add(".", WatchMask::ALL_EVENTS)?;

// SAFETY: We do not move the inner file descriptor out.
unsafe {
inotify
.get_mut()
.watches()
.add(".", WatchMask::ALL_EVENTS)?;
}
println!("Watching for filesystem events in the current directory...");
println!("Try opening a file to trigger some events.");
println!();

// Wait for events in a loop and print them on the screen.
loop {
for event in inotify.read_with_mut(read_op).await? {
for event in unsafe { inotify.read_with_mut(read_op).await? } {
println!("{:?}", event);
}
}
Expand Down
80 changes: 74 additions & 6 deletions examples/windows-uds.rs
Expand Up @@ -8,22 +8,88 @@

#[cfg(windows)]
fn main() -> std::io::Result<()> {
use std::ops::Deref;
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket};
use std::path::PathBuf;

use async_io::Async;
use blocking::Unblock;
use futures_lite::{future, io, prelude::*};
use futures_lite::{future, prelude::*};
use std::io;
use tempfile::tempdir;
use uds_windows::{UnixListener, UnixStream};

// n.b.: notgull: uds_windows does not support I/O safety uet, hence the wrapper types

struct UnixListener(uds_windows::UnixListener);

impl From<uds_windows::UnixListener> for UnixListener {
fn from(ul: uds_windows::UnixListener) -> Self {
Self(ul)
}
}

impl Deref for UnixListener {
type Target = uds_windows::UnixListener;

fn deref(&self) -> &uds_windows::UnixListener {
&self.0
}
}

impl AsSocket for UnixListener {
fn as_socket(&self) -> BorrowedSocket<'_> {
unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
}
}

struct UnixStream(uds_windows::UnixStream);

impl From<uds_windows::UnixStream> for UnixStream {
fn from(ul: uds_windows::UnixStream) -> Self {
Self(ul)
}
}

impl Deref for UnixStream {
type Target = uds_windows::UnixStream;

fn deref(&self) -> &uds_windows::UnixStream {
&self.0
}
}

impl AsSocket for UnixStream {
fn as_socket(&self) -> BorrowedSocket<'_> {
unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
}
}

impl io::Read for UnixStream {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
io::Read::read(&mut self.0, buf)
}
}

impl io::Write for UnixStream {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
io::Write::write(&mut self.0, buf)
}

fn flush(&mut self) -> io::Result<()> {
io::Write::flush(&mut self.0)
}
}

unsafe impl async_io::IoSafe for UnixStream {}

async fn client(addr: PathBuf) -> io::Result<()> {
// Connect to the address.
let stream = Async::new(UnixStream::connect(addr)?)?;
let stream = Async::new(UnixStream::from(uds_windows::UnixStream::connect(addr)?))?;
println!("Connected to {:?}", stream.get_ref().peer_addr()?);

// Pipe the stream to stdout.
let mut stdout = Unblock::new(std::io::stdout());
io::copy(&stream, &mut stdout).await?;
futures_lite::io::copy(stream, &mut stdout).await?;
Ok(())
}

Expand All @@ -32,7 +98,7 @@ fn main() -> std::io::Result<()> {

future::block_on(async {
// Create a listener.
let listener = Async::new(UnixListener::bind(&path)?)?;
let listener = Async::new(UnixListener::from(uds_windows::UnixListener::bind(&path)?))?;
println!("Listening on {:?}", listener.get_ref().local_addr()?);

future::try_zip(
Expand All @@ -42,7 +108,9 @@ fn main() -> std::io::Result<()> {
println!("Accepted a client");

// Send a message, drop the stream, and wait for the client.
Async::new(stream)?.write_all(b"Hello!\n").await?;
Async::new(UnixStream::from(stream))?
.write_all(b"Hello!\n")
.await?;
Ok(())
},
client(path),
Expand Down

0 comments on commit 1b1466a

Please sign in to comment.