Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

rename methods.

  • Loading branch information...
commit 4b72637883c2a7f8570869657e18b40aad7ea35d 1 parent 5a6ead2
@nekokak authored
View
3  Makefile.PL
@@ -8,7 +8,8 @@ requires 'Try::Tiny';
tests 't/*.t';
author_tests 'xt';
-test_requires 'Test::More' => '0.94';
+test_requires 'Test::More' => '0.96';
+test_requires 'Test::SharedFork' => 0.15;
test_requires 'Test::mysqld' if $Module::Install::AUTHOR;
test_requires 'Test::postgresql' if $Module::Install::AUTHOR;
View
27 lib/Jonk.pm
@@ -19,8 +19,8 @@ sub new {
bless {
dbh => $dbh,
- enqueue_query => sprintf('INSERT INTO %s (func, arg, enqueue_time, grabbed_until) VALUES (?,?,?,0)', $table_name),
- enqueue_time_callback => ($opts->{enqueue_time_callback}||sub{
+ insert_query => sprintf('INSERT INTO %s (func, arg, enqueue_time, grabbed_until) VALUES (?,?,?,0)', $table_name),
+ insert_time_callback => ($opts->{insert_time_callback}||sub{
my ( $sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst ) = localtime(time);
return sprintf('%04d-%02d-%02d %02d:%02d:%02d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}),
@@ -33,16 +33,17 @@ sub new {
),
grab_job_query => sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $table_name),
- dequeue_query => sprintf('DELETE FROM %s WHERE id = ?', $table_name),
+ purge_job_query => sprintf('DELETE FROM %s WHERE id = ?', $table_name),
table_name => $table_name,
+
_errstr => undef,
}, $class;
}
sub errstr {$_[0]->{_errstr}}
-sub enqueue {
+sub insert {
my ($self, $func, $arg) = @_;
my $job_id;
@@ -51,16 +52,16 @@ sub enqueue {
local $self->{dbh}->{RaiseError} = 1;
local $self->{dbh}->{PrintError} = 0;
- my $sth = $self->{dbh}->prepare_cached($self->{enqueue_query});
+ my $sth = $self->{dbh}->prepare_cached($self->{insert_query});
$sth->bind_param(1, $func);
$sth->bind_param(2, $arg, _bind_param_attr($self->{dbh}));
- $sth->bind_param(3, $self->{enqueue_time_callback}->());
+ $sth->bind_param(3, $self->{insert_time_callback}->());
$sth->execute();
$job_id = $self->{dbh}->last_insert_id("","",$self->{table_name},"");
$sth->finish;
} catch {
- $self->{_errstr} = "can't enqueue for job queue database: $_"
+ $self->{_errstr} = "can't insert for job queue database: $_"
};
$job_id;
@@ -78,7 +79,7 @@ sub _bind_param_attr {
return;
}
-sub lookup {
+sub grab_job {
my ($self, $job_id) = @_;
my $job;
@@ -111,17 +112,17 @@ sub lookup {
$sth->finish;
} catch {
- $self->{_errstr} = "can't get job from job queue database: $_";
+ $self->{_errstr} = "can't grab job from job queue database: $_";
};
$job;
}
-sub _dequeue {
+sub _completed {
my ($self, $job_id) = @_;
try {
- my $sth = $self->{dbh}->prepare_cached($self->{dequeue_query});
+ my $sth = $self->{dbh}->prepare_cached($self->{purge_job_query});
$sth->execute($job_id);
$sth->finish;
return $sth->rows;
@@ -145,9 +146,9 @@ Jonk - simple job tank manager.
use Jonk;
my $dbh = DBI->connect(...);
my $jonk = Jonk->new($dbh, {functions => ['MyWorker']});
- # enqueue job
+ # insert job
{
- $jonk->enqueue('MyWorker', 'arg');
+ $jonk->insert('MyWorker', 'arg');
}
# dequeue job
View
2  lib/Jonk/Job.pm
@@ -10,7 +10,7 @@ sub new {
}, $class;
}
-sub dequeue { $_[0]->{jonk}->_dequeue($_[0]->id) }
+sub completed { $_[0]->{jonk}->_completed($_[0]->id) }
sub id { $_[0]->{job}->{id} }
sub func { $_[0]->{job}->{func} }
View
26 t/client.t
@@ -7,18 +7,10 @@ use Jonk;
my $dbh = t::Utils->setup;
-subtest 'client / flexible job table name' => sub {
+subtest 'insert' => sub {
my $jonk = Jonk->new($dbh);
- is $jonk->{enqueue_query}, 'INSERT INTO job (func, arg, enqueue_time, grabbed_until) VALUES (?,?,?,0)';
- $jonk = Jonk->new($dbh, +{table_name => 'jonk_job'});
- is $jonk->{enqueue_query}, 'INSERT INTO jonk_job (func, arg, enqueue_time, grabbed_until) VALUES (?,?,?,0)';
-};
-
-subtest 'enqueue' => sub {
- my $jonk = Jonk->new($dbh);
-
- my $job_id = $jonk->enqueue('MyWorker', 'arg');
+ my $job_id = $jonk->insert('MyWorker', 'arg');
ok $job_id;
my $sth = $dbh->prepare('SELECT * FROM job WHERE id = ?');
@@ -30,14 +22,14 @@ subtest 'enqueue' => sub {
ok not $jonk->errstr;
};
-subtest 'enqueue / and enqueue_time_callback' => sub {
+subtest 'insert / and insert_time_callback' => sub {
my $time;
- my $jonk = Jonk->new($dbh,+{enqueue_time_callback => sub {
+ my $jonk = Jonk->new($dbh,+{insert_time_callback => sub {
my ( $sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst ) = localtime(time);
$time = sprintf('%04d-%02d-%02d %02d:%02d:%02d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
}});
- my $job_id = $jonk->enqueue('MyWorker', 'arg');
+ my $job_id = $jonk->insert('MyWorker', 'arg');
ok $job_id;
my $sth = $dbh->prepare('SELECT * FROM job WHERE id = ?');
@@ -52,18 +44,18 @@ subtest 'enqueue / and enqueue_time_callback' => sub {
subtest 'error handling' => sub {
my $jonk = Jonk->new($dbh, +{table_name => 'jonk_job'});
- my $job_id = $jonk->enqueue('MyWorker', 'arg');
+ my $job_id = $jonk->insert('MyWorker', 'arg');
ok not $job_id;
- like $jonk->errstr, qr/can't enqueue for job queue database:/;
+ like $jonk->errstr, qr/can't insert for job queue database:/;
};
t::Utils->cleanup($dbh);
-subtest 'enqueue / flexible job table name' => sub {
+subtest 'insert / flexible job table name' => sub {
my $dbh = t::Utils->setup("my_job");
my $jonk = Jonk->new($dbh, +{table_name => "my_job"});
- my $job_id = $jonk->enqueue('MyWorker', 'arg');
+ my $job_id = $jonk->insert('MyWorker', 'arg');
ok $job_id;
my $sth = $dbh->prepare('SELECT * FROM my_job WHERE id = ?');
View
19 t/multi_process.t
@@ -2,39 +2,36 @@ use strict;
use warnings;
use t::Utils;
use Test::More;
+use Test::SharedFork;
use Jonk;
my $dbh = t::Utils->setup;
-subtest 'dequeue' => sub {
-
- {
- my $client = Jonk->new($dbh);
-
- $client->enqueue('MyWorker', 'arg1');
- $client->enqueue('MyWorker', 'arg2');
+ { # insert test job
+ my $jonk = Jonk->new($dbh);
+ ok $jonk->insert('MyWorker', 'arg1');
+ ok $jonk->insert('MyWorker', 'arg2');
}
if ( fork ) {
my $dbh = t::Utils->setup;
my $jonk = Jonk->new($dbh, {functions => [qw/MyWorker/]});
- my $job = $jonk->lookup();
+ my $job = $jonk->grab_job();
is $job->arg, 'arg1';
wait;
}
else {
+ # child
my $dbh = t::Utils->setup;
sleep 1;
my $jonk = Jonk->new($dbh, {functions => [qw/MyWorker/]});
- my $job = $jonk->lookup();
+ my $job = $jonk->grab_job();
is $job->arg, 'arg2';
}
-};
done_testing;
-
View
36 t/worker.t
@@ -6,55 +6,55 @@ use Jonk;
my $dbh = t::Utils->setup;
-subtest 'lookup' => sub {
+subtest 'grab_job' => sub {
my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
- my $job_id = $client->enqueue('MyWorker', 'arg');
+ my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
- my $job = $client->lookup;
+ my $job = $client->grab_job;
is $job->arg, 'arg';
is $job->func, 'MyWorker';
ok not $client->errstr;
- $job->dequeue;
- ok not $client->lookup;
+ $job->completed;
+ ok not $client->grab_job;
};
-subtest 'lookup / no job' => sub {
+subtest 'grab_job / no job' => sub {
my $client = Jonk->new($dbh, {functions => [qw/MyWorker/]});
- my $job = $client->lookup;
+ my $job = $client->grab_job;
ok not $job;
};
-subtest 'lookup / lookup specific job_id' => sub {
+subtest 'grab_job / specific job_id' => sub {
my $client = Jonk->new($dbh);
- my $job_id = $client->enqueue('MyWorker', 'lookup_job');
+ my $job_id = $client->insert('MyWorker', 'grab_job');
ok $job_id;
- my $job = $client->lookup($job_id);
- is $job->arg, 'lookup_job';
+ my $job = $client->grab_job($job_id);
+ is $job->arg, 'grab_job';
is $job->func, 'MyWorker';
- $job->dequeue;
- ok not $client->lookup;
+ $job->completed;
+ ok not $client->grab_job;
};
t::Utils->cleanup($dbh);
-subtest 'lookup / flexible job table name' => sub {
+subtest 'grab_job / flexible job table name' => sub {
my $dbh = t::Utils->setup("my_job");
my $client = Jonk->new($dbh, { table_name => "my_job" });
- my $job_id = $client->enqueue('MyWorker', 'arg');
+ my $job_id = $client->insert('MyWorker', 'arg');
ok $job_id;
my $jonk = Jonk->new($dbh, { table_name => "my_job", functions => [qw/MyWorker/]});
- my $job = $jonk->lookup;
+ my $job = $jonk->grab_job;
is $job->arg, 'arg';
is $job->func, 'MyWorker';
ok not $jonk->errstr;
- $job->dequeue;
- ok not $client->lookup;
+ $job->completed;
+ ok not $client->grab_job;
t::Utils->cleanup($dbh, "my_job");
};
Please sign in to comment.
Something went wrong with that request. Please try again.