Permalink
Browse files

concurency setting stuff (not yet, but sort of)

  • Loading branch information...
slact committed Oct 8, 2009
1 parent 848de5b commit baaa2755b5a7b5f09b7571f84816a030412f00d0
Showing with 76 additions and 27 deletions.
  1. +13 −0 README
  2. +44 −9 src/ngx_http_push_module.c
  3. +2 −0 src/ngx_http_push_module.h
  4. +17 −18 src/ngx_http_push_module_setup.c
View
13 README
@@ -23,6 +23,19 @@ push_listener
not be responded to until a message for the listener (identified by
$push_id) becomes available. See protocol documentation for more info.
+push_listener_concurrency
+
+push_listener_concurrency [ last | first | broadcast ]
+ default: last
+ context: http, server, location
+ Controls how multiple listener requests to the same channel id are handled.
+ The values work as follows:
+ broadcast: any number of listener requests may be long-polling.
+ last: only the most recent listener request is kept, all others get a 409
+ Conflict response.
+ first: only the oldest listener request is kept, all others get a 409
+ Conflict response.
+
push_message_timeout [ time ]
default: 1h
context: http, server, location
View
@@ -225,12 +225,14 @@ static ngx_int_t ngx_http_push_listener_handler(ngx_http_request_t *r) {
}
ngx_http_discard_request_body(r); //don't care about the rest of this request
-
+ if(ngx_http_push_handle_listener_concurrency_setting(cf->concurrency, node, r, shpool) == NGX_DECLINED) { //this request was declined for some reason.
+ //status codes and whatnot should have already been written. just get out of here quickly.
+ return NGX_OK;
+ }
ngx_shmtx_lock(&shpool->mutex);
//expired messages are already removed from queue during get_node()
msg = ngx_http_push_find_message(node, r, &status);
node->last_seen = ngx_time();
-
//no matching message and it wasn't because an expired message was requested.
if (status==NGX_DONE) {
//this means we must wait for a message.
@@ -282,13 +284,43 @@ static ngx_int_t ngx_http_push_listener_handler(ngx_http_request_t *r) {
}
}
+static ngx_str_t ngx_http_push_409_Conflict = ngx_string("409 Conflict");
+static ngx_int_t ngx_http_push_handle_listener_concurrency_setting(ngx_int_t concurrency, ngx_http_push_node_t *node, ngx_http_request_t *r, ngx_slab_pool_t *shpool) {
+ if(concurrency==NGX_HTTP_PUSH_LISTENER_BROADCAST) {
+ return NGX_OK;
+ }
+ else{
+ ngx_shmtx_lock(&shpool->mutex);
+ ngx_http_push_listener_t *listener;
+ ngx_http_request_t *r_listener;
+ ngx_queue_t *sentinel = &node->listener_queue->queue;
+ ngx_queue_t *cur = (concurrency == NGX_HTTP_PUSH_LISTENER_FIRSTIN) ? sentinel->next->next : sentinel->next;
+ if(concurrency == NGX_HTTP_PUSH_LISTENER_FIRSTIN && node->listener_queue_size>0) {
+ ngx_http_push_reply_status_only(r, NGX_HTTP_NOT_FOUND, &ngx_http_push_409_Conflict);
+ return NGX_DECLINED;
+ }
+ while (cur!=sentinel) {
+ //send a 409 Conflict to everyone waiting for something
+ listener = ngx_queue_data(cur, ngx_http_push_listener_t, queue);
+ cur=cur->next;
+ r_listener=listener->request;
+ ngx_shmtx_unlock(&shpool->mutex);
+ ngx_http_push_reply_status_only(r_listener, NGX_HTTP_NOT_FOUND, &ngx_http_push_409_Conflict);
+ ngx_shmtx_lock(&shpool->mutex);
+ }
+ ngx_shmtx_unlock(&shpool->mutex);
+ return NGX_OK;
+ }
+}
+
#define NGX_HTTP_PUSH_SENDER_CHECK(val, fail, r, errormessage) \
if (val == fail) { \
ngx_log_error(NGX_LOG_ERR, (r)->connection->log, 0, errormessage); \
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR); \
return; \
}
+static ngx_str_t ngx_http_push_410_Gone = ngx_string("410 Gone");
static void ngx_http_push_sender_body_handler(ngx_http_request_t * r) {
ngx_str_t id;
ngx_http_push_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_module);
@@ -439,13 +471,7 @@ static void ngx_http_push_sender_body_handler(ngx_http_request_t * r) {
r_listener=listener->request;
listener->cleanup->node=NULL; //the node may be deleted by the time we get to the request pool cleanup.
ngx_shmtx_unlock(&shpool->mutex);
- //here comes the 410
- r_listener->headers_out.status=NGX_HTTP_NOT_FOUND; //play nice, NGINX.
- r_listener->headers_out.status_line.len =sizeof("410 Gone")- 1;
- r_listener->headers_out.status_line.data=(u_char *) "410 Gone";
- r_listener->headers_out.content_length_n = 0;
- r_listener->header_only = 1;
- ngx_http_finalize_request(r_listener, ngx_http_send_header(r_listener));
+ ngx_http_push_reply_status_only(r_listener, NGX_HTTP_NOT_FOUND, &ngx_http_push_410_Gone);
ngx_shmtx_lock(&shpool->mutex);
}
ngx_http_push_delete_node((ngx_rbtree_t *) ngx_http_push_shm_zone->data, (ngx_rbtree_node_t *) node, shpool);
@@ -653,3 +679,12 @@ static void ngx_http_push_listener_cleanup(ngx_http_push_listener_cleanup_t *dat
ngx_slab_free(data->shpool, data->listener);
}
}
+
+static void ngx_http_push_reply_status_only(ngx_http_request_t *r, ngx_int_t code, ngx_str_t *statusline) {
+ r->headers_out.status=code;
+ r->headers_out.status_line.len =statusline->len;
+ r->headers_out.status_line.data=statusline->data;
+ r->headers_out.content_length_n = 0;
+ r->header_only = 1;
+ ngx_http_finalize_request(r, ngx_http_send_header(r));
+}
@@ -72,6 +72,7 @@ static ngx_int_t ngx_http_push_node_info(ngx_http_request_t *r, ngx_uint_t me
static char * ngx_http_push_listener(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_listener hook
static ngx_int_t ngx_http_push_listener_handler(ngx_http_request_t * r);
static ngx_str_t * ngx_http_push_listener_get_etag(ngx_http_request_t * r);
+static ngx_int_t ngx_http_push_handle_listener_concurrency_setting(ngx_int_t concurrency, ngx_http_push_node_t *node, ngx_http_request_t *r, ngx_slab_pool_t *shpool);
//response generating stuff
static ngx_int_t ngx_http_push_set_listener_header(ngx_http_request_t *r, ngx_http_push_msg_t *msg);
@@ -90,6 +91,7 @@ static ngx_int_t ngx_http_push_init_shm_zone(ngx_shm_zone_t * shm_zone, void
static ngx_int_t ngx_http_push_postconfig(ngx_conf_t *cf);
static ngx_int_t ngx_http_push_add_cache_control(ngx_http_request_t *r, ngx_str_t *value);
static char * ngx_http_push_set_listener_concurrency(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
+static void ngx_http_push_reply_status_only(ngx_http_request_t *r, ngx_int_t code, ngx_str_t *statusline);
static ngx_http_push_msg_t * ngx_http_push_dequeue_message(ngx_http_push_node_t * node); // doesn't free associated memory
static ngx_http_push_msg_t * ngx_http_push_find_message(ngx_http_push_node_t * node, ngx_http_request_t *r, ngx_int_t *status);
@@ -31,15 +31,15 @@ static ngx_command_t ngx_http_push_commands[] = {
{ ngx_string("push_listener"),
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
ngx_http_push_listener,
- 0,
+ NGX_HTTP_LOC_CONF_OFFSET,
0,
NULL },
{ ngx_string("push_listener_concurrency"),
- NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_NOARGS,
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_http_push_set_listener_concurrency,
NGX_HTTP_LOC_CONF_OFFSET,
- 0,
+ offsetof(ngx_http_push_loc_conf_t, concurrency),
NULL },
//deprecated and misleading. remove no earlier than november 2009.
@@ -148,21 +148,21 @@ static void * ngx_http_push_create_main_conf(ngx_conf_t *cf) {
//location config stuff
static void * ngx_http_push_create_loc_conf(ngx_conf_t *cf) {
- ngx_http_push_loc_conf_t *lcf;
- lcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_loc_conf_t));
+ ngx_http_push_loc_conf_t *lcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_push_loc_conf_t));
if(lcf == NULL) {
return NGX_CONF_ERROR;
}
lcf->buffer_timeout=NGX_CONF_UNSET;
lcf->max_message_queue_size=NGX_CONF_UNSET;
+ lcf->concurrency=NGX_CONF_UNSET;
return lcf;
}
static char * ngx_http_push_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) {
ngx_http_push_loc_conf_t *prev = parent, *conf = child;
ngx_conf_merge_sec_value(conf->buffer_timeout, prev->buffer_timeout, NGX_HTTP_PUSH_DEFAULT_BUFFER_TIMEOUT);
ngx_conf_merge_value(conf->max_message_queue_size, prev->max_message_queue_size, NGX_HTTP_PUSH_DEFAULT_MESSAGE_QUEUE_SIZE);
- ngx_conf_merge_value(conf->concurrency, prev->concurrency, NGX_HTTP_PUSH_LISTENER_BROADCAST);
+ ngx_conf_merge_value(conf->concurrency, prev->concurrency, NGX_HTTP_PUSH_LISTENER_LASTIN);
return NGX_CONF_OK;
}
@@ -181,18 +181,17 @@ static char *ngx_http_push_setup_handler(ngx_conf_t *cf, void * conf, ngx_int_t
}
static char *ngx_http_push_set_listener_concurrency(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
- ngx_http_push_loc_conf_t *plcf = ngx_http_conf_get_module_loc_conf(cf, ngx_http_core_module);
- char *c = NULL;
- if(ngx_conf_set_str_slot(cf, cmd, c) == NGX_CONF_OK) {
- if(ngx_strcmp(c, "first")) {
- plcf->concurrency=NGX_HTTP_PUSH_LISTENER_FIRSTIN;
- }
- else if(ngx_strcmp(c, "last")) {
- plcf->concurrency=NGX_HTTP_PUSH_LISTENER_LASTIN;
- }
- else { //broadcast
- plcf->concurrency=NGX_HTTP_PUSH_LISTENER_BROADCAST;
- }
+ ngx_str_t *value=&(((ngx_str_t *) cf->args->elts)[1]);
+ ngx_int_t *field = (ngx_int_t *) ((char *) conf + cmd->offset);
+
+ if(ngx_strncmp(value->data, "first", 5)==0) {
+ *field=NGX_HTTP_PUSH_LISTENER_FIRSTIN;
+ }
+ else if(ngx_strncmp(value->data, "last", 4)==0) {
+ *field=NGX_HTTP_PUSH_LISTENER_LASTIN;
+ }
+ else { //broadcast
+ *field=NGX_HTTP_PUSH_LISTENER_BROADCAST;
}
return NGX_CONF_OK;
}

0 comments on commit baaa275

Please sign in to comment.