Permalink
Browse files

refactoring.

  • Loading branch information...
1 parent 67511d7 commit ac558ee6ea5ba0d11ac638462338bbe74d298f4f @nekokak committed Feb 8, 2011
Showing with 77 additions and 62 deletions.
  1. +77 −62 lib/Jonk.pm
View
@@ -5,7 +5,7 @@ use Jonk::Job;
use Try::Tiny;
use Carp ();
-our $VERSION = '0.10_01';
+our $VERSION = '0.10_02';
sub new {
my ($class, $dbh, $opts) = @_;
@@ -14,37 +14,81 @@ sub new {
Carp::croak('missing job queue database handle.');
}
- my $default_grab_for = $opts->{default_grab_for}|| 60*60;
+ my $functions = _parse_functions($opts);
+ my $table_name = $opts->{table_name} || 'job';
+ my $driver = _verify_driver($dbh);
+
+ bless {
+ dbh => $dbh,
+ table_name => $table_name,
+ functions => $functions,
+ driver => $driver,
+
+ _errstr => undef,
+
+ insert_time_callback => ($opts->{insert_time_callback}||sub{
+ my ( $sec, $min, $hour, $mday, $mon, $year, undef, undef, undef ) = localtime(time);
+ return sprintf('%04d-%02d-%02d %02d:%02d:%02d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
+ }),
+
+ insert_query => sprintf(
+ 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,?,0,?)'
+ ,$table_name
+ ),
+
+ grab_query => sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $table_name),
+
+ lookup_job_query => sprintf('SELECT * FROM %s WHERE id = ? AND grabbed_until <= ? AND run_after <= ?', $table_name),
+
+ find_job_query => sprintf(
+ 'SELECT * FROM %s WHERE func IN (%s) AND grabbed_until <= ? AND run_after <= ? ORDER BY priority DESC LIMIT %s',
+ $table_name, (join(', ', map { "'$_'" } keys %{$functions})), ($opts->{job_find_size} || 50)
+ ),
+
+ delete_query => sprintf('DELETE FROM %s WHERE id = ?', $table_name),
+
+ failed_query => sprintf('UPDATE %s SET retry_cnt = retry_cnt + 1, run_after = ?, grabbed_until = 0 WHERE id = ?', $table_name),
+
+ unixtime_query => _settled_unixtime_query($driver),
+
+ }, $class;
+}
+
+sub _parse_functions {
+ my $opts = shift;
+
+ my $functions = $opts->{functions} || [];
+ my $default_grab_for = $opts->{default_grab_for} || (60*60);
+
my $funcs = +{};
- my $functions = $opts->{functions}||[];
- for (my $i = 0; $i < @$functions; $i++) {
+ for (my $i = 0; $i < @{$functions}; $i++) {
my $func = $functions->[$i];
my $value;
-
if (not defined $functions->[$i+1]) {$i++ }
elsif (ref $functions->[$i+1]) {$value = $functions->[++$i]}
$value ||= +{grab_for => $default_grab_for};
$funcs->{$func} = $value;
}
+ $funcs;
+}
- bless {
- dbh => $dbh,
- table_name => $opts->{table_name} || 'job',
- functions => $funcs,
- find_funcs => join(', ', map { "'$_'" } keys %{$funcs}),
- job_find_size => $opts->{job_find_size} || 50,
-
- _errstr => undef,
+sub _verify_driver {
+ my $dbh = shift;
+ my $driver = $dbh->{Driver}{Name};
+ $driver =~ /(mysql|SQLite|Pg)/ ? $driver : Carp::croak('Jonk support only mysql,SQLite,Pg');
+}
- insert_time_callback => ($opts->{insert_time_callback}||sub{
- my ( $sec, $min, $hour, $mday, $mon, $year, undef, undef, undef ) = localtime(time);
- return sprintf('%04d-%02d-%02d %02d:%02d:%02d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
- }),
+sub _settled_unixtime_query {
+ my $driver = shift;
- }, $class;
+ if ($driver eq 'Pg') {
+ return "SELECT TRUNC(EXTRACT('epoch' from NOW()))";
+ } elsif ($driver eq 'mysql') {
+ return 'SELECT UNIX_TIMESTAMP()';
+ }
}
sub errstr {$_[0]->{_errstr}}
@@ -58,17 +102,12 @@ sub insert {
local $self->{dbh}->{RaiseError} = 1;
local $self->{dbh}->{PrintError} = 0;
- my $sth = $self->{dbh}->prepare_cached(
- sprintf(
- 'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,?,0,?)'
- ,$self->{table_name}
- )
- );
+ my $sth = $self->{dbh}->prepare_cached($self->{insert_query});
$sth->bind_param(1, $func);
- $sth->bind_param(2, $arg, _bind_param_attr($self->{dbh}));
+ $sth->bind_param(2, $arg, _bind_param_attr($self->{driver}));
$sth->bind_param(3, $self->{insert_time_callback}->());
$sth->bind_param(4, $opt->{run_after}||0);
- $sth->bind_param(5, $opt->{priority}||0);
+ $sth->bind_param(5, $opt->{priority} ||0);
$sth->execute();
$job_id = $self->{dbh}->last_insert_id("","",$self->{table_name},"");
@@ -81,9 +120,8 @@ sub insert {
}
sub _bind_param_attr {
- my $dbh = shift;
+ my $driver = shift;
- my $driver = $dbh->{Driver}{Name};
if ( $driver eq 'Pg' ) {
return { pg_type => DBD::Pg::PG_BYTEA() };
} elsif ( $driver eq 'SQLite' ) {
@@ -93,18 +131,9 @@ sub _bind_param_attr {
}
sub _server_unixitime {
- my $dbh = shift;
-
- my $driver = $dbh->{Driver}{Name};
- return time if $driver eq 'SQLite';
-
- my $q;
- if ( $driver eq 'Pg' ) {
- $q = "TRUNC(EXTRACT('epoch' from NOW()))";
- } else {
- $q = 'UNIX_TIMESTAMP()';
- }
- return $dbh->selectrow_array("SELECT $q");
+ my $self = shift;
+ return time() if $self->{driver} eq 'SQLite';
+ $self->{dbh}->selectrow_array($self->{unixtime_query});
}
sub _grab_job {
@@ -116,7 +145,7 @@ sub _grab_job {
local $self->{dbh}->{RaiseError} = 1;
local $self->{dbh}->{PrintError} = 0;
- my $time = _server_unixitime($self->{dbh});
+ my $time = $self->_server_unixitime;
my $sth = $callback->($time);
while (my $row = $sth->fetchrow_hashref) {
@@ -136,9 +165,7 @@ sub _grab_job {
sub _grab_a_job {
my ($self, $row, $time) = @_;
- my $sth = $self->{dbh}->prepare_cached(
- sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $self->{table_name}),
- );
+ my $sth = $self->{dbh}->prepare_cached($self->{grab_query});
$sth->execute(
($time + ($self->{functions}->{$row->{func}}->{grab_for})),
$row->{id},
@@ -155,9 +182,7 @@ sub lookup_job {
$self->_grab_job(
sub {
my $time = shift;
- my $sth = $self->{dbh}->prepare_cached(
- sprintf('SELECT * FROM %s WHERE id = ? AND grabbed_until <= ? AND run_after <= ?', $self->{table_name})
- );
+ my $sth = $self->{dbh}->prepare_cached($self->{lookup_job_query});
$sth->execute($job_id, $time, $time);
$sth;
}
@@ -167,20 +192,14 @@ sub lookup_job {
sub find_job {
my ($self, $opts) = @_;
- unless ($self->{find_funcs}) {
+ unless (keys %{$self->{functions}}) {
Carp::croak('missin find_job functions.');
}
$self->_grab_job(
sub {
my $time = shift;
- my $sth = $self->{dbh}->prepare_cached(
- sprintf('SELECT * FROM %s WHERE func IN (%s) AND grabbed_until <= ? AND run_after <= ? ORDER BY priority DESC LIMIT %s',
- $self->{table_name},
- $self->{find_funcs},
- ($opts->{job_find_size}||50),
- ),
- );
+ my $sth = $self->{dbh}->prepare_cached($self->{find_job_query});
$sth->execute($time, $time);
$sth;
}
@@ -191,9 +210,7 @@ sub _delete {
my ($self, $job_id) = @_;
try {
- my $sth = $self->{dbh}->prepare_cached(
- sprintf('DELETE FROM %s WHERE id = ?', $self->{table_name})
- );
+ my $sth = $self->{dbh}->prepare_cached($self->{delete_query});
$sth->execute($job_id);
$sth->finish;
return $sth->rows;
@@ -206,11 +223,9 @@ sub _delete {
sub _failed {
my ($self, $job_id, $opt) = @_;
- my $retry_delay = _server_unixitime($self->{dbh}) + ($opt->{retry_delay} || 60);
+ my $retry_delay = $self->_server_unixitime + ($opt->{retry_delay} || 60);
try {
- my $sth = $self->{dbh}->prepare_cached(
- sprintf('UPDATE %s SET retry_cnt = retry_cnt + 1, run_after = ?, grabbed_until = 0 WHERE id = ?', $self->{table_name})
- );
+ my $sth = $self->{dbh}->prepare_cached($self->{failed_query});
$sth->execute($retry_delay, $job_id);
$sth->finish;
return $sth->rows;

0 comments on commit ac558ee

Please sign in to comment.