From d84eadc5d4b8b3a9e4c87425f95606ff7025791a Mon Sep 17 00:00:00 2001 From: lestrrat Date: Wed, 7 Mar 2012 12:12:06 +0900 Subject: [PATCH] Make it possible to write/read from multiple Q4M queues --- etc/container.pl | 8 ++++++ lib/STF/API/Queue/Q4M.pm | 57 +++++++++++++++++++++++++++++--------- lib/STF/API/Storage.pm | 1 - lib/STF/Dispatcher.pm | 35 ++--------------------- lib/STF/Utils.pm | 36 ++++++++++++++++++++++++ lib/STF/Worker/Loop/Q4M.pm | 7 +++-- t/005_dispatcher.t | 14 ++++++---- t/201_crash.t | 37 +++++++++++++++++++------ t/202_retire.t | 16 +++++++++-- 9 files changed, 144 insertions(+), 67 deletions(-) diff --git a/etc/container.pl b/etc/container.pl index 2d0a4b6..dcabd90 100644 --- a/etc/container.pl +++ b/etc/container.pl @@ -26,7 +26,14 @@ BEGIN Cache::Memcached::Fast->new( $config->{'Memcached'} ); }; +my @queue_names; foreach my $dbkey (qw(DB::Master DB::Queue)) { + # XXX lazy. We need to know which queue databases are available, + # so we'll just skim it from the list + if ( $dbkey =~ /::Queue/) { + push @queue_names, $dbkey; + } + register $dbkey => sub { my $c = shift; my $config = $c->get('config'); @@ -79,6 +86,7 @@ BEGIN cache_expires => 86400, %{ $c->get('config')->{ "API::Queue::$type" } || {} }, container => $c, + queue_names => \@queue_names, ); }; diff --git a/lib/STF/API/Queue/Q4M.pm b/lib/STF/API/Queue/Q4M.pm index 31946f5..79d0889 100644 --- a/lib/STF/API/Queue/Q4M.pm +++ b/lib/STF/API/Queue/Q4M.pm @@ -1,11 +1,13 @@ package STF::API::Queue::Q4M; use strict; use parent qw(STF::Trait::WithDBI); +use Digest::MurmurHash (); use STF::Constants qw(:func STF_DEBUG); use Class::Accessor::Lite new => 1, rw => [ qw( funcmap + queue_names ) ] ; @@ -29,12 +31,16 @@ sub get_func_id { sub size { my ($self, $func) = @_; - my $dbh = $self->dbh('DB::Queue'); my $table = "queue_$func"; - my ($count) = $dbh->selectrow_array( <queue_names } ) { + my $dbh = $self->dbh($queue_name); + my ($count) = $dbh->selectrow_array( <dbh('DB::Queue'); my $table = "queue_$func"; + # Sort the queue names by the murmur hash value of queue_name + object_id + my $queue_names = $self->queue_names; + my %queues = ( + map { + ( $_ => Digest::MurmurHash::murmur_hash( $_ . $object_id ) ) + } @$queue_names + ); + foreach my $queue_name ( sort { $queues{$a} <=> $queues{$b} } keys %queues) { + my $dbh = $self->dbh($queue_name); + if (STF_DEBUG) { + printf STDERR "[ Queue] INSERT %s into %s for %s on %s\n", + $object_id, $table, $func, $queue_name + ; + } - if (STF_DEBUG) { - printf STDERR "[ Queue] INSERT %s into %s for %s\n", - $object_id, $table, $func - ; - } - my $rv = $dbh->do(<do(<dbh; my $sth = $dbh->prepare( < ? LIMIT ? EOSQL my $limit = 10_000; diff --git a/lib/STF/Dispatcher.pm b/lib/STF/Dispatcher.pm index 3ca4568..d0f3076 100644 --- a/lib/STF/Dispatcher.pm +++ b/lib/STF/Dispatcher.pm @@ -8,7 +8,7 @@ use Guard (); use IPC::SysV qw(S_IRWXU S_IRUSR S_IWUSR IPC_CREAT IPC_NOWAIT SEM_UNDO); use IPC::SharedMem; use IPC::Semaphore; -use POSIX ':signal_h'; +use POSIX (); use Scalar::Util (); use STF::Constants qw( :entity @@ -690,37 +690,8 @@ sub rename_object { sub enqueue { my ($self, $func, $object_id) = @_; - # signals to mask in the handler - my $mask = POSIX::SigSet->new( SIGALRM ); - # the handler code ref - my $action = POSIX::SigAction->new( - sub { die "connect timeout\n" }, - $mask, - # not using (perl 5.8.2 and later) 'safe' switch or sa_flags - ); - my $oldaction = POSIX::SigAction->new(); - sigaction( SIGALRM, $action, $oldaction ); - my $rv; - eval { - eval { - alarm(2); # seconds before time out - my $queue_api = $self->get( 'API::Queue' ); - $rv = $queue_api->enqueue( $func, $object_id ); - }; - alarm(0); # cancel alarm (if connect worked fast) - die "$@\n" if $@; # connect died - }; - sigaction( SIGALRM, $oldaction ); # restore original signal handler - - if ( $@ ) { - # XXX This should not be seen by the client, - # but we need to make sure to log it - printf STDERR "[Dispatcher] Error while enqueuing: %s\n + func: %s\n + object ID = %s\n", - $@, - $func, - $object_id, - ; - } + my $queue_api = $self->get( 'API::Queue' ); + my $rv = $queue_api->enqueue( $func, $object_id ); return $rv; } diff --git a/lib/STF/Utils.pm b/lib/STF/Utils.pm index 2ad253b..8461975 100644 --- a/lib/STF/Utils.pm +++ b/lib/STF/Utils.pm @@ -1,6 +1,8 @@ package STF::Utils; use strict; use Guard (); +use POSIX ':signal_h'; +use Time::HiRes (); sub merge_hashes { my ($left, $right) = @_; @@ -61,4 +63,38 @@ sub timer_guard { }; } +# This is a rather forceful timeout wrapper that allows us to, for example, +# wrap calls to things blocking in the C layer (such as some DBD's). +# Returns the error that occurred. If the call timed out, then this +# error is set to "timeout_call timed out (%d secs)" +sub timeout_call { + my ($timeout, $cb, $timeout_cb, @args) = @_; + + $timeout_cb ||= sub { die sprintf "timeout_call timed out (%d secs)\n", $timeout }; + + # signals to mask in the handler + my $mask = POSIX::SigSet->new( SIGALRM ); + # the handler code ref + my $action = POSIX::SigAction->new( + $timeout_cb, + $mask, + # not using (perl 5.8.2 and later) 'safe' switch or sa_flags + ); + my $oldaction = POSIX::SigAction->new(); + sigaction( SIGALRM, $action, $oldaction ); + my $rv; + eval { + eval { + Time::HiRes::alarm($timeout); # seconds before time out + $cb->(@args); + }; + Time::HiRes::alarm(0); # cancel alarm (if connect worked fast) + die "$@\n" if $@; # connect died + }; + my $e = $@; + sigaction( SIGALRM, $oldaction ); # restore original signal handler + + return $e; +} + 1; diff --git a/lib/STF/Worker/Loop/Q4M.pm b/lib/STF/Worker/Loop/Q4M.pm index 79fd411..8a1f5aa 100644 --- a/lib/STF/Worker/Loop/Q4M.pm +++ b/lib/STF/Worker/Loop/Q4M.pm @@ -10,7 +10,7 @@ use Scalar::Util (); use Time::HiRes (); use STF::Constants qw(STF_DEBUG); use Class::Accessor::Lite - rw => [ qw(interval) ] + rw => [ qw(interval queue_name) ] ; sub queue_table { @@ -42,8 +42,9 @@ sub work { my $table = $self->queue_table( $impl ); my $waitcond = $self->queue_waitcond( $impl ); - my $dbh = $self->get('DB::Queue') or - Carp::confess( "Could not fetch DB::Queue" ); + my $queue_name = $self->queue_name || $ENV{STF_QUEUE_NAME} || 'DB::Queue'; + my $dbh = $self->get($queue_name) or + Carp::confess( "Could not fetch $queue_name" ); my $loop = 1; my $object_id; diff --git a/t/005_dispatcher.t b/t/005_dispatcher.t index a72b215..d72592e 100644 --- a/t/005_dispatcher.t +++ b/t/005_dispatcher.t @@ -54,13 +54,15 @@ subtest 'enqueue timeout' => sub { host_id => time() ); - $ctxt->container->register( 'API::Queue' => Plack::Util::inline_object( - enqueue => sub { + my $dbh = $ctxt->container->get('DB::Queue'); + # XXX hack to get the call to stall + local $dbh->{Callbacks} = { + do => sub { sleep 10; - } - ) ); + }, + }; - my $buf; + my $buf = ''; open my $stderr, '>', \$buf; eval { local *STDERR = $stderr; @@ -70,7 +72,7 @@ subtest 'enqueue timeout' => sub { }; alarm(0); ok !$@, "enqueue timed out 'silently'"; - like $buf, qr/connect timeout/; + like $buf, qr/timeout_call timed out/; }; done_testing; diff --git a/t/201_crash.t b/t/201_crash.t index 95f52f5..139cd1e 100644 --- a/t/201_crash.t +++ b/t/201_crash.t @@ -21,14 +21,22 @@ my $code = sub { my $guard = $container->new_scope(); my $dbh = $container->get( 'DB::Master' ); - # Remove everything in the storage so we can focus on our only object + # Clean up the database so that we don't get bogus errors + # (This should really be done in the original test that + # broke the consistency, damnit) { - my $list = $dbh->selectall_arrayref( <(DELETE "http://127.0.0.1/$row->[0]"); + my $list = $dbh->selectall_arrayref( < {} } ); + SELECT * FROM entity +EOSQL + foreach my $entity (@$list) { + my $object = $dbh->selectrow_hashref( <{object_id} ); + SELECT * FROM object WHERE id = ? +EOSQL + if (! $object) { + $dbh->do( <{object_id} ); + DELETE FROM entity WHERE object_id = ? +EOSQL + } } } @@ -54,7 +62,18 @@ EOM my $storages = $dbh->selectall_arrayref( < {} } ); SELECT s.* FROM storage s EOSQL - my $crashed = $storages->[0]; + my $crashed; + my $count; + foreach my $storage (@$storages) { + ($count) = $dbh->selectrow_array( <{id} ); + SELECT count(*) FROM entity WHERE storage_id = ? +EOSQL + if ($count > 0) { + note "Storage $storage->{id} has $count entities. Choosing this as our crashed storage"; + $crashed = $storage; + last; + } + } $dbh->do( <{id} ); UPDATE storage SET mode = ? WHERE id = ? @@ -78,7 +97,7 @@ EOSQL { my $worker = STF::Worker::RepairObject->new( container => $context->container, - max_works_per_child => 1, + max_works_per_child => $count, ); $worker->work; } diff --git a/t/202_retire.t b/t/202_retire.t index 2f42dc5..9e34cb8 100644 --- a/t/202_retire.t +++ b/t/202_retire.t @@ -42,8 +42,18 @@ my $code = sub { my $storages = $dbh->selectall_arrayref( < {} } ); SELECT s.* FROM storage s EOSQL - my $retired = $storages->[0]; - + my $retired; + my $count; + foreach my $storage (@$storages) { + ($count) = $dbh->selectrow_array( <{id} ); + SELECT count(*) FROM entity WHERE storage_id = ? +EOSQL + if ($count > 0) { + note "Storage $storage->{id} has $count entities. Choosing this as our retired storage"; + $retired = $storage; + last; + } + } $dbh->do( <{id} ); UPDATE storage SET mode = ? WHERE id = ? EOSQL @@ -66,7 +76,7 @@ EOSQL { my $worker = STF::Worker::RepairObject->new( container => $context->container, - max_works_per_child => 1, + max_works_per_child => $count, ); $worker->work; }