Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ README.md
t/000-queue.t
t/010-dr-tqueue.t
t/020-worker.t
t/030-parallel.t
tarantool.cfg
37 changes: 37 additions & 0 deletions init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@
-- type = "NUM64"
-- }
-- ]
-- },
-- {
-- type = "TREE",
-- unique = 0,
-- key_field = [
-- {
-- fieldno = 1, # tube
-- type = "STR"
-- },
-- {
-- fieldno = 12, # task data
-- type = "STR"
-- }
-- ]
-- }
-- ]
-- }
Expand Down Expand Up @@ -109,6 +123,7 @@ local function pri_unpack(pri) return box.unpack('b', pri) end
local idx_task = 0
local idx_tube = 1
local idx_event = 2
local idx_data = 3

-- task statuses
local ST_READY = 'r'
Expand Down Expand Up @@ -666,6 +681,28 @@ queue.put = function(space, tube, ...)
return put_task(space, tube, queue.default.ipri, ...)
end

-- queue.put_unique(space, tube, delay, ttl, ttr, pri, ...)
-- put unique task into queue.
-- arguments
-- 1. tube - queue name
-- 2. delay - delay before task can be taken
-- 3. ttl - time to live (if delay > 0 ttl := ttl + delay)
-- 4. ttr - time to release (when task is taken)
-- 5. pri - priority
-- 6. ... - task data
queue.put_unique = function(space, tube, delay, ttl, ttr, pri, data, ...)
space = tonumber(space)

if data == nil then
error('Can not put unique task without data')
end
local task = box.select( space, idx_data, tube, data )
if task ~= nil then
return rettask( task )
end
queue.stat[space][tube]:inc('put')
return put_task(space, tube, queue.default.ipri, delay, ttl, ttr, pri, data, ...)
end

-- queue.urgent(space, tube, delay, ttl, ttr, pri, ...)
-- like queue.put but put task at begin of queue
Expand Down
24 changes: 24 additions & 0 deletions lib/DR/TarantoolQueue.pm
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,30 @@ sub put {
return $self->_producer(put => \%opts);
}

=head2 put_unique

$q->put_unique(data => { 1 => 2 });
$q->put_unique(space => 1, tube => 'abc',
delay => 10, ttl => 3600,
ttr => 60, pri => 10, data => [ 3, 4, 5 ]);
$q->put_unique(data => 'string');


Enqueue an unique task. Returns new L<task|DR::TarantoolQueue::Task> object,
if it was not enqueued previously. Otherwise it will return existing task.
The list of fields with task data (C<< data => ... >>) is optional.


If 'B<space>' and (or) 'B<tube>' aren't defined the method
will try to use them from L<queue|DR::TarantoolQueue/new> object.

=cut

sub put_unique {
my ($self, %opts) = @_;
return $self->_producer(put_unique => \%opts);
}

=head2 urgent

Enqueue a task. The task will get the highest priority.
Expand Down
3 changes: 2 additions & 1 deletion lib/DR/TarantoolQueue/Worker.pm
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ The function will be called as L<perlfunc/sprintf>.
sub run {
my ($self, $cb, $debugf) = @_;
croak 'process subroutine is not CODEREF' unless 'CODE' eq ref $cb;
$debugf //= sub { };
$debugf = sub { }
unless defined $debugf;
croak 'debugf subroutine is not CODEREF' unless 'CODE' eq ref $debugf;

croak 'worker is already run' if $self->is_run;
Expand Down
45 changes: 43 additions & 2 deletions t/000-queue.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use open qw(:std :utf8);
use lib qw(lib ../lib);

use Test::More;
use constant PLAN => 79;
use constant PLAN => 81;

BEGIN {
system 'which tarantool_box >/dev/null 2>&1';
Expand All @@ -21,7 +21,7 @@ BEGIN {
use Encode qw(decode encode);
use Cwd 'cwd';
use File::Spec::Functions 'catfile';
use feature 'state';
# use feature 'state';



Expand Down Expand Up @@ -449,6 +449,47 @@ is_deeply $task1_t, $task1, 'task1 (pri)';
is_deeply $task2_t, $task2, 'task2 (pri)';
is_deeply $task3_t, $task3, 'task3 (pri)';

my $task_unique1 = tnt->call_lua('queue.put_unique',
[
$sno,
'tube_name',
0, # delay
5, # ttl
1, # ttr
30, # pri
'unique_task', 50 .. 60
]
)->raw;

my $task_unique2 = tnt->call_lua('queue.put_unique',
[
$sno,
'tube_name',
0, # delay
5, # ttl
1, # ttr
30, # pri
'unique_task', 50 .. 60
]
)->raw;

is_deeply $task_unique1, $task_unique2, 'unique tasks';

eval {
tnt->call_lua('queue.put_unique',
[
$sno,
'tube_name',
0, # delay
5, # ttl
1, # ttr
30 # pri
]
);
};

like $@, qr/Can not put unique task without data/,
'unique task without data are prohibited';

END {
note $t->log if $ENV{DEBUG};
Expand Down
8 changes: 6 additions & 2 deletions t/010-dr-tqueue.t
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use open qw(:std :utf8);
use lib qw(lib ../lib);

use Test::More;
use constant PLAN => 80;
use constant PLAN => 81;

BEGIN {
system 'which tarantool_box >/dev/null 2>&1';
Expand All @@ -21,7 +21,7 @@ BEGIN {
use Encode qw(decode encode);
use Cwd 'cwd';
use File::Spec::Functions 'catfile';
use feature 'state';
# use feature 'state';



Expand Down Expand Up @@ -189,6 +189,10 @@ my $task4_t = $q->take(tube => 'utftube');
is_deeply $task4->data, $task4_t->data, 'Task and decoded utf data';
is_deeply $task5->data, $task5_t->data, 'Task and encoded utf data';

my $task_unique1 = $q->put_unique(tube => 'utftube_unique', data => [ 3, 4, 'привет' ]);
my $task_unique2 = $q->put_unique(tube => 'utftube_unique', data => [ 3, 4, 'привет' ]);

is_deeply $task_unique1->id, $task_unique2->id, 'Unique tasks putting has equal ids';

{
use Scalar::Util 'refaddr';
Expand Down
4 changes: 2 additions & 2 deletions t/020-worker.t
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BEGIN {
use Encode qw(decode encode);
use Cwd 'cwd';
use File::Spec::Functions 'catfile';
use feature 'state';
#use feature 'state';



Expand Down Expand Up @@ -85,7 +85,7 @@ Coro::AnyEvent::sleep 0.2;
for (@tasks) {
$_ = eval { $q->peek(id => $_->id) };
}
is scalar grep({ $_ ~~ undef } @tasks), scalar @tasks, 'All tasks were ack';
is scalar grep({ !defined($_) } @tasks), scalar @tasks, 'All tasks were ack';

is $wrk->stop, 0, 'workers were stopped';
@tasks = ();
Expand Down
2 changes: 1 addition & 1 deletion t/030-parallel.t
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ BEGIN {
use Encode qw(decode encode);
use Cwd 'cwd';
use File::Spec::Functions 'catfile';
use feature 'state';
#use feature 'state';



Expand Down
14 changes: 14 additions & 0 deletions tarantool.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,20 @@ space = [
type = "NUM64"
}
]
},
{
type = "TREE",
unique = 0,
key_field = [
{
fieldno = 1, # tube
type = "STR"
},
{
fieldno = 12, # task data
type = "STR"
}
]
}
]
}
Expand Down