forked from antirez/disque-module
/
queue.c
1216 lines (1051 loc) · 45.8 KB
/
queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Queue implementation. In Disque a given node can have active jobs
* (not ACKed) that are not queued. Only queued jobs can be retrieved by
* workers via GETJOB. This file implements the local node data structures
* and functions to model the queue.
*
* ---------------------------------------------------------------------------
*
* Copyright (c) 2014-2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved. This code is under the AGPL license, check the
* LICENSE file for more info. */
#include "disque.h"
#include <math.h>
void signalQueueAsReady(RedisModuleCtx *ctx, queue *q);
/* ------------------------ Low level queue functions ----------------------- */
/* Ordering function for the per-queue skiplist: jobs are sorted by ctime
 * in ascending order; jobs with the same ctime are ordered by comparing
 * their ID bytes, so two distinct jobs never compare as equal. */
int skiplistCompareJobsInQueue(const void *a, const void *b) {
    const job *left = a;
    const job *right = b;

    if (left->ctime != right->ctime)
        return (left->ctime > right->ctime) ? 1 : -1;
    return memcmp(left->id,right->id,JOB_ID_LEN);
}
/* Create a new, empty queue and register it in the Queues radix tree.
 * Returns the new queue on success, or NULL if a queue with the same
 * name already exists. */
queue *createQueue(const char *name, size_t namelen) {
    mstime_t now_ms = mstime();
    unsigned char *key = (unsigned char*)name;

    if (raxFind(Queues,key,namelen) != raxNotFound) return NULL;

    queue *q = RedisModule_Alloc(sizeof(*q));
    q->name = sdsnewlen(name,namelen);
    q->flags = 0;
    q->sl = skiplistCreate(skiplistCompareJobsInQueue);
    q->ctime = q->atime = now_ms/1000;   /* Seconds. */

    /* NEEDJOBS throttling state. */
    q->needjobs_bcast_time = 0;
    q->needjobs_bcast_attempt = 0;
    q->needjobs_adhoc_time = 0;
    q->needjobs_adhoc_attempt = 0;

    /* Both are created on demand to save memory. */
    q->needjobs_responders = NULL;
    q->clients = NULL;

    /* Import rate estimation state (milliseconds). */
    q->current_import_jobs_time = now_ms;
    q->current_import_jobs_count = 0;
    q->prev_import_jobs_time = now_ms;
    q->prev_import_jobs_count = 0;

    q->jobs_in = 0;
    q->jobs_out = 0;

    raxInsert(Queues,key,namelen,q,NULL);
    return q;
}
/* Look up a queue by name. Returns NULL when no such queue is registered. */
queue *lookupQueue(const char *name, size_t namelen) {
    void *found = raxFind(Queues,(unsigned char*)name,namelen);
    return (found == raxNotFound) ? NULL : found;
}
/* Unregister the queue called 'name' and release everything it owns.
 * Returns C_ERR if no such queue exists, C_OK otherwise. The queue must
 * not have blocked clients left (asserted).
 *
 * The key is removed from the Queues tree before q->name is freed, because
 * callers may pass q->name itself as 'name'. */
int destroyQueue(const char *name, size_t namelen) {
    queue *q = lookupQueue(name,namelen);
    if (q == NULL) return C_ERR;

    raxRemove(Queues,(unsigned char*)name,namelen,NULL);
    sdsfree(q->name);
    skiplistFree(q->sl);
    if (q->needjobs_responders != NULL) raxFree(q->needjobs_responders);

    list *clients = q->clients;
    if (clients != NULL) {
        RedisModule_Assert(listLength(clients) == 0);
        listRelease(clients);
    }
    RedisModule_Free(q);
    return C_OK;
}
/* Reply with a job in the format used to return jobs from queues:
 * [queue_name,id,body]. With GETJOB_FLAG_WITHCOUNTERS the array also
 * carries the "nacks" and "additional-deliveries" counters as name/value
 * pairs, for 7 elements in total. */
void addReplyJob(RedisModuleCtx *ctx, job *j, int flags) {
    int withcounters = (flags & GETJOB_FLAG_WITHCOUNTERS) != 0;

    RedisModule_ReplyWithArray(ctx,withcounters ? 7 : 3);
    RedisModule_ReplyWithStringBuffer(ctx,j->queue,sdslen(j->queue));
    RedisModule_ReplyWithStringBuffer(ctx,j->id,JOB_ID_LEN);
    RedisModule_ReplyWithStringBuffer(ctx,j->body,sdslen(j->body));
    if (!withcounters) return;

    RedisModule_ReplyWithSimpleString(ctx,"nacks");
    RedisModule_ReplyWithLongLong(ctx,j->num_nacks);
    RedisModule_ReplyWithSimpleString(ctx,"additional-deliveries");
    RedisModule_ReplyWithLongLong(ctx,j->num_deliv);
}
/* ------------------------ Queue higher level API -------------------------- */
/* Put 'job' into its queue and mark it as QUEUED.
 *
 * Returns C_ERR, leaving the job untouched, when it is already queued, when
 * its qtime is 0, or when it has retry set to 0 and the JOB_FLAG_DELIVERED
 * flag set. Otherwise the job is queued and C_OK is returned.
 *
 * 'nack' is 1 when the enqueue is the result of a client negative
 * acknowledge: the QUEUED message is then always broadcast and the NACKs
 * counter is incremented instead of the additional deliveries counter.
 *
 * The queue is created if missing, and clients blocked on it are served
 * unless the queue is paused in output. */
int enqueueJob(RedisModuleCtx *ctx, job *job, int nack) {
    int rejected = job->state == JOB_STATE_QUEUED || job->qtime == 0 ||
                   (job->retry == 0 && (job->flags & JOB_FLAG_DELIVERED));
    if (rejected) return C_ERR;

    mstime_t now_ms = mstime();
    RedisModule_Log(ctx,"verbose","QUEUED %.*s", JOB_ID_LEN, job->id);
    job->state = JOB_STATE_QUEUED;

    /* Schedule the next re-queue; at-most-once jobs are never re-queued. */
    if (job->retry == 0) {
        job->qtime = 0;
    } else {
        job->flags |= JOB_FLAG_BCAST_WILLQUEUE;
        job->qtime = now_ms +
                     job->retry*1000 +
                     randomTimeError(DISQUE_TIME_ERR);
    }

    /* The first enqueue is not broadcast, to save bandwidth. Later
     * re-queues (and NACKs) are, so that other nodes can (best effort)
     * avoid re-queueing the same job. Receivers bump their own NACKs /
     * additional deliveries counters when the QUEUED message arrives, so
     * the local copy is bumped here as well. */
    int broadcast = (job->flags & JOB_FLAG_BCAST_QUEUED) || nack;
    if (!broadcast) {
        job->flags |= JOB_FLAG_BCAST_QUEUED; /* Next time, broadcast. */
    } else {
        unsigned char msgflags = nack ? DISQUE_MSG_FLAG0_INCR_NACKS :
                                        DISQUE_MSG_FLAG0_INCR_DELIV;
        clusterBroadcastQueued(ctx,job,msgflags);
        if (nack) job->num_nacks++;
        else job->num_deliv++;
    }
    updateJobAwakeTime(job,0);

    queue *q = lookupQueue(job->queue,sdslen(job->queue));
    if (q == NULL) q = createQueue(job->queue,sdslen(job->queue));
    RedisModule_Assert(skiplistInsert(q->sl,job) != NULL);
    q->atime = now_ms/1000;
    q->jobs_in++;
    if ((q->flags & QUEUE_FLAG_PAUSED_OUT) == 0) signalQueueAsReady(ctx,q);
    return C_OK;
}
/* Take a QUEUED job out of its queue and set its state back to ACTIVE
 * (callers may override it afterwards). Returns C_ERR if the job is not
 * queued or its queue does not exist, C_OK otherwise. */
int dequeueJob(job *job) {
    queue *q;

    if (job->state != JOB_STATE_QUEUED) return C_ERR;
    if ((q = lookupQueue(job->queue,sdslen(job->queue))) == NULL) return C_ERR;

    RedisModule_Assert(skiplistDelete(q->sl,job));
    job->state = JOB_STATE_ACTIVE;
    RedisModule_Log(NULL,"verbose","DE-QUEUED %.*s", JOB_ID_LEN, job->id);
    return C_OK;
}
/* Pop the job with the lowest ctime from queue 'q', mark it ACTIVE and
 * DELIVERED, and return it. Returns NULL if the queue is empty.
 *
 * If 'qlen' is not NULL, the number of jobs left in the queue is stored
 * at *qlen.
 *
 * Depending on ConfigPersistDequeued the dequeue is also logged to the
 * AOF: always (DISQUE_PERSIST_DEQUEUED_ALL), or only for at-most-once jobs
 * (retry == 0, DISQUE_PERSIST_DEQUEUED_ATMOSTONCE), so that after a
 * restart such jobs are not put into the queue again. With a weak AOF
 * fsync policy a crash can still re-queue them. */
job *queueFetchJob(RedisModuleCtx *ctx, queue *q, unsigned long *qlen) {
    if (skiplistLength(q->sl) == 0) return NULL;

    job *fetched = skiplistPopHead(q->sl);
    fetched->state = JOB_STATE_ACTIVE;
    fetched->flags |= JOB_FLAG_DELIVERED;

    q->atime = time(NULL);
    q->jobs_out++;
    if (qlen != NULL) *qlen = skiplistLength(q->sl);

    int log_all = ConfigPersistDequeued == DISQUE_PERSIST_DEQUEUED_ALL;
    int log_atmostonce =
        ConfigPersistDequeued == DISQUE_PERSIST_DEQUEUED_ATMOSTONCE &&
        fetched->retry == 0;
    if (log_all || log_atmostonce) AOFDequeueJob(ctx,fetched);
    return fetched;
}
/* Number of jobs queued in 'q'; a NULL queue counts as empty. */
unsigned long queueLength(queue *q) {
    return (q == NULL) ? 0 : skiplistLength(q->sl);
}
/* Number of jobs queued in the queue called 'qname', or 0 when that queue
 * does not exist. */
unsigned long queueNameLength(const char *qname, size_t qnamelen) {
    queue *q = lookupQueue(qname,qnamelen);
    return queueLength(q);
}
/* Default idle time, in seconds, after which GCQueue() may reclaim a queue. */
#define QUEUE_MAX_IDLE_TIME (60*5)

/* Destroy queue 'q' if it was not accessed for at least 'max_idle_time'
 * seconds, has no blocked clients, holds no jobs and is not paused.
 * Returns C_OK when the queue was destroyed (and 'q' must no longer be
 * used), C_ERR otherwise. */
int GCQueue(queue *q, time_t max_idle_time) {
    time_t idle = time(NULL) - q->atime;
    int has_clients = q->clients != NULL && listLength(q->clients) != 0;
    int is_paused = (q->flags & QUEUE_FLAG_PAUSED_ALL) != 0;

    if (idle < max_idle_time || has_clients ||
        skiplistLength(q->sl) != 0 || is_paused)
    {
        return C_ERR;
    }
    destroyQueue(q->name,sdslen(q->name));
    return C_OK;
}
/* Incrementally evict idle, empty queues by sampling random queues and
 * passing them to GCQueue(). Under memory pressure the idle threshold is
 * lowered (getMemoryWarningLevel() > 0: divided by 30; > 1: 2 seconds).
 *
 * Sampling stops when fewer than 10% of the samples (after the first 10)
 * were evicted, when no queue is left, or when a 1000-samples checkpoint
 * finds more than 1 ms spent. Returns the number of evicted queues.
 *
 * XXX: TODO: it would be better to remember the last scanned queue name
 * and continue from there instead of using a random walk. */
int evictIdleQueues(RedisModuleCtx *ctx) {
    mstime_t start = mstime();
    time_t max_idle_time = QUEUE_MAX_IDLE_TIME;
    long sampled = 0, evicted = 0;
    raxIterator ri;

    if (getMemoryWarningLevel(ctx) > 0) max_idle_time /= 30;
    if (getMemoryWarningLevel(ctx) > 1) max_idle_time = 2;

    raxStart(&ri,Queues);
    for (;;) {
        if (raxSize(Queues) == 0) break;

        /* The tree may have changed (GCQueue): always re-seek first. */
        raxSeek(&ri,"^",NULL,0);
        raxRandomWalk(&ri,0);
        queue *candidate = ri.data;

        sampled++;
        if (GCQueue(candidate,max_idle_time) == C_OK) evicted++;

        int low_hit_ratio = sampled > 10 && (evicted * 10) < sampled;
        if (low_hit_ratio) break;

        int checkpoint = ((sampled+1) % 1000) == 0;
        if (checkpoint && mstime()-start > 1) break;
    }
    raxStop(&ri);
    return evicted;
}
/* -------------------------- Blocking on queues ---------------------------- */
/* Timeout callback for clients blocked by GETJOB: reply with a null and
 * release the blocking state tracked for the client. */
int getjobClientReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    RedisModuleBlockedClient *bc;

    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);

    RedisModule_ReplyWithNull(ctx);
    bc = RedisModule_GetBlockedClientHandle(ctx);
    cleanupClientBlockedForJobs(ctx,bc);
    return REDISMODULE_OK;
}
/* Metadata about a client blocked in GETJOB. It is stored as the value of
 * the BlockedClients radix tree, keyed by the RedisModuleBlockedClient
 * pointer itself, so queues can be resolved from clients (for example
 * when the client disconnects). */
typedef struct BlockedClientData {
RedisModuleBlockedClient *bc; /* Handle returned by RedisModule_BlockClient(). */
int flags; /* GETJOB_FLAG_* options, passed to addReplyJob() when served. */
RedisModuleString **queues; /* Retained names of the queues blocked on. */
int numqueues; /* Number of entries in 'queues'. */
} BlockedClientData;
/* Allocate a BlockedClientData for 'bc'. The queue name strings are
 * retained (not copied), so they outlive the command's argv; they are
 * released by freeBlockedClientData(). */
BlockedClientData *createBlockedClientData(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc, RedisModuleString **queues, int numqueues, int flags)
{
    BlockedClientData *bcd = RedisModule_Alloc(sizeof(*bcd));
    bcd->bc = bc;
    bcd->flags = flags;
    bcd->numqueues = numqueues;

    RedisModuleString **names =
        RedisModule_Alloc(sizeof(RedisModuleString*)*numqueues);
    int i = 0;
    while (i < numqueues) {
        names[i] = queues[i];
        RedisModule_RetainString(ctx,names[i]);
        i++;
    }
    bcd->queues = names;
    return bcd;
}
/* Release a BlockedClientData created by createBlockedClientData(),
 * including the references it holds on the queue name strings. */
void freeBlockedClientData(RedisModuleCtx *ctx, BlockedClientData *bcd) {
    int count = bcd->numqueues;
    for (int i = 0; i < count; i++) {
        RedisModule_FreeString(ctx,bcd->queues[i]);
    }
    RedisModule_Free(bcd->queues);
    RedisModule_Free(bcd);
}
/* Remove the client from all the queues it is blocking for, remove it from
 * the BlockedClients radix tree and free its BlockedClientData.
 *
 * This runs on timeout (getjobClientReply()), as the disconnect callback
 * installed by blockForJobs(), and after a client was served. Queues left
 * without blocked clients are handed to GCQueue(), which may destroy them.
 * Calling it for a client that is no longer tracked does nothing. */
void cleanupClientBlockedForJobs(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc) {
    BlockedClientData *bcd = raxFind(BlockedClients,(unsigned char*)&bc,sizeof(bc));

    /* raxNotFound is a sentinel, not a BlockedClientData: never dereference
     * it, e.g. if the cleanup runs twice for the same client. */
    if (bcd == raxNotFound) return;

    for (int j = 0; j < bcd->numqueues; j++) {
        size_t qnamelen;
        const char *qname = RedisModule_StringPtrLen(bcd->queues[j],&qnamelen);
        queue *q = lookupQueue(qname,qnamelen);
        RedisModule_Assert(q != NULL && q->clients != NULL);

        /* Fail with a clear assertion instead of handing NULL to
         * listDelNode() if the client is unexpectedly missing. */
        listNode *ln = listSearchKey(q->clients,bc);
        RedisModule_Assert(ln != NULL);
        listDelNode(q->clients,ln);

        if (listLength(q->clients) == 0) {
            listRelease(q->clients);
            q->clients = NULL;
            GCQueue(q,QUEUE_MAX_IDLE_TIME);
        }
    }
    freeBlockedClientData(ctx,bcd);
    raxRemove(BlockedClients,(unsigned char*)&bc,sizeof(bc),NULL);
}
/* Block the calling client because GETJOB found no jobs in 'queues'.
 *
 * 1) The client is appended to q->clients of every queue it waits on
 *    (queues are created if missing); the list values are the
 *    RedisModuleBlockedClient handles.
 * 2) A BlockedClientData is stored in BlockedClients, keyed by the handle,
 *    so the queues can be resolved from the client, e.g. to clean up the
 *    lists on disconnect (cleanupClientBlockedForJobs() is installed as
 *    the disconnect callback; getjobClientReply() handles the timeout).
 * 3) When jobs are added to a queue with blocked clients,
 *    signalQueueAsReady() serves them.
 *
 * XXX FIXME: blocking on the same queue multiple times should not be
 * allowed; currently the client is added once per occurrence. */
void blockForJobs(RedisModuleCtx *ctx, RedisModuleString **queues, int numqueues, mstime_t timeout, uint64_t flags) {
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,NULL,getjobClientReply,NULL,timeout);
    BlockedClientData *bcd =
        createBlockedClientData(ctx,bc,queues,numqueues,flags);

    RedisModule_SetDisconnectCallback(bc,cleanupClientBlockedForJobs);
    raxInsert(BlockedClients,(unsigned char*)&bc,sizeof(bc),bcd,NULL);

    for (int i = 0; i < numqueues; i++) {
        size_t len;
        const char *name = RedisModule_StringPtrLen(queues[i],&len);
        queue *target = lookupQueue(name,len);
        if (target == NULL) target = createQueue(name,len);
        if (target->clients == NULL) target->clients = listCreate();
        listAddNodeTail(target->clients,bc);
    }
}
/* Serve the clients blocked on queue 'q' in FIFO order, one job each,
 * until no client or no job is left.
 *
 * Every served client is replied to, unblocked and removed from all the
 * queues it waits on by cleanupClientBlockedForJobs(). The loop re-checks
 * q->clients on every iteration instead of snapshotting its length: a
 * client that blocked on the same queue more than once (see the FIXME in
 * blockForJobs()) has several list nodes, all removed by one cleanup.
 * That can leave the list shorter than expected, or freed and set to NULL.
 *
 * 'q' stays valid across the cleanup: queueFetchJob() has just refreshed
 * q->atime, so the GCQueue() call inside the cleanup does not destroy it. */
void handleClientsBlockedOnQueue(RedisModuleCtx *ctx, queue *q) {
    while (q->clients != NULL && listLength(q->clients) != 0) {
        unsigned long qlen;
        listNode *ln = listFirst(q->clients);
        RedisModuleBlockedClient *bc = ln->value;

        job *j = queueFetchJob(ctx,q,&qlen);
        if (!j) return; /* There are no longer jobs in this queue. */
        if (qlen == 0) needJobsForQueue(ctx,q,NEEDJOBS_REACHED_ZERO);

        /* Reply through a thread safe context bound to the blocked client,
         * then unblock it and drop its blocking state. */
        BlockedClientData *bcd = raxFind(BlockedClients,(unsigned char*)&bc,sizeof(bc));
        RedisModuleCtx *tsc = RedisModule_GetThreadSafeContext(bc);
        RedisModule_ReplyWithArray(tsc,1);
        addReplyJob(tsc,j,bcd->flags);
        RedisModule_FreeThreadSafeContext(tsc);
        RedisModule_UnblockClient(bc,NULL);
        cleanupClientBlockedForJobs(ctx,bc);
    }
}
/* Serve the clients blocked on 'q', if there are any. */
void signalQueueAsReady(RedisModuleCtx *ctx, queue *q) {
    if (q->clients != NULL && listLength(q->clients) > 0)
        handleClientsBlockedOnQueue(ctx,q);
}
/* Scan the blocked clients and, for every queue each of them is waiting
 * on, call needJobsForQueueName() with NEEDJOBS_CLIENTS_WAITING: other
 * nodes may have jobs for the clients blocked here. Meant to be called
 * periodically. Always returns 0. */
int clientsCronSendNeedJobs(RedisModuleCtx *ctx) {
    raxIterator ri;

    raxStart(&ri,BlockedClients);
    raxSeek(&ri,"^",NULL,0);
    while(raxNext(&ri)) {
        BlockedClientData *bcd = ri.data;
        for (int j = 0; j < bcd->numqueues; j++) {
            needJobsForQueueName(ctx,bcd->queues[j],NEEDJOBS_CLIENTS_WAITING);
        }
    }
    /* Release any memory the iterator allocated while walking the tree. */
    raxStop(&ri);
    return 0;
}
/* ------------------------------ Federation -------------------------------- */
/* Return a very rough estimate, in jobs per second, of the current import
 * rate of queue 'q'. Imported jobs are the ones received as replies to our
 * NEEDJOBS requests (counted by updateQueueImportRate()).
 *
 * Timestamps are milliseconds from mstime() and are kept in mstime_t, like
 * the values stored by createQueue(); a 32-bit time_t cannot hold them. */
#define IMPORT_RATE_WINDOW 5000 /* 5 seconds max window. */
uint32_t getQueueImportRate(queue *q) {
    mstime_t now_ms = mstime();
    double elapsed = now_ms - q->prev_import_jobs_time;
    double messages = (double)q->prev_import_jobs_count +
                      q->current_import_jobs_count;

    /* If we did not receive any job in the last few seconds, consider the
     * import rate to be zero. */
    if ((now_ms - q->current_import_jobs_time) > IMPORT_RATE_WINDOW)
        return 0;

    /* Min interval is 50 ms in order to never overestimate. */
    if (elapsed < 50) elapsed = 50;
    return ceil(messages*1000/elapsed);
}
/* Called every time we import a job: update the counters used by
 * getQueueImportRate() to estimate the jobs/sec import rate.
 *
 * Times are milliseconds from mstime() and are kept in mstime_t; a 32-bit
 * time_t cannot hold them. */
void updateQueueImportRate(queue *q) {
    mstime_t now_ms = mstime();

    /* If more than a second passed since the current counter started,
     * move the current timestamp/counter into 'prev' and start a new
     * counter with an updated time. */
    if (now_ms - q->current_import_jobs_time > 1000) {
        q->prev_import_jobs_time = q->current_import_jobs_time;
        q->prev_import_jobs_count = q->current_import_jobs_count;
        q->current_import_jobs_time = now_ms;
        q->current_import_jobs_count = 0;
    }
    /* Anyway, update the current counter. */
    q->current_import_jobs_count++;
}
/* Purge from q->needjobs_responders the source nodes (nodes that replied
 * with jobs to our NEEDJOBS requests) last heard from more than 5 seconds
 * ago, and return how many sources are left. */
unsigned long getQueueValidResponders(queue *q) {
    time_t now = time(NULL);
    raxIterator ri;

    if (q->needjobs_responders == NULL) return 0;
    if (raxSize(q->needjobs_responders) == 0) return 0;

    raxStart(&ri,q->needjobs_responders);
    raxSeek(&ri,"^",NULL,0);
    while (raxNext(&ri)) {
        time_t lastmsg = (long)ri.data;
        if (now-lastmsg <= 5) continue;

        /* Expired source: remove it, then re-seek since the tree changed. */
        raxRemove(q->needjobs_responders,ri.key,ri.key_len,NULL);
        raxSeek(&ri,">",ri.key,ri.key_len);
    }
    raxStop(&ri);
    return raxSize(q->needjobs_responders);
}
/* This function is called every time we realize we need jobs for a given
* queue, because we have clients blocked into this queue which are currently
* empty.
*
* Calling this function may result into NEEDJOBS messages send to specific
* nodes that are remembered as potential sources for messages in this queue,
* or more rarely into NEEDJOBS messages to be broadcast cluster-wide in
* order to discover new nodes that may be source of messages.
*
* This function in called in two different contests:
*
* 1) When a client attempts to fetch messages for an empty queue.
* 2) From time to time for every queue we have clients blocked into.
* 3) When a queue reaches 0 jobs since the last was fetched.
*
* When called in case 1 and 2, type is set to NEEDJOBS_CLIENTS_WAITING,
* for case 3 instead type is set to NEEDJOBS_REACHED_ZERO, and in this
* case the node may send a NEEDJOBS message to the set of known sources
* for this queue, regardless of needjobs_adhoc_time value (that is, without
* trying to throttle the requests, since there is an active flow of messages
* between this node and source nodes).
*/
/* Min and max amount of jobs we request to other nodes. */
#define NEEDJOBS_MIN_REQUEST 5
#define NEEDJOBS_MAX_REQUEST 100
#define NEEDJOBS_BCAST_ALL_MIN_DELAY 2000 /* 2 seconds. */
#define NEEDJOBS_BCAST_ALL_MAX_DELAY 30000 /* 30 seconds. */
#define NEEDJOBS_BCAST_ADHOC_MIN_DELAY 25 /* 25 milliseconds. */
#define NEEDJOBS_BCAST_ADHOC_MAX_DELAY 2000 /* 2 seconds. */
void needJobsForQueue(RedisModuleCtx *ctx, queue *q, int type) {
uint32_t import_per_sec; /* Jobs import rate in the latest secs. */
uint32_t to_fetch; /* Number of jobs we should try to obtain. */
unsigned long num_responders = 0;
mstime_t bcast_delay, adhoc_delay;
mstime_t now = mstime();
/* Don't ask for jobs if we are leaving the cluster. */
if (myselfLeaving()) return;
import_per_sec = getQueueImportRate(q);
/* When called with NEEDJOBS_REACHED_ZERO, we have to do something only
* if there is some active traffic, in order to improve latency.
* Otherwise we wait for the first client to block, that will trigger
* a new call to this function, but with NEEDJOBS_CLIENTS_WAITING type. */
if (type == NEEDJOBS_REACHED_ZERO && import_per_sec == 0) return;
/* Guess how many replies we need from each node. If we already have
* a list of sources, assume that each source is capable of providing
* some message. */
num_responders = getQueueValidResponders(q);
to_fetch = NEEDJOBS_MIN_REQUEST;
if (num_responders > 0)
to_fetch = import_per_sec / num_responders;
/* Trim number of jobs to request to min/max values. */
if (to_fetch < NEEDJOBS_MIN_REQUEST) to_fetch = NEEDJOBS_MIN_REQUEST;
else if (to_fetch > NEEDJOBS_MAX_REQUEST) to_fetch = NEEDJOBS_MAX_REQUEST;
/* Broadcast the message cluster from time to time.
* We use exponential intervals (with a max time limit) */
bcast_delay = NEEDJOBS_BCAST_ALL_MIN_DELAY *
(1 << q->needjobs_bcast_attempt);
if (bcast_delay > NEEDJOBS_BCAST_ALL_MAX_DELAY)
bcast_delay = NEEDJOBS_BCAST_ALL_MAX_DELAY;
if (now - q->needjobs_bcast_time > bcast_delay) {
q->needjobs_bcast_time = now;
q->needjobs_bcast_attempt++;
/* Cluster-wide broadcasts are just to discover nodes,
* ask for a single job in this case. */
clusterSendNeedJobs(ctx,q->name,1,NULL);
}
/* If the queue reached zero, or if the delay elapsed and we
* have at least a source node, send an ad-hoc message to
* nodes known to be sources for this queue.
*
* We use exponential delays here as well (but don't care about
* the delay if the queue just dropped to zero), however with
* much shorter times compared to the cluster-wide broadcast. */
adhoc_delay = NEEDJOBS_BCAST_ADHOC_MIN_DELAY *
(1 << q->needjobs_adhoc_attempt);
if (adhoc_delay > NEEDJOBS_BCAST_ADHOC_MAX_DELAY)
adhoc_delay = NEEDJOBS_BCAST_ADHOC_MAX_DELAY;
if ((type == NEEDJOBS_REACHED_ZERO ||
now - q->needjobs_adhoc_time > adhoc_delay) &&
num_responders > 0)
{
q->needjobs_adhoc_time = now;
q->needjobs_adhoc_attempt++;
clusterSendNeedJobs(ctx,q->name,to_fetch,q->needjobs_responders);
}
}
/* Like needJobsForQueue(), but takes the queue name. A missing queue is
 * created first, since its structure holds the meta-data needed to
 * throttle and route the NEEDJOBS messages. */
void needJobsForQueueName(RedisModuleCtx *ctx, RedisModuleString *qname, int type) {
    size_t namelen;
    const char *name = RedisModule_StringPtrLen(qname,&namelen);
    queue *existing = lookupQueue(name,namelen);

    needJobsForQueue(ctx, existing ? existing : createQueue(name,namelen), type);
}
/* Called from cluster.c when a YOURJOBS message is received from 'node'.
 *
 * Each of the 'numjobs' serialized jobs is registered locally (or matched
 * with the copy we already have), then queued without broadcasting
 * QUEUED, since the job is just moving from one node's queue to another.
 * For every job actually queued, 'node' is recorded as a source of the
 * job's queue, and the federation statistics are updated. Processing
 * stops at the first job that fails to deserialize. */
void receiveYourJobs(RedisModuleCtx *ctx, const char *node, uint32_t numjobs, unsigned char *serializedjobs, uint32_t serializedlen) {
    unsigned char *cursor = serializedjobs;

    for (uint32_t i = 0; i < numjobs; i++) {
        uint32_t remlen = serializedlen - (cursor-serializedjobs);
        job *incoming = deserializeJob(ctx,cursor,remlen,&cursor,SER_MESSAGE);
        if (incoming == NULL) {
            RedisModule_Log(ctx,"warning",
                "The %d-th job received via YOURJOBS from %.40s is corrupted.",
                (int)i+1, node);
            return;
        }

        /* Reuse the in-memory copy when we already know this job. */
        job *local = lookupJob(incoming->id);
        job *jb;
        if (local != NULL) {
            freeJob(incoming);
            jb = local;
        } else {
            jb = incoming;
            jb->state = JOB_STATE_ACTIVE;
            registerJob(jb);
        }

        jb->flags &= ~JOB_FLAG_BCAST_QUEUED;

        /* A non-zero qtime lets enqueueJob() queue an at-most-once job
         * this first time; enqueueJob() then resets qtime to 0, so the
         * job is never queued again. Any value will do. */
        if (jb->retry == 0) jb->qtime = mstime();
        if (enqueueJob(ctx,jb,0) == C_ERR) continue;

        /* Update queue stats needed to optimize nodes federation. */
        queue *q = lookupQueue(jb->queue,sdslen(jb->queue));
        if (q == NULL) q = createQueue(jb->queue,sdslen(jb->queue));
        if (q->needjobs_responders == NULL)
            q->needjobs_responders = raxNew();

        int new_source = raxInsert(q->needjobs_responders,
                                   (unsigned char*)node,
                                   REDISMODULE_NODE_ID_LEN,
                                   (void*)(long)time(NULL), NULL);
        /* A source we did not know yet resets the cluster-wide broadcast
         * backoff. */
        if (new_source) q->needjobs_bcast_attempt = 0;

        updateQueueImportRate(q);
        q->needjobs_adhoc_attempt = 0;
    }
}
/* Called from cluster.c when a NEEDJOBS message is received from 'node',
 * asking for up to 'count' jobs of queue 'qname'. The selected jobs are
 * sent back with a YOURJOBS message.
 *
 * 'count' comes from the network. The number of jobs we reply with is
 * therefore always capped to NEEDJOBS_MAX_REQUEST, the size of the local
 * 'jobs' array. Without the cap, a large or wrapping 'count' would
 * overflow that stack buffer. */
void receiveNeedJobs(RedisModuleCtx *ctx, const char *node, const char *qname, size_t qnamelen, uint32_t count) {
    queue *q = lookupQueue(qname,qnamelen);
    unsigned long qlen = queueLength(q);
    uint32_t replyjobs = count; /* Number of jobs we are willing to provide. */
    uint32_t j;

    /* Ignore requests for jobs if:
     * 1) No such queue here, or queue is empty.
     * 2) We are actively importing jobs ourselves for this queue. */
    if (qlen == 0 || getQueueImportRate(q) > 0) return;

    /* Ignore request if queue is paused in output. */
    if (q->flags & QUEUE_FLAG_PAUSED_OUT) return;

    /* To avoid a single node depleting our queue easily, we provide the
     * number of jobs requested only if we have at least 2 times what it
     * requested. Otherwise we provide at most half the jobs we have, but
     * always at least a single job. The product is computed in 64 bits so
     * that a huge 'count' cannot wrap around. */
    if ((uint64_t)qlen < (uint64_t)count*2) replyjobs = qlen/2;
    if (replyjobs == 0) replyjobs = 1;
    /* Never exceed the local buffer, whatever the peer asked for. */
    if (replyjobs > NEEDJOBS_MAX_REQUEST) replyjobs = NEEDJOBS_MAX_REQUEST;

    job *jobs[NEEDJOBS_MAX_REQUEST];
    for (j = 0; j < replyjobs; j++) {
        jobs[j] = queueFetchJob(ctx,q,NULL);
        RedisModule_Assert(jobs[j] != NULL);
    }
    clusterSendYourJobs(ctx,node,jobs,replyjobs);

    /* It's possible that we sent jobs with retry=0. Remove them from
     * the local node: keeping duplicates makes no sense for jobs that
     * have a replication level of 1 by contract. */
    for (j = 0; j < replyjobs; j++) {
        job *jb = jobs[j];
        if (jb->retry == 0) {
            unregisterJob(ctx,jb);
            freeJob(jb);
        }
    }
}
/* ------------------------------ Queue pausing -----------------------------
*
* There is very little here since pausing a queue is basically just changing
* its flags. What the PAUSE flags actually mean is up to the different parts
* of Disque that implement the behavior of queues. */
/* Set ('set' true) or clear ('set' 0) one paused-state flag of queue 'q'.
 * 'flag' must be QUEUE_FLAG_PAUSED_IN or QUEUE_FLAG_PAUSED_OUT.
 *
 * If the change resumes output (PAUSED_OUT goes from set to clear),
 * clients blocked on the queue are served again. */
void queueChangePausedState(RedisModuleCtx *ctx, queue *q, int flag, int set) {
    int was_paused_out = (q->flags & QUEUE_FLAG_PAUSED_OUT) != 0;

    q->flags = set ? (q->flags | flag) : (q->flags & ~flag);

    int is_paused_out = (q->flags & QUEUE_FLAG_PAUSED_OUT) != 0;
    if (was_paused_out && !is_paused_out) signalQueueAsReady(ctx,q);
}
/* Called from cluster.c when a PAUSE message is received: copy the
 * sender's PAUSED_IN / PAUSED_OUT state into our queue. A missing queue is
 * created only if some pause flag is set; with all flags clear there is
 * nothing to do. */
void receivePauseQueue(RedisModuleCtx *ctx, const char *qname, size_t qnamelen, uint32_t flags) {
    queue *q = lookupQueue(qname,qnamelen);

    if (q == NULL && flags == 0) return;
    if (q == NULL) q = createQueue(qname,qnamelen);

    int pause_in = (flags & QUEUE_FLAG_PAUSED_IN) != 0;
    int pause_out = (flags & QUEUE_FLAG_PAUSED_OUT) != 0;
    queueChangePausedState(ctx,q,QUEUE_FLAG_PAUSED_IN,pause_in);
    queueChangePausedState(ctx,q,QUEUE_FLAG_PAUSED_OUT,pause_out);
}
/* Describe the paused state encoded in queue flags 'qflags' as "all",
 * "in", "out" or "none". Bits other than QUEUE_FLAG_PAUSED_ALL are
 * ignored. */
char *queueGetPausedStateString(uint32_t qflags) {
    uint32_t paused = qflags & QUEUE_FLAG_PAUSED_ALL;

    if (paused == QUEUE_FLAG_PAUSED_ALL) return "all";
    if (paused == QUEUE_FLAG_PAUSED_IN) return "in";
    if (paused == QUEUE_FLAG_PAUSED_OUT) return "out";
    return "none";
}
/* This function completely deletes all the data in Disque: jobs and queues.
 * It has no effect on persistence, so it's up to the caller to replicate
 * the command to flush all the data if needed. */
void flushAllJobsAndQueues(RedisModuleCtx *ctx) {
    raxIterator ri;

    /* Free all the jobs. As a side effect this should also unblock
     * all the clients blocked in jobs that are being replicated. */
    raxStart(&ri,Jobs);
    raxSeek(&ri,"^",NULL,0);
    while(raxNext(&ri)) {
        job *j = ri.data;
        unregisterJob(ctx,j);
        freeJob(j);
        /* The tree changed: re-seek after the removed key. */
        raxSeek(&ri,">",ri.key,ri.key_len);
    }
    raxStop(&ri);

    /* Reply with a null to every client blocked on queues (there will be
     * no queues soon) and unblock it, like handleClientsBlockedOnQueue()
     * does when serving a client. Replying alone would leave the client
     * blocked until its timeout or disconnection. At that point
     * cleanupClientBlockedForJobs() would run again on state already
     * released here. */
    raxStart(&ri,BlockedClients);
    raxSeek(&ri,"^",NULL,0);
    while(raxNext(&ri)) {
        RedisModuleBlockedClient *bc;
        memcpy(&bc,ri.key,sizeof(bc));
        RedisModuleCtx *tsc = RedisModule_GetThreadSafeContext(bc);
        RedisModule_ReplyWithNull(tsc);
        RedisModule_FreeThreadSafeContext(tsc);
        RedisModule_UnblockClient(bc,NULL);
        cleanupClientBlockedForJobs(ctx,bc);
        raxSeek(&ri,">",ri.key,ri.key_len);
    }
    raxStop(&ri);

    /* Destroy all the queues. */
    raxStart(&ri,Queues);
    raxSeek(&ri,"^",NULL,0);
    while(raxNext(&ri)) {
        destroyQueue((char*)ri.key,ri.key_len);
        raxSeek(&ri,">",ri.key,ri.key_len);
    }
    raxStop(&ri);
}
/* ------------------------- Queue related commands ------------------------- */
/* QLEN <qname> -- Reply with the number of jobs queued in <qname>
 * (0 if the queue does not exist). */
int qlenCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    size_t qnamelen;
    const char *qname;

    if (argc != 2) return RedisModule_WrongArity(ctx);

    qname = RedisModule_StringPtrLen(argv[1],&qnamelen);
    RedisModule_ReplyWithLongLong(ctx,queueNameLength(qname,qnamelen));
    return REDISMODULE_OK;
}
/* GETJOB [NOHANG] [TIMEOUT <ms>] [COUNT <count>] [WITHCOUNTERS]
 *        FROM <qname1> <qname2> ... <qnameN>
 *
 * Get jobs from the specified queues. By default COUNT is 1, so just one
 * job will be returned. If there are no jobs in any of the specified queues
 * the command will block. With NOHANG it replies with a null instead of
 * blocking. The default TIMEOUT of 0 means block forever.
 *
 * WITHCOUNTERS makes every returned job also carry its NACKs and
 * deliveries counters.
 *
 * When there are jobs in more than one of the queues, the command guarantees
 * to return jobs in the order the queues are specified. If COUNT allows
 * more jobs to be returned, queues are scanned again and again in the same
 * order popping more elements.
 *
 * All options must precede FROM: every argument after FROM is taken as a
 * queue name. Queues paused in output are treated as empty.
 *
 * If this node is leaving the cluster and would have to block, a -LEAVING
 * error is returned instead. */
int getjobCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc < 2) return RedisModule_WrongArity(ctx);

    mstime_t timeout = 0; /* Block forever by default. */
    long long count = 1, emitted_jobs = 0;
    int nohang = 0;       /* Don't block even if all the queues are empty. */
    int withcounters = 0; /* Also return NACKs and deliveries counters. */
    RedisModuleString **queues = NULL;
    int j, numqueues = 0;

    /* Parse args. */
    for (j = 1; j < argc; j++) {
        const char *opt = RedisModule_StringPtrLen(argv[j],NULL);
        int lastarg = j == argc-1;

        if (!strcasecmp(opt,"nohang")) {
            nohang = 1;
        } else if (!strcasecmp(opt,"withcounters")) {
            withcounters = 1;
        } else if (!strcasecmp(opt,"timeout") && !lastarg) {
            /* On failure the helper is expected to have already replied
             * with an error, so we just return. */
            if (getTimeoutFromObjectOrReply(ctx,argv[j+1],&timeout,
                UNIT_MILLISECONDS) != C_OK) return REDISMODULE_OK;
            j++;
        } else if (!strcasecmp(opt,"count") && !lastarg) {
            int retval = RedisModule_StringToLongLong(argv[j+1],&count);
            if (retval != REDISMODULE_OK || count <= 0) {
                RedisModule_ReplyWithError(ctx,
                    "ERR COUNT must be a number greater than zero");
                return REDISMODULE_OK;
            }
            j++;
        } else if (!strcasecmp(opt,"from")) {
            queues = argv+j+1;
            numqueues = argc - j - 1;
            break; /* Don't process options after this. */
        } else {
            /* Also reached by TIMEOUT / COUNT given as the last argument,
             * i.e. without a value. */
            return RedisModule_ReplyWithError(ctx,
                "ERR Unrecognized option given");
        }
    }

    /* FROM is mandatory. */
    if (queues == NULL || numqueues == 0) {
        return RedisModule_ReplyWithError(ctx,"ERR FROM is mandatory");
    }

    /* First: try to avoid blocking if there is at least one job in at
     * least one queue. */
    while(1) {
        /* Same type as emitted_jobs: a narrower type would truncate the
         * value used to detect whether this pass made progress. */
        long long old_emitted = emitted_jobs;

        for (j = 0; j < numqueues; j++) {
            unsigned long qlen = 0; /* Filled by queueFetchJob() on success. */
            size_t qnamelen;
            const char *qname = RedisModule_StringPtrLen(queues[j],&qnamelen);
            queue *q = lookupQueue(qname,qnamelen);
            job *job = NULL;

            /* A queue paused in output is treated as if it were empty. */
            if (q && !(q->flags & QUEUE_FLAG_PAUSED_OUT))
                job = queueFetchJob(ctx,q,&qlen);

            if (!job) {
                /* Nothing to serve from this queue: signal that clients are
                 * waiting for it (the queue may not even exist yet). */
                if (!q)
                    needJobsForQueueName(ctx,queues[j],NEEDJOBS_CLIENTS_WAITING);
                else
                    needJobsForQueue(ctx,q,NEEDJOBS_CLIENTS_WAITING);
                continue;
            }

            /* We just fetched a job: if that drained the queue, signal it. */
            if (qlen == 0)
                needJobsForQueue(ctx,q,NEEDJOBS_REACHED_ZERO);

            /* Open the (postponed length) reply array on the first job. */
            if (emitted_jobs == 0)
                RedisModule_ReplyWithArray(ctx,REDISMODULE_POSTPONED_ARRAY_LEN);
            addReplyJob(ctx,job,withcounters ? GETJOB_FLAG_WITHCOUNTERS :
                                               GETJOB_FLAG_NONE);
            count--;
            emitted_jobs++;
            if (count == 0) break;
        }

        /* When we reached count or when we are no longer making
         * progresses (no jobs left in our queues), stop. */
        if (count == 0 || old_emitted == emitted_jobs) break;
    }

    /* Set the array length and return if we emitted jobs. */
    if (emitted_jobs) {
        RedisModule_ReplySetArrayLength(ctx,emitted_jobs);
        return REDISMODULE_OK;
    }

    /* If NOHANG was given and there are no jobs, return NULL. */
    if (nohang)
        return RedisModule_ReplyWithNull(ctx);

    /* If this node is leaving the cluster, we can't block waiting for
     * jobs: this would trigger the federation with other nodes in order
     * to import jobs here. Just return a -LEAVING error. */
    if (myselfLeaving())
        return RedisModule_ReplyWithError(ctx,
            "LEAVING this node is leaving the cluster. "
            "Try another one please.");

    /* If we reached this point, we need to block. */
    blockForJobs(ctx,queues,numqueues,timeout,
        withcounters ? GETJOB_FLAG_WITHCOUNTERS : GETJOB_FLAG_NONE);
    return REDISMODULE_OK;
}
/* ENQUEUE job-id-1 job-id-2 ... job-id-N
 * NACK job-id-1 job-id-2 ... job-id-N
 *
 * For every given job ID:
 *  - If the job is active, try to queue it again via enqueueJob().
 *  - If the job is in any other state, do nothing.
 *  - If the job is not known, do nothing.
 *
 * NOTE: no retry check is performed here, so even jobs with retry set to 0
 * are enqueued! Be aware that using this command may violate the
 * at-most-once contract.
 *
 * If any argument is not a valid job ID, nothing is enqueued and
 * validateJobIDs() is expected to have already sent the error reply.
 *
 * Reply: the number of jobs actually moved from active to queued state.
 *
 * The difference between ENQUEUE and NACK is that the latter will propagate
 * cluster messages in a way that makes the nacks counter in the receiver
 * increment. This is selected by the 'nack' flag, which is forwarded to
 * enqueueJob(). */
int enqueueGenericCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int nack) {
    int moved = 0;

    if (validateJobIDs(ctx,argv+1,argc-1) == C_ERR)
        return REDISMODULE_OK;

    /* Re-queue every known job that is currently active. */
    for (int i = 1; i < argc; i++) {
        job *jb = lookupJob(RedisModule_StringPtrLen(argv[i],NULL));

        if (jb != NULL && jb->state == JOB_STATE_ACTIVE &&
            enqueueJob(ctx,jb,nack) == C_OK)
        {
            moved++;
        }
    }
    return RedisModule_ReplyWithLongLong(ctx,moved);
}
/* ENQUEUE entry point: enqueueGenericCommand() with the nack flag off, so
 * no NACK counter increment is propagated. See enqueueGenericCommand(). */
int enqueueCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    const int nack = 0;
    return enqueueGenericCommand(ctx,argv,argc,nack);
}
/* NACK entry point: enqueueGenericCommand() with the nack flag on, so the
 * receivers' nacks counter is incremented. See enqueueGenericCommand(). */
int nackCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    const int nack = 1;
    return enqueueGenericCommand(ctx,argv,argc,nack);
}
/* DEQUEUE job-id-1 job-id-2 ... job-id-N
 *
 * If the job is queued, remove it from queue and change state to active.
 * If the job is in any other state, do nothing.
 * If the job is not known, do nothing.
 *
 * If any argument is not a valid job ID, nothing is dequeued and
 * validateJobIDs() is expected to have already sent the error reply.
 *
 * Reply: the number of jobs actually moved from queued to active state. */
int dequeueCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    int moved = 0;

    if (validateJobIDs(ctx,argv+1,argc-1) == C_ERR) return REDISMODULE_OK;

    /* Dequeue every known job that is currently in queued state. */
    for (int i = 1; i < argc; i++) {
        job *jb = lookupJob(RedisModule_StringPtrLen(argv[i],NULL));

        if (jb == NULL || jb->state != JOB_STATE_QUEUED) continue;
        if (dequeueJob(jb) == C_OK) moved++;
    }
    return RedisModule_ReplyWithLongLong(ctx,moved);
}