Skip to content

Commit

Permalink
Make it possible to write/read from multiple Q4M queues
Browse files Browse the repository at this point in the history
  • Loading branch information
lestrrat committed Mar 7, 2012
1 parent a2a4b7b commit d84eadc
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 67 deletions.
8 changes: 8 additions & 0 deletions etc/container.pl
Expand Up @@ -26,7 +26,14 @@ BEGIN
Cache::Memcached::Fast->new( $config->{'Memcached'} );
};

my @queue_names;
foreach my $dbkey (qw(DB::Master DB::Queue)) {
# XXX lazy. We need to know which queue databases are available,
# so we'll just skim it from the list
if ( $dbkey =~ /::Queue/) {
push @queue_names, $dbkey;
}

register $dbkey => sub {
my $c = shift;
my $config = $c->get('config');
Expand Down Expand Up @@ -79,6 +86,7 @@ BEGIN
cache_expires => 86400,
%{ $c->get('config')->{ "API::Queue::$type" } || {} },
container => $c,
queue_names => \@queue_names,
);
};

Expand Down
57 changes: 44 additions & 13 deletions lib/STF/API/Queue/Q4M.pm
@@ -1,11 +1,13 @@
package STF::API::Queue::Q4M;
use strict;
use parent qw(STF::Trait::WithDBI);
use Digest::MurmurHash ();
use STF::Constants qw(:func STF_DEBUG);
use Class::Accessor::Lite
new => 1,
rw => [ qw(
funcmap
queue_names
) ]
;

Expand All @@ -29,12 +31,16 @@ sub get_func_id {
sub size {
my ($self, $func) = @_;

my $dbh = $self->dbh('DB::Queue');
my $table = "queue_$func";
my ($count) = $dbh->selectrow_array( <<EOSQL );
SELECT COUNT(*) FROM $table
my $total = 0;
foreach my $queue_name ( @{ $self->queue_names } ) {
my $dbh = $self->dbh($queue_name);
my ($count) = $dbh->selectrow_array( <<EOSQL );
SELECT COUNT(*) FROM $table
EOSQL
return $count;
$total += $count;
}
return $total;
}

sub enqueue {
Expand All @@ -48,18 +54,43 @@ sub enqueue {
Carp::confess("No object_id given for $func");
}

my $dbh = $self->dbh('DB::Queue');
my $table = "queue_$func";
# Sort the queue names by the murmur hash value of queue_name + object_id
my $queue_names = $self->queue_names;
my %queues = (
map {
( $_ => Digest::MurmurHash::murmur_hash( $_ . $object_id ) )
} @$queue_names
);
foreach my $queue_name ( sort { $queues{$a} <=> $queues{$b} } keys %queues) {
my $dbh = $self->dbh($queue_name);
if (STF_DEBUG) {
printf STDERR "[ Queue] INSERT %s into %s for %s on %s\n",
$object_id, $table, $func, $queue_name
;
}

if (STF_DEBUG) {
printf STDERR "[ Queue] INSERT %s into %s for %s\n",
$object_id, $table, $func
;
}
my $rv = $dbh->do(<<EOSQL, undef, $object_id );
INSERT INTO $table ( args, created_at ) VALUES (?, UNIX_TIMESTAMP( NOW() ) )
my $rv;
my $err = STF::Utils::timeout_call(
0.5,
sub {
$rv = $dbh->do(<<EOSQL, undef, $object_id );
INSERT INTO $table ( args, created_at ) VALUES (?, UNIX_TIMESTAMP( NOW() ) )
EOSQL
return $rv;
}
);
if ( $err ) {
# XXX Don't wrap in STF_DEBUG
printf STDERR "[ Queue] Error while enqueuing: %s\n + func: %s\n + object ID = %s\n",
$err,
$func,
$object_id,
;
next;
}

return $rv;
}
}

1;
1 change: 0 additions & 1 deletion lib/STF/API/Storage.pm
Expand Up @@ -108,7 +108,6 @@ sub move_entities {
my $dbh = $self->dbh;
my $sth = $dbh->prepare( <<EOSQL );
SELECT e.object_id FROM entity e FORCE INDEX (PRIMARY)
JOIN object o ON e.object_id = o.id
WHERE e.storage_id = ? AND e.object_id > ? LIMIT ?
EOSQL
my $limit = 10_000;
Expand Down
35 changes: 3 additions & 32 deletions lib/STF/Dispatcher.pm
Expand Up @@ -8,7 +8,7 @@ use Guard ();
use IPC::SysV qw(S_IRWXU S_IRUSR S_IWUSR IPC_CREAT IPC_NOWAIT SEM_UNDO);
use IPC::SharedMem;
use IPC::Semaphore;
use POSIX ':signal_h';
use POSIX ();
use Scalar::Util ();
use STF::Constants qw(
:entity
Expand Down Expand Up @@ -690,37 +690,8 @@ sub rename_object {
sub enqueue {
my ($self, $func, $object_id) = @_;
# signals to mask in the handler
my $mask = POSIX::SigSet->new( SIGALRM );
# the handler code ref
my $action = POSIX::SigAction->new(
sub { die "connect timeout\n" },
$mask,
# not using (perl 5.8.2 and later) 'safe' switch or sa_flags
);
my $oldaction = POSIX::SigAction->new();
sigaction( SIGALRM, $action, $oldaction );
my $rv;
eval {
eval {
alarm(2); # seconds before time out
my $queue_api = $self->get( 'API::Queue' );
$rv = $queue_api->enqueue( $func, $object_id );
};
alarm(0); # cancel alarm (if connect worked fast)
die "$@\n" if $@; # connect died
};
sigaction( SIGALRM, $oldaction ); # restore original signal handler
if ( $@ ) {
# XXX This should not be seen by the client,
# but we need to make sure to log it
printf STDERR "[Dispatcher] Error while enqueuing: %s\n + func: %s\n + object ID = %s\n",
$@,
$func,
$object_id,
;
}
my $queue_api = $self->get( 'API::Queue' );
my $rv = $queue_api->enqueue( $func, $object_id );
return $rv;
}
Expand Down
36 changes: 36 additions & 0 deletions lib/STF/Utils.pm
@@ -1,6 +1,8 @@
package STF::Utils;
use strict;
use Guard ();
use POSIX ':signal_h';
use Time::HiRes ();

sub merge_hashes {
my ($left, $right) = @_;
Expand Down Expand Up @@ -61,4 +63,38 @@ sub timer_guard {
};
}

# This is a rather forceful timeout wrapper that allows us to, for example,
# wrap calls to things blocking in the C layer (such as some DBD's).
# Returns the error that occurred. If the call timed out, then this
# error is set to "timeout_call timed out (%d secs)"
sub timeout_call {
my ($timeout, $cb, $timeout_cb, @args) = @_;

$timeout_cb ||= sub { die sprintf "timeout_call timed out (%d secs)\n", $timeout };

# signals to mask in the handler
my $mask = POSIX::SigSet->new( SIGALRM );
# the handler code ref
my $action = POSIX::SigAction->new(
$timeout_cb,
$mask,
# not using (perl 5.8.2 and later) 'safe' switch or sa_flags
);
my $oldaction = POSIX::SigAction->new();
sigaction( SIGALRM, $action, $oldaction );
my $rv;
eval {
eval {
Time::HiRes::alarm($timeout); # seconds before time out
$cb->(@args);
};
Time::HiRes::alarm(0); # cancel alarm (if connect worked fast)
die "$@\n" if $@; # connect died
};
my $e = $@;
sigaction( SIGALRM, $oldaction ); # restore original signal handler

return $e;
}

1;
7 changes: 4 additions & 3 deletions lib/STF/Worker/Loop/Q4M.pm
Expand Up @@ -10,7 +10,7 @@ use Scalar::Util ();
use Time::HiRes ();
use STF::Constants qw(STF_DEBUG);
use Class::Accessor::Lite
rw => [ qw(interval) ]
rw => [ qw(interval queue_name) ]
;

sub queue_table {
Expand Down Expand Up @@ -42,8 +42,9 @@ sub work {

my $table = $self->queue_table( $impl );
my $waitcond = $self->queue_waitcond( $impl );
my $dbh = $self->get('DB::Queue') or
Carp::confess( "Could not fetch DB::Queue" );
my $queue_name = $self->queue_name || $ENV{STF_QUEUE_NAME} || 'DB::Queue';
my $dbh = $self->get($queue_name) or
Carp::confess( "Could not fetch $queue_name" );

my $loop = 1;
my $object_id;
Expand Down
14 changes: 8 additions & 6 deletions t/005_dispatcher.t
Expand Up @@ -54,13 +54,15 @@ subtest 'enqueue timeout' => sub {
host_id => time()
);
$ctxt->container->register( 'API::Queue' => Plack::Util::inline_object(
enqueue => sub {
my $dbh = $ctxt->container->get('DB::Queue');
# XXX hack to get the call to stall
local $dbh->{Callbacks} = {
do => sub {
sleep 10;
}
) );
},
};
my $buf;
my $buf = '';
open my $stderr, '>', \$buf;
eval {
local *STDERR = $stderr;
Expand All @@ -70,7 +72,7 @@ subtest 'enqueue timeout' => sub {
};
alarm(0);
ok !$@, "enqueue timed out 'silently'";
like $buf, qr/connect timeout/;
like $buf, qr/timeout_call timed out/;
};
done_testing;
37 changes: 28 additions & 9 deletions t/201_crash.t
Expand Up @@ -21,14 +21,22 @@ my $code = sub {
my $guard = $container->new_scope();
my $dbh = $container->get( 'DB::Master' );

# Remove everything in the storage so we can focus on our only object
# Clean up the database so that we don't get bogus errors
# (This should really be done in the original test that
# broke the consistency, damnit)
{
my $list = $dbh->selectall_arrayref( <<EOM, undef );
SELECT CONCAT_WS('/', b.name, o.name) AS path
FROM object o JOIN bucket b ON o.bucket_id = b.id
EOM
foreach my $row ( @$list ) {
$cb->(DELETE "http://127.0.0.1/$row->[0]");
my $list = $dbh->selectall_arrayref( <<EOSQL, { Slice => {} } );
SELECT * FROM entity
EOSQL
foreach my $entity (@$list) {
my $object = $dbh->selectrow_hashref( <<EOSQL, undef, $entity->{object_id} );
SELECT * FROM object WHERE id = ?
EOSQL
if (! $object) {
$dbh->do( <<EOSQL, undef, $entity->{object_id} );
DELETE FROM entity WHERE object_id = ?
EOSQL
}
}
}

Expand All @@ -54,7 +62,18 @@ EOM
my $storages = $dbh->selectall_arrayref( <<EOSQL, { Slice => {} } );
SELECT s.* FROM storage s
EOSQL
my $crashed = $storages->[0];
my $crashed;
my $count;
foreach my $storage (@$storages) {
($count) = $dbh->selectrow_array( <<EOSQL, undef, $storage->{id} );
SELECT count(*) FROM entity WHERE storage_id = ?
EOSQL
if ($count > 0) {
note "Storage $storage->{id} has $count entities. Choosing this as our crashed storage";
$crashed = $storage;
last;
}
}

$dbh->do( <<EOSQL, undef, STORAGE_MODE_CRASH, $crashed->{id} );
UPDATE storage SET mode = ? WHERE id = ?
Expand All @@ -78,7 +97,7 @@ EOSQL
{
my $worker = STF::Worker::RepairObject->new(
container => $context->container,
max_works_per_child => 1,
max_works_per_child => $count,
);
$worker->work;
}
Expand Down
16 changes: 13 additions & 3 deletions t/202_retire.t
Expand Up @@ -42,8 +42,18 @@ my $code = sub {
my $storages = $dbh->selectall_arrayref( <<EOSQL, { Slice => {} } );
SELECT s.* FROM storage s
EOSQL
my $retired = $storages->[0];

my $retired;
my $count;
foreach my $storage (@$storages) {
($count) = $dbh->selectrow_array( <<EOSQL, undef, $storage->{id} );
SELECT count(*) FROM entity WHERE storage_id = ?
EOSQL
if ($count > 0) {
note "Storage $storage->{id} has $count entities. Choosing this as our retired storage";
$retired = $storage;
last;
}
}
$dbh->do( <<EOSQL, undef, STORAGE_MODE_RETIRE, $retired->{id} );
UPDATE storage SET mode = ? WHERE id = ?
EOSQL
Expand All @@ -66,7 +76,7 @@ EOSQL
{
my $worker = STF::Worker::RepairObject->new(
container => $context->container,
max_works_per_child => 1,
max_works_per_child => $count,
);
$worker->work;
}
Expand Down

0 comments on commit d84eadc

Please sign in to comment.