Skip to content

Control the Maximum speed(KB/s) to read binlog from master #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/mysql.h.pp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
const unsigned char *packet, size_t len);
my_bool net_write_packet(NET *net, const unsigned char *packet, size_t length);
unsigned long my_net_read(NET *net);
unsigned long my_net_read_packet_reallen(NET *net, unsigned long *reallen);
struct rand_struct {
unsigned long seed1,seed2,max_value;
double max_value_dbl;
Expand Down
1 change: 1 addition & 0 deletions include/mysql_com.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ my_bool net_write_command(NET *net,unsigned char command,
const unsigned char *packet, size_t len);
my_bool net_write_packet(NET *net, const unsigned char *packet, size_t length);
unsigned long my_net_read(NET *net);
unsigned long my_net_read_packet_reallen(NET *net, unsigned long *reallen);

#ifdef MY_GLOBAL_INCLUDED
void my_net_set_write_timeout(NET *net, uint timeout);
Expand Down
3 changes: 3 additions & 0 deletions include/sql_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ cli_advanced_command(MYSQL *mysql, enum enum_server_command command,
const unsigned char *arg, size_t arg_length,
my_bool skip_check, MYSQL_STMT *stmt);
unsigned long cli_safe_read(MYSQL *mysql, my_bool *is_data_packet);
unsigned long cli_safe_read_reallen(MYSQL *mysql, my_bool *is_data_packet, unsigned long *reallen);
unsigned long cli_safe_read_with_ok(MYSQL *mysql, my_bool parse_ok,
my_bool *is_data_packet);
unsigned long cli_safe_read_reallen_with_ok(MYSQL *mysql, my_bool parse_ok,
my_bool *is_data_packet, unsigned long *reallen);
void net_clear_error(NET *net);
void set_stmt_errmsg(MYSQL_STMT *stmt, NET *net);
void set_stmt_error(MYSQL_STMT *stmt, int errcode, const char *sqlstate,
Expand Down
4 changes: 4 additions & 0 deletions mysql-test/r/mysqld--help-notwin.result
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,9 @@ The following options may be given as the first argument:
optimization of a query, index range scan will not be
considered for this query. A value of 0 means range
optimizer does not have any cap on memory.
--read-binlog-speed-limit=#
Maximum speed(KB/s) to read binlog from master (0 = no
limit)
--read-buffer-size=#
Each thread that does a sequential scan allocates a
buffer of this size for each table it scans. If you do
Expand Down Expand Up @@ -1540,6 +1543,7 @@ query-cache-wlock-invalidate FALSE
query-prealloc-size 8192
range-alloc-block-size 4096
range-optimizer-max-mem-size 8388608
read-binlog-speed-limit 0
read-buffer-size 131072
read-only FALSE
read-rnd-buffer-size 262144
Expand Down
2 changes: 2 additions & 0 deletions mysql-test/suite/perfschema/r/show_sanity.result
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ SHOW_MODE SOURCE VARIABLE_NAME
5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED
5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS
5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG
5.6 I_S.SESSION_VARIABLES READ_BINLOG_SPEED_LIMIT
5.6 I_S.SESSION_VARIABLES TLS_VERSION

================================================================================
Expand Down Expand Up @@ -442,6 +443,7 @@ SHOW_MODE SOURCE VARIABLE_NAME
5.6 I_S.SESSION_VARIABLES INNODB_STATS_INCLUDE_DELETE_MARKED
5.6 I_S.SESSION_VARIABLES KEYRING_OPERATIONS
5.6 I_S.SESSION_VARIABLES LOG_STATEMENTS_UNSAFE_FOR_BINLOG
5.6 I_S.SESSION_VARIABLES READ_BINLOG_SPEED_LIMIT
5.6 I_S.SESSION_VARIABLES TLS_VERSION

================================================================================
Expand Down
2 changes: 2 additions & 0 deletions mysql-test/suite/sys_vars/r/all_vars.result
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ DISABLED_STORAGE_ENGINES
DISABLED_STORAGE_ENGINES
KEYRING_OPERATIONS
KEYRING_OPERATIONS
READ_BINLOG_SPEED_LIMIT
READ_BINLOG_SPEED_LIMIT
TLS_VERSION
TLS_VERSION
drop table t1;
Expand Down
16 changes: 15 additions & 1 deletion sql-common/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,14 @@ void read_ok_ex(MYSQL *mysql, ulong length)
ulong
cli_safe_read_with_ok(MYSQL *mysql, my_bool parse_ok,
my_bool *is_data_packet)
{
ulong reallen = 0;
return cli_safe_read_reallen_with_ok(mysql, parse_ok, is_data_packet, &reallen);
}
ulong
cli_safe_read_reallen_with_ok(MYSQL *mysql, my_bool parse_ok,
my_bool *is_data_packet,
ulong *reallen)
{
NET *net= &mysql->net;
ulong len=0;
Expand All @@ -1058,7 +1066,8 @@ cli_safe_read_with_ok(MYSQL *mysql, my_bool parse_ok,
*is_data_packet= FALSE;

if (net->vio != 0)
len=my_net_read(net);
len=my_net_read_packet_reallen(net, reallen);


if (len == packet_error || len == 0)
{
Expand Down Expand Up @@ -1193,6 +1202,11 @@ ulong cli_safe_read(MYSQL *mysql, my_bool *is_data_packet)
{
return cli_safe_read_with_ok(mysql, 0, is_data_packet);
}
ulong cli_safe_read_reallen(MYSQL *mysql, my_bool *is_data_packet, ulong *reallen)
{
return cli_safe_read_reallen_with_ok(mysql, 0, is_data_packet, reallen);
}



void free_rows(MYSQL_DATA *cur)
Expand Down
14 changes: 13 additions & 1 deletion sql/net_serv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -887,11 +887,19 @@ static size_t net_read_packet(NET *net, size_t *complen)

ulong
my_net_read(NET *net)
{
ulong reallen = 0;
return my_net_read_packet_reallen(net, &reallen);
}

ulong
my_net_read_packet_reallen(NET *net, ulong* reallen)
{
size_t len, complen;

MYSQL_NET_READ_START();

*reallen = 0;
#ifdef HAVE_COMPRESS
if (!net->compress)
{
Expand All @@ -914,7 +922,10 @@ my_net_read(NET *net)
}
net->read_pos = net->buff + net->where_b;
if (len != packet_error)
net->read_pos[len]=0; /* Safeguard for mysql_use_result */
{
net->read_pos[len] = 0; /* Safeguard for mysql_use_result */
*reallen = len;
}
MYSQL_NET_READ_DONE(0, len);
return static_cast<ulong>(len);
#ifdef HAVE_COMPRESS
Expand Down Expand Up @@ -1020,6 +1031,7 @@ my_net_read(NET *net)
return packet_error;
}
buf_length+= complen;
*reallen+= packet_len;
}

net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE;
Expand Down
54 changes: 50 additions & 4 deletions sql/rpl_slave.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
char* slave_load_tmpdir = 0;
my_bool replicate_same_server_id;
ulonglong relay_log_space_limit = 0;
ulonglong opt_read_binlog_speed_limit = 0;

const char *relay_log_index= 0;
const char *relay_log_basename= 0;
Expand Down Expand Up @@ -4463,13 +4464,16 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
try a reconnect. We do not want to print anything to
the error log in this case because this a anormal
event in an idle server.
network_read_len get the real network read length in VIO, especially
using compressed protocol

RETURN VALUES
'packet_error' Error
number Length of packet
*/

static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
ulong* network_read_len)
{
ulong len;
DBUG_ENTER("read_event");
Expand All @@ -4484,7 +4488,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
DBUG_RETURN(packet_error);
#endif

len= cli_safe_read(mysql, NULL);
len= cli_safe_read_reallen(mysql, NULL, network_read_len);
if (len == packet_error || (long) len < 1)
{
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
Expand Down Expand Up @@ -5761,17 +5765,19 @@ requesting master dump") ||
const char *event_buf;

DBUG_ASSERT(mi->last_error().number == 0);
ulonglong lastchecktime = my_micro_time();
ulonglong tokenamount = opt_read_binlog_speed_limit * 1024;
while (!io_slave_killed(thd,mi))
{
ulong event_len;
ulong event_len, network_read_len = 0;
/*
We say "waiting" because read_event() will wait if there's nothing to
read. But if there's something to read, it will not wait. The
important thing is to not confuse users by saying "reading" whereas
we're in fact receiving nothing.
*/
THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event);
event_len= read_event(mysql, mi, &suppress_warnings);
event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len);
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
reading event"))
goto err;
Expand Down Expand Up @@ -5834,6 +5840,46 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err;
}

/* Control the binlog read speed of master
when read_binlog_speed_limit is non-zero
*/
ulonglong speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024;
if (speed_limit_in_bytes)
{
/* Prevent the tokenamount become a large value,
for example, the IO thread doesn't work for a long time
*/
if (tokenamount > speed_limit_in_bytes * 2)
{
lastchecktime = my_micro_time();
tokenamount = speed_limit_in_bytes * 2;
}

do
{
ulonglong currenttime = my_micro_time();
tokenamount += (currenttime - lastchecktime) * speed_limit_in_bytes / (1000 * 1000);
lastchecktime = currenttime;
if (tokenamount < network_read_len)
{
ulonglong micro_time = 1000 * 1000 * (network_read_len - tokenamount) / speed_limit_in_bytes;
ulonglong second_time = micro_time / (1000 * 1000);
micro_time = micro_time % (1000 * 1000);

// at least sleep 1000 micro second
my_sleep(micro_time > 1000 ? micro_time : 1000);

/*
If it sleep more than one second,
it should use slave_sleep() to avoid the STOP SLAVE hang.
*/
if (second_time)
slave_sleep(thd, second_time, io_slave_killed, mi);
}
} while (tokenamount < network_read_len);
tokenamount -= network_read_len;
}

/* XXX: 'synced' should be updated by queue_event to indicate
whether event has been synced to disk */
bool synced= 0;
Expand Down
1 change: 1 addition & 0 deletions sql/rpl_slave.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ extern my_bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_log_slave_updates;
extern char *opt_slave_skip_errors;
extern ulonglong relay_log_space_limit;
extern ulonglong opt_read_binlog_speed_limit;

extern const char *relay_log_index;
extern const char *relay_log_basename;
Expand Down
6 changes: 6 additions & 0 deletions sql/sys_vars.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5316,6 +5316,12 @@ static Sys_var_ulonglong Sys_relay_log_space_limit(
READ_ONLY GLOBAL_VAR(relay_log_space_limit), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));

static Sys_var_ulonglong Sys_read_binlog_speed_limit(
"read_binlog_speed_limit", "Maximum speed(KB/s) to read binlog from"
" master (0 = no limit)",
GLOBAL_VAR(opt_read_binlog_speed_limit), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));

static Sys_var_uint Sys_sync_relaylog_period(
"sync_relay_log", "Synchronously flush relay log to disk after "
"every #th event. Use 0 to disable synchronous flushing",
Expand Down