Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: Add a /status handler on the ws port
Browse files Browse the repository at this point in the history
This adds some logic to parse an incoming HTTP request, peek at the headers, and
then route the response appropriately.
  • Loading branch information
alexcrichton committed Sep 8, 2017
1 parent 843dc68 commit 2430d35
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 19 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@ pypy/
src/
.tox/
.eggs/
autopush_rs/target
autopush_rs/_native*
3 changes: 3 additions & 0 deletions autopush_rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions autopush_rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ authors = ["Alex Crichton <alex@alexcrichton.com>"]
crate-type = ["cdylib"]

[dependencies]
bytes = "0.4"
env_logger = { version = "0.4", default-features = false }
error-chain = "0.10"
futures = "0.1"
httparse = "1.0"
hyper = "0.11"
libc = "0.2"
log = "0.3"
Expand All @@ -18,6 +20,7 @@ serde_derive = "1.0"
serde_json = "1.0"
time = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-service = "0.1"
tokio-tungstenite = "0.3"
tungstenite = "0.4"
Expand Down
1 change: 0 additions & 1 deletion autopush_rs/src/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use libc::c_char;
use serde::de;
use serde::ser;
use serde_json;
use time::Tm;
use uuid::Uuid;

use errors::*;
Expand Down
6 changes: 4 additions & 2 deletions autopush_rs/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,17 @@ use std::any::Any;
use std::error;
use std::io;

use tungstenite;
use serde_json;
use futures::Future;
use httparse;
use serde_json;
use tungstenite;

error_chain! {
foreign_links {
Ws(tungstenite::Error);
Io(io::Error);
Json(serde_json::Error);
Httparse(httparse::Error);
}

errors {
Expand Down
4 changes: 4 additions & 0 deletions autopush_rs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,23 @@
//!
//! Otherwise be sure to check out each module for more documentation!

extern crate bytes;
extern crate env_logger;
#[macro_use]
extern crate futures;
extern crate httparse;
extern crate hyper;
extern crate libc;
#[macro_use]
extern crate log;
extern crate serde;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
extern crate time;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_service;
extern crate tokio_tungstenite;
extern crate tungstenite;
Expand Down
86 changes: 86 additions & 0 deletions autopush_rs/src/server/dispatch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! A future to figure out where we're going to dispatch a TCP socket.
//!
//! When the websocket server receives a TCP connection it may be a websocket
//! request or a general HTTP request. Right now the websocket library we're
//! using, Tungstenite, doesn't have built-in support for handling this
//! situation, so we roll our own.
//!
//! The general idea here is that we're going to read just enough data off the
//! socket to parse an initial HTTP request. This request will be parsed by the
//! `httparse` crate. Once we've got a request we take a look at the headers and
//! if we find a websocket upgrade we classify it as a websocket request. If
//! it's otherwise a `/status` request, we return that we're supposed to get the
//! status, and finally after all that if it doesn't match we return an error.
//!
//! This is basically a "poor man's" HTTP router and while it should be good
//! enough for now it should probably be extended/refactored in the future!
//!
//! Note that also to implement this we buffer the request that we read in
//! memory and then attach that to a socket once we've classified what kind of
//! socket this is. That's done to replay the bytes we read again for the
//! tungstenite library, which'll duplicate header parsing but we don't have
//! many other options for now!

use bytes::BytesMut;
use futures::{Future, Poll};
use httparse;
use tokio_core::net::TcpStream;
use tokio_io::AsyncRead;

use errors::*;
use server::webpush_io::WebpushIo;

pub struct Dispatch {
socket: Option<TcpStream>,
data: BytesMut,
}

pub enum RequestType {
Websocket,
Status,
}

impl Dispatch {
pub fn new(socket: TcpStream) -> Dispatch {
Dispatch {
socket: Some(socket),
data: BytesMut::new(),
}
}
}

impl Future for Dispatch {
type Item = (WebpushIo, RequestType);
type Error = Error;

fn poll(&mut self) -> Poll<(WebpushIo, RequestType), Error> {
loop {
if self.data.len() == self.data.capacity() {
self.data.reserve(16); // get some extra space
}
if try_ready!(self.socket.as_mut().unwrap().read_buf(&mut self.data)) == 0 {
return Err("early eof".into())
}
let ty = {
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut req = httparse::Request::new(&mut headers);
match req.parse(&self.data)? {
httparse::Status::Complete(_) => {}
httparse::Status::Partial => continue,
}

if req.headers.iter().any(|h| h.name == "Upgrade") {
RequestType::Websocket
} else if req.path == Some("/status") {
RequestType::Status
} else {
debug!("unknown http request {:?}", req);
return Err("unknown http request".into())
}
};

let tcp = self.socket.take().unwrap();
return Ok((WebpushIo::new(tcp, self.data.take()), ty).into())
}
}
}
80 changes: 64 additions & 16 deletions autopush_rs/src/server.rs → autopush_rs/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ use futures::task::{self, Task};
use futures::{Stream, Future, Sink, Async, Poll, AsyncSink, StartSend};
use libc::c_char;
use serde_json;
use tokio_core::net::{TcpListener, TcpStream};
use time;
use tokio_core::net::TcpListener;
use tokio_core::reactor::{Core, Timeout, Handle, Interval};
use tokio_io;
use tokio_tungstenite::{accept_async, WebSocketStream};
use tungstenite::Message;
use uuid::Uuid;
Expand All @@ -26,8 +28,13 @@ use errors::*;
use protocol::{ClientMessage, ServerMessage, ServerNotification, Notification};
use queue::{self, AutopushQueue};
use rt::{self, AutopushError, UnwindGuard};
use server::webpush_io::WebpushIo;
use server::dispatch::{Dispatch, RequestType};
use util::{self, RcObject, timeout};

mod dispatch;
mod webpush_io;

#[repr(C)]
pub struct AutopushServer {
inner: UnwindGuard<AutopushServerInner>,
Expand Down Expand Up @@ -299,19 +306,38 @@ impl Server {

// TODO: TCP socket options here?

// Perform the websocket handshake on each connection, but don't let
// it take too long.
let ws = accept_async(socket, None).chain_err(|| "failed to accept client");
let ws = timeout(ws, srv.opts.open_handshake_timeout, &handle);

// Once the handshake is done we'll start the main communication
// with the client, managing pings here and deferring to
// `Client` to start driving the internal state machine.
// Figure out if this is a websocket or a `/status` request,
// without letting it take too long.
let request = Dispatch::new(socket);
let request = timeout(request,
srv.opts.open_handshake_timeout,
&handle);
let srv2 = srv.clone();
let client = ws.and_then(move |ws| {
PingManager::new(&srv2, ws)
.chain_err(|| "failed to make ping handler")
}).flatten();
let handle2 = handle.clone();
let client = request.and_then(move |(socket, request)| -> MyFuture<_> {
match request {
RequestType::Status => write_status(socket),
RequestType::Websocket => {
// Perform the websocket handshake on each
// connection, but don't let it take too long.
let ws = accept_async(socket, None).chain_err(|| {
"failed to accept client"
});
let ws = timeout(ws,
srv2.opts.open_handshake_timeout,
&handle2);

// Once the handshake is done we'll start the main
// communication with the client, managing pings
// here and deferring to `Client` to start driving
// the internal state machine.
Box::new(ws.and_then(move |ws| {
PingManager::new(&srv2, ws)
.chain_err(|| "failed to make ping handler")
}).flatten())
}
}
});

let srv = srv.clone();
handle.spawn(client.then(move |res| {
Expand Down Expand Up @@ -376,11 +402,11 @@ impl Drop for Server {
}

struct PingManager {
socket: RcObject<WebpushSocket<WebSocketStream<TcpStream>>>,
socket: RcObject<WebpushSocket<WebSocketStream<WebpushIo>>>,
ping_interval: Interval,
timeout: TimeoutState,
srv: Rc<Server>,
client: CloseState<Client<RcObject<WebpushSocket<WebSocketStream<TcpStream>>>>>,
client: CloseState<Client<RcObject<WebpushSocket<WebSocketStream<WebpushIo>>>>>,
}

enum TimeoutState {
Expand All @@ -395,7 +421,7 @@ enum CloseState<T> {
}

impl PingManager {
fn new(srv: &Rc<Server>, socket: WebSocketStream<TcpStream>)
fn new(srv: &Rc<Server>, socket: WebSocketStream<WebpushIo>)
-> io::Result<PingManager>
{
// The `socket` is itself a sink and a stream, and we've also got a sink
Expand Down Expand Up @@ -606,3 +632,25 @@ impl<T> Sink for WebpushSocket<T>
Ok(self.inner.close()?)
}
}

fn write_status(socket: WebpushIo) -> MyFuture<()> {
let data = json!({
"status": "OK",
"version": env!("CARGO_PKG_VERSION"),
}).to_string();
let data = format!("\
HTTP/1.1 200 Ok\r\n\
Server: webpush\r\n\
Date: {date}\r\n\
Content-Length: {len}\r\n\
\r\n\
{data}\
",
date = time::at(time::get_time()).rfc822(),
len = data.len(),
data = data,
);
Box::new(tokio_io::io::write_all(socket, data.into_bytes())
.map(|_| ())
.chain_err(|| "failed to write status response"))
}
65 changes: 65 additions & 0 deletions autopush_rs/src/server/webpush_io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//! I/O wrapper created through `Dispatch`
//!
//! Most I/O happens through just raw TCP sockets, but at the beginning of a
//! request we'll take a look at the headers and figure out where to route it.
//! After that, for tungstenite the websocket library, we'll want to replay the
//! data we already read as there's no ability to pass this in currently. That
//! means we'll parse headers twice, but alas!

use std::io::{self, Read, Write};

use bytes::BytesMut;
use futures::Poll;
use tokio_core::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};

pub struct WebpushIo {
tcp: TcpStream,
header_to_read: Option<BytesMut>,
}

impl WebpushIo {
pub fn new(tcp: TcpStream, header: BytesMut) -> WebpushIo {
WebpushIo {
tcp: tcp,
header_to_read: Some(header),
}
}
}

impl Read for WebpushIo {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
// Start off by replaying the bytes already read, and after that just
// delegate everything to the internal `TcpStream`
if let Some(ref mut header) = self.header_to_read {
let n = (&header[..]).read(buf)?;
header.split_to(n);
if buf.len() == 0 || n > 0 {
return Ok(n)
}
}
self.header_to_read = None;
self.tcp.read(buf)
}
}

// All `write` calls are routed through the `TcpStream` instance directly as we
// don't buffer this at all.
impl Write for WebpushIo {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.tcp.write(buf)
}

fn flush(&mut self) -> io::Result<()> {
self.tcp.flush()
}
}

impl AsyncRead for WebpushIo {
}

impl AsyncWrite for WebpushIo {
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.tcp)
}
}

0 comments on commit 2430d35

Please sign in to comment.