Skip to content

Commit

Permalink
changed min_messaged_recipients to a more useful push_delete_oldest_r…
Browse files Browse the repository at this point in the history
…eceived_message
  • Loading branch information
slact committed Jan 27, 2010
1 parent 0846c64 commit 7a0394b
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 33 deletions.
10 changes: 5 additions & 5 deletions README
Expand Up @@ -82,12 +82,12 @@ push_message_buffer_length [ number ]
push_max_message_buffer_length and push_min_message_buffer_length to this push_max_message_buffer_length and push_min_message_buffer_length to this
value. value.


push_min_message_recipients [ number ] push_delete_oldest_received_message [ on | off ]
default: 0 default: off
context: http, server, location context: http, server, location
The number of times a message must be received before it is considered for When enabled, as soon as the oldest message in a channel's message queue has
deletion. Useful to guarantee message delivery. (This does NOT override the been received by a subscriber, it is deleted. Recommend avoiding this
push_max_message_buffer_length setting). directive as it violates subscribers' assumptions of GET request idempotence.


push_message_timeout [ time ] push_message_timeout [ time ]
default: 1h default: 1h
Expand Down
1 change: 1 addition & 0 deletions changelog.txt
@@ -1,3 +1,4 @@
change: removed push_min_message_recipients, added push_delete_oldest_received_message
0.69 (Nov. 17 2009) 0.69 (Nov. 17 2009)
fix: publisher got a 201 Created response even if the channel had no subscribers at the time (should be 202 Accepted) fix: publisher got a 201 Created response even if the channel had no subscribers at the time (should be 202 Accepted)
fix: small memory leak after each message broadcast to a channel fix: small memory leak after each message broadcast to a channel
Expand Down
34 changes: 15 additions & 19 deletions src/ngx_http_push_module.c
Expand Up @@ -74,7 +74,7 @@ static ngx_http_push_msg_t *ngx_http_push_get_latest_message_locked(ngx_http_pus
return ngx_queue_data(qmsg, ngx_http_push_msg_t, queue); return ngx_queue_data(qmsg, ngx_http_push_msg_t, queue);
} }


//shpool must be locked. No memory is freed. //shpool must be locked. No memory is freed. O(1)
static ngx_http_push_msg_t *ngx_http_push_get_oldest_message_locked(ngx_http_push_channel_t * channel) { static ngx_http_push_msg_t *ngx_http_push_get_oldest_message_locked(ngx_http_push_channel_t * channel) {
ngx_queue_t *sentinel = &channel->message_queue->queue; ngx_queue_t *sentinel = &channel->message_queue->queue;
if(ngx_queue_empty(sentinel)) { if(ngx_queue_empty(sentinel)) {
Expand Down Expand Up @@ -385,9 +385,6 @@ static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r) {
//found the message //found the message
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
msg->refcount++; // this probably isn't necessary, but i'm not thinking too straight at the moment. so just in case. msg->refcount++; // this probably isn't necessary, but i'm not thinking too straight at the moment. so just in case.
if(msg->recipients_left!=NGX_INFINITE_MESSAGE_RECIPIENTS) {
msg->recipients_left -- ;
}
NGX_HTTP_PUSH_MAKE_ETAG(msg->message_tag, etag, ngx_palloc, r->pool); NGX_HTTP_PUSH_MAKE_ETAG(msg->message_tag, etag, ngx_palloc, r->pool);
if(etag==NULL) { if(etag==NULL) {
//oh, nevermind... //oh, nevermind...
Expand Down Expand Up @@ -670,7 +667,7 @@ static void ngx_http_push_publisher_body_handler(ngx_http_request_t * r) {
time_t message_timeout = cf->buffer_timeout; time_t message_timeout = cf->buffer_timeout;
msg->expires = (message_timeout==0 ? 0 : (ngx_time() + message_timeout)); msg->expires = (message_timeout==0 ? 0 : (ngx_time() + message_timeout));


msg->recipients_left = cf->min_message_recipients == 0 ? NGX_INFINITE_MESSAGE_RECIPIENTS : cf->min_message_recipients; msg->delete_oldest_received = cf->delete_oldest_received_message;


//FMI (For My Information): shm is still locked. //FMI (For My Information): shm is still locked.
switch(ngx_http_push_broadcast_message_locked(channel, msg, r->connection->log, shpool)) { switch(ngx_http_push_broadcast_message_locked(channel, msg, r->connection->log, shpool)) {
Expand All @@ -693,10 +690,6 @@ static void ngx_http_push_publisher_body_handler(ngx_http_request_t * r) {
//update the number of times the message was received. //update the number of times the message was received.
//in the interest of premature optimization, I assume all //in the interest of premature optimization, I assume all
//current subscribers have received the message successfully. //current subscribers have received the message successfully.
if(msg->recipients_left != NGX_INFINITE_MESSAGE_RECIPIENTS) { //we care about # of times message has been received
msg->recipients_left -= subscribers;
//warning: overflow for 2^31+ subscribers -- a very unlikely number.
}
break; break;


case NGX_ERROR: case NGX_ERROR:
Expand Down Expand Up @@ -864,6 +857,19 @@ static ngx_int_t ngx_http_push_respond_to_subscribers(ngx_http_push_channel_t *c
ngx_pfree(ngx_http_push_pool, buffer); ngx_pfree(ngx_http_push_pool, buffer);
ngx_pfree(ngx_http_push_pool, chain); ngx_pfree(ngx_http_push_pool, chain);


ngx_shmtx_lock(&shpool->mutex);
//message deletion
msg->refcount--; //refcount kept mostly for internal debuggery.

if(msg->queue.next==NULL && !msg->refcount) {
//message had been dequeued and nobody needs it anymore
ngx_http_push_free_message_locked(msg, shpool);
}

if(msg->delete_oldest_received && ngx_http_push_get_oldest_message_locked(channel) == msg) {
ngx_http_push_delete_message_locked(channel, msg, shpool);
}
ngx_shmtx_unlock(&shpool->mutex);
} }
else { else {
//headers only probably //headers only probably
Expand All @@ -884,16 +890,6 @@ static ngx_int_t ngx_http_push_respond_to_subscribers(ngx_http_push_channel_t *c
ngx_shmtx_lock(&shpool->mutex); ngx_shmtx_lock(&shpool->mutex);
channel->subscribers-=responded_subscribers; channel->subscribers-=responded_subscribers;
//is the message still needed? //is the message still needed?
if(msg!=NULL && (--msg->refcount)==0) {
if(msg->recipients_left <= 0) {
//received min_message_recipients times
ngx_http_push_delete_message_locked(channel, msg, shpool);
}
if(msg->queue.next==NULL) {
//message was dequeued, and nobody needs it anymore
ngx_http_push_free_message_locked(msg, shpool);
}
}
ngx_shmtx_unlock(&shpool->mutex); ngx_shmtx_unlock(&shpool->mutex);
ngx_pfree(ngx_http_push_pool, sentinel); ngx_pfree(ngx_http_push_pool, sentinel);
return NGX_OK; return NGX_OK;
Expand Down
6 changes: 2 additions & 4 deletions src/ngx_http_push_module.h
Expand Up @@ -58,22 +58,20 @@ typedef struct {
ngx_int_t subscriber_poll_mechanism; ngx_int_t subscriber_poll_mechanism;
ngx_int_t authorize_channel; ngx_int_t authorize_channel;
ngx_int_t store_messages; ngx_int_t store_messages;
ngx_int_t min_message_recipients; ngx_int_t delete_oldest_received_message;
ngx_str_t channel_group; ngx_str_t channel_group;
ngx_int_t max_channel_id_length; ngx_int_t max_channel_id_length;
ngx_int_t max_channel_subscribers; ngx_int_t max_channel_subscribers;
} ngx_http_push_loc_conf_t; } ngx_http_push_loc_conf_t;


#define NGX_INFINITE_MESSAGE_RECIPIENTS (2^31)-1

//message queue //message queue
typedef struct { typedef struct {
ngx_queue_t queue; //this MUST be first. ngx_queue_t queue; //this MUST be first.
ngx_str_t content_type; ngx_str_t content_type;
// ngx_str_t charset; // ngx_str_t charset;
ngx_buf_t *buf; ngx_buf_t *buf;
time_t expires; time_t expires;
ngx_int_t recipients_left; //# of times message is received before it should be deleted. ngx_int_t delete_oldest_received;
time_t message_time; //tag message by time time_t message_time; //tag message by time
ngx_int_t message_tag; //used in conjunction with message_time if more than one message have the same time. ngx_int_t message_tag; //used in conjunction with message_time if more than one message have the same time.
ngx_int_t refcount; ngx_int_t refcount;
Expand Down
10 changes: 5 additions & 5 deletions src/ngx_http_push_module_setup.c
Expand Up @@ -102,7 +102,7 @@ static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf) {
lcf->subscriber_poll_mechanism=NGX_CONF_UNSET; lcf->subscriber_poll_mechanism=NGX_CONF_UNSET;
lcf->authorize_channel=NGX_CONF_UNSET; lcf->authorize_channel=NGX_CONF_UNSET;
lcf->store_messages=NGX_CONF_UNSET; lcf->store_messages=NGX_CONF_UNSET;
lcf->min_message_recipients=NGX_CONF_UNSET; lcf->delete_oldest_received_message=NGX_CONF_UNSET;
lcf->max_channel_id_length=NGX_CONF_UNSET; lcf->max_channel_id_length=NGX_CONF_UNSET;
lcf->max_channel_subscribers=NGX_CONF_UNSET; lcf->max_channel_subscribers=NGX_CONF_UNSET;
lcf->channel_group.data=NULL; lcf->channel_group.data=NULL;
Expand All @@ -118,7 +118,7 @@ static char * ngx_http_push_merge_loc_conf(ngx_conf_t *cf, void *parent, void *c
ngx_conf_merge_value(conf->subscriber_poll_mechanism, prev->subscriber_poll_mechanism, NGX_HTTP_PUSH_MECHANISM_LONGPOLL); ngx_conf_merge_value(conf->subscriber_poll_mechanism, prev->subscriber_poll_mechanism, NGX_HTTP_PUSH_MECHANISM_LONGPOLL);
ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 0); ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 0);
ngx_conf_merge_value(conf->store_messages, prev->store_messages, 1); ngx_conf_merge_value(conf->store_messages, prev->store_messages, 1);
ngx_conf_merge_value(conf->min_message_recipients, prev->min_message_recipients, NGX_HTTP_PUSH_MIN_MESSAGE_RECIPIENTS); ngx_conf_merge_value(conf->delete_oldest_received_message, prev->delete_oldest_received_message, 0);
ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_HTTP_PUSH_MAX_CHANNEL_ID_LENGTH); ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_HTTP_PUSH_MAX_CHANNEL_ID_LENGTH);
ngx_conf_merge_value(conf->max_channel_subscribers, prev->max_channel_subscribers, 0); ngx_conf_merge_value(conf->max_channel_subscribers, prev->max_channel_subscribers, 0);
ngx_conf_merge_str_value(conf->channel_group, prev->channel_group, ""); ngx_conf_merge_str_value(conf->channel_group, prev->channel_group, "");
Expand Down Expand Up @@ -286,11 +286,11 @@ static ngx_command_t ngx_http_push_commands[] = {
0, 0,
NULL }, NULL },


{ ngx_string("push_min_message_recipients"), { ngx_string("push_delete_oldest_received_message"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1, NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot, ngx_conf_set_flag_slot,
NGX_HTTP_LOC_CONF_OFFSET, NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_loc_conf_t, min_message_recipients), offsetof(ngx_http_push_loc_conf_t, delete_oldest_received_message),
NULL }, NULL },


{ ngx_string("push_publisher"), { ngx_string("push_publisher"),
Expand Down

0 comments on commit 7a0394b

Please sign in to comment.