Skip to content
Browse files

grab/run_after/priority etc...

  • Loading branch information...
1 parent aa40029 commit 07faaa3399562bb708cd718675bc0e4f4d40988b @nekokak committed Jan 20, 2011
Showing with 188 additions and 30 deletions.
  1. +24 −13 lib/Jonk.pm
  2. +164 −17 t/worker.t
View
37 lib/Jonk.pm
@@ -22,6 +22,8 @@ sub new {
functions => join(', ', map { "'$_'" } @{$opts->{functions}}),
job_find_size => $opts->{job_find_size} || 50,
+ default_grab_for => $opts->{default_grab_for}||(60*60),
+
_errstr => undef,
insert_time_callback => ($opts->{insert_time_callback}||sub{
@@ -45,14 +47,15 @@ sub insert {
my $sth = $self->{dbh}->prepare_cached(
sprintf(
- 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,0,0,?)'
+ 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,?,0,?)'
,$self->{table_name}
)
);
$sth->bind_param(1, $func);
$sth->bind_param(2, $arg, _bind_param_attr($self->{dbh}));
$sth->bind_param(3, $self->{insert_time_callback}->());
- $sth->bind_param(4, $opt->{priority}||0);
+ $sth->bind_param(4, $opt->{run_after}||0);
+ $sth->bind_param(5, $opt->{priority}||0);
$sth->execute();
$job_id = $self->{dbh}->last_insert_id("","",$self->{table_name},"");
@@ -92,7 +95,7 @@ sub _server_unixitme {
}
sub _grab_job {
- my ($self, $callback) = @_;
+ my ($self, $callback, $opt) = @_;
my $job;
try {
@@ -104,7 +107,7 @@ sub _grab_job {
my $sth = $callback->($time);
while (my $row = $sth->fetchrow_hashref) {
- $job = $self->_grab_a_job($row, $time);
+ $job = $self->_grab_a_job($row, $time, $opt);
last;
}
@@ -118,48 +121,56 @@ sub _grab_job {
}
sub _grab_a_job {
- my ($self, $row, $time) = @_;
+ my ($self, $row, $time, $opts) = @_;
my $sth = $self->{dbh}->prepare_cached(
sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $self->{table_name}),
);
- $sth->execute(($time + 60), $row->{id}, $row->{grabbed_until});
+ $sth->execute(
+ ($time + ($opts->{grab_for} || $self->{default_grab_for})),
+ $row->{id},
+ $row->{grabbed_until}
+ );
my $grabbed = $sth->rows;
$sth->finish;
$grabbed ? Jonk::Job->new($self => $row) : undef;
}
sub lookup_job {
- my ($self, $job_id) = @_;
+ my ($self, $job_id, $opts) = @_;
$self->_grab_job(
sub {
my $time = shift;
my $sth = $self->{dbh}->prepare_cached(
- sprintf('SELECT * FROM %s WHERE id = ? AND grabbed_until <= ?', $self->{table_name})
+ sprintf('SELECT * FROM %s WHERE id = ? AND grabbed_until <= ? AND run_after <= ?', $self->{table_name})
);
- $sth->execute($job_id, $time);
+ $sth->execute($job_id, $time, $time);
$sth;
- }
+ }, $opts
);
}
sub find_job {
my ($self, $opts) = @_;
+ unless ($self->{functions}) {
+ Carp::croak('missin find_job functions.');
+ }
+
$self->_grab_job(
sub {
my $time = shift;
my $sth = $self->{dbh}->prepare_cached(
- sprintf('SELECT * FROM %s WHERE func IN (%s) AND grabbed_until <= ? ORDER BY id LIMIT %s',
+ sprintf('SELECT * FROM %s WHERE func IN (%s) AND grabbed_until <= ? AND run_after <= ? ORDER BY priority DESC LIMIT %s',
$self->{table_name},
$self->{functions},
($opts->{job_find_size}||50),
),
);
- $sth->execute($time);
+ $sth->execute($time, $time);
$sth;
- }
+ }, $opts
);
}
View
181 t/worker.t
@@ -4,7 +4,7 @@ use Jonk;
my $dbh = t::Utils->setup;
-subtest 'grab_job' => sub {
+subtest 'find_job' => sub {
my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
my $job_id = $client->insert('MyWorker', 'arg');
@@ -24,46 +24,193 @@ subtest 'grab_job' => sub {
ok not $client->find_job();
};
-done_testing;
-__END__
-subtest 'grab_job / no job' => sub {
+subtest 'find_job / with priority' => sub {
+ my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
+
+ $client->insert('MyWorker', 'arg_10', {priority => 10});
+ $client->insert('MyWorker', 'arg_30', {priority => 30});
+ $client->insert('MyWorker', 'arg_20', {priority => 20});
+
+
+ my $job = $client->find_job();
+ is $job->arg, 'arg_30';
+ is $job->priority, 30;
+
+ ok not $client->errstr;
+
+ $job->completed;
+
+ $job = $client->find_job();
+
+ is $job->arg, 'arg_20';
+ is $job->priority, 20;
+
+ ok not $client->errstr;
+
+ $job->completed;
+
+ $job = $client->find_job();
+
+ is $job->arg, 'arg_10';
+ is $job->priority, 10;
+
+ ok not $client->errstr;
+
+ $job->completed;
+};
+
+subtest 'find_job / with run_after' => sub {
my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
- my $job = $client->grab_job;
- ok not $job;
+
+ my $time = time() + 2;
+ $client->insert('MyWorker', 'arg', {run_after => $time});
+
+ ok not $client->find_job();
+
+ sleep 2;
+
+ my $job = $client->find_job();
+ is $job->arg, 'arg';
+ is $job->func, 'MyWorker';
+ is $job->retry_cnt, 0;
+ is $job->run_after, $time;
+ is $job->priority, 0;
+
+ ok not $client->errstr;
+
+ $job->completed;
+
+ ok not $client->find_job();
+};
+
+subtest 'find_job / with grabbed_until' => sub {
+ my $client = Jonk->new($dbh, {functions => [qw/MyWorker/], default_grab_for => 2});
+
+ {
+ $client->insert('MyWorker', 'arg');
+
+ my $job = $client->find_job();
+ is $job->arg, 'arg';
+ is $job->func, 'MyWorker';
+ is $job->retry_cnt, 0;
+ is $job->run_after, 0;
+ is $job->priority, 0;
+
+ ok not $client->errstr;
+
+ ok not $client->find_job();
+
+ sleep 2;
+
+ my $re_grabbed_job = $client->find_job();
+ is $re_grabbed_job->arg, 'arg';
+ is $re_grabbed_job->func, 'MyWorker';
+ is $re_grabbed_job->retry_cnt, 0;
+ is $re_grabbed_job->run_after, 0;
+ is $re_grabbed_job->priority, 0;
+
+ $re_grabbed_job->completed;
+
+ ok not $client->find_job();
+ }
+
+ {
+ $client->insert('MyWorker', 'arg');
+
+ my $job = $client->find_job({grab_for => 5});
+ is $job->arg, 'arg';
+ is $job->func, 'MyWorker';
+ is $job->retry_cnt, 0;
+ is $job->run_after, 0;
+ is $job->priority, 0;
+
+ ok not $client->errstr;
+
+ ok not $client->find_job();
+ sleep 2;
+
+ ok not $client->find_job();
+ sleep 3;
+
+ my $re_grabbed_job = $client->find_job();
+ is $re_grabbed_job->arg, 'arg';
+ is $re_grabbed_job->func, 'MyWorker';
+ is $re_grabbed_job->retry_cnt, 0;
+ is $re_grabbed_job->run_after, 0;
+ is $re_grabbed_job->priority, 0;
+
+ $re_grabbed_job->completed;
+
+ ok not $client->find_job();
+ }
};
-subtest 'grab_job / specific job_id' => sub {
+subtest 'find_job / without functions' => sub {
my $client = Jonk->new($dbh);
- my $job_id = $client->insert('MyWorker', 'grab_job');
+ $client->insert('MyWorker', 'arg');
+
+ eval { $client->find_job };
+ like $@, qr/missin find_job functions. at /;
+};
+
+subtest 'lookup_job' => sub {
+ my $client = Jonk->new($dbh);
+
+ my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
- my $job = $client->grab_job($job_id);
- is $job->arg, 'grab_job';
+ my $job = $client->lookup_job($job_id);
is $job->func, 'MyWorker';
+ is $job->arg, 'arg';
+
+ ok not $client->errstr;
+
$job->completed;
- ok not $client->grab_job;
+
+ ok not $client->lookup_job($job_id);
};
t::Utils->cleanup($dbh);
-subtest 'grab_job / flexible job table name' => sub {
+subtest 'find_job / flexible job table name' => sub {
my $dbh = t::Utils->setup("my_job");
- my $client = Jonk->new($dbh, { table_name => "my_job" });
+ my $client = Jonk->new($dbh, { table_name => 'my_job', functions => [qw/MyWorker/]});
my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
- my $jonk = Jonk->new($dbh, { table_name => "my_job", functions => [qw/MyWorker/]});
- my $job = $jonk->grab_job;
+ my $job = $client->find_job();
is $job->arg, 'arg';
is $job->func, 'MyWorker';
- ok not $jonk->errstr;
+
+ ok not $client->errstr;
+
$job->completed;
- ok not $client->grab_job;
+
+ ok not $client->find_job;
t::Utils->cleanup($dbh, "my_job");
};
+subtest 'lookup_job / flexible job table name' => sub {
+ my $dbh = t::Utils->setup("my_job");
+ my $client = Jonk->new($dbh, { table_name => 'my_job', functions => [qw/MyWorker/]});
+
+ my $job_id = $client->insert('MyWorker', 'arg');
+ ok $job_id;
+
+ my $job = $client->lookup_job($job_id);
+ is $job->arg, 'arg';
+ is $job->func, 'MyWorker';
+
+ ok not $client->errstr;
+
+ $job->completed;
+
+ ok not $client->lookup_job($job_id);
+
+ t::Utils->cleanup($dbh, "my_job");
+};
done_testing;

0 comments on commit 07faaa3

Please sign in to comment.
Something went wrong with that request. Please try again.