Skip to content

Commit

Permalink
add (de)serialize settings.
Browse files Browse the repository at this point in the history
  • Loading branch information
nekokak committed Feb 8, 2011
1 parent ac558ee commit 7899641
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 9 deletions.
48 changes: 40 additions & 8 deletions lib/Jonk.pm
Expand Up @@ -23,6 +23,7 @@ sub new {
table_name => $table_name,
functions => $functions,
driver => $driver,
has_func => scalar(keys %{$functions}) ? 1 : 0,

_errstr => undef,

Expand Down Expand Up @@ -68,7 +69,9 @@ sub _parse_functions {
if (not defined $functions->[$i+1]) {$i++ }
elsif (ref $functions->[$i+1]) {$value = $functions->[++$i]}

$value ||= +{grab_for => $default_grab_for};
$value->{grab_for} ||= $default_grab_for;
$value->{serializer} ||= ($opts->{default_serializer} || sub {$_[0]});
$value->{deserializer} ||= ($opts->{default_deserializer} || sub {$_[0]});

$funcs->{$func} = $value;
}
Expand Down Expand Up @@ -102,9 +105,10 @@ sub insert {
local $self->{dbh}->{RaiseError} = 1;
local $self->{dbh}->{PrintError} = 0;

my $serializer = $self->{functions}->{$func}->{serializer} || sub {$_[0]};
my $sth = $self->{dbh}->prepare_cached($self->{insert_query});
$sth->bind_param(1, $func);
$sth->bind_param(2, $arg, _bind_param_attr($self->{driver}));
$sth->bind_param(2, $serializer->($arg), _bind_param_attr($self->{driver}));
$sth->bind_param(3, $self->{insert_time_callback}->());
$sth->bind_param(4, $opt->{run_after}||0);
$sth->bind_param(5, $opt->{priority} ||0);
Expand Down Expand Up @@ -192,7 +196,7 @@ sub lookup_job {
sub find_job {
my ($self, $opts) = @_;

unless (keys %{$self->{functions}}) {
unless ($self->{has_func}) {
Carp::croak('missin find_job functions.');
}

Expand Down Expand Up @@ -223,7 +227,7 @@ sub _delete {
sub _failed {
my ($self, $job_id, $opt) = @_;

my $retry_delay = $self->_server_unixitime + ($opt->{retry_delay} || 60);
my $retry_delay = $self->_server_unixitime + (defined($opt->{retry_delay}) ? $opt->{retry_delay} : 60);
try {
my $sth = $self->{dbh}->prepare_cached($self->{failed_query});
$sth->execute($retry_delay, $job_id);
Expand Down Expand Up @@ -288,6 +292,26 @@ $dbh is database handle.
Key word of job which this Jonk instance looks for.
=over 4
=item * $options->{functions} = [qw/worker_key worker_key2/]
can set *worker_key* at arrayref.
=item * $options->{functions} = ['worker_key' => {grab_for => 5}],
can set worker_key's grab_for setting by hash-ref.
=item * $options->{functions} = ['worker_key' => {serializer => \&serialize_code, deserializer => \&deserialize_code}],
can set worker_key's (de)serializer code setting by hash-ref.
=item * $options->{functions} = ['worker_key' => {serializer => \&serialize_code, deserializer => \&deserialize_code}, 'worker_key2'],
can mix worker settings.
=back
=item * $options->{table_name}
specific job table name.
Expand All @@ -300,6 +324,18 @@ specific lookup job record size.
Default 50.
=item * $options->{default_serializer}
global serializer setting.
=item * $options->{default_deserializer}
global deserializer setting.
=item * $options->{default_grab_for}
global grab_for setting.
=back
=head2 my $job_id = $jonk->insert($func, $arg);
Expand All @@ -317,10 +353,6 @@ specific your worker funcname.
job argument data.
serialize is not done in Jonk.
Please pass data that does serialize if it is necessary.
=back
=head2 my $job = $jonk->lookup_job($job_id);
Expand Down
6 changes: 5 additions & 1 deletion lib/Jonk/Job.pm
Expand Up @@ -48,7 +48,11 @@ sub aborted {

sub id { $_[0]->{job}->{id} }
sub func { $_[0]->{job}->{func} }
sub arg { $_[0]->{job}->{arg} }
sub raw_arg { $_[0]->{job}->{arg} }
sub arg {
my $self = shift;
$self->{_jonk}->{functions}->{$self->func}->{deserializer}->($self->{job}->{arg});
}
sub enqueue_time { $_[0]->{job}->{enqueue_time} }
sub grabbed_until { $_[0]->{job}->{grabbed_until} }
sub retry_cnt { $_[0]->{job}->{retry_cnt} }
Expand Down
103 changes: 103 additions & 0 deletions t/serialize.t
@@ -0,0 +1,103 @@
use t::Utils;
use Test::More;
use DBI;
use Jonk;
use Storable ();

my $dbh = t::Utils->setup;

subtest '(de)serializer set each functions' => sub {
my $serialized_arg;
my $jonk = Jonk->new($dbh, {
functions => [
'MyWorker' => {
serializer => sub {$serialized_arg = Storable::nfreeze($_[0])},
deserializer => sub {Storable::thaw($_[0])}
}
]
});

subtest 'job completed' => sub {
my $job_id = $jonk->insert('MyWorker', {chars => 'bar'});
ok $job_id;

my $job = $jonk->lookup_job($job_id);
is $job->id, $job_id;
is_deeply $job->arg, {chars => 'bar'};
is $job->raw_arg, $serialized_arg;

$job->completed;

ok not $jonk->errstr;
};

subtest 'job failed and retry' => sub {

my $job_id = $jonk->insert('MyWorker', {chars => 'bar'});
ok $job_id;

my $job = $jonk->lookup_job($job_id);
is $job->id, $job_id;
is_deeply $job->arg, {chars => 'bar'};
is $job->raw_arg, $serialized_arg;

$job->failed({retry_delay => 0});

$job = $jonk->lookup_job($job_id);
is $job->id, $job_id;
is_deeply $job->arg, {chars => 'bar'};
is $job->raw_arg, $serialized_arg;

$job->completed;

ok not $jonk->errstr;
};
};

subtest '(de)serializer set global' => sub {
my $serialized_arg;
my $jonk = Jonk->new($dbh, {
functions => [qw/MyWorker/],
default_serializer => sub {$serialized_arg = Storable::nfreeze($_[0])},
default_deserializer => sub {Storable::thaw($_[0])},
});

subtest 'job completed' => sub {
my $job_id = $jonk->insert('MyWorker', {chars => 'bar'});
ok $job_id;

my $job = $jonk->lookup_job($job_id);
is $job->id, $job_id;
is_deeply $job->arg, {chars => 'bar'};
is $job->raw_arg, $serialized_arg;

$job->completed;

ok not $jonk->errstr;
};

subtest 'job failed and retry' => sub {

my $job_id = $jonk->insert('MyWorker', {chars => 'bar'});
ok $job_id;

my $job = $jonk->lookup_job($job_id);
is $job->id, $job_id;
is_deeply $job->arg, {chars => 'bar'};
is $job->raw_arg, $serialized_arg;

$job->failed({retry_delay => 0});

$job = $jonk->lookup_job($job_id);
is $job->id, $job_id;
is_deeply $job->arg, {chars => 'bar'};
is $job->raw_arg, $serialized_arg;

$job->completed;

ok not $jonk->errstr;
};
};

done_testing;

0 comments on commit 7899641

Please sign in to comment.