Skip to content
Browse files

can set priority.

  • Loading branch information...
1 parent d0eb3fe commit aa40029d6d0dc1218c14a2b43fa54e807f43789e @nekokak committed Jan 19, 2011
Showing with 27 additions and 6 deletions.
  1. +3 −2 MANIFEST
  2. +8 −4 lib/Jonk.pm
  3. +16 −0 t/client.t
View
5 MANIFEST
@@ -11,19 +11,20 @@ inc/Module/Install/Repository.pm
inc/Module/Install/Win32.pm
inc/Module/Install/WriteAll.pm
lib/Jonk.pm
-lib/Jonk/Client.pm
lib/Jonk/Cookbook.pod
lib/Jonk/Cookbook/Basic.pod
lib/Jonk/Cookbook/DequeueSpecificJob.pod
lib/Jonk/Cookbook/ErrorHandling.pod
lib/Jonk/Cookbook/ForkModel.pod
-lib/Jonk/Worker.pm
+lib/Jonk/Job.pm
Makefile.PL
MANIFEST This list of files
META.yml
README
t/00_compile.t
+t/aaa
t/client.t
+t/multi_process.t
t/Utils.pm
t/worker.t
xt/02_pod.t
View
12 lib/Jonk.pm
@@ -14,6 +14,8 @@ sub new {
Carp::croak('missing job queue database handle.');
}
+ # functions check
+
bless {
dbh => $dbh,
table_name => $opts->{table_name} || 'job',
@@ -33,7 +35,7 @@ sub new {
sub errstr {$_[0]->{_errstr}}
sub insert {
- my ($self, $func, $arg) = @_;
+ my ($self, $func, $arg, $opt) = @_;
my $job_id;
try {
@@ -43,13 +45,14 @@ sub insert {
my $sth = $self->{dbh}->prepare_cached(
sprintf(
- 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,0,0,0)'
+ 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,0,0,?)'
,$self->{table_name}
)
);
$sth->bind_param(1, $func);
$sth->bind_param(2, $arg, _bind_param_attr($self->{dbh}));
$sth->bind_param(3, $self->{insert_time_callback}->());
+ $sth->bind_param(4, $opt->{priority}||0);
$sth->execute();
$job_id = $self->{dbh}->last_insert_id("","",$self->{table_name},"");
@@ -98,7 +101,7 @@ sub _grab_job {
local $self->{dbh}->{PrintError} = 0;
my $time = _server_unixitme($self->{dbh});
- my $sth = $callback->(time);
+ my $sth = $callback->($time);
while (my $row = $sth->fetchrow_hashref) {
$job = $self->_grab_a_job($row, $time);
@@ -121,8 +124,9 @@ sub _grab_a_job {
sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $self->{table_name}),
);
$sth->execute(($time + 60), $row->{id}, $row->{grabbed_until});
+ my $grabbed = $sth->rows;
$sth->finish;
- $sth->rows ? Jonk::Job->new($self => $row) : undef;
+ $grabbed ? Jonk::Job->new($self => $row) : undef;
}
sub lookup_job {
View
16 t/client.t
@@ -54,6 +54,22 @@ subtest 'error handling' => sub {
like $jonk->errstr, qr/can't insert for job queue database:/;
};
+subtest 'insert / set priority' => sub {
+ my $jonk = Jonk->new($dbh);
+
+ my $job_id = $jonk->insert('MyWorker', 'arg', { priority => 10 });
+ ok $job_id;
+
+ my $sth = $dbh->prepare('SELECT * FROM job WHERE id = ?');
+ $sth->execute($job_id);
+ my $row = $sth->fetchrow_hashref;
+
+ is $row->{arg}, 'arg';
+ is $row->{func}, 'MyWorker';
+ is $row->{priority}, 10;
+ ok not $jonk->errstr;
+};
+
t::Utils->cleanup($dbh);
subtest 'insert / flexible job table name' => sub {

0 comments on commit aa40029

Please sign in to comment.
Something went wrong with that request. Please try again.