Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Some enhancements to gearman

This is some older code, and I can't really guarantee the sanity
of this code, however it Works For Us (TM).

The healthtest module is especially useful
  • Loading branch information...
commit 5cc4b71f12c556bb9f8c4a221c0fead857bf0e37 1 parent edf9e29
@mnunberg authored
View
9 lib/Gearman/Client.pm
@@ -3,12 +3,12 @@
package Gearman::Client;
our $VERSION;
-$VERSION = '1.10';
+$VERSION = '1.11';
use strict;
use IO::Socket::INET;
use Socket qw(IPPROTO_TCP TCP_NODELAY SOL_SOCKET);
-
+use UNIVERSAL qw(isa);
use Gearman::Objects;
use Gearman::Task;
use Gearman::Taskset;
@@ -70,7 +70,7 @@ sub _get_task_from_args {
my Gearman::Task $task;
if (ref $_[0]) {
$task = $_[0];
- Carp::croak("Argument isn't a Gearman::Task") unless ref $_[0] eq "Gearman::Task";
+ Carp::croak("Argument isn't a Gearman::Task") unless isa(ref $_[0], "Gearman::Task");
} else {
my ($func, $arg_p, $opts) = @_;
my $argref = ref $arg_p ? $arg_p : \$arg_p;
@@ -214,8 +214,7 @@ sub _get_js_sock {
my $sock = IO::Socket::INET->new(PeerAddr => $hostport,
Timeout => 1)
or return undef;
-
- setsockopt($sock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
+ setsockopt($sock, SOL_SOCKET, TCP_NODELAY, pack("l", 1));
$sock->autoflush(1);
# If exceptions support is to be requested, and the request fails, disable
View
399 lib/Gearman/Healthtest.pm
@@ -0,0 +1,399 @@
+#!/usr/bin/perl
+
+#the following begin block allows use to make a pseudo-module named _Constants
+#and use it in our code via the fake '_Constants.pm' file.. we need a BEGIN
+#here to modify the %INC hash
+
+BEGIN {
+ use strict;
+ use warnings;
+ package _Constants;
+ use base qw(Exporter);
+ our @EXPORT = qw(_FAILED _SUCCESS _INPROGRESS randstring);
+ use List::Util "shuffle";
+ use constant _FAILED => -1;
+ use constant _INPROGRESS => 0;
+ use constant _SUCCESS => 1;
+ sub randstring {
+ my $length = $_[0];
+ my @l = ("a".."z", "A".."Z");
+ return join("", (shuffle(@l))[0..$length-1]);
+ }
+ $INC{"_Constants.pm"} = 1;
+}
+
+#this wraps a Gearman::Task object, so that we can get a more accurate understanding
+#of whether a task completed successfully or not
+package _TaskWrapper;
+use strict;
+use warnings;
+use base "Gearman::Task";
+use _Constants;
+#Gearman::Task uses the fields module, and thus in order to extend the class,
+#we need to use fields as well
+use fields qw(status);
+sub new {
+ my ($cls, $qname, $arg, $opts) = @_;
+ my $self = fields::new($cls);
+ $self->status(_INPROGRESS);
+ $self->SUPER::new($qname,$arg,$opts);
+ return $self;
+}
+#gets/sets an integer constant representing the current status
+sub status {
+ my $self = shift;
+ my $new = shift;
+ if (defined $new) { $self->{status} = $new }
+ return $self->{status};
+}
+
+#the following two are hacks, but we need to hook into this calls in order to
+#determine properly whether our server has failed or not. These are called
+#internally by Gearman::Task upon completion/failure
+sub complete {
+ my $self = shift;
+ $self->status(_SUCCESS);
+ return $self->SUPER::complete(@_);
+}
+
+sub final_fail {
+ my $self = shift;
+ $self->status(_FAILED);
+ return $self->SUPER::final_fail(@_);
+}
+
+#just a context manager for us.. provides two distinct pieces of functionality
+#it allows us to test a jobserver by setting up a dummy worker and spamming it
+#with jobs, and it also allows us to fetch the status of individual queues in
+#a hash
+=head1 DESCRIPTION
+
+Gearman::Healthtest is a module that allows one to test the status of a given
+gearman server. It requires UNIVERSAL::ref as a dependency, and must be B<use>d
+before any Gearman:: modules
+
+=head1 SYNOPSIS
+
+ my $tester = Gearman::Healthtest->new(
+ "jobserver:4730", $timeout, $test_queue_name);
+ my ($njobs, $time_taken_to_complete) = $tester->test();
+ my %status_hash = $tester->getstatus(newsock=>1, timeout=> $timeout);
+
+=cut
+
+package _Util;
+use IO::Socket::INET;
+use Data::Dumper;
+sub force_so_linger {
+ my $fn = shift;
+ my $args = \@_;
+ {
+ #this is a weird hack, but basically what this does is force an
+ #SO_LINGER on the socket. what this means is that close() will wait
+ #until the socket is actually close rather than backgrounding it.
+ #this is important to us if we are going to be making this call
+ #frequently.
+ my $real_newsock = \&IO::Socket::INET::new;
+ no warnings;
+ local *IO::Socket::INET::new = sub {
+ my $ret = $real_newsock->(@_);
+ return $ret if !defined $ret;
+ setsockopt($ret, SOL_SOCKET, SO_LINGER, pack("ii", (1, 0)));
+ return $ret;
+ };
+ use warnings;
+ return $fn->(@$args);
+ }
+}
+
+package Gearman::Healthtest;
+use _Constants;
+use strict;
+use warnings;
+use Gearman::Client;
+use Gearman::Worker;
+use IO::Socket::INET;
+use POSIX;
+use Hash::Util qw(lock_keys unlock_keys);
+use Time::HiRes qw(sleep time);
+use constant MAXTASKS => 200;
+use constant DEFAULT_DATALEN => 4096;
+use Data::Dumper;
+
+#fields, because i suck at typing and still need to learn something like Moose
+#or whatever
+my @__fields = ();
+=head2 METHODS
+
+=over
+
+=item B<< Gearman::Healthtest->new() >>
+
+initialize a new tester object, providing $jobserver, $timeout, $queue_name
+as arguments, as well as a hash of other optional arguments:
+ C<$jobserver>: e.g. "foo.bar.com:4730"
+ C<$timeout>: timeout in seconds. this can be a floating point number
+ C<$queue_name>: the name of the queue we can test in gearman.
+ there is no default, so make sure you choose something which doesn't
+ conflict with a real queue.
+
+optional keyword arguments currently are
+ C<datalen>: the length of sample data (actually a bunch of null bytes)
+ to exchange between the client and worker.
+ This currently defaults to 4096 if not specified
+
+In the background this actually forks and creates a worker which registers with
+the jobserver, then spams it (via the jobserver) with requests.
+=cut
+push @__fields, (qw(jobserver timeout queue_name client worker testdata));
+sub new {
+ my $cls = shift;
+ my $self = {};
+ #can't bless a locked hash, so bless it ASAP
+ bless($self, $cls);
+ lock_keys(%$self, @__fields);
+
+ #positional arguments...
+ $self->{$_} = shift @_ foreach qw(jobserver timeout queue_name);
+ my (%opts) = @_;
+ $self->{testdata} = \scalar("I" x scalar((defined $opts{datalen}) ?
+ $opts{datalen} : DEFAULT_DATALEN));
+
+
+ $self->{client} = Gearman::Client->new();
+ $self->{client}->job_servers($self->{jobserver});
+ return $self;
+}
+
+sub _do_worker {
+ $|++;
+ my $self = shift;
+ my $pid = fork();
+ my $begin_time = time();
+ my $time_remaining = $self->{timeout};
+ if($pid > 0) {return $pid;}
+ elsif($pid < 0) { warn "$!"; return}
+ my $stop_work = 0;
+ my @sockets;
+ my $worker = Gearman::Worker->new();
+ $worker->job_servers($self->{jobserver});
+ $worker->register_function($self->{queue_name}, sub { $self->{testdata} });
+ $SIG{USR1} = sub { exit(1); };
+
+ while(!$stop_work && $time_remaining > 0) {
+ $time_remaining -= (time() - $begin_time);
+ $worker->work(stop_if => sub { $stop_work || $time_remaining <= 0});
+ }
+ print "Exiting\n";
+ exit(0);
+}
+
+=item B<< $tester->test() >>
+
+test the server. This will submit 200 jobs or as many jobs as it can before the
+$timeout (passed to L</new()>). Returns an arrayref of [$njobs,$duration]
+
+you can optionally pass a maximum amount of jobs. the default is 200
+
+=cut
+sub test {
+ my ($self) = shift;
+ my $maxtasks = shift || MAXTASKS;
+ my $completed = 0;
+ #hook up everything...
+ my $worker_pid = _Util::force_so_linger(
+ $self->can("_do_worker"), $self) || return;
+ #do as many jobs as we can cram into our time interval
+ my $begin_time = time();
+ my $duration;
+ my %tsks = ();
+ my $time_remaining = $self->{timeout};
+ while($time_remaining >= 0 && scalar(keys %tsks) < $maxtasks) {
+ $time_remaining = -(time() - $begin_time - $self->{timeout});
+ my $task = _TaskWrapper->new(
+ $self->{queue_name}, $self->{testdata}, {timeout=> $time_remaining});
+ eval {
+ _Util::force_so_linger(
+ $self->{client}->can('do_task'), $self->{client}, $task);
+ };
+ if($@) {
+ warn "$!" if $!;
+ warn "Got Error: $@\n";
+ last;
+ }
+
+ if($task->status == _SUCCESS) { $completed++ }
+ $tsks{$task} = $task;
+ #can't find a sane async interface for this.. oh well...
+ }
+ $duration = time() - $begin_time;
+ kill(SIGUSR1, ($worker_pid));
+ waitpid($worker_pid, 0);
+ return [$completed, $duration];
+}
+
+
+####################### STATUS REPORT #####################
+
+#status sock is the persistent socket used for querying the status
+#the _status_cts is a boolean value which tells us if we have received a fully
+#qualified (e.g. \n. terminated) message on our last read. We will set an
+push @__fields, (qw(_status_sock _status_cts));
+
+
+#gets a socket (either by using an existing socket, or creating a new one.
+#set newsock => 1 to force creation of a new socket
+sub _get_status_sock {
+ my $self = shift;
+ my %opts = @_;
+
+ if($self->{_status_sock}) {
+ if($opts{newsock}) {
+ close($self->{_status_sock});
+ $self->{_status_sock} = undef;
+ } else {
+ return $self->{_status_sock};
+ }
+ }
+ my $jobserver = ($self->{jobserver});
+ my ($host, $port) = split(":", $jobserver);
+ if(!defined $port) { $port = 4730 }
+ my $sock = IO::Socket::INET->new(
+ PeerHost => $host,
+ PeerPort => $port,
+ ReuseAddr => 1,
+ Blocking => 0,
+ Timeout => $opts{timeout},
+ );
+ return if !defined $sock;
+
+ setsockopt($sock, SOL_SOCKET, SO_LINGER, pack("ii", 1, 0));
+
+ $self->{_status_sock} = $sock;
+ $self->{_status_cts} = 1;
+ return $sock;
+}
+
+#tries to get a status dump from gearman's administrative interface
+sub _sock_io {
+ my $self = shift;
+ my $timeout = shift;
+ my $delim_regexp = qr/^[.]\n\Z/m;
+ my $sendbuf = "status\n";
+ my $begin_time = time();
+ my $time_remaining = $timeout;
+ my $io_wrap = sub {
+ $time_remaining -= (time() - $begin_time);
+ my $result = shift;
+ #none of these exceptional conditions, except EAGAIN, should happen
+ #under normal operation..
+ if(!defined $result && $! != EAGAIN) {
+ warn "SOCKET ERROR!";
+ goto ERR;
+ } elsif ($! == EAGAIN) {
+ #nothing to do here...
+ } elsif ($result == 0) {
+ warn "SOCKET CLOSED!";
+ goto ERR;
+ }
+ return $result;
+ };
+ my $s = $self->{_status_sock};
+ #send
+ while($time_remaining > 0 && $sendbuf) {
+ my $nsent = $io_wrap->(syswrite($s, $sendbuf));
+ $sendbuf = substr($sendbuf, $nsent);
+ }
+ #receive
+ #we can be crazy optimizing here and try not to recheck the entire string
+ #each time, but really, we're not expecting heavy traffic here..
+ my $match_found = 0;
+ my $rbuf = "";
+ while($time_remaining > 0 && !$match_found) {
+ $io_wrap->(sysread($s, my $tmp, 4096));
+ $rbuf .= $tmp;
+ if($tmp =~ $delim_regexp || $rbuf =~ $delim_regexp) {
+ $match_found = 1;
+ }
+ }
+ return $rbuf unless !$match_found;
+ ERR:
+ $self->_get_status_sock(timeout => $timeout, newsock => 1);
+ return;
+}
+
+=item B<< $tester->getstatus(timeout => $self->{timeout}, newsock => 0) >>
+
+Returns status information from Gearman's administrative (text-based) interface.
+This will try to connect to a gearman port, and wait $timeout seconds for status
+information. Optionally you may set newsock to 1 in order to force a new socket
+on every fetch. Otherwise it will keep an internal socket object in state and try
+to use that. The return information is in a hash keyed by queue name, and whose
+values are an arrayref in the format of [nwaiting, nworking, navailable]
+=cut
+sub getstatus {
+ my $self = shift;
+ my %opts = @_;
+ $opts{timeout} = ($opts{timeout}) ? $opts{timeout} : $self->{timeout};
+ #initialize the socket...
+ my $status = $self->_get_status_sock(%opts);
+ warn "$!" if $!;
+ return if !defined $status;
+
+ my $ret = $self->_sock_io($opts{timeout});
+ my %h = ();
+ if(!defined $ret) { return }
+ foreach my $l (split("\n", $ret)) {
+ my ($q, $nwait, $nwork, $navail) = split("\t", $l);
+ next if(!defined $navail);
+ $h{$q} = [$nwait, $nwork, $navail];
+ }
+ return %h;
+}
+
+=item B<< test_multi($host,$timeout, maxjobs => 300, batchsize => 50) >>
+
+This is a static method. It creates multiple test objects, and tests repeatedly
+until the timeout expires or a set amount of jobs have been executed
+
+maxjobs is the maximum amount of jobs to wait for, batchsize is the amount of
+jobs to run for each client
+
+=cut
+
+sub test_multi {
+ my ($host,$timeout, $queue_name, %opts) = @_;
+ $host .= ":4730" unless $host =~ /:\d+$/;
+ my $begin_time = time();
+ my $time_remaining = $timeout;
+ my %params = (maxjobs => 300, batchsize => 50, %opts);
+ my $completed = 0;
+ while($time_remaining >= 0 && $completed < $params{maxjobs}) {
+ $time_remaining = -(time() - $begin_time - $timeout);
+ my $tester = __PACKAGE__->new($host, 1, $queue_name);
+ my ($ndone,undef) = @{ $tester->test($params{batchsize}) };
+ $completed += $ndone if $ndone;
+ }
+ return $completed;
+}
+
+if(!caller) {
+ $|++;
+ my $test = Gearman::Healthtest->new("localhost:4730", 1, "foo", datalen=>1);
+ my $result = $test->test(1);
+ if(defined $result) {
+ my ($completed,$time) = @$result;
+ print "Got $completed jobs in $time seconds\n";
+ } else {
+ print "OOPS!\n";
+ }
+ my %ret = $test->getstatus();
+ foreach (keys %ret) {
+ my $qname = $_;
+ my ($nwait,$nwork,$navail) = @{$ret{$_}};
+ printf("Queue: %-15s [WAIT %-4d] [WORK %-4d] [AVAIL %-4d]\n",
+ $qname, $nwait, $nwork, $navail);
+ }
+}
+
+1;
View
37 lib/Gearman/Notes.pod
@@ -0,0 +1,37 @@
+=head1 DESCRIPTION
+
+This is a set of patched code and add in modules that improve (read, fix) the old
+Gearman:: Client/Worker/Task API.
+
+The modules contained are:
+
+=over
+
+=item L<Gearman::Healthtest>
+
+A healthtest module
+
+=item Nonblocking I/O
+
+The modules from 6apart use blocking I/O for operations. While it uses a select
+loop which should eliminate any type of blocking operations, there are writes
+to available-for-reading sockets and vice versa, in other words, a huge mess.
+The improvements in this module, while not complete, have shown an elimination
+of hanging code at gearman, perhaps at the cost of efficiency.
+
+Two variables are available with control the behavior of nonblocking I/O, which
+is still timeout-based:
+
+$Gearman::Util::Timeout: the default timeout value, normally 1.0
+$Gearman::Util::MinBytesPerSecond: the rate at which to set the timeout for
+payload i/o
+
+=item Unique UUIDs for Jobs
+
+The unpatched gearman modules have flakey behavior when it comes to unique IDs
+for jobs. For some reason this does not affect the jobs (or doesn't seem to do
+so) usually. However when using a persistent DB for a queue, there are fatal
+consequences when UUIDs are not used. All jobs in the same queue are merged into
+one. The patched code ensures that the ID for a job will be a new UUID, unless
+the calling code passes its own identifier. This behavior may change in the
+future
View
18 lib/Gearman/ResponseParser.pm
@@ -1,5 +1,6 @@
package Gearman::ResponseParser;
use strict;
+use POSIX;
# this is an abstract base class. See:
# Gearman::ResponseParser::Taskset (for Gearman::Client, the sync version), or
@@ -105,14 +106,21 @@ sub eof {
# don't override:
sub parse_sock {
- my ($self, $sock) = @_; # $sock is readable, we should sysread it and feed it to $self->parse_data
-
+ my ($self, $sock) = @_; # $sock is (MAYBE) readable,
+ #we should sysread it and feed it to $self->parse_data
+ my $block_prev = $sock->blocking;
+ $sock->blocking(0);
my $data;
my $rv = sysread($sock, $data, 128 * 1024);
-
+ $sock->blocking($block_prev);
if (! defined $rv) {
- $self->on_error("read_error: $!");
- return;
+ if($! != EAGAIN && $! != EWOULDBLOCK) {
+ $self->on_error("read_error: $!");
+ return;
+ } else {
+ print "EAGAIN\n";
+ return;
+ }
}
# FIXME: EAGAIN , EWOULDBLOCK
View
8 lib/Gearman/Task.pm
@@ -4,6 +4,9 @@ use strict;
use Carp ();
use String::CRC32 ();
+#for unique keys
+use Data::UUID;
+my $ug = Data::UUID->new();
use Gearman::Taskset;
use Gearman::Util;
@@ -22,6 +25,8 @@ BEGIN {
}
# constructor, given: ($func, $argref, $opts);
+my $ud = Data::UUID->new();
+
sub new {
my $class = shift;
@@ -41,7 +46,7 @@ sub new {
)) {
$self->{$k} = delete $opts->{$k};
}
-
+ $self->{uniq} ||= $ug->create_str();
$self->{retry_count} ||= 0;
$self->{is_finished} = 0; # bool: if success or fail has been called yet on this.
@@ -92,7 +97,6 @@ sub taskset {
# setter
my Gearman::Taskset $ts = shift;
$task->{taskset} = $ts;
-
my $merge_on = $task->{uniq} && $task->{uniq} eq "-" ?
$task->{argref} : \ $task->{uniq};
if ($$merge_on) {
View
41 lib/Gearman/Taskset.pm
@@ -110,49 +110,59 @@ sub wait {
my Gearman::Taskset $ts = shift;
my %opts = @_;
-
my $timeout;
if (exists $opts{timeout}) {
$timeout = delete $opts{timeout};
- $timeout += Time::HiRes::time() if defined $timeout;
}
Carp::carp "Unknown options: " . join(',', keys %opts) . " passed to Taskset->wait."
if keys %opts;
my %parser; # fd -> Gearman::ResponseParser object
-
- my ($rin, $rout, $eout) = ('', '', '');
+ my ($rin,$rout,$eout) = ('','','','');
my %watching;
-
for my $sock ($ts->{default_sock}, values %{ $ts->{loaned_sock} }) {
next unless $sock;
my $fd = $sock->fileno;
vec($rin, $fd, 1) = 1;
$watching{$fd} = $sock;
}
-
+
my $tries = 0;
+
+ my $begin_time = Time::HiRes::time();
+ my $time_left;
+ if($timeout) {
+ $time_left = sub { -(Time::HiRes::time() - $begin_time - $timeout) };
+ } else {
+ $time_left = sub { 0.5 };
+ }
+
while (!$ts->{cancelled} && keys %{$ts->{waiting}}) {
$tries++;
-
- my $time_left = $timeout ? $timeout - Time::HiRes::time() : 0.5;
- my $nfound = select($rout=$rin, undef, $eout=$rin, $time_left);
- if ($timeout && $time_left <= 0) {
+ my $_tleft = $time_left->();
+ my $nfound = select($rout=$rin, undef, $eout=$rin, $_tleft);
+ if ($timeout && $_tleft <= 0) {
$ts->cancel;
return;
}
next if ! $nfound;
-
foreach my $fd (keys %watching) {
+ if(vec($eout,$fd,1)) {
+ Carp::carp "Socket Error. Cancelling taskset";
+ $ts->cancel;
+ return;
+ }
next unless vec($rout, $fd, 1);
# TODO: deal with error vector
my $sock = $watching{$fd};
my $parser = $parser{$fd} ||= Gearman::ResponseParser::Taskset->new(source => $sock,
taskset => $ts);
+ #my $block_prev = $sock->blocking;
+ #$sock->blocking(0);
eval { $parser->parse_sock($sock); };
-
+ #$sock->blocking($block_prev);
if ($@) {
# TODO this should remove the fd from the list, and reassign any tasks to other jobserver, or bail.
# We're not in an accessable place here, so if all job servers fail we must die to prevent hanging.
@@ -199,6 +209,12 @@ sub add_task {
my $req = $task->pack_submit_packet($ts->client);
my $len = length($req);
+
+ if(!defined($task->{jssock})) {
+ #Carp::carp "task->{jssock} is undefined!";
+ return $task->fail;
+ }
+
my $rv = $task->{jssock}->syswrite($req, $len);
die "Wrote $rv but expected to write $len" unless $rv == $len;
@@ -208,7 +224,6 @@ sub add_task {
if (! $rv) {
shift @{ $ts->{need_handle} }; # ditch it, it failed.
# this will resubmit it if it failed.
- print " INITIAL SUBMIT FAILED\n";
return $task->fail;
}
}
View
99 lib/Gearman/Util.pm
@@ -1,6 +1,17 @@
package Gearman::Util;
use strict;
+use warnings;
+use Carp qw(carp);
+use POSIX;
+use Time::HiRes qw(time usleep);
+use Log::Fu;
+#carp "WARNING WARNING WARNING! this module is incomplete and inefficient. That " .
+#"being said, it should have eliminated *most* of the blocking code";
+
+our $Timeout = 1.0;
+our $MinBytesPerSecond = 1000;
+
# I: to jobserver
# O: out of job server
@@ -79,45 +90,77 @@ sub pack_res_command {
return "\0RES" . pack("NN", $type, $len) . $_[0];
}
+use constant {
+ SOCK_CLOSED => 1,
+ SOCK_ERR => 2,
+ SOCK_OK => 3,
+ NO_TIMEOUT => -1,
+};
+
+sub read_into_buf {
+ my ($sock,$nbytes,$timeout) = @_;
+ my $old_block = $sock->blocking;
+ $sock->blocking(0);
+ $timeout ||=$Timeout;
+ my $begin_time = Time::HiRes::time();
+ my $time_left = $timeout;
+ my $retcode = SOCK_OK;
+ my $buf = "";
+ log_debug("request for $nbytes bytes");
+ while (($time_left >= 0 || $timeout == NO_TIMEOUT) && $nbytes) {
+ $time_left = -(Time::HiRes::time() - $begin_time - $timeout);
+ my $rv = sysread($sock, my $tmp, $nbytes);
+ my $old_errno = $!;
+ if(!defined $rv) {
+ if(!defined $old_errno) {
+ carp "rv is undefined but errno is undefined too";
+ } elsif($old_errno == EAGAIN) {
+ usleep(500);
+ next;
+ }
+ $buf = undef;
+ $retcode = SOCK_ERR;
+ last;
+ } elsif ($rv == 0) {
+ log_debug("EOF");
+ $retcode = SOCK_CLOSED;
+ last;
+ }
+ $buf .= $tmp;
+ $nbytes -= $rv;
+ }
+ $sock->blocking($old_block);
+ log_debug("returning " . length($buf) . " bytes");
+ return ($buf,$retcode);
+}
+
# returns undef on closed socket or malformed packet
sub read_res_packet {
my $sock = shift;
my $err_ref = shift;
-
- my $buf;
- my $rv;
-
my $err = sub {
my $code = shift;
+ log_debug($code);
+ $sock->close() if $sock->connected;
$$err_ref = $code if ref $err_ref;
return undef;
};
-
# read the header
- $rv = sysread($sock, $buf, 12);
-
- return $err->("read_error") unless defined $rv;
- return $err->("eof") unless $rv;
- return $err->("malformed_header") unless $rv == 12;
-
- my ($magic, $type, $len) = unpack("a4NN", $buf);
+ my ($hdr,$retcode) = read_into_buf($sock,12,$Timeout);
+ return $err->("read_error") if ($retcode == SOCK_ERR);
+ #return $err->("eof") if $retcode == SOCK_CLOSED;
+ return $err->("malformed_header") unless length($hdr) == 12;
+
+ my ($magic, $type, $len) = unpack("a4NN", $hdr);
return $err->("malformed_magic") unless $magic eq "\0RES";
-
+
+ my $payload;
if ($len) {
- # Start off trying to read the whole buffer. Store the bits in an array
- # one element for each read, then do a big join at the end. This minimizes
- # the number of memory allocations we have to do.
- my $readlen = $len;
- my $lim = 20 + int( $len / 2**10 );
- my @buffers;
- for (my $i = 0; $readlen > 0 && $i < $lim; $i++) {
- my $rv = sysread($sock, $buffers[$i], $readlen);
- return $err->("short_body") unless $rv > 0;
- last unless $rv > 0;
- $readlen -= $rv;
- }
- $buf = join('', @buffers);
- return $err->("short_body") unless length($buf) == $len;
+ log_debug("trying to read payload");
+ my $timeout = $len / $MinBytesPerSecond;
+ ($payload,$retcode) = read_into_buf($sock,$len,$timeout);
+ return $err->("read_error") if $retcode == SOCK_ERR;
+ return $err->("short_body") unless length($payload) == $len;
}
$type = $cmd{$type};
@@ -127,7 +170,7 @@ sub read_res_packet {
return {
'type' => $type->[1],
'len' => $len,
- 'blobref' => \$buf,
+ 'blobref' => \$payload,
};
}
View
191 scripts/gearman-alltest.pl
@@ -0,0 +1,191 @@
+#!/usr/bin/perl
+
+#tunables:
+use constant GEARMAN_JOB_NAME => "garbage";
+use constant INCREMENTS => 100;
+use constant UPDATE_INTERVAL => 1;
+
+use Getopt::Long;
+use strict;
+use warnings;
+use Storable;
+use List::Util "sum";
+use POE qw(Wheel::Run Filter::Reference Filter::Line);
+use Time::HiRes "sleep";
+use Gearman::Client;
+use Gearman::Worker;
+
+use Data::Dumper;
+
+use Fcntl;
+
+#stupid help generator... (argp emulation.. sorta)
+my @_gopt;
+my $help_text = "$0 <options>\n";
+
+my $Options = {};
+Getopt::Long::Configure("no_ignore_case");
+my @optlist = (
+ ["jobserver", "j", "s@", "list of jobservers to use", undef],
+ ["workers", "w", "i", "number of worker processes", 2],
+ ["clients", "c", "i", "number of client processes", 2],
+ ["bytes", "b", "i", "number of /dev/urandom bytes to send from worker", 8192],
+ ["background", "B", "i", "clients should background jobs", 0]
+);
+foreach my $opt_s (@optlist) {
+ my ($longopt,$shortopt,$type,$description, $default) = @$opt_s;
+ my $optstring = "$longopt|$shortopt=$type";
+ push(@_gopt, ($optstring));
+ $help_text .= "\t-$shortopt --$longopt\t$description";
+ if($default) {
+ $Options->{$longopt} = $default;
+ $help_text .= " (default=$default)";
+ }
+ $help_text .= "\n";
+}
+#defaults
+GetOptions($Options, @_gopt, "help|h" => sub { print $help_text; exit(0); });
+if(!$Options->{jobserver}) { die "Need jobserver!"; }
+
+my $SUBMIT_FN = ($Options->{background}) ? "dispatch_background" : "do_task";
+
+my $begin_time = time()-1;
+
+sub gearman_client {
+ $0 .= " (CLIENT)";
+ my $total = 0;
+ my $filter = POE::Filter::Reference->new();
+ $|++;
+ #connect to gearman and make a client
+ my $client = Gearman::Client->new();
+ $client->job_servers(@{$Options->{jobserver}});
+ while(1) {
+ my $fn = $client->can($SUBMIT_FN);
+ $fn->($client, GEARMAN_JOB_NAME, "foo");
+ $total++;
+ if($total % INCREMENTS) {
+ print @{$filter->put([\INCREMENTS])};
+ }
+ }
+}
+sub gearman_worker {
+ $0 .= " (WORKER)";
+ my $total = 0;
+ my $filter = POE::Filter::Reference->new();
+ my $worker = Gearman::Worker->new();
+ $worker->job_servers(@{$Options->{jobserver}});
+ $worker->register_function(
+ GEARMAN_JOB_NAME, sub {
+ sysopen(my $devrandom, "/dev/urandom", O_RDONLY);
+ sysread($devrandom, my $buf, $Options->{bytes});
+ close($devrandom);
+ return $buf;
+ }
+ );
+ while(1) {
+ $total++;
+ $worker->work();
+ if($total % INCREMENTS) {
+ print @{$filter->put([\INCREMENTS])};
+ }
+ }
+}
+
+my %client_totals;
+my %worker_totals;
+
+sub mk_update_cb($) {
+ my ($hashref) = @_;
+ my $handler = sub {
+ $|++;
+ my $heap = $_[HEAP];
+ $hashref->{$heap->{cmd}} += ${$_[ARG0]};
+ };
+}
+
+sub stderr_handler {
+ $|++;
+ print $_[ARG0] . "\n";
+}
+
+sub launch_children {
+ my ($child_fn, $update_cb) = @_;
+ return POE::Session->create(
+ inline_states => {
+ _start => sub {
+ my ($kernel,$heap) = @_[KERNEL, HEAP];
+ my $child = POE::Wheel::Run->new(
+ Program => $child_fn,
+ StdoutEvent => "update_cb",
+ StderrEvent => "_stderr_handler",
+ StdoutFilter => POE::Filter::Reference->new(),
+ StderrFilter => POE::Filter::Line->new(),
+ );
+ print "Launched child with pid " . $child->PID() . "\n";
+ $_[KERNEL]->sig_child($child->PID(), "on_child_signal");
+ $_[KERNEL]->sig(INT=>"child_cleanup", TERM => "child_cleanup");
+ $_[HEAP]{cmd} = $child;
+ },
+ update_cb => $update_cb,
+ _stop => sub { print "Stopping...\n"; },
+ on_child_signal => sub {
+ print "pid $_[ARG1] exited with status $_[ARG2]\n";
+ delete $_[HEAP]{cmd};
+ },
+ child_cleanup => sub {
+ my $child = $_[HEAP]{cmd};
+ if($child) {
+ $child->kill(15);
+ print "Waiting for child " . $child->PID() . " to die\n";
+ wait;
+ }
+ },
+ _stderr_handler => sub { print "$_[ARG0]\n" },
+ },
+ );
+}
+
+$0 .= " - Gearman Tester - ";
+
+print "Launching $Options->{workers} workers\n";
+for (1..$Options->{workers}) {
+ launch_children(\&gearman_worker, mk_update_cb(\%worker_totals));
+}
+print "Launching $Options->{clients} clients\n";
+for (1..$Options->{clients}) {
+ launch_children(\&gearman_client, mk_update_cb(\%client_totals));
+}
+
+sub status_monitor {
+}
+
+sub print_progress {
+ my ($kernel, $heap) = @_[KERNEL,HEAP];
+ $|++;
+ #print "[C/W (total/sec)]: ";
+ #foreach (\%client_totals, \%worker_totals) {
+ # my $tmp = sum(values(%$_));
+ # next unless $tmp;
+ # printf("(%d %d)|", $tmp, $tmp / (time()-$begin_time));
+ #}
+ #print "\n";
+ my $sum = sum(values(%client_totals));
+ print "$sum\n" unless (!$sum);
+ %client_totals = ();
+ $kernel->delay("print_progress", UPDATE_INTERVAL);
+ $|--;
+}
+
+print "Launching monitor..\n";
+POE::Session->create(
+ inline_states => {
+ _start => sub {
+ $0 .= " (MONITOR)";
+ $_[HEAP]{last_update} = time();
+ $_[KERNEL]->delay("print_progress", UPDATE_INTERVAL);
+ },
+ print_progress => \&print_progress,
+ }
+);
+
+POE::Kernel->run();
View
16 scripts/test.pl
@@ -0,0 +1,16 @@
+#!/usr/bin/env perl
+use strict;
+use warnings;
+use Gearman::Healthtest;
+use Gearman::Util;
+#$Gearman::Util::Timeout = Gearman::Util::NO_TIMEOUT;
+
+my $host = $ARGV[0];
+my $tester = Gearman::Healthtest->new($host, 4, "foo");
+my $ret = $tester->test(100);
+die "Healthtest returned an undefined value" if !defined $ret;
+my ($njobs, $duration) = @$ret;
+print "Completed $njobs in $duration seconds\n";
+print "Trying multi test\n";
+my $completed = Gearman::Healthtest::test_multi($host, 5, "foo", batchsize => 10);
+print "completed $completed jobs\n";
Please sign in to comment.
Something went wrong with that request. Please try again.