Permalink
Browse files

change the workers_with_subscribers type on the ngx_http_push_stream_…

…channel_t structure
  • Loading branch information...
1 parent 4809c1e commit 3693c9c6ba28b205f166a48859d8085eb6c1de5b @wandenberg committed Mar 8, 2013
View
2 include/ngx_http_push_stream_module.h
@@ -137,7 +137,7 @@ typedef struct {
ngx_int_t last_message_tag;
ngx_uint_t stored_messages;
ngx_uint_t subscribers;
- ngx_http_push_stream_pid_queue_t workers_with_subscribers;
+ ngx_queue_t workers_with_subscribers;
ngx_queue_t message_queue;
time_t last_activity_time;
time_t expires;
View
50 src/ngx_http_push_stream_module_ipc.c
@@ -152,13 +152,15 @@ ngx_http_push_stream_ipc_init_worker()
static ngx_int_t
ngx_http_push_stream_unsubscribe_worker_locked(ngx_http_push_stream_channel_t *channel, ngx_slab_pool_t *shpool)
{
- ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
- ngx_http_push_stream_pid_queue_t *cur = sentinel;
-
- while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
- if ((cur->pid == ngx_pid) || (cur->slot == ngx_process_slot)) {
- ngx_queue_remove(&cur->queue);
- ngx_slab_free_locked(shpool, cur);
+ ngx_http_push_stream_pid_queue_t *worker;
+ ngx_queue_t *cur_worker;
+
+ cur_worker = &channel->workers_with_subscribers;
+ while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
+ if ((worker->pid == ngx_pid) || (worker->slot == ngx_process_slot)) {
+ ngx_queue_remove(&worker->queue);
+ ngx_slab_free_locked(shpool, worker);
break;
}
}
@@ -304,20 +306,21 @@ ngx_http_push_stream_process_worker_message(void)
// that's quite bad you see. a previous worker died with an undelivered message.
// but all its subscribers' connections presumably got canned, too. so it's not so bad after all.
- ngx_http_push_stream_pid_queue_t *channel_worker_sentinel = &worker_msg->channel->workers_with_subscribers;
- ngx_http_push_stream_pid_queue_t *channel_worker_cur = channel_worker_sentinel;
+ ngx_queue_t *cur_worker;
+ ngx_http_push_stream_pid_queue_t *worker;
ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: worker %i intercepted a message intended for another worker process (%i) that probably died", ngx_pid, worker_msg->pid);
// delete that invalid sucker
- while ((channel_worker_cur != NULL) && (channel_worker_cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&channel_worker_cur->queue)) != channel_worker_sentinel) {
- if (channel_worker_cur->pid == worker_msg->pid) {
+ cur_worker = &worker_msg->channel->workers_with_subscribers;
+ while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &worker_msg->channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
+ if (worker->pid == worker_msg->pid) {
ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, 0, "push stream module: reference to worker %i will be removed", worker_msg->pid);
ngx_shmtx_lock(&shpool->mutex);
- ngx_queue_remove(&channel_worker_cur->queue);
- ngx_slab_free_locked(shpool, channel_worker_cur);
+ ngx_queue_remove(&worker->queue);
+ ngx_slab_free_locked(shpool, worker);
ngx_shmtx_unlock(&shpool->mutex);
- channel_worker_cur = NULL;
break;
}
}
@@ -361,22 +364,25 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
{
// subscribers are queued up in a local pool. Queue heads, however, are located
// in shared memory, identified by pid.
- ngx_http_push_stream_pid_queue_t *sentinel = &channel->workers_with_subscribers;
- ngx_http_push_stream_pid_queue_t *cur = sentinel;
+ ngx_http_push_stream_pid_queue_t *worker;
+ ngx_queue_t *cur_worker;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_flag_t queue_was_empty[NGX_MAX_PROCESSES];
ngx_shmtx_lock(&shpool->mutex);
- while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
- ngx_http_push_stream_send_worker_message_locked(channel, &cur->subscribers_sentinel, cur->pid, cur->slot, msg, &queue_was_empty[cur->slot], log);
+ cur_worker = &channel->workers_with_subscribers;
+ while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
+ ngx_http_push_stream_send_worker_message_locked(channel, &worker->subscribers_sentinel, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log);
}
ngx_shmtx_unlock(&shpool->mutex);
- cur = sentinel;
- while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != sentinel) {
+ cur_worker = &channel->workers_with_subscribers;
+ while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
// interprocess communication breakdown
- if (queue_was_empty[cur->slot] && (ngx_http_push_stream_alert_worker_check_messages(cur->pid, cur->slot, log) != NGX_OK)) {
- ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", cur->pid, cur->slot);
+ if (queue_was_empty[worker->slot] && (ngx_http_push_stream_alert_worker_check_messages(worker->pid, worker->slot, log) != NGX_OK)) {
+ ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", worker->pid, worker->slot);
}
}
View
14 src/ngx_http_push_stream_module_subscriber.c
@@ -664,7 +664,7 @@ ngx_http_push_stream_create_worker_subscriber_channel_sentinel_locked(ngx_slab_p
}
// initialize
- ngx_queue_insert_tail(&channel->workers_with_subscribers.queue, &worker_sentinel->queue);
+ ngx_queue_insert_tail(&channel->workers_with_subscribers, &worker_sentinel->queue);
worker_sentinel->pid = ngx_pid;
worker_sentinel->slot = ngx_process_slot;
@@ -692,7 +692,8 @@ ngx_http_push_stream_create_channel_subscription(ngx_http_request_t *r, ngx_http
static ngx_int_t
ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpool, ngx_str_t *channel_id, ngx_http_push_stream_subscription_t *subscription, ngx_http_push_stream_subscription_t *subscriptions_sentinel, ngx_log_t *log)
{
- ngx_http_push_stream_pid_queue_t *cur, *worker_subscribers_sentinel = NULL;
+ ngx_queue_t *cur_worker;
+ ngx_http_push_stream_pid_queue_t *worker, *worker_subscribers_sentinel = NULL;
ngx_http_push_stream_channel_t *channel;
ngx_http_push_stream_queue_elem_t *element_subscriber;
@@ -702,10 +703,11 @@ ngx_http_push_stream_assing_subscription_to_channel_locked(ngx_slab_pool_t *shpo
return NGX_ERROR;
}
- cur = &channel->workers_with_subscribers;
- while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != &channel->workers_with_subscribers) {
- if (cur->pid == ngx_pid) {
- worker_subscribers_sentinel = cur;
+ cur_worker = &channel->workers_with_subscribers;
+ while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
+ if (worker->pid == ngx_pid) {
+ worker_subscribers_sentinel = worker;
break;
}
}
View
38 src/ngx_http_push_stream_module_utils.c
@@ -61,26 +61,28 @@ static void
ngx_http_push_stream_delete_channels(ngx_http_push_stream_shm_data_t *data, ngx_slab_pool_t *shpool)
{
ngx_http_push_stream_channel_t *channel;
- ngx_http_push_stream_pid_queue_t *cur_worker;
+ ngx_http_push_stream_pid_queue_t *worker;
+ ngx_queue_t *cur_worker;
ngx_http_push_stream_queue_elem_t *cur;
ngx_http_push_stream_subscription_t *cur_subscription;
ngx_queue_t *prev_channel, *cur_channel = &data->channels_to_delete;
- while ((cur_channel = ngx_queue_next(cur_channel)) != &data->channels_to_delete) {
+ while ((cur_channel = ngx_queue_next(cur_channel)) && (cur_channel != NULL) && (cur_channel != &data->channels_to_delete)) {
channel = ngx_queue_data(cur_channel, ngx_http_push_stream_channel_t, queue);
// remove subscribers if any
if (channel->subscribers > 0) {
cur_worker = &channel->workers_with_subscribers;
// find the current worker
- while ((cur_worker = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur_worker->queue)) != &channel->workers_with_subscribers) {
- if (cur_worker->pid == ngx_pid) {
+ while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
+ if (worker->pid == ngx_pid) {
// to each subscriber of this channel in this worker
- while (!ngx_queue_empty(&cur_worker->subscribers_sentinel.queue)) {
- cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&cur_worker->subscribers_sentinel.queue);
+ while (!ngx_queue_empty(&worker->subscribers_sentinel.queue)) {
+ cur = (ngx_http_push_stream_queue_elem_t *) ngx_queue_next(&worker->subscribers_sentinel.queue);
ngx_http_push_stream_subscriber_t *subscriber = (ngx_http_push_stream_subscriber_t *) cur->value;
// find the subscription for the channel being deleted
@@ -662,7 +664,8 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
ngx_http_push_stream_channel_t *channel;
ngx_slab_pool_t *shpool = (ngx_slab_pool_t *) ngx_http_push_stream_shm_zone->shm.addr;
ngx_http_push_stream_shm_data_t *data = (ngx_http_push_stream_shm_data_t *) ngx_http_push_stream_shm_zone->data;
- ngx_http_push_stream_pid_queue_t *cur;
+ ngx_http_push_stream_pid_queue_t *worker;
+ ngx_queue_t *cur_worker;
ngx_shmtx_lock(&shpool->mutex);
@@ -690,13 +693,14 @@ ngx_http_push_stream_delete_channel(ngx_str_t *id, ngx_pool_t *temp_pool)
}
// send signal to each worker with subscriber to this channel
- cur = &channel->workers_with_subscribers;
+ cur_worker = &channel->workers_with_subscribers;
- if (ngx_queue_empty(&channel->workers_with_subscribers.queue)) {
+ if (ngx_queue_empty(&channel->workers_with_subscribers)) {
ngx_http_push_stream_alert_worker_delete_channel(ngx_pid, ngx_process_slot, ngx_cycle->log);
} else {
- while ((cur = (ngx_http_push_stream_pid_queue_t *) ngx_queue_next(&cur->queue)) != &channel->workers_with_subscribers) {
- ngx_http_push_stream_alert_worker_delete_channel(cur->pid, cur->slot, ngx_cycle->log);
+ while ((cur_worker = ngx_queue_next(cur_worker)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
+ ngx_http_push_stream_alert_worker_delete_channel(worker->pid, worker->slot, ngx_cycle->log);
}
}
}
@@ -786,11 +790,13 @@ static void
nxg_http_push_stream_free_channel_memory_locked(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel)
{
// delete the worker-subscriber queue
- ngx_http_push_stream_pid_queue_t *cur;
+ ngx_http_push_stream_pid_queue_t *worker;
+ ngx_queue_t *cur_worker;
- while ((cur = (ngx_http_push_stream_pid_queue_t *)ngx_queue_next(&channel->workers_with_subscribers.queue)) != &channel->workers_with_subscribers) {
- ngx_queue_remove(&cur->queue);
- ngx_slab_free_locked(shpool, cur);
+ while ((cur_worker = ngx_queue_head(&channel->workers_with_subscribers)) && (cur_worker != NULL) && (cur_worker != &channel->workers_with_subscribers)) {
+ worker = ngx_queue_data(cur_worker, ngx_http_push_stream_pid_queue_t, queue);
+ ngx_queue_remove(&worker->queue);
+ ngx_slab_free_locked(shpool, worker);
}
if (channel->channel_deleted_message != NULL) ngx_http_push_stream_free_message_memory_locked(shpool, channel->channel_deleted_message);
@@ -834,7 +840,7 @@ ngx_http_push_stream_free_memory_of_expired_messages_and_channels(ngx_flag_t for
ngx_queue_t *cur;
ngx_shmtx_lock(&shpool->mutex);
- while ((cur = ngx_queue_head(&data->messages_trash)) != &data->messages_trash) {
+ while ((cur = ngx_queue_head(&data->messages_trash)) && (cur != NULL) && (cur != &data->messages_trash)) {
message = ngx_queue_data(cur, ngx_http_push_stream_msg_t, queue);
if (force || ((message->workers_ref_count <= 0) && (ngx_time() > message->expires))) {
View
2 src/ngx_http_push_stream_rbtree_util.c
@@ -170,7 +170,7 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st
ngx_http_push_stream_initialize_channel(channel);
// initialize workers_with_subscribers queues only when a channel is created
- ngx_queue_init(&channel->workers_with_subscribers.queue);
+ ngx_queue_init(&channel->workers_with_subscribers);
ngx_shmtx_unlock(&shpool->mutex);
return channel;

0 comments on commit 3693c9c

Please sign in to comment.