diff --git a/lib/Jonk.pm b/lib/Jonk.pm index d4bf76b..d225ee3 100644 --- a/lib/Jonk.pm +++ b/lib/Jonk.pm @@ -23,6 +23,7 @@ sub new { table_name => $table_name, functions => $functions, driver => $driver, + has_func => scalar(keys %{$functions}) ? 1 : 0, _errstr => undef, @@ -68,7 +69,9 @@ sub _parse_functions { if (not defined $functions->[$i+1]) {$i++ } elsif (ref $functions->[$i+1]) {$value = $functions->[++$i]} - $value ||= +{grab_for => $default_grab_for}; + $value->{grab_for} ||= $default_grab_for; + $value->{serializer} ||= ($opts->{default_serializer} || sub {$_[0]}); + $value->{deserializer} ||= ($opts->{default_deserializer} || sub {$_[0]}); $funcs->{$func} = $value; } @@ -102,9 +105,10 @@ sub insert { local $self->{dbh}->{RaiseError} = 1; local $self->{dbh}->{PrintError} = 0; + my $serializer = $self->{functions}->{$func}->{serializer} || sub {$_[0]}; my $sth = $self->{dbh}->prepare_cached($self->{insert_query}); $sth->bind_param(1, $func); - $sth->bind_param(2, $arg, _bind_param_attr($self->{driver})); + $sth->bind_param(2, $serializer->($arg), _bind_param_attr($self->{driver})); $sth->bind_param(3, $self->{insert_time_callback}->()); $sth->bind_param(4, $opt->{run_after}||0); $sth->bind_param(5, $opt->{priority} ||0); @@ -192,7 +196,7 @@ sub lookup_job { sub find_job { my ($self, $opts) = @_; - unless (keys %{$self->{functions}}) { + unless ($self->{has_func}) { Carp::croak('missin find_job functions.'); } @@ -223,7 +227,7 @@ sub _delete { sub _failed { my ($self, $job_id, $opt) = @_; - my $retry_delay = $self->_server_unixitime + ($opt->{retry_delay} || 60); + my $retry_delay = $self->_server_unixitime + (defined($opt->{retry_delay}) ? $opt->{retry_delay} : 60); try { my $sth = $self->{dbh}->prepare_cached($self->{failed_query}); $sth->execute($retry_delay, $job_id); @@ -288,6 +292,26 @@ $dbh is database handle. Key word of job which this Jonk instance looks for. +=over 4 + +=item * $options->{functions} = [qw/worker_key worker_key2/] + +can set *worker_key* at arrayref. + +=item * $options->{functions} = ['worker_key' => {grab_for => 5}], + +can set worker_key's grab_for setting by hash-ref. + +=item * $options->{functions} = ['worker_key' => {serializer => \&serialize_code, deserializer => \&deserialize_code}], + +can set worker_key's (de)serializer code setting by hash-ref. + +=item * $options->{functions} = ['worker_key' => {serializer => \&serialize_code, deserializer => \&deserialize_code}, 'worker_key2'], + +can mix worker settings. + +=back + =item * $options->{table_name} specific job table name. @@ -300,6 +324,18 @@ specific lookup job record size. Default 50. +=item * $options->{default_serializer} + +global serializer setting. + +=item * $options->{default_deserializer} + +global deserializer setting. + +=item * $options->{default_grab_for} + +global grab_for setting. + =back =head2 my $job_id = $jonk->insert($func, $arg); @@ -317,10 +353,6 @@ specific your worker funcname. job argument data. -serialize is not done in Jonk. - -Please pass data that does serialize if it is necessary. - =back =head2 my $job = $jonk->lookup_job($job_id); diff --git a/lib/Jonk/Job.pm b/lib/Jonk/Job.pm index 7358617..1f3cbde 100644 --- a/lib/Jonk/Job.pm +++ b/lib/Jonk/Job.pm @@ -48,7 +48,11 @@ sub aborted { sub id { $_[0]->{job}->{id} } sub func { $_[0]->{job}->{func} } -sub arg { $_[0]->{job}->{arg} } +sub raw_arg { $_[0]->{job}->{arg} } +sub arg { + my $self = shift; + $self->{_jonk}->{functions}->{$self->func}->{deserializer}->($self->{job}->{arg}); +} sub enqueue_time { $_[0]->{job}->{enqueue_time} } sub grabbed_until { $_[0]->{job}->{grabbed_until} } sub retry_cnt { $_[0]->{job}->{retry_cnt} } diff --git a/t/serialize.t b/t/serialize.t new file mode 100644 index 0000000..e536281 --- /dev/null +++ b/t/serialize.t @@ -0,0 +1,103 @@ +use t::Utils; +use Test::More; +use DBI; +use Jonk; +use Storable (); + +my $dbh = t::Utils->setup; + +subtest '(de)serializer set each functions' => sub { + my $serialized_arg; + my $jonk = Jonk->new($dbh, { + functions => [ + 'MyWorker' => { + serializer => sub {$serialized_arg = Storable::nfreeze($_[0])}, + deserializer => sub {Storable::thaw($_[0])} + } + ] + }); + + subtest 'job completed' => sub { + my $job_id = $jonk->insert('MyWorker', {chars => 'bar'}); + ok $job_id; + + my $job = $jonk->lookup_job($job_id); + is $job->id, $job_id; + is_deeply $job->arg, {chars => 'bar'}; + is $job->raw_arg, $serialized_arg; + + $job->completed; + + ok not $jonk->errstr; + }; + + subtest 'job failed and retry' => sub { + + my $job_id = $jonk->insert('MyWorker', {chars => 'bar'}); + ok $job_id; + + my $job = $jonk->lookup_job($job_id); + is $job->id, $job_id; + is_deeply $job->arg, {chars => 'bar'}; + is $job->raw_arg, $serialized_arg; + + $job->failed({retry_delay => 0}); + + $job = $jonk->lookup_job($job_id); + is $job->id, $job_id; + is_deeply $job->arg, {chars => 'bar'}; + is $job->raw_arg, $serialized_arg; + + $job->completed; + + ok not $jonk->errstr; + }; +}; + +subtest '(de)serializer set global' => sub { + my $serialized_arg; + my $jonk = Jonk->new($dbh, { + functions => [qw/MyWorker/], + default_serializer => sub {$serialized_arg = Storable::nfreeze($_[0])}, + default_deserializer => sub {Storable::thaw($_[0])}, + }); + + subtest 'job completed' => sub { + my $job_id = $jonk->insert('MyWorker', {chars => 'bar'}); + ok $job_id; + + my $job = $jonk->lookup_job($job_id); + is $job->id, $job_id; + is_deeply $job->arg, {chars => 'bar'}; + is $job->raw_arg, $serialized_arg; + + $job->completed; + + ok not $jonk->errstr; + }; + + subtest 'job failed and retry' => sub { + + my $job_id = $jonk->insert('MyWorker', {chars => 'bar'}); + ok $job_id; + + my $job = $jonk->lookup_job($job_id); + is $job->id, $job_id; + is_deeply $job->arg, {chars => 'bar'}; + is $job->raw_arg, $serialized_arg; + + $job->failed({retry_delay => 0}); + + $job = $jonk->lookup_job($job_id); + is $job->id, $job_id; + is_deeply $job->arg, {chars => 'bar'}; + is $job->raw_arg, $serialized_arg; + + $job->completed; + + ok not $jonk->errstr; + }; +}; + +done_testing; +