Permalink
Browse files

split push_message_buffer_length into push_min_message_buffer_length …

…and push_max_message_buffer_length
  • Loading branch information...
1 parent 7587b85 commit 2878765640ae8641babb91e22b2b143df34b9c81 @slact committed Oct 17, 2009
Showing with 30 additions and 7 deletions.
  1. +9 −4 README
  2. +7 −0 src/ngx_http_push_module.c
  3. +3 −1 src/ngx_http_push_module.h
  4. +11 −2 src/ngx_http_push_module_setup.c
View
13 README
@@ -41,12 +41,17 @@ push_max_reserved_memory [ size ]
The size of the memory chunk this module will use for all message queuing
and buffering.
-push_message_buffer_length [ number ]
+push_min_message_buffer_length [ number ]
default: 5
context: http, server, location
- The maximum number of messages to store per channel. Old messages are removed
- when a channel's message buffer length exceeds this setting. Set to 0 to
- disable buffering.
+ The minimum number of messages to store per channel. A channel's message
+ buffer, will cointain at least this many most recent messages.
+
+push_max_message_buffer_length [ number ]
+ default: 255
+ context: http, server, location
+ The minimum number of messages to store per channel. A channel's message
+ buffer, will cointain at least this many most recent messages.
push_min_message_recipients [ number ]
default: 0
@@ -79,6 +79,7 @@ static ngx_inline void ngx_http_push_delete_message(ngx_slab_pool_t *shpool, ngx
// remove a message from queue and free all associated memory. assumes shpool is already locked.
static ngx_inline void ngx_http_push_delete_message_locked(ngx_slab_pool_t *shpool, ngx_http_push_node_t *node, ngx_http_push_msg_t *msg) {
+ if (msg==NULL) { return; }
if(node!=NULL) {
ngx_queue_remove(&msg->queue);
node->message_queue_size--;
@@ -461,10 +462,16 @@ static void ngx_http_push_sender_body_handler(ngx_http_request_t * r) {
//now see if the queue is too big -- we do this at the end because message queue size may be set to zero, and we don't want special-case code for that.
if(node->message_queue_size > (ngx_uint_t) cf->max_message_queue_size) {
+ //exceeeds max queue size. force-delete oldest message
+ ngx_http_push_delete_message(shpool, node, ngx_http_push_get_oldest_message_locked(node));
+ }
+ if(node->message_queue_size > (ngx_uint_t) cf->min_message_queue_size) {
+ //exceeeds min queue size. maybe delete the oldest message
ngx_shmtx_lock(&shpool->mutex);
ngx_http_push_msg_t *msg = ngx_http_push_get_oldest_message_locked(node);
NGX_HTTP_PUSH_SENDER_CHECK_LOCKED(msg, NULL, r, "push module: oldest message not found", shpool);
if(msg->received >= (ngx_uint_t) cf->min_message_recipients) {
+ //received more than min_message_recipients times
ngx_http_push_delete_message_locked(shpool, node, msg);
}
ngx_shmtx_unlock(&shpool->mutex);
@@ -6,6 +6,7 @@
typedef struct {
ngx_int_t index;
time_t buffer_timeout;
+ ngx_int_t min_message_queue_size;
ngx_int_t max_message_queue_size;
ngx_int_t listener_concurrency;
ngx_int_t listener_poll_mechanism;
@@ -16,7 +17,8 @@ typedef struct {
#define NGX_HTTP_PUSH_DEFAULT_SHM_SIZE 3145728 //3 megs
#define NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT 3600
-#define NGX_HTTP_PUSH_DEFAULT_MESSAGE_QUEUE_SIZE 5
+#define NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGE_QUEUE_SIZE 5
+#define NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGE_QUEUE_SIZE 255
ngx_str_t NGX_HTTP_PUSH_CACHE_CONTROL_VALUE = ngx_string("no-cache");
#define NGX_HTTP_PUSH_LISTENER_LASTIN 0
@@ -14,7 +14,14 @@ static ngx_command_t ngx_http_push_commands[] = {
offsetof(ngx_http_push_main_conf_t, shm_size),
NULL },
- { ngx_string("push_message_buffer_length"),
+ { ngx_string("push_min_message_buffer_length"),
+ NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_push_loc_conf_t, min_message_queue_size),
+ NULL },
+
+ { ngx_string("push_max_message_buffer_length"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_HTTP_LOC_CONF_OFFSET,
@@ -183,6 +190,7 @@ static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf) {
}
lcf->buffer_timeout=NGX_CONF_UNSET;
lcf->max_message_queue_size=NGX_CONF_UNSET;
+ lcf->min_message_queue_size=NGX_CONF_UNSET;
lcf->listener_concurrency=NGX_CONF_UNSET;
lcf->listener_poll_mechanism=NGX_CONF_UNSET;
lcf->authorize_channel=NGX_CONF_UNSET;
@@ -194,7 +202,8 @@ static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf) {
static char * ngx_http_push_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) {
ngx_http_push_loc_conf_t *prev = parent, *conf = child;
ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT);
- ngx_conf_merge_value(conf->max_message_queue_size, prev->max_message_queue_size, NGX_HTTP_PUSH_DEFAULT_MESSAGE_QUEUE_SIZE);
+ ngx_conf_merge_value(conf->max_message_queue_size, prev->max_message_queue_size, NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGE_QUEUE_SIZE);
+ ngx_conf_merge_value(conf->min_message_queue_size, prev->min_message_queue_size, NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGE_QUEUE_SIZE);
ngx_conf_merge_value(conf->listener_concurrency, prev->listener_concurrency, NGX_HTTP_PUSH_LISTENER_LASTIN);
ngx_conf_merge_value(conf->listener_poll_mechanism, prev->listener_poll_mechanism, NGX_HTTP_PUSH_LISTENER_LONGPOLL);
ngx_conf_merge_value(conf->authorize_channel, prev->authorize_channel, 0);

0 comments on commit 2878765

Please sign in to comment.