@@ -163,6 +163,14 @@ class Thread_excursion
163
163
*/
164
164
int attach_to (THD *thd)
165
165
{
166
+ /*
167
+ Simulate session attach error.
168
+ */
169
+ DBUG_EXECUTE_IF (" simulate_session_attach_error" ,
170
+ {
171
+ if (rand () % 3 == 0 )
172
+ return 1 ;
173
+ };);
166
174
#ifdef HAVE_PSI_THREAD_INTERFACE
167
175
PSI_THREAD_CALL (set_thread)(thd_get_psi (thd));
168
176
#endif /* HAVE_PSI_THREAD_INTERFACE */
@@ -2347,6 +2355,7 @@ void MYSQL_BIN_LOG::cleanup()
2347
2355
mysql_mutex_destroy (&LOCK_index);
2348
2356
mysql_mutex_destroy (&LOCK_commit);
2349
2357
mysql_mutex_destroy (&LOCK_sync);
2358
+ mysql_mutex_destroy (&LOCK_xids);
2350
2359
mysql_cond_destroy (&update_cond);
2351
2360
my_atomic_rwlock_destroy (&m_prep_xids_lock);
2352
2361
mysql_cond_destroy (&m_prep_xids_cond);
@@ -2364,6 +2373,7 @@ void MYSQL_BIN_LOG::init_pthread_objects()
2364
2373
mysql_mutex_init (m_key_LOCK_index, &LOCK_index, MY_MUTEX_INIT_SLOW);
2365
2374
mysql_mutex_init (m_key_LOCK_commit, &LOCK_commit, MY_MUTEX_INIT_FAST);
2366
2375
mysql_mutex_init (m_key_LOCK_sync, &LOCK_sync, MY_MUTEX_INIT_FAST);
2376
+ mysql_mutex_init (m_key_LOCK_xids, &LOCK_xids, MY_MUTEX_INIT_FAST);
2367
2377
mysql_cond_init (m_key_update_cond, &update_cond, 0 );
2368
2378
my_atomic_rwlock_init (&m_prep_xids_lock);
2369
2379
mysql_cond_init (m_key_prep_xids_cond, &m_prep_xids_cond, NULL );
@@ -4809,25 +4819,27 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock_log, Format_description_log_even
4809
4819
mysql_mutex_lock (&LOCK_log);
4810
4820
else
4811
4821
mysql_mutex_assert_owner (&LOCK_log);
4812
- mysql_mutex_lock (&LOCK_commit);
4822
+ DBUG_EXECUTE_IF (" semi_sync_3-way_deadlock" ,
4823
+ DEBUG_SYNC (current_thd, " before_rotate_binlog" ););
4824
+ mysql_mutex_lock (&LOCK_xids);
4813
4825
/*
4814
4826
We need to ensure that the number of prepared XIDs are 0.
4815
4827
4816
4828
If m_prep_xids is not zero:
4817
- - We release the LOCK_commit lock to allow sessions to commit,
4818
- hence decrease m_prep_xids
4829
+ - We wait for storage engine commit, hence decrease m_prep_xids
4819
4830
- We keep the LOCK_log to block new transactions from being
4820
4831
written to the binary log.
4821
4832
*/
4822
4833
while (get_prep_xids () > 0 )
4823
- mysql_cond_wait (&m_prep_xids_cond, &LOCK_commit);
4834
+ mysql_cond_wait (&m_prep_xids_cond, &LOCK_xids);
4835
+ mysql_mutex_unlock (&LOCK_xids);
4836
+
4824
4837
mysql_mutex_lock (&LOCK_index);
4825
4838
4826
4839
if ((error= ha_flush_logs (0 )))
4827
4840
goto end;
4828
4841
4829
4842
mysql_mutex_assert_owner (&LOCK_log);
4830
- mysql_mutex_assert_owner (&LOCK_commit);
4831
4843
mysql_mutex_assert_owner (&LOCK_index);
4832
4844
4833
4845
/* Reuse old name if not binlog and not update log */
@@ -4950,7 +4962,6 @@ int MYSQL_BIN_LOG::new_file_impl(bool need_lock_log, Format_description_log_even
4950
4962
}
4951
4963
4952
4964
mysql_mutex_unlock (&LOCK_index);
4953
- mysql_mutex_unlock (&LOCK_commit);
4954
4965
if (need_lock_log)
4955
4966
mysql_mutex_unlock (&LOCK_log);
4956
4967
@@ -5805,6 +5816,15 @@ bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data)
5805
5816
IO_CACHE *cache= &cache_data->cache_log ;
5806
5817
bool incident= cache_data->has_incident ();
5807
5818
5819
+ DBUG_EXECUTE_IF (" simulate_binlog_flush_error" ,
5820
+ {
5821
+ if (rand () % 3 == 0 )
5822
+ {
5823
+ write_error=1 ;
5824
+ goto err;
5825
+ }
5826
+ };);
5827
+
5808
5828
mysql_mutex_assert_owner (&LOCK_log);
5809
5829
5810
5830
DBUG_ASSERT (is_open ());
@@ -5865,6 +5885,8 @@ bool MYSQL_BIN_LOG::write_cache(THD *thd, binlog_cache_data *cache_data)
5865
5885
sql_print_error (ER (ER_ERROR_ON_WRITE), name,
5866
5886
errno, my_strerror (errbuf, sizeof (errbuf), errno));
5867
5887
}
5888
+ thd->commit_error = THD::CE_FLUSH_ERROR;
5889
+
5868
5890
DBUG_RETURN (1 );
5869
5891
}
5870
5892
@@ -6444,10 +6466,7 @@ MYSQL_BIN_LOG::flush_thread_caches(THD *thd)
6444
6466
*/
6445
6467
thd->set_trans_pos (log_file_name, my_b_tell (&log_file));
6446
6468
if (wrote_xid)
6447
- {
6448
- inc_prep_xids ();
6449
- thd->transaction .flags .xid_written = true ;
6450
- }
6469
+ inc_prep_xids (thd);
6451
6470
}
6452
6471
DBUG_PRINT (" debug" , (" bytes: %llu" , bytes));
6453
6472
return std::make_pair (error, bytes);
@@ -6474,7 +6493,7 @@ MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
6474
6493
{
6475
6494
DBUG_ASSERT (total_bytes_var && rotate_var && out_queue_var);
6476
6495
my_off_t total_bytes= 0 ;
6477
- int flush_error= 0 ;
6496
+ int flush_error= 1 ;
6478
6497
mysql_mutex_assert_owner (&LOCK_log);
6479
6498
6480
6499
my_atomic_rwlock_rdlock (&opt_binlog_max_flush_queue_time_lock);
@@ -6497,7 +6516,7 @@ MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
6497
6516
std::pair<int ,my_off_t > result= flush_thread_caches (current.second );
6498
6517
has_more= current.first ;
6499
6518
total_bytes+= result.second ;
6500
- if (flush_error == 0 )
6519
+ if (flush_error == 1 )
6501
6520
flush_error= result.first ;
6502
6521
if (first_seen == NULL )
6503
6522
first_seen= current.second ;
@@ -6515,7 +6534,7 @@ MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
6515
6534
{
6516
6535
std::pair<int ,my_off_t > result= flush_thread_caches (head);
6517
6536
total_bytes+= result.second ;
6518
- if (flush_error == 0 )
6537
+ if (flush_error == 1 )
6519
6538
flush_error= result.first ;
6520
6539
}
6521
6540
if (first_seen == NULL )
@@ -6542,12 +6561,10 @@ MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
6542
6561
6543
6562
@param thd The "master" thread
6544
6563
@param first First thread in the queue of threads to commit
6545
- @param flush_error Error code from flush operation.
6546
6564
*/
6547
6565
6548
6566
void
6549
- MYSQL_BIN_LOG::process_commit_stage_queue (THD *thd, THD *first,
6550
- int flush_error)
6567
+ MYSQL_BIN_LOG::process_commit_stage_queue (THD *thd, THD *first)
6551
6568
{
6552
6569
mysql_mutex_assert_owner (&LOCK_commit);
6553
6570
Thread_excursion excursion (thd);
@@ -6570,27 +6587,75 @@ MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first,
6570
6587
#ifndef DBUG_OFF
6571
6588
stage_manager.clear_preempt_status (head);
6572
6589
#endif
6573
- if (flush_error != 0 )
6574
- head->commit_error = flush_error;
6575
- else if (int error= excursion.attach_to (head))
6576
- head->commit_error = error;
6590
+ if (head->commit_error != THD::CE_NONE)
6591
+ ;
6592
+ else if (excursion.attach_to (head))
6593
+ {
6594
+ head->commit_error = THD::CE_COMMIT_ERROR;
6595
+ sql_print_error (" Out of memory while attaching to session thread "
6596
+ " during the group commit phase." );
6597
+ }
6577
6598
else
6578
6599
{
6579
6600
bool all= head->transaction .flags .real_commit ;
6580
6601
if (head->transaction .flags .commit_low )
6581
6602
{
6582
6603
/* head is parked to have exited append() */
6583
6604
DBUG_ASSERT (head->transaction .flags .ready_preempt );
6584
-
6585
- if ( int error= ha_commit_low (head, all))
6586
- head-> commit_error = error;
6587
- else if (head-> transaction . flags . xid_written )
6588
- dec_prep_xids () ;
6605
+ /*
6606
+ storage engine commit
6607
+ */
6608
+ if (ha_commit_low ( head, all, false ) )
6609
+ head-> commit_error = THD::CE_COMMIT_ERROR ;
6589
6610
}
6590
6611
DBUG_PRINT (" debug" , (" commit_error: %d, flags.pending: %s" ,
6591
6612
head->commit_error ,
6592
6613
YESNO (head->transaction .flags .pending )));
6593
6614
}
6615
+ /*
6616
+ Decrement the prepared XID counter after storage engine commit.
6617
+ We also need decrement the prepared XID when encountering a
6618
+ flush error or session attach error for avoiding 3-way deadlock
6619
+ among user thread, rotate thread and dump thread.
6620
+ */
6621
+ if (head->transaction .flags .xid_written )
6622
+ dec_prep_xids (head);
6623
+ }
6624
+ }
6625
+
6626
+ /* *
6627
+ Process after commit for a sequence of sessions.
6628
+
6629
+ @param thd The "master" thread
6630
+ @param first First thread in the queue of threads to commit
6631
+ */
6632
+
6633
+ void
6634
+ MYSQL_BIN_LOG::process_after_commit_stage_queue (THD *thd, THD *first)
6635
+ {
6636
+ Thread_excursion excursion (thd);
6637
+ for (THD *head= first; head; head= head->next_to_commit )
6638
+ {
6639
+ if (head->transaction .flags .run_hooks &&
6640
+ head->commit_error == THD::CE_NONE)
6641
+ {
6642
+ if (excursion.attach_to (head))
6643
+ {
6644
+ head->commit_error = THD::CE_COMMIT_ERROR;
6645
+ sql_print_error (" Out of memory while attaching to session thread "
6646
+ " during the group commit phase." );
6647
+ }
6648
+ if (head->commit_error == THD::CE_NONE)
6649
+ {
6650
+ bool all= head->transaction .flags .real_commit ;
6651
+ (void ) RUN_HOOK (transaction, after_commit, (head, all));
6652
+ /*
6653
+ When after_commit finished for the transaction, clear the run_hooks flag.
6654
+ This allow other parts of the system to check if after_commit was called.
6655
+ */
6656
+ head->transaction .flags .run_hooks = false ;
6657
+ }
6658
+ }
6594
6659
}
6595
6660
}
6596
6661
@@ -6719,13 +6784,31 @@ MYSQL_BIN_LOG::sync_binlog_file(bool force)
6719
6784
int
6720
6785
MYSQL_BIN_LOG::finish_commit (THD *thd)
6721
6786
{
6722
- if (thd->commit_error == 0 && thd-> transaction .flags .commit_low )
6787
+ if (thd->transaction .flags .commit_low )
6723
6788
{
6724
6789
const bool all= thd->transaction .flags .real_commit ;
6725
- thd->commit_error = ha_commit_low (thd, all);
6790
+ /*
6791
+ storage engine commit
6792
+ */
6793
+ if (thd->commit_error == THD::CE_NONE &&
6794
+ ha_commit_low (thd, all, false ))
6795
+ thd->commit_error = THD::CE_COMMIT_ERROR;
6796
+ /*
6797
+ Decrement the prepared XID counter after storage engine commit
6798
+ */
6726
6799
if (thd->transaction .flags .xid_written )
6727
- dec_prep_xids ();
6800
+ dec_prep_xids (thd);
6801
+ /*
6802
+ If commit succeeded, we call the after_commit hook
6803
+ */
6804
+ if (thd->commit_error == THD::CE_NONE)
6805
+ {
6806
+ (void ) RUN_HOOK (transaction, after_commit, (thd, all));
6807
+ thd->transaction .flags .run_hooks = false ;
6808
+ }
6728
6809
}
6810
+ else if (thd->transaction .flags .xid_written )
6811
+ dec_prep_xids (thd);
6729
6812
6730
6813
thd->variables .gtid_next .set_undefined ();
6731
6814
/*
@@ -6736,7 +6819,7 @@ MYSQL_BIN_LOG::finish_commit(THD *thd)
6736
6819
gtid_state->update_on_commit (thd);
6737
6820
global_sid_lock->unlock ();
6738
6821
6739
- DBUG_ASSERT (thd->commit_error || !thd->transaction .flags .commit_low );
6822
+ DBUG_ASSERT (thd->commit_error || !thd->transaction .flags .run_hooks );
6740
6823
DBUG_ASSERT (!thd_get_cache_mngr (thd)->dbug_any_finalized ());
6741
6824
DBUG_PRINT (" return" , (" Thread ID: %lu, commit_error: %d" ,
6742
6825
thd->thread_id , thd->commit_error ));
@@ -6814,12 +6897,13 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
6814
6897
ha_commit_low since that calls st_transaction::cleanup.
6815
6898
*/
6816
6899
thd->transaction .flags .pending = true ;
6817
- thd->commit_error = 0 ;
6900
+ thd->commit_error = THD::CE_NONE ;
6818
6901
thd->next_to_commit = NULL ;
6819
6902
thd->durability_property = HA_IGNORE_DURABILITY;
6820
6903
thd->transaction .flags .real_commit = all;
6821
6904
thd->transaction .flags .xid_written = false ;
6822
6905
thd->transaction .flags .commit_low = !skip_commit;
6906
+ thd->transaction .flags .run_hooks = !skip_commit;
6823
6907
#ifndef DBUG_OFF
6824
6908
/*
6825
6909
The group commit Leader may have to wait for follower whose transaction
@@ -6914,8 +6998,15 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
6914
6998
DBUG_RETURN (finish_commit (thd));
6915
6999
}
6916
7000
THD *commit_queue= stage_manager.fetch_queue_for (Stage_manager::COMMIT_STAGE);
6917
- process_commit_stage_queue (thd, commit_queue, flush_error);
7001
+ DBUG_EXECUTE_IF (" semi_sync_3-way_deadlock" ,
7002
+ DEBUG_SYNC (thd, " before_process_commit_stage_queue" ););
7003
+ process_commit_stage_queue (thd, commit_queue);
6918
7004
mysql_mutex_unlock (&LOCK_commit);
7005
+ /*
7006
+ Process after_commit after LOCK_commit is released for avoiding
7007
+ 3-way deadlock among user thread, rotate thread and dump thread.
7008
+ */
7009
+ process_after_commit_stage_queue (thd, commit_queue);
6919
7010
final_queue= commit_queue;
6920
7011
}
6921
7012
else
@@ -6932,20 +7023,17 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
6932
7023
(void ) finish_commit (thd);
6933
7024
6934
7025
/*
6935
- If we need to rotate, we do it now.
7026
+ If we need to rotate, we do it without commit error.
7027
+ Otherwise the thd->commit_error will be possibly reset.
6936
7028
*/
6937
- if (do_rotate)
7029
+ if (do_rotate && thd-> commit_error == THD::CE_NONE )
6938
7030
{
6939
7031
/*
6940
- We can force the rotate since we did the check in
6941
- flush_session_queue(). Giving "false" would have the same
6942
- result, but will do the check again.
7032
+ Do not force the rotate as several consecutive groups may
7033
+ request unnecessary rotations.
6943
7034
6944
7035
NOTE: Run purge_logs wo/ holding LOCK_log because it does not
6945
7036
need the mutex. Otherwise causes various deadlocks.
6946
-
6947
- NOTE: The LOCK_commit is necessary when doing a rotate, but that
6948
- is grabbed inside new_file_impl().
6949
7037
*/
6950
7038
6951
7039
DBUG_EXECUTE_IF (" crash_commit_before_unlog" , DBUG_SUICIDE (););
@@ -6958,7 +7046,7 @@ int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
6958
7046
if (!error && check_purge)
6959
7047
purge ();
6960
7048
else
6961
- thd->commit_error = error ;
7049
+ thd->commit_error = THD::CE_COMMIT_ERROR ;
6962
7050
}
6963
7051
DBUG_RETURN (thd->commit_error );
6964
7052
}
0 commit comments