|
| 1 | +/* Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. |
| 2 | +
|
| 3 | + This program is free software; you can redistribute it and/or modify |
| 4 | + it under the terms of the GNU General Public License, version 2.0, |
| 5 | + as published by the Free Software Foundation. |
| 6 | +
|
| 7 | + This program is also distributed with certain software (including |
| 8 | + but not limited to OpenSSL) that is licensed under separate terms, |
| 9 | + as designated in a particular file or component or in included license |
| 10 | + documentation. The authors of MySQL hereby grant you an additional |
| 11 | + permission to link the program and your derivative works with the |
| 12 | + separately licensed software that they have included with MySQL. |
| 13 | +
|
| 14 | + This program is distributed in the hope that it will be useful, |
| 15 | + but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 16 | + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 17 | + GNU General Public License, version 2.0, for more details. |
| 18 | +
|
| 19 | + You should have received a copy of the GNU General Public License |
| 20 | + along with this program; if not, write to the Free Software |
| 21 | + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ |
| 22 | + |
| 23 | +#include "sql/composite_iterators.h" |
| 24 | + |
| 25 | +#include <atomic> |
| 26 | +#include <string> |
| 27 | +#include <vector> |
| 28 | + |
| 29 | +#include "sql/item.h" |
| 30 | +#include "sql/sql_class.h" |
| 31 | +#include "sql/sql_executor.h" |
| 32 | +#include "sql/sql_lex.h" |
| 33 | +#include "sql/sql_opt_exec_shared.h" |
| 34 | +#include "sql/sql_optimizer.h" |
| 35 | + |
| 36 | +class Item_sum; |
| 37 | + |
| 38 | +using std::string; |
| 39 | +using std::vector; |
| 40 | + |
| 41 | +namespace { |
| 42 | + |
| 43 | +void SwitchSlice(JOIN *join, int slice_num) { |
| 44 | + if (!join->ref_items[slice_num].is_null()) { |
| 45 | + join->set_ref_item_slice(slice_num); |
| 46 | + } |
| 47 | +} |
| 48 | + |
| 49 | +} // namespace |
| 50 | + |
| 51 | +int FilterIterator::Read() { |
| 52 | + for (;;) { |
| 53 | + int err = m_source->Read(); |
| 54 | + if (err != 0) return err; |
| 55 | + |
| 56 | + bool matched = m_condition->val_int(); |
| 57 | + |
| 58 | + if (thd()->killed) { |
| 59 | + thd()->send_kill_message(); |
| 60 | + return 1; |
| 61 | + } |
| 62 | + |
| 63 | + /* check for errors evaluating the condition */ |
| 64 | + if (thd()->is_error()) return 1; |
| 65 | + |
| 66 | + if (!matched) { |
| 67 | + m_source->UnlockRow(); |
| 68 | + continue; |
| 69 | + } |
| 70 | + |
| 71 | + // Successful row. |
| 72 | + return 0; |
| 73 | + } |
| 74 | +} |
| 75 | + |
| 76 | +bool LimitOffsetIterator::Init() { |
| 77 | + if (m_source->Init()) { |
| 78 | + return true; |
| 79 | + } |
| 80 | + for (ha_rows row_idx = 0; row_idx < m_offset; ++row_idx) { |
| 81 | + int err = m_source->Read(); |
| 82 | + if (err == 1) { |
| 83 | + return true; // Note that this will propagate Read() errors to Init(). |
| 84 | + } else if (err == -1) { |
| 85 | + m_seen_rows = m_offset; // So that Read() will return -1. |
| 86 | + return false; // EOF is not an error. |
| 87 | + } |
| 88 | + if (m_skipped_rows != nullptr) { |
| 89 | + ++*m_skipped_rows; |
| 90 | + } |
| 91 | + m_source->UnlockRow(); |
| 92 | + } |
| 93 | + m_seen_rows = m_offset; |
| 94 | + return false; |
| 95 | +} |
| 96 | + |
| 97 | +int LimitOffsetIterator::Read() { |
| 98 | + if (m_seen_rows++ >= m_limit) { |
| 99 | + return -1; |
| 100 | + } else { |
| 101 | + return m_source->Read(); |
| 102 | + } |
| 103 | +} |
| 104 | + |
| 105 | +bool AggregateIterator::Init() { |
| 106 | + DBUG_ASSERT(!m_join->tmp_table_param.precomputed_group_by); |
| 107 | + if (m_source->Init()) { |
| 108 | + return true; |
| 109 | + } |
| 110 | + |
| 111 | + // Store which slice we will be reading from. |
| 112 | + m_input_slice = m_join->get_ref_item_slice(); |
| 113 | + |
| 114 | + m_first_row = true; |
| 115 | + m_eof = false; |
| 116 | + m_save_nullinfo = 0; |
| 117 | + return false; |
| 118 | +} |
| 119 | + |
| 120 | +int AggregateIterator::Read() { |
| 121 | + if (m_eof) { |
| 122 | + // We've seen the last row earlier. |
| 123 | + if (m_save_nullinfo != 0) { |
| 124 | + m_join->restore_fields(m_save_nullinfo); |
| 125 | + m_save_nullinfo = 0; |
| 126 | + } |
| 127 | + return -1; |
| 128 | + } |
| 129 | + |
| 130 | + // Switch to the input slice before we call Read(), so that any processing |
| 131 | + // that happens in sub-iterators is on the right slice. |
| 132 | + SwitchSlice(m_join, m_input_slice); |
| 133 | + |
| 134 | + if (m_first_row) { |
| 135 | + // Start the first group, if possible. (If we're not at the first row, |
| 136 | + // we already saw the first row in the new group at the previous Read().) |
| 137 | + m_first_row = false; |
| 138 | + int err = m_source->Read(); |
| 139 | + if (err == -1) { |
| 140 | + m_eof = true; |
| 141 | + if (m_join->grouped || m_join->group_optimized_away) { |
| 142 | + return -1; |
| 143 | + } else { |
| 144 | + // If there's no GROUP BY, we need to output a row even if there are no |
| 145 | + // input rows. |
| 146 | + |
| 147 | + // Calculate aggregate functions for no rows |
| 148 | + for (Item &item : *m_join->get_current_fields()) { |
| 149 | + item.no_rows_in_result(); |
| 150 | + } |
| 151 | + |
| 152 | + /* |
| 153 | + Mark tables as containing only NULL values for ha_write_row(). |
| 154 | + Calculate a set of tables for which NULL values need to |
| 155 | + be restored after sending data. |
| 156 | + */ |
| 157 | + if (m_join->clear_fields(&m_save_nullinfo)) { |
| 158 | + return 1; |
| 159 | + } |
| 160 | + return 0; |
| 161 | + } |
| 162 | + } |
| 163 | + if (err != 0) return err; |
| 164 | + } |
| 165 | + |
| 166 | + // This is the start of a new group. Make a copy of the group expressions, |
| 167 | + // because they risk being overwritten on the next call to m_source->Read(). |
| 168 | + // We cannot reuse the Item_cached_* fields in m_join->group_fields for this |
| 169 | + // (even though also need to be initialized as part of the start of the |
| 170 | + // group), because they are overwritten by the testing at each row, just like |
| 171 | + // the data from Read() will be. |
| 172 | + { |
| 173 | + Switch_ref_item_slice slice_switch(m_join, REF_SLICE_ORDERED_GROUP_BY); |
| 174 | + if (copy_fields(&m_join->tmp_table_param, m_join->thd)) { |
| 175 | + return 1; |
| 176 | + } |
| 177 | + (void)update_item_cache_if_changed(m_join->group_fields); |
| 178 | + // TODO: Implement rollup. |
| 179 | + if (init_sum_functions(m_join->sum_funcs, m_join->sum_funcs_end[0])) { |
| 180 | + return 1; |
| 181 | + } |
| 182 | + } |
| 183 | + |
| 184 | + // Keep reading rows as long as they are part of the existing group. |
| 185 | + for (;;) { |
| 186 | + int err = m_source->Read(); |
| 187 | + if (err == 1) return 1; // Error. |
| 188 | + |
| 189 | + if (err == -1) { |
| 190 | + // End of input rows; return the last group. |
| 191 | + SwitchSlice(m_join, REF_SLICE_ORDERED_GROUP_BY); |
| 192 | + m_eof = true; |
| 193 | + return 0; |
| 194 | + } |
| 195 | + |
| 196 | + int idx = update_item_cache_if_changed(m_join->group_fields); |
| 197 | + if (idx >= 0) { |
| 198 | + // The group changed. Return the current row; the next Read() will deal |
| 199 | + // with the new group. |
| 200 | + SwitchSlice(m_join, REF_SLICE_ORDERED_GROUP_BY); |
| 201 | + return 0; |
| 202 | + } else { |
| 203 | + // We're still in the same group. |
| 204 | + if (update_sum_func(m_join->sum_funcs)) { |
| 205 | + return 1; |
| 206 | + } |
| 207 | + } |
| 208 | + } |
| 209 | +} |
| 210 | + |
| 211 | +void AggregateIterator::UnlockRow() { |
| 212 | + // Most likely, HAVING failed. Ideally, we'd like to backtrack and unlock |
| 213 | + // all rows that went into this aggregate, but we can't do that, and we also |
| 214 | + // can't unlock the _current_ row, since that belongs to a different group. |
| 215 | + // Thus, do nothing. |
| 216 | +} |
| 217 | + |
| 218 | +bool PrecomputedAggregateIterator::Init() { |
| 219 | + DBUG_ASSERT(m_join->tmp_table_param.precomputed_group_by); |
| 220 | + DBUG_ASSERT(m_join->grouped || m_join->group_optimized_away); |
| 221 | + return m_source->Init(); |
| 222 | +} |
| 223 | + |
| 224 | +int PrecomputedAggregateIterator::Read() { |
| 225 | + int err = m_source->Read(); |
| 226 | + if (err != 0) { |
| 227 | + return err; |
| 228 | + } |
| 229 | + |
| 230 | + // Even if the aggregates have been precomputed (typically by |
| 231 | + // QUICK_RANGE_MIN_MAX), we need to copy over the non-aggregated |
| 232 | + // fields here. |
| 233 | + if (copy_fields(&m_join->tmp_table_param, m_join->thd)) { |
| 234 | + return 1; |
| 235 | + } |
| 236 | + SwitchSlice(m_join, REF_SLICE_ORDERED_GROUP_BY); |
| 237 | + return 0; |
| 238 | +} |
| 239 | + |
| 240 | +void PrecomputedAggregateIterator::UnlockRow() { |
| 241 | + // See AggregateIterator::UnlockRow(). |
| 242 | +} |
0 commit comments