Permalink
Browse files

port to may based version

  • Loading branch information...
Xudong-Huang committed Mar 12, 2017
1 parent 8ff4ee4 commit 0765ef06a0bf95c65e4434603cad4cea7e2826ab
Showing with 27 additions and 17 deletions.
  1. +1 −0 Cargo.toml
  2. +2 −0 examples/hello.rs
  3. +2 −1 src/client/pool.rs
  4. +1 −0 src/lib.rs
  5. +2 −2 src/net.rs
  6. +11 −8 src/server/listener.rs
  7. +4 −3 src/server/mod.rs
  8. +4 −3 src/server/response.rs
View
@@ -31,6 +31,7 @@ traitobject = "0.1"
typeable = "0.1"
unicase = "1.0"
url = "1.0"
may = "0.1"
[dev-dependencies]
env_logger = "0.4"
View
@@ -1,4 +1,5 @@
#![deny(warnings)]
extern crate may;
extern crate hyper;
extern crate env_logger;
@@ -12,6 +13,7 @@ fn hello(_: Request, res: Response) {
fn main() {
env_logger::init().unwrap();
may::config().set_io_workers(2).set_stack_size(0x2000);
let _listening = hyper::Server::http("127.0.0.1:3000").unwrap()
.handle(hello);
println!("Listening on http://127.0.0.1:3000");
View
@@ -4,7 +4,8 @@ use std::collections::HashMap;
use std::fmt;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, Shutdown};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use may::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
View
@@ -128,6 +128,7 @@
//! implement `Reader` and can be read to get the data out of a `Response`.
//!
extern crate may;
extern crate base64;
extern crate time;
#[macro_use] extern crate url;
View
@@ -2,7 +2,8 @@
use std::any::{Any, TypeId};
use std::fmt;
use std::io::{self, ErrorKind, Read, Write};
use std::net::{SocketAddr, ToSocketAddrs, TcpStream, TcpListener, Shutdown};
use std::net::{SocketAddr, ToSocketAddrs, Shutdown};
use may::net::{TcpStream, TcpListener};
use std::mem;
use std::sync::Arc;
@@ -635,4 +636,3 @@ mod tests {
assert_eq!(mock, Box::new(MockStream::new()));
}
}
View
@@ -1,5 +1,6 @@
use std::sync::{Arc, mpsc};
use std::thread;
use std::sync::Arc;
use may::coroutine;
use may::sync::mpsc;
use net::NetworkListener;
@@ -27,9 +28,9 @@ impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
let work = Arc::new(work);
// Begin work.
for _ in 0..threads {
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
}
// for _ in 0..threads {
spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
// }
// Monitor for panics.
// FIXME(reem): This won't ever exit since we still have a super_tx handle.
@@ -42,12 +43,15 @@ impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
where A: NetworkListener + Send + 'static,
F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
thread::spawn(move || {
coroutine::spawn(move || -> () {
let _sentinel = Sentinel::new(supervisor, ());
loop {
match acceptor.accept() {
Ok(stream) => work(stream),
Ok(stream) => {
let w = work.clone();
coroutine::spawn(move || w(stream));
}
Err(e) => {
info!("Connection failed: {}", e);
}
@@ -76,4 +80,3 @@ impl<T: Send + 'static> Drop for Sentinel<T> {
let _ = self.supervisor.send(self.value.take().unwrap());
}
}
View
@@ -110,7 +110,7 @@
use std::fmt;
use std::io::{self, ErrorKind, BufWriter, Write};
use std::net::{SocketAddr, ToSocketAddrs};
use std::thread::{self, JoinHandle};
use may::coroutine::{self, JoinHandle};
use std::time::Duration;
use num_cpus;
@@ -221,7 +221,8 @@ impl<S: SslServer + Clone + Send> Server<HttpsListener<S>> {
impl<L: NetworkListener + Send + 'static> Server<L> {
/// Binds to a socket and starts handling connections.
pub fn handle<H: Handler + 'static>(self, handler: H) -> ::Result<Listening> {
self.handle_threads(handler, num_cpus::get() * 5 / 4)
// start in 1000 coroutines here
self.handle_threads(handler, 1000)
}
/// Binds to a socket and starts handling connections with the provided
@@ -241,7 +242,7 @@ where H: Handler + 'static, L: NetworkListener + Send + 'static {
let worker = Worker::new(handler, server.timeouts);
let work = move |mut stream| worker.handle_connection(&mut stream);
let guard = thread::spawn(move || pool.accept(work, threads));
let guard = coroutine::spawn(move || pool.accept(work, threads));
Ok(Listening {
_guard: Some(guard),
View
@@ -352,15 +352,16 @@ mod tests {
#[cfg(not(all(windows, target_arch="x86", target_env="msvc")))]
#[test]
fn test_fresh_drop_panicing() {
use std::thread;
use std::sync::{Arc, Mutex};
use may::coroutine;
use std::sync::Arc;
use may::sync::Mutex;
use status::StatusCode;
let stream = MockStream::new();
let stream = Arc::new(Mutex::new(stream));
let inner_stream = stream.clone();
let join_handle = thread::spawn(move || {
let join_handle = coroutine::spawn(move || {
let mut headers = Headers::new();
let mut stream = inner_stream.lock().unwrap();
let mut res = Response::new(&mut *stream, &mut headers);

5 comments on commit 0765ef0

@daogangtang

This comment has been minimized.

daogangtang replied Jan 3, 2018

wonderful!

@alanhoff

This comment has been minimized.

alanhoff replied Jan 6, 2018

I wonder how it's handling HTTPS connections

@daogangtang

This comment has been minimized.

daogangtang replied Jan 8, 2018

can use now?

@Xudong-Huang

This comment has been minimized.

Collaborator

Xudong-Huang replied Jan 8, 2018

yes, this is ready to use. actually self.handle_threads(handler, 1000) has no effect at all, don't worry, it will automatically spawn enough coroutines.

@daogangtang

@daogangtang

This comment has been minimized.

daogangtang replied Jan 8, 2018

nice. sapper will make a new branch to test it.

Please sign in to comment.