Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 169 lines (131 sloc) 3.87 KB
#!perl
use strict;
use DBI;
use Getopt::Long;
# Run main() only when this file is executed directly as a script; when it is
# loaded from other code, caller() returns a true value and nothing is run.
main() unless caller();
# Entry point: parse the command line, load the configuration, collect the
# row counts of every queue and rebalance them.
#
# Command line:
#   --config FILE   Configuration file. A name ending in ".yaml" is read with
#                   YAML::LoadFile, one ending in ".json" is decoded with
#                   JSON::decode_json, and anything else is loaded as Perl
#                   code whose return value is used as the configuration.
#
# The configuration must be a hash reference. A missing or non-positive
# "threshold" entry is replaced by the default of 100.
#
# Exits with status 1 when option parsing fails, no config file was given,
# the JSON file cannot be opened, or the configuration is not a hash.
sub main {
    my $file;
    if ( !GetOptions( "config:s" => \$file ) ) {
        exit 1;
    }
    if ( !$file ) {
        print STDERR "No config file specified :/\n";
        exit 1;
    }

    my $config;
    if ( $file =~ /\.yaml$/ ) {
        require YAML;
        $config = YAML::LoadFile($file);
    }
    elsif ( $file =~ /\.json$/ ) {
        require JSON;
        my $fh;
        if ( !open $fh, '<', $file ) {
            print STDERR "Could not open $file: $!\n";
            exit 1;
        }
        my $content = do { local $/; <$fh> };
        close $fh;
        $config = JSON::decode_json($content);
    }
    else {
        # A relative name handed to require() is searched for in @INC, which
        # no longer contains the current directory on perl 5.26 and later.
        # Resolve an existing file to an absolute path so that
        # "-c config.pl" keeps working; anything else falls back to the
        # plain @INC lookup.
        require File::Spec;
        my $path = -e $file ? File::Spec->rel2abs($file) : $file;
        $config = require $path;
    }

    # Everything below dereferences $config as a hash, so fail early with a
    # readable message instead of a strict-refs error.
    if ( ref $config ne 'HASH' ) {
        print STDERR "Config file $file did not produce a hash\n";
        exit 1;
    }

    if ( !exists $config->{threshold} || $config->{threshold} <= 0 ) {
        $config->{threshold} = 100;
    }

    my $stats = collect_stats($config);
    check_and_balance( $config, $stats );
}
# Count the rows of every configured table on every configured connection.
#
# Parameters:
#   $config - hash reference with
#               connect_info => { name => [ DBI->connect arguments ] }
#               tables       => [ table names ]
#
# Returns a hash reference of the form { name => { table => row count } }.
# Each count is also printed to STDOUT as "name[table] -> count".
#
# Dies when a connection cannot be established or a row count cannot be
# obtained, because balancing on incomplete numbers would move rows based
# on wrong sizes.
sub collect_stats {
    my $config        = shift;
    my $connect_infos = $config->{connect_info};
    my $tables        = $config->{tables};

    my %sizes;
    foreach my $key ( sort keys %$connect_infos ) {
        my $connect_info = $connect_infos->{$key};
        my $dbh          = DBI->connect( @{$connect_info} );
        if ( !$dbh ) {
            my $reason = defined $DBI::errstr ? $DBI::errstr : 'unknown error';
            die "Could not connect to $key: $reason\n";
        }

        $sizes{$key} = {};
        foreach my $table (@$tables) {
            my ($count) = $dbh->selectrow_array("SELECT count(*) FROM $table");
            if ( !defined $count ) {
                my $reason = defined $DBI::errstr ? $DBI::errstr : 'unknown error';
                die "Could not count rows of $table on $key: $reason\n";
            }
            print "${key}[$table] -> $count\n";
            $sizes{$key}->{$table} = $count;
        }

        # The handle is only needed for the counts above.
        $dbh->disconnect;
    }
    return \%sizes;
}
# Decide, table by table, whether the fullest queue has to hand rows over to
# the other queues, and trigger the move when it does.
#
# Parameters:
#   $config - hash reference; "tables", "connect_info" and "threshold" are read
#   $stats  - hash reference { name => { table => row count } }
#
# A table is rebalanced when the gap between its largest and smallest count
# exceeds the threshold and the largest count is more than twice the
# smallest. In that case 40% of the largest count (at least one row) is
# passed to distribute() together with the name of the fullest queue.
sub check_and_balance {
    my ($config, $stats) = @_;
    my $tables        = $config->{tables};
    my $connect_infos = $config->{connect_info};

    TABLE:
    for my $table (@$tables) {
        my $largest = 0;
        my ( $largest_node, $smallest );

        for my $node ( keys %$connect_infos ) {
            my $size = $stats->{$node}->{$table};
            ( $largest, $largest_node ) = ( $size, $node ) if $largest < $size;
            $smallest = $size if !defined $smallest || $size < $smallest;
        }

        # Every queue is empty for this table: nothing to move.
        next TABLE unless defined $largest_node;

        # Both conditions must hold before any rows are moved.
        next TABLE unless ( $largest - $smallest ) > $config->{threshold};
        next TABLE unless $largest > $smallest * 2;

        my $to_move = int( $largest * 0.4 );
        $to_move = 1 if $to_move <= 0;

        distribute( $config, $largest_node, $table, $to_move );
    }
}
# Move up to $count rows of $table from the queue named $from to the other
# configured queues, handing them out in round-robin order.
#
# Parameters:
#   $config - hash reference; "connect_info" maps queue names to
#             DBI->connect argument lists
#   $from   - name of the queue to take rows from
#   $table  - name of the queue table
#   $count  - maximum number of rows to move
#
# Each row is first inserted into the target and only then released on the
# source with queue_end(), so a failed insert cannot lose the row: in that
# case the row is handed back with queue_abort() and the sub dies.
#
# Returns without doing anything when there is no other queue to move to.
# Dies when the source connection or statement cannot be set up, or when a
# row cannot be inserted into its target.
sub distribute {
    my ($config, $from, $table, $count) = @_;
    my $connect_infos = $config->{connect_info};

    # Every queue except the source takes part in the round robin.
    my @keys = grep { $_ ne $from } sort keys %$connect_infos;
    return unless @keys;

    my $source = DBI->connect( @{ $connect_infos->{$from} } );
    if ( !$source ) {
        my $reason = defined $DBI::errstr ? $DBI::errstr : 'unknown error';
        die "Could not connect to $from: $reason\n";
    }

    my $sth = $source->prepare( "SELECT * FROM $table WHERE queue_wait('$table', 1)" );
    if ( !$sth ) {
        my $reason = defined $DBI::errstr ? $DBI::errstr : 'unknown error';
        die "Could not prepare the fetch from $table on $from: $reason\n";
    }

    # One connection per target, opened on first use and reused afterwards.
    my %target_dbh;

    # "> 0" rather than plain truth: a count that drops below zero must end
    # the loop instead of keeping it alive.
    while ( $count > 0 ) {
        my $rv = $sth->execute();
        if ( !defined $rv || $rv <= 0 ) {
            last;    # error, or no row became available
        }

        while ( my $row = $sth->fetchrow_arrayref ) {
            my $target = shift @keys;
            push @keys, $target;

            my $dbh = $target_dbh{$target};
            if ( !$dbh ) {
                $dbh = $target_dbh{$target}
                    = DBI->connect( @{ $connect_infos->{$target} } );
            }

            my $sql = "INSERT INTO $table VALUES (" .
                join( ",", map { "?" } @$row ) . ")";

            # eval covers handles created with RaiseError, the return value
            # covers handles without it.
            my $inserted;
            my $reason = '';
            if ($dbh) {
                $inserted = eval { $dbh->do( $sql, undef, @$row ) };
                $reason = $@ if !$inserted;
            }

            if ( !$inserted ) {
                $reason ||= ( defined $DBI::errstr ? $DBI::errstr : 'unknown error' );
                # Give the row back to the source queue before bailing out.
                $source->do( "SELECT queue_abort()" );
                die "Could not move a row of $table from $from to $target: $reason\n";
            }

            # The row is safely stored on the target; remove it from the source.
            $source->do( "SELECT queue_end()" );
            $count--;
        }
    }

    $sth->finish;
    $_->disconnect for values %target_dbh;
    $source->disconnect;
    return;
}
__END__
=head1 NAME
q4m-balance - Balance Redundant Q4M Queues
=head1 SYNOPSIS
q4m-balance -c config.yaml
q4m-balance -c config.pl
q4m-balance -c config.json
=head1 CONFIG
    threshold: 100 # minimum discrepancy between max and min
    connect_info:  # name and connection information for the queues
      name1:
        - DSN
        - username
        - password
        - option1: value1
          option2: value2
      name2:
        ...
    tables:
      - queue1
      - queue2
      ...
=head1 WARNING
Do not use if the order of the jobs makes a difference.
=cut