diff --git a/examples/hello.rs b/examples/hello.rs index 0b736544ab..c3a1717328 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -14,7 +14,7 @@ static PHRASE: &'static [u8] = b"Hello World!"; struct Hello; impl Handler for Hello { - fn on_request(&mut self, _: Request) -> Next { + fn on_request(&mut self, _: Request) -> Next { Next::write() } fn on_request_readable(&mut self, _: &mut Decoder) -> Next { diff --git a/examples/server.rs b/examples/server.rs index 3a2d8ccb83..7a0dd44fdb 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -46,7 +46,7 @@ impl Echo { } impl Handler for Echo { - fn on_request(&mut self, req: Request) -> Next { + fn on_request(&mut self, req: Request) -> Next { match *req.uri() { RequestUri::AbsolutePath(ref path) => match (req.method(), &path[..]) { (&Get, "/") | (&Get, "/echo") => { diff --git a/examples/sync.rs b/examples/sync.rs deleted file mode 100644 index 6d4eacae64..0000000000 --- a/examples/sync.rs +++ /dev/null @@ -1,278 +0,0 @@ -extern crate hyper; -extern crate env_logger; -extern crate time; - -use std::io::{self, Read, Write}; -use std::marker::PhantomData; -use std::thread; -use std::sync::{Arc, mpsc}; - -pub struct Server { - listening: hyper::server::Listening, -} - -pub struct Request<'a> { - #[allow(dead_code)] - inner: hyper::server::Request, - tx: &'a mpsc::Sender, - rx: &'a mpsc::Receiver>, - ctrl: &'a hyper::Control, -} - -impl<'a> Request<'a> { - fn new(inner: hyper::server::Request, tx: &'a mpsc::Sender, rx: &'a mpsc::Receiver>, ctrl: &'a hyper::Control) -> Request<'a> { - Request { - inner: inner, - tx: tx, - rx: rx, - ctrl: ctrl, - } - } -} - -impl<'a> io::Read for Request<'a> { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.tx.send(Action::Read(buf.as_mut_ptr(), buf.len())).unwrap(); - self.ctrl.ready(hyper::Next::read()).unwrap(); - self.rx.recv().unwrap() - } -} - -pub enum Fresh {} -pub enum Streaming {} - -pub struct Response<'a, W = Fresh> { - status: hyper::StatusCode, - headers: hyper::Headers, - version: hyper::HttpVersion, - tx: &'a mpsc::Sender, - rx: &'a mpsc::Receiver>, - ctrl: &'a hyper::Control, - _marker: PhantomData, -} - -impl<'a> Response<'a, Fresh> { - fn new(tx: &'a mpsc::Sender, rx: &'a mpsc::Receiver>, ctrl: &'a hyper::Control) -> Response<'a, Fresh> { - Response { - status: hyper::Ok, - headers: hyper::Headers::new(), - version: hyper::HttpVersion::Http11, - tx: tx, - rx: rx, - ctrl: ctrl, - _marker: PhantomData, - } - } - - pub fn start(self) -> io::Result> { - self.tx.send(Action::Respond(self.version.clone(), self.status.clone(), self.headers.clone())).unwrap(); - self.ctrl.ready(hyper::Next::write()).unwrap(); - let res = self.rx.recv().unwrap(); - res.map(move |_| Response { - status: self.status, - headers: self.headers, - version: self.version, - tx: self.tx, - rx: self.rx, - ctrl: self.ctrl, - _marker: PhantomData, - }) - } - - pub fn send(mut self, msg: &[u8]) -> io::Result<()> { - self.headers.set(hyper::header::ContentLength(msg.len() as u64)); - self.start().and_then(|mut res| res.write_all(msg)).map(|_| ()) - } -} - -impl<'a> Write for Response<'a, Streaming> { - fn write(&mut self, msg: &[u8]) -> io::Result { - self.tx.send(Action::Write(msg.as_ptr(), msg.len())).unwrap(); - self.ctrl.ready(hyper::Next::write()).unwrap(); - let res = self.rx.recv().unwrap(); - res - } - - fn flush(&mut self) -> io::Result<()> { - panic!("Response.flush() not impemented") - } -} - -struct SynchronousHandler { - req_tx: mpsc::Sender, - tx: mpsc::Sender>, - rx: mpsc::Receiver, - reading: Option<(*mut u8, usize)>, - writing: Option<(*const u8, usize)>, - respond: Option<(hyper::HttpVersion, hyper::StatusCode, hyper::Headers)> -} - -unsafe impl Send for SynchronousHandler {} - -impl SynchronousHandler { - fn next(&mut self) -> hyper::Next { - match self.rx.try_recv() { - Ok(Action::Read(ptr, len)) => { - self.reading = Some((ptr, len)); - hyper::Next::read() - }, - Ok(Action::Respond(ver, status, headers)) => { - self.respond = Some((ver, status, headers)); - hyper::Next::write() - }, - Ok(Action::Write(ptr, len)) => { - self.writing = Some((ptr, len)); - hyper::Next::write() - } - Err(mpsc::TryRecvError::Empty) => { - // we're too fast, the other thread hasn't had a chance to respond - hyper::Next::wait() - } - Err(mpsc::TryRecvError::Disconnected) => { - // they dropped it - // TODO: should finish up sending response, whatever it was - hyper::Next::end() - } - } - } - - fn reading(&mut self) -> Option<(*mut u8, usize)> { - self.reading.take().or_else(|| { - match self.rx.try_recv() { - Ok(Action::Read(ptr, len)) => { - Some((ptr, len)) - }, - _ => None - } - }) - } - - fn writing(&mut self) -> Option<(*const u8, usize)> { - self.writing.take().or_else(|| { - match self.rx.try_recv() { - Ok(Action::Write(ptr, len)) => { - Some((ptr, len)) - }, - _ => None - } - }) - } - fn respond(&mut self) -> Option<(hyper::HttpVersion, hyper::StatusCode, hyper::Headers)> { - self.respond.take().or_else(|| { - match self.rx.try_recv() { - Ok(Action::Respond(ver, status, headers)) => { - Some((ver, status, headers)) - }, - _ => None - } - }) - } -} - -impl hyper::server::Handler for SynchronousHandler { - fn on_request(&mut self, req: hyper::server::Request) -> hyper::Next { - if let Err(_) = self.req_tx.send(req) { - return hyper::Next::end(); - } - - self.next() - } - - fn on_request_readable(&mut self, decoder: &mut hyper::Decoder) -> hyper::Next { - if let Some(raw) = self.reading() { - let slice = unsafe { ::std::slice::from_raw_parts_mut(raw.0, raw.1) }; - if self.tx.send(decoder.read(slice)).is_err() { - return hyper::Next::end(); - } - } - self.next() - } - - fn on_response(&mut self, req: &mut hyper::server::Response) -> hyper::Next { - use std::iter::Extend; - if let Some(head) = self.respond() { - req.set_status(head.1); - req.headers_mut().extend(head.2.iter()); - if self.tx.send(Ok(0)).is_err() { - return hyper::Next::end(); - } - } else { - // wtf happened? - panic!("no head to respond with"); - } - self.next() - } - - fn on_response_writable(&mut self, encoder: &mut hyper::Encoder) -> hyper::Next { - if let Some(raw) = self.writing() { - let slice = unsafe { ::std::slice::from_raw_parts(raw.0, raw.1) }; - if self.tx.send(encoder.write(slice)).is_err() { - return hyper::Next::end(); - } - } - self.next() - } -} - -enum Action { - Read(*mut u8, usize), - Write(*const u8, usize), - Respond(hyper::HttpVersion, hyper::StatusCode, hyper::Headers), -} - -unsafe impl Send for Action {} - -trait Handler: Send + Sync + 'static { - fn handle(&self, req: Request, res: Response); -} - -impl Handler for F where F: Fn(Request, Response) + Send + Sync + 'static { - fn handle(&self, req: Request, res: Response) { - (self)(req, res) - } -} - -impl Server { - fn handle(addr: &str, handler: H) -> Server { - let handler = Arc::new(handler); - let (listening, server) = hyper::Server::http(&addr.parse().unwrap()).unwrap() - .handle(move |ctrl| { - let (req_tx, req_rx) = mpsc::channel(); - let (blocking_tx, blocking_rx) = mpsc::channel(); - let (async_tx, async_rx) = mpsc::channel(); - let handler = handler.clone(); - thread::Builder::new().name("handler-thread".into()).spawn(move || { - let req = Request::new(req_rx.recv().unwrap(), &blocking_tx, &async_rx, &ctrl); - let res = Response::new(&blocking_tx, &async_rx, &ctrl); - handler.handle(req, res); - }).unwrap(); - - SynchronousHandler { - req_tx: req_tx, - tx: async_tx, - rx: blocking_rx, - reading: None, - writing: None, - respond: None, - } - }).unwrap(); - thread::spawn(move || { - server.run(); - }); - Server { - listening: listening - } - } -} - -fn main() { - env_logger::init().unwrap(); - let s = Server::handle("127.0.0.1:0", |mut req: Request, res: Response| { - let mut body = [0; 256]; - let n = req.read(&mut body).unwrap(); - println!("!!!: received: {:?}", ::std::str::from_utf8(&body[..n]).unwrap()); - - res.send(b"Hello World!").unwrap(); - }); - println!("listening on {}", s.listening.addr()); -} diff --git a/src/client/mod.rs b/src/client/mod.rs index 65443e78be..32656c556d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -310,7 +310,7 @@ impl, T: Transport> http::MessageHandler for Message { self.handler.on_request_writable(transport) } - fn on_incoming(&mut self, head: http::ResponseHead) -> Next { + fn on_incoming(&mut self, head: http::ResponseHead, _: &T) -> Next { trace!("on_incoming {:?}", head); let resp = response::new(head); self.handler.on_response(resp) diff --git a/src/http/conn.rs b/src/http/conn.rs index 1e335b3967..8c6c23e1af 100644 --- a/src/http/conn.rs +++ b/src/http/conn.rs @@ -164,7 +164,7 @@ impl> ConnInner { trace!("decoder = {:?}", decoder); let keep_alive = self.keep_alive_enabled && head.should_keep_alive(); let mut handler = scope.create(Seed(&self.key, &self.ctrl.0)); - let next = handler.on_incoming(head); + let next = handler.on_incoming(head, &self.transport); trace!("handler.on_incoming() -> {:?}", next); match next.interest { @@ -231,7 +231,7 @@ impl> ConnInner { if http1.keep_alive { http1.keep_alive = head.should_keep_alive(); } - let next = http1.handler.on_incoming(head); + let next = http1.handler.on_incoming(head, &self.transport); http1.reading = Reading::Wait(decoder); trace!("handler.on_incoming() -> {:?}", next); Some(next) @@ -874,7 +874,7 @@ impl Chunk { pub trait MessageHandler { type Message: Http1Message; - fn on_incoming(&mut self, head: http::MessageHead<::Incoming>) -> Next; + fn on_incoming(&mut self, head: http::MessageHead<::Incoming>, transport: &T) -> Next; fn on_outgoing(&mut self, head: &mut http::MessageHead<::Outgoing>) -> Next; fn on_decode(&mut self, &mut http::Decoder) -> Next; fn on_encode(&mut self, &mut http::Encoder) -> Next; diff --git a/src/server/message.rs b/src/server/message.rs index 81c12a3eb7..5870445d9d 100644 --- a/src/server/message.rs +++ b/src/server/message.rs @@ -28,9 +28,9 @@ impl, T: Transport> Message { impl, T: Transport> http::MessageHandler for Message { type Message = http::ServerMessage; - fn on_incoming(&mut self, head: http::RequestHead) -> Next { + fn on_incoming(&mut self, head: http::RequestHead, transport: &T) -> Next { trace!("on_incoming {:?}", head); - let req = request::new(head); + let req = request::new(head, transport); self.handler.on_request(req) } diff --git a/src/server/mod.rs b/src/server/mod.rs index 996631f0c2..706773065b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -324,7 +324,7 @@ impl Listening { /// Each event handler returns it's desired `Next` action. pub trait Handler { /// This event occurs first, triggering when a `Request` has been parsed. - fn on_request(&mut self, request: Request) -> Next; + fn on_request(&mut self, request: Request) -> Next; /// This event occurs each time the `Request` is ready to be read from. fn on_request_readable(&mut self, request: &mut http::Decoder) -> Next; /// This event occurs after the first time this handled signals `Next::write()`. diff --git a/src/server/request.rs b/src/server/request.rs index 0cee723a20..cc6d435caf 100644 --- a/src/server/request.rs +++ b/src/server/request.rs @@ -3,13 +3,15 @@ //! These are requests that a `hyper::Server` receives, and include its method, //! target URI, headers, and message body. +use std::fmt; + use version::HttpVersion; use method::Method; use header::Headers; use http::{RequestHead, MessageHead, RequestLine}; use uri::RequestUri; -pub fn new(incoming: RequestHead) -> Request { +pub fn new<'a, T>(incoming: RequestHead, transport: &'a T) -> Request<'a, T> { let MessageHead { version, subject: RequestLine(method, uri), headers } = incoming; debug!("Request Line: {:?} {:?} {:?}", method, uri, version); debug!("{:#?}", headers); @@ -19,22 +21,31 @@ pub fn new(incoming: RequestHead) -> Request { uri: uri, headers: headers, version: version, + transport: transport, } } /// A request bundles several parts of an incoming `NetworkStream`, given to a `Handler`. -#[derive(Debug)] -pub struct Request { - // The IP address of the remote connection. - //remote_addr: SocketAddr, +pub struct Request<'a, T: 'a> { method: Method, - headers: Headers, uri: RequestUri, version: HttpVersion, + headers: Headers, + transport: &'a T, } +impl<'a, T> fmt::Debug for Request<'a, T> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Request") + .field("method", &self.method) + .field("uri", &self.uri) + .field("version", &self.version) + .field("headers", &self.headers) + .finish() + } +} -impl Request { +impl<'a, T> Request<'a, T> { /// The `Method`, such as `Get`, `Post`, etc. #[inline] pub fn method(&self) -> &Method { &self.method } @@ -43,6 +54,10 @@ impl Request { #[inline] pub fn headers(&self) -> &Headers { &self.headers } + /// The underlying `Transport` of this request. + #[inline] + pub fn transport(&self) -> &'a T { self.transport } + /// The target request-uri for this request. #[inline] pub fn uri(&self) -> &RequestUri { &self.uri } diff --git a/tests/server.rs b/tests/server.rs index b92c4d088f..a93ac3c125 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -99,7 +99,7 @@ impl TestHandler { } impl Handler for TestHandler { - fn on_request(&mut self, _req: Request) -> Next { + fn on_request(&mut self, _req: Request) -> Next { //self.tx.send(Msg::Head(req)).unwrap(); self.next(Next::read()) }