-
Notifications
You must be signed in to change notification settings - Fork 82
/
Copy pathAsynchronousInsertQueue.h
188 lines (146 loc) · 6.59 KB
/
AsynchronousInsertQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Common/CurrentThread.h>
#include <Common/RWLock.h>
#include <Common/ThreadPool.h>
#include <Core/Settings.h>
#include <Poco/Logger.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
namespace DB
{

/// A queue, that stores data for insert queries and periodically flushes it to tables.
/// The data is grouped by table, format and settings of insert query.
class AsynchronousInsertQueue : public WithContext
{
public:
    using Milliseconds = std::chrono::milliseconds;

    /// Using structure to allow and benefit from designated initialization and not mess with a positional arguments in ctor.
    struct Timeout
    {
        Milliseconds busy;   /// Feeds `busy_timeout` (see the flush rules documented on the members below).
        Milliseconds stale;  /// Feeds `stale_timeout`.
    };

    /// @param context_       Context the queue is bound to (via WithContext).
    /// @param pool_size      Presumably the number of threads in `pool` used for flushing — TODO confirm in the .cpp.
    /// @param max_data_size  Size threshold (in bytes) that triggers a flush of a group.
    /// @param timeouts       Busy / stale flush timeouts.
    AsynchronousInsertQueue(ContextPtr context_, size_t pool_size, size_t max_data_size, const Timeout & timeouts);
    ~AsynchronousInsertQueue();

    /// Add the data of an INSERT query to the queue.
    /// NOTE(review): grouping/flush semantics are implemented in the .cpp — confirm there.
    void push(ASTPtr query, ContextPtr query_context);

    /// Wait up to `timeout` for the entry registered under `query_id` to be processed.
    /// NOTE(review): behaviour on timeout / on a failed flush lives in the .cpp — confirm there.
    void waitForProcessingQuery(const String & query_id, const Milliseconds & timeout);

private:
    /// Grouping key of the queue: the INSERT query AST together with the settings it was issued with.
    struct InsertQuery
    {
        ASTPtr query;
        Settings settings;

        InsertQuery(const ASTPtr & query_, const Settings & settings_);
        InsertQuery(const InsertQuery & other);
        InsertQuery & operator=(const InsertQuery & other);
        bool operator==(const InsertQuery & other) const;
        struct Hash { UInt64 operator()(const InsertQuery & insert_query) const; };
    };

    /// Scope guard: for its lifetime the current thread's MemoryTracker is re-parented to `new_tracker`,
    /// so that memory allocated/freed inside the scope is accounted to that tracker. The previous parent
    /// and the thread's untracked-memory counter are saved on entry and restored on exit.
    struct UserMemoryTrackerSwitcher
    {
        explicit UserMemoryTrackerSwitcher(MemoryTracker * new_tracker)
        {
            /// A thread without an attached thread status (e.g. possibly the thread tearing down the
            /// queue) has no thread-level tracker to switch; dereferencing `current_thread` would crash.
            if (!current_thread)
                return;

            auto * thread_tracker = CurrentThread::getMemoryTracker();
            prev_untracked_memory = current_thread->untracked_memory;
            prev_memory_tracker_parent = thread_tracker->getParent();

            /// Park the pending untracked counter so it is not attributed to `new_tracker`;
            /// it is handed back in the destructor.
            current_thread->untracked_memory = 0;
            thread_tracker->setParent(new_tracker);
        }

        ~UserMemoryTrackerSwitcher()
        {
            if (!current_thread)
                return;

            /// Account whatever accumulated inside the scope while `new_tracker` is still the parent.
            CurrentThread::flushUntrackedMemory();
            auto * thread_tracker = CurrentThread::getMemoryTracker();

            /// Restore the parent BEFORE the untracked counter. NOTE(review): upstream MemoryTracker::setParent
            /// flushes the thread's untracked memory into the current parent; restoring the counter first would
            /// charge the pre-switch bytes to `new_tracker`. With this order it is correct either way.
            thread_tracker->setParent(prev_memory_tracker_parent);
            current_thread->untracked_memory = prev_untracked_memory;
        }

        /// Non-copyable: a copy would restore the saved state a second time.
        UserMemoryTrackerSwitcher(const UserMemoryTrackerSwitcher &) = delete;
        UserMemoryTrackerSwitcher & operator=(const UserMemoryTrackerSwitcher &) = delete;

        MemoryTracker * prev_memory_tracker_parent = nullptr;
        Int64 prev_untracked_memory = 0;
    };

    /// Accumulated data of one group (one InsertQuery key), waiting to be flushed.
    struct InsertData
    {
        /// One pushed insert: its raw data, query id and the memory tracker of the user who pushed it,
        /// plus a completion state (`finished` / `exception`) that callers can wait on via `wait`.
        struct Entry
        {
        public:
            const String bytes;
            const String query_id;
            MemoryTracker * const user_memory_tracker;

            Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_);

            void finish(std::exception_ptr exception_ = nullptr);
            bool wait(const Milliseconds & timeout) const;
            bool isFinished() const;
            std::exception_ptr getException() const;

        private:
            mutable std::mutex mutex;
            mutable std::condition_variable cv;

            bool finished = false;
            std::exception_ptr exception;
        };

        ~InsertData()
        {
            auto it = entries.begin();
            // Entries must be destroyed in context of user who runs async insert.
            // Each entry in the list may correspond to a different user,
            // so we need to switch current thread's MemoryTracker parent on each iteration.
            // (Entries are shared_ptr, so erase() destroys the Entry only if this list held the last reference.)
            while (it != entries.end())
            {
                UserMemoryTrackerSwitcher switcher((*it)->user_memory_tracker);
                it = entries.erase(it);
            }
        }

        using EntryPtr = std::shared_ptr<Entry>;

        std::list<EntryPtr> entries;
        size_t size = 0;

        /// Timestamp of the first insert into queue, or after the last queue dump.
        /// Used to detect for how long the queue is active, so we can dump it by timer.
        std::chrono::time_point<std::chrono::steady_clock> first_update = std::chrono::steady_clock::now();

        /// Timestamp of the last insert into queue.
        /// Used to detect for how long the queue is stale, so we can dump it by another timer.
        std::chrono::time_point<std::chrono::steady_clock> last_update;
    };

    using InsertDataPtr = std::unique_ptr<InsertData>;

    /// A separate container, that holds a data and a mutex for it.
    /// When it's needed to process current chunk of data, it can be moved for processing
    /// and new data can be recreated without holding a lock during processing.
    struct Container
    {
        std::mutex mutex;
        InsertDataPtr data;
    };

    using Queue = std::unordered_map<InsertQuery, std::shared_ptr<Container>, InsertQuery::Hash>;
    using QueueIterator = Queue::iterator;

    /// Protects `queue` (shared for readers such as getQueue(), exclusive for structural changes).
    mutable std::shared_mutex rwlock;
    Queue queue;

    using QueryIdToEntry = std::unordered_map<String, InsertData::EntryPtr>;
    /// Presumably guards `currently_processing_queries` — TODO confirm in the .cpp.
    mutable std::mutex currently_processing_mutex;
    QueryIdToEntry currently_processing_queries;

    /// Logic and events behind queue are as follows:
    ///  - busy_timeout:   if queue is active for too long and there are a lot of rapid inserts, then we dump the data, so it doesn't
    ///                    grow for a long period of time and users will be able to select new data in deterministic manner.
    ///  - stale_timeout:  if queue is stale for too long, then we dump the data too, so that users will be able to select the last
    ///                    piece of inserted data.
    ///  - max_data_size:  if the maximum size of data is reached, then again we dump the data.

    const size_t max_data_size; /// in bytes
    const Milliseconds busy_timeout;
    const Milliseconds stale_timeout;

    std::atomic<bool> shutdown{false};

    ThreadPool pool; /// dump the data only inside this pool.
    ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
    ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
    ThreadFromGlobalPool cleanup_thread; /// uses busy_timeout and cleanup()

    Poco::Logger * log = &Poco::Logger::get("AsynchronousInsertQueue");

    void busyCheck();
    void staleCheck();
    void cleanup();

    /// Should be called with shared or exclusively locked 'rwlock'.
    void pushImpl(InsertData::EntryPtr entry, QueueIterator it);

    void scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context);
    static void processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context);

    template <typename E>
    static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);

public:
    /// Snapshot of the queue taken under a shared lock. The map is copied, but its values are
    /// shared_ptr<Container>, so the snapshot shares the Container objects with the live queue.
    Queue getQueue() const
    {
        std::shared_lock lock(rwlock);
        return queue;
    }
};

}