Skip to content

Commit

Permalink
all source requests now respond with a body of info about the message…
Browse files Browse the repository at this point in the history
… queue and last-seen destination request. Also, implemented GET for push_source
  • Loading branch information
slact committed Aug 26, 2009
1 parent 872cb85 commit 7f56492
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 5 deletions.
52 changes: 47 additions & 5 deletions src/ngx_http_push_module.c
Expand Up @@ -185,6 +185,9 @@ static void ngx_http_push_source_body_handler(ngx_http_request_t * r) {
ngx_http_request_t *r_client = NULL;
ngx_uint_t method = r->method;

time_t last_seen = 0;
ngx_uint_t queue_len = 0;

if(ngx_http_push_set_id(&id, r, cf) !=NGX_OK) {
return;
}
Expand All @@ -203,6 +206,8 @@ static void ngx_http_push_source_body_handler(ngx_http_request_t * r) {
}
if (node!=NULL) {
r_client = node->request;
queue_len = node->message_queue_len;
last_seen = node->last_seen;
}
ngx_shmtx_unlock(&shpool->mutex);

Expand Down Expand Up @@ -268,6 +273,7 @@ static void ngx_http_push_source_body_handler(ngx_http_request_t * r) {
msg->buf=buf_copy;
ngx_queue_insert_tail(&node->message_queue->queue, &msg->queue);
node->message_queue_len++;
queue_len = node->message_queue_len;

//store the content-type
if(content_type_len>0) {
Expand Down Expand Up @@ -313,7 +319,9 @@ static void ngx_http_push_source_body_handler(ngx_http_request_t * r) {
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_finalize_request(r_client, ngx_http_push_set_destination_body(r_client, ngx_http_push_create_output_chain(r_client, buf)));

r->headers_out.status=NGX_HTTP_CREATED;
r->headers_out.status=NGX_HTTP_OK;
r->headers_out.status_line.len =sizeof("201 Created")- 1;
r->headers_out.status_line.data=(u_char *) "201 Created";
}
else { //r_client==NULL && r->method == NGX_HTTP_PUT is all that remains
r->headers_out.status=NGX_HTTP_OK;
Expand Down Expand Up @@ -348,14 +356,48 @@ static void ngx_http_push_source_body_handler(ngx_http_request_t * r) {
}
else {
r->headers_out.status=NGX_HTTP_NOT_FOUND;
r->header_only = 1;
}
}
else {
ngx_http_finalize_request(r, NGX_HTTP_NOT_ALLOWED);
return;
}
if (r->header_only || node==NULL) {
r->header_only = 1;
r->headers_out.content_length_n = 0;
ngx_http_finalize_request(r, ngx_http_send_header(r));
}
else {
ngx_http_finalize_request(r, ngx_http_push_node_info(r, queue_len, last_seen));
}
return;
}

static ngx_int_t ngx_http_push_node_info(ngx_http_request_t *r, ngx_uint_t queue_len, time_t last_seen) {
ngx_buf_t *b;
ngx_uint_t len;
len = sizeof("queued: \r\n") + NGX_INT_T_LEN + sizeof("requested: \r\n") + NGX_INT_T_LEN;

r->headers_out.content_length_n = 0;
r->header_only = 1;
b = ngx_create_temp_buf(r->pool, len);
if (b == NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
b->last = ngx_sprintf(b->last, "queued: %ui\r\n", queue_len);
if(last_seen==0){
b->last = ngx_cpymem(b->last, "requested: never\r\n", sizeof("requested: never\r\n") - 1);
}
else {
b->last = ngx_sprintf(b->last, "requested: %ui\r\n", ngx_time() - last_seen);
}

ngx_http_finalize_request(r, ngx_http_send_header(r));
return;
r->headers_out.content_type.len = sizeof("text/plain") - 1;
r->headers_out.content_type.data = (u_char *) "text/plain";
if (ngx_http_send_header(r) > NGX_HTTP_SPECIAL_RESPONSE) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

return ngx_http_push_set_destination_body(r, ngx_http_push_create_output_chain(r, b));
}

static ngx_int_t ngx_http_push_source_handler(ngx_http_request_t * r) {
Expand Down
1 change: 1 addition & 0 deletions src/ngx_http_push_module.h
Expand Up @@ -48,6 +48,7 @@ struct ngx_http_push_node_s {
static char * ngx_http_push_source(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_source hook
static ngx_int_t ngx_http_push_source_handler(ngx_http_request_t * r);
static void ngx_http_push_source_body_handler(ngx_http_request_t * r);
static ngx_int_t ngx_http_push_node_info(ngx_http_request_t *r, ngx_uint_t queue_len, time_t last_seen);

//destination stuff
static char * ngx_http_push_destination(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_destination hook
Expand Down
2 changes: 2 additions & 0 deletions src/ngx_http_push_rbtree_util.c
Expand Up @@ -8,6 +8,7 @@ static void ngx_rbtree_generic_insert( ngx_rbtree_node_t *temp, ngx_rbtree_n
static void ngx_http_push_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
static int ngx_http_push_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right);
static ngx_int_t ngx_http_push_delete_node(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool);
static ngx_http_push_node_t * clean_node(ngx_http_push_node_t * node, ngx_slab_pool_t * shpool);

static ngx_http_push_node_t * clean_node(ngx_http_push_node_t * node, ngx_slab_pool_t * shpool) {
ngx_queue_t *sentinel = &node->message_queue->queue;
Expand All @@ -16,6 +17,7 @@ static ngx_http_push_node_t * clean_node(ngx_http_push_node_t * node, ngx_slab_p
while(!ngx_queue_empty(sentinel)){
msg = ngx_queue_data(ngx_queue_head(sentinel), ngx_http_push_msg_t, queue);
if (msg!=NULL && msg->expires != 0 && now > msg->expires) {
node->message_queue_len--;
ngx_queue_remove((&msg->queue));
ngx_slab_free_locked(shpool, msg);
}
Expand Down

0 comments on commit 7f56492

Please sign in to comment.