diff --git a/mpsrc/wavy_loop.cc b/mpsrc/wavy_loop.cc index 9f70081..c535403 100644 --- a/mpsrc/wavy_loop.cc +++ b/mpsrc/wavy_loop.cc @@ -211,6 +211,7 @@ void loop_impl::thread_main() goto process_handler; } +#if 0 if(!m_pollable) { if(m_out->has_queue()) { do_out(lk); @@ -226,6 +227,26 @@ void loop_impl::thread_main() do_task(lk); goto retry; } +#endif + +#if 1 + if(m_out->has_queue()) { + do_out(lk); + goto retry; + } + if(!m_pollable) { + if(!m_task_queue.empty()) { + do_task(lk); + goto retry; + } else { + m_cond.wait(m_mutex); + goto retry_task; + } + } else if(m_task_queue.size() > MP_WAVY_TASK_QUEUE_LIMIT) { + do_task(lk); + goto retry; + } +#endif if(m_num == m_off) { m_pollable = false; diff --git a/test/Makefile.am b/test/Makefile.am index c66e7ac..20e2a7e 100644 --- a/test/Makefile.am +++ b/test/Makefile.am @@ -10,7 +10,8 @@ check_PROGRAMS = \ signal \ timer \ many_timer \ - sync + sync \ + huge_message TESTS = $(check_PROGRAMS) @@ -26,3 +27,4 @@ many_timer_SOURCES = many_timer.cc sync_SOURCES = sync.cc +huge_message_SOURCES = huge_message.cc diff --git a/test/huge_message.cc b/test/huge_message.cc new file mode 100644 index 0000000..800f048 --- /dev/null +++ b/test/huge_message.cc @@ -0,0 +1,172 @@ +// Copyright (C) 2013 Preferred Infrastructure and Nippon Telegraph and Telephone Corporation. + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace mp::placeholders; + +size_t total_write_size = 1024*1024*128; // KByte + +typedef std::vector buffer_t; + +class server_handler: public mp::wavy::handler { +public: + server_handler(int fd, mp::wavy::loop *lo) : + mp::wavy::handler(fd), + loop_(lo) {} + + void on_read(mp::wavy::event& ev) { + buffer_t rbuf(1024); + ssize_t read_size = ::read(fd(), &rbuf[0], rbuf.size()); + if ( read_size < 0 ) { + perror("read error"); + exit(1); + } else if ( read_size == 0 ) { + ev.remove(); + return; + } + + std::auto_ptr buf( new buffer_t(total_write_size) ); + loop_->write(fd(), &((*buf)[0]), buf->size(), buf); + std::cerr << "wrote " << total_write_size << "bytes" + << std::endl; + } + static void accepted(mp::wavy::loop *lo, int fd, int err) { + if(fd < 0) { + errno = err; + perror("accept error"); + exit(1); + } + + try { + std::cerr << "accepted. " << std::endl; + lo->add_handler(fd, lo); + + } catch(...) { + throw; + } + } + +private: + mp::wavy::loop *loop_; +}; + +class client_handler: public mp::wavy::handler { +public: + client_handler(int fd, mp::wavy::loop *lo) : + mp::wavy::handler(fd), + loop_(lo), + buf_(1024*1024), + total_size_(0) { } + + void on_read(mp::wavy::event& ev) { + ssize_t read_size = read( fd(), &buf_[0], buf_.size()); + std::cerr << "." << std::flush; + + if ( read_size <= 0 ) { + std::cerr << std::endl + << "session closed with " << read_size + << std::endl; + + ev.remove(); + loop_->end(); + exit(1); + } + total_size_ += read_size; + + if ( total_size_ == total_write_size ) { + std::cerr << std::endl << "OK" << std::endl; + ev.remove(); + loop_->end(); + return; + } + } + + static void connected(mp::wavy::loop *lo, int fd, int err) { + if ( fd < 0 ) { + errno = err; + perror("connect error"); + exit(1); + } + + try { + std::cerr << "conencted" << std::endl; + + buffer_t buf(32); + if ( ::write(fd, &buf[0], buf.size()) < 0 ) { + perror("write error"); + exit(1); + } + + + lo->add_handler(fd, lo); + lo->add_timer( 10.0, 0.0, + mp::bind( &client_handler::on_timed_out, + lo)); + } catch(...) { + ::close(fd); + throw; + } + } + + static bool on_timed_out(mp::wavy::loop *lo) { + lo->end(); + std::cerr << "timeout" << std::endl; + exit(1); + return false; + } + +private: + mp::wavy::loop *loop_; + std::vector buf_; + size_t total_size_; +}; + +int main(int argc, char **argv) +{ + std::string host = "127.0.0.1"; + int port = 9090; + if ( argv[1] ) { + host = argv[1]; + if ( argv[2] ) port = strtoul( argv[2], NULL, 10); + } + + mp::wavy::loop lo_server; + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_port = htons(port); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + + lo_server.listen(PF_INET, SOCK_STREAM, 0, + (struct sockaddr*)&addr, sizeof(addr), + mp::bind(&server_handler::accepted, + &lo_server, _1, _2)); + + lo_server.start(1); // run with 1 threads + + mp::wavy::loop lo_client; + lo_client.start(2); // run with 1 threads + + addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + lo_client.connect(PF_INET, SOCK_STREAM, 0, + (struct sockaddr*)&addr, sizeof(addr), + 0.0, + mp::bind(&client_handler::connected, + &lo_client, _1, _2)); + + lo_client.join(); + lo_server.end(); + lo_server.join(); + + return 0; +}