Skip to content

Commit

Permalink
split push_message_buffer_length into push_min_message_buffer_length …
Browse files Browse the repository at this point in the history
…and push_max_message_buffer_length
  • Loading branch information
slact committed Oct 17, 2009
1 parent 7587b85 commit 2878765
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 7 deletions.
13 changes: 9 additions & 4 deletions README
Expand Up @@ -41,12 +41,17 @@ push_max_reserved_memory [ size ]
The size of the memory chunk this module will use for all message queuing
and buffering.

push_message_buffer_length [ number ]
push_min_message_buffer_length [ number ]
default: 5
context: http, server, location
The maximum number of messages to store per channel. Old messages are removed
when a channel's message buffer length exceeds this setting. Set to 0 to
disable buffering.
The minimum number of messages to store per channel. A channel's message
buffer, will cointain at least this many most recent messages.

push_max_message_buffer_length [ number ]
default: 255
context: http, server, location
The minimum number of messages to store per channel. A channel's message
buffer, will cointain at least this many most recent messages.

push_min_message_recipients [ number ]
default: 0
Expand Down
7 changes: 7 additions & 0 deletions src/ngx_http_push_module.c
Expand Up @@ -79,6 +79,7 @@ static ngx_inline void ngx_http_push_delete_message(ngx_slab_pool_t *shpool, ngx

// remove a message from queue and free all associated memory. assumes shpool is already locked.
static ngx_inline void ngx_http_push_delete_message_locked(ngx_slab_pool_t *shpool, ngx_http_push_node_t *node, ngx_http_push_msg_t *msg) {
if (msg==NULL) { return; }
if(node!=NULL) {
ngx_queue_remove(&msg->queue);
node->message_queue_size--;
Expand Down Expand Up @@ -461,10 +462,16 @@ static void ngx_http_push_sender_body_handler(ngx_http_request_t * r) {

//now see if the queue is too big -- we do this at the end because message queue size may be set to zero, and we don't want special-case code for that.
if(node->message_queue_size > (ngx_uint_t) cf->max_message_queue_size) {
//exceeeds max queue size. force-delete oldest message
ngx_http_push_delete_message(shpool, node, ngx_http_push_get_oldest_message_locked(node));
}
if(node->message_queue_size > (ngx_uint_t) cf->min_message_queue_size) {
//exceeeds min queue size. maybe delete the oldest message
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_msg_t *msg = ngx_http_push_get_oldest_message_locked(node);
NGX_HTTP_PUSH_SENDER_CHECK_LOCKED(msg, NULL, r, "push module: oldest message not found", shpool);
if(msg->received >= (ngx_uint_t) cf->min_message_recipients) {
//received more than min_message_recipients times
ngx_http_push_delete_message_locked(shpool, node, msg);
}
ngx_shmtx_unlock(&shpool->mutex);
Expand Down
4 changes: 3 additions & 1 deletion src/ngx_http_push_module.h
Expand Up @@ -6,6 +6,7 @@
typedef struct {
ngx_int_t index;
time_t buffer_timeout;
ngx_int_t min_message_queue_size;
ngx_int_t max_message_queue_size;
ngx_int_t listener_concurrency;
ngx_int_t listener_poll_mechanism;
Expand All @@ -16,7 +17,8 @@ typedef struct {

#define NGX_HTTP_PUSH_DEFAULT_SHM_SIZE 3145728 //3 megs
#define NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT 3600
#define NGX_HTTP_PUSH_DEFAULT_MESSAGE_QUEUE_SIZE 5
#define NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGE_QUEUE_SIZE 5
#define NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGE_QUEUE_SIZE 255
ngx_str_t NGX_HTTP_PUSH_CACHE_CONTROL_VALUE = ngx_string("no-cache");

#define NGX_HTTP_PUSH_LISTENER_LASTIN 0
Expand Down
13 changes: 11 additions & 2 deletions src/ngx_http_push_module_setup.c
Expand Up @@ -14,7 +14,14 @@ static ngx_command_t ngx_http_push_commands[] = {
offsetof(ngx_http_push_main_conf_t, shm_size),
NULL },

{ ngx_string("push_message_buffer_length"),
{ ngx_string("push_min_message_buffer_length"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
offsetof(ngx_http_push_loc_conf_t, min_message_queue_size),
NULL },

{ ngx_string("push_max_message_buffer_length"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
Expand Down Expand Up @@ -183,6 +190,7 @@ static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf) {
}
lcf->buffer_timeout=NGX_CONF_UNSET;
lcf->max_message_queue_size=NGX_CONF_UNSET;
lcf->min_message_queue_size=NGX_CONF_UNSET;
lcf->listener_concurrency=NGX_CONF_UNSET;
lcf->listener_poll_mechanism=NGX_CONF_UNSET;
lcf->authorize_channel=NGX_CONF_UNSET;
Expand All @@ -194,7 +202,8 @@ static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf) {
static char * ngx_http_push_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) {
ngx_http_push_loc_conf_t *prev = parent, *conf = child;
ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT);
ngx_conf_merge_value(conf->max_message_queue_size, prev->max_message_queue_size, NGX_HTTP_PUSH_DEFAULT_MESSAGE_QUEUE_SIZE);
ngx_conf_merge_value(conf->max_message_queue_size, prev->max_message_queue_size, NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGE_QUEUE_SIZE);
ngx_conf_merge_value(conf->min_message_queue_size, prev->min_message_queue_size, NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGE_QUEUE_SIZE);
ngx_conf_merge_value(conf->listener_concurrency, prev->listener_concurrency, NGX_HTTP_PUSH_LISTENER_LASTIN);
ngx_conf_merge_value(conf->listener_poll_mechanism, prev->listener_poll_mechanism, NGX_HTTP_PUSH_LISTENER_LONGPOLL);
ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 0);
Expand Down

0 comments on commit 2878765

Please sign in to comment.