Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

grab_for.

  • Loading branch information...
commit 70b9cd758db9aada6041057018b6cad999299858 1 parent 07faaa3
@nekokak authored
Showing with 22 additions and 21 deletions.
  1. +10 −10 lib/Jonk.pm
  2. +12 −11 t/worker.t
View
20 lib/Jonk.pm
@@ -14,12 +14,12 @@ sub new {
Carp::croak('missing job queue database handle.');
}
- # functions check
bless {
dbh => $dbh,
table_name => $opts->{table_name} || 'job',
- functions => join(', ', map { "'$_'" } @{$opts->{functions}}),
+ functions => $opts->{functions},
+ find_funcs => join(', ', map { "'$_'" } keys %{$opts->{functions}}),
job_find_size => $opts->{job_find_size} || 50,
default_grab_for => $opts->{default_grab_for}||(60*60),
@@ -121,13 +121,13 @@ sub _grab_job {
}
sub _grab_a_job {
- my ($self, $row, $time, $opts) = @_;
+ my ($self, $row, $time) = @_;
my $sth = $self->{dbh}->prepare_cached(
sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $self->{table_name}),
);
$sth->execute(
- ($time + ($opts->{grab_for} || $self->{default_grab_for})),
+ ($time + ($self->{functions}->{$row->{func}}->{grab_for} || $self->{default_grab_for})),
$row->{id},
$row->{grabbed_until}
);
@@ -137,7 +137,7 @@ sub _grab_a_job {
}
sub lookup_job {
- my ($self, $job_id, $opts) = @_;
+ my ($self, $job_id) = @_;
$self->_grab_job(
sub {
@@ -147,14 +147,14 @@ sub lookup_job {
);
$sth->execute($job_id, $time, $time);
$sth;
- }, $opts
+ }
);
}
sub find_job {
my ($self, $opts) = @_;
- unless ($self->{functions}) {
+ unless ($self->{find_funcs}) {
Carp::croak('missin find_job functions.');
}
@@ -164,13 +164,13 @@ sub find_job {
my $sth = $self->{dbh}->prepare_cached(
sprintf('SELECT * FROM %s WHERE func IN (%s) AND grabbed_until <= ? AND run_after <= ? ORDER BY priority DESC LIMIT %s',
$self->{table_name},
- $self->{functions},
+ $self->{find_funcs},
($opts->{job_find_size}||50),
),
);
$sth->execute($time, $time);
$sth;
- }, $opts
+ }
);
}
@@ -221,7 +221,7 @@ Jonk - simple job tank manager.
use DBI;
use Jonk;
my $dbh = DBI->connect(...);
- my $jonk = Jonk->new($dbh, {functions => ['MyWorker']});
+ my $jonk = Jonk->new($dbh, {functions => {MyWorker => {}}});
# insert job
{
$jonk->insert('MyWorker', 'arg');
View
23 t/worker.t
@@ -5,12 +5,13 @@ use Jonk;
my $dbh = t::Utils->setup;
subtest 'find_job' => sub {
- my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
+ my $client = Jonk->new($dbh, {functions => {MyWorker => {}}});
my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
my $job = $client->find_job();
+ ok $job;
is $job->arg, 'arg';
is $job->func, 'MyWorker';
is $job->retry_cnt, 0;
@@ -25,12 +26,11 @@ subtest 'find_job' => sub {
};
subtest 'find_job / with priority' => sub {
- my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
+ my $client = Jonk->new($dbh, {functions => {MyWorker => {}}});
- $client->insert('MyWorker', 'arg_10', {priority => 10});
- $client->insert('MyWorker', 'arg_30', {priority => 30});
- $client->insert('MyWorker', 'arg_20', {priority => 20});
-
+ $client->insert('MyWorker', 'arg_10', {priority => 10});
+ $client->insert('MyWorker', 'arg_30', {priority => 30});
+ $client->insert('MyWorker', 'arg_20', {priority => 20});
my $job = $client->find_job();
is $job->arg, 'arg_30';
@@ -60,7 +60,7 @@ subtest 'find_job / with priority' => sub {
};
subtest 'find_job / with run_after' => sub {
- my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
+ my $client = Jonk->new($dbh, {functions => {MyWorker => {}}});
my $time = time() + 2;
$client->insert('MyWorker', 'arg', {run_after => $time});
@@ -84,9 +84,9 @@ subtest 'find_job / with run_after' => sub {
};
subtest 'find_job / with grabbed_until' => sub {
- my $client = Jonk->new($dbh, {functions => [qw/MyWorker/], default_grab_for => 2});
{
+ my $client = Jonk->new($dbh, {functions => {MyWorker => {}}, default_grab_for => 2});
$client->insert('MyWorker', 'arg');
my $job = $client->find_job();
@@ -115,9 +115,10 @@ subtest 'find_job / with grabbed_until' => sub {
}
{
+ my $client = Jonk->new($dbh, {functions => {MyWorker => {grab_for => 5}}, default_grab_for => 2});
$client->insert('MyWorker', 'arg');
- my $job = $client->find_job({grab_for => 5});
+ my $job = $client->find_job();
is $job->arg, 'arg';
is $job->func, 'MyWorker';
is $job->retry_cnt, 0;
@@ -175,7 +176,7 @@ t::Utils->cleanup($dbh);
subtest 'find_job / flexible job table name' => sub {
my $dbh = t::Utils->setup("my_job");
- my $client = Jonk->new($dbh, { table_name => 'my_job', functions => [qw/MyWorker/]});
+ my $client = Jonk->new($dbh, { table_name => 'my_job', functions => {MyWorker => {}}});
my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
@@ -195,7 +196,7 @@ subtest 'find_job / flexible job table name' => sub {
subtest 'lookup_job / flexible job table name' => sub {
my $dbh = t::Utils->setup("my_job");
- my $client = Jonk->new($dbh, { table_name => 'my_job', functions => [qw/MyWorker/]});
+ my $client = Jonk->new($dbh, { table_name => 'my_job', functions => {MyWorker => {}}});
my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
Please sign in to comment.
Something went wrong with that request. Please try again.