-
Notifications
You must be signed in to change notification settings - Fork 559
/
internal_contexts.h
388 lines (353 loc) · 12.1 KB
/
internal_contexts.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
#pragma once
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <deque>
#include <unordered_map>
#include <string>
#include "address.h"
#include "guid.h"
#include "hash_bucket.h"
#include "native_buffer_pool.h"
#include "record.h"
#include "state_transitions.h"
#include "thread.h"
namespace FASTER {
namespace core {
/// Internal contexts, used by FASTER.
enum class OperationType : uint8_t {
Read,
RMW,
Upsert,
Insert,
Delete
};
enum class OperationStatus : uint8_t {
SUCCESS,
NOT_FOUND,
RETRY_NOW,
RETRY_LATER,
RECORD_ON_DISK,
SUCCESS_UNMARK,
NOT_FOUND_UNMARK,
CPR_SHIFT_DETECTED
};
/// Internal FASTER context.
template <class K>
class PendingContext : public IAsyncContext {
public:
typedef K key_t;
protected:
PendingContext(OperationType type_, IAsyncContext& caller_context_,
AsyncCallback caller_callback_)
: type{ type_ }
, caller_context{ &caller_context_ }
, caller_callback{ caller_callback_ }
, version{ UINT32_MAX }
, phase{ Phase::INVALID }
, result{ Status::Pending }
, address{ Address::kInvalidAddress }
, entry{ HashBucketEntry::kInvalidEntry } {
}
public:
/// The deep-copy constructor.
PendingContext(const PendingContext& other, IAsyncContext* caller_context_)
: type{ other.type }
, caller_context{ caller_context_ }
, caller_callback{ other.caller_callback }
, version{ other.version }
, phase{ other.phase }
, result{ other.result }
, address{ other.address }
, entry{ other.entry } {
}
public:
/// Go async, for the first time.
void go_async(Phase phase_, uint32_t version_, Address address_, HashBucketEntry entry_) {
phase = phase_;
version = version_;
address = address_;
entry = entry_;
}
/// Go async, again.
void continue_async(Address address_, HashBucketEntry entry_) {
address = address_;
entry = entry_;
}
virtual const key_t& key() const = 0;
/// Caller context.
IAsyncContext* caller_context;
/// Caller callback.
AsyncCallback caller_callback;
/// Checkpoint version.
uint32_t version;
/// Checkpoint phase.
Phase phase;
/// Type of operation (Read, Upsert, RMW, etc.).
OperationType type;
/// Result of operation.
Status result;
/// Address of the record being read or modified.
Address address;
/// Hash table entry that (indirectly) leads to the record being read or modified.
HashBucketEntry entry;
};
/// FASTER's internal Read() context.
/// An internal Read() context that has gone async and lost its type information.
template <class K>
class AsyncPendingReadContext : public PendingContext<K> {
public:
typedef K key_t;
protected:
AsyncPendingReadContext(IAsyncContext& caller_context_, AsyncCallback caller_callback_)
: PendingContext<key_t>(OperationType::Read, caller_context_, caller_callback_) {
}
/// The deep copy constructor.
AsyncPendingReadContext(AsyncPendingReadContext& other, IAsyncContext* caller_context)
: PendingContext<key_t>(other, caller_context) {
}
public:
virtual void Get(const void* rec) = 0;
virtual void GetAtomic(const void* rec) = 0;
};
/// A synchronous Read() context preserves its type information.
template <class RC>
class PendingReadContext : public AsyncPendingReadContext<typename RC::key_t> {
public:
typedef RC read_context_t;
typedef typename read_context_t::key_t key_t;
typedef typename read_context_t::value_t value_t;
typedef Record<key_t, value_t> record_t;
PendingReadContext(read_context_t& caller_context_, AsyncCallback caller_callback_)
: AsyncPendingReadContext<key_t>(caller_context_, caller_callback_) {
}
/// The deep copy constructor.
PendingReadContext(PendingReadContext& other, IAsyncContext* caller_context_)
: AsyncPendingReadContext<key_t>(other, caller_context_) {
}
protected:
Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
return IAsyncContext::DeepCopy_Internal(*this, PendingContext<key_t>::caller_context,
context_copy);
}
private:
inline const read_context_t& read_context() const {
return *static_cast<const read_context_t*>(PendingContext<key_t>::caller_context);
}
inline read_context_t& read_context() {
return *static_cast<read_context_t*>(PendingContext<key_t>::caller_context);
}
public:
/// Accessors.
inline const key_t& key() const final {
return read_context().key();
}
inline void Get(const void* rec) final {
const record_t* record = reinterpret_cast<const record_t*>(rec);
read_context().Get(record->value());
}
inline void GetAtomic(const void* rec) final {
const record_t* record = reinterpret_cast<const record_t*>(rec);
read_context().GetAtomic(record->value());
}
};
/// FASTER's internal Upsert() context.
/// An internal Upsert() context that has gone async and lost its type information.
template <class K>
class AsyncPendingUpsertContext : public PendingContext<K> {
public:
typedef K key_t;
protected:
AsyncPendingUpsertContext(IAsyncContext& caller_context_, AsyncCallback caller_callback_)
: PendingContext<key_t>(OperationType::Upsert, caller_context_, caller_callback_) {
}
/// The deep copy constructor.
AsyncPendingUpsertContext(AsyncPendingUpsertContext& other, IAsyncContext* caller_context)
: PendingContext<key_t>(other, caller_context) {
}
public:
virtual void Put(void* rec) = 0;
virtual bool PutAtomic(void* rec) = 0;
virtual uint32_t value_size() const = 0;
};
/// A synchronous Upsert() context preserves its type information.
template <class UC>
class PendingUpsertContext : public AsyncPendingUpsertContext<typename UC::key_t> {
public:
typedef UC upsert_context_t;
typedef typename upsert_context_t::key_t key_t;
typedef typename upsert_context_t::value_t value_t;
typedef Record<key_t, value_t> record_t;
PendingUpsertContext(upsert_context_t& caller_context_, AsyncCallback caller_callback_)
: AsyncPendingUpsertContext<key_t>(caller_context_, caller_callback_) {
}
/// The deep copy constructor.
PendingUpsertContext(PendingUpsertContext& other, IAsyncContext* caller_context_)
: AsyncPendingUpsertContext<key_t>(other, caller_context_) {
}
protected:
Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
return IAsyncContext::DeepCopy_Internal(*this, PendingContext<key_t>::caller_context,
context_copy);
}
private:
inline const upsert_context_t& upsert_context() const {
return *static_cast<const upsert_context_t*>(PendingContext<key_t>::caller_context);
}
inline upsert_context_t& upsert_context() {
return *static_cast<upsert_context_t*>(PendingContext<key_t>::caller_context);
}
public:
/// Accessors.
inline const key_t& key() const final {
return upsert_context().key();
}
inline void Put(void* rec) final {
record_t* record = reinterpret_cast<record_t*>(rec);
upsert_context().Put(record->value());
}
inline bool PutAtomic(void* rec) final {
record_t* record = reinterpret_cast<record_t*>(rec);
return upsert_context().PutAtomic(record->value());
}
inline constexpr uint32_t value_size() const final {
return upsert_context().value_size();
}
};
/// FASTER's internal Rmw() context.
/// An internal Rmw() context that has gone async and lost its type information.
template <class K>
class AsyncPendingRmwContext : public PendingContext<K> {
public:
typedef K key_t;
protected:
AsyncPendingRmwContext(IAsyncContext& caller_context_, AsyncCallback caller_callback_)
: PendingContext<key_t>(OperationType::RMW, caller_context_, caller_callback_) {
}
/// The deep copy constructor.
AsyncPendingRmwContext(AsyncPendingRmwContext& other, IAsyncContext* caller_context)
: PendingContext<key_t>(other, caller_context) {
}
public:
/// Set initial value.
virtual void RmwInitial(void* rec) = 0;
/// RCU.
virtual void RmwCopy(const void* old_rec, void* rec) = 0;
/// in-place update.
virtual bool RmwAtomic(void* rec) = 0;
/// Get value size for initial value or in-place update
virtual uint32_t value_size() const = 0;
/// Get value size for RCU
virtual uint32_t value_size(const void* old_rec) const = 0;
};
/// A synchronous Rmw() context preserves its type information.
template <class MC>
class PendingRmwContext : public AsyncPendingRmwContext<typename MC::key_t> {
public:
typedef MC rmw_context_t;
typedef typename rmw_context_t::key_t key_t;
typedef typename rmw_context_t::value_t value_t;
typedef Record<key_t, value_t> record_t;
PendingRmwContext(rmw_context_t& caller_context_, AsyncCallback caller_callback_)
: AsyncPendingRmwContext<key_t>(caller_context_, caller_callback_) {
}
/// The deep copy constructor.
PendingRmwContext(PendingRmwContext& other, IAsyncContext* caller_context_)
: AsyncPendingRmwContext<key_t>(other, caller_context_) {
}
protected:
Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
return IAsyncContext::DeepCopy_Internal(*this, PendingContext<key_t>::caller_context,
context_copy);
}
private:
const rmw_context_t& rmw_context() const {
return *static_cast<const rmw_context_t*>(PendingContext<key_t>::caller_context);
}
rmw_context_t& rmw_context() {
return *static_cast<rmw_context_t*>(PendingContext<key_t>::caller_context);
}
public:
/// Accessors.
const key_t& key() const {
return rmw_context().key();
}
/// Set initial value.
inline void RmwInitial(void* rec) final {
record_t* record = reinterpret_cast<record_t*>(rec);
rmw_context().RmwInitial(record->value());
}
/// RCU.
inline void RmwCopy(const void* old_rec, void* rec) final {
const record_t* old_record = reinterpret_cast<const record_t*>(old_rec);
record_t* record = reinterpret_cast<record_t*>(rec);
rmw_context().RmwCopy(old_record->value(), record->value());
}
/// in-place update.
inline bool RmwAtomic(void* rec) final {
record_t* record = reinterpret_cast<record_t*>(rec);
return rmw_context().RmwAtomic(record->value());
}
/// Get value size for initial value or in-place update
inline constexpr uint32_t value_size() const final {
return rmw_context().value_size();
}
/// Get value size for RCU
inline constexpr uint32_t value_size(const void* old_rec) const final {
const record_t* old_record = reinterpret_cast<const record_t*>(old_rec);
return rmw_context().value_size(old_record->value());
}
};
class AsyncIOContext;
/// Per-thread execution context. (Just the stuff that's checkpointed to disk.)
struct PersistentExecContext {
PersistentExecContext()
: serial_num{ 0 }
, version{ 0 }
, guid{} {
}
void Initialize(uint32_t version_, const Guid& guid_, uint64_t serial_num_) {
serial_num = serial_num_;
version = version_;
guid = guid_;
}
uint64_t serial_num;
uint32_t version;
/// Unique identifier for this session.
Guid guid;
};
static_assert(sizeof(PersistentExecContext) == 32, "sizeof(PersistentExecContext) != 32");
/// Per-thread execution context. (Also includes state kept in-memory-only.)
struct ExecutionContext : public PersistentExecContext {
/// Default constructor.
ExecutionContext()
: phase{ Phase::INVALID }
, io_id{ 0 } {
}
void Initialize(Phase phase_, uint32_t version_, const Guid& guid_, uint64_t serial_num_) {
assert(retry_requests.empty());
assert(pending_ios.empty());
assert(io_responses.empty());
PersistentExecContext::Initialize(version_, guid_, serial_num_);
phase = phase_;
retry_requests.clear();
io_id = 0;
pending_ios.clear();
io_responses.clear();
}
Phase phase;
/// Retry request contexts are stored inside the deque.
std::deque<IAsyncContext*> retry_requests;
/// Assign a unique ID to every I/O request.
uint64_t io_id;
/// For each pending I/O, maps io_id to the hash of the key being retrieved.
std::unordered_map<uint64_t, KeyHash> pending_ios;
/// The I/O completion thread hands the PendingContext back to the thread that issued the
/// request.
concurrent_queue<AsyncIOContext*> io_responses;
};
}
} // namespace FASTER::core