Implementation of 'put-unique' command. #180

Open
wants to merge 1 commit into from

6 participants

@netsweng

This command can be used to solve many of the de-duplication issues
by not allowing duplicate messages in the queue to begin with. It has
no impact on the performance of the regular put command as it is
implemented as a parallel command. Instead of using the job number
or name to try and find whether a message exists already, this command
compares the entire message body to determine uniqueness.

This code is in production using version 1.4.6 of beanstalkd. This
is a quick forward port, which has not been thoroughly tested, but the
porting was straightforward, so there probably won't be any surprises.
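For reference, the proposed command parses the same arguments as `put` (see the `dispatch_cmd` hunk in the diff), so the wire format would presumably mirror it. A minimal sketch of building the request bytes, assuming that argument order:

```python
def put_unique_cmd(pri, delay, ttr, body):
    """Build the raw request bytes for the proposed put-unique command.

    The argument order (pri, delay, ttr, bytes) is taken from how the
    patch parses them in dispatch_cmd, mirroring the stock `put`.
    """
    data = body.encode() if isinstance(body, str) else body
    head = "put-unique %d %d %d %d\r\n" % (pri, delay, ttr, len(data))
    return head.encode() + data + b"\r\n"

print(put_unique_cmd(1024, 0, 60, "hello"))
# b'put-unique 1024 0 60 5\r\nhello\r\n'
```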

@netsweng netsweng Add 'put-unique' command.
89e186c
@emanuelecasadio

That's a nice one, but wouldn't it be better just to flag a queue as a "set" (one that does not allow duplicates)? In that case we wouldn't need a special "put".

@netsweng

It could be accomplished either way. From the perspective of an application using beanstalkd, I like having a distinct put-unique to make the semantics in use more obvious. If your app is complicated enough to be using both put and put-unique, then it would probably be helpful to be able to easily differentiate the calls in the code.
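To illustrate the call-site argument, a minimal client wrapper sketch (the class and method names are hypothetical, not an existing library API) where the two operations read differently in application code:

```python
class QueueClient:
    """Illustrative wrapper; `send` is an injected transport function
    so the example runs without a live beanstalkd server."""

    def __init__(self, send):
        self._send = send

    def _cmd(self, verb, body, pri, delay, ttr):
        data = body.encode() if isinstance(body, str) else body
        head = "%s %d %d %d %d\r\n" % (verb, pri, delay, ttr, len(data))
        return self._send(head.encode() + data + b"\r\n")

    def put(self, body, pri=1024, delay=0, ttr=60):
        # Plain insert; duplicates are allowed.
        return self._cmd("put", body, pri, delay, ttr)

    def put_unique(self, body, pri=1024, delay=0, ttr=60):
        # Insert only if no job with an identical body already exists.
        return self._cmd("put-unique", body, pri, delay, ttr)

sent = []
q = QueueClient(sent.append)
q.put("job-a")
q.put_unique("job-a")
print(sent[1])
# b'put-unique 1024 0 60 5\r\njob-a\r\n'
```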

@andreascreten

Any chance we can get this kind of functionality implemented @kr?

@Doerge Doerge referenced this pull request in earl/beanstalkc
Open

Add put-unique #50

@svisser

+1

@tjchambers

My two cents: I have had to band-aid my infrastructure around beanstalkd to support this sort of problem, both in beanstalkd and previously with another job queue (Gearman). While put-unique would work for some applications, it doesn't work for mine if it is implemented without regard to the status of the jobs already in the queue.

If I have a job in a non-reserved state then put-unique is fine: I don't want the non-reserved job duplicated.

If however the job being compared to the put-unique job being inserted is already reserved (i.e. being processed), then I am in a bind, as my new job may still be valid (during the processing interval "something" has changed and I want to "re-process" the same data).

So while put-unique may work for some applications, the fact that (AFAICT) it is not conditional on the state of the jobs it compares against means it could actually be far less useful in practice.
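The state-sensitive rule being asked for could be sketched as a pure decision function (the state names follow beanstalkd's job states, but the rule itself is illustrative and not what the patch implements):

```python
# Job states in which a duplicate has not started running yet.
NOT_STARTED = {"ready", "delayed", "buried"}

def should_enqueue(new_body, existing_jobs):
    """Decide whether a put-unique-style insert should go through.

    Sketch of the state-sensitive rule described above: a duplicate
    body suppresses the insert only if the matching job has not
    started running. A reserved duplicate may already be working on
    stale data, so the new job is still accepted.
    `existing_jobs` is a list of (state, body) pairs.
    """
    for state, body in existing_jobs:
        if body == new_body and state in NOT_STARTED:
            return False
    return True

print(should_enqueue(b"sum:42", [("reserved", b"sum:42")]))  # True
print(should_enqueue(b"sum:42", [("ready", b"sum:42")]))     # False
```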

@svisser

It depends on the purpose of the jobs, really. In my case your example doesn't apply; it's mainly to prevent duplicated work, though it's not really an issue if a job is processed twice (idempotent operations).

@tjchambers

I guess I misunderstood. I thought that put-unique would not do anything if a job with the same payload was already in the queue.

In my case it is perfectly ok if they run twice (my jobs are all idempotent), and duplicate work IS what I am trying to prevent. What I can't afford to have happen is that they do not run ONCE, which I thought would occur if a job is about to complete in the queue and then the next put-unique causes the next iteration of the same job to NOT be queued. I guess I misunderstood: it would still be queued, and therefore run twice.

@tjchambers

I may not be using the term 'idempotent' in the same manner as the official definition. My apologies.

In my case I can, at will, run my jobs over and over and get 'correct' results. I can run them simultaneously and get 'correct' results. What I CANNOT do is run them on today's data and expect that they will produce the same answer as on tomorrow's data, when the input dataset changes in the interim.

Take a simple sum of the number of people retweeting a tweet. Summing that right now gives me an answer; summing it again an hour from now, using the same original tweet and the same logic, may give a DIFFERENT answer. If I see a new tweet has come in and I go to queue a summarization job for the retweet total while my original summarization job is reserved, it may be done, may not be done, or be somewhere in between ... I don't know. I do know that the new tweet may invalidate the original result if I don't re-sum it, so I need to queue another of the same job. If, however, my original summarization job is in the 'ready', 'delayed', etc. state, then I know that process has not started and I do not need another to be queued, as that would be duplicate effort.

@svisser

This patch sends back MSG_INSERTED_FMT when it successfully puts the message but also when it drops the message due to the uniqueness check (see the diff, prot.c, line 852: netsweng@89e186c). I think that should be changed as a client may require actions to be taken only when a message was put.

The current change may have been done for compatibility reasons with existing clients (e.g., the Python client https://github.com/earl/beanstalkc has a list of known successful responses) but I think a new message should be introduced when this feature is included in a public release.
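If a distinct reply word were introduced for the dropped case, a client could branch on it. A sketch of the client side, where `DUPLICATE` is a hypothetical reply word (the patch as written answers `INSERTED <id>` for both outcomes, so a client cannot tell them apart):

```python
def parse_put_unique_reply(line):
    """Classify a put-unique reply line into (word, job_id).

    'DUPLICATE <id>' is a hypothetical reply word for the case where
    the message was dropped by the uniqueness check; other reply
    words (e.g. JOB_TOO_BIG) carry no job id and map to None.
    """
    word, _, rest = line.strip().partition(" ")
    if word in ("INSERTED", "DUPLICATE"):
        return word, int(rest)
    return word, None

print(parse_put_unique_reply("DUPLICATE 7\r\n"))  # ('DUPLICATE', 7)
```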

@JalfResi

+1 @svisser We use a fork of this version of beanstalkd for this unique feature in production. Having the put-unique command tell you if a duplicate was found and no put action was taken would be incredibly useful indeed.

Commits on Apr 27, 2013
  1. @netsweng

    Add 'put-unique' command.

    netsweng committed
Showing with 91 additions and 3 deletions.
  1. +4 −1 dat.h
  2. +32 −0 job.c
  3. +55 −2 prot.c
5 dat.h
@@ -138,7 +138,8 @@ struct job {
Jobrec r; // persistent fields; these get written to the wal
/* bookeeping fields; these are in-memory only */
- char pad[6];
+ char pad[5];
+ char unique;
tube tube;
job prev, next; /* linked list of jobs */
job ht_next; /* Next job in a hash table list */
@@ -203,11 +204,13 @@ void job_free(job j);
/* Lookup a job by job ID */
job job_find(uint64 job_id);
+job job_find_by_body(job j);
/* the void* parameters are really job pointers */
void job_setheappos(void*, int);
int job_pri_less(void*, void*);
int job_delay_less(void*, void*);
+int job_body_cmp(job a, job b);
job job_copy(job j);
32 job.c
@@ -84,6 +84,27 @@ job_find(uint64 job_id)
}
job
+job_find_by_body(job j)
+{
+ job jh = NULL;
+ int index;
+
+ for (index = 0, jh = all_jobs[index];
+ index < all_jobs_cap-1;
+ index++, jh = all_jobs[index]) {
+ do {
+ if( jh &&
+ (jh->r.state != Invalid) &&
+ (job_body_cmp(j,jh) == 0) ) {
+ return jh;
+ }
+ } while( jh && ((jh=jh->ht_next) != NULL) );
+ }
+
+ return jh;
+}
+
+job
allocate_job(int body_size)
{
job j;
@@ -172,6 +193,17 @@ job_delay_less(void *ax, void *bx)
return a->r.id < b->r.id;
}
+int
+job_body_cmp(job a, job b)
+{
+ int bsize = min(a->r.body_size, b->r.body_size);
+
+ if (a->r.body_size > b->r.body_size) return 1;
+ if (a->r.body_size < b->r.body_size) return -1;
+ if (a->tube != b->tube) return -1;
+ return memcmp(a->body, b->body, bsize);
+}
+
job
job_copy(job j)
{
57 prot.c
@@ -24,6 +24,7 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
"0123456789-+/;.$_()"
#define CMD_PUT "put "
+#define CMD_PUT_UNIQUE "put-unique "
#define CMD_PEEKJOB "peek "
#define CMD_PEEK_READY "peek-ready"
#define CMD_PEEK_DELAYED "peek-delayed"
@@ -136,7 +137,8 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
#define OP_QUIT 22
#define OP_PAUSE_TUBE 23
#define OP_JOBKICK 24
-#define TOTAL_OPS 25
+#define OP_PUT_UNIQUE 25
+#define TOTAL_OPS 26
#define STATS_FMT "---\n" \
"current-jobs-urgent: %u\n" \
@@ -145,6 +147,7 @@ size_t job_data_size_limit = JOB_DATA_SIZE_LIMIT_DEFAULT;
"current-jobs-delayed: %u\n" \
"current-jobs-buried: %u\n" \
"cmd-put: %" PRIu64 "\n" \
+ "cmd-put-unique: %" PRIu64 "\n" \
"cmd-peek: %" PRIu64 "\n" \
"cmd-peek-ready: %" PRIu64 "\n" \
"cmd-peek-delayed: %" PRIu64 "\n" \
@@ -273,6 +276,7 @@ static const char * op_names[] = {
CMD_QUIT,
CMD_PAUSE_TUBE,
CMD_JOBKICK,
+ CMD_PUT_UNIQUE,
};
static job remove_buried_job(job j);
@@ -739,6 +743,7 @@ static int
which_cmd(Conn *c)
{
#define TEST_CMD(s,c,o) if (strncmp((s), (c), CONSTSTRLEN(c)) == 0) return (o);
+ TEST_CMD(c->cmd, CMD_PUT_UNIQUE, OP_PUT_UNIQUE);
TEST_CMD(c->cmd, CMD_PUT, OP_PUT);
TEST_CMD(c->cmd, CMD_PEEKJOB, OP_PEEKJOB);
TEST_CMD(c->cmd, CMD_PEEK_READY, OP_PEEK_READY);
@@ -822,7 +827,7 @@ static void
enqueue_incoming_job(Conn *c)
{
int r;
- job j = c->in_job;
+ job j = c->in_job, j2=NULL;
c->in_job = NULL; /* the connection no longer owns this job */
c->in_job_read = 0;
@@ -842,6 +847,11 @@ enqueue_incoming_job(Conn *c)
return reply_serr(c, MSG_DRAINING);
}
+ if ( j->unique && (j2=job_find_by_body(j)) != NULL ) {
+ job_free(j);
+ return reply_line(c, STATE_SENDWORD, MSG_INSERTED_FMT, j2->r.id);
+ }
+
if (j->walresv) return reply_serr(c, MSG_INTERNAL_ERROR);
j->walresv = walresvput(&c->srv->wal, j);
if (!j->walresv) return reply_serr(c, MSG_OUT_OF_MEMORY);
@@ -891,6 +901,7 @@ fmt_stats(char *buf, size_t size, void *x)
get_delayed_job_ct(),
global_stat.buried_ct,
op_ct[OP_PUT],
+ op_ct[OP_PUT_UNIQUE],
op_ct[OP_PEEKJOB],
op_ct[OP_PEEK_READY],
op_ct[OP_PEEK_DELAYED],
@@ -1253,6 +1264,48 @@ dispatch_cmd(Conn *c)
maybe_enqueue_incoming_job(c);
break;
+ case OP_PUT_UNIQUE:
+ r = read_pri(&pri, c->cmd + 10, &delay_buf);
+ if (r) return reply_msg(c, MSG_BAD_FORMAT);
+
+ r = read_delay(&delay, delay_buf, &ttr_buf);
+ if (r) return reply_msg(c, MSG_BAD_FORMAT);
+
+ r = read_ttr(&ttr, ttr_buf, &size_buf);
+ if (r) return reply_msg(c, MSG_BAD_FORMAT);
+
+ errno = 0;
+ body_size = strtoul(size_buf, &end_buf, 10);
+ if (errno) return reply_msg(c, MSG_BAD_FORMAT);
+
+ op_ct[type]++;
+
+ if (body_size > job_data_size_limit) {
+ /* throw away the job body and respond with JOB_TOO_BIG */
+ return skip(c, body_size + 2, MSG_JOB_TOO_BIG);
+ }
+
+ /* don't allow trailing garbage */
+ if (end_buf[0] != '\0') return reply_msg(c, MSG_BAD_FORMAT);
+
+ connsetproducer(c);
+
+ c->in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c->use);
+ c->in_job->unique = 1;
+
+ /* OOM? */
+ if (!c->in_job) {
+ /* throw away the job body and respond with OUT_OF_MEMORY */
+ twarnx("server error: " MSG_OUT_OF_MEMORY);
+ return skip(c, body_size + 2, MSG_OUT_OF_MEMORY);
+ }
+
+ fill_extra_data(c);
+
+ /* it's possible we already have a complete job */
+ maybe_enqueue_incoming_job(c);
+
+ break;
case OP_PEEK_READY:
/* don't allow trailing garbage */
if (c->cmd_len != CMD_PEEK_READY_LEN + 2) {