Skip to content
Browse files

better load balancing + improving performance

  • Loading branch information...
1 parent bfa2328 commit 85bd13a747247a790dd86240109242ceb8507cfb @kmx committed Oct 16, 2012
Showing with 42 additions and 59 deletions.
  1. +42 −59 metyl
View
101 metyl
@@ -3,7 +3,8 @@
use threads;
use threads::shared;
-use 5.012;
+use 5.010;
+use strict;
use warnings;
use Socket qw(IPPROTO_TCP TCP_NODELAY);
@@ -25,66 +26,47 @@ my $workers = 4;
sub thread_dispatcher {
my ($TQ_handles, $TQ_logs) = @_;
- my %clients;
- my $accept_count = 0;
- my $tick_count = 0;
- #create selector
- my $read_selector = IO::Select->new();
+ $TQ_handles->[$_]->wait_for_listener() for (0..$workers-1);
+ $TQ_logs->wait_for_listener();
- #open listen socket
my $listener = IO::Socket::INET->new(
- LocalAddr=>$addr,
- LocalPort=>$port,
- Proto=>'tcp',
- Listen=>SOMAXCONN,
- Reuse=>1,
- Type=> SOCK_STREAM,
- );
- die "FATAL: Can't start listening: $!" unless $listener;
- $listener->blocking(0);
+ LocalAddr =>$addr,
+ LocalPort =>$port,
+ Proto =>'tcp',
+ Listen =>SOMAXCONN,
+ ReuseAddr =>1,
+ Blocking => 0,
+ Type => SOCK_STREAM,
+ ) or die "FATAL: Can't start listening: $!";
+
+ my $read_selector = IO::Select->new();
$read_selector->add($listener);
- $TQ_handles->wait_for_listener();
- $TQ_logs->wait_for_listener();
+ my $accept_count = 0;
+ my %clients;
$TQ_logs->enqueue(threads->tid, "Info", "entering dispatcher loop, listening at '$addr:$port'");
- while (1) {
- $tick_count++;
- # do blocking select with reasonable timeout
- my ($re) = IO::Select->select($read_selector, undef, undef, 3); # 3 sec timeout is fine here
- my @ready = defined $re ? @$re : ();
-
- #handle accepted sockets
- for my $s (@ready) {
+ while (1) {
+ my @a = $read_selector->can_read(0.25);
+ for my $s (@a) {
my $fd = $s->accept();
next unless $fd;
- $accept_count++;
- #BEWARE: passing socket is not thread-safe therefore pass only fileno to other thread
- my $resp_id = $TQ_handles->enqueue('newfd', time, $fd->fileno); #non-blocking
- $clients{$resp_id} = $fd;
- #cannot close the socket here as we need to be sure that worker thread has taken it over
+ my $w = $accept_count++ % $workers;
+ my $id = $TQ_handles->[$w]->enqueue('newfd', $fd->fileno);
+ $clients{"$w.$id"} = $fd;
}
-
- # get all available ids from TQ_handles queue
- my @available = $TQ_handles->available;
- @available = () unless defined($available[0]); # because if none available: @qa = (undef);
-
- # handle responses from TQ_handles (response means that corresponding socket can be closed on dispatcher side)
- for my $id (@available) {
- my $response = $TQ_handles->dequeue_response($id);
- my $fd = delete $clients{$id};
- $fd->close if defined $fd;
+ for my $w (0..$workers-1) {
+ my @i = $TQ_handles->[$w]->available;
+ next unless defined $i[0]; #if none available: @i = (undef);
+ for my $id (@i) {
+ $TQ_handles->[$w]->dequeue_response($id);
+ delete $clients{"$w.$id"}; # will be destroyed&closed automatically
+ }
}
-
- # collect and print some debug info
- my $ar = scalar(@ready);
- my $qa = scalar(@available);
- my $qp = $TQ_handles->pending;
- my $tm = localtime->hms;
- warn "DEBUG[$tm]: q.pending=$qp; q.ready=$qa; acc.ready=$ar acc.count=$accept_count\n" if $tick_count % 10 == 0;
}
+
}
sub thread_logger {
@@ -109,14 +91,12 @@ sub thread_worker {
my $mojosrv = Mojo::Server::Daemon->new(listen =>[]); # do not listen
$mojosrv->load_app($mojo_app_path);
- $mojosrv->ioloop->recurring(0.5 =>
+ $mojosrv->ioloop->recurring(0.02 =>
sub {
while (my $dq = $TQ_handles->dequeue_nb) { # NON-BLOCKING get socket from TQ_handles queue
- #XXX-FIXME: we are taking over all available handles from TQ_handles queue
- #XXX-FIXME: which is not too clever, better load balancing needed
- my ($id, $cmd, $timestamp, $fn) = @$dq;
+ my ($id, $cmd, $fn) = @$dq;
my $socket = IO::Socket::INET->new_from_fd($fn, '+>');
- $TQ_handles->respond($id, 'handle taken'); # now we can tell dispatcher that we taken the socket
+ $TQ_handles->respond($id, 'handle taken'); # tell dispatcher that we took the socket
if ($socket) {
$socket->blocking(0);
setsockopt($socket, IPPROTO_TCP, TCP_NODELAY, 1);
@@ -139,7 +119,6 @@ sub thread_worker {
$TQ_logs->enqueue(threads->tid, "Info", "worker[$wid] entering mojo loop");
$mojosrv->run;
- $TQ_logs->enqueue(threads->tid, "Info", "worker[$wid] quitting");
}
### main
@@ -158,12 +137,16 @@ These options are available:
-w, --workers <num> Set number of worker threads (default: 4)
EOF
-my $TQ_handles = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 10000);
-my $TQ_logs = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 100);
-
-my $TH_dispatcher = threads->new(\&thread_dispatcher, $TQ_handles, $TQ_logs);
+my @TQ_handles;
+my $TQ_logs = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 100);
my $TH_logger = threads->new(\&thread_logger, $TQ_logs);
-threads->new(\&thread_worker, $_-1, $TQ_handles, $TQ_logs, $mojo_app_path)->detach for (1..$workers);
+for my $w (0..$workers-1) {
+ $TQ_handles[$w] = Thread::Queue::Duplex->new(ListenerRequired => 1, MaxPending => 1000);
+ threads->new(\&thread_worker, $w, $TQ_handles[$w], $TQ_logs, $mojo_app_path)->detach;
+}
+
+my $TH_dispatcher = threads->new(\&thread_dispatcher, \@TQ_handles, $TQ_logs);
$TH_dispatcher->join;
+
die "FATAL: We should never reach this point!";

0 comments on commit 85bd13a

Please sign in to comment.
Something went wrong with that request. Please try again.