-
Notifications
You must be signed in to change notification settings - Fork 4k
/
Copy pathbounded_queue.h
152 lines (128 loc) · 5.7 KB
/
bounded_queue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/* Copyright (c) 2010, 2024, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#ifndef BOUNDED_QUEUE_INCLUDED
#define BOUNDED_QUEUE_INCLUDED
#include "my_base.h"
#include "my_sys.h"
#include "mysys_err.h"
#include "priority_queue.h"
#include "sql/malloc_allocator.h"
/**
A priority queue with a fixed, limited size.
This is a wrapper on top of Priority_queue.
It keeps the top-N elements which are inserted.
Elements of type Element_type are pushed into the queue.
For each element, we call a user-supplied Key_generator::make_sortkey(),
to generate a key of type Key_type for the element.
Instances of Key_type are compared with the user-supplied Key_compare.
Pointers to the top-N elements are stored in the sort_keys array given
to the init() function below. To access elements in sorted order,
sort the array and access it sequentially.
*/
template <typename Element_type, typename Key_type, typename Key_generator,
typename Key_compare = std::less<Key_type>>
class Bounded_queue {
public:
typedef Priority_queue<
Key_type, std::vector<Key_type, Malloc_allocator<Key_type>>, Key_compare>
Queue_type;
typedef typename Queue_type::allocator_type allocator_type;
Bounded_queue(
size_t element_size = sizeof(Element_type),
const allocator_type &alloc = allocator_type(PSI_NOT_INSTRUMENTED))
: m_queue(Key_compare(), alloc),
m_sort_keys(nullptr),
m_sort_param(nullptr),
m_element_size(element_size) {}
/**
Initialize the queue.
@param max_elements The size of the queue.
@param sort_param Sort parameters. We call sort_param->make_sortkey()
to generate keys for elements.
@param[in,out] sort_keys Array of keys to sort.
Must be initialized by caller.
Will be filled with pointers to the top-N elements.
@retval false OK, true Could not allocate memory.
We do *not* take ownership of any of the input pointer arguments.
*/
bool init(ha_rows max_elements, Key_generator *sort_param,
Key_type *sort_keys) {
m_sort_keys = sort_keys;
m_sort_param = sort_param;
DBUG_EXECUTE_IF("bounded_queue_init_fail",
my_error(EE_OUTOFMEMORY, MYF(ME_FATALERROR), 42);
return true;);
// We allocate space for one extra element, for replace when queue is full.
if (m_queue.reserve(max_elements + 1)) return true;
// We cannot have packed keys in the queue.
m_queue.m_compare_length = sort_param->max_compare_length();
// We can have variable length keys though.
if (sort_param->using_varlen_keys()) m_queue.m_param = sort_param;
return false;
}
/**
Pushes an element on the queue.
If the queue is already full, we discard one element.
Calls m_sort_param::make_sortkey() to generate a key for the element.
@param opaque Parameter to send on to make_sortkey().
@retval false OK, true error.
*/
template <class Opaque>
[[nodiscard]] bool push(const Opaque &opaque) {
/*
Add one extra byte to each key, so that sort-key generating functions
won't be returning out-of-space. Since we know there's always room
given a "m_element_size"-sized buffer even in the worst case (by
definition), we could in principle make a special mode flag in
Sort_param::make_sortkey() instead for the case of fixed-length records,
but this is much simpler.
*/
assert(m_element_size < 0xFFFFFFFF);
const uint element_size = m_element_size + 1;
if (m_queue.size() == m_queue.capacity()) {
const Key_type &pq_top = m_queue.top();
const uint rec_sz =
m_sort_param->make_sortkey(pq_top, element_size, opaque);
// UINT_MAX means error, but we do not want to add a dependency
// on class THD here, as in current_thd->is_error().
if (rec_sz == UINT_MAX) return true;
assert(rec_sz <= m_element_size);
m_queue.update_top();
return false;
} else {
const uint rec_sz = m_sort_param->make_sortkey(
m_sort_keys[m_queue.size()], element_size, opaque);
// UINT_MAX means error, but we do not want to add a dependency
// on class THD here, as in current_thd->is_error().
if (rec_sz == UINT_MAX) return true;
assert(rec_sz <= m_element_size);
return m_queue.push(m_sort_keys[m_queue.size()]);
}
}
/**
The number of elements in the queue.
*/
size_t num_elements() const { return m_queue.size(); }
private:
Queue_type m_queue;
Key_type *m_sort_keys;
Key_generator *m_sort_param;
size_t m_element_size;
};
#endif // BOUNDED_QUEUE_INCLUDED