-
Notifications
You must be signed in to change notification settings - Fork 43
/
retryHandler.h
561 lines (531 loc) · 21 KB
/
retryHandler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
#ifndef RETRYHANDLER_H
#define RETRYHANDLER_H
#include <promise.h>
#include <base/gcm.h>
#include <karereCommon.h>
#include <base/timers.hpp>
#include <base/trackDelete.h>
// Debug logging switch for the retry controller. RETRY_DEBUG_LOGGING is defined unconditionally
// on the next line, so the #ifdef branch below is always the one taken in this file as written;
// remove the #define to make RETRY_LOG() expand to nothing.
#define RETRY_DEBUG_LOGGING 1
#ifdef RETRY_DEBUG_LOGGING
// Forwards to KR_LOG_WARNING with a "Retry[<name>]: " prefix. The expansion reads `mName.c_str()`,
// so the macro can only be used where an `mName` with a c_str() method is in scope
// (i.e. inside members of the controller classes below).
#define RETRY_LOG(fmtString,...) KR_LOG_WARNING("Retry[%s]: " fmtString, mName.c_str(), ##__VA_ARGS__)
#else
#define RETRY_LOG(fmtString,...)
#endif
namespace karere
{
namespace rh
{
/**
 * Lifecycle states of a retry controller.
 * Every state in which the controller is actively working carries kStateBitRunning,
 * so "is it running?" can be tested with (state & kStateBitRunning) != 0.
 */
typedef enum
{
    /** Not started yet, or just reset. Call start() to run */
    kStateNotStarted = 0x00,
    /** Flag bit: if set in a state code, the controller is in a running state */
    kStateBitRunning = 1 << 2,
    /** An attempt is in progress */
    kStateInProgress = kStateBitRunning | 0x01,
    /** Sleeping before the next attempt */
    kStateRetryWait = kStateBitRunning | 0x02,
    /** Completed, the output promise has been settled. Call reset() to use again */
    kStateFinished = 0x03
} State;
/**
 * Promise error type tag reserved for errors generated by the RetryController itself.
 * NOTE(review): in the code visible in this header, abort() and the attempt-timeout path
 * reject with promise::kErrorTypeGeneric and this constant is not referenced - confirm
 * whether it is still used elsewhere.
 */
enum
{
    kErrorType = 0x2E7294D1 // spelled to resemble 'retryhdl'
};
/**
 * Default tuning values used by the controller and its factory functions.
 * The time values are in milliseconds (they are logged as "ms" where they are used).
 */
enum
{
    kDefaultMaxAttemptCount = 0,            // 0 means: never give up
    kDefaultMaxAttemptTimeout = 10 * 1000,  // cap for the per-attempt timeout
    kDefaultAttemptTimeoutBias = 2 * 1000,  // constant added to every computed attempt timeout
    kDefaultMaxSingleWaitTime = 60 * 1000,  // cap for the wait between two attempts
    kDefaultMinInitialDelay = 1 * 1000      // starting point of the exponential back-off
};
/**
 * Non-template interface of a retry controller. It holds the state that does not depend
 * on the type of the retried operation (state code, attempt counter, name, auto-destroy flag)
 * and declares the control operations that the concrete controller implements.
 */
class IRetryController
{
protected:
    /** Current lifecycle state, see State */
    State mState = kStateNotStarted;
    /** Number of the current attempt; the implementation sets it to 1 on start() and to 0 on reset() */
    size_t mCurrentAttemptNo = 0;
    /** When set, the object deletes itself after settling its output promise (heap instances only) */
    bool mAutoDestruct = false;
    /** Name used as a prefix in log messages */
    std::string mName;

public:
    IRetryController(const std::string& aName)
        : mName(aName)
    {
    }

    /** The name given to this controller at construction */
    const std::string& name() const
    {
        return mName;
    }

    /** Starts the attempts, optionally after an initial \c delay */
    virtual promise::PromiseBase& start(unsigned delay=0) = 0;
    /** Starts over from the first attempt, optionally after \c delay */
    virtual void restart(unsigned delay=0) = 0;
    /** Aborts the attempts. @return whether anything was actually aborted */
    virtual bool abort() = 0;
    /** Makes a finished controller usable again */
    virtual void reset() = 0;

    /** The number of the current attempt */
    size_t currentAttemptNo() const
    {
        return mCurrentAttemptNo;
    }

    /**
     * Tells the retry handler to delete itself after it has settled the output promise.
     * This is convenient in a fire-and-forget scenario. Typically the user keeps a copy
     * of the output promise, obtained via getPromise(), which keeps the promise alive even
     * if the RetryController object is deleted. See the implementation of the standalone
     * function retry() for an example of that.
     * \warning This can be set only if the instance is allocated on the heap and not
     * on the stack
     */
    void setAutoDestroy()
    {
        mAutoDestruct = true;
    }

    /** @brief
     * The state of the retry handler - whether it has not yet been started, is in progress
     * or has finished and the output promise is resolved/rejected.
     */
    State state() const
    {
        return mState;
    }

    virtual ~IRetryController() {}
};
/**
 * Calls a cancel callback if there is one. Overload resolution on the argument type does
 * the "is it null?" check at compile time:
 *  - any real callable (lambda, function pointer, functor) selects the template and is invoked;
 *  - std::nullptr_t and void* are the "no callback" placeholder types and select the no-op
 *    overloads. The void* overload is required because void* is the default CancelFunc of
 *    RetryController/createRetryController(); without it the template would be chosen for a
 *    void* argument and `cb()` would fail to compile.
 */
template <typename CB> inline static void callFuncIfNotNull(const CB& cb) { cb(); }
inline static void callFuncIfNotNull(std::nullptr_t) {}
inline static void callFuncIfNotNull(void*) {}
/** @brief
* This is a simple class that retries a promise-returning function call, until the
* returned promise is resolved (indicating that the operation succeeded), a maximum
* number of retries has been reached and the retry handler gives up, or it has been
* canceled by the user. The RetryController
* has an output promise which is resolved when the operation succeeds, or rejected
* if the retry handler gives up. That output promise has the same value type as the
* promise returned by the function. When the function succeeds, the output promise is
* resolved with the value returned by the function. When the retry handler gives up,
* it rejects the output promise with the ::promise::Error object returned by the last
* (failed) call of the function.
*/
template<class Func, class CancelFunc=void*>
class RetryController: public IRetryController, public karere::DeleteTrackable
{
public:
/** @brief
* The value type of the promise returned by the operation and by the RetryController
* itself
*/
typedef typename promise::FuncTraits<Func>::RetType::Type RetType;
protected:
// Attempt numbers greater than kBitness bypass the 2^(attempt-1) formula in the calc*()
// methods and use the configured maximum directly.
enum { kBitness = sizeof(unsigned)*8-10 }; //maximum exponent of mInitialWaitTime that fits into an unsigned
// The operation being retried. nextTry() invokes it as mFunc(mCurrentAttemptNo, wptr).
Func mFunc;
// Optional cancel callback, passed to callFuncIfNotNull() when an in-progress attempt is
// aborted or times out.
CancelFunc mCancelFunc;
size_t mCurrentAttemptId = 0; //used to detect callbacks from stale attempts. Never reset (unlike mCurrentAttemptNo)
// Give up once the attempt number exceeds this value; 0 disables the limit.
size_t mMaxAttemptCount;
// Base value of the per-attempt timeout; when 0, nextTry() sets no attempt timer.
unsigned mAttemptTimeout = 0;
// Upper bound for the computed per-attempt timeout.
unsigned mMaxAttemptTimeout = 0;
// Upper bound for the computed (pre-randomization) wait between attempts.
unsigned mMaxSingleWaitTime;
// Percentage of the wait time used as the random +/- range in calcWaitTime().
unsigned short mDelayRandPct = 20;
// The output promise. It is replaced by a fresh promise object each time it is settled.
promise::Promise<RetType> mPromise;
// Id returned by setTimeout() for the pending timer, 0 when no timer is pending.
unsigned long mTimer = 0;
// Base of the exponential wait formula (the constructor's backoffStart).
unsigned short mInitialWaitTime;
// Non-zero when restart() was called during an attempt; holds the delay to use for the
// restart that schedNextRetry() performs once that attempt fails or times out.
unsigned mRestart = 0;
// Opaque context pointer handed to setTimeout()/cancelTimeout().
void *appCtx;
// Handle supplied by the creator of the controller; forwarded to mFunc on every attempt.
DeleteTrackable::Handle wptr;
public:
/** Gets the current output promise. Take a copy before start() if the result of this run
* is needed: the member is replaced by a fresh promise as soon as the run is settled. */
promise::Promise<RetType>& getPromise() {return mPromise;}
/** Sets the percentage of the wait time that calcWaitTime() uses as random +/- range. */
void setWaitRandomnessPct(unsigned short pct) { mDelayRandPct = pct; }
/**
 * Creates a controller in the "not started" state. Nothing runs until start() is called.
 *
 * @param aName - name of the controller, used as a prefix in log messages.
 * @param func - the operation being retried. This can be a lambda, function object or
 * a C function pointer. It is called as func(attemptNo, wptr) and must return a promise.
 * @param cancelFunc - called (if it is an actual callable) when an in-progress attempt is
 * aborted or times out.
 * @param attemptTimeout - base of the per-attempt timeout. For attempt N the timeout is
 * attemptTimeout * 2^(N-1) + kDefaultAttemptTimeoutBias, capped by \c maxAttemptTimeout.
 * If zero, attempts have no timeout.
 * @param maxAttemptTimeout - upper bound of the per-attempt timeout.
 * @param wptr - handle that is stored and passed to \c func on every attempt.
 * @param ctx - opaque context pointer passed to the timer functions.
 * @param maxSingleWaitTime - the maximum wait time between retries, before randomization.
 * If the exponential formula exceeds it, maxSingleWaitTime is used instead.
 * @param maxAttemptCount - the maximum number of attempts before giving up. If it
 * is zero, then the retries will be repeated forever.
 * @param backoffStart - base of the exponential wait formula: the wait before attempt N
 * (N >= 2) is backoffStart * 2^(N-1), randomized by the configured percentage.
 * A small value can be used for high frequency initial retrying.
 */
RetryController(const std::string& aName,
                Func&& func,
                CancelFunc&& cancelFunc,
                unsigned attemptTimeout,
                unsigned maxAttemptTimeout,
                DeleteTrackable::Handle wptr,
                void *ctx,
                unsigned maxSingleWaitTime = kDefaultMaxSingleWaitTime,
                size_t maxAttemptCount = kDefaultMaxAttemptCount,
                unsigned short backoffStart = 1000)
    : IRetryController(aName),
      mFunc(std::forward<Func>(func)),
      mCancelFunc(std::forward<CancelFunc>(cancelFunc)),
      mMaxAttemptCount(maxAttemptCount),
      mAttemptTimeout(attemptTimeout),
      mMaxAttemptTimeout(maxAttemptTimeout),
      mMaxSingleWaitTime(maxSingleWaitTime),
      mInitialWaitTime(backoffStart),
      appCtx(ctx),
      wptr(wptr)
{
}
/** Only logs the destruction. A pending timer is not cancelled here; the timer callbacks
* created by this class check a weak handle (deleted()) before touching the instance. */
~RetryController()
{
RETRY_LOG("Deleting RetryController instance");
}
/** @brief Starts the retry attempts.
 * @param delay - time in ms to wait before the first attempt. If zero, the first attempt
 * is made synchronously, inside this call.
 * @return the output promise member.
 * \warning If the run completes synchronously, the member has already been replaced by a
 * fresh promise (and an auto-destroy instance has already been deleted) by the time this
 * function returns. Callers that need the result should copy getPromise() before start().
 * @throws std::runtime_error if the controller is not in the kStateNotStarted state.
 */
promise::PromiseBase& start(unsigned delay=0)
{
    if (mState != kStateNotStarted)
        throw std::runtime_error("RetryController: Already started or not reset after finished");
    assert(mTimer == 0);
    mCurrentAttemptId++;
    mCurrentAttemptNo = 1; //mCurrentAttempt increments immediately before the wait delay (if any)
    if (!delay)
    {
        nextTry();
        return mPromise;
    }

    // delay is unsigned and in the same unit as every other value given to setTimeout()
    RETRY_LOG("Starting retry after the initial delay (%u ms)", delay);
    mState = kStateRetryWait;
    auto selfHandle = weakHandle();
    mTimer = setTimeout([selfHandle, this]()
    {
        if (selfHandle.deleted()) // the controller was destroyed while the timer was pending
            return;
        mTimer = 0;
        nextTry();
    }, delay, appCtx);
    return mPromise;
}
/**
* @brief abort Aborts the retry attemts
* @return Whether the abort was actually performed, or it was not needed
* (i.e. not yet started or already finished). When the retries
* are aborted, the output promise is immediately rejected with an error of type
* 1 (generic), code 2 (abort) and text "aborted".
*/
bool abort()
{
if ((mState & kStateBitRunning) == 0)
return false;
cancelTimer();
if ((mState == kStateInProgress) && !std::is_same<CancelFunc, void*>::value)
callFuncIfNotNull(mCancelFunc);
mPromise.reject("aborted", promise::kErrAbort, promise::kErrorTypeGeneric);
mPromise = promise::Promise<RetType>();
if (mAutoDestruct)
delete this;
return true;
}
/**
* @brief reset
* Re-initializes the retry handler after it has already finished. Then it can be
* reused.
* \warning After a reset(), the output promise is changed, because a promise cannot
* be reused, so the user must use the new promise by calling getPromise()
* after the reset().
*/
void reset()
{
if (mState == kStateNotStarted)
return;
else if (mState != kStateFinished)
throw std::runtime_error("RetryController::reset: Can't reset while in progress");
assert(mTimer == 0);
mPromise = promise::Promise<RetType>();
mCurrentAttemptNo = 0;
mState = kStateNotStarted;
}
/**
 * @brief restart
 * Starts over with the initial backoff value, as if the controller was just started, while
 * keeping the current promise object.
 *  - Not started yet, or waiting between attempts: any pending timer is cancelled and
 *    start(delay) is called.
 *  - An attempt is in progress: the restart is only recorded; it is carried out when that
 *    attempt fails or times out. A zero \c delay is recorded as 1.
 *  - Finished: not allowed.
 * @param delay - passed on to start()
 * @throws std::runtime_error if the controller is in the \c finished state
 */
void restart(unsigned delay=0)
{
    RETRY_LOG("Restarting RetryController...");
    switch (mState)
    {
    case kStateFinished:
        throw std::runtime_error("restart: Already in finished state");

    case kStateInProgress:
        //schedNextRetry will do the actual restart once the current attempt finishes
        mRestart = (delay != 0) ? delay : 1;
        RETRY_LOG("Attempt in-progress. RetryController will restart once the current attempt finishes.");
        return;

    default: //kStateRetryWait or kStateNotStarted
        cancelTimer();
        mState = kStateNotStarted;
        start(delay);
        return;
    }
}
protected:
/**
 * Computes the timeout of the current attempt:
 * mAttemptTimeout * 2^(attemptNo - 1) + kDefaultAttemptTimeoutBias, capped by mMaxAttemptTimeout.
 * For attempt numbers above kBitness the formula is skipped: the result is the cap, or 0
 * if no attempt timeout is configured.
 * The product is computed in 64 bits: in 32 bits it silently wrapped around for larger
 * mAttemptTimeout values, and a wrapped (small) value could slip under the cap.
 */
unsigned calcAttemptTimeoutNoRandomness()
{
    if (mCurrentAttemptNo > kBitness)
        return mAttemptTimeout ? mMaxAttemptTimeout : 0;

    // Attempt numbers are 1-based; treat 0 like the first attempt instead of shifting by -1
    const unsigned exponent = mCurrentAttemptNo ? static_cast<unsigned>(mCurrentAttemptNo - 1) : 0;
    // exponent < kBitness and mAttemptTimeout fits in 32 bits, so this cannot overflow 64 bits
    const unsigned long long timeout =
        (static_cast<unsigned long long>(mAttemptTimeout) << exponent) + kDefaultAttemptTimeoutBias;
    return (timeout <= mMaxAttemptTimeout) ? static_cast<unsigned>(timeout) : mMaxAttemptTimeout;
}
/**
 * Computes the wait before the next attempt: the value of calcWaitTimeNoRandomness(),
 * moved by a random amount within +/- mDelayRandPct percent of it.
 * The arithmetic is done in 64 bits so that large wait times cannot wrap around, the lower
 * bound is clamped at 0 (a percentage above 100 used to underflow into a huge wait), and
 * the result saturates at the largest unsigned value.
 */
unsigned calcWaitTime()
{
    const unsigned long long base = calcWaitTimeNoRandomness();
    const unsigned long long randRange = (base * mDelayRandPct) / 100;
    // random offset in [0, 2 * randRange), in steps of one thousandth of the range
    const unsigned long long offset =
        (static_cast<unsigned long long>(rand() % 1000) * (randRange * 2)) / 1000;
    const unsigned long long lowest = (randRange < base) ? (base - randRange) : 0;
    const unsigned long long wait = lowest + offset;
    const unsigned maxUnsigned = ~0u;
    return (wait > maxUnsigned) ? maxUnsigned : static_cast<unsigned>(wait);
}
/**
 * Computes the wait before the current attempt, without randomization:
 * mInitialWaitTime * 2^(attemptNo - 1), capped by mMaxSingleWaitTime.
 * For attempt numbers above kBitness the formula is skipped: the result is the cap, or 0
 * if the initial wait time is 0.
 * The product is computed in 64 bits. The previous `int * unsigned short` expression was
 * evaluated as signed int and overflowed (undefined behavior) for initial wait times of
 * 1024 and above.
 */
unsigned calcWaitTimeNoRandomness()
{
    if (mCurrentAttemptNo > kBitness)
        return mInitialWaitTime ? mMaxSingleWaitTime : 0;

    // Attempt numbers are 1-based; treat 0 like the first attempt instead of shifting by -1
    const unsigned exponent = mCurrentAttemptNo ? static_cast<unsigned>(mCurrentAttemptNo - 1) : 0;
    // exponent < kBitness and mInitialWaitTime fits in 16 bits, so this cannot overflow 64 bits
    const unsigned long long wait = static_cast<unsigned long long>(mInitialWaitTime) << exponent;
    return (wait <= mMaxSingleWaitTime) ? static_cast<unsigned>(wait) : mMaxSingleWaitTime;
}
/** Cancels the pending timer, if there is one, and marks that no timer is pending. */
void cancelTimer()
{
    if (mTimer != 0)
    {
        cancelTimeout(mTimer, appCtx);
        mTimer = 0;
    }
}
template <class P>
void attachThenHandler(P& promise, unsigned attempt)
{
auto wptr = getDelTracker();
promise.then([wptr, this, attempt](const RetType& ret)
{
wptr.throwIfDeleted();
if (attempt != mCurrentAttemptId)
{
RETRY_LOG("A previous timed-out/aborted attempt returned success");
return ret;
}
RETRY_LOG("Input promise succeed. RetryController will be deleted now");
cancelTimer();
mState = kStateFinished;
mPromise.resolve(ret);
mPromise = promise::Promise<RetType>(); //we must release previous promise as it may hold references captured in its lambdas
if (mAutoDestruct)
delete this;
return ret;
});
}
void attachThenHandler(promise::Promise<void>& promise, unsigned attempt)
{
auto track = getDelTracker();
promise.then([track, this, attempt]()
{
track.throwIfDeleted();
if (attempt != mCurrentAttemptId)
{
RETRY_LOG("A previous timed-out/aborted attempt returned success");
return;
}
cancelTimer();
mState = kStateFinished;
mPromise.resolve();
mPromise = promise::Promise<RetType>();
if (mAutoDestruct)
delete this;
});
}
void nextTry()
{
assert(mState == kStateRetryWait || mState == kStateNotStarted);
assert(mTimer == 0);
auto attempt = mCurrentAttemptId;
if (mAttemptTimeout) //set an attempt timeout timer
{
unsigned attemptTimeout = calcAttemptTimeoutNoRandomness();
RETRY_LOG("Setting a timeout for attempt %zu: %u ms", mCurrentAttemptNo, attemptTimeout);
auto wptr = weakHandle();
mTimer = setTimeout([wptr, this, attempt, attemptTimeout]()
{
if (wptr.deleted())
return;
RETRY_LOG("Attempt %zu timed out after %u ms", mCurrentAttemptNo, attemptTimeout);
assert(attempt == mCurrentAttemptId); //if we are in a next attempt, cancelTimer() should have been called and this callback should never fire
mTimer = 0;
static const ::promise::Error timeoutError("timeout", promise::kErrTimeout, promise::kErrorTypeGeneric);
if (!std::is_same<CancelFunc, std::nullptr_t>::value)
{
auto id = mCurrentAttemptId;
callFuncIfNotNull(mCancelFunc);
if (id != mCurrentAttemptId) //cancelFunc failed the input promise and a retry was already scheduled as a result, we have to bail out
{
RETRY_LOG("cancelFunc failed the input promise and a retry was already scheduled as a result: bail out!");
return;
}
}
schedNextRetry(timeoutError);
}, attemptTimeout, appCtx);
}
mState = kStateInProgress;
RETRY_LOG("Starting attempt %zu...", mCurrentAttemptNo);
auto pms = mFunc(mCurrentAttemptNo, wptr);
attachThenHandler(pms, attempt);
pms.fail([this, attempt](const ::promise::Error& err)
{
if (attempt != mCurrentAttemptId)//we are already in another attempt and this callback is from the old attempt, ignore it
{
RETRY_LOG("A previous timed-out/aborted attempt returned failure: %s", err.msg().c_str());
return err;
}
if (mAttemptTimeout)
{
RETRY_LOG("A previous attempt returned failure before timeout expires: %s", err.msg().c_str());
// wait till the attempt timeout expires, it will schedule the next retry
return err;
}
RETRY_LOG("Attempt %zu failed with message '%s'", mCurrentAttemptNo, err.what());
cancelTimer();
schedNextRetry(err);
return err;
});
}
/**
 * Called when the current attempt has failed or timed out. In order of precedence:
 *  - if restart() was requested during the attempt, starts over via start() with the
 *    recorded delay;
 *  - if the maximum number of attempts has been reached, gives up: rejects the output
 *    promise with \c err (and deletes the controller if auto-destroy is set);
 *  - otherwise sets a timer that runs the next attempt after the back-off wait.
 * @param err - the error of the attempt that just ended
 * @return false if the controller gave up, true if another attempt has been scheduled
 */
bool schedNextRetry(const ::promise::Error& err)
{
    assert(mTimer == 0);
    if (mRestart)
    {
        const unsigned restartDelay = mRestart;
        mRestart = 0;
        mState = kStateNotStarted;
        start(restartDelay); //will just schedule, because we pass it a nonzero delay. Handles mCurrentAttemptId/No by itself
        return true;
    }
    mCurrentAttemptNo++; //always increment, to mark the end of the previous attempt
    mCurrentAttemptId++;
    if (mMaxAttemptCount && (mCurrentAttemptNo > mMaxAttemptCount)) //give up
    {
        // the format string used to have a conversion with no matching argument
        RETRY_LOG("Maximum number of attempts (%zu) has been reached. RetryController will give up now.", mMaxAttemptCount);
        mState = kStateFinished;
        mPromise.reject(err);
        mPromise = promise::Promise<RetType>(); // release the settled promise and whatever its handlers captured
        if (mAutoDestruct)
            delete this;
        return false;
    }
    const unsigned waitTime = calcWaitTime(); // unsigned, to match both calcWaitTime() and the %u below
    RETRY_LOG("Will retry in %u ms", waitTime);
    mState = kStateRetryWait;
    //schedule next attempt
    auto selfHandle = weakHandle();
    mTimer = setTimeout([selfHandle, this]()
    {
        if (selfHandle.deleted()) // the controller was destroyed while waiting
            return;
        mTimer = 0;
        nextTry();
    }, waitTime, appCtx);
    return true;
}
};
//g++ < 4.9 has a bug where one can't specify a lambda as default function parameter,
//so we define that default func parameter for retry() here
//It intentionally does nothing: it stands for "no cancel action" while still being callable.
static inline void _emptyCancelFunc(){}
} //end namespace rh
/**
 * Convenience function to retry a call returning a promise, fire-and-forget style.
 * Internally it allocates a RetryController, sets it to auto-destroy (so that the instance
 * deletes itself after finishing) and starts it immediately. The parameters of this
 * function are forwarded to the RetryController constructor.
 * @param aName - name of the controller, used in log messages
 * @param func - the promise-returning function (e.g. lambda) to retry. It is called as
 * func(attemptNo, wptr).
 * @param wptr - handle passed to \c func on every attempt
 * @param ctx - opaque context pointer passed to the timer functions
 * @param cancelFunc - called when an in-progress attempt is aborted or times out.
 * Defaults to a function that does nothing.
 * @param attemptTimeout - base of the per-attempt timeout in [ms]. Default is 0: no timeout
 * @param maxRetries - the maximum number of attempts before giving up and rejecting
 * the returned promise. If it is zero, then it will retry forever. Default is 0
 * @param maxSingleWaitTime - the maximum time in [ms] to wait between attempts, before
 * randomization. Default is rh::kDefaultMaxSingleWaitTime
 * @param backoffStart - the starting point of the backoff time algorithm:
 * \c backoffStart * 2^(attempt_number - 1).
 * See the constructor of RetryController for more details
 * @param maxAttemptTimeout - upper bound of the per-attempt timeout in [ms], only relevant
 * with a non-zero \c attemptTimeout. It used to be missing from the constructor call,
 * which shifted all following arguments by one position.
 * @return a copy of the output promise, taken before the controller is started
 */
template <class Func, class CancelFunc=decltype(&rh::_emptyCancelFunc)>
static inline auto retry(const std::string& aName, Func&& func, DeleteTrackable::Handle wptr, void *ctx,
    CancelFunc&& cancelFunc = &rh::_emptyCancelFunc,
    unsigned attemptTimeout = 0,
    size_t maxRetries = rh::kDefaultMaxAttemptCount,
    size_t maxSingleWaitTime = rh::kDefaultMaxSingleWaitTime,
    short backoffStart = rh::kDefaultMinInitialDelay,
    unsigned maxAttemptTimeout = rh::kDefaultMaxAttemptTimeout)
->decltype(func(0, wptr))
{
    auto self = new rh::RetryController<Func, CancelFunc>(aName,
        std::forward<Func>(func), std::forward<CancelFunc>(cancelFunc),
        attemptTimeout, maxAttemptTimeout, wptr, ctx,
        maxSingleWaitTime, maxRetries, backoffStart);
    // Copy the promise first: start() may finish the whole run (and delete self) synchronously
    auto outputPromise = self->getPromise();
    self->setAutoDestroy();
    self->start(); //self may get destroyed synchronously here, but we have a reference to the promise
    return outputPromise;
}
/**
 * Similar to retry(), but only constructs the controller: it is neither started nor set to
 * auto-destroy. The returned object is allocated with new and is owned by the caller.
 * All arguments are passed on to the RetryController constructor.
 */
template <class Func, class CancelFunc=void*>
static inline rh::RetryController<Func, CancelFunc>* createRetryController(
    const std::string& aName,
    Func&& func,
    DeleteTrackable::Handle wptr,
    void *ctx,
    CancelFunc&& cancelFunc = nullptr,
    unsigned attemptTimeout = 0,
    unsigned maxAttemptTimeout = rh::kDefaultMaxAttemptTimeout,
    size_t maxRetries = rh::kDefaultMaxAttemptCount,
    size_t maxSingleWaitTime = rh::kDefaultMaxSingleWaitTime,
    short backoffStart = rh::kDefaultMinInitialDelay)
{
    typedef rh::RetryController<Func, CancelFunc> Controller;
    // Note the different parameter order of the constructor
    return new Controller(aName,
                          std::forward<Func>(func),
                          std::forward<CancelFunc>(cancelFunc),
                          attemptTimeout,
                          maxAttemptTimeout,
                          wptr,
                          ctx,
                          maxSingleWaitTime,
                          maxRetries,
                          backoffStart);
}
/*
template <class T>
inline void _timeoutAttachThenHandler(promise::Promise<T>& in, promise::Promise<T>& out)
{
in.then([out](const T& val) mutable
{
if (!out.done())
out.resolve(val);
});
}
inline void _timeoutAttachThenHandler(promise::Promise<void>& in, promise::Promise<void>& out)
{
in.then([out]() mutable
{
if (!out.done())
out.resolve();
});
}
template <class CB, class CCB=std::nullptr_t>
auto performWithTimeout(CB&& cb, unsigned timeout, CCB&& cancelCb=nullptr)
-> promise::Promise<typename promise::FuncTraits<CB>::RetType::Type>
{
typedef typename promise::FuncTraits<CB>::RetType::Type Type;
promise::Promise<Type> pms;
setTimeout([pms, cancelCb]() mutable
{
if (pms.done())
return;
pms.reject(::promise::Error("Operation timed out", 1, 1));
rh::callFuncIfNotNull(cancelCb);
}, timeout);
auto retpms = cb();
_timeoutAttachThenHandler(retpms, pms);
retpms.fail([pms](const ::promise::Error& err) mutable
{
if (!pms.done())
pms.reject(err);
return err;
});
return pms;
}
*/
}
#endif // RETRYHANDLER_H