Permalink
Browse files

added a new option push_channel_timeout which controls how long a cha…

…nnel should be kept after accepting the last subscriber in the channel with no queued messages and no active subscriber.

this option should be used along with push_subscriber_timeout, and push_channel_timeout should be greater than push_subscriber_timeout

push_channel_timeout should appear along with push_publisher (or parent context); if push_authorized_channels_only is off, push_channel_timeout should also be set in push_subscriber context (or parent context)
  • Loading branch information...
1 parent 5a19e7a commit f485755084b6f366ab6071aaab7bbb9fde8df37c @liucougar liucougar committed Nov 30, 2010
Showing with 46 additions and 8 deletions.
  1. +19 −1 README
  2. +7 −2 src/ngx_http_push_module.c
  3. +5 −1 src/ngx_http_push_module.h
  4. +10 −1 src/ngx_http_push_module_setup.c
  5. +5 −3 src/ngx_http_push_rbtree_util.c
View
20 README
@@ -98,6 +98,24 @@ push_message_timeout [ time ]
If you do not want messages to expire, set this to 0. Applicable only if a
push_publisher is present in this or a child context.
+push_subscriber_timeout [ time ]
+ default: 0
+ context: http, server, location
+ The length of time a subscriber's long-polling connection can last before
+ it's timed out. If you don't want subscriber's connection to timeout, set
+ this to 0. Applicable only if a push_subscriber is present in this or a
+ child context.
+
+push_channel_timeout [ time ]
+ default: 0
+ context: http, server, location
+ The length of time a channel will be removed after it has no subscriber and
+ no queued messages. If you want to remove a channel as soon as possible,
+ set this to 0. Applicable only if a push_publisher or push_subscriber is
+ present in this or a child context and push_subscriber_timeout is greater
+ than 0. This value should be greater than push_subscriber_timeout to make
+ sense.
+
== Security ==
push_authorized_channels_only [ on | off ]
@@ -185,4 +203,4 @@ Last-Modified and Etag headers. (All modern web browsers do this.)
----------------------- Protocol Spec --------------------------------------
This module is unconditionally (fully) compliant with the Basic HTTP Push
-Relay Protocol, Rev. 2.21, found in the file protocol.txt.
+Relay Protocol, Rev. 2.21, found in the file protocol.txt.
@@ -314,7 +314,11 @@ static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r) {
//get the channel and check channel authorization while we're at it.
ngx_shmtx_lock(&shpool->mutex);
- channel = (cf->authorize_channel==1 ? ngx_http_push_find_channel : ngx_http_push_get_channel)(id, r->connection->log);
+ if (cf->authorize_channel==1) {
+ channel = ngx_http_push_find_channel(id, r->connection->log);
+ }else{
+ channel = ngx_http_push_get_channel(id, r->connection->log, cf->channel_timeout);
+ }
if (channel==NULL) {
//unable to allocate channel OR channel not found
@@ -330,6 +334,7 @@ static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r) {
msg = ngx_http_push_find_message_locked(channel, r, &msg_search_outcome);
channel->last_seen = ngx_time();
+ channel->expires = ngx_time() + cf->channel_timeout;
ngx_shmtx_unlock(&shpool->mutex);
if (cf->ignore_queue_on_no_cache && !ngx_http_push_allow_caching(r)) {
@@ -651,7 +656,7 @@ static void ngx_http_push_publisher_body_handler(ngx_http_request_t * r) {
if(method==NGX_HTTP_POST && (r->headers_in.content_length_n == -1 || r->headers_in.content_length_n == 0)) {
NGX_HTTP_PUSH_PUBLISHER_CHECK_LOCKED(0, 0, r, "push module: trying to push an empty message", shpool);
}
- channel = ngx_http_push_get_channel(id, r->connection->log);
+ channel = ngx_http_push_get_channel(id, r->connection->log, cf->channel_timeout);
NGX_HTTP_PUSH_PUBLISHER_CHECK_LOCKED(channel, NULL, r, "push module: unable to allocate memory for new channel", shpool);
}
//no other request method needs that.
@@ -6,6 +6,8 @@
#define NGX_HTTP_PUSH_DEFAULT_SHM_SIZE 33554432 //32 megs
#define NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT 3600
#define NGX_HTTP_PUSH_DEFAULT_SUBSCRIBER_TIMEOUT 0 //default: never timeout
+//(liucougar: this is a bit confusing, but it is what's the default behavior before this option is introducecd)
+#define NGX_HTTP_PUSH_DEFAULT_CHANNEL_TIMEOUT 0 //default: timeout immediately
#define NGX_HTTP_PUSH_DEFAULT_MIN_MESSAGES 1
#define NGX_HTTP_PUSH_DEFAULT_MAX_MESSAGES 10
@@ -64,7 +66,8 @@ typedef struct {
ngx_str_t channel_group;
ngx_int_t max_channel_id_length;
ngx_int_t max_channel_subscribers;
- ngx_int_t ignore_queue_on_no_cache;
+ ngx_int_t ignore_queue_on_no_cache;
+ time_t channel_timeout;
} ngx_http_push_loc_conf_t;
//message queue
@@ -106,6 +109,7 @@ typedef struct {
ngx_http_push_pid_queue_t workers_with_subscribers;
ngx_uint_t subscribers;
time_t last_seen;
+ time_t expires;
} ngx_http_push_channel_t;
//cleaning supplies
@@ -106,7 +106,8 @@ static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf) {
lcf->delete_oldest_received_message=NGX_CONF_UNSET;
lcf->max_channel_id_length=NGX_CONF_UNSET;
lcf->max_channel_subscribers=NGX_CONF_UNSET;
- lcf->ignore_queue_on_no_cache=NGX_CONF_UNSET;
+ lcf->ignore_queue_on_no_cache=NGX_CONF_UNSET;
+ lcf->channel_timeout=NGX_CONF_UNSET;
lcf->channel_group.data=NULL;
return lcf;
}
@@ -125,6 +126,7 @@ static char * ngx_http_push_merge_loc_conf(ngx_conf_t *cf, void *parent, void *c
ngx_conf_merge_value(conf->max_channel_id_length, prev->max_channel_id_length, NGX_HTTP_PUSH_MAX_CHANNEL_ID_LENGTH);
ngx_conf_merge_value(conf->max_channel_subscribers, prev->max_channel_subscribers, 0);
ngx_conf_merge_value(conf->ignore_queue_on_no_cache, prev->ignore_queue_on_no_cache, 0);
+ ngx_conf_merge_value(conf->channel_timeout, prev->channel_timeout, NGX_HTTP_PUSH_DEFAULT_CHANNEL_TIMEOUT);
ngx_conf_merge_str_value(conf->channel_group, prev->channel_group, "");
//sanity checks
@@ -370,6 +372,13 @@ static ngx_command_t ngx_http_push_commands[] = {
offsetof(ngx_http_push_loc_conf_t, ignore_queue_on_no_cache),
NULL },
+ { ngx_string("push_channel_timeout"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_sec_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_push_loc_conf_t, channel_timeout),
+ NULL },
+
ngx_null_command
};
@@ -3,7 +3,7 @@
#include <ngx_http.h>
-static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t * id, ngx_log_t * log);
+static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t * id, ngx_log_t * log, time_t timeout);
static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t * id, ngx_log_t * log);
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash);
@@ -28,7 +28,7 @@ static ngx_http_push_channel_t * ngx_http_push_clean_channel_locked(ngx_http_pus
}
}
//at this point, the queue is empty
- return channel->subscribers==0 ? channel : NULL; //if no waiting requests, return this channel to be deleted
+ return (channel->subscribers==0 && (channel->expires <= now)) ? channel : NULL; //if no waiting requests and channel expired, return this channel to be deleted
}
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash) {
@@ -132,7 +132,7 @@ static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t *id, ngx_l
}
//find a channel by id. if channel not found, make one, insert it, and return that.
- static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t *id, ngx_log_t *log) {
+ static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t *id, ngx_log_t *log, time_t timeout) {
ngx_rbtree_t *tree;
ngx_http_push_channel_t *up=ngx_http_push_find_channel(id, log);
@@ -157,6 +157,8 @@ static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t *id, ngx_l
ngx_queue_init(&up->workers_with_subscribers.queue);
up->subscribers=0;
+
+ up->expires = ngx_time() + timeout;
((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->channels++;

0 comments on commit f485755

Please sign in to comment.