Skip to content
Browse files

modify method name , schema , etc...

  • Loading branch information...
1 parent 78354c6 commit e95de0bfc8a3ad8973a46d792e6a424b54780d3e @nekokak committed Jan 18, 2011
Showing with 121 additions and 28 deletions.
  1. +24 −12 lib/Jonk.pm
  2. +70 −5 lib/Jonk/Job.pm
  3. +5 −0 t/Utils.pm
  4. +7 −2 t/client.t
  5. +4 −4 t/multi_process.t
  6. +11 −5 t/worker.t
View
36 lib/Jonk.pm
@@ -16,7 +16,6 @@ sub new {
bless {
dbh => $dbh,
- retry_delay => $opts->{retry_delay} || 60,
table_name => $opts->{table_name} || 'job',
job_find_size => $opts->{job_find_size} || 50,
@@ -42,7 +41,7 @@ sub insert {
my $sth = $self->{dbh}->prepare_cached(
sprintf(
- 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, retried_cnt, priority) VALUES (?,?,?,0,0,0,0,0)'
+ 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,0,0,0)'
,$self->{table_name}
)
);
@@ -72,6 +71,21 @@ sub _bind_param_attr {
return;
}
+sub _server_unixitme {
+ my $dbh = shift;
+
+ my $driver = $dbh->{Driver}{Name};
+ return time if $driver eq 'SQLite';
+
+ my $q;
+ if ( $driver eq 'Pg' ) {
+ $q = "TRUNC(EXTRACT('epoch' from NOW()))";
+ } else {
+ $q = 'UNIX_TIMESTAMP()';
+ }
+ return $dbh->selectrow_array("SELECT $q");
+}
+
sub _grab_job {
my ($self, $callback) = @_;
@@ -81,7 +95,7 @@ sub _grab_job {
local $self->{dbh}->{RaiseError} = 1;
local $self->{dbh}->{PrintError} = 0;
- my $time = time;
+ my $time = _server_unixitme($self->{dbh});
my $sth = $callback->(time);
while (my $row = $sth->fetchrow_hashref) {
@@ -125,7 +139,7 @@ sub lookup_job {
}
sub find_job {
- my ($self, $job_id) = @_;
+ my ($self, $opts) = @_;
$self->_grab_job(
sub {
@@ -143,7 +157,7 @@ sub find_job {
);
}
-sub _completed {
+sub _delete {
my ($self, $job_id) = @_;
try {
@@ -160,13 +174,14 @@ sub _completed {
}
sub _failed {
- my ($self, $job_id) = @_;
+ my ($self, $job_id, $opt) = @_;
+ my $retry_delay = $opt->{retry_delay} || 60;
try {
my $sth = $self->{dbh}->prepare_cached(
- sprintf('UPDATE %s SET retried_cnt = retried_cnt + 1 , run_after = ? WHERE id = ?', $self->{table_name})
+ sprintf('UPDATE %s SET retry_cnt = retry_cnt + 1, run_after = ? WHERE id = ?', $self->{table_name})
);
- $sth->execute($self->{retry_delay}, $job_id);
+ $sth->execute($retry_delay, $job_id);
$sth->finish;
return $sth->rows;
} catch {
@@ -311,7 +326,6 @@ get most recent error infomation.
grabbed_until int(10) UNSIGNED NOT NULL,
run_after int(10) UNSIGNED NOT NULL DEFAULT 0,
retry_cnt int(10) UNSIGNED NOT NULL DEFAULT 0,
- retried_cnt int(10) UNSIGNED NOT NULL DEFAULT 0,
priority int(10) UNSIGNED NOT NULL DEFAULT 0,`
primary key ( id )
) ENGINE=InnoDB
@@ -326,8 +340,7 @@ get most recent error infomation.
grabbed_until INTEGER UNSIGNED NOT NULL,
run_after INTEGER UNSIGNED NOT NULL DEFAULT 0,
retry_cnt INTEGER UNSIGNED NOT NULL DEFAULT 0,
- retried_cnt INTEGER UNSIGNED NOT NULL DEFAULT 0,
- priority INTEGER UNSIGNED NOT NULL DEFAULT 0,`
+ priority INTEGER UNSIGNED NOT NULL DEFAULT 0
)
=head2 PostgreSQL
@@ -340,7 +353,6 @@ get most recent error infomation.
grabbed_until INTEGER NOT NULL,
run_after INTEGER NOT NULL DEFAULT 0,
retry_cnt INTEGER NOT NULL DEFAULT 0,
- retried_cnt INTEGER NOT NULL DEFAULT 0,
priority INTEGER NOT NULL DEFAULT 0
)
View
75 lib/Jonk/Job.pm
@@ -1,27 +1,92 @@
package Jonk::Job;
use strict;
use warnings;
+use Carp ();
sub new {
my ($class, $jonk, $job) =@_;
bless {
- jonk => $jonk,
- job => $job,
+ job => $job,
+ _jonk => $jonk,
_completed => 0,
+ _failed => 0,
+ _aborted => 0,
}, $class;
}
-sub completed { $_[0]->{_completed}=1; $_[0]->{jonk}->_completed($_[0]->id) }
-sub failed { $_[0]->{jonk}->_failed($_[0]->id) }
+sub completed {
+ my $self = shift;
+
+ if ($self->is_aborted || $self->is_failed) {
+ Carp::croak 'job is already (abort|fail)ed.';
+ }
+
+ $self->{_completed} = 1;
+ $self->{_jonk}->_delete($self->id);
+}
+
+sub failed {
+ my ($self, $opt) = @_;
+
+ if ($self->is_complated || $self->is_aborted) {
+ Carp::croak 'job is already (complate|abort)ed.';
+ }
+
+ $self->{_failed} = 1;
+ $self->{_jonk}->_failed(
+ $self->id => $opt
+ );
+}
+
+sub abort {
+ my $self = shift;
+
+ if ($self->is_completed || $self->is_failed) {
+ Carp::croak 'job is already (complate|fail)ed.';
+ }
+
+ $self->{_aborted} = 1;
+ $self->{_jonk}->_delete($self->id);
+}
sub id { $_[0]->{job}->{id} }
sub func { $_[0]->{job}->{func} }
sub arg { $_[0]->{job}->{arg} }
sub enqueue_time { $_[0]->{job}->{enqueue_time} }
sub grabbed_until { $_[0]->{job}->{grabbed_until} }
+sub retry_cnt { $_[0]->{job}->{retry_cnt} }
+sub run_after { $_[0]->{job}->{run_after} }
+sub priority { $_[0]->{job}->{priority} }
+
+sub is_completed { $_[0]->{_completed} }
+sub is_failed { $_[0]->{_failed} }
+sub is_aborted { $_[0]->{_aborted} }
sub DESTROY {
my $self = shift;
- $self->failed unless $self->{_completed};
+ unless ($self->is_completed or $self->is_aborted or $self->is_failed) {
+ Carp::croak "job is not (complete|fail|abor)ed. this job auto failed.";
+ $self->failed;
+ }
}
+
1;
+
+__END__
+
+=head1 METHODS
+
+=head2 $job->failed([$options]);
+
+=over 4
+
+=item * $options->{retry_delay}
+
+job retry delay sec.
+
+Default 60.
+
+=back
+
+=cut
+
View
5 t/Utils.pm
@@ -10,6 +10,11 @@ BEGIN {
plan skip_all => 'needs DBD::SQLite for testing' if $@;
}
+sub import {
+ strict->import;
+ warnings->import;
+}
+
sub _get_schema {
my $dbh = shift;
my $table = shift || "job";
View
9 t/client.t
@@ -1,5 +1,3 @@
-use strict;
-use warnings;
use t::Utils;
use Test::More;
use DBI;
@@ -19,6 +17,11 @@ subtest 'insert' => sub {
is $row->{arg}, 'arg';
is $row->{func}, 'MyWorker';
+ is $row->{grabbed_until}, 0;
+ is $row->{run_after}, 0;
+ is $row->{retry_cnt}, 0;
+ is $row->{priority}, 0;
+
ok not $jonk->errstr;
};
@@ -39,6 +42,8 @@ subtest 'insert / and insert_time_callback' => sub {
is $row->{arg}, 'arg';
is $row->{func}, 'MyWorker';
is $row->{enqueue_time}, $time;
+
+ ok not $jonk->errstr;
};
subtest 'error handling' => sub {
View
8 t/multi_process.t
@@ -16,8 +16,8 @@ my $dbh = t::Utils->setup;
if ( fork ) {
my $dbh = t::Utils->setup;
- my $jonk = Jonk->new($dbh, {functions => [qw/MyWorker/]});
- my $job = $jonk->grab_job();
+ my $jonk = Jonk->new($dbh, {});
+ my $job = $jonk->find_job(+{functions => [qw/MyWorker/]});
is $job->arg, 'arg1';
wait;
@@ -28,8 +28,8 @@ my $dbh = t::Utils->setup;
sleep 1;
- my $jonk = Jonk->new($dbh, {functions => [qw/MyWorker/]});
- my $job = $jonk->grab_job();
+ my $jonk = Jonk->new($dbh, {});
+ my $job = $jonk->find_job(+{functions => [qw/MyWorker/]});
is $job->arg, 'arg2';
}
View
16 t/worker.t
@@ -1,25 +1,31 @@
-use strict;
-use warnings;
use t::Utils;
use Test::More;
use Jonk;
my $dbh = t::Utils->setup;
subtest 'grab_job' => sub {
- my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
+ my $client = Jonk->new($dbh, {});
my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
- my $job = $client->grab_job;
+ my $job = $client->find_job(+{functions => [qw/MyWorker/]});
is $job->arg, 'arg';
is $job->func, 'MyWorker';
+ is $job->retry_cnt, 0;
+ is $job->run_after, 0;
+ is $job->priority, 0;
+
ok not $client->errstr;
+
$job->completed;
- ok not $client->grab_job;
+
+ ok not $client->find_job(+{functions => [qw/MyWorker/]});
};
+done_testing;
+__END__
subtest 'grab_job / no job' => sub {
my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
my $job = $client->grab_job;

0 comments on commit e95de0b

Please sign in to comment.
Something went wrong with that request. Please try again.