-
Notifications
You must be signed in to change notification settings - Fork 487
/
flat_database.h
314 lines (253 loc) · 12 KB
/
flat_database.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
#pragma once
#include "flat_row_eggs.h"
#include "flat_row_versions.h"
#include "flat_update_op.h"
#include "flat_dbase_scheme.h"
#include "flat_dbase_change.h"
#include "flat_dbase_misc.h"
#include "flat_iterator.h"
#include "flat_table_observer.h"
#include "util_basics.h"
namespace NKikimr {
namespace NPageCollection {
class TCookieAllocator;
struct TMemGlob;
}
namespace NTable {
struct TStats;
struct TSizeEnv;
class TRowState;
struct TChange;
class TDatabaseImpl;
class TAnnex;
class TTable;
class TKeyRangeCache;
namespace NRedo {
class TWriter;
}
/**
 * Key range descriptor accepted by the TDatabase::IterateRange* methods.
 *
 * MinKey / MaxKey hold the two bounds of the range; each bound is treated as
 * inclusive unless the matching flag is explicitly cleared by the caller.
 */
struct TKeyRange {
    TRawVals MinKey;            // lower bound of the range
    TRawVals MaxKey;            // upper bound of the range
    bool MinInclusive{true};    // lower bound is inclusive by default
    bool MaxInclusive{true};    // upper bound is inclusive by default
};
/**
* Table-oriented database interface. Tables are addressed by ui32 ids.
*
* The declarations below cover reads (Iterate*, Select*, Precharge), writes
* (Update*, RemoveTx, CommitTx, RemoveRowVersions), schema changes (Alter),
* part management (Replace*, Merge), introspection (GetTable*, DebugDump*)
* and the executor-facing transaction cycle (Begin, Commit, RollUp).
*
* The class is not copy-constructible. Except for the On*() callback
* registration templates, all method bodies are defined outside this header.
*/
class TDatabase {
public:
using TMemGlobs = TVector<NPageCollection::TMemGlob>;
using TCookieAllocator = NPageCollection::TCookieAllocator;
using TCounters = TDbStats;
/**
* Value returned by Commit(): a holder for the produced TChange and a list
* of void() callbacks. Presumably the callbacks are the ones registered via
* OnPersistent() and are meant to run once the commit is durable — confirm
* in the implementation.
*/
struct TProd {
THolder<TChange> Change;
TVector<std::function<void()>> OnPersistent;
};
/**
* Pair of (Serial, Epoch) describing how far a table or the whole database
* has advanced; returned by Head().
*/
struct TChangeCounter {
/**
* Monotonic change counter for a table or an entire database. Serial
* is incremented and persisted on each successful Commit() that has
* data changes (i.e. not empty). Note: this may or may not be zero
* when table has no changes, or when all changes have been compacted.
*/
ui64 Serial = 0;
/**
* Monotonic epoch of a table's current memtable. This is incremented
* each time a memtable is flushed and a new one is started. The
* current memtable may or may not have additional changes.
*/
TEpoch Epoch = TEpoch::Zero();
TChangeCounter() = default;
TChangeCounter(ui64 serial, TEpoch epoch)
: Serial(serial)
, Epoch(epoch)
{}
// Member-wise comparison of Serial and Epoch (compiler-generated).
// NOTE(review): a defaulted operator== already lets `a != b` compile in
// C++20, so the explicit operator!= below looks redundant — confirm before removing.
bool operator==(const TChangeCounter& rhs) const = default;
bool operator!=(const TChangeCounter& rhs) const = default;
/**
* Compares two change counters, such that when a < b then b either
* has more changes than a, or it's impossible to determine.
*/
bool operator<(const TChangeCounter& rhs) const;
};
TDatabase(const TDatabase&) = delete;
// NOTE(review): this single-argument constructor is not `explicit`, so a
// TDatabaseImpl* converts implicitly to TDatabase. Ownership of databaseImpl
// presumably passes to the DatabaseImpl holder below — confirm in the .cpp.
TDatabase(TDatabaseImpl *databaseImpl = nullptr) noexcept;
~TDatabase();
// Attaches an ITableObserver to the given table.
void SetTableObserver(ui32 table, TIntrusivePtr<ITableObserver> ptr) noexcept;
/**
* Returns durable monotonic change counter for a table (or a database when
* table = Max<ui32>() by default).
*/
TChangeCounter Head(ui32 table = Max<ui32>()) const noexcept;
/*_ Call Next() before accessing each row including the 1st row. */
TAutoPtr<TTableIter> Iterate(ui32 table, TRawVals key, TTagsRef tags, ELookup) const noexcept;
// The Iterate* overloads below share the same optional tail: `snapshot`
// defaults to TRowVersion::Max(), `visible` and `observer` default to nullptr.
TAutoPtr<TTableIter> IterateExact(ui32 table, TRawVals key, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
TAutoPtr<TTableIter> IterateRange(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
TAutoPtr<TTableReverseIter> IterateRangeReverse(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
// Declared here without a body; the template definition is not in this header.
// Presumably the shared implementation behind IterateRange/IterateRangeReverse — confirm.
template<class TIteratorType>
TAutoPtr<TIteratorType> IterateRangeGeneric(ui32 table, const TKeyRange& range, TTagsRef tags,
TRowVersion snapshot = TRowVersion::Max(),
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
// NOTE: the row references data in some internal buffers that get invalidated on the next Select() or Commit() call
EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row,
ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(),
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
// Same as the overload above, with an additional TSelectStats& out-parameter.
EReady Select(ui32 table, TRawVals key, TTagsRef tags, TRowState& row, TSelectStats& stats,
ui64 readFlags = 0, TRowVersion snapshot = TRowVersion::Max(),
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
// Two overloads differing only in how the key is passed (TRawVals vs. a span of TCell).
TSelectRowVersionResult SelectRowVersion(
ui32 table, TRawVals key, ui64 readFlags = 0,
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
TSelectRowVersionResult SelectRowVersion(
ui32 table, TArrayRef<const TCell> key, ui64 readFlags = 0,
const ITransactionMapPtr& visible = nullptr,
const ITransactionObserverPtr& observer = nullptr) const noexcept;
// Precharge and CalculateReadSize take the same range/limit arguments;
// direction defaults to EDirection::Forward and snapshot to TRowVersion::Max().
// NOTE(review): the meaning of Precharge's bool result is not visible here — confirm in the .cpp.
bool Precharge(ui32 table, TRawVals minKey, TRawVals maxKey,
TTagsRef tags, ui64 readFlags, ui64 itemsLimit, ui64 bytesLimit,
EDirection direction = EDirection::Forward,
TRowVersion snapshot = TRowVersion::Max());
TSizeEnv CreateSizeEnv();
void CalculateReadSize(TSizeEnv& env, ui32 table, TRawVals minKey, TRawVals maxKey,
TTagsRef tags, ui64 readFlags, ui64 itemsLimit, ui64 bytesLimit,
EDirection direction = EDirection::Forward,
TRowVersion snapshot = TRowVersion::Max());
// Write operations. rowVersion defaults to TRowVersion::Min() where present;
// the *Tx variants are keyed by a ui64 txId instead of a row version.
void Update(ui32 table, ERowOp, TRawVals key, TArrayRef<const TUpdateOp>, TRowVersion rowVersion = TRowVersion::Min());
void UpdateTx(ui32 table, ERowOp, TRawVals key, TArrayRef<const TUpdateOp>, ui64 txId);
void RemoveTx(ui32 table, ui64 txId);
void CommitTx(ui32 table, ui64 txId, TRowVersion rowVersion = TRowVersion::Min());
/**
* Returns true when table has an open transaction that is not committed or removed yet
*/
bool HasOpenTx(ui32 table, ui64 txId) const;
bool HasTxData(ui32 table, ui64 txId) const;
bool HasCommittedTx(ui32 table, ui64 txId) const;
bool HasRemovedTx(ui32 table, ui64 txId) const;
/**
* Returns a set of open transactions in the provided table. This only
* includes transactions with changes that are neither committed nor
* removed.
*/
const absl::flat_hash_set<ui64>& GetOpenTxs(ui32 table) const;
/**
* Returns a number of open transactions in the provided table. This only
* includes transactions with changes that are neither committed nor
* removed.
*/
size_t GetOpenTxCount(ui32 table) const;
/**
* Remove row versions [lower, upper) from the given table
*
* Once committed this cannot be undone. This is a hint to the underlying
* storage that row versions in the given range would no longer be accessed
* and may be compacted and garbage collected. Using row versions from the
* given range in future or even ongoing transactions/scans may or may not
* produce inconsistent results.
*/
void RemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper);
/**
* Returns currently committed ranges of removed row versions
*/
const TRowVersionRanges& GetRemovedRowVersions(ui32 table) const;
void NoMoreReadsForTx();
TAlter& Alter(); /* Begin DDL ALTER script */
TEpoch TxSnapTable(ui32 table);
// Scheme and part accessors.
const TScheme& GetScheme() const noexcept;
TIntrusiveConstPtr<TRowScheme> GetRowScheme(ui32 table) const noexcept;
TPartView GetPartView(ui32 table, const TLogoBlobID &bundle) const;
TVector<TPartView> GetTableParts(ui32 table) const;
TVector<TIntrusiveConstPtr<TColdPart>> GetTableColdParts(ui32 table) const;
// Callback-based counterparts of the getters above: each takes a
// std::function that receives one part per invocation.
void EnumerateTableParts(ui32 table, const std::function<void(const TPartView&)>& callback) const;
void EnumerateTableColdParts(ui32 table, const std::function<void(const TIntrusiveConstPtr<TColdPart>&)>& callback) const;
void EnumerateTableTxStatusParts(ui32 table, const std::function<void(const TIntrusiveConstPtr<TTxStatusPart>&)>& callback) const;
void EnumerateTxStatusParts(const std::function<void(const TIntrusiveConstPtr<TTxStatusPart>&)>& callback) const;
// Per-table size/statistics getters; `epoch` defaults to TEpoch::Max().
ui64 GetTableMemSize(ui32 table, TEpoch epoch = TEpoch::Max()) const;
ui64 GetTableMemRowCount(ui32 tableId) const;
ui64 GetTableMemOpsCount(ui32 tableId) const;
ui64 GetTableIndexSize(ui32 table) const;
ui64 GetTableSearchHeight(ui32 table) const;
ui64 EstimateRowSize(ui32 table) const;
const TCounters& Counters() const noexcept;
TString SnapshotToLog(ui32 table, TTxStamp);
// Subset selection: either by an explicit list of bundles or by a key range,
// both bounded by the `before` epoch.
TAutoPtr<TSubset> Subset(ui32 table, TArrayRef<const TLogoBlobID> bundle, TEpoch before) const;
TAutoPtr<TSubset> Subset(ui32 table, TEpoch before, TRawVals from, TRawVals to) const;
TAutoPtr<TSubset> ScanSnapshot(ui32 table, TRowVersion snapshot = TRowVersion::Max());
bool HasBorrowed(ui32 table, ui64 selfTabletId) const;
TBundleSlicesMap LookupSlices(ui32 table, TArrayRef<const TLogoBlobID> bundles) const;
void ReplaceSlices(ui32 table, TBundleSlicesMap slices);
void Replace(ui32 table, TArrayRef<const TPartView>, const TSubset&);
void ReplaceTxStatus(ui32 table, TArrayRef<const TIntrusiveConstPtr<TTxStatusPart>>, const TSubset&);
// Merge overloads, one per part kind (TPartView, TColdPart, TTxStatusPart).
void Merge(ui32 table, TPartView);
void Merge(ui32 table, TIntrusiveConstPtr<TColdPart>);
void Merge(ui32 table, TIntrusiveConstPtr<TTxStatusPart>);
// Debugging helpers writing to the supplied IOutputStream.
void DebugDumpTable(ui32 table, IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const;
void DebugDump(IOutputStream& str, const NScheme::TTypeRegistry& typeRegistry) const;
TKeyRangeCache* DebugGetTableErasedKeysCache(ui32 table) const;
/**
* Returns true when current transaction has changes to commit
*/
bool HasChanges() const;
/**
* Rollback all current transaction changes
*
* Similar to aborting transaction and then starting a new one
*/
void RollbackChanges();
// executor interface
void Begin(TTxStamp, IPages& env);
TProd Commit(TTxStamp, bool commit, TCookieAllocator* = nullptr);
TGarbage RollUp(TTxStamp, TArrayRef<const char> delta, TArrayRef<const char> redo, TMemGlobs annex);
void RollUpRemoveRowVersions(ui32 table, const TRowVersion& lower, const TRowVersion& upper);
size_t GetCommitRedoBytes() const;
TCompactionStats GetCompactionStats(ui32 table) const;
/**
* Adds a callback, which is called when database changes are committed
*
* @param callback callable forwarded into a std::function<void()> appended to OnCommit_
*/
template<class TCallback>
void OnCommit(TCallback&& callback) {
OnCommit_.emplace_back(std::forward<TCallback>(callback));
}
/**
* Adds a callback, which is called when database changes are rolled back
*
* @param callback callable forwarded into a std::function<void()> appended to OnRollback_
*/
template<class TCallback>
void OnRollback(TCallback&& callback) {
OnRollback_.emplace_back(std::forward<TCallback>(callback));
}
/**
* Adds a callback, which is called when database changes are persistent
*
* @param callback callable forwarded into a std::function<void()> appended to OnPersistent_
*/
template<class TCallback>
void OnPersistent(TCallback&& callback) {
OnPersistent_.emplace_back(std::forward<TCallback>(callback));
}
private:
// Table lookup helpers returning a raw TTable*.
// NOTE(review): behaviour for an unknown tableId is not visible here; both are
// noexcept, so presumably they abort rather than throw — confirm in the .cpp.
TTable* Require(ui32 tableId) const noexcept;
TTable* RequireForUpdate(ui32 tableId) const noexcept;
private:
// Owning holder for the implementation object; const, so it cannot be reseated
// after construction.
const THolder<TDatabaseImpl> DatabaseImpl;
// NOTE(review): no in-class initializer — must be set by the constructor
// (defined outside this header) before it is read.
bool NoMoreReadsFlag;
// Non-owning page environment pointer; Begin() takes an IPages&, which is
// presumably what gets stored here — confirm.
IPages* Env = nullptr;
// State of the transaction currently being built.
THolder<TChange> Change;
TAutoPtr<TAlter> Alter_;
TAutoPtr<TAnnex> Annex;
TAutoPtr<NRedo::TWriter> Redo;
TVector<ui32> ModifiedRefs;
TVector<TUpdateOp> ModifiedOps;
mutable TDeque<TPartIter> TempIterators; // Keeps the last result of Select() valid
// Callback lists filled by OnCommit(), OnRollback() and OnPersistent() respectively.
TVector<std::function<void()>> OnCommit_;
TVector<std::function<void()>> OnRollback_;
TVector<std::function<void()>> OnPersistent_;
};
}}