Skip to content

Commit

Permalink
fix socket leak on reload process for sockets used in inter process c…
Browse files Browse the repository at this point in the history
…ommunication and connections
  • Loading branch information
wandenberg committed May 4, 2014
1 parent 3d3a204 commit 1a79b02
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 6 deletions.
21 changes: 19 additions & 2 deletions misc/spec/mix/send_signals_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
body = 'body'
response = response2 = ''
pid = pid2 = 0
open_sockets_1 = 0

nginx_run_server(config, :timeout => 60) do |conf|
EventMachine.run do
Expand All @@ -77,22 +78,30 @@
resp_1["by_worker"].count.should eql(1)
pid = resp_1["by_worker"][0]['pid'].to_i

open_sockets_1 = `lsof -p #{Process.getpgid pid} | grep socket | wc -l`.strip

socket = open_socket(nginx_host, nginx_port)
socket.print "GET /sub/#{channel} HTTP/1.1\r\nHost: test\r\nX-Nginx-PushStream-Mode: long-polling\r\n\r\n"

# send reload signal
`#{ nginx_executable } -c #{ conf.configuration_filename } -s reload > /dev/null 2>&1`
end
end
end

# check if first worker die
EM.add_periodic_timer(0.5) do
timer = EM.add_periodic_timer(0.5) do

# check statistics again
pub_4 = EventMachine::HttpRequest.new(nginx_address + '/channels-stats').get :head => headers
pub_4.callback do
resp_3 = JSON.parse(pub_4.response)
resp_3.has_key?("by_worker").should be_true

if (resp_3["by_worker"].count == 1) && (pid != resp_3["by_worker"][0]['pid'].to_i)
old_process_running = Process.getpgid(pid) rescue false
if !old_process_running && (resp_3["by_worker"].count == 1) && (pid != resp_3["by_worker"][0]['pid'].to_i)
timer.cancel

# publish a message
pub_2 = EventMachine::HttpRequest.new(nginx_address + '/pub?id=' + channel.to_s).post :head => headers, :body => body
pub_2.callback do
Expand All @@ -111,7 +120,15 @@
resp_2["published_messages"].to_i.should eql(1)
resp_2["subscribers"].to_i.should eql(1)

open_sockets_2 = `lsof -p #{Process.getpgid resp_3["by_worker"][0]['pid'].to_i} | grep socket | wc -l`.strip
open_sockets_2.should eql(open_sockets_1)

EventMachine.stop

# send stop signal
`#{ nginx_executable } -c #{ conf.configuration_filename } -s stop > /dev/null 2>&1`
error_log = File.read(conf.error_log)
error_log.should_not include("open socket")
end
end
end
Expand Down
12 changes: 9 additions & 3 deletions src/ngx_http_push_stream_module_ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ ngx_http_push_stream_init_ipc(ngx_cycle_t *cycle, ngx_int_t workers)
*/

for(i=0; i<workers; i++) {
while (s < last_expected_process && ngx_processes[s].pid != -1) {
while (s < last_expected_process && ngx_processes[s].pid != NGX_INVALID_FILE) {
// find empty existing slot
s++;
}
Expand Down Expand Up @@ -188,6 +188,9 @@ ngx_http_push_stream_alert_shutting_down_workers(void)
for(i = 0; i < NGX_MAX_PROCESSES; i++) {
if (global_data->ipc[i].pid > 0) {
ngx_http_push_stream_alert_worker_shutting_down_cleanup(global_data->ipc[i].pid, i, ngx_cycle->log);
ngx_close_channel((ngx_socket_t *) ngx_http_push_stream_socketpairs[i], ngx_cycle->log);
ngx_http_push_stream_socketpairs[i][0] = NGX_INVALID_FILE;
ngx_http_push_stream_socketpairs[i][1] = NGX_INVALID_FILE;
}
}
}
Expand Down Expand Up @@ -236,7 +239,7 @@ ngx_http_push_stream_clean_worker_data(ngx_http_push_stream_shm_data_t *data)

ngx_shmtx_unlock(&shpool->mutex);

data->ipc[ngx_process_slot].pid = -1;
data->ipc[ngx_process_slot].pid = NGX_INVALID_FILE;
data->ipc[ngx_process_slot].subscribers = 0;
}

Expand Down Expand Up @@ -302,7 +305,10 @@ ngx_http_push_stream_channel_handler(ngx_event_t *ev)
static ngx_int_t
ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command)
{
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &command, sizeof(ngx_channel_t), log);
if (ngx_http_push_stream_socketpairs[slot][0] != NGX_INVALID_FILE) {
return ngx_write_channel(ngx_http_push_stream_socketpairs[slot][0], &command, sizeof(ngx_channel_t), log);
}
return NGX_OK;
}


Expand Down
1 change: 0 additions & 1 deletion src/ngx_http_push_stream_module_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_

while (!ngx_queue_empty(&thisworker_data->subscribers_queue)) {
ngx_http_push_stream_subscriber_t *subscriber = ngx_queue_data(ngx_queue_head(&thisworker_data->subscribers_queue), ngx_http_push_stream_subscriber_t, worker_queue);
subscriber->request->keepalive = 0;
if (subscriber->longpolling) {
ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(subscriber->request);
} else {
Expand Down

0 comments on commit 1a79b02

Please sign in to comment.