Skip to content

Commit

Permalink
Cleaning up warnings, adding build instructions
Browse files Browse the repository at this point in the history
  • Loading branch information
murphysean committed Jun 29, 2019
1 parent a309441 commit 2cc057d
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 94 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ Example Output
<
* Closing connection 0

Building
---

Requires linux and nightly so I'm going to show how to run it via docker

docker run -p 7878:7878 --rm -it -v $PWD:/usr/src/myapp -w /usr/src/myapp rust:latest /bin/bash
rustup override set nightly
cargo run

Improvements
---

Expand Down
186 changes: 92 additions & 94 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ use core::{
use std::{
collections::HashMap,
convert::TryInto,
ffi::CStr,
io,
io::prelude::*,
io::Error,
net::{Shutdown, SocketAddr, TcpListener, TcpStream},
os::unix::io::{AsRawFd, IntoRawFd, RawFd},
net::{Shutdown, TcpListener, TcpStream},
os::unix::io::{AsRawFd, RawFd},
rc::Rc,
sync::Mutex,
};

use futures::{
executor::{LocalPool, LocalSpawner},
future::{empty, ready, BoxFuture, FutureObj, LocalFutureObj},
stream::{self, Stream, StreamExt, TryStream, TryStreamExt},
task::{LocalSpawn, LocalSpawnExt, Spawn, SpawnExt, Waker},
io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt},
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
stream::{TryStream, TryStreamExt},
task::{LocalSpawnExt, Waker},
};

/// Look how easy it is to be async :)
Expand Down Expand Up @@ -207,8 +207,8 @@ impl AsyncTcpListener {
}

impl Drop for AsyncTcpListener {
fn drop(&mut self){
self.epoll.ctl_del_rawfd(self.listener.as_raw_fd());
fn drop(&mut self) {
self.epoll.ctl_del_rawfd(self.listener.as_raw_fd()).unwrap();
}
}

Expand All @@ -221,7 +221,10 @@ struct Incoming<'a> {
impl<'a> TryStream for Incoming<'a> {
type Ok = AsyncTcpStream;
type Error = io::Error;
fn try_poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Self::Ok,Self::Error>>> {
fn try_poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
let a = self.listener.listener.accept();
match a {
Ok((stream, _addr)) => Poll::Ready(Some(Ok(AsyncTcpStream::new(
Expand All @@ -231,18 +234,24 @@ impl<'a> TryStream for Incoming<'a> {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
let events = libc::EPOLLET | libc::EPOLLIN | libc::EPOLLONESHOT;
//If I've already added it to epoll, then I just need to modify now
if self.listener.eph{
self.listener.epoll.ctl_mod_rawfd(
self.listener.listener.as_raw_fd(),
cx.waker().clone(),
events as u32,
);
}else{
self.listener.epoll.ctl_add_rawfd(
self.listener.listener.as_raw_fd(),
cx.waker().clone(),
events as u32,
);
if self.listener.eph {
self.listener
.epoll
.ctl_mod_rawfd(
self.listener.listener.as_raw_fd(),
cx.waker().clone(),
events as u32,
)
.unwrap();
} else {
self.listener
.epoll
.ctl_add_rawfd(
self.listener.listener.as_raw_fd(),
cx.waker().clone(),
events as u32,
)
.unwrap();
self.listener.eph = true;
}
Poll::Pending
Expand All @@ -259,7 +268,7 @@ impl<'a> TryStream for Incoming<'a> {
/// Consumes and wraps a tcp stream. It will set it in a nonblocking mode,
/// and provides async versions of read, write, flush, and close from the
/// futures-rs library. At the moment it also takes a reference to an epoll
/// instance that is used under the hood to provide events and wake up the
/// instance that is used under the hood to provide events and wake up the
/// poll methods. Since my impl is all on a single thread, I might be able to
/// use thread local storage to register wakers with an epoll instance.
struct AsyncTcpStream {
Expand All @@ -280,8 +289,8 @@ impl AsyncTcpStream {
}

impl Drop for AsyncTcpStream {
fn drop(&mut self){
self.epoll.ctl_del_rawfd(self.stream.as_raw_fd());
fn drop(&mut self) {
self.epoll.ctl_del_rawfd(self.stream.as_raw_fd()).unwrap();
}
}

Expand All @@ -296,25 +305,19 @@ impl AsyncRead for AsyncTcpStream {
Ok(i) => Poll::Ready(Ok(i)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
let events = libc::EPOLLET | libc::EPOLLIN | libc::EPOLLONESHOT;
if self.eph{
self.epoll.ctl_mod_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32,
);
}else{
self.epoll.ctl_add_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32,
);
if self.eph {
self.epoll
.ctl_mod_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
} else {
self.epoll
.ctl_add_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
self.eph = true
}
Poll::Pending
},
Err(e) => {
Poll::Ready(Err(e))
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
Expand All @@ -330,23 +333,19 @@ impl AsyncWrite for AsyncTcpStream {
Ok(i) => Poll::Ready(Ok(i)),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
let events = libc::EPOLLET | libc::EPOLLOUT | libc::EPOLLONESHOT;
if self.eph{
self.epoll.ctl_mod_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32);
}else{
self.epoll.ctl_add_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32);
if self.eph {
self.epoll
.ctl_mod_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
} else {
self.epoll
.ctl_add_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
self.eph = true;
}
Poll::Pending
},
Err(e) =>{
Poll::Ready(Err(e))
}
Err(e) => Poll::Ready(Err(e)),
}
}

Expand All @@ -356,23 +355,19 @@ impl AsyncWrite for AsyncTcpStream {
Ok(()) => Poll::Ready(Ok(())),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
let events = libc::EPOLLET | libc::EPOLLOUT | libc::EPOLLONESHOT;
if self.eph{
self.epoll.ctl_mod_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32);
}else{
self.epoll.ctl_add_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32);
if self.eph {
self.epoll
.ctl_mod_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
} else {
self.epoll
.ctl_add_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
self.eph = true;
}
Poll::Pending
},
Err(e) =>{
Poll::Ready(Err(e))
}
Err(e) => Poll::Ready(Err(e)),
}
}

Expand All @@ -382,23 +377,19 @@ impl AsyncWrite for AsyncTcpStream {
Ok(()) => Poll::Ready(Ok(())),
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
let events = libc::EPOLLET | libc::EPOLLOUT | libc::EPOLLONESHOT;
if self.eph{
self.epoll.ctl_mod_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32);
}else{
self.epoll.ctl_add_rawfd(
self.stream.as_raw_fd(),
cx.waker().clone(),
events as u32);
if self.eph {
self.epoll
.ctl_mod_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
} else {
self.epoll
.ctl_add_rawfd(self.stream.as_raw_fd(), cx.waker().clone(), events as u32)
.unwrap();
self.eph = true;
}
Poll::Pending
},
Err(e) =>{
Poll::Ready(Err(e))
}
Err(e) => Poll::Ready(Err(e)),
}
}
}
Expand All @@ -408,19 +399,19 @@ impl AsyncWrite for AsyncTcpStream {
/// local pool until it is stalled.
struct Server {
listener: Option<AsyncTcpListener>,
localPool: LocalPool,
local_pool: LocalPool,
epoll: Rc<Epoll>,
}

impl Server {
/// Creating a new server will consume a tcp listener, turn it async and wrap it with a simple server
/// that will handle everything asynchronously
pub fn new(listener: TcpListener) -> Server {
let localPool = LocalPool::new();
let local_pool = LocalPool::new();
let epoll = Rc::new(Epoll::new());
Server {
listener: Some(AsyncTcpListener::new(listener, epoll.clone())),
localPool: localPool,
local_pool: local_pool,
epoll: epoll.clone(),
}
}
Expand All @@ -430,18 +421,23 @@ impl Server {
/// the current thread.
pub fn run(self: &mut Self) {
//How do I get my stream to run on this executer, spawning for each new iteration?
self.localPool.spawner()
.spawn_local(accept_async(self.listener.take().unwrap(), self.localPool.spawner()));
self.local_pool
.spawner()
.spawn_local(accept_async(
self.listener.take().unwrap(),
self.local_pool.spawner(),
))
.unwrap();
loop {
//Run the localPool as far as you can without blocking...
self.localPool.run_until_stalled();
self.local_pool.run_until_stalled();
//I'm now noticing that anything that was spawned didn't get grabbed ^ that round
//Let's run through it again to get all the newly spawned futures to call poll
self.localPool.run_until_stalled();
self.local_pool.run_until_stalled();
//Ok now everything is just waiting on io...
//Let epoll take over, epoll will block on all the fd it's gathered
//when finished it will notify all the relevant wakers that had events
self.epoll.wait(-1);
self.epoll.wait(-1).unwrap();
//After epoll is done waiting, rerun the loop
println!("loop: Another round");
}
Expand All @@ -451,17 +447,19 @@ impl Server {
/// This future won't resolve... well maybe it should if someone calls close on the listener
/// It's job is to loop over an async iterator (stream) of incoming connections.
/// It will accept the connection, and spawn a new future to handle that connection.
async fn accept_async(mut listener: AsyncTcpListener, mut spawner : LocalSpawner) {
async fn accept_async(mut listener: AsyncTcpListener, mut spawner: LocalSpawner) {
let mut incoming = listener.incoming();
loop{
loop {
let next = incoming.try_next().await;
if let Ok(r) = next{
if let Some(stream) = r{
spawner.spawn_local(handle_connection_async(stream));
}else{
if let Ok(r) = next {
if let Some(stream) = r {
spawner
.spawn_local(handle_connection_async(stream))
.unwrap();
} else {
println!("accept_async:error");
}
}else{
} else {
println!("accept_async:none");
}
}
Expand All @@ -472,7 +470,7 @@ async fn accept_async(mut listener: AsyncTcpListener, mut spawner : LocalSpawner
/// Eventually this function might handle the http protocol
/// Read the incoming request, validate headers, create a request obj with a async reader attached
/// to read the body. It could then find an associated handler registered to the server by path
/// match and call it to get a response back.
/// match and call it to get a response back.
async fn handle_connection_async(mut stream: AsyncTcpStream) {
println!("handle_connection_async: Handling:started...");
//stream.shutdown(Shutdown::Both);
Expand Down

0 comments on commit 2cc057d

Please sign in to comment.