Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improving async and sync API #54

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ impl<'a> IntoAddress for &'a str {

impl IntoAddress for String {
fn into_address(&self) -> Result<Ipv4Addr> {
(&**self).into_address()
self.as_str().into_address()
}
}

impl<'a> IntoAddress for &'a String {
fn into_address(&self) -> Result<Ipv4Addr> {
(&**self).into_address()
self.as_str().into_address()
}
}

Expand All @@ -79,13 +79,13 @@ impl IntoAddress for Ipv4Addr {

impl<'a> IntoAddress for &'a Ipv4Addr {
fn into_address(&self) -> Result<Ipv4Addr> {
(&**self).into_address()
(*self).into_address()
}
}

impl IntoAddress for IpAddr {
fn into_address(&self) -> Result<Ipv4Addr> {
match *self {
match self {
IpAddr::V4(ref value) => Ok(*value),

IpAddr::V6(_) => Err(Error::InvalidAddress),
Expand All @@ -95,7 +95,7 @@ impl IntoAddress for IpAddr {

impl<'a> IntoAddress for &'a IpAddr {
fn into_address(&self) -> Result<Ipv4Addr> {
(&**self).into_address()
(*self).into_address()
}
}

Expand All @@ -107,13 +107,13 @@ impl IntoAddress for SocketAddrV4 {

impl<'a> IntoAddress for &'a SocketAddrV4 {
fn into_address(&self) -> Result<Ipv4Addr> {
(&**self).into_address()
(*self).into_address()
}
}

impl IntoAddress for SocketAddr {
fn into_address(&self) -> Result<Ipv4Addr> {
match *self {
match self {
SocketAddr::V4(ref value) => Ok(*value.ip()),

SocketAddr::V6(_) => Err(Error::InvalidAddress),
Expand All @@ -123,6 +123,6 @@ impl IntoAddress for SocketAddr {

impl<'a> IntoAddress for &'a SocketAddr {
fn into_address(&self) -> Result<Ipv4Addr> {
(&**self).into_address()
(*self).into_address()
}
}
63 changes: 53 additions & 10 deletions src/async/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,27 @@ impl AsyncDevice {
let codec = TunPacketCodec::new(pi, self.inner.get_ref().mtu().unwrap_or(1504));
Framed::new(self, codec)
}

/// Transforms this device into async queues
pub fn queues(self) -> io::Result<Vec<AsyncQueue>> {
self.inner
.into_inner()
.queues()
.into_iter()
.map(AsyncQueue::new)
.collect()
}
}

impl AsyncRead for AsyncDevice {
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
let this = self.get_mut();
loop {
let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?;
let mut guard = ready!(this.inner.poll_read_ready_mut(cx))?;
let rbuf = buf.initialize_unfilled();
match guard.try_io(|inner| inner.get_mut().read(rbuf)) {
Ok(res) => return Poll::Ready(res.map(|n| buf.advance(n))),
Expand All @@ -76,12 +87,13 @@ impl AsyncRead for AsyncDevice {

impl AsyncWrite for AsyncDevice {
fn poll_write(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
loop {
let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;
let mut guard = ready!(this.inner.poll_write_ready_mut(cx))?;
match guard.try_io(|inner| inner.get_mut().write(buf)) {
Ok(res) => return Poll::Ready(res),
Err(_wb) => continue,
Expand All @@ -104,12 +116,13 @@ impl AsyncWrite for AsyncDevice {
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
let this = self.get_mut();
loop {
let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;
let mut guard = ready!(this.inner.poll_write_ready_mut(cx))?;
match guard.try_io(|inner| inner.get_mut().write_vectored(bufs)) {
Ok(res) => return Poll::Ready(res),
Err(_wb) => continue,
Expand Down Expand Up @@ -145,6 +158,34 @@ impl AsyncQueue {
self.inner.get_mut()
}

/// Sends the given buffer through this queue
///
/// It can be accessed concurrently
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
loop {
let mut guard = self.inner.writable().await?;

match guard.try_io(|inner| unsafe { inner.get_ref().tun.send(buf) }) {
Ok(res) => return res,
Err(_) => continue,
}
}
}

/// Receives data from this queue to the given buffer
///
/// It can be accessed concurrently
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
loop {
let mut guard = self.inner.readable().await?;

match guard.try_io(|inner| unsafe { inner.get_ref().tun.recv(buf) }) {
Ok(res) => return res,
Err(_) => continue,
}
}
}

/// Consumes this AsyncQueue and return a Framed object (unified Stream and Sink interface)
pub fn into_framed(mut self) -> Framed<Self, TunPacketCodec> {
let pi = self.get_mut().has_packet_information();
Expand All @@ -155,12 +196,13 @@ impl AsyncQueue {

impl AsyncRead for AsyncQueue {
fn poll_read(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf,
) -> Poll<io::Result<()>> {
let this = self.get_mut();
loop {
let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?;
let mut guard = ready!(this.inner.poll_read_ready_mut(cx))?;
let rbuf = buf.initialize_unfilled();
match guard.try_io(|inner| inner.get_mut().read(rbuf)) {
Ok(res) => return Poll::Ready(res.map(|n| buf.advance(n))),
Expand All @@ -172,12 +214,13 @@ impl AsyncRead for AsyncQueue {

impl AsyncWrite for AsyncQueue {
fn poll_write(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = self.get_mut();
loop {
let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;
let mut guard = ready!(this.inner.poll_write_ready_mut(cx))?;
match guard.try_io(|inner| inner.get_mut().write(buf)) {
Ok(res) => return Poll::Ready(res),
Err(_wb) => continue,
Expand Down
9 changes: 2 additions & 7 deletions src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,13 @@ use crate::address::IntoAddress;
use crate::platform;

/// TUN interface OSI layer of operation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)]
pub enum Layer {
L2,
#[default]
L3,
}

impl Default for Layer {
fn default() -> Self {
Layer::L3
}
}

/// Configuration builder for a TUN interface.
#[derive(Clone, Default, Debug)]
pub struct Configuration {
Expand Down
10 changes: 8 additions & 2 deletions src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ pub trait Device: Read + Write {
/// Set the MTU.
fn set_mtu(&mut self, value: i32) -> Result<()>;

/// Get a device queue.
fn queue(&mut self, index: usize) -> Option<&mut Self::Queue>;
/// Get a mutable reference to the device queue.
fn queue_mut(&mut self, index: usize) -> Option<&mut Self::Queue>;

/// Get a reference to the device queue.
fn queue(&self, index: usize) -> Option<&Self::Queue>;

/// Transforms this device into queues.
fn queues(self) -> Vec<Self::Queue>;
}
18 changes: 15 additions & 3 deletions src/platform/android/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Device {
let tun = Fd::new(fd).map_err(|_| io::Error::last_os_error())?;

Device {
queue: Queue { tun: tun },
queue: Queue { tun },
}
};
Ok(device)
Expand Down Expand Up @@ -141,13 +141,25 @@ impl D for Device {
Ok(())
}

fn queue(&mut self, index: usize) -> Option<&mut Self::Queue> {
fn queue_mut(&mut self, index: usize) -> Option<&mut Self::Queue> {
if index > 0 {
return None;
}

Some(&mut self.queue)
}

fn queue(&self, index: usize) -> Option<&Self::Queue> {
if index > 0 {
return None;
}

Some(&self.queue)
}

fn queues(self) -> Vec<Self::Queue> {
vec![self.queue]
}
}

impl AsRawFd for Device {
Expand All @@ -163,7 +175,7 @@ impl IntoRawFd for Device {
}

pub struct Queue {
tun: Fd,
pub tun: Fd,
}

impl Queue {
Expand Down
18 changes: 15 additions & 3 deletions src/platform/ios/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Device {
let tun = Fd::new(fd).map_err(|_| io::Error::last_os_error())?;

Device {
queue: Queue { tun: tun },
queue: Queue { tun },
}
};
Ok(device)
Expand Down Expand Up @@ -141,13 +141,25 @@ impl D for Device {
Ok(())
}

fn queue(&mut self, index: usize) -> Option<&mut Self::Queue> {
fn queue_mut(&mut self, index: usize) -> Option<&mut Self::Queue> {
if index > 0 {
return None;
}

Some(&mut self.queue)
}

fn queue(&self, index: usize) -> Option<&Self::Queue> {
if index > 0 {
return None;
}

Some(&self.queue)
}

fn queues(self) -> Vec<Self::Queue> {
vec![self.queue]
}
}

impl AsRawFd for Device {
Expand All @@ -163,7 +175,7 @@ impl IntoRawFd for Device {
}

pub struct Queue {
tun: Fd,
pub tun: Fd,
}

impl Queue {
Expand Down
12 changes: 10 additions & 2 deletions src/platform/linux/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,17 @@ impl D for Device {
}
}

fn queue(&mut self, index: usize) -> Option<&mut Self::Queue> {
fn queue_mut(&mut self, index: usize) -> Option<&mut Self::Queue> {
self.queues.get_mut(index)
}

fn queue(&self, index: usize) -> Option<&Self::Queue> {
self.queues.get(index)
}

fn queues(self) -> Vec<Self::Queue> {
self.queues
}
}

impl AsRawFd for Device {
Expand All @@ -392,7 +400,7 @@ impl IntoRawFd for Device {
}

pub struct Queue {
tun: Fd,
pub tun: Fd,
pi_enabled: bool,
}

Expand Down
Loading