Permalink
Browse files

Sub::Throttleをつかってworkerの処理時間分散

  • Loading branch information...
1 parent 6dfaafd commit 469b7ce551e591d8e88d408d928ca495159dc1c8 @nekokak committed Jul 6, 2010
Showing with 77 additions and 3 deletions.
  1. +16 −0 eg/enqueue.pl
  2. +11 −0 eg/lib/Worker/Test.pm
  3. +20 −0 eg/manager.pl
  4. +19 −0 eg/monitor.pl
  5. +11 −3 lib/Qudo/Parallel/Manager.pm
View
@@ -0,0 +1,16 @@
+#! /usr/bin/env perl
+use strict;
+use warnings;
+use Qudo;
+
+my $m = Qudo->new(
+ databases => [+{
+ dsn => 'dbi:mysql:qudo',
+ username => 'root',
+ password => '',
+ }],
+);
+
+for my $i (1..10000) {
+ $m->enqueue('Worker::Test', {args => $i});
+}
View
@@ -0,0 +1,11 @@
+package Worker::Test;
+use strict;
+use warnings;
+use base 'Qudo::Worker';
+sub work {
+ my ($class, $job) = @_;
+ srand(time ^ ($$ + ($$ << 15)));
+ sleep(int(rand(10)));
+ $job->completed;
+}
+1;
View
@@ -0,0 +1,20 @@
+#! /usr/bin/env perl
+use strict;
+use warnings;
+use Qudo::Parallel::Manager;
+
+my $m = Qudo::Parallel::Manager->new(
+ databases => [+{
+ dsn => 'dbi:mysql:qudo',
+ username => 'root',
+ password => '',
+ }],
+ manager_abilities => [qw/Worker::Test/],
+ min_spare_workers => 10,
+ max_spare_workers => 50,
+ max_workers => 50,
+ work_delay => 3,
+ debug => 1,
+);
+
+$m->run;
View
@@ -0,0 +1,19 @@
+#! /usr/bin/perl
+use strict;
+use warnings;
+use IO::Socket::INET;
+
+while (1) {
+ my $sock = IO::Socket::INET->new(
+ PeerHost => '127.0.0.1',
+ PeerPort => 90000,
+ Proto => 'tcp',
+ ) or die 'can not connect admin port.';
+
+ my $status = $sock->getline;
+ print $status, "\n";
+ $sock->close;
+
+ sleep(1);
+}
+
@@ -4,6 +4,7 @@ use warnings;
use Qudo;
use UNIVERSAL::require;
use Parallel::Prefork::SpareWorkers qw(:status);
+use Sub::Throttle qw/throttle/;
use IO::Socket;
our $VERSION = '0.01';
@@ -16,6 +17,7 @@ sub new {
my $min_spare_workers = delete $args{min_spare_workers} || 1;
my $max_spare_workers = delete $args{max_spare_workers} || $max_workers;
my $auto_load_worker = delete $args{auto_load_worker} || 1;
+ my $work_delay = $args{work_delay} || 5;
my $debug = delete $args{debug} || 0;
my $qudo = Qudo->new(%args);
@@ -27,6 +29,7 @@ sub new {
max_request_par_child => $max_request_par_child,
min_spare_workers => $min_spare_workers,
max_spare_workers => $max_spare_workers,
+ work_delay => $work_delay,
debug => $debug,
qudo => $qudo,
}, $class;
@@ -70,12 +73,16 @@ sub run {
my $reqs_before_exit = $self->{max_request_par_child};
- $SIG{TERM} = sub { $reqs_before_exit = 0 };
+ local $SIG{TERM} = sub { $reqs_before_exit = 0 };
while ($reqs_before_exit > 0) {
- if ($manager->work_once) {
+ if (throttle(0.5, sub { $manager->work_once })) {
+ $self->debug("WORK $$\n");
--$reqs_before_exit
+ } else {
+ sleep $self->{work_delay};
}
+ $self->debug("$$ $reqs_before_exit\n")
}
}
@@ -85,7 +92,7 @@ sub run {
$pm->wait_all_children;
- kill 'KILL', $c_pid;
+ kill 'TERM', $c_pid;
} else {
my $admin = IO::Socket::INET->new(
@@ -146,6 +153,7 @@ Qudo::Parallel::Manager - auto control forking manager process.
username => '',
password => '',
}],
+ work_delay => 3,
max_workers => 5,
min_spare_workers => 1,
max_spare_workers => 5,

0 comments on commit 469b7ce

Please sign in to comment.