-
Notifications
You must be signed in to change notification settings - Fork 4k
/
Copy pathsql_insert.cc
3364 lines (2862 loc) · 124 KB
/
sql_insert.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright (c) 2000, 2024, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/* Insert of records */
#include "sql/sql_insert.h"
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <atomic>
#include <iterator>
#include <map>
#include <utility>
#include "field_types.h"
#include "lex_string.h"
#include "my_alloc.h"
#include "my_base.h"
#include "my_bitmap.h"
#include "my_dbug.h"
#include "my_psi_config.h"
#include "my_sys.h"
#include "my_table_map.h"
#include "my_thread_local.h"
#include "mysql/components/services/bits/psi_bits.h"
#include "mysql/mysql_lex_string.h"
#include "mysql/psi/mysql_table.h" // IWYU pragma: keep
#include "mysql/service_mysql_alloc.h"
#include "mysql/strings/m_ctype.h"
#include "mysql/udf_registration_types.h"
#include "mysql_com.h"
#include "mysqld_error.h"
#include "pfs_table_provider.h"
#include "prealloced_array.h"
#include "sql/auth/auth_acls.h"
#include "sql/auth/auth_common.h" // check_grant_all_columns
#include "sql/binlog.h"
#include "sql/create_field.h"
#include "sql/dd/cache/dictionary_client.h"
#include "sql/dd/dd.h" // dd::get_dictionary
#include "sql/dd/dictionary.h" // dd::Dictionary
#include "sql/dd_sql_view.h" // update_referencing_views_metadata
#include "sql/debug_sync.h" // DEBUG_SYNC
#include "sql/derror.h" // ER_THD
#include "sql/discrete_interval.h"
#include "sql/field.h"
#include "sql/handler.h"
#include "sql/item.h"
#include "sql/key.h"
#include "sql/lock.h" // mysql_unlock_tables
#include "sql/locked_tables_list.h"
#include "sql/mdl.h"
#include "sql/mysqld.h" // stage_update
#include "sql/nested_join.h"
#include "sql/opt_explain.h" // Modification_plan
#include "sql/opt_explain_format.h"
#include "sql/partition_info.h" // partition_info
#include "sql/protocol.h"
#include "sql/query_options.h"
#include "sql/rpl_rli.h" // Relay_log_info
#include "sql/select_lex_visitor.h"
#include "sql/sql_alter.h"
#include "sql/sql_array.h"
#include "sql/sql_base.h" // setup_fields
#include "sql/sql_class.h"
#include "sql/sql_const.h"
#include "sql/sql_error.h"
#include "sql/sql_gipk.h"
#include "sql/sql_lex.h"
#include "sql/sql_list.h"
#include "sql/sql_resolver.h" // validate_gc_assignment
#include "sql/sql_select.h" // check_privileges_for_list
#include "sql/sql_show.h" // store_create_info
#include "sql/sql_table.h" // quick_rm_table
#include "sql/sql_view.h" // check_key_in_view
#include "sql/stateless_allocator.h"
#include "sql/system_variables.h"
#include "sql/table_trigger_dispatcher.h" // Table_trigger_dispatcher
#include "sql/thd_raii.h"
#include "sql/transaction.h" // trans_commit_stmt
#include "sql/transaction_info.h"
#include "sql/trigger_def.h"
#include "sql/visible_fields.h"
#include "sql_string.h"
#include "string_with_len.h"
#include "template_utils.h"
#include "thr_lock.h"
namespace dd {
class Table;
} // namespace dd
static bool check_view_insertability(THD *thd, Table_ref *view,
const Table_ref *insert_table_ref);
static void prepare_for_positional_update(TABLE *table, Table_ref *tables);
/**
Check that insert fields are from a single table of a multi-table view.
@param fields The insert fields to be checked.
@param view The view for insert.
@param [out] insert_table_ref Reference to table to insert into
This function is called to check that the fields being inserted into
are from a single base table. This must be checked when the table to
be inserted into is a multi-table view.
@return false if success, true if an error was raised.
*/
static bool check_single_table_insert(const mem_root_deque<Item *> &fields,
Table_ref *view,
Table_ref **insert_table_ref) {
// It is join view => we need to find the table for insert
*insert_table_ref = nullptr; // reset for call to check_single_table()
table_map tables = 0;
for (Item *item : fields) tables |= item->used_tables();
if (view->check_single_table(insert_table_ref, tables)) {
my_error(ER_VIEW_MULTIUPDATE, MYF(0), view->db, view->table_name);
return true;
}
assert(*insert_table_ref && (*insert_table_ref)->is_insertable());
return false;
}
/**
Check insert fields.
@param thd The current thread.
@param table_list The table for insert.
@param fields The insert fields.
@return false if success, true if error
Resolved reference to base table is returned in lex->insert_table_leaf.
*/
static bool check_insert_fields(THD *thd, Table_ref *table_list,
mem_root_deque<Item *> *fields) {
LEX *const lex = thd->lex;
#ifndef NDEBUG
Table_ref *const saved_insert_table_leaf = lex->insert_table_leaf;
#endif
TABLE *table = table_list->table;
assert(table_list->is_insertable());
if (fields->empty()) {
/*
No field list supplied, but a value list has been supplied.
Use field list of table being updated.
*/
assert(table); // This branch is not reached with a view:
lex->insert_table_leaf = table_list;
Field_iterator_table_ref it;
it.set(table_list);
if (check_grant_all_columns(thd, INSERT_ACL, &it)) return true;
for (it.set(table_list); !it.end_of_fields(); it.next()) {
if (it.field()->is_hidden()) continue;
Item *item = it.create_item(thd);
if (item == nullptr) return true;
fields->push_back(item);
}
} else {
// INSERT with explicit field list.
Query_block *query_block = thd->lex->query_block;
Name_resolution_context *context = &query_block->context;
Name_resolution_context_state ctx_state;
/* Save the state of the current name resolution context. */
ctx_state.save_state(context, table_list);
/*
Perform name resolution only in the first table - 'table_list',
which is the table that is inserted into.
*/
table_list->next_local = nullptr;
context->resolve_in_table_list_only(table_list);
const bool res =
setup_fields(thd, /*want_privilege=*/INSERT_ACL,
/*allow_sum_func=*/false, /*split_sum_funcs=*/false,
/*column_update=*/true, /*typed_items=*/nullptr, fields,
Ref_item_array());
/* Restore the current context. */
ctx_state.restore_state(context, table_list);
if (res) return true;
if (table_list->is_merged()) {
if (check_single_table_insert(*fields, table_list,
&lex->insert_table_leaf))
return true;
table = lex->insert_table_leaf->table;
} else {
lex->insert_table_leaf = table_list;
}
// We currently don't check for unique columns when inserting via a view.
const bool check_unique = !table_list->is_view();
if (check_unique && bitmap_bits_set(table->write_set) < fields->size()) {
for (auto i = fields->cbegin(); i != fields->cend(); ++i) {
// Skipping views means that we only have FIELD_ITEM.
const Item *item1 = *i;
for (auto j = std::next(i); j != fields->cend(); ++j) {
const Item *item2 = *j;
if (item1->eq(item2)) {
my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), item1->item_name.ptr());
return true;
}
}
}
// A duplicate column name should have been found by now.
assert(false);
}
}
/* Mark all generated columns for write*/
if (table->vfield) table->mark_generated_columns(false);
if (check_key_in_view(thd, table_list, lex->insert_table_leaf) ||
(table_list->is_view() &&
check_view_insertability(thd, table_list, lex->insert_table_leaf))) {
my_error(ER_NON_INSERTABLE_TABLE, MYF(0), table_list->alias, "INSERT");
return true;
}
assert(saved_insert_table_leaf == nullptr ||
lex->insert_table_leaf == saved_insert_table_leaf);
return false;
}
/**
Check that table references are restricted to the supplied table map.
The check can be skipped if the supplied table is a base table.
@param view Table being specified
@param values Values whose used tables are to be matched against table map
@param map Table map to match against
@return false if success, true if error
*/
static bool check_valid_table_refs(const Table_ref *view,
const mem_root_deque<Item *> &values,
table_map map) {
if (!view->is_view()) // Ignore check if called with base table.
return false;
map |= PSEUDO_TABLE_BITS;
for (Item *item : values) {
if (item->used_tables() & ~map) {
my_error(ER_VIEW_MULTIUPDATE, MYF(0), view->db, view->table_name);
return true;
}
}
return false;
}
/**
Validates default value of fields which are not specified in
the column list of INSERT statement.
@note table->record[0] should be be populated with default values
before calling this function.
@param thd thread context
@param table table to which values are inserted.
@returns false if success, true if error
*/
bool validate_default_values_of_unset_fields(THD *thd, TABLE *table) {
MY_BITMAP *write_set = table->write_set;
DBUG_TRACE;
for (Field **field = table->field; *field; field++) {
if (!bitmap_is_set(write_set, (*field)->field_index()) &&
!(*field)->is_flag_set(NO_DEFAULT_VALUE_FLAG)) {
if ((*field)->validate_stored_val(thd) && thd->is_error()) return true;
}
}
return false;
}
/**
Prepare triggers for INSERT-like statement.
@param thd Thread handler
@param table Table to which insert will happen
@note
Prepare triggers for INSERT-like statement by marking fields
used by triggers and inform handlers that batching of UPDATE/DELETE
cannot be done if there are BEFORE UPDATE/DELETE triggers.
*/
void prepare_triggers_for_insert_stmt(THD *thd, TABLE *table) {
if (table->triggers) {
if (table->triggers->has_triggers(TRG_EVENT_DELETE, TRG_ACTION_AFTER)) {
/*
The table has AFTER DELETE triggers that might access to
subject table and therefore might need delete to be done
immediately. So we turn-off the batching.
*/
(void)table->file->ha_extra(HA_EXTRA_DELETE_CANNOT_BATCH);
}
if (table->triggers->has_triggers(TRG_EVENT_UPDATE, TRG_ACTION_AFTER)) {
/*
The table has AFTER UPDATE triggers that might access to subject
table and therefore might need update to be done immediately.
So we turn-off the batching.
*/
(void)table->file->ha_extra(HA_EXTRA_UPDATE_CANNOT_BATCH);
}
}
table->mark_columns_needed_for_insert(thd);
}
/**
Setup data for field BLOB/GEOMETRY field types for execution of
"INSERT...UPDATE" statement. For a expression in 'UPDATE' clause
like "a= VALUES(a)", let as call Field* referring 'a' as LHS_FIELD
and Field* referring field 'a' in "VALUES(a)" as RHS_FIELD
This function creates a separate copy of the blob value for RHS_FIELD,
if the field is updated as well as accessed through VALUES()
function in 'UPDATE' clause of "INSERT...UPDATE" statement.
@param [in] thd
Pointer to THD object.
@param [in] fields
List of fields representing LHS_FIELD of all expressions
in 'UPDATE' clause.
@param [in] mem_root
MEM_ROOT for blob copy.
@return - Can fail only when we are out of memory.
@retval false Success
@retval true Failure
*/
static bool mysql_prepare_blob_values(THD *thd,
const mem_root_deque<Item *> &fields,
MEM_ROOT *mem_root) {
DBUG_TRACE;
if (fields.size() <= 1) return false;
// Collect LHS_FIELD's which are updated in a 'set'.
// This 'set' helps decide if we need to make copy of BLOB value
// or not.
Prealloced_array<Field_blob *, 16> blob_update_field_set(
PSI_NOT_INSTRUMENTED);
if (blob_update_field_set.reserve(fields.size())) return true;
for (Item *fld : fields) {
Item_field *field = fld->field_for_view_update();
Field *lhs_field = field->field;
if (lhs_field->type() == MYSQL_TYPE_BLOB ||
lhs_field->type() == MYSQL_TYPE_VECTOR ||
lhs_field->type() == MYSQL_TYPE_GEOMETRY)
blob_update_field_set.insert_unique(down_cast<Field_blob *>(lhs_field));
}
// Traverse through thd->lex->insert_update_values_map
// and make copy of BLOB values in RHS_FIELD, if the same field is
// modified (present in above 'set' prepared).
if (thd->lex->has_values_map()) {
std::map<Item_field *, Field *>::iterator iter;
for (iter = thd->lex->begin_values_map();
iter != thd->lex->end_values_map(); ++iter) {
// Retrieve the Field_blob pointers from the map.
// and initialize newly declared variables immediately.
Field_blob *lhs_field = down_cast<Field_blob *>(iter->first->field);
Field_blob *rhs_field = down_cast<Field_blob *>(iter->second);
// Check if the Field_blob object is updated before making a copy.
if (blob_update_field_set.count_unique(lhs_field) == 0) continue;
// Copy blob value
if (rhs_field->copy_blob_value(mem_root)) return true;
}
}
return false;
}
bool Sql_cmd_insert_base::precheck(THD *thd) {
DBUG_TRACE;
/*
Check that we have modify privileges for the first table and
select privileges for the rest
*/
ulong privilege = INSERT_ACL | (duplicates == DUP_REPLACE ? DELETE_ACL : 0) |
(update_value_list.empty() ? 0 : UPDATE_ACL);
if (check_one_table_access(thd, privilege, lex->query_tables)) return true;
return false;
}
bool Sql_cmd_insert_base::check_privileges(THD *thd) {
DBUG_TRACE;
if (check_all_table_privileges(thd)) return (true);
if (check_privileges_for_list(thd, insert_field_list, INSERT_ACL))
return true;
if (values_need_privilege_check) {
for (List_item *values : insert_many_values) {
if (check_privileges_for_list(thd, *values, SELECT_ACL)) return true;
}
}
if (duplicates == DUP_UPDATE) {
if (check_privileges_for_list(thd, update_field_list, UPDATE_ACL))
return true;
if (check_privileges_for_list(thd, update_value_list, SELECT_ACL))
return true;
}
for (Query_block *sl = lex->unit->first_query_block(); sl;
sl = sl->next_query_block()) {
if (sl->check_column_privileges(thd)) return true;
}
return false;
}
/**
Insert one or more rows from a VALUES list into a table
@param thd thread handler
@returns false if success, true if error
*/
bool Sql_cmd_insert_values::execute_inner(THD *thd) {
DBUG_TRACE;
assert(thd->lex->sql_command == SQLCOM_REPLACE ||
thd->lex->sql_command == SQLCOM_INSERT);
/*
We have three alternative syntax rules for the INSERT statement:
1) "INSERT (columns) VALUES ...", so non-listed columns need a default
2) "INSERT VALUES (), ..." so all columns need a default;
note that "VALUES (),(expr_1, ..., expr_n)" is not allowed, so checking
emptiness of the first row is enough
3) "INSERT VALUES (expr_1, ...), ..." so no defaults are needed; even if
expr_i is "DEFAULT" (in which case the column is set by
Item_default_value::save_in_field_inner()).
*/
const bool manage_defaults = column_count > 0 || // 1)
value_count == 0; // 2)
COPY_INFO info(COPY_INFO::INSERT_OPERATION, &insert_field_list,
manage_defaults, duplicates);
COPY_INFO update(COPY_INFO::UPDATE_OPERATION, &update_field_list,
&update_value_list);
Query_block *const query_block = lex->query_block;
Table_ref *const table_list = lex->insert_table;
TABLE *const insert_table = lex->insert_table_leaf->table;
if (duplicates == DUP_UPDATE || duplicates == DUP_REPLACE)
prepare_for_positional_update(insert_table, table_list);
/* Must be done before can_prune_insert, due to internal initialization. */
if (info.add_function_default_columns(insert_table, insert_table->write_set))
return true; /* purecov: inspected */
if (duplicates == DUP_UPDATE && update.add_function_default_columns(
insert_table, insert_table->write_set))
return true; /* purecov: inspected */
// Current error state inside and after the insert loop
bool has_error = false;
{ // Statement plan is available within these braces
const Modification_plan plan(
thd, (lex->sql_command == SQLCOM_INSERT) ? MT_INSERT : MT_REPLACE,
insert_table, nullptr, false, 0);
DEBUG_SYNC(thd, "planned_single_insert");
if (lex->is_explain()) {
const bool err =
explain_single_table_modification(thd, thd, &plan, query_block);
return err;
}
insert_table->next_number_field = insert_table->found_next_number_field;
THD_STAGE_INFO(thd, stage_update);
if (duplicates == DUP_REPLACE &&
(!insert_table->triggers ||
!insert_table->triggers->has_delete_triggers()))
insert_table->file->ha_extra(HA_EXTRA_WRITE_CAN_REPLACE);
if (duplicates == DUP_UPDATE)
insert_table->file->ha_extra(HA_EXTRA_INSERT_WITH_UPDATE);
/*
let's *try* to start bulk inserts. It won't necessary
start them as insert_many_values.elements should be greater than
some - handler dependent - threshold.
We should not start bulk inserts if this statement uses
functions or invokes triggers since they may access
to the same table and therefore should not see its
inconsistent state created by this optimization.
So we call start_bulk_insert to perform nesessary checks on
insert_many_values.elements, and - if nothing else - to initialize
the code to make the call of end_bulk_insert() below safe.
*/
if (duplicates != DUP_ERROR || lex->is_ignore())
insert_table->file->ha_extra(HA_EXTRA_IGNORE_DUP_KEY);
/*
This is a simple check for the case when the table has a trigger
that reads from it, or when the statement invokes a stored function
that reads from the table being inserted to.
Engines can't handle a bulk insert in parallel with a read form the
same table in the same connection.
*/
if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
insert_table->file->ha_start_bulk_insert(insert_many_values.size());
prepare_triggers_for_insert_stmt(thd, insert_table);
/*
Count warnings for all inserts. For single row insert, generate an error
if trying to set a NOT NULL field to NULL.
Notice that policy must be reset before leaving this function.
*/
thd->check_for_truncated_fields =
((insert_many_values.size() == 1 && !lex->is_ignore())
? CHECK_FIELD_ERROR_FOR_NULL
: CHECK_FIELD_WARN);
thd->num_truncated_fields = 0L;
for (Field **next_field = insert_table->field; *next_field; ++next_field) {
(*next_field)->reset_warnings();
}
for (const List_item *values : insert_many_values) {
Autoinc_field_has_explicit_non_null_value_reset_guard after_each_row(
insert_table);
restore_record(insert_table, s->default_values); // Get empty record
/*
Check whether default values of the insert_field_list not specified in
column list are correct or not.
*/
if (validate_default_values_of_unset_fields(thd, insert_table)) {
has_error = true;
break;
}
if (fill_record_n_invoke_before_triggers(
thd, &info, insert_field_list, *values, insert_table,
TRG_EVENT_INSERT, insert_table->s->fields, true, nullptr)) {
assert(thd->is_error());
/*
TODO: Convert warnings to errors if values_list.elements == 1
and check that all items return warning in case of problem with
storing field.
*/
has_error = true;
break;
}
if (check_that_all_fields_are_given_values(thd, insert_table,
table_list)) {
assert(thd->is_error());
has_error = true;
break;
}
const int check_result = table_list->view_check_option(thd);
if (check_result == VIEW_CHECK_SKIP)
continue;
else if (check_result == VIEW_CHECK_ERROR) {
has_error = true;
break;
}
if (invoke_table_check_constraints(thd, insert_table)) {
if (thd->is_error()) {
has_error = true;
break;
}
// continue when IGNORE clause is used.
continue;
}
if (write_record(thd, insert_table, &info, &update)) {
has_error = true;
break;
}
thd->get_stmt_da()->inc_current_row_for_condition();
}
} // Statement plan is available within these braces
assert(has_error == thd->get_stmt_da()->is_error());
/*
Now all rows are inserted. Time to update logs and sends response to
user
*/
{
/* TODO: Only call this if insert_table->found_next_number_field.*/
insert_table->file->ha_release_auto_increment();
/*
Make sure 'end_bulk_insert()' is called regardless of current error
*/
int loc_error = 0;
if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
loc_error = insert_table->file->ha_end_bulk_insert();
/*
Report error if 'end_bulk_insert()' failed, and set 'has_error'
*/
if (loc_error && !has_error) {
/* purecov: begin inspected */
myf error_flags = MYF(0);
if (insert_table->file->is_fatal_error(loc_error))
error_flags |= ME_FATALERROR;
insert_table->file->print_error(loc_error, error_flags);
has_error = true;
/* purecov: end */
}
const bool transactional_table = insert_table->file->has_transactions();
const bool changed [[maybe_unused]] =
info.stats.copied || info.stats.deleted || info.stats.updated;
if (!has_error ||
thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::STMT)) {
if (mysql_bin_log.is_open()) {
int errcode = 0;
if (!has_error) {
/*
[Guilhem wrote] Temporary errors may have filled
thd->net.last_error/errno. For example if there has
been a disk full error when writing the row, and it was
MyISAM, then thd->net.last_error/errno will be set to
"disk full"... and the mysql_file_pwrite() will wait until free
space appears, and so when it finishes then the
write_row() was entirely successful
*/
/* todo: consider removing */
thd->clear_error();
} else
errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
/* bug#22725:
A query which per-row-loop can not be interrupted with
KILLED, like INSERT, and that does not invoke stored
routines can be binlogged with neglecting the KILLED error.
If there was no error (has_error == false) until after the end of
inserting loop the KILLED flag that appeared later can be
disregarded since previously possible invocation of stored
routines did not result in any error due to the KILLED. In
such case the flag is ignored for constructing binlog event.
*/
if (thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query().str,
thd->query().length, transactional_table, false,
false, errcode))
has_error = true;
}
}
assert(
transactional_table || !changed ||
thd->get_transaction()->cannot_safely_rollback(Transaction_ctx::STMT));
}
/*
We'll report to the client this id:
- if the table contains an autoincrement column and we successfully
inserted an autogenerated value, the autogenerated value.
- if the table contains no autoincrement column and LAST_INSERT_ID(X) was
called, X.
- if the table contains an autoincrement column, and some rows were
inserted, the id of the last "inserted" row (if IGNORE, that value may not
have been really inserted but ignored).
*/
const ulonglong id =
(thd->first_successful_insert_id_in_cur_stmt > 0)
? thd->first_successful_insert_id_in_cur_stmt
: (thd->arg_of_last_insert_id_function
? thd->first_successful_insert_id_in_prev_stmt
: ((insert_table->next_number_field && info.stats.copied)
? insert_table->next_number_field->val_int()
: 0));
insert_table->next_number_field = nullptr;
// Remember to restore warning handling before leaving
thd->check_for_truncated_fields = CHECK_FIELD_IGNORE;
assert(has_error == thd->get_stmt_da()->is_error());
if (has_error) return true;
if (insert_many_values.size() == 1 &&
(!(thd->variables.option_bits & OPTION_WARNINGS) ||
!thd->num_truncated_fields)) {
my_ok(thd,
info.stats.copied + info.stats.deleted +
(thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS)
? info.stats.touched
: info.stats.updated),
id);
} else {
char buff[160];
const ha_rows updated =
thd->get_protocol()->has_client_capability(CLIENT_FOUND_ROWS)
? info.stats.touched
: info.stats.updated;
if (lex->is_ignore())
snprintf(buff, sizeof(buff), ER_THD(thd, ER_INSERT_INFO),
(long)info.stats.records,
(long)(info.stats.records - info.stats.copied),
(long)thd->get_stmt_da()->current_statement_cond_count());
else
snprintf(buff, sizeof(buff), ER_THD(thd, ER_INSERT_INFO),
(long)info.stats.records, (long)(info.stats.deleted + updated),
(long)thd->get_stmt_da()->current_statement_cond_count());
my_ok(thd, info.stats.copied + info.stats.deleted + updated, id, buff);
}
/*
If we have inserted into a VIEW, and the base table has
AUTO_INCREMENT column, but this column is not accessible through
a view, then we should restore LAST_INSERT_ID to the value it
had before the statement.
*/
if (table_list->is_view() && !table_list->contain_auto_increment)
thd->first_successful_insert_id_in_cur_stmt =
thd->first_successful_insert_id_in_prev_stmt;
DBUG_EXECUTE_IF("after_mysql_insert", {
const char act[] =
"now "
"wait_for signal.continue";
assert(opt_debug_sync_timeout > 0);
assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));
};);
return false;
}
/**
Additional check for insertability for VIEW
A view is insertable if the following conditions are true:
- All columns being inserted into are from a single table.
- All not used columns in table have default values.
- All columns in view are distinct (not referring to the same column).
- All columns in view are insertable-into.
@param thd thread handler
@param[in,out] view reference to view being inserted into.
view->contain_auto_increment is true if and only if
the view contains an auto_increment field.
@param insert_table_ref reference to underlying table being inserted into
@retval false if success
@retval true if table is not insertable-into (no error is reported)
*/
static bool check_view_insertability(THD *thd, Table_ref *view,
const Table_ref *insert_table_ref) {
DBUG_TRACE;
const uint num = view->view_query()->query_block->num_visible_fields();
TABLE *const table = insert_table_ref->table;
MY_BITMAP used_fields;
const enum_mark_columns save_mark_used_columns = thd->mark_used_columns;
const uint used_fields_buff_size = bitmap_buffer_size(table->s->fields);
uint32 *const used_fields_buff = (uint32 *)thd->alloc(used_fields_buff_size);
if (!used_fields_buff) return true; /* purecov: inspected */
assert(view->table == nullptr && table != nullptr &&
view->field_translation != nullptr);
(void)bitmap_init(&used_fields, used_fields_buff, table->s->fields);
bitmap_clear_all(&used_fields);
view->contain_auto_increment = false;
thd->mark_used_columns = MARK_COLUMNS_NONE;
// No privilege checking is done for these columns
const Column_privilege_tracker column_privilege(thd, 0);
/* check simplicity and prepare unique test of view */
Field_translator *const trans_start = view->field_translation;
Field_translator *const trans_end = trans_start + num;
for (Field_translator *trans = trans_start; trans != trans_end; trans++) {
if (trans->item == nullptr) continue;
/*
@todo
This fix_fields() call is necessary for execution of prepared statements.
When repeated preparation is eliminated the call can be deleted.
*/
if (!trans->item->fixed && trans->item->fix_fields(thd, &trans->item))
return true; /* purecov: inspected */
// Extract the underlying base table column, if there is one
Item_field *const field = trans->item->field_for_view_update();
// No underlying base table column, view is not insertable-into
if (field == nullptr) return true;
if (field->field->auto_flags & Field::NEXT_NUMBER)
view->contain_auto_increment = true;
/* prepare unique test */
/*
remove collation (or other transparent for update function) if we have
it
*/
trans->item = field;
}
thd->mark_used_columns = save_mark_used_columns;
/* unique test */
for (Field_translator *trans = trans_start; trans != trans_end; trans++) {
if (trans->item == nullptr) continue;
/* Thanks to test above, we know that all columns are of type Item_field */
Item_field *field = down_cast<Item_field *>(trans->item);
/* check fields belong to table in which we are inserting */
if (field->field->table == table &&
bitmap_test_and_set(&used_fields, field->field->field_index()))
return true;
}
return false;
}
/**
Recursive helper function for resolving join conditions for
insertion into view for prepared statements.
@param thd Thread handler
@param tr Table structure which is traversed recursively
@return false if success, true if error
*/
static bool fix_join_cond_for_insert(THD *thd, Table_ref *tr) {
if (tr->join_cond() && !tr->join_cond()->fixed) {
const Column_privilege_tracker column_privilege(thd, SELECT_ACL);
if (tr->join_cond()->fix_fields(thd, nullptr))
return true; /* purecov: inspected */
}
if (tr->nested_join == nullptr) return false;
for (Table_ref *ti : tr->nested_join->m_tables) {
if (fix_join_cond_for_insert(thd, ti)) return true; /* purecov: inspected */
}
return false;
}
/**
Get extra info for tables we insert into
@param table table(TABLE object) we insert into,
might be NULL in case of view
@param tables (Table_ref object) or view we insert into
*/
static void prepare_for_positional_update(TABLE *table, Table_ref *tables) {
if (table) {
table->prepare_for_position();
return;
}
assert(tables->is_view());
for (Table_ref *tbl : *tables->view_tables) {
prepare_for_positional_update(tbl->table, tbl);
}
}
static bool allocate_column_bitmap(THD *thd, TABLE *table, MY_BITMAP **bitmap) {
DBUG_TRACE;
const uint number_bits = table->s->fields;
MY_BITMAP *the_struct;
my_bitmap_map *the_bits;
if (multi_alloc_root(thd->mem_root, &the_struct, sizeof(MY_BITMAP), &the_bits,
bitmap_buffer_size(number_bits), nullptr) == nullptr)
return true;
if (bitmap_init(the_struct, the_bits, number_bits) != 0) return true;
*bitmap = the_struct;
return false;
}
bool Sql_cmd_insert_base::get_default_columns(
THD *thd, TABLE *table, MY_BITMAP **m_function_default_columns) {
if (allocate_column_bitmap(thd, table, m_function_default_columns)) {
return true;
}
/*
Find columns with function default on insert or update, mark them in
bitmap.
*/
for (uint i = 0; i < table->s->fields; ++i) {
Field *f = table->field[i];
// if it's a default expression
if (f->has_insert_default_general_value_expression()) {
bitmap_set_bit(*m_function_default_columns, f->field_index());
}
}
// Remove from map the fields that are explicitly specified
bitmap_subtract(*m_function_default_columns, table->write_set);
// If no bit left exit
if (bitmap_is_clear_all(*m_function_default_columns)) return false;
// For each default function that is used restore the flags
for (uint i = 0; i < table->s->fields; ++i) {
Field *f = table->field[i];
if (bitmap_is_set(*m_function_default_columns, i)) {
assert(f->m_default_val_expr != nullptr);
// restore binlog safety flags
thd->lex->set_stmt_unsafe_flags(
f->m_default_val_expr->get_stmt_unsafe_flags());
// Mark the columns the expression reads in the table's read_set
for (uint j = 0; j < table->s->fields; j++) {
if (bitmap_is_set(&f->m_default_val_expr->base_columns_map, j)) {
bitmap_set_bit(table->read_set, j);
}
}
}
}
return false;
}
/**
Prepare items in INSERT statement
@param thd Thread handler
WARNING
You MUST set table->insert_values to 0 after calling this function
before releasing the table object.
@return false if success, true if error
*/
bool Sql_cmd_insert_base::prepare_inner(THD *thd) {
DBUG_TRACE;
// Save original number of columns in insert list
column_count = insert_field_list.size();