From 359d529a66e181189d7ac4d7c8407b74bf62250f Mon Sep 17 00:00:00 2001 From: "l.khaliullov" Date: Wed, 5 Feb 2014 12:35:42 +0400 Subject: [PATCH 1/3] backport to perl 5.8.8 --- lib/DR/TarantoolQueue/Worker.pm | 3 ++- t/000-queue.t | 2 +- t/010-dr-tqueue.t | 2 +- t/020-worker.t | 4 ++-- t/030-parallel.t | 2 +- 5 files changed, 7 insertions(+), 6 deletions(-) diff --git a/lib/DR/TarantoolQueue/Worker.pm b/lib/DR/TarantoolQueue/Worker.pm index e0de0e30..62368233 100644 --- a/lib/DR/TarantoolQueue/Worker.pm +++ b/lib/DR/TarantoolQueue/Worker.pm @@ -204,7 +204,8 @@ The function will be called as L. sub run { my ($self, $cb, $debugf) = @_; croak 'process subroutine is not CODEREF' unless 'CODE' eq ref $cb; - $debugf //= sub { }; + $debugf = sub { } + unless defined $debugf; croak 'debugf subroutine is not CODEREF' unless 'CODE' eq ref $debugf; croak 'worker is already run' if $self->is_run; diff --git a/t/000-queue.t b/t/000-queue.t index 9369de19..17e12911 100644 --- a/t/000-queue.t +++ b/t/000-queue.t @@ -21,7 +21,7 @@ BEGIN { use Encode qw(decode encode); use Cwd 'cwd'; use File::Spec::Functions 'catfile'; -use feature 'state'; +# use feature 'state'; diff --git a/t/010-dr-tqueue.t b/t/010-dr-tqueue.t index edd8ee9a..6a13a431 100644 --- a/t/010-dr-tqueue.t +++ b/t/010-dr-tqueue.t @@ -21,7 +21,7 @@ BEGIN { use Encode qw(decode encode); use Cwd 'cwd'; use File::Spec::Functions 'catfile'; -use feature 'state'; +# use feature 'state'; diff --git a/t/020-worker.t b/t/020-worker.t index 8fe35964..e0ff5822 100644 --- a/t/020-worker.t +++ b/t/020-worker.t @@ -21,7 +21,7 @@ BEGIN { use Encode qw(decode encode); use Cwd 'cwd'; use File::Spec::Functions 'catfile'; -use feature 'state'; +#use feature 'state'; @@ -85,7 +85,7 @@ Coro::AnyEvent::sleep 0.2; for (@tasks) { $_ = eval { $q->peek(id => $_->id) }; } -is scalar grep({ $_ ~~ undef } @tasks), scalar @tasks, 'All tasks were ack'; +is scalar grep({ !defined($_) } @tasks), scalar @tasks, 'All tasks were ack'; is $wrk->stop, 0, 'workers were stopped'; @tasks = (); diff --git a/t/030-parallel.t b/t/030-parallel.t index a3a462d1..8753ed1a 100644 --- a/t/030-parallel.t +++ b/t/030-parallel.t @@ -21,7 +21,7 @@ BEGIN { use Encode qw(decode encode); use Cwd 'cwd'; use File::Spec::Functions 'catfile'; -use feature 'state'; +#use feature 'state'; From c9cb8faebcf8f759ead2d94a5840d385eeaff85b Mon Sep 17 00:00:00 2001 From: "l.khaliullov" Date: Wed, 5 Feb 2014 13:05:53 +0400 Subject: [PATCH 2/3] Added support of putting unique tasks --- init.lua | 23 +++++++++++++++++++++ lib/DR/TarantoolQueue.pm | 24 ++++++++++++++++++++++ t/000-queue.t | 43 +++++++++++++++++++++++++++++++++++++++- t/010-dr-tqueue.t | 6 +++++- tarantool.cfg | 14 +++++++++++++ 5 files changed, 108 insertions(+), 2 deletions(-) diff --git a/init.lua b/init.lua index 2da41f43..853b44a1 100644 --- a/init.lua +++ b/init.lua @@ -109,6 +109,7 @@ local function pri_unpack(pri) return box.unpack('b', pri) end local idx_task = 0 local idx_tube = 1 local idx_event = 2 +local idx_data = 3 -- task statuses local ST_READY = 'r' @@ -666,6 +667,28 @@ queue.put = function(space, tube, ...) return put_task(space, tube, queue.default.ipri, ...) end +-- queue.put_unique(space, tube, delay, ttl, ttr, pri, ...) +-- put unique task into queue. +-- arguments +-- 1. tube - queue name +-- 2. delay - delay before task can be taken +-- 3. ttl - time to live (if delay > 0 ttl := ttl + delay) +-- 4. ttr - time to release (when task is taken) +-- 5. pri - priority +-- 6. ... - task data +queue.put_unique = function(space, tube, delay, ttl, ttr, pri, data, ...) + space = tonumber(space) + + if data == nil then + error('Can not put unique task without data') + end + local task = box.select( space, idx_data, tube, data ) + if task ~= nil then + return rettask( task ) + end + queue.stat[space][tube]:inc('put') + return put_task(space, tube, queue.default.ipri, delay, ttl, ttr, pri, data, ...) +end -- queue.urgent(space, tube, delay, ttl, ttr, pri, ...) -- like queue.put but put task at begin of queue diff --git a/lib/DR/TarantoolQueue.pm b/lib/DR/TarantoolQueue.pm index d1c9a29e..44c2c1f8 100644 --- a/lib/DR/TarantoolQueue.pm +++ b/lib/DR/TarantoolQueue.pm @@ -442,6 +442,30 @@ sub put { return $self->_producer(put => \%opts); } +=head2 put_unique + + $q->put_unique(data => { 1 => 2 }); + $q->put_unique(space => 1, tube => 'abc', + delay => 10, ttl => 3600, + ttr => 60, pri => 10, data => [ 3, 4, 5 ]); + $q->put_unique(data => 'string'); + + +Enqueue an unique task. Returns new L object, +if it was not enqueued previously. Otherwise it will return existing task. +The list of fields with task data (C<< data => ... >>) is optional. + + +If 'B' and (or) 'B' aren't defined the method +will try to use them from L object. + +=cut + +sub put_unique { + my ($self, %opts) = @_; + return $self->_producer(put_unique => \%opts); +} + =head2 urgent Enqueue a task. The task will get the highest priority. diff --git a/t/000-queue.t b/t/000-queue.t index 17e12911..dfac8dc5 100644 --- a/t/000-queue.t +++ b/t/000-queue.t @@ -7,7 +7,7 @@ use open qw(:std :utf8); use lib qw(lib ../lib); use Test::More; -use constant PLAN => 79; +use constant PLAN => 81; BEGIN { system 'which tarantool_box >/dev/null 2>&1'; @@ -449,6 +449,47 @@ is_deeply $task1_t, $task1, 'task1 (pri)'; is_deeply $task2_t, $task2, 'task2 (pri)'; is_deeply $task3_t, $task3, 'task3 (pri)'; +my $task_unique1 = tnt->call_lua('queue.put_unique', + [ + $sno, + 'tube_name', + 0, # delay + 5, # ttl + 1, # ttr + 30, # pri + 'unique_task', 50 .. 60 + ] +)->raw; + +my $task_unique2 = tnt->call_lua('queue.put_unique', + [ + $sno, + 'tube_name', + 0, # delay + 5, # ttl + 1, # ttr + 30, # pri + 'unique_task', 50 .. 60 + ] +)->raw; + +is_deeply $task_unique1, $task_unique2, 'unique tasks'; + +eval { + tnt->call_lua('queue.put_unique', + [ + $sno, + 'tube_name', + 0, # delay + 5, # ttl + 1, # ttr + 30 # pri + ] + ); +}; + +like $@, qr/Can not put unique task without data/, + 'unique task without data are prohibited'; END { note $t->log if $ENV{DEBUG}; diff --git a/t/010-dr-tqueue.t b/t/010-dr-tqueue.t index 6a13a431..186b632c 100644 --- a/t/010-dr-tqueue.t +++ b/t/010-dr-tqueue.t @@ -7,7 +7,7 @@ use open qw(:std :utf8); use lib qw(lib ../lib); use Test::More; -use constant PLAN => 80; +use constant PLAN => 81; BEGIN { system 'which tarantool_box >/dev/null 2>&1'; @@ -189,6 +189,10 @@ my $task4_t = $q->take(tube => 'utftube'); is_deeply $task4->data, $task4_t->data, 'Task and decoded utf data'; is_deeply $task5->data, $task5_t->data, 'Task and encoded utf data'; +my $task_unique1 = $q->put_unique(tube => 'utftube_unique', data => [ 3, 4, 'привет' ]); +my $task_unique2 = $q->put_unique(tube => 'utftube_unique', data => [ 3, 4, 'привет' ]); + +is_deeply $task_unique1->id, $task_unique2->id, 'Unique tasks putting has equal ids'; { use Scalar::Util 'refaddr'; diff --git a/tarantool.cfg b/tarantool.cfg index 8904092f..84668142 100644 --- a/tarantool.cfg +++ b/tarantool.cfg @@ -54,6 +54,20 @@ space = [ type = "NUM64" } ] + }, + { + type = "TREE", + unique = 0, + key_field = [ + { + fieldno = 1, # tube + type = "STR" + }, + { + fieldno = 12, # task data + type = "STR" + } + ] } ] } From 4c613a936d7e8dc55b3be9ca59baf9613be311d4 Mon Sep 17 00:00:00 2001 From: Khaliullov Leandr Date: Wed, 5 Feb 2014 15:11:11 +0400 Subject: [PATCH 3/3] added t/030-parallel.t into MANIFEST. fixed comment in init.lua with new structure of space --- MANIFEST | 1 + init.lua | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/MANIFEST b/MANIFEST index 9a92de30..5e5c9722 100644 --- a/MANIFEST +++ b/MANIFEST @@ -26,4 +26,5 @@ README.md t/000-queue.t t/010-dr-tqueue.t t/020-worker.t +t/030-parallel.t tarantool.cfg diff --git a/init.lua b/init.lua index 853b44a1..c0171697 100644 --- a/init.lua +++ b/init.lua @@ -67,6 +67,20 @@ -- type = "NUM64" -- } -- ] +-- }, +-- { +-- type = "TREE", +-- unique = 0, +-- key_field = [ +-- { +-- fieldno = 1, # tube +-- type = "STR" +-- }, +-- { +-- fieldno = 12, # task data +-- type = "STR" +-- } +-- ] -- } -- ] -- }