Skip to content

Commit

Permalink
renamed push_server and push_client to push_sender and push_listener,…
Browse files Browse the repository at this point in the history
… respectively
  • Loading branch information
slact committed Aug 27, 2009
1 parent 47e00d3 commit 22eb295
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 96 deletions.
45 changes: 5 additions & 40 deletions README
Expand Up @@ -9,16 +9,16 @@ to your application.
http {
#maximum amount of memory the push module is allowed to use
#for buffering and stuff
push_shm_size 12M; #default is 3M
push_buffer_size 12M; #default is 3M

#push_server
server {
listen localhost:8089;
location / {
default_type text/plain;
set $push_id $arg_id; #/?id=239aff3 or somesuch
push_server;
push_buffer_timeout 2h; #buffered messages expire after 2 hours
push_source;
push_message_timeout 2h; #buffered messages expire after 2 hours
}
}

Expand All @@ -28,7 +28,7 @@ http {
location / {
default_type text/plain;
set $push_id $arg_id; #/?id=239aff3 or somesuch
push_client;
push_listener;
}
}
}
Expand Down Expand Up @@ -73,45 +73,10 @@ It's a damn good idea to make sure the push_server location is not visible
publically, as it is intended for use only by your application.

--------- "Protocol" spec -------------
Let push_server refer to a location (or server) configured with the push_server; option.
Let push_client refer to a location (or server) configured with the push_client; option.

at push_client:
Only POST requests are allowed to this location. The response will be delayed until a message
for the $push_id of said requests becomes available, at which point its contents will be sent
in the body of the response with a 200 OK status code.

When two requests to this location have the same $push_id, messages will be delivered to the
newer of the two. The older request receives an immediate response with a 409 Conflict status
code.

at push_server:
POST, PUT, GET, and DELETE requests are allowed at this location.
A POST request will have its body ("the message"), sent to the request identified by
$push_id, with the Content-Type header forwarded. If no such request exists, the message
will be queued until it can be sent. Multiple messages can be queued, and will be received
in a FILO order. When the request results in an immediate sending of the message (i.e. there
was a waiting request identified by $push_id), the response will have a 201 Created status.
If the message is queued, a 202 Accepted status is sent back.

A PUT request behaves identically to a POST with the exception that messages will NOT be queued.
Instead, if this request is sent while there is no waiting request identified by $push_id,
the message will be lost, and the server will return a 200 OK status code.

A DELETE deletes all messages intended for $push_id. If there is a waiting
request on push_client identified by $push_id, it will be immediately responded to with a
410 Gone status code.

All valid requests will be responded to with a 200 OK (everything went well, and no message was
or will be sent), 201 Created (Message was sent), 202 Accepted (message was queued), or
404 Not Found (only for GET, and DELETE, if no such id is known). All 2xx-status responses
will contain a body with information about the id -- the # of queued messaged and the number of
seconds since a client made a request on that id (or "never" if it was never made or if it had
expired)
see http://wiki.github.com/slact/nginx_http_push_module/queuing-long-poll-relay-protocol
-----------------------------

TODO:
- Implelent DELETE for push_server.
- Add a push_accomodate_strangers setting (presently defaulting to on). When set to off, requests
with a previously-unseen $push_id will be rejected.
- When POSTing to push_server, if Content-Type is "message/http", the response sent to $push_id
Expand Down
88 changes: 44 additions & 44 deletions src/ngx_http_push_module.c
Expand Up @@ -58,13 +58,13 @@ static ngx_int_t ngx_http_push_set_id(ngx_str_t *id, ngx_http_request_t *r, ngx_
} \
}

#define ngx_http_push_remove_client_request_locked(node) \
#define ngx_http_push_remove_listener_request_locked(node) \
(node)->request = NULL; \
if((node)->cleanup!=NULL) { \
(node)->cleanup->node=NULL; \
}

static ngx_int_t ngx_http_push_client_handler(ngx_http_request_t *r) {
static ngx_int_t ngx_http_push_listener_handler(ngx_http_request_t *r) {
ngx_http_push_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_shm_zone->shm.addr;
ngx_str_t id;
Expand All @@ -91,7 +91,7 @@ static ngx_int_t ngx_http_push_client_handler(ngx_http_request_t *r) {
if (node->request!=NULL) { //oh shit, someone's already waiting for a message on this id.
ngx_shmtx_lock(&shpool->mutex);
existing_request = node->request;
ngx_http_push_remove_client_request_locked(node);
ngx_http_push_remove_listener_request_locked(node);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_finalize_request(existing_request, NGX_HTTP_CONFLICT); //bump the old request.
}
Expand All @@ -108,17 +108,17 @@ static ngx_int_t ngx_http_push_client_handler(ngx_http_request_t *r) {
r->read_event_handler = ngx_http_test_reading; //definitely test to see if the connection got closed or something.

//attach a cleaner to remove the request from the node, if need be
ngx_pool_cleanup_t *cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_http_push_client_cleanup_t));
ngx_pool_cleanup_t *cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_http_push_listener_cleanup_t));
if (cln == NULL) { //make sure we can.
return NGX_ERROR;
}
ngx_shmtx_lock(&shpool->mutex);
node->cleanup = ((ngx_http_push_client_cleanup_t *) cln->data);
node->cleanup = ((ngx_http_push_listener_cleanup_t *) cln->data);
ngx_shmtx_unlock(&shpool->mutex);
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_client_cleanup;
((ngx_http_push_client_cleanup_t *) cln->data)->node = node;
((ngx_http_push_client_cleanup_t *) cln->data)->request = r;
((ngx_http_push_client_cleanup_t *) cln->data)->shpool = shpool;
cln->handler = (ngx_pool_cleanup_pt) ngx_http_push_listener_cleanup;
((ngx_http_push_listener_cleanup_t *) cln->data)->node = node;
((ngx_http_push_listener_cleanup_t *) cln->data)->request = r;
((ngx_http_push_listener_cleanup_t *) cln->data)->shpool = shpool;

return NGX_DONE; //and wait.
}
Expand Down Expand Up @@ -157,7 +157,7 @@ static ngx_int_t ngx_http_push_client_handler(ngx_http_request_t *r) {

ngx_shmtx_lock(&shpool->mutex);
}
rc = ngx_http_push_set_client_header(r, &msg->content_type); //content type is copied
rc = ngx_http_push_set_listener_header(r, &msg->content_type); //content type is copied
out = ngx_http_push_create_output_chain(r, msg->buf); //buffer is copied
out->buf->file=file;
//we no longer need the message and can free its shm slab.
Expand All @@ -172,17 +172,17 @@ static ngx_int_t ngx_http_push_client_handler(ngx_http_request_t *r) {
return rc;
}

return ngx_http_push_set_client_body(r, out);
return ngx_http_push_set_listener_body(r, out);
}
}

static void ngx_http_push_server_body_handler(ngx_http_request_t * r) {
static void ngx_http_push_sender_body_handler(ngx_http_request_t * r) {
ngx_str_t id;
ngx_http_push_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_module);
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_shm_zone->shm.addr;
ngx_buf_t *buf, *buf_copy;
ngx_http_push_node_t *node;
ngx_http_request_t *r_client = NULL;
ngx_http_request_t *r_listener = NULL;
ngx_uint_t method = r->method;

time_t last_seen = 0;
Expand All @@ -205,7 +205,7 @@ static void ngx_http_push_server_body_handler(ngx_http_request_t * r) {
node = find_node(&id, (ngx_rbtree_t *) ngx_http_push_shm_zone->data, shpool, r->connection->log);
}
if (node!=NULL) {
r_client = node->request;
r_listener = node->request;
queue_len = node->message_queue_len;
last_seen = node->last_seen;
}
Expand Down Expand Up @@ -233,8 +233,8 @@ static void ngx_http_push_server_body_handler(ngx_http_request_t * r) {
size_t content_type_len;
content_type_len = (r->headers_in.content_type==NULL ? 0 : r->headers_in.content_type->value.len);

if (r_client==NULL && r->method == NGX_HTTP_POST && cf->buffer_enabled!=0) {
//no clients are waiting for the message, and buffers are not disabled. create the message in shared memory for storage
if (r_listener==NULL && r->method == NGX_HTTP_POST && cf->buffer_enabled!=0) {
//no listeners are waiting for the message, and buffers are not disabled. create the message in shared memory for storage
msg = ngx_slab_alloc(shpool, sizeof(ngx_http_push_msg_t) + content_type_len);
if (msg==NULL) {
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push module: unable to allocate message in shared memory");
Expand Down Expand Up @@ -289,41 +289,41 @@ static void ngx_http_push_server_body_handler(ngx_http_request_t * r) {
time_t timeout = cf->buffer_timeout;
msg->expires= timeout==0 ? 0 : (ngx_time() + timeout);
ngx_shmtx_unlock(&shpool->mutex);
//okay, done storing. now respond to the server request
//okay, done storing. now respond to the sender request
r->headers_out.status=NGX_HTTP_OK;
r->headers_out.status_line.len =sizeof("202 Accepted")- 1;
r->headers_out.status_line.data=(u_char *) "202 Accepted";
}
else if(r_client!=NULL) {
else if(r_listener!=NULL) {
ngx_int_t rc;
rc = ngx_http_push_set_client_header(r_client, (content_type_len>0 ? &r->headers_in.content_type->value : NULL));
rc = ngx_http_push_set_listener_header(r_listener, (content_type_len>0 ? &r->headers_in.content_type->value : NULL));
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
ngx_http_finalize_request(r_client, rc);
ngx_http_finalize_request(r_listener, rc);
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}
rc = ngx_http_send_header(r_client);
rc = ngx_http_send_header(r_listener);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
ngx_http_finalize_request(r_client, rc);
ngx_http_finalize_request(r_listener, rc);
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return;
}

if(buf->in_file && buf->file->fd!=NGX_INVALID_FILE){
//delete file when the push_server request finishes
//delete file when the push_sender request finishes
ngx_http_push_add_pool_cleaner_delete_file(r->pool, buf->file);
}
ngx_shmtx_lock(&shpool->mutex);
//we don't want the client request cleanup to accidentally access an already freed node on cleanup
ngx_http_push_remove_client_request_locked(node);
//we don't want the listener request cleanup to accidentally access an already freed node on cleanup
ngx_http_push_remove_listener_request_locked(node);
ngx_shmtx_unlock(&shpool->mutex);
ngx_http_finalize_request(r_client, ngx_http_push_set_client_body(r_client, ngx_http_push_create_output_chain(r_client, buf)));
ngx_http_finalize_request(r_listener, ngx_http_push_set_listener_body(r_listener, ngx_http_push_create_output_chain(r_listener, buf)));

r->headers_out.status=NGX_HTTP_OK;
r->headers_out.status_line.len =sizeof("201 Created")- 1;
r->headers_out.status_line.data=(u_char *) "201 Created";
}
else { //r_client==NULL && r->method == NGX_HTTP_PUT is all that remains
else { //r_listener==NULL && r->method == NGX_HTTP_PUT is all that remains
r->headers_out.status=NGX_HTTP_OK;
}
}
Expand All @@ -337,19 +337,19 @@ static void ngx_http_push_server_body_handler(ngx_http_request_t * r) {
while((msg=ngx_http_push_dequeue_message(node))!=NULL) {
ngx_slab_free_locked(shpool, msg);
};
r_client = node->request;
ngx_http_push_remove_client_request_locked(node);
r_listener = node->request;
ngx_http_push_remove_listener_request_locked(node);
ngx_http_push_delete_node((ngx_rbtree_t *) ngx_http_push_shm_zone->data, (ngx_rbtree_node_t *) node, shpool);
ngx_shmtx_unlock(&shpool->mutex);

if(r_client!=NULL) {
//respond to the client request with a 410
r_client->headers_out.status=NGX_HTTP_NOT_FOUND;
r_client->headers_out.status_line.len =sizeof("410 Gone")- 1;
r_client->headers_out.status_line.data=(u_char *) "410 Gone";
r_client->headers_out.content_length_n = 0;
r_client->header_only = 1;
ngx_http_finalize_request(r_client, ngx_http_send_header(r_client));
if(r_listener!=NULL) {
//respond to the listener request with a 410
r_listener->headers_out.status=NGX_HTTP_NOT_FOUND;
r_listener->headers_out.status_line.len =sizeof("410 Gone")- 1;
r_listener->headers_out.status_line.data=(u_char *) "410 Gone";
r_listener->headers_out.content_length_n = 0;
r_listener->header_only = 1;
ngx_http_finalize_request(r_listener, ngx_http_send_header(r_listener));
}

r->headers_out.status=NGX_HTTP_OK;
Expand Down Expand Up @@ -397,27 +397,27 @@ static ngx_int_t ngx_http_push_node_info(ngx_http_request_t *r, ngx_uint_t queue
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}

return ngx_http_push_set_client_body(r, ngx_http_push_create_output_chain(r, b));
return ngx_http_push_set_listener_body(r, ngx_http_push_create_output_chain(r, b));
}

static ngx_int_t ngx_http_push_server_handler(ngx_http_request_t * r) {
static ngx_int_t ngx_http_push_sender_handler(ngx_http_request_t * r) {
ngx_int_t rc;

/* Instruct ngx_http_read_client_request_body to store the request
/* Instruct ngx_http_read_listener_request_body to store the request
body entirely in a memory buffer or in a file */
r->request_body_in_single_buf = 1;
r->request_body_in_persistent_file = 1;
r->request_body_in_clean_file = 0;
r->request_body_file_log_level = 0;

rc = ngx_http_read_client_request_body(r, ngx_http_push_server_body_handler);
rc = ngx_http_read_client_request_body(r, ngx_http_push_sender_body_handler);
if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
return rc;
}
return NGX_DONE;
}

static ngx_int_t ngx_http_push_set_client_header(ngx_http_request_t *r, ngx_str_t *content_type) {
static ngx_int_t ngx_http_push_set_listener_header(ngx_http_request_t *r, ngx_str_t *content_type) {
//content-type is _copied_
if (content_type!=NULL && content_type->data!=NULL && content_type->len > 0) {
r->headers_out.content_type.len=content_type->len;
Expand Down Expand Up @@ -447,15 +447,15 @@ static ngx_chain_t * ngx_http_push_create_output_chain(ngx_http_request_t *r, ng
return out;
}

static ngx_int_t ngx_http_push_set_client_body(ngx_http_request_t *r, ngx_chain_t *out)
static ngx_int_t ngx_http_push_set_listener_body(ngx_http_request_t *r, ngx_chain_t *out)
{
if (out==NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
return ngx_http_output_filter(r, out);
}

static void ngx_http_push_client_cleanup(ngx_http_push_client_cleanup_t *data) {
static void ngx_http_push_listener_cleanup(ngx_http_push_listener_cleanup_t *data) {
if(data->node!=NULL) {
ngx_shmtx_lock(&data->shpool->mutex);
if(data->node->request == data->request) {
Expand Down
24 changes: 12 additions & 12 deletions src/ngx_http_push_module.h
Expand Up @@ -31,7 +31,7 @@ typedef struct {
ngx_http_request_t *request;
ngx_http_push_node_t *node;
ngx_slab_pool_t *shpool;
} ngx_http_push_client_cleanup_t;
} ngx_http_push_listener_cleanup_t;

//our typecast-friendly rbtree node
struct ngx_http_push_node_s {
Expand All @@ -41,26 +41,26 @@ struct ngx_http_push_node_s {
ngx_uint_t message_queue_len;
ngx_http_request_t *request;
time_t last_seen;
ngx_http_push_client_cleanup_t *cleanup;
ngx_http_push_listener_cleanup_t *cleanup;
};

//server stuff
static char * ngx_http_push_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_server hook
static ngx_int_t ngx_http_push_server_handler(ngx_http_request_t * r);
static void ngx_http_push_server_body_handler(ngx_http_request_t * r);
//sender stuff
static char * ngx_http_push_sender(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_sender hook
static ngx_int_t ngx_http_push_sender_handler(ngx_http_request_t * r);
static void ngx_http_push_sender_body_handler(ngx_http_request_t * r);
static ngx_int_t ngx_http_push_node_info(ngx_http_request_t *r, ngx_uint_t queue_len, time_t last_seen);

//client stuff
static char * ngx_http_push_client(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_client hook
static ngx_int_t ngx_http_push_client_handler(ngx_http_request_t * r);
//listener stuff
static char * ngx_http_push_listener(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); //push_listener hook
static ngx_int_t ngx_http_push_listener_handler(ngx_http_request_t * r);

static ngx_int_t ngx_http_push_set_client_header(ngx_http_request_t *r, ngx_str_t *content_type);
static ngx_int_t ngx_http_push_set_listener_header(ngx_http_request_t *r, ngx_str_t *content_type);
static ngx_chain_t* ngx_http_push_create_output_chain(ngx_http_request_t *r, ngx_buf_t *buf);
static ngx_int_t ngx_http_push_set_client_body(ngx_http_request_t *r, ngx_chain_t *out);
static ngx_int_t ngx_http_push_set_listener_body(ngx_http_request_t *r, ngx_chain_t *out);

static ngx_int_t ngx_http_push_add_pool_cleaner_delete_file(ngx_pool_t *pool, ngx_file_t *file);

static void ngx_http_push_client_cleanup(ngx_http_push_client_cleanup_t * data); //request pool cleaner
static void ngx_http_push_listener_cleanup(ngx_http_push_listener_cleanup_t * data); //request pool cleaner

//misc stuff
ngx_shm_zone_t * ngx_http_push_shm_zone = NULL;
Expand Down

0 comments on commit 22eb295

Please sign in to comment.