Skip to content

Commit

Permalink
fix: error log reported failed close() for some publisher requests w…
Browse files Browse the repository at this point in the history
…ith large messages,

 fix: occasional memory leak during message deletion, 
 fix: worker messages intended for dead worker processes were not deleted
  • Loading branch information
slact committed Feb 4, 2010
1 parent 8a711a7 commit 349af1e
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
2 changes: 2 additions & 0 deletions changelog.txt
@@ -1,3 +1,5 @@
fix: error log reported failed close() for some publisher requests with large messages
fix: occasional memory leak during message deletion
fix: worker messages intended for dead worker processes were not deleted
0.691 (Feb. 2 2010)
fix: server reload (via SIGHUP signal) was failing
Expand Down
27 changes: 15 additions & 12 deletions src/ngx_http_push_module.c
Expand Up @@ -91,7 +91,7 @@ static void ngx_http_push_reserve_message_locked(ngx_http_push_channel_t *channe

static void ngx_http_push_release_message_locked(ngx_http_push_channel_t *channel, ngx_http_push_msg_t *msg) {
msg->refcount--;
if(msg->queue.next==NULL && !msg->refcount) {
if(msg->queue.next==NULL && msg->refcount<=0) {
//message had been dequeued and nobody needs it anymore
ngx_http_push_free_message_locked(msg, ngx_http_push_shpool);
}
Expand All @@ -109,7 +109,7 @@ static ngx_inline void ngx_http_push_general_delete_message_locked(ngx_http_push
ngx_queue_remove(&msg->queue);
channel->messages--;
}
if(msg->refcount==0 || force) {
if(msg->refcount<=0 || force) {
//nobody needs this message, or we were forced at integer-point to delete
ngx_http_push_free_message_locked(msg, shpool);
}
Expand Down Expand Up @@ -438,7 +438,7 @@ static ngx_int_t ngx_http_push_subscriber_handler(ngx_http_request_t *r) {
//close file when we're done with it
ngx_pool_cleanup_t *cln;
ngx_pool_cleanup_file_t *clnf;

if((cln = ngx_pool_cleanup_add(r->pool, sizeof(ngx_pool_cleanup_file_t)))==NULL) {
return NGX_HTTP_INTERNAL_SERVER_ERROR;
}
Expand Down Expand Up @@ -507,14 +507,15 @@ static ngx_int_t ngx_http_push_broadcast_locked(ngx_http_push_channel_t *channel
ngx_int_t received;
received = channel->subscribers > 0 ? NGX_HTTP_PUSH_MESSAGE_RECEIVED : NGX_HTTP_PUSH_MESSAGE_QUEUED;

if(msg!=NULL && received==NGX_HTTP_PUSH_MESSAGE_RECEIVED) {
ngx_http_push_reserve_message_locked(channel, msg);
}

while((cur=(ngx_http_push_pid_queue_t *)ngx_queue_next(&cur->queue))!=sentinel) {
pid_t worker_pid = cur->pid;
ngx_int_t worker_slot = cur->slot;
ngx_http_push_subscriber_t *subscriber_sentinel= cur->subscriber_sentinel;

if(msg!=NULL) {

}

ngx_shmtx_unlock(&shpool->mutex);
if(worker_pid == ngx_pid) {
//my subscribers
Expand Down Expand Up @@ -875,10 +876,12 @@ static ngx_int_t ngx_http_push_respond_to_subscribers(ngx_http_push_channel_t *c
ngx_pfree(ngx_http_push_pool, buffer);
ngx_pfree(ngx_http_push_pool, chain);

ngx_shmtx_lock(&shpool->mutex);
//message deletion
ngx_http_push_release_message_locked(channel, msg);
ngx_shmtx_unlock(&shpool->mutex);
if(responded_subscribers) {
ngx_shmtx_lock(&shpool->mutex);
//message deletion
ngx_http_push_release_message_locked(channel, msg);
ngx_shmtx_unlock(&shpool->mutex);
}
}
else {
//headers only probably
Expand Down Expand Up @@ -1146,7 +1149,7 @@ static void ngx_http_push_copy_preallocated_buffer(ngx_buf_t *buf, ngx_buf_t *cb
}
if (buf->file!=NULL) {
cbuf->file = (ngx_file_t *) (cbuf+1) + ((buf->temporary || buf->memory) ? ngx_buf_size(buf) : 0);
cbuf->file->fd=buf->file->fd; //used to be set to NGX_INVALID_FILE
cbuf->file->fd=NGX_INVALID_FILE;
cbuf->file->log=NULL;
cbuf->file->offset=buf->file->offset;
cbuf->file->sys_offset=buf->file->sys_offset;
Expand Down

0 comments on commit 349af1e

Please sign in to comment.