From 9bfe6028224b6125c7e9104b7e91734e9b030952 Mon Sep 17 00:00:00 2001 From: Yasufumi Kinoshita Date: Mon, 6 Apr 2020 11:52:36 +0900 Subject: [PATCH] Bug#30088404 : CONCURRENT LOG BUFFER OF 8.0 CAUSES REGRESSION OF CPU BOUND PERFORMANCE ON LOW-END ARCHITECTURE - improved ut0link_buf.h; log_closer_thread was removed because it is no longer needed - added a variable, innodb_log_writer_threads=[ON|OFF], to choose the former-style log_write_up_to on CPU-bound low-end servers - optimized checkpoint and sync_flush behavior RB: 23457 Reviewed-by: Pawel Olchawa --- mysql-test/r/all_persisted_variables.result | 8 +- .../suite/innodb/r/innodb_redo_debug_1.result | 154 ------ .../suite/innodb/r/log_writer_threads.result | 16 + .../suite/innodb/t/innodb_redo_debug_1.test | 16 - .../suite/innodb/t/log_writer_threads.test | 49 ++ .../suite/perfschema/r/threads_innodb.result | 1 - .../r/innodb_log_writer_threads_basic.result | 34 ++ .../t/innodb_log_writer_threads_basic.test | 29 ++ .../r/pr_ps_setup_show_enabled.result | 2 - mysql-test/t/all_persisted_variables.test | 2 +- storage/innobase/buf/buf0flu.cc | 13 +- storage/innobase/handler/ha_innodb.cc | 35 +- storage/innobase/include/buf0flu.h | 5 +- storage/innobase/include/log0log.h | 51 +- storage/innobase/include/log0log.ic | 6 +- storage/innobase/include/log0types.h | 27 + storage/innobase/include/srv0srv.h | 14 +- storage/innobase/include/ut0link_buf.h | 166 ++++-- storage/innobase/log/log0buf.cc | 106 ++-- storage/innobase/log/log0chkp.cc | 151 +++--- storage/innobase/log/log0log.cc | 112 ++++- storage/innobase/log/log0write.cc | 472 ++++++++++++------ storage/innobase/srv/srv0srv.cc | 26 +- storage/innobase/srv/srv0start.cc | 3 + 24 files changed, 881 insertions(+), 617 deletions(-) create mode 100644 mysql-test/suite/innodb/r/log_writer_threads.result create mode 100644 mysql-test/suite/innodb/t/log_writer_threads.test create mode 100644 mysql-test/suite/sys_vars/r/innodb_log_writer_threads_basic.result create mode 100644 
mysql-test/suite/sys_vars/t/innodb_log_writer_threads_basic.test diff --git a/mysql-test/r/all_persisted_variables.result b/mysql-test/r/all_persisted_variables.result index 731aac9cf508..d8ef2fbe61b2 100644 --- a/mysql-test/r/all_persisted_variables.result +++ b/mysql-test/r/all_persisted_variables.result @@ -38,7 +38,7 @@ include/assert.inc [Expect 500+ variables in the table. Due to open Bugs, we are # Test SET PERSIST -include/assert.inc [Expect 402 persisted variables in the table.] +include/assert.inc [Expect 403 persisted variables in the table.] ************************************************************ * 3. Restart server, it must preserve the persisted variable @@ -46,9 +46,9 @@ include/assert.inc [Expect 402 persisted variables in the table.] ************************************************************ # restart -include/assert.inc [Expect 402 persisted variables in persisted_variables table.] -include/assert.inc [Expect 402 persisted variables shown as PERSISTED in variables_info table.] -include/assert.inc [Expect 402 persisted variables with matching peristed and global values.] +include/assert.inc [Expect 403 persisted variables in persisted_variables table.] +include/assert.inc [Expect 403 persisted variables shown as PERSISTED in variables_info table.] +include/assert.inc [Expect 403 persisted variables with matching peristed and global values.] ************************************************************ * 4. Test RESET PERSIST IF EXISTS. 
Verify persisted variable diff --git a/mysql-test/suite/innodb/r/innodb_redo_debug_1.result b/mysql-test/suite/innodb/r/innodb_redo_debug_1.result index 25999c368852..636a5be9869b 100644 --- a/mysql-test/suite/innodb/r/innodb_redo_debug_1.result +++ b/mysql-test/suite/innodb/r/innodb_redo_debug_1.result @@ -229,157 +229,3 @@ count(*)=0 1 DROP DATABASE dbredoopt; DROP USER 'redo_wl_user'@'localhost'; -# Test with log_advance_ready_for_write_before_update -DROP DATABASE IF EXISTS dbredoopt; -CREATE DATABASE dbredoopt; -USE dbredoopt; -CREATE TABLE dbredoopt.t1 -(col1 INT, col2 INT, c1 LONGBLOB, c2 LONGBLOB, -PRIMARY KEY(col1), INDEX `idx2` (col2), -INDEX `idx3` (c1(300),c2(200)) -); -CREATE PROCEDURE dbredoopt.populate_t1() -BEGIN -DECLARE i INT DEFAULT 1; -while (i <= 500) DO -INSERT INTO dbredoopt.t1 values (i, i,REPEAT('a',5000), REPEAT('b',5000)); -SET i = i + 1; -END WHILE; -END| -CREATE USER 'redo_wl_user'@'localhost'; -GRANT ALL PRIVILEGES ON *.* TO 'redo_wl_user'@'localhost' WITH GRANT OPTION; -START TRANSACTION; -SELECT count(*) FROM dbredoopt.t1; -count(*) -0 -call dbredoopt.populate_t1(); -SELECT count(*) FROM dbredoopt.t1; -count(*) -500 -SELECT col1 FROM dbredoopt.t1 LIMIT 10; -col1 -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 -COMMIT; -# Check rows which are not modified by parallel load -SELECT col1 FROM dbredoopt.t1 WHERE col1%50=0 AND (col1 < 501 AND col1 > 0 ); -col1 -50 -100 -150 -200 -250 -300 -350 -400 -450 -500 -# Wait till all mysql clients are connected -# Singal all mysql clients since each one is waiting for signal go_ahead -# with 'go_ahead' signal , all client suppose to resume in parallel -# Wait till all clients are over or server is gone -# restart -# Check rows which are not modified by parallel load -SELECT col1 FROM dbredoopt.t1 WHERE col1%50=0 AND (col1 < 501 AND col1 > 0 ); -col1 -50 -100 -150 -200 -250 -300 -350 -400 -450 -500 -# Check rows which are updated by parallel load -SELECT count(*)=0 FROM (SELECT col1,col2 FROM dbredoopt.t1 WHERE col1 
NOT BETWEEN 1 AND 501 ) AS A WHERE (A.col1+A.col2) != 0 ; -count(*)=0 -1 -DROP DATABASE dbredoopt; -DROP USER 'redo_wl_user'@'localhost'; -# Test with log_advance_ready_for_write_before_reclaim -DROP DATABASE IF EXISTS dbredoopt; -CREATE DATABASE dbredoopt; -USE dbredoopt; -CREATE TABLE dbredoopt.t1 -(col1 INT, col2 INT, c1 LONGBLOB, c2 LONGBLOB, -PRIMARY KEY(col1), INDEX `idx2` (col2), -INDEX `idx3` (c1(300),c2(200)) -); -CREATE PROCEDURE dbredoopt.populate_t1() -BEGIN -DECLARE i INT DEFAULT 1; -while (i <= 500) DO -INSERT INTO dbredoopt.t1 values (i, i,REPEAT('a',5000), REPEAT('b',5000)); -SET i = i + 1; -END WHILE; -END| -CREATE USER 'redo_wl_user'@'localhost'; -GRANT ALL PRIVILEGES ON *.* TO 'redo_wl_user'@'localhost' WITH GRANT OPTION; -START TRANSACTION; -SELECT count(*) FROM dbredoopt.t1; -count(*) -0 -call dbredoopt.populate_t1(); -SELECT count(*) FROM dbredoopt.t1; -count(*) -500 -SELECT col1 FROM dbredoopt.t1 LIMIT 10; -col1 -1 -2 -3 -4 -5 -6 -7 -8 -9 -10 -COMMIT; -# Check rows which are not modified by parallel load -SELECT col1 FROM dbredoopt.t1 WHERE col1%50=0 AND (col1 < 501 AND col1 > 0 ); -col1 -50 -100 -150 -200 -250 -300 -350 -400 -450 -500 -# Wait till all mysql clients are connected -# Singal all mysql clients since each one is waiting for signal go_ahead -# with 'go_ahead' signal , all client suppose to resume in parallel -# Wait till all clients are over or server is gone -# restart -# Check rows which are not modified by parallel load -SELECT col1 FROM dbredoopt.t1 WHERE col1%50=0 AND (col1 < 501 AND col1 > 0 ); -col1 -50 -100 -150 -200 -250 -300 -350 -400 -450 -500 -# Check rows which are updated by parallel load -SELECT count(*)=0 FROM (SELECT col1,col2 FROM dbredoopt.t1 WHERE col1 NOT BETWEEN 1 AND 501 ) AS A WHERE (A.col1+A.col2) != 0 ; -count(*)=0 -1 -DROP DATABASE dbredoopt; -DROP USER 'redo_wl_user'@'localhost'; diff --git a/mysql-test/suite/innodb/r/log_writer_threads.result b/mysql-test/suite/innodb/r/log_writer_threads.result new 
file mode 100644 index 000000000000..c6c2dab0221e --- /dev/null +++ b/mysql-test/suite/innodb/r/log_writer_threads.result @@ -0,0 +1,16 @@ +SET GLOBAL innodb_flush_log_at_trx_commit = 1; +CREATE TABLE t1 (a CHAR(8)); +SET GLOBAL innodb_log_writer_threads = ON; +SET DEBUG_SYNC = 'RESET'; +SET DEBUG_SYNC = 'log_flushed_by_writer SIGNAL log_flushed'; +SET AUTOCOMMIT = 1; +INSERT INTO t1 (a) VALUES ('a'); +SET DEBUG_SYNC = 'now WAIT_FOR log_flushed'; +SET GLOBAL innodb_log_writer_threads = OFF; +SET DEBUG_SYNC = 'RESET'; +SET DEBUG_SYNC = 'log_flushed_by_self SIGNAL log_flushed'; +SET AUTOCOMMIT = 1; +INSERT INTO t1 (a) VALUES ('a'); +SET DEBUG_SYNC = 'now WAIT_FOR log_flushed'; +SET DEBUG_SYNC = 'RESET'; +DROP TABLE t1; diff --git a/mysql-test/suite/innodb/t/innodb_redo_debug_1.test b/mysql-test/suite/innodb/t/innodb_redo_debug_1.test index 123ea4c4c1ce..145ee01068b6 100644 --- a/mysql-test/suite/innodb/t/innodb_redo_debug_1.test +++ b/mysql-test/suite/innodb/t/innodb_redo_debug_1.test @@ -4,8 +4,6 @@ # 1) log_buffer_write_completed_before_store # 2) log_buffer_write_before_memcpy # 3) log_buffer_set_first_record_group_before_update -# 4) log_advance_ready_for_write_before_update -# 5) log_advance_ready_for_write_before_reclaim #------------------------------------------------------------------------------ # Test uses debug binary @@ -35,17 +33,3 @@ let debug_point=log_buffer_write_before_memcpy; let client_cnt=5; let debug_point=log_buffer_set_first_record_group_before_update; --source suite/innodb/include/innodb_redo_debug.inc - ---echo # Test with log_advance_ready_for_write_before_update ---source include/expect_crash.inc -# No. of parallel client running load -let client_cnt=5; -let debug_point=log_advance_ready_for_write_before_update; ---source suite/innodb/include/innodb_redo_debug.inc - ---echo # Test with log_advance_ready_for_write_before_reclaim ---source include/expect_crash.inc -# No. 
of parallel client running load -let client_cnt=5; -let debug_point=log_advance_ready_for_write_before_reclaim; ---source suite/innodb/include/innodb_redo_debug.inc diff --git a/mysql-test/suite/innodb/t/log_writer_threads.test b/mysql-test/suite/innodb/t/log_writer_threads.test new file mode 100644 index 000000000000..ae7d98e8f220 --- /dev/null +++ b/mysql-test/suite/innodb/t/log_writer_threads.test @@ -0,0 +1,49 @@ +--source include/have_debug.inc +--source include/have_debug_sync.inc + +--disable_query_log +SET @old_innodb_log_writer_threads = @@innodb_log_writer_threads; +SET @old_innodb_flush_log_at_trx_commit = @@innodb_flush_log_at_trx_commit; +--enable_query_log + +# Save the initial number of concurrent sessions +--source include/count_sessions.inc + +SET GLOBAL innodb_flush_log_at_trx_commit = 1; + +--connect (con1,localhost,root,,) + +CREATE TABLE t1 (a CHAR(8)); + +# log flushed by writer threads +SET GLOBAL innodb_log_writer_threads = ON; +SET DEBUG_SYNC = 'RESET'; +--connection con1 +SET DEBUG_SYNC = 'log_flushed_by_writer SIGNAL log_flushed'; +SET AUTOCOMMIT = 1; +INSERT INTO t1 (a) VALUES ('a'); +--connection default +SET DEBUG_SYNC = 'now WAIT_FOR log_flushed'; + +# log flushed by user thread +SET GLOBAL innodb_log_writer_threads = OFF; +SET DEBUG_SYNC = 'RESET'; +--connection con1 +SET DEBUG_SYNC = 'log_flushed_by_self SIGNAL log_flushed'; +SET AUTOCOMMIT = 1; +INSERT INTO t1 (a) VALUES ('a'); +--connection default +SET DEBUG_SYNC = 'now WAIT_FOR log_flushed'; + +# Cleanup +SET DEBUG_SYNC = 'RESET'; +--disconnect con1 +DROP TABLE t1; + +--disable_query_log +SET GLOBAL innodb_log_writer_threads = @old_innodb_log_writer_threads; +SET GLOBAL innodb_flush_log_at_trx_commit = @old_innodb_flush_log_at_trx_commit; +--enable_query_log + +# Wait till all disconnects are completed. 
+--source include/wait_until_count_sessions.inc diff --git a/mysql-test/suite/perfschema/r/threads_innodb.result b/mysql-test/suite/perfschema/r/threads_innodb.result index 7602cb4cf6a1..fad60d8bb065 100644 --- a/mysql-test/suite/perfschema/r/threads_innodb.result +++ b/mysql-test/suite/perfschema/r/threads_innodb.result @@ -16,7 +16,6 @@ thread/innodb/io_log_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL Y thread/innodb/io_read_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL YES thread/innodb/io_write_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL YES thread/innodb/log_checkpointer_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL YES -thread/innodb/log_closer_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL YES thread/innodb/log_flush_notifier_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL YES thread/innodb/log_flusher_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL YES thread/innodb/log_write_notifier_thread BACKGROUND NULL NULL NULL NULL NULL NULL NULL NULL YES diff --git a/mysql-test/suite/sys_vars/r/innodb_log_writer_threads_basic.result b/mysql-test/suite/sys_vars/r/innodb_log_writer_threads_basic.result new file mode 100644 index 000000000000..556eb87be25a --- /dev/null +++ b/mysql-test/suite/sys_vars/r/innodb_log_writer_threads_basic.result @@ -0,0 +1,34 @@ +SET @orig = @@global.innodb_log_writer_threads; +SET GLOBAL innodb_log_writer_threads = OFF; +SELECT @@global.innodb_log_writer_threads; +@@global.innodb_log_writer_threads +0 +SET GLOBAL innodb_log_writer_threads = default; +SELECT @@global.innodb_log_writer_threads; +@@global.innodb_log_writer_threads +1 +SET GLOBAL innodb_log_writer_threads = ON; +SELECT @@global.innodb_log_writer_threads; +@@global.innodb_log_writer_threads +1 +SET GLOBAL innodb_log_writer_threads = 2; +ERROR 42000: Variable 'innodb_log_writer_threads' can't be set to the value of '2' +SELECT @@global.innodb_log_writer_threads; +@@global.innodb_log_writer_threads 
+1 +SET GLOBAL innodb_log_writer_threads = 1e2; +ERROR 42000: Incorrect argument type to variable 'innodb_log_writer_threads' +SELECT @@global.innodb_log_writer_threads; +@@global.innodb_log_writer_threads +1 +SET GLOBAL innodb_log_writer_threads = 1.0; +ERROR 42000: Incorrect argument type to variable 'innodb_log_writer_threads' +SELECT @@global.innodb_log_writer_threads; +@@global.innodb_log_writer_threads +1 +SET innodb_log_writer_threads = OFF; +ERROR HY000: Variable 'innodb_log_writer_threads' is a GLOBAL variable and should be set with SET GLOBAL +SELECT @@global.innodb_log_writer_threads; +@@global.innodb_log_writer_threads +1 +SET GLOBAL innodb_log_writer_threads = @orig; diff --git a/mysql-test/suite/sys_vars/t/innodb_log_writer_threads_basic.test b/mysql-test/suite/sys_vars/t/innodb_log_writer_threads_basic.test new file mode 100644 index 000000000000..cdb9c72201af --- /dev/null +++ b/mysql-test/suite/sys_vars/t/innodb_log_writer_threads_basic.test @@ -0,0 +1,29 @@ +# Check the default value +SET @orig = @@global.innodb_log_writer_threads; + +SET GLOBAL innodb_log_writer_threads = OFF; +SELECT @@global.innodb_log_writer_threads; + +SET GLOBAL innodb_log_writer_threads = default; +SELECT @@global.innodb_log_writer_threads; + +SET GLOBAL innodb_log_writer_threads = ON; +SELECT @@global.innodb_log_writer_threads; + +-- error ER_WRONG_VALUE_FOR_VAR +SET GLOBAL innodb_log_writer_threads = 2; +SELECT @@global.innodb_log_writer_threads; + +-- error ER_WRONG_TYPE_FOR_VAR +SET GLOBAL innodb_log_writer_threads = 1e2; +SELECT @@global.innodb_log_writer_threads; + +-- error ER_WRONG_TYPE_FOR_VAR +SET GLOBAL innodb_log_writer_threads = 1.0; +SELECT @@global.innodb_log_writer_threads; + +-- error ER_GLOBAL_VARIABLE +SET innodb_log_writer_threads = OFF; +SELECT @@global.innodb_log_writer_threads; + +SET GLOBAL innodb_log_writer_threads = @orig; diff --git a/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result 
b/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result index 2c2e4d67d467..64baf091f726 100644 --- a/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result +++ b/mysql-test/suite/sysschema/r/pr_ps_setup_show_enabled.result @@ -227,7 +227,6 @@ innodb/io_write_thread BACKGROUND innodb/io_write_thread BACKGROUND innodb/io_write_thread BACKGROUND innodb/log_checkpointer_thread BACKGROUND -innodb/log_closer_thread BACKGROUND innodb/log_flush_notifier_thread BACKGROUND innodb/log_flusher_thread BACKGROUND innodb/log_write_notifier_thread BACKGROUND @@ -278,7 +277,6 @@ innodb/io_write_thread BACKGROUND innodb/io_write_thread BACKGROUND innodb/io_write_thread BACKGROUND innodb/log_checkpointer_thread BACKGROUND -innodb/log_closer_thread BACKGROUND innodb/log_flush_notifier_thread BACKGROUND innodb/log_flusher_thread BACKGROUND innodb/log_write_notifier_thread BACKGROUND diff --git a/mysql-test/t/all_persisted_variables.test b/mysql-test/t/all_persisted_variables.test index 54d75dc5469a..4fbaf491f4f8 100644 --- a/mysql-test/t/all_persisted_variables.test +++ b/mysql-test/t/all_persisted_variables.test @@ -40,7 +40,7 @@ call mtr.add_suppression("Failed to set up SSL because of the following SSL libr call mtr.add_suppression("Failed to initialize TLS for channel: mysql_main"); let $total_global_vars=`SELECT COUNT(*) FROM performance_schema.global_variables where variable_name NOT LIKE 'ndb_%'`; -let $total_persistent_vars=402; +let $total_persistent_vars=403; --echo *************************************************************** --echo * 0. Verify that variables present in performance_schema.global diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index cc60a2c589db..f9bcae3d9a4a 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -95,6 +95,9 @@ mysql_pfs_key_t page_flush_coordinator_thread_key; /** Event to synchronise with the flushing. 
*/ os_event_t buf_flush_event; +/** Event to wait for one flushing step */ +os_event_t buf_flush_tick_event; + /** State for page cleaner array slot */ enum page_cleaner_state_t { /** Not requested any yet. @@ -2835,6 +2838,8 @@ static bool pc_wait_finished(ulint *n_flushed_lru, ulint *n_flushed_list) { mutex_exit(&page_cleaner->mutex); + os_event_set(buf_flush_tick_event); + return (all_succeeded); } @@ -3102,7 +3107,9 @@ static void buf_flush_page_coordinator_thread(size_t n_page_cleaners) { } mutex_enter(&page_cleaner->mutex); + /* lsn_limit!=0 means there are requests. needs to check the lsn. */ lsn_t lsn_limit = buf_flush_sync_lsn; + buf_flush_sync_lsn = 0; mutex_exit(&page_cleaner->mutex); if (srv_read_only_mode) { @@ -3110,11 +3117,7 @@ static void buf_flush_page_coordinator_thread(size_t n_page_cleaners) { } else { ut_a(log_sys != nullptr); - const lsn_t checkpoint_lsn = log_sys->last_checkpoint_lsn.load(); - - const lsn_t lag = log_buffer_flush_order_lag(*log_sys); - - is_sync_flush = srv_flush_sync && lsn_limit > checkpoint_lsn + lag; + is_sync_flush = srv_flush_sync && lsn_limit > 0; } if (is_sync_flush || is_server_active) { diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index bafc66a9ed60..9c37c3104278 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -730,7 +730,6 @@ static PSI_thread_info all_innodb_threads[] = { PSI_KEY(io_write_thread, 0, 0, PSI_DOCUMENT_ME), PSI_KEY(buf_resize_thread, 0, 0, PSI_DOCUMENT_ME), PSI_KEY(log_writer_thread, 0, 0, PSI_DOCUMENT_ME), - PSI_KEY(log_closer_thread, 0, 0, PSI_DOCUMENT_ME), PSI_KEY(log_checkpointer_thread, 0, 0, PSI_DOCUMENT_ME), PSI_KEY(log_flusher_thread, 0, 0, PSI_DOCUMENT_ME), PSI_KEY(log_write_notifier_thread, 0, 0, PSI_DOCUMENT_ME), @@ -20780,6 +20779,20 @@ static void innodb_log_buffer_size_update(THD *thd, SYS_VAR *var, void *var_ptr, } } +/** Update the innodb_log_writer_threads parameter. 
+@param[in] thd thread handle +@param[in] var system variable +@param[out] var_ptr current value +@param[in] save immediate result from check function */ +static void innodb_log_writer_threads_update(THD *thd, SYS_VAR *var, + void *var_ptr, const void *save) { + *static_cast<bool *>(var_ptr) = *static_cast<const bool *>(save); + + /* pause/resume the log writer threads based on innodb_log_writer_threads + value. */ + log_control_writer_threads(*log_sys); +} + /** Update the system variable innodb_thread_concurrency using the "saved" value. This function is registered as a callback with MySQL. @param[in] thd thread handle @@ -21457,6 +21470,12 @@ static MYSQL_SYSVAR_ULONG(log_write_ahead_size, srv_log_write_ahead_size, INNODB_LOG_WRITE_AHEAD_SIZE_MAX, OS_FILE_LOG_BLOCK_SIZE); +static MYSQL_SYSVAR_BOOL( + log_writer_threads, srv_log_writer_threads, PLUGIN_VAR_RQCMDARG, + "Whether the log writer threads should be activated (ON), or write/flush " + "of the redo log should be done by each thread individually (OFF).", + nullptr, innodb_log_writer_threads_update, TRUE); + static MYSQL_SYSVAR_UINT( log_spin_cpu_abs_lwm, srv_log_spin_cpu_abs_lwm, PLUGIN_VAR_RQCMDARG, "Minimum value of cpu time for which spin-delay is used." 
@@ -21597,17 +21616,6 @@ static MYSQL_SYSVAR_ULONG( " (microseconds)", NULL, NULL, INNODB_LOG_FLUSH_NOTIFIER_TIMEOUT_DEFAULT, 0, ULONG_MAX, 0); -static MYSQL_SYSVAR_ULONG( - log_closer_spin_delay, srv_log_closer_spin_delay, PLUGIN_VAR_RQCMDARG, - "Number of spin iterations, for which log closer thread is waiting" - " for dirty pages added.", - NULL, NULL, INNODB_LOG_CLOSER_SPIN_DELAY_DEFAULT, 0, ULONG_MAX, 0); - -static MYSQL_SYSVAR_ULONG( - log_closer_timeout, srv_log_closer_timeout, PLUGIN_VAR_RQCMDARG, - "Initial sleep time in log closer thread (microseconds)", NULL, NULL, - INNODB_LOG_CLOSER_TIMEOUT_DEFAULT, 0, ULONG_MAX, 0); - #endif /* ENABLE_EXPERIMENT_SYSVARS */ static MYSQL_SYSVAR_UINT( @@ -22080,6 +22088,7 @@ static SYS_VAR *innobase_system_variables[] = { #endif /* UNIV_DEBUG_DEDICATED */ MYSQL_SYSVAR(log_write_ahead_size), MYSQL_SYSVAR(log_group_home_dir), + MYSQL_SYSVAR(log_writer_threads), MYSQL_SYSVAR(log_spin_cpu_abs_lwm), MYSQL_SYSVAR(log_spin_cpu_pct_hwm), MYSQL_SYSVAR(log_wait_for_flush_spin_hwm), @@ -22102,8 +22111,6 @@ static SYS_VAR *innobase_system_variables[] = { MYSQL_SYSVAR(log_write_notifier_timeout), MYSQL_SYSVAR(log_flush_notifier_spin_delay), MYSQL_SYSVAR(log_flush_notifier_timeout), - MYSQL_SYSVAR(log_closer_spin_delay), - MYSQL_SYSVAR(log_closer_timeout), #endif /* ENABLE_EXPERIMENT_SYSVARS */ MYSQL_SYSVAR(log_compressed_pages), MYSQL_SYSVAR(max_dirty_pages_pct), diff --git a/storage/innobase/include/buf0flu.h b/storage/innobase/include/buf0flu.h index 3d1cd4112407..651fc73f7ce5 100644 --- a/storage/innobase/include/buf0flu.h +++ b/storage/innobase/include/buf0flu.h @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 1995, 2019, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 1995, 2020, Oracle and/or its affiliates. All Rights Reserved. 
This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the @@ -52,6 +52,9 @@ extern bool innodb_page_cleaner_disabled_debug; /** Event to synchronise with the flushing. */ extern os_event_t buf_flush_event; +/** Event to wait for one flushing step */ +extern os_event_t buf_flush_tick_event; + class ut_stage_alter_t; /** Remove a block from the flush list of modified blocks. diff --git a/storage/innobase/include/log0log.h b/storage/innobase/include/log0log.h index 6d2960b7de7f..d9ae3e34d7ae 100644 --- a/storage/innobase/include/log0log.h +++ b/storage/innobase/include/log0log.h @@ -299,12 +299,6 @@ constexpr ulong INNODB_LOG_FLUSH_NOTIFIER_SPIN_DELAY_DEFAULT = 0; /** Default value of innodb_log_flush_notifier_timeout (in microseconds). */ constexpr ulong INNODB_LOG_FLUSH_NOTIFIER_TIMEOUT_DEFAULT = 10; -/** Default value of innodb_log_closer_spin_delay (in spin rounds). */ -constexpr ulong INNODB_LOG_CLOSER_SPIN_DELAY_DEFAULT = 0; - -/** Default value of innodb_log_closer_timeout (in microseconds). */ -constexpr ulong INNODB_LOG_CLOSER_TIMEOUT_DEFAULT = 1000; - /** Default value of innodb_log_buffer_size (in bytes). */ constexpr ulong INNODB_LOG_BUFFER_SIZE_DEFAULT = 16 * 1024 * 1024UL; @@ -689,6 +683,11 @@ void log_buffer_close(log_t &log, const Log_handle &handle); also to be flushed to disk. */ void log_buffer_flush_to_disk(log_t &log, bool sync = true); +/** Writes the log buffer to the log file. It is intended to be called from +background master thread periodically. If the log writer threads are active, +this function writes nothing. */ +void log_buffer_sync_in_background(); + /** Requests flush of the log buffer. @param[in] sync true: wait until the flush is done */ inline void log_buffer_flush_to_disk(bool sync = true); @@ -718,12 +717,6 @@ buffer. It's used by the log writer thread only. 
@return true if and only if the lsn has been advanced */ bool log_advance_ready_for_write_lsn(log_t &log); -/** Advances log.buf_dirty_pages_added_up_to_lsn using links in the recent -closed buffer. It's used by the log closer thread only. -@param[in] log redo log -@return true if and only if the lsn has been advanced */ -bool log_advance_dirty_pages_added_up_to_lsn(log_t &log); - /** Validates that all slots in log recent written buffer for lsn values in range between begin and end, are empty. Used during tests, crashes the program if validation does not pass. @@ -1056,11 +1049,6 @@ Used only to assert, that the state is correct. @param[in] log redo log */ void log_writer_thread_active_validate(const log_t &log); -/** Validates that the log closer thread is active. -Used only to assert, that the state is correct. -@param[in] log redo log */ -void log_closer_thread_active_validate(const log_t &log); - /** Validates that the log writer, flusher threads are active. Used only to assert, that the state is correct. @param[in] log redo log */ @@ -1096,6 +1084,13 @@ void log_stop_background_threads_nowait(log_t &log); /** Wakes up all log threads which are alive. */ void log_wake_threads(log_t &log); +/** Pause/Resume the log writer threads based on innodb_log_writer_threads +value. +NOTE: These pause/resume functions should be protected by mutex while serving. +The caller innodb_log_writer_threads_update() is protected +by LOCK_global_system_variables in mysqld. */ +void log_control_writer_threads(log_t &log); + /** Free the log system data structures. Deallocate all the related memory. */ void log_sys_close(); @@ -1147,18 +1142,6 @@ MY_COMPILER_DIAGNOSTIC_POP() @param[in,out] log_ptr pointer to redo log */ void log_write_notifier(log_t *log_ptr); -/** The log closer thread co-routine. 
- */ -MY_COMPILER_DIAGNOSTIC_PUSH() -MY_COMPILER_CLANG_WORKAROUND_REF_DOCBUG() -/** -@see @ref sect_redo_log_closer -*/ -MY_COMPILER_DIAGNOSTIC_POP() -/** -@param[in,out] log_ptr pointer to redo log */ -void log_closer(log_t *log_ptr); - /** The log checkpointer thread co-routine. */ MY_COMPILER_DIAGNOSTIC_PUSH() @@ -1183,10 +1166,10 @@ void log_checkpointer(log_t *log_ptr); #define log_closer_mutex_enter(log) mutex_enter(&((log).closer_mutex)) -#define log_closer_mutex_exit(log) mutex_exit(&((log).closer_mutex)) +#define log_closer_mutex_enter_nowait(log) \ + mutex_enter_nowait(&((log).closer_mutex)) -#define log_closer_mutex_own(log) \ - (mutex_own(&((log).closer_mutex)) || !log_closer_is_active()) +#define log_closer_mutex_exit(log) mutex_exit(&((log).closer_mutex)) #define log_flusher_mutex_enter(log) mutex_enter(&((log).flusher_mutex)) @@ -1273,10 +1256,6 @@ inline bool log_flusher_is_active(); @return true if and only if the log flush notifier thread is active */ inline bool log_flush_notifier_is_active(); -/** Checks if log closer thread is active. -@return true if and only if the log closer thread is active */ -inline bool log_closer_is_active(); - /** Checks if log checkpointer thread is active. @return true if and only if the log checkpointer thread is active */ inline bool log_checkpointer_is_active(); diff --git a/storage/innobase/include/log0log.ic b/storage/innobase/include/log0log.ic index 9fadfed40508..1e440364b341 100644 --- a/storage/innobase/include/log0log.ic +++ b/storage/innobase/include/log0log.ic @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 1995, 2019, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 1995, 2020, Oracle and/or its affiliates. All Rights Reserved. 
This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the @@ -357,10 +357,6 @@ inline bool log_flush_notifier_is_active() { return (srv_thread_is_active(srv_threads.m_log_flush_notifier)); } -inline bool log_closer_is_active() { - return (srv_thread_is_active(srv_threads.m_log_closer)); -} - inline bool log_checkpointer_is_active() { return (srv_thread_is_active(srv_threads.m_log_checkpointer)); } diff --git a/storage/innobase/include/log0types.h b/storage/innobase/include/log0types.h index e89539131991..f259e2ec013d 100644 --- a/storage/innobase/include/log0types.h +++ b/storage/innobase/include/log0types.h @@ -186,6 +186,16 @@ struct alignas(ut::INNODB_CACHE_LINE_SIZE) log_t { Protected by: sn_lock or writer_mutex. */ Link_buf recent_written; + /** Used for pausing the log writer threads. + When paused, each user thread should write log as in the former version. */ + std::atomic_bool writer_threads_paused; + + /** Some threads waiting for the ready for write lsn by closer_event. */ + lsn_t current_ready_waiting_lsn; + + /** current_ready_waiting_lsn is waited using this sig_count. */ + int64_t current_ready_waiting_sig_count; + alignas(ut::INNODB_CACHE_LINE_SIZE) /** The recent closed buffer. @@ -272,6 +282,11 @@ struct alignas(ut::INNODB_CACHE_LINE_SIZE) log_t { /** Number of entries in the array with events. */ size_t flush_events_size; + /** This event is in the reset state when a flush is running; + a thread should wait for this without owning any of redo mutexes, + but NOTE that to reset this event, the thread MUST own the writer_mutex */ + os_event_t old_flush_event; + /** Padding before the frequently updated flushed_to_disk_lsn. */ alignas(ut::INNODB_CACHE_LINE_SIZE) @@ -436,6 +451,9 @@ struct alignas(ut::INNODB_CACHE_LINE_SIZE) log_t { advanced). */ os_event_t flush_notifier_event; + /** The next flushed_to_disk_lsn can be waited using this sig_count. 
*/ + int64_t current_flush_sig_count; + /** Mutex which can be used to pause log flush notifier thread. */ mutable ib_mutex_t flush_notifier_mutex; @@ -477,6 +495,15 @@ struct alignas(ut::INNODB_CACHE_LINE_SIZE) log_t { /** Used for stopping the log background threads. */ std::atomic_bool should_stop_threads; + /** Event used for pausing the log writer threads. */ + os_event_t writer_threads_resume_event; + + /** Used for resuming write notifier thread */ + atomic_lsn_t write_notifier_resume_lsn; + + /** Used for resuming flush notifier thread */ + atomic_lsn_t flush_notifier_resume_lsn; + /** Number of total I/O operations performed when we printed the statistics last time. */ mutable uint64_t n_log_ios_old; diff --git a/storage/innobase/include/srv0srv.h b/storage/innobase/include/srv0srv.h index 91be9df09cd1..1e78347fa277 100644 --- a/storage/innobase/include/srv0srv.h +++ b/storage/innobase/include/srv0srv.h @@ -168,9 +168,6 @@ struct Srv_threads { /** Error monitor thread. */ IB_thread m_error_monitor; - /** Redo closer thread. */ - IB_thread m_log_closer; - /** Redo checkpointer thread. */ IB_thread m_log_checkpointer; @@ -462,6 +459,9 @@ for total order of dirty pages, when they are added to flush lists. The slots are addressed by LSN values modulo number of the slots. */ extern ulong srv_log_recent_closed_size; +/** Whether to activate/pause the log writer threads. */ +extern bool srv_log_writer_threads; + /** Minimum absolute value of cpu time for which spin-delay is used. */ extern uint srv_log_spin_cpu_abs_lwm; @@ -520,13 +520,6 @@ extern ulong srv_log_flush_notifier_spin_delay; /** Initial timeout used to wait on flush_notifier_event. */ extern ulong srv_log_flush_notifier_timeout; -/** Number of spin iterations, for which log closerr thread is waiting -for a reachable untraversed link in recent_closed. */ -extern ulong srv_log_closer_spin_delay; - -/** Initial sleep used in log closer after spin delay is finished. 
*/ -extern ulong srv_log_closer_timeout; - /** Whether to generate and require checksums on the redo log pages. */ extern bool srv_log_checksums; @@ -754,7 +747,6 @@ extern mysql_pfs_key_t io_log_thread_key; extern mysql_pfs_key_t io_read_thread_key; extern mysql_pfs_key_t io_write_thread_key; extern mysql_pfs_key_t log_writer_thread_key; -extern mysql_pfs_key_t log_closer_thread_key; extern mysql_pfs_key_t log_checkpointer_thread_key; extern mysql_pfs_key_t log_flusher_thread_key; extern mysql_pfs_key_t log_write_notifier_thread_key; diff --git a/storage/innobase/include/ut0link_buf.h b/storage/innobase/include/ut0link_buf.h index f6e939274053..ed69180dc437 100644 --- a/storage/innobase/include/ut0link_buf.h +++ b/storage/innobase/include/ut0link_buf.h @@ -108,6 +108,15 @@ class Link_buf { @param[in] to position where the link ends (from -> to) */ void add_link(Position from, Position to); + /** Add a directed link between two given positions. It is user's + responsibility to ensure that there is space for the link. This is + because it can be useful to ensure much earlier that there is space. + In addition, advances the tail pointer in the buffer if possible. + + @param[in] from position where the link starts + @param[in] to position where the link ends (from -> to) */ + void add_link_advance_tail(Position from, Position to); + /** Advances the tail pointer in the buffer by following connected path created by links. Starts at current position of the pointer. Stops when the provided function returns true. @@ -142,7 +151,7 @@ class Link_buf { @param[in] position position to check @return true if and only if the space is free */ - bool has_space(Position position) const; + bool has_space(Position position); /** Validates (using assertions) that there are no links set in the range [begin, end). 
*/ @@ -169,12 +178,6 @@ class Link_buf { @return false if there was no link, true otherwise */ bool next_position(Position position, Position &next); - /** Claims a link starting in provided position that has been - traversed and is no longer required (reclaims the slot). - - @param[in] position position where link starts */ - void claim_position(Position position); - /** Deallocated memory, if it was allocated. */ void free(); @@ -251,39 +254,90 @@ inline void Link_buf::add_link(Position from, Position to) { auto &slot = m_links[index]; - ut_ad(slot.load() == 0); - - slot.store(to - from); + slot.store(to); } template -bool Link_buf::next_position(Position position, Position &next) { +inline bool Link_buf::next_position(Position position, + Position &next) { const auto index = slot_index(position); auto &slot = m_links[index]; - const auto distance = slot.load(); + next = slot.load(std::memory_order_relaxed); - ut_ad(position < std::numeric_limits::max() - distance); - - next = position + distance; - - return distance == 0; + return next <= position; } template -void Link_buf::claim_position(Position position) { - const auto index = slot_index(position); +inline void Link_buf::add_link_advance_tail(Position from, + Position to) { + ut_ad(to > from); + ut_ad(to - from <= std::numeric_limits::max()); - auto &slot = m_links[index]; + auto position = m_tail.load(std::memory_order_acquire); + + ut_ad(position <= from); - slot.store(0); + if (position == from) { + /* can advance m_tail directly and exclusively, and it is unlock */ + m_tail.store(to, std::memory_order_release); + } else { + auto index = slot_index(from); + auto &slot = m_links[index]; + + /* add link */ + slot.store(to, std::memory_order_release); + + auto stop_condition = [&](Position prev_pos, Position next_pos) { + return (prev_pos > from); + }; + + advance_tail_until(stop_condition); + } } template template bool Link_buf::advance_tail_until(Stop_condition stop_condition) { - auto position = 
m_tail.load(); + /* multi threaded aware */ + auto position = m_tail.load(std::memory_order_acquire); + auto from = position; + + uint retry = 0; + while (true) { + auto index = slot_index(position); + auto &slot = m_links[index]; + + auto next_load = slot.load(std::memory_order_acquire); + + if (next_load <= position || stop_condition(position, next_load)) { + /* nothing to advance for now */ + return false; + } + + /* try to lock as storing the end */ + if (slot.compare_exchange_strong(next_load, position, + std::memory_order_acq_rel)) { + /* can advance m_tail exclusively */ + position = next_load; + break; + } + + retry++; + if (retry >= 2) { + /* give up */ + return false; + } + + UT_RELAX_CPU(); + position = m_tail.load(std::memory_order_acquire); + if (position == from) { + /* no progress? */ + return false; + } + from = position; + } while (true) { Position next; @@ -294,25 +348,24 @@ bool Link_buf::advance_tail_until(Stop_condition stop_condition) { break; } - /* Reclaim the slot. 
*/ - claim_position(position); - position = next; } - if (position > m_tail.load()) { - m_tail.store(position); + ut_a(from == m_tail.load(std::memory_order_acquire)); - return true; + /* unlock */ + m_tail.store(position, std::memory_order_release); - } else { + if (position == from) { return false; } + + return true; } template inline bool Link_buf::advance_tail() { - auto stop_condition = [](Position from, Position to) { return (to == from); }; + auto stop_condition = [](Position from, Position to) { return false; }; return advance_tail_until(stop_condition); } @@ -324,12 +377,55 @@ inline size_t Link_buf::capacity() const { template inline Position Link_buf::tail() const { - return m_tail.load(); + return m_tail.load(std::memory_order_acquire); } template -inline bool Link_buf::has_space(Position position) const { - return tail() + m_capacity > position; +inline bool Link_buf::has_space(Position position) { + auto tail = m_tail.load(std::memory_order_acquire); + if (tail + m_capacity > position) { + return true; + } + + auto index = slot_index(tail); + auto &slot = m_links[index]; + + auto next_load = slot.load(std::memory_order_acquire); + + if (next_load <= tail) { + return false; + } + + /* try to advance m_tail with no wait */ + if (!slot.compare_exchange_strong(next_load, tail, + std::memory_order_acq_rel)) { + /* maybe in advance by the other */ + return false; + } + + /* can advance m_tail exclusively */ + auto next = next_load; + + while (next + m_capacity <= position) { + index = slot_index(next); + auto &slot2 = m_links[index]; + + next_load = slot2.load(std::memory_order_relaxed); + + if (next_load <= next) { + /* found the next tail candidate */ + break; + } + + next = next_load; + } + + ut_ad(tail == m_tail.load(std::memory_order_acquire)); + + /* unlock */ + m_tail.store(next, std::memory_order_release); + + return next + m_capacity > position; } template @@ -339,6 +435,8 @@ inline size_t Link_buf::slot_index(Position position) const { template 
void Link_buf::validate_no_links(Position begin, Position end) { + const auto tail = m_tail.load(); + /* After m_capacity iterations we would have all slots tested. */ end = std::min(end, begin + m_capacity); @@ -348,7 +446,7 @@ void Link_buf::validate_no_links(Position begin, Position end) { const auto &slot = m_links[index]; - ut_a(slot.load() == 0); + ut_a(slot.load() <= tail); } } diff --git a/storage/innobase/log/log0buf.cc b/storage/innobase/log/log0buf.cc index 57eef90bd4e9..9809cdcaf5b5 100644 --- a/storage/innobase/log/log0buf.cc +++ b/storage/innobase/log/log0buf.cc @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 1995, 2019, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 1995, 2020, Oracle and/or its affiliates. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, @@ -66,11 +66,6 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA -# @ref sect_redo_log_add_dirty_pages -# @ref sect_redo_log_add_link_to_recent_closed - Then the [log closer thread](@ref sect_redo_log_closer) advances lsn up - to which lsn values might be available for checkpoint safely (up to which - all dirty pages have been added to flush lists). - [Read more about reclaiming space...](@ref sect_redo_log_reclaim_space) - @section sect_redo_log_buf_reserve Reservation of space in the redo Range of lsn values is reserved for a provided number of data bytes. @@ -372,16 +367,6 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA where _L_ is size of the log recent closed buffer. The value gives information about how much to advance lsn when traversing the link. 
- The [log closer thread](@ref sect_redo_log_closer) is responsible for reseting - the entry in _log.recent_closed_ to 0, which must happen before the slot might - be reused for larger lsn values (larger by _L_, _2L_, ...). Afterwards the log - closer thread advances @ref subsect_redo_log_buf_dirty_pages_added_up_to_lsn, - allowing user threads, waiting for free space in the log recent closed buffer, - to proceed. - - @note Note that the increased value of _log.buf_dirty_pages_added_up_to_lsn_ - might possibly allow a newer checkpoint. - @see log_buffer_write_completed_and_dirty_pages_added() After the link is added, the shared-access for log buffer is released. @@ -421,11 +406,6 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA would logically erase the just written data to the redo log, until the related dirty pages have been added to flush lists. - The [log closer thread](@ref sect_redo_log_closer) tracks up to which lsn the - log checkpointer thread might trust that all dirty pages have been added - - so called @ref subsect_redo_log_buf_dirty_pages_added_up_to_lsn. Any attempts - to make checkpoint at higher value are limited to this lsn. - When user thread has added all the dirty pages related to _start_lsn_ .. _end_lsn_, it creates link in the log recent closed buffer, pointing from _start_lsn_ to _end_lsn_. The log closer thread tracks the links in the recent @@ -912,7 +892,23 @@ void log_buffer_write_completed(log_t &log, const Log_handle &handle, /* Note that end_lsn will not point to just before footer, because we have already validated that end_lsn is valid. */ - log.recent_written.add_link(start_lsn, end_lsn); + log.recent_written.add_link_advance_tail(start_lsn, end_lsn); + + /* if someone is waiting for, set the event. 
(if possible) */ + lsn_t ready_lsn = log_buffer_ready_for_write_lsn(log); + + if (log.current_ready_waiting_lsn > 0 && + log.current_ready_waiting_lsn <= ready_lsn && + !os_event_is_set(log.closer_event) && + log_closer_mutex_enter_nowait(log) == 0) { + if (log.current_ready_waiting_lsn > 0 && + log.current_ready_waiting_lsn <= ready_lsn && + !os_event_is_set(log.closer_event)) { + log.current_ready_waiting_lsn = 0; + os_event_set(log.closer_event); + } + log_closer_mutex_exit(log); + } } void log_wait_for_space_in_log_recent_closed(log_t &log, lsn_t lsn) { @@ -923,7 +919,6 @@ void log_wait_for_space_in_log_recent_closed(log_t &log, lsn_t lsn) { uint64_t wait_loops = 0; while (!log.recent_closed.has_space(lsn)) { - os_event_set(log.closer_event); ++wait_loops; os_thread_sleep(20); } @@ -949,7 +944,7 @@ void log_buffer_close(log_t &log, const Log_handle &handle) { LOG_SYNC_POINT("log_buffer_write_completed_dpa_before_store"); - log.recent_closed.add_link(start_lsn, end_lsn); + log.recent_closed.add_link_advance_tail(start_lsn, end_lsn); log_buffer_s_lock_exit(log, handle.lock_no); } @@ -996,6 +991,19 @@ void log_buffer_flush_to_disk(log_t &log, bool sync) { log_write_up_to(log, lsn, sync); } +void log_buffer_sync_in_background() { + log_t &log = *log_sys; + + /* Just to be sure not to miss advance */ + log.recent_closed.advance_tail(); + + /* If the log flusher thread is working, no need to call. 
*/ + if (log.writer_threads_paused.load(std::memory_order_acquire)) { + log.recent_written.advance_tail(); + log_buffer_flush_to_disk(log, true); + } +} + void log_buffer_get_last_block(log_t &log, lsn_t &last_lsn, byte *last_block, uint32_t &block_len) { ut_ad(last_block != nullptr); @@ -1116,56 +1124,6 @@ bool log_advance_ready_for_write_lsn(log_t &log) { return (true); } else { - ut_a(log_buffer_ready_for_write_lsn(log) == previous_lsn); - - return (false); - } -} - -bool log_advance_dirty_pages_added_up_to_lsn(log_t &log) { - ut_ad(log_closer_mutex_own(log)); - - const lsn_t previous_lsn = log_buffer_dirty_pages_added_up_to_lsn(log); - - ut_a(previous_lsn >= LOG_START_LSN); - - ut_a(previous_lsn >= log_get_checkpoint_lsn(log)); - - ut_d(log_closer_thread_active_validate(log)); - - auto stop_condition = [&](lsn_t prev_lsn, lsn_t next_lsn) { - ut_a(log_lsn_validate(prev_lsn)); - ut_a(log_lsn_validate(next_lsn)); - - ut_a(next_lsn > prev_lsn); - - LOG_SYNC_POINT("log_advance_dpa_before_update"); - return (false); - }; - - if (log.recent_closed.advance_tail_until(stop_condition)) { - LOG_SYNC_POINT("log_advance_dpa_before_reclaim"); - - /* Validation of recent_closed is optional because - it takes significant time (delaying the log closer). */ - if (log_test != nullptr && - log_test->enabled(Log_test::Options::VALIDATE_RECENT_CLOSED)) { - /* All links between ready_lsn and lsn have - been traversed. The slots can't be re-used - before we updated the tail. 
*/ - log.recent_closed.validate_no_links( - previous_lsn, log_buffer_dirty_pages_added_up_to_lsn(log)); - } - - ut_a(log_buffer_dirty_pages_added_up_to_lsn(log) > previous_lsn); - - std::atomic_thread_fence(std::memory_order_acquire); - - return (true); - - } else { - ut_a(log_buffer_dirty_pages_added_up_to_lsn(log) == previous_lsn); - return (false); } } diff --git a/storage/innobase/log/log0chkp.cc b/storage/innobase/log/log0chkp.cc index 2a16d1ae8640..61351035fd1f 100644 --- a/storage/innobase/log/log0chkp.cc +++ b/storage/innobase/log/log0chkp.cc @@ -90,13 +90,11 @@ write the checkpoint (e.g. coming from log_make_latest_checkpoint()). static bool log_should_checkpoint(log_t &log); /** Considers writing next checkpoint. Checks if checkpoint should be written -(using log_should_checkpoint()) and writes the checkpoint if that's the case. -@return true if checkpoint has been written */ -static bool log_consider_checkpoint(log_t &log); +(using log_should_checkpoint()) and writes the checkpoint if that's the case. */ +static void log_consider_checkpoint(log_t &log); -/** Considers requesting page cleaners to execute sync flush. -@return true if request has been made */ -static bool log_consider_sync_flush(log_t &log); +/** Considers requesting page cleaners to execute sync flush. */ +static void log_consider_sync_flush(log_t &log); /** Makes a checkpoint. Note that this function does not flush dirty blocks from the buffer pool. It only checks what is lsn of the oldest modification @@ -180,23 +178,15 @@ static lsn_t log_compute_available_for_checkpoint_lsn(const log_t &log) { lsn_t lwm_lsn = buf_pool_get_oldest_modification_lwm(); - /* Empty flush list. */ + /* We cannot return lsn larger than dpa_lsn, + because some mtr's commit could be in the middle, after + its log records have been written to log buffer, but before + its dirty pages have been added to flush lists. */ if (lwm_lsn == 0) { - /* There are no dirty pages. 
We cannot return current lsn, - because some mtr's commit could be in the middle, after - its log records have been written to log buffer, but before - its dirty pages have been added to flush lists. */ - + /* Empty flush list. */ lwm_lsn = dpa_lsn; - } else { - /* Cannot go beyound dpa_lsn. - - Note that log_closer might still not advance dpa enough, - because it might be scheduled out. Yes, the returned lwm - guarantees it is able to advance, but it needs to do it! */ - lwm_lsn = std::min(lwm_lsn, dpa_lsn); } @@ -251,6 +241,7 @@ static lsn_t log_compute_available_for_checkpoint_lsn(const log_t &log) { static lsn_t log_update_available_for_checkpoint_lsn(log_t &log) { /* Update lsn available for checkpoint. */ + log.recent_closed.advance_tail(); const lsn_t oldest_lsn = log_compute_available_for_checkpoint_lsn(log); log_limits_mutex_enter(log); @@ -760,14 +751,20 @@ static bool log_request_sync_flush(const log_t &log, lsn_t new_oldest) { (unless user explicitly disabled sync-flushes). */ new_oldest += log_buffer_flush_order_lag(log); - return (buf_flush_request_force(new_oldest)); + int64_t sig_count = os_event_reset(buf_flush_tick_event); + + bool result = buf_flush_request_force(new_oldest); + + os_event_wait_time_low(buf_flush_tick_event, 1000000, sig_count); + + return (result); } else { return (false); } } -static bool log_consider_sync_flush(log_t &log) { +static void log_consider_sync_flush(log_t &log) { ut_ad(log_checkpointer_mutex_own(log)); /* We acquire limits mutex only for a short period. 
Afterwards these @@ -791,7 +788,7 @@ static bool log_consider_sync_flush(log_t &log) { ut_a(flush_up_to <= current_lsn); if (current_lsn == flush_up_to) { - return (false); + return; } const lsn_t margin = log_free_check_margin(log); @@ -813,7 +810,7 @@ static bool log_consider_sync_flush(log_t &log) { if (flush_up_to > oldest_lsn) { log_checkpointer_mutex_exit(log); - const bool result = log_request_sync_flush(log, flush_up_to); + log_request_sync_flush(log, flush_up_to); log_checkpointer_mutex_enter(log); @@ -821,11 +818,7 @@ static bool log_consider_sync_flush(log_t &log) { lsn available for creating a new checkpoint, just try to update it to not wait for next checkpointer loop. */ log_update_available_for_checkpoint_lsn(log); - - return (result); } - - return (false); } static uint64_t log_checkpoint_time_elapsed(const log_t &log) { @@ -918,11 +911,11 @@ static bool log_should_checkpoint(log_t &log) { return (false); } -static bool log_consider_checkpoint(log_t &log) { +static void log_consider_checkpoint(log_t &log) { ut_ad(log_checkpointer_mutex_own(log)); if (!log_should_checkpoint(log)) { - return (false); + return; } /* It's clear that a new checkpoint should be written. @@ -944,14 +937,12 @@ static bool log_consider_checkpoint(log_t &log) { have changed, we follow a simple way and perform a full re-check of all conditions. 
*/ if (!log_should_checkpoint(log)) { - return (false); + return; } log_checkpoint(log); fil_checkpoint(log.last_checkpoint_lsn.load()); - - return (true); } void log_checkpointer(log_t *log_ptr) { @@ -959,12 +950,30 @@ void log_checkpointer(log_t *log_ptr) { log_t &log = *log_ptr; - log_checkpointer_mutex_enter(log); + static const ulint log_busy_checkpoint_interval = + 7; /*SRV_MASTER_CHECKPOINT_INTERVAL*/ + ulint old_activity_count = srv_get_activity_count(); + ulint error = OS_SYNC_TIME_EXCEEDED; for (;;) { - auto do_some_work = [&log] { - ut_ad(log_checkpointer_mutex_own(log)); + log_checkpointer_mutex_enter(log); + + const auto sig_count = os_event_reset(log.checkpointer_event); + const lsn_t requested_checkpoint_lsn = log.requested_checkpoint_lsn; + + bool system_is_busy = false; + if (error == OS_SYNC_TIME_EXCEEDED && + srv_check_activity(old_activity_count)) { + old_activity_count = srv_get_activity_count(); + /* system is busy. takes longer interval. */ + system_is_busy = true; + } + if (error != OS_SYNC_TIME_EXCEEDED || !system_is_busy || + requested_checkpoint_lsn > + log.last_checkpoint_lsn.load(std::memory_order_acquire) || + log_checkpoint_time_elapsed(log) >= + log_busy_checkpoint_interval * srv_log_checkpoint_every * 1000ULL) { /* We will base our next decisions on maximum lsn available for creating a new checkpoint. It would be great to have it updated beforehand. Also, this @@ -973,52 +982,68 @@ void log_checkpointer(log_t *log_ptr) { log_update_available_for_checkpoint_lsn(log); /* Consider flushing some dirty pages. */ - const bool sync_flushed = log_consider_sync_flush(log); + log_consider_sync_flush(log); LOG_SYNC_POINT("log_checkpointer_before_consider_checkpoint"); /* Consider writing checkpoint. 
*/ - const bool checkpointed = log_consider_checkpoint(log); + log_consider_checkpoint(log); + } - if (sync_flushed || checkpointed) { - return (true); - } + log_checkpointer_mutex_exit(log); - if (log.should_stop_threads.load()) { - if (!log_closer_is_active()) { - return (true); - } - } + if (requested_checkpoint_lsn > + log.last_checkpoint_lsn.load(std::memory_order_relaxed)) { + /* not satisfied. retry. */ + error = 0; + } else { + error = os_event_wait_time_low( + log.checkpointer_event, srv_log_checkpoint_every * 1000, sig_count); + } - return (false); - }; + /* Check if we should close the thread. */ + if (log.should_stop_threads.load()) { + ut_ad(!log.writer_threads_paused.load()); + if (!log_flusher_is_active() && !log_writer_is_active()) { + lsn_t end_lsn = log.write_lsn.load(); - const auto sig_count = os_event_reset(log.checkpointer_event); + ut_a(log_lsn_validate(end_lsn)); + ut_a(end_lsn == log.flushed_to_disk_lsn.load()); + ut_a(end_lsn == log_buffer_ready_for_write_lsn(log)); - if (!do_some_work()) { - log_checkpointer_mutex_exit(log); + ut_a(end_lsn >= log_buffer_dirty_pages_added_up_to_lsn(log)); - os_event_wait_time_low(log.checkpointer_event, 10 * 1000, sig_count); + if (log_buffer_dirty_pages_added_up_to_lsn(log) == end_lsn) { + /* All confirmed reservations have been written + to redo and all dirty pages related to those + writes have been added to flush lists. - log_checkpointer_mutex_enter(log); + However, there could be user threads, which are + in the middle of log_buffer_reserve(), reserved + range of sn values, but could not confirm. - } else { - log_checkpointer_mutex_exit(log); + Note that because log_writer is already not alive, + the only possible reason guaranteed by its death, + is that there is x-lock at end_lsn, in which case + end_lsn separates two regions in log buffer: + completely full and completely empty. 
*/ + const lsn_t ready_lsn = log_buffer_ready_for_write_lsn(log); - os_thread_sleep(0); + const lsn_t current_lsn = log_get_lsn(log); - log_checkpointer_mutex_enter(log); - } + if (current_lsn > ready_lsn) { + log.recent_written.validate_no_links(ready_lsn, current_lsn); + log.recent_closed.validate_no_links(ready_lsn, current_lsn); + } - /* Check if we should close the thread. */ - if (log.should_stop_threads.load()) { - if (!log_closer_is_active()) { - break; + break; + } + /* We need to wait until remaining dirty pages + have been added. */ } + /* We prefer to wait until all writing is done. */ } } - - log_checkpointer_mutex_exit(log); } /* @} */ diff --git a/storage/innobase/log/log0log.cc b/storage/innobase/log/log0log.cc index 951e34c8f96c..6b9c821e02c4 100644 --- a/storage/innobase/log/log0log.cc +++ b/storage/innobase/log/log0log.cc @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 1995, 2019, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 1995, 2020, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2009, Google Inc. Portions of this file contain modifications contributed and copyrighted by @@ -315,8 +315,6 @@ It holds (unless the log writer thread misses an update of the log.buf_dirty_pages_added_up_to_lsn <= log.buf_ready_for_write_lsn. -Value is updated by: [log closer thread](@ref sect_redo_log_closer). - @subsection subsect_redo_log_available_for_checkpoint_lsn log.available_for_checkpoint_lsn @@ -379,9 +377,6 @@ log_t *log_sys; /** PFS key for the log writer thread. */ mysql_pfs_key_t log_writer_thread_key; -/** PFS key for the log closer thread. */ -mysql_pfs_key_t log_closer_thread_key; - /** PFS key for the log checkpointer thread. */ mysql_pfs_key_t log_checkpointer_thread_key; @@ -467,6 +462,19 @@ that the proper size of the log buffer should be a power of two. 
@param[out] log redo log */ static void log_calc_buf_size(log_t &log); +/** Pauses writer, flusher and notifiers and switches user threads +to write log as former version. +NOTE: These pause/resume functions should be protected by mutex while serving. +The caller innodb_log_writer_threads_update() is protected +by LOCK_global_system_variables in mysqld. +@param[out] log redo log */ +static void log_pause_writer_threads(log_t &log); + +/** Resumes writer, flusher and notifiers and switches user threads +not to write log. +@param[out] log redo log */ +static void log_resume_writer_threads(log_t &log); + /**************************************************/ /** @name Initialization and finalization of log_sys @@ -519,6 +527,11 @@ bool log_sys_init(uint32_t n_files, uint64_t file_size, space_id_t space_id) { log.flush_notifier_event = os_event_create("log_flush_notifier_event"); log.writer_event = os_event_create("log_writer_event"); log.flusher_event = os_event_create("log_flusher_event"); + log.old_flush_event = os_event_create("log_old_flush_event"); + os_event_set(log.old_flush_event); + log.writer_threads_resume_event = + os_event_create("log_writer_threads_resume_event"); + os_event_set(log.writer_threads_resume_event); mutex_create(LATCH_ID_LOG_CHECKPOINTER, &log.checkpointer_mutex); mutex_create(LATCH_ID_LOG_CLOSER, &log.closer_mutex); @@ -666,6 +679,8 @@ void log_sys_close() { os_event_destroy(log.checkpointer_event); os_event_destroy(log.writer_event); os_event_destroy(log.flusher_event); + os_event_destroy(log.old_flush_event); + os_event_destroy(log.writer_threads_resume_event); log_sys_object->destroy(); @@ -689,10 +704,6 @@ void log_writer_thread_active_validate(const log_t &log) { ut_a(log_writer_is_active()); } -void log_closer_thread_active_validate(const log_t &log) { - ut_a(log_closer_is_active()); -} - void log_background_write_threads_active_validate(const log_t &log) { ut_ad(!log.disable_redo_writes); @@ -705,13 +716,11 @@ void 
log_background_threads_active_validate(const log_t &log) { ut_a(log_write_notifier_is_active()); ut_a(log_flush_notifier_is_active()); - ut_a(log_closer_is_active()); ut_a(log_checkpointer_is_active()); } void log_background_threads_inactive_validate(const log_t &log) { ut_a(!log_checkpointer_is_active()); - ut_a(!log_closer_is_active()); ut_a(!log_write_notifier_is_active()); ut_a(!log_flush_notifier_is_active()); ut_a(!log_writer_is_active()); @@ -728,13 +737,11 @@ void log_start_background_threads(log_t &log) { ut_a(log.sn.load() > 0); log.should_stop_threads.store(false); + log.writer_threads_paused.store(false); srv_threads.m_log_checkpointer = os_thread_create(log_checkpointer_thread_key, log_checkpointer, &log); - srv_threads.m_log_closer = - os_thread_create(log_closer_thread_key, log_closer, &log); - srv_threads.m_log_flush_notifier = os_thread_create(log_flush_notifier_thread_key, log_flush_notifier, &log); @@ -748,7 +755,6 @@ void log_start_background_threads(log_t &log) { os_thread_create(log_writer_thread_key, log_writer, &log); srv_threads.m_log_checkpointer.start(); - srv_threads.m_log_closer.start(); srv_threads.m_log_flush_notifier.start(); srv_threads.m_log_flusher.start(); srv_threads.m_log_write_notifier.start(); @@ -756,6 +762,8 @@ void log_start_background_threads(log_t &log) { log_background_threads_active_validate(log); + log_control_writer_threads(log); + meb::redo_log_archive_init(); } @@ -779,6 +787,7 @@ void log_stop_background_threads(log_t &log) { ut_a(!srv_read_only_mode); + log_resume_writer_threads(log); log.should_stop_threads.store(true); /* Wait until threads are closed. 
*/ @@ -798,10 +807,6 @@ void log_stop_background_threads(log_t &log) { os_event_set(log.flush_notifier_event); os_thread_sleep(10); } - while (log_closer_is_active()) { - os_event_set(log.closer_event); - os_thread_sleep(10); - } while (log_checkpointer_is_active()) { os_event_set(log.checkpointer_event); os_thread_sleep(10); @@ -811,14 +816,12 @@ void log_stop_background_threads(log_t &log) { } void log_stop_background_threads_nowait(log_t &log) { + log_resume_writer_threads(log); log.should_stop_threads.store(true); log_wake_threads(log); } void log_wake_threads(log_t &log) { - if (log_closer_is_active()) { - os_event_set(log.closer_event); - } if (log_checkpointer_is_active()) { os_event_set(log.checkpointer_event); } @@ -833,6 +836,69 @@ void log_wake_threads(log_t &log) { } } +static void log_pause_writer_threads(log_t &log) { + /* protected by LOCK_global_system_variables */ + if (!log.writer_threads_paused.load()) { + os_event_reset(log.writer_threads_resume_event); + log.writer_threads_paused.store(true); + if (log_writer_is_active()) { + os_event_set(log.writer_event); + } + if (log_flusher_is_active()) { + os_event_set(log.flusher_event); + } + if (log_write_notifier_is_active()) { + os_event_set(log.write_notifier_event); + } + if (log_flush_notifier_is_active()) { + os_event_set(log.flush_notifier_event); + } + + /* wakeup waiters to use the log writer threads */ + for (size_t i = 0; i < log.write_events_size; ++i) { + os_event_set(log.write_events[i]); + } + for (size_t i = 0; i < log.flush_events_size; ++i) { + os_event_set(log.flush_events[i]); + } + } +} + +static void log_resume_writer_threads(log_t &log) { + /* protected by LOCK_global_system_variables */ + if (log.writer_threads_paused.load()) { + log.writer_threads_paused.store(false); + + /* wakeup waiters not to use the log writer threads */ + os_event_set(log.old_flush_event); + + /* gives resume lsn for each notifiers */ + log.write_notifier_resume_lsn.store(log.write_lsn.load()); + 
log.flush_notifier_resume_lsn.store(log.flushed_to_disk_lsn.load()); + os_event_set(log.writer_threads_resume_event); + + /* confirms *_notifier_resume_lsn have been accepted */ + while (log.write_notifier_resume_lsn.load(std::memory_order_acquire) != 0 || + log.flush_notifier_resume_lsn.load(std::memory_order_acquire) != 0) { + os_thread_sleep(1000); + ut_a(log_write_notifier_is_active()); + ut_a(log_flush_notifier_is_active()); + os_event_set(log.writer_threads_resume_event); + } + } +} + +void log_control_writer_threads(log_t &log) { + /* pause/resume the log writer threads based on innodb_log_writer_threads + value. NOTE: This function is protected by LOCK_global_system_variables + in mysqld when called from innodb_log_writer_threads_update() */ + if (srv_log_writer_threads) { + log_resume_writer_threads(log); + } else { + log_pause_writer_threads(log); + } +} + /* @} */ /**************************************************/ /** diff --git a/storage/innobase/log/log0write.cc b/storage/innobase/log/log0write.cc index d55b010e1ee7..e995a246b9cc 100644 --- a/storage/innobase/log/log0write.cc +++ b/storage/innobase/log/log0write.cc @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 1995, 2019, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 1995, 2020, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2009, Google Inc. This program is free software; you can redistribute it and/or modify @@ -90,9 +90,6 @@ the file COPYING.Google. Two background log threads are responsible for checkpoints (reclaiming space in log files): - -# [Log closer](@ref sect_redo_log_closer) - tracks up to which lsn all - dirty pages have been added to flush lists (wrt. oldest_modification). - -# [Log checkpointer](@ref sect_redo_log_checkpointer) - determines @ref subsect_redo_log_available_for_checkpoint_lsn and writes checkpoints. @@ -362,26 +359,6 @@ the file COPYING.Google. 
@see @ref sect_redo_log_waiting_for_writer - @section sect_redo_log_closer Thread: log closer - - The log closer thread is responsible for tracking up to which lsn, all - dirty pages have already been added to flush lists. It traverses links - in the log recent closed buffer, following a connected path, which is - created by the links. The traversed links are removed and afterwards - the @ref subsect_redo_log_buf_dirty_pages_added_up_to_lsn is updated. - - Links are stored inside slots in a ring buffer. When link is removed, - the related slot becomes empty. Later it is reused for link pointing - from larger lsn value. - - The log checkpointer thread must not write a checkpoint for lsn larger - than _buf_dirty_pages_added_up_to_lsn_. That is because some user thread - might be in state where it is just after writing to the log buffer, but - before adding its dirty pages to flush lists. The dirty pages could have - modifications protected by log records, which start at lsn, which would - be logically deleted by such checkpoint. - - @section sect_redo_log_checkpointer Thread: log checkpointer The log checkpointer thread is responsible for: @@ -822,15 +799,24 @@ static inline uint64_t log_max_spins_when_waiting_in_user_thread( We do not care if it's flushed or not. @param[in] log redo log @param[in] lsn wait until log.write_lsn >= lsn +@param[in,out] interrupted if true, was interrupted, needs retry. 
@return statistics related to waiting inside */ -static Wait_stats log_wait_for_write(const log_t &log, lsn_t lsn) { +static Wait_stats log_wait_for_write(const log_t &log, lsn_t lsn, + bool *interrupted) { os_event_set(log.writer_event); const uint64_t max_spins = log_max_spins_when_waiting_in_user_thread( srv_log_wait_for_write_spin_delay); - auto stop_condition = [&log, lsn](bool wait) { + auto stop_condition = [&log, lsn, interrupted](bool wait) { if (log.write_lsn.load() >= lsn) { + *interrupted = false; + return (true); + } + + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_relaxed))) { + *interrupted = true; return (true); } @@ -856,8 +842,10 @@ static Wait_stats log_wait_for_write(const log_t &log, lsn_t lsn) { /** Waits until redo log is flushed up to provided lsn (or greater). @param[in] log redo log @param[in] lsn wait until log.flushed_to_disk_lsn >= lsn +@param[in,out] interrupted if true, was interrupted, needs retry. @return statistics related to waiting inside */ -static Wait_stats log_wait_for_flush(const log_t &log, lsn_t lsn) { +static Wait_stats log_wait_for_flush(const log_t &log, lsn_t lsn, + bool *interrupted) { if (log.write_lsn.load(std::memory_order_relaxed) < lsn) { os_event_set(log.writer_event); } @@ -870,10 +858,17 @@ static Wait_stats log_wait_for_flush(const log_t &log, lsn_t lsn) { max_spins = 0; } - auto stop_condition = [&log, lsn](bool wait) { + auto stop_condition = [&log, lsn, interrupted](bool wait) { LOG_SYNC_POINT("log_wait_for_flush_before_flushed_to_disk_lsn"); if (log.flushed_to_disk_lsn.load() >= lsn) { + *interrupted = false; + return (true); + } + + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_relaxed))) { + *interrupted = true; return (true); } @@ -900,6 +895,137 @@ static Wait_stats log_wait_for_flush(const log_t &log, lsn_t lsn) { return (wait_stats); } +/** Write the redo log up to a provided lsn by itself, if necessary. 
+@param[in] log redo log +@param[in] end_lsn lsn to write for +@param[in] flush_to_disk whether the written log should also be flushed +@param[in,out] interrupted if true, was interrupted, needs retry +@return statistics about waiting inside */ +static Wait_stats log_self_write_up_to(log_t &log, lsn_t end_lsn, + bool flush_to_disk, bool *interrupted) { + ut_ad(!mutex_own(&(log.writer_mutex))); + + uint32_t waits = 0; + *interrupted = false; + + lsn_t ready_lsn = log_buffer_ready_for_write_lsn(log); + ulint i = 0; + /* must wait for (ready_lsn >= end_lsn) at first */ + while (i < srv_n_spin_wait_rounds && ready_lsn < end_lsn) { + if (srv_spin_wait_delay) { + ut_delay(ut_rnd_interval(0, srv_spin_wait_delay)); + } + i++; + ready_lsn = log_buffer_ready_for_write_lsn(log); + } + if (ready_lsn < end_lsn) { + log.recent_written.advance_tail(); + ready_lsn = log_buffer_ready_for_write_lsn(log); + } + if (ready_lsn < end_lsn) { + os_thread_yield(); + ready_lsn = log_buffer_ready_for_write_lsn(log); + } + while (ready_lsn < end_lsn) { + /* wait using event */ + log_closer_mutex_enter(log); + if (log.current_ready_waiting_lsn == 0 && + os_event_is_set(log.closer_event)) { + log.current_ready_waiting_lsn = end_lsn; + log.current_ready_waiting_sig_count = os_event_reset(log.closer_event); + } + const auto sig_count = log.current_ready_waiting_sig_count; + log_closer_mutex_exit(log); + ++waits; + os_event_wait_time_low(log.closer_event, 100000, sig_count); + log.recent_written.advance_tail(); + ready_lsn = log_buffer_ready_for_write_lsn(log); + } + + /* NOTE: Currently doesn't do dirty read for (flush_to_disk == true) case, + because the mutex contention also works as the arbitrator for write-IO + (fsync) bandwidth between log files and data files. 
*/ + if (!flush_to_disk && + log.write_lsn.load(std::memory_order_acquire) >= end_lsn) { + return (Wait_stats{waits}); + } + + /* mysql-test compatibility */ + LOG_SYNC_POINT("log_wait_for_flush_before_flushed_to_disk_lsn"); + LOG_SYNC_POINT("log_wait_for_flush_before_wait"); + + log_writer_mutex_enter(log); + + if (UNIV_UNLIKELY( + !log.writer_threads_paused.load(std::memory_order_relaxed))) { + log_writer_mutex_exit(log); + *interrupted = true; + return (Wait_stats{waits}); + } + + lsn_t limit_lsn = + flush_to_disk ? log.flushed_to_disk_lsn.load(std::memory_order_acquire) + : log.write_lsn.load(std::memory_order_relaxed); + if (limit_lsn >= end_lsn) { + log_writer_mutex_exit(log); + return (Wait_stats{waits}); + } + + /* If it is a write call we should just go ahead and do it + as we checked that write_lsn is not where we'd like it to + be. If we have to flush as well then we check if there is a + pending flush and based on that we wait for it to finish + before proceeding further. */ + if (flush_to_disk && !os_event_is_set(log.old_flush_event)) { + const auto sig_count = log.current_flush_sig_count; + log_writer_mutex_exit(log); + ++waits; + os_event_wait_low(log.old_flush_event, sig_count); + /* Needs to confirm actual value, + because the log writer threads might be resumed. 
*/ + if (log.flushed_to_disk_lsn.load(std::memory_order_relaxed) < end_lsn) { + *interrupted = true; + } + return (Wait_stats{waits}); + } + + if (flush_to_disk) { + log.current_flush_sig_count = os_event_reset(log.old_flush_event); + } + + /* write to ready_lsn */ + lsn_t write_lsn = log.write_lsn.load(std::memory_order_relaxed); + while (write_lsn < ready_lsn) { + log_writer_write_buffer(log, log_buffer_ready_for_write_lsn(log)); + write_lsn = log.write_lsn.load(std::memory_order_relaxed); + } + + log_writer_mutex_exit(log); + + if (flush_to_disk) { + /* basically, no other flushers */ + if (UNIV_UNLIKELY(log_flusher_mutex_enter_nowait(log))) { + if (!log.writer_threads_paused.load(std::memory_order_relaxed)) { + os_event_set(log.old_flush_event); + *interrupted = true; + return (Wait_stats{waits}); + } + log_flusher_mutex_enter(log); + } + log_flush_low(log); + log_flusher_mutex_exit(log); + + /* mysql-test compatibility */ + LOG_SYNC_POINT("log_flush_notifier_after_event_reset"); + LOG_SYNC_POINT("log_flush_notifier_before_check"); + LOG_SYNC_POINT("log_flush_notifier_before_wait"); + LOG_SYNC_POINT("log_flush_notifier_before_flushed_to_disk_lsn"); + LOG_SYNC_POINT("log_flush_notifier_before_notify"); + } + + return (Wait_stats{waits}); +} + Wait_stats log_write_up_to(log_t &log, lsn_t end_lsn, bool flush_to_disk) { ut_a(!srv_read_only_mode); @@ -946,13 +1072,31 @@ Wait_stats log_write_up_to(log_t &log, lsn_t end_lsn, bool flush_to_disk) { ut_ad(end_lsn <= log_get_lsn(log)); + Wait_stats wait_stats{0}; + bool interrupted = false; + +retry: + if (log.writer_threads_paused.load(std::memory_order_acquire)) { + /* the log writer threads are paused not to waste CPU resource. */ + wait_stats += + log_self_write_up_to(log, end_lsn, flush_to_disk, &interrupted); + + if (UNIV_UNLIKELY(interrupted)) { + /* the log writer threads might be working. retry. 
*/ + goto retry; + } + + DEBUG_SYNC_C("log_flushed_by_self"); + return (wait_stats); + } + + /* the log writer threads are working for high concurrency scale */ if (flush_to_disk) { if (log.flushed_to_disk_lsn.load() >= end_lsn) { - return (Wait_stats{0}); + DEBUG_SYNC_C("log_flushed_by_writer"); + return (wait_stats); } - Wait_stats wait_stats{0}; - if (srv_flush_log_at_trx_commit != 1) { /* We need redo flushed, but because trx != 1, we have disabled notifications sent from log_writer to log_flusher. @@ -967,21 +1111,34 @@ Wait_stats log_write_up_to(log_t &log, lsn_t end_lsn, bool flush_to_disk) { return to sleeping for next 1 second. */ if (log.write_lsn.load() < end_lsn) { - wait_stats = log_wait_for_write(log, end_lsn); + wait_stats += log_wait_for_write(log, end_lsn, &interrupted); } } /* Wait until log gets flushed up to end_lsn. */ - return (wait_stats + log_wait_for_flush(log, end_lsn)); + wait_stats += log_wait_for_flush(log, end_lsn, &interrupted); + + if (UNIV_UNLIKELY(interrupted)) { + /* the log writer threads might be paused. retry. */ + goto retry; + } + DEBUG_SYNC_C("log_flushed_by_writer"); } else { if (log.write_lsn.load() >= end_lsn) { - return (Wait_stats{0}); + return (wait_stats); } /* Wait until log gets written up to end_lsn. */ - return (log_wait_for_write(log, end_lsn)); + wait_stats += log_wait_for_write(log, end_lsn, &interrupted); + + if (UNIV_UNLIKELY(interrupted)) { + /* the log writer threads might be paused. retry. 
*/ + goto retry; + } } + + return (wait_stats); } /* @} */ @@ -1494,20 +1651,23 @@ static inline void write_blocks(log_t &log, byte *write_buf, size_t write_size, static inline void notify_about_advanced_write_lsn(log_t &log, lsn_t old_write_lsn, lsn_t new_write_lsn) { - if (srv_flush_log_at_trx_commit == 1) { - os_event_set(log.flusher_event); - } + if (!log.writer_threads_paused.load(std::memory_order_acquire)) { + if (srv_flush_log_at_trx_commit == 1) { + os_event_set(log.flusher_event); + } - const auto first_slot = log_compute_write_event_slot(log, old_write_lsn + 1); + const auto first_slot = + log_compute_write_event_slot(log, old_write_lsn + 1); - const auto last_slot = log_compute_write_event_slot(log, new_write_lsn); + const auto last_slot = log_compute_write_event_slot(log, new_write_lsn); - if (first_slot == last_slot) { - LOG_SYNC_POINT("log_write_before_users_notify"); - os_event_set(log.write_events[first_slot]); - } else { - LOG_SYNC_POINT("log_write_before_notifier_notify"); - os_event_set(log.write_notifier_event); + if (first_slot == last_slot) { + LOG_SYNC_POINT("log_write_before_users_notify"); + os_event_set(log.write_events[first_slot]); + } else { + LOG_SYNC_POINT("log_write_before_notifier_notify"); + os_event_set(log.write_notifier_event); + } } if (arch_log_sys && arch_log_sys->is_active()) { @@ -1774,7 +1934,9 @@ static lsn_t log_writer_wait_on_checkpoint(log_t &log, lsn_t last_write_lsn, break; } - (void)log_advance_ready_for_write_lsn(log); + if (!log.writer_threads_paused.load(std::memory_order_acquire)) { + (void)log_advance_ready_for_write_lsn(log); + } const int32_t ATTEMPTS_UNTIL_ERROR = TIME_UNTIL_ERROR_IN_US / SLEEP_BETWEEN_RETRIES_IN_US; @@ -1838,7 +2000,9 @@ static void log_writer_wait_on_archiver(log_t &log, lsn_t last_write_lsn, break; } - (void)log_advance_ready_for_write_lsn(log); + if (!log.writer_threads_paused.load(std::memory_order_acquire)) { + (void)log_advance_ready_for_write_lsn(log); + } const int32_t 
ATTEMPTS_UNTIL_ERROR = TIME_UNTIL_ERROR_IN_US / SLEEP_BETWEEN_RETRIES_IN_US; @@ -1999,6 +2163,11 @@ void log_writer(log_t *log_ptr) { return (true); } + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire))) { + return (true); + } + if (wait) { write_to_file_requests_monitor.update(); log_writer_mutex_exit(log); @@ -2012,6 +2181,17 @@ void log_writer(log_t *log_ptr) { MONITOR_INC_WAIT_STATS(MONITOR_LOG_WRITER_, wait_stats); + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire) && + !log.should_stop_threads.load())) { + log_writer_mutex_exit(log); + + os_event_wait(log.writer_threads_resume_event); + + log_writer_mutex_enter(log); + ready_lsn = log_buffer_ready_for_write_lsn(log); + } + /* Do the actual work. */ if (log.write_lsn.load() < ready_lsn) { log_writer_write_buffer(log, ready_lsn); @@ -2073,16 +2253,16 @@ static void log_flush_update_stats(log_t &log) { fsync_time = log.last_flush_end_time - log.last_flush_start_time; - ut_a(fsync_time.count() >= 0); - fsync_max_time = std::max(fsync_max_time, fsync_time); - fsync_total_time += fsync_time; + if (fsync_time.count() > 0) { + fsync_total_time += fsync_time; - MONITOR_INC_VALUE( - MONITOR_LOG_FLUSH_TOTAL_TIME, - std::chrono::duration_cast<std::chrono::microseconds>(fsync_time) - .count()); + MONITOR_INC_VALUE( + MONITOR_LOG_FLUSH_TOTAL_TIME, + std::chrono::duration_cast<std::chrono::microseconds>(fsync_time) + .count()); + } /* Calculate time elapsed since start of last sample. 
*/ @@ -2149,7 +2329,9 @@ static void log_flush_low(log_t &log) { bool do_flush = true; #endif - os_event_reset(log.flusher_event); + if (!log.writer_threads_paused.load(std::memory_order_acquire)) { + os_event_reset(log.flusher_event); + } log.last_flush_start_time = Log_clock::now(); @@ -2157,6 +2339,11 @@ static void log_flush_low(log_t &log) { const lsn_t flush_up_to_lsn = log.write_lsn.load(); + if (flush_up_to_lsn == last_flush_lsn) { + os_event_set(log.old_flush_event); + return; + } + ut_a(flush_up_to_lsn > last_flush_lsn); if (do_flush) { @@ -2184,16 +2371,23 @@ static void log_flush_low(log_t &log) { DBUG_PRINT("ib_log", ("Flushed to disk up to " LSN_PF, flush_up_to_lsn)); - const auto first_slot = log_compute_flush_event_slot(log, last_flush_lsn + 1); + if (!log.writer_threads_paused.load(std::memory_order_acquire)) { + const auto first_slot = + log_compute_flush_event_slot(log, last_flush_lsn + 1); - const auto last_slot = log_compute_flush_event_slot(log, flush_up_to_lsn); + const auto last_slot = log_compute_flush_event_slot(log, flush_up_to_lsn); - if (first_slot == last_slot) { - LOG_SYNC_POINT("log_flush_before_users_notify"); - os_event_set(log.flush_events[first_slot]); + if (first_slot == last_slot) { + LOG_SYNC_POINT("log_flush_before_users_notify"); + os_event_set(log.flush_events[first_slot]); + } else { + LOG_SYNC_POINT("log_flush_before_notifier_notify"); + os_event_set(log.flush_notifier_event); + } } else { + LOG_SYNC_POINT("log_flush_before_users_notify"); LOG_SYNC_POINT("log_flush_before_notifier_notify"); - os_event_set(log.flush_notifier_event); + os_event_set(log.old_flush_event); } /* Update stats. 
*/ @@ -2221,6 +2415,15 @@ void log_flusher(log_t *log_ptr) { } } + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire))) { + log_flusher_mutex_exit(log); + + os_event_wait(log.writer_threads_resume_event); + + log_flusher_mutex_enter(log); + } + bool released = false; auto stop_condition = [&log, &released, step](bool wait) { @@ -2257,6 +2460,11 @@ void log_flusher(log_t *log_ptr) { } } + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire))) { + return (true); + } + if (wait) { log_flusher_mutex_exit(log); released = true; @@ -2351,6 +2559,20 @@ void log_write_notifier(log_t *log_ptr) { } } + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire))) { + log_write_notifier_mutex_exit(log); + + os_event_wait(log.writer_threads_resume_event); + ut_ad(log.write_notifier_resume_lsn.load(std::memory_order_acquire) + 1 >= + lsn); + lsn = log.write_notifier_resume_lsn.load(std::memory_order_acquire) + 1; + /* clears to acknowledge */ + log.write_notifier_resume_lsn.store(0, std::memory_order_release); + + log_write_notifier_mutex_enter(log); + } + LOG_SYNC_POINT("log_write_notifier_before_check"); bool released = false; @@ -2374,6 +2596,11 @@ void log_write_notifier(log_t *log_ptr) { } } + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire))) { + return (true); + } + if (wait) { log_write_notifier_mutex_exit(log); released = true; @@ -2450,6 +2677,20 @@ void log_flush_notifier(log_t *log_ptr) { } } + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire))) { + log_flush_notifier_mutex_exit(log); + + os_event_wait(log.writer_threads_resume_event); + ut_ad(log.flush_notifier_resume_lsn.load(std::memory_order_acquire) + 1 >= + lsn); + lsn = log.flush_notifier_resume_lsn.load(std::memory_order_acquire) + 1; + /* clears to acknowledge */ + log.flush_notifier_resume_lsn.store(0, std::memory_order_release); + + log_flush_notifier_mutex_enter(log); + } + 
LOG_SYNC_POINT("log_flush_notifier_before_check"); bool released = false; @@ -2473,6 +2714,11 @@ void log_flush_notifier(log_t *log_ptr) { } } + if (UNIV_UNLIKELY( + log.writer_threads_paused.load(std::memory_order_acquire))) { + return (true); + } + if (wait) { log_flush_notifier_mutex_exit(log); released = true; @@ -2519,108 +2765,6 @@ void log_flush_notifier(log_t *log_ptr) { /* @} */ -/**************************************************/ /** - - @name Log closer thread - - *******************************************************/ - -/* @{ */ - -void log_closer(log_t *log_ptr) { - ut_a(log_ptr != nullptr); - - log_t &log = *log_ptr; - lsn_t end_lsn = 0; - - log_closer_mutex_enter(log); - - Log_thread_waiting waiting{log, log.closer_event, srv_log_closer_spin_delay, - srv_log_closer_timeout}; - - for (uint64_t step = 0;; ++step) { - bool released = false; - - auto stop_condition = [&log, &released, step](bool wait) { - if (released) { - log_closer_mutex_enter(log); - released = false; - } - - /* Advance lsn up to which all the dirty pages have - been added to flush lists. */ - - if (log_advance_dirty_pages_added_up_to_lsn(log)) { - if (step % 1024 == 0) { - log_closer_mutex_exit(log); - os_thread_sleep(0); - log_closer_mutex_enter(log); - } - return (true); - } - - if (log.should_stop_threads.load()) { - return (true); - } - - if (wait) { - log_closer_mutex_exit(log); - released = true; - } - return (false); - }; - - waiting.wait(stop_condition); - - /* Check if we should close the thread. 
*/ - if (log.should_stop_threads.load()) { - if (!log_flusher_is_active() && !log_writer_is_active()) { - end_lsn = log.write_lsn.load(); - - ut_a(log_lsn_validate(end_lsn)); - ut_a(end_lsn == log.flushed_to_disk_lsn.load()); - ut_a(end_lsn == log_buffer_ready_for_write_lsn(log)); - - ut_a(end_lsn >= log_buffer_dirty_pages_added_up_to_lsn(log)); - - if (log_buffer_dirty_pages_added_up_to_lsn(log) == end_lsn) { - /* All confirmed reservations have been written - to redo and all dirty pages related to those - writes have been added to flush lists. - - However, there could be user threads, which are - in the middle of log_buffer_reserve(), reserved - range of sn values, but could not confirm. - - Note that because log_writer is already not alive, - the only possible reason guaranteed by its death, - is that there is x-lock at end_lsn, in which case - end_lsn separates two regions in log buffer: - completely full and completely empty. */ - const lsn_t ready_lsn = log_buffer_ready_for_write_lsn(log); - - const lsn_t current_lsn = log_get_lsn(log); - - if (current_lsn > ready_lsn) { - log.recent_written.validate_no_links(ready_lsn, current_lsn); - - log.recent_closed.validate_no_links(ready_lsn, current_lsn); - } - - break; - } - /* We need to wait until remaining dirty pages - have been added. */ - } - /* We prefer to wait until all writing is done. */ - } - } - - log_closer_mutex_exit(log); -} - -/* @} */ - /**************************************************/ /** @name Log files encryption diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index c5d61b947279..21a617286ad3 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -252,6 +252,9 @@ ulong srv_log_buffer_size; /** Size of block, used for writing ahead to avoid read-on-write. */ ulong srv_log_write_ahead_size; +/** Whether to activate/pause the log writer threads. 
*/ +bool srv_log_writer_threads; + /** Minimum absolute value of cpu time for which spin-delay is used. */ uint srv_log_spin_cpu_abs_lwm; @@ -341,13 +344,6 @@ ulong srv_log_flush_notifier_spin_delay = ulong srv_log_flush_notifier_timeout = INNODB_LOG_FLUSH_NOTIFIER_TIMEOUT_DEFAULT; -/** Number of spin iterations, for which log closerr thread is waiting -for a reachable untraversed link in recent_closed. */ -ulong srv_log_closer_spin_delay = INNODB_LOG_CLOSER_SPIN_DELAY_DEFAULT; - -/** Initial sleep used in log closer after spin delay is finished. */ -ulong srv_log_closer_timeout = INNODB_LOG_CLOSER_TIMEOUT_DEFAULT; - /* End of EXPERIMENTAL sys vars */ /** Whether to generate and require checksums on the redo log pages. */ @@ -1125,6 +1121,8 @@ static void srv_init(void) { buf_flush_event = os_event_create("buf_flush_event"); + buf_flush_tick_event = os_event_create("buf_flush_tick_event"); + UT_LIST_INIT(srv_sys->tasks, &que_thr_t::queue); } @@ -1173,6 +1171,7 @@ void srv_free(void) { os_event_destroy(srv_monitor_event); os_event_destroy(srv_buf_dump_event); os_event_destroy(buf_flush_event); + os_event_destroy(buf_flush_tick_event); } os_event_destroy(srv_buf_resize_event); @@ -1920,14 +1919,17 @@ void srv_wake_master_thread(void) { /** Get current server activity count. We don't hold srv_sys::mutex while reading this value as it is only used in heuristics. @return activity count. */ -ulint srv_get_activity_count(void) { return (srv_sys->activity_count); } +ulint srv_get_activity_count(void) { + return (srv_sys == nullptr ? 0 : srv_sys->activity_count); +} /** Check if there has been any activity. @return false if no change in activity counter. */ ibool srv_check_activity( ulint old_activity_count) /*!< in: old activity count */ { - return (srv_sys->activity_count != old_activity_count); + return (srv_sys == nullptr ? false + : srv_sys->activity_count != old_activity_count); } /** Make room in the table cache by evicting an unused table. 
@@ -2279,6 +2281,9 @@ static void srv_master_do_active_tasks(void) { MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_IBUF_MERGE_MICROSECOND, counter_time); + /* Flush logs if needed */ + log_buffer_sync_in_background(); + /* Now see if various tasks that are performed at defined intervals need to be performed. */ @@ -2356,6 +2361,9 @@ static void srv_master_do_idle_tasks(void) { } MONITOR_INC_TIME_IN_MICRO_SECS(MONITOR_SRV_DICT_LRU_MICROSECOND, counter_time); + + /* Flush logs if needed */ + log_buffer_sync_in_background(); } /** Perform the tasks during shutdown. The tasks that we do at shutdown diff --git a/storage/innobase/srv/srv0start.cc b/storage/innobase/srv/srv0start.cc index e74725613283..e7a4dc49c429 100644 --- a/storage/innobase/srv/srv0start.cc +++ b/storage/innobase/srv/srv0start.cc @@ -3062,6 +3062,9 @@ void srv_start_threads_after_ddl_recovery() { /* Now the InnoDB Metadata and file system should be consistent. Start the Purge thread */ srv_start_purge_threads(); + + /* If recovered, write the dynamic metadata back. */ + dict_persist_to_dd_table_buffer(); } #if 0