Skip to content

Commit

Permalink
fix event-loop dead-lock on single-large-async write (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
y-oda-oni-juba committed Apr 10, 2013
1 parent 2d3bd2e commit a47769b
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 1 deletion.
21 changes: 21 additions & 0 deletions mpsrc/wavy_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ void loop_impl::thread_main()
goto process_handler;
}

#if 0
if(!m_pollable) {
if(m_out->has_queue()) {
do_out(lk);
Expand All @@ -226,6 +227,26 @@ void loop_impl::thread_main()
do_task(lk);
goto retry;
}
#endif

#if 1
if(m_out->has_queue()) {
do_out(lk);
goto retry;
}
if(!m_pollable) {
if(!m_task_queue.empty()) {
do_task(lk);
goto retry;
} else {
m_cond.wait(m_mutex);
goto retry_task;
}
} else if(m_task_queue.size() > MP_WAVY_TASK_QUEUE_LIMIT) {
do_task(lk);
goto retry;
}
#endif

if(m_num == m_off) {
m_pollable = false;
Expand Down
4 changes: 3 additions & 1 deletion test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ check_PROGRAMS = \
signal \
timer \
many_timer \
sync
sync \
huge_message

TESTS = $(check_PROGRAMS)

Expand All @@ -26,3 +27,4 @@ many_timer_SOURCES = many_timer.cc

sync_SOURCES = sync.cc

huge_message_SOURCES = huge_message.cc
172 changes: 172 additions & 0 deletions test/huge_message.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright (C) 2013 Preferred Infrastructure and Nippon Telegraph and Telephone Corporation.

#include <iostream>
#include <string>
#include <vector>
#include <stdexcept>

#include <jubatus/mp/wavy.h>
#include <jubatus/mp/functional.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>

using namespace mp::placeholders;

size_t total_write_size = 1024*1024*128; // KByte

typedef std::vector<char> buffer_t;

class server_handler: public mp::wavy::handler {
public:
server_handler(int fd, mp::wavy::loop *lo) :
mp::wavy::handler(fd),
loop_(lo) {}

void on_read(mp::wavy::event& ev) {
buffer_t rbuf(1024);
ssize_t read_size = ::read(fd(), &rbuf[0], rbuf.size());
if ( read_size < 0 ) {
perror("read error");
exit(1);
} else if ( read_size == 0 ) {
ev.remove();
return;
}

std::auto_ptr<buffer_t> buf( new buffer_t(total_write_size) );
loop_->write(fd(), &((*buf)[0]), buf->size(), buf);
std::cerr << "wrote " << total_write_size << "bytes"
<< std::endl;
}
static void accepted(mp::wavy::loop *lo, int fd, int err) {
if(fd < 0) {
errno = err;
perror("accept error");
exit(1);
}

try {
std::cerr << "accepted. " << std::endl;
lo->add_handler<server_handler>(fd, lo);

} catch(...) {
throw;
}
}

private:
mp::wavy::loop *loop_;
};

class client_handler: public mp::wavy::handler {
public:
client_handler(int fd, mp::wavy::loop *lo) :
mp::wavy::handler(fd),
loop_(lo),
buf_(1024*1024),
total_size_(0) { }

void on_read(mp::wavy::event& ev) {
ssize_t read_size = read( fd(), &buf_[0], buf_.size());
std::cerr << "." << std::flush;

if ( read_size <= 0 ) {
std::cerr << std::endl
<< "session closed with " << read_size
<< std::endl;

ev.remove();
loop_->end();
exit(1);
}
total_size_ += read_size;

if ( total_size_ == total_write_size ) {
std::cerr << std::endl << "OK" << std::endl;
ev.remove();
loop_->end();
return;
}
}

static void connected(mp::wavy::loop *lo, int fd, int err) {
if ( fd < 0 ) {
errno = err;
perror("connect error");
exit(1);
}

try {
std::cerr << "conencted" << std::endl;

buffer_t buf(32);
if ( ::write(fd, &buf[0], buf.size()) < 0 ) {
perror("write error");
exit(1);
}


lo->add_handler<client_handler>(fd, lo);
lo->add_timer( 10.0, 0.0,
mp::bind( &client_handler::on_timed_out,
lo));
} catch(...) {
::close(fd);
throw;
}
}

static bool on_timed_out(mp::wavy::loop *lo) {
lo->end();
std::cerr << "timeout" << std::endl;
exit(1);
return false;
}

private:
mp::wavy::loop *loop_;
std::vector<char> buf_;
size_t total_size_;
};

int main(int argc, char **argv)
{
std::string host = "127.0.0.1";
int port = 9090;
if ( argv[1] ) {
host = argv[1];
if ( argv[2] ) port = strtoul( argv[2], NULL, 10);
}

mp::wavy::loop lo_server;

struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_port = htons(port);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);

lo_server.listen(PF_INET, SOCK_STREAM, 0,
(struct sockaddr*)&addr, sizeof(addr),
mp::bind(&server_handler::accepted,
&lo_server, _1, _2));

lo_server.start(1); // run with 1 threads

mp::wavy::loop lo_client;
lo_client.start(2); // run with 1 threads

addr.sin_addr.s_addr = inet_addr("127.0.0.1");
lo_client.connect(PF_INET, SOCK_STREAM, 0,
(struct sockaddr*)&addr, sizeof(addr),
0.0,
mp::bind(&client_handler::connected,
&lo_client, _1, _2));

lo_client.join();
lo_server.end();
lo_server.join();

return 0;
}

0 comments on commit a47769b

Please sign in to comment.