@@ -5072,6 +5072,259 @@ longlong Item_master_pos_wait::val_int() {
5072
5072
return Item_source_pos_wait::val_int ();
5073
5073
}
5074
5074
5075
+ bool Item_wait_for_executed_gtid_set::itemize (Parse_context *pc, Item **res) {
5076
+ if (skip_itemize (res)) return false ;
5077
+ if (super::itemize (pc, res)) return true ;
5078
+ /*
5079
+ It is unsafe because the return value depends on timing. If the timeout
5080
+ happens, the return value is different from the one in which the function
5081
+ returns with success.
5082
+ */
5083
+ pc->thd ->lex ->set_stmt_unsafe (LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
5084
+ pc->thd ->lex ->safe_to_cache_query = false ;
5085
+ return false ;
5086
+ }
5087
+
5088
+ /* *
5089
+ Wait until the given gtid_set is found in the executed gtid_set independent
5090
+ of the slave threads.
5091
+ */
5092
+ longlong Item_wait_for_executed_gtid_set::val_int () {
5093
+ DBUG_TRACE;
5094
+ assert (fixed);
5095
+ THD *thd = current_thd;
5096
+
5097
+ null_value = false ;
5098
+
5099
+ String *gtid_text = args[0 ]->val_str (&value);
5100
+ if (gtid_text == nullptr ) {
5101
+ /*
5102
+ Usually, an argument that is NULL causes an SQL function to return NULL,
5103
+ however since this is a function with side-effects, a NULL value is
5104
+ treated as an error.
5105
+ */
5106
+ if (!thd->is_error ()) {
5107
+ my_error (ER_MALFORMED_GTID_SET_SPECIFICATION, MYF (0 ), " NULL" );
5108
+ }
5109
+ return error_int ();
5110
+ }
5111
+
5112
+ // Waiting for a GTID in a slave thread could cause the slave to
5113
+ // hang/deadlock.
5114
+ // @todo: Return error instead of NULL
5115
+ if (thd->slave_thread ) {
5116
+ return error_int ();
5117
+ }
5118
+
5119
+ Gtid_set wait_for_gtid_set (global_sid_map, nullptr );
5120
+
5121
+ global_sid_lock->rdlock ();
5122
+ if (global_gtid_mode.get () == Gtid_mode::OFF) {
5123
+ global_sid_lock->unlock ();
5124
+ my_error (ER_GTID_MODE_OFF, MYF (0 ), " use WAIT_FOR_EXECUTED_GTID_SET" );
5125
+ return error_int ();
5126
+ }
5127
+
5128
+ if (wait_for_gtid_set.add_gtid_text (gtid_text->c_ptr_safe ()) !=
5129
+ RETURN_STATUS_OK) {
5130
+ global_sid_lock->unlock ();
5131
+ // Error has already been generated.
5132
+ return error_int ();
5133
+ }
5134
+
5135
+ // Cannot wait for a GTID that the thread owns since that would
5136
+ // immediately deadlock.
5137
+ if (thd->owned_gtid .sidno > 0 &&
5138
+ wait_for_gtid_set.contains_gtid (thd->owned_gtid )) {
5139
+ char buf[Gtid::MAX_TEXT_LENGTH + 1 ];
5140
+ thd->owned_gtid .to_string (global_sid_map, buf);
5141
+ global_sid_lock->unlock ();
5142
+ my_error (ER_CANT_WAIT_FOR_EXECUTED_GTID_SET_WHILE_OWNING_A_GTID, MYF (0 ),
5143
+ buf);
5144
+ return error_int ();
5145
+ }
5146
+
5147
+ gtid_state->begin_gtid_wait ();
5148
+
5149
+ double timeout = (arg_count == 2 ) ? args[1 ]->val_real () : 0 ;
5150
+ if (timeout < 0 ) {
5151
+ if (thd->is_strict_mode ()) {
5152
+ my_error (ER_WRONG_ARGUMENTS, MYF (0 ), " WAIT_FOR_EXECUTED_GTID_SET." );
5153
+ } else {
5154
+ push_warning_printf (thd, Sql_condition::SL_WARNING, ER_WRONG_ARGUMENTS,
5155
+ ER_THD (thd, ER_WRONG_ARGUMENTS),
5156
+ " WAIT_FOR_EXECUTED_GTID_SET." );
5157
+ }
5158
+ gtid_state->end_gtid_wait ();
5159
+ global_sid_lock->unlock ();
5160
+ return error_int ();
5161
+ }
5162
+
5163
+ bool result = gtid_state->wait_for_gtid_set (thd, &wait_for_gtid_set, timeout);
5164
+ global_sid_lock->unlock ();
5165
+ gtid_state->end_gtid_wait ();
5166
+
5167
+ return result;
5168
+ }
5169
+
5170
+ Item_master_gtid_set_wait::Item_master_gtid_set_wait (const POS &pos, Item *a)
5171
+ : Item_int_func(pos, a) {
5172
+ push_deprecated_warn (current_thd, " WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS" ,
5173
+ " WAIT_FOR_EXECUTED_GTID_SET" );
5174
+ }
5175
+
5176
+ Item_master_gtid_set_wait::Item_master_gtid_set_wait (const POS &pos, Item *a,
5177
+ Item *b)
5178
+ : Item_int_func(pos, a, b) {
5179
+ push_deprecated_warn (current_thd, " WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS" ,
5180
+ " WAIT_FOR_EXECUTED_GTID_SET" );
5181
+ }
5182
+
5183
+ Item_master_gtid_set_wait::Item_master_gtid_set_wait (const POS &pos, Item *a,
5184
+ Item *b, Item *c)
5185
+ : Item_int_func(pos, a, b, c) {
5186
+ push_deprecated_warn (current_thd, " WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS" ,
5187
+ " WAIT_FOR_EXECUTED_GTID_SET" );
5188
+ }
5189
+
5190
+ bool Item_master_gtid_set_wait::itemize (Parse_context *pc, Item **res) {
5191
+ if (skip_itemize (res)) return false ;
5192
+ if (super::itemize (pc, res)) return true ;
5193
+ pc->thd ->lex ->set_stmt_unsafe (LEX::BINLOG_STMT_UNSAFE_SYSTEM_FUNCTION);
5194
+ pc->thd ->lex ->safe_to_cache_query = false ;
5195
+ return false ;
5196
+ }
5197
+
5198
+ longlong Item_master_gtid_set_wait::val_int () {
5199
+ assert (fixed);
5200
+ DBUG_TRACE;
5201
+ int event_count = 0 ;
5202
+
5203
+ null_value = false ;
5204
+
5205
+ String *gtid = args[0 ]->val_str (&value);
5206
+ if (gtid == nullptr ) {
5207
+ return error_int ();
5208
+ }
5209
+
5210
+ THD *thd = current_thd;
5211
+ Master_info *mi = nullptr ;
5212
+ double timeout = (arg_count >= 2 ) ? args[1 ]->val_real () : 0 ;
5213
+ if (timeout < 0 ) {
5214
+ if (thd->is_strict_mode ()) {
5215
+ my_error (ER_WRONG_ARGUMENTS, MYF (0 ),
5216
+ " WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS." );
5217
+ } else {
5218
+ push_warning_printf (thd, Sql_condition::SL_WARNING, ER_WRONG_ARGUMENTS,
5219
+ ER_THD (thd, ER_WRONG_ARGUMENTS),
5220
+ " WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS." );
5221
+ }
5222
+ return error_int ();
5223
+ }
5224
+
5225
+ if (thd->slave_thread ) {
5226
+ return error_int ();
5227
+ }
5228
+
5229
+ channel_map.rdlock ();
5230
+
5231
+ /* If replication channel is mentioned */
5232
+ if (arg_count == 3 ) {
5233
+ String *channel_str;
5234
+ if (!(channel_str = args[2 ]->val_str (&value))) {
5235
+ channel_map.unlock ();
5236
+ return error_int ();
5237
+ }
5238
+ mi = channel_map.get_mi (channel_str->ptr ());
5239
+ } else {
5240
+ if (channel_map.get_num_instances () > 1 ) {
5241
+ channel_map.unlock ();
5242
+ mi = nullptr ;
5243
+ my_error (ER_REPLICA_MULTIPLE_CHANNELS_CMD, MYF (0 ));
5244
+ return error_int ();
5245
+ } else
5246
+ mi = channel_map.get_default_channel_mi ();
5247
+ }
5248
+
5249
+ if ((mi != nullptr ) &&
5250
+ mi->rli ->m_assign_gtids_to_anonymous_transactions_info .get_type () >
5251
+ Assign_gtids_to_anonymous_transactions_info::enum_type::AGAT_OFF) {
5252
+ my_error (ER_CANT_SET_ANONYMOUS_TO_GTID_AND_WAIT_UNTIL_SQL_THD_AFTER_GTIDS,
5253
+ MYF (0 ));
5254
+ channel_map.unlock ();
5255
+ return error_int ();
5256
+ }
5257
+ if (global_gtid_mode.get () == Gtid_mode::OFF) {
5258
+ channel_map.unlock ();
5259
+ return error_int ();
5260
+ }
5261
+ gtid_state->begin_gtid_wait ();
5262
+
5263
+ if (mi != nullptr ) mi->inc_reference ();
5264
+
5265
+ channel_map.unlock ();
5266
+
5267
+ bool null_result = false ;
5268
+
5269
+ if (mi != nullptr && mi->rli != nullptr ) {
5270
+ event_count = mi->rli ->wait_for_gtid_set (thd, gtid, timeout);
5271
+ if (event_count == -2 ) {
5272
+ null_result = true ;
5273
+ }
5274
+ } else {
5275
+ /*
5276
+ Replication has not been set up, we should return NULL;
5277
+ */
5278
+ null_result = true ;
5279
+ }
5280
+ if (mi != nullptr ) mi->dec_reference ();
5281
+
5282
+ gtid_state->end_gtid_wait ();
5283
+
5284
+ return null_result ? error_int () : event_count;
5285
+ }
5286
+
5287
+ /* *
5288
+ Return 1 if both arguments are Gtid_sets and the first is a subset
5289
+ of the second. Generate an error if any of the arguments is not a
5290
+ Gtid_set.
5291
+ */
5292
+ longlong Item_func_gtid_subset::val_int () {
5293
+ DBUG_TRACE;
5294
+
5295
+ assert (fixed);
5296
+
5297
+ null_value = false ;
5298
+
5299
+ // Evaluate strings without lock
5300
+ String *string1 = args[0 ]->val_str (&buf1);
5301
+ if (string1 == nullptr ) {
5302
+ return error_int ();
5303
+ }
5304
+ String *string2 = args[1 ]->val_str (&buf2);
5305
+ if (string2 == nullptr ) {
5306
+ return error_int ();
5307
+ }
5308
+
5309
+ const char *charp1 = string1->c_ptr_safe ();
5310
+ assert (charp1 != nullptr );
5311
+ const char *charp2 = string2->c_ptr_safe ();
5312
+ assert (charp2 != nullptr );
5313
+ int ret = 1 ;
5314
+ enum_return_status status;
5315
+
5316
+ Sid_map sid_map (nullptr /* no rwlock*/ );
5317
+ // compute sets while holding locks
5318
+ const Gtid_set sub_set (&sid_map, charp1, &status);
5319
+ if (status == RETURN_STATUS_OK) {
5320
+ const Gtid_set super_set (&sid_map, charp2, &status);
5321
+ if (status == RETURN_STATUS_OK) {
5322
+ ret = sub_set.is_subset (&super_set) ? 1 : 0 ;
5323
+ }
5324
+ }
5325
+ return ret;
5326
+ }
5327
+
5075
5328
/* *
5076
5329
Enables a session to wait on a condition until a timeout or a network
5077
5330
disconnect occurs.
0 commit comments