Skip to content

Commit

Permalink
* New feature: When doing online master switch,
Browse files Browse the repository at this point in the history
MHA checks whether long queries are running
on the new master. This is important to reduce
workloads on the new master. Query time limit can be
controlled via --running_seconds_limit.
  • Loading branch information
yoshinorim committed Nov 3, 2011
1 parent a9e4efd commit 4bb9953
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 39 deletions.
1 change: 1 addition & 0 deletions MANIFEST
Expand Up @@ -140,6 +140,7 @@ tests/t/t_normal_crash.sh
tests/t/t_online_3tier.sh
tests/t/t_online_3tier_slave.sh
tests/t/t_online_3tier_slave_keep.sh
tests/t/t_online_busy.sh
tests/t/t_online_mm.sh
tests/t/t_online_mm_3tier.sh
tests/t/t_online_mm_3tier_slave.sh
Expand Down
60 changes: 46 additions & 14 deletions lib/MHA/DBHelper.pm
Expand Up @@ -26,6 +26,7 @@ use English qw(-no_match_vars);
use MHA::SlaveUtil;
use MHA::ManagerConst;
use DBI;
use Data::Dumper;

use constant Status => "Status";
use constant Errstr => "Errstr";
Expand Down Expand Up @@ -120,14 +121,22 @@ sub get_connection_id($) {
return $href->{Value};
}

sub check_connection_fast_util {
sub connect_util {
my $host = shift;
my $port = shift;
my $user = shift;
my $password = shift;
my $dsn = "DBI:mysql:;host=$host;port=$port;mysql_connect_timeout=1";
my $dbh = DBI->connect( $dsn, $user, $password, { PrintError => 0 } );
return $dbh;
}

my $dsn = "DBI:mysql:;host=$host;port=$port;mysql_connect_timeout=1";
my $dbh = DBI->connect( $dsn, $user, $password, { PrintError => 0 } );
sub check_connection_fast_util {
my $host = shift;
my $port = shift;
my $user = shift;
my $password = shift;
my $dbh = connect_util( $host, $port, $user, $password );
if ( defined($dbh) ) {
return "1:Connection Succeeded";
}
Expand Down Expand Up @@ -586,13 +595,13 @@ sub read_all_relay_log($$) {
return %status;
}

sub get_num_running_threads_util {
sub get_threads_util {
my $dbh = shift;
my $my_connection_id = shift;
my $running_time_threshold = shift;
my $update_only = shift;
my $type = shift;
$running_time_threshold = 0 unless ($running_time_threshold);
$update_only = 0 unless ($update_only);
$type = 0 unless ($type);
my @threads;

my $sth = $dbh->prepare(Show_Processlist_SQL);
Expand All @@ -612,9 +621,12 @@ sub get_num_running_threads_util {
next if ( defined($command) && $command eq "Binlog Dump" );
next if ( defined($user) && $user eq "system user" );

if ($update_only) {
if ( $type >= 1 ) {
next if ( defined($command) && $command eq "Sleep" );
next if ( defined($command) && $command eq "Connect" );
}

if ( $type >= 2 ) {
next if ( defined($info) && $info =~ m/^select/i );
next if ( defined($info) && $info =~ m/^show/i );
}
Expand All @@ -623,19 +635,39 @@ sub get_num_running_threads_util {
return @threads;
}

sub get_num_running_threads($$$) {
sub print_threads_util {
my ( $threads_ref, $max_prints ) = @_;
my @threads = @$threads_ref;
my $count = 0;
print "Details:\n";
foreach my $thread (@threads) {
print Data::Dumper->new( [$thread] )->Indent(0)->Terse(1)->Dump . "\n";
$count++;
if ( $count >= $max_prints ) {
printf( "And more.. (%d threads in total)\n", $#threads + 1 );
last;
}
}
}

sub get_threads($$$) {
my $self = shift;
my $running_time_threshold = shift;
my $type = shift;
return MHA::DBHelper::get_threads_util( $self->{dbh}, $self->{connection_id},
$running_time_threshold, $type );
}

sub get_running_threads($$) {
my $self = shift;
my $running_time_threshold = shift;
my $update_only = shift;
return MHA::DBHelper::get_num_running_threads_util( $self->{dbh},
$self->{connection_id},
$running_time_threshold, $update_only );
return $self->get_threads( $running_time_threshold, 1 );
}

sub get_num_running_update_threads($$) {
sub get_running_update_threads($$) {
my $self = shift;
my $running_time_threshold = shift;
return $self->get_num_running_threads( $running_time_threshold, 1 );
return $self->get_threads( $running_time_threshold, 2 );
}

sub kill_threads {
Expand Down
35 changes: 22 additions & 13 deletions lib/MHA/MasterRotate.pm
Expand Up @@ -46,6 +46,7 @@ my $g_workdir;
my $g_flush_tables = 2;
my $g_orig_master_is_new_slave;
my $g_running_updates_limit = 1;
my $g_running_seconds_limit = 10;
my $g_skip_lock_all_tables;
my $g_remove_orig_master_conf;
my $g_interactive = 1;
Expand Down Expand Up @@ -155,23 +156,16 @@ sub identify_orig_master() {
$_server_manager->check_replication_health($g_running_updates_limit);

my @threads =
$orig_master->get_num_running_update_threads(
$g_running_updates_limit + 1 );
$orig_master->get_running_update_threads( $g_running_updates_limit + 1 );
if ( $#threads >= 0 ) {
$log->error(
sprintf(
"We should not start online master switch when one of connections are running long updates. Currently %d update threads are running.",
$#threads + 1 )
"We should not start online master switch when one of connections are running long updates on the current master(%s). Currently %d update thread(s) are running.",
$orig_master->get_hostinfo(),
$#threads + 1
)
);
if ( $#threads < 10 ) {
print "Details: \n";
foreach my $thread (@threads) {
print " ID: $thread->{Id} ";
print " Query: $thread->{Info}" if ( $thread->{Info} );
print " Time: $thread->{Time}" if ( $thread->{Time} );
print "\n";
}
}
MHA::DBHelper::print_threads_util( \@threads, 10 );
croak;
}
return $orig_master;
Expand Down Expand Up @@ -230,6 +224,20 @@ sub identify_new_master {
$log->info(" ok.");

check_filter( $orig_master, $new_master );

my @threads = $new_master->get_running_threads($g_running_seconds_limit);
if ( $#threads >= 0 ) {
$log->error(
sprintf(
"We should not start online master switch when one of connections are running long queries on the new master(%s). Currently %d thread(s) are running.",
$new_master->get_hostinfo(),
$#threads + 1
)
);
MHA::DBHelper::print_threads_util( \@threads, 10 );
croak;
}

return $new_master;
}

Expand Down Expand Up @@ -647,6 +655,7 @@ sub main {
'interactive=i' => \$g_interactive,
'orig_master_is_new_slave' => \$g_orig_master_is_new_slave,
'running_updates_limit=i' => \$g_running_updates_limit,
'running_seconds_limit=i' => \$g_running_seconds_limit,
'skip_lock_all_tables' => \$g_skip_lock_all_tables,
'remove_dead_master_conf' => \$g_remove_orig_master_conf,
'remove_orig_master_conf' => \$g_remove_orig_master_conf,
Expand Down
17 changes: 15 additions & 2 deletions lib/MHA/Server.pm
Expand Up @@ -162,6 +162,12 @@ sub connect_check {
return 0;
}

sub connect_util {
my $self = shift;
return MHA::DBHelper::connect_util( $self->{ip}, $self->{port}, $self->{user},
$self->{password} );
}

# Failed to connect does not result in script die, because it is sometimes expected.
# Configuration error results in script die, because it should not happen if correctly configured.
sub connect_and_get_status {
Expand Down Expand Up @@ -432,11 +438,18 @@ sub has_replication_problem {
return 0;
}

sub get_num_running_update_threads($$) {
sub get_running_threads($$) {
my $self = shift;
my $mode = shift;
my $dbhelper = $self->{dbhelper};
$dbhelper->get_running_threads($mode);
}

sub get_running_update_threads($$) {
my $self = shift;
my $mode = shift;
my $dbhelper = $self->{dbhelper};
$dbhelper->get_num_running_update_threads($mode);
$dbhelper->get_running_update_threads($mode);
}

sub wait_until_relay_log_applied {
Expand Down
21 changes: 13 additions & 8 deletions samples/scripts/master_ip_online_change
Expand Up @@ -59,13 +59,13 @@ sub sleep_until {
}
}

sub get_num_running_threads_util {
sub get_threads_util {
my $dbh = shift;
my $my_connection_id = shift;
my $running_time_threshold = shift;
my $update_only = shift;
my $type = shift;
$running_time_threshold = 0 unless ($running_time_threshold);
$update_only = 0 unless ($update_only);
$type = 0 unless ($type);
my @threads;

my $sth = $dbh->prepare("SHOW PROCESSLIST");
Expand All @@ -90,11 +90,16 @@ sub get_num_running_threads_util {
&& defined($query_time)
&& $query_time >= 1 );

if ($update_only) {
if ( $type >= 1 ) {
next if ( defined($command) && $command eq "Sleep" );
next if ( defined($command) && $command eq "Connect" );
}

if ( $type >= 2 ) {
next if ( defined($info) && $info =~ m/^select/i );
next if ( defined($info) && $info =~ m/^show/i );
}

push @threads, $ref;
}
return @threads;
Expand Down Expand Up @@ -139,7 +144,7 @@ sub main {
## Waiting for N * 100 milliseconds so that current connections can exit
my $time_until_read_only = 15;
$_tstart = [gettimeofday];
my @threads = get_num_running_threads_util( $orig_master_handler->{dbh},
my @threads = get_threads_util( $orig_master_handler->{dbh},
$orig_master_handler->{connection_id} );
while ( $time_until_read_only > 0 && $#threads >= 0 ) {
if ( $time_until_read_only % 5 == 0 ) {
Expand All @@ -154,7 +159,7 @@ sub main {
sleep_until();
$_tstart = [gettimeofday];
$time_until_read_only--;
@threads = get_num_running_threads_util( $orig_master_handler->{dbh},
@threads = get_threads_util( $orig_master_handler->{dbh},
$orig_master_handler->{connection_id} );
}

Expand All @@ -170,7 +175,7 @@ sub main {

## Waiting for M * 100 milliseconds so that current update queries can complete
my $time_until_kill_threads = 5;
@threads = get_num_running_threads_util( $orig_master_handler->{dbh},
@threads = get_threads_util( $orig_master_handler->{dbh},
$orig_master_handler->{connection_id} );
while ( $time_until_kill_threads > 0 && $#threads >= 0 ) {
if ( $time_until_kill_threads % 5 == 0 ) {
Expand All @@ -186,7 +191,7 @@ sub main {
sleep_until();
$_tstart = [gettimeofday];
$time_until_kill_threads--;
@threads = get_num_running_threads_util( $orig_master_handler->{dbh},
@threads = get_threads_util( $orig_master_handler->{dbh},
$orig_master_handler->{connection_id} );
}

Expand Down
4 changes: 2 additions & 2 deletions tests/t/env.sh
Expand Up @@ -34,14 +34,14 @@ export CONF_IGNORE=mha_test_ignore.cnf

fail_if_zero() {
if test $2 -eq 0 ; then
echo "$1 [Fail]"
echo "$1 [Fail] (expected non-zero exit code, but $2 returned)"
exit 1
fi
}

fail_if_nonzero() {
if test $2 -ne 0 ; then
echo "$1 [Fail]"
echo "$1 [Fail] (expected zero exit code, but $2 returned)"
exit 1
fi
}
Expand Down
42 changes: 42 additions & 0 deletions tests/t/t_online_busy.sh
@@ -0,0 +1,42 @@
. ./init.sh

masterha_master_switch --master_state=alive --interactive=0 --conf=mha_test_online.cnf --new_master_host=127.0.0.1 --new_master_port=$S1P --orig_master_is_new_slave --check_only > switch.log 2>&1
fail_if_nonzero $0 $?
check_master $0 $S1P $MP

mysql $M -e "create database if not exists mysqlslap"
sleep 1
mysqlslap --host=127.0.0.1 --port=$S1P --concurrency=15 --query="select sleep(100)" > /dev/null 2>&1 &
pid=$!

sleep 15
masterha_master_switch --master_state=alive --interactive=0 --conf=mha_test_online.cnf --new_master_host=127.0.0.1 --new_master_port=$S1P --orig_master_is_new_slave > switch.log 2>&1
rc=$?
kill $pid
wait $pid 2> /dev/null
fail_if_zero $0 $rc
sleep 10

mysql $M test -e "lock tables t1 write; select sleep(100)" > /dev/null 2>&1 &
pid2=$!
sleep 1
mysql $M test -e "lock tables t1 write" > /dev/null 2>&1 &
pid3=$!
sleep 2

masterha_master_switch --master_state=alive --interactive=0 --conf=mha_test_online.cnf --new_master_host=127.0.0.1 --new_master_port=$S1P --orig_master_is_new_slave --flush_tables=0 > switch.log 2>&1
rc=$?
kill $pid2
wait $pid2 2> /dev/null
kill $pid3
wait $pid3 2> /dev/null
fail_if_zero $0 $rc
sleep 5

masterha_master_switch --master_state=alive --interactive=0 --conf=mha_test_online.cnf --new_master_host=127.0.0.1 --new_master_port=$S1P --orig_master_is_new_slave > switch.log 2>&1
fail_if_nonzero $0 $?
mysql $S1 test -e "insert into t1 values(10000003, 300, 'bbbaaaaaaa');"
check_master $0 $MP $S1P
check_count $0 $MP 2
./check $0 2

0 comments on commit 4bb9953

Please sign in to comment.