From 349af1ec5f6d742af2e190b4b0b7e2f105dcdd96 Mon Sep 17 00:00:00 2001 From: Leo P Date: Wed, 3 Feb 2010 21:19:13 -0500 Subject: [PATCH] fix: error log reported failed close() for some publisher requests with large messages, fix: occasional memory leak during message deletion, fix: worker messages intended for dead worker processes were not deleted --- changelog.txt | 2 ++ src/ngx_http_push_module.c | 27 +++++++++++++++------------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/changelog.txt b/changelog.txt index 6e906ed4f..a4d37eeb0 100644 --- a/changelog.txt +++ b/changelog.txt @@ -1,3 +1,5 @@ + fix: error log reported failed close() for some publisher requests with large messages + fix: occasional memory leak during message deletion fix: worker messages intended for dead worker processes were not deleted 0.691 (Feb. 2 2010) fix: server reload (via SIGHUP signal) was failing diff --git a/src/ngx_http_push_module.c b/src/ngx_http_push_module.c index f7bdd1717..350599ebd 100644 --- a/src/ngx_http_push_module.c +++ b/src/ngx_http_push_module.c @@ -91,7 +91,7 @@ static void ngx_http_push_reserve_message_locked(ngx_http_push_channel_t *channe static void ngx_http_push_release_message_locked(ngx_http_push_channel_t *channel, ngx_http_push_msg_t *msg) { msg->refcount--; - if(msg->queue.next==NULL && !msg->refcount) { + if(msg->queue.next==NULL && msg->refcount<=0) { //message had been dequeued and nobody needs it anymore ngx_http_push_free_message_locked(msg, ngx_http_push_shpool); } @@ -109,7 +109,7 @@ static ngx_inline void ngx_http_push_general_delete_message_locked(ngx_http_push ngx_queue_remove(&msg->queue); channel->messages--; } - if(msg->refcount==0 || force) { + if(msg->refcount<=0 || force) { //nobody needs this message, or we were forced at integer-point to delete ngx_http_push_free_message_locked(msg, shpool); } @@ -438,7 +438,7 @@ static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r) { //close file when we're done with it ngx_pool_cleanup_t *cln; ngx_pool_cleanup_file_t *clnf; - + if((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_pool_cleanup_file_t)))==NULL) { return NGX_HTTP_INTERNAL_SERVER_ERROR; } @@ -507,14 +507,15 @@ static ngx_int_t ngx_http_push_broadcast_locked(ngx_http_push_channel_t *channel ngx_int_t received; received = channel->subscribers > 0 ? NGX_HTTP_PUSH_MESSAGE_RECEIVED : NGX_HTTP_PUSH_MESSAGE_QUEUED; + if(msg!=NULL && received==NGX_HTTP_PUSH_MESSAGE_RECEIVED) { + ngx_http_push_reserve_message_locked(channel, msg); + } + while((cur=(ngx_http_push_pid_queue_t *)ngx_queue_next(&cur->queue))!=sentinel) { pid_t worker_pid = cur->pid; ngx_int_t worker_slot = cur->slot; ngx_http_push_subscriber_t *subscriber_sentinel= cur->subscriber_sentinel; - - if(msg!=NULL) { - - } + ngx_shmtx_unlock(&shpool->mutex); if(worker_pid == ngx_pid) { //my subscribers @@ -875,10 +876,12 @@ static ngx_int_t ngx_http_push_respond_to_subscribers(ngx_http_push_channel_t *c ngx_pfree(ngx_http_push_pool, buffer); ngx_pfree(ngx_http_push_pool, chain); - ngx_shmtx_lock(&shpool->mutex); - //message deletion - ngx_http_push_release_message_locked(channel, msg); - ngx_shmtx_unlock(&shpool->mutex); + if(responded_subscribers) { + ngx_shmtx_lock(&shpool->mutex); + //message deletion + ngx_http_push_release_message_locked(channel, msg); + ngx_shmtx_unlock(&shpool->mutex); + } } else { //headers only probably @@ -1146,7 +1149,7 @@ static void ngx_http_push_copy_preallocated_buffer(ngx_buf_t *buf, ngx_buf_t *cb } if (buf->file!=NULL) { cbuf->file = (ngx_file_t *) (cbuf+1) + ((buf->temporary || buf->memory) ? ngx_buf_size(buf) : 0); - cbuf->file->fd=buf->file->fd; //used to be set to NGX_INVALID_FILE + cbuf->file->fd=NGX_INVALID_FILE; cbuf->file->log=NULL; cbuf->file->offset=buf->file->offset; cbuf->file->sys_offset=buf->file->sys_offset;