Skip to content

Commit

Permalink
ControlTower that gets its listening sockets from launchd, and exits …
Browse files Browse the repository at this point in the history
…whenever idle and memory usage is deemed too high.
  • Loading branch information
Brian Lovrin committed Jul 27, 2011
1 parent fa37bc6 commit 021deb6
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 29 deletions.
9 changes: 6 additions & 3 deletions bin/control_tower
Expand Up @@ -18,7 +18,7 @@ OptionParser.new do |opts|
@options[:rackup] = rackup
end

opts.on("-p", "--port PORT", Integer, "Port on which to run the server (default: 3000)") do |port|
opts.on("-p", "--port PORT", Integer, "Port on which to run the server (default: 3000) (0 gets the socket from launchd and ignores --address)") do |port|
@options[:port] = port
end

Expand All @@ -45,8 +45,11 @@ app = eval("Rack::Builder.new { #{rackup_config} }").to_app
# Let's get to business!
server = ControlTower::Server.new(app, @options)
if server
puts "You are cleared for take-off!"
puts "Listening on #{@options[:host]}:#{@options[:port]}"
if @options[:port] != 0
puts "Control Tower is listening on #{@options[:host]}:#{@options[:port]}"
else
puts "Control Tower is receiving socket connections from launchd"
end
server.start
else
puts "Mayday! Mayday! Eject! Eject!\n#{$!}"
Expand Down
142 changes: 117 additions & 25 deletions lib/control_tower/rack_socket.rb
@@ -1,5 +1,5 @@
# This file is covered by the Ruby license. See COPYING for more details.
# Copyright (C) 2009-2010, Apple Inc. All rights reserved.
# Copyright (C) 2009-2011, Apple Inc. All rights reserved.

framework 'Foundation'
require 'CTParser'
Expand All @@ -10,30 +10,112 @@
module ControlTower
class RackSocket
VERSION = [1,0].freeze
QUIESCING_MSG = 'Resource limit reached. Redirecting until server quits (will auto-restart).'

def log(msg, prepend_newline=false)
time = Time.now
tnum = Thread.current.inspect
@log_queue.async(@log_group) do
$stderr.puts "CTLOG::---------" if prepend_newline
$stderr.puts "CTLOG::#{Process.pid}#{tnum} (#{time.strftime("%Y-%m-%d %H:%M:%S")}) #{msg}"
end
end

def rsize
`ps -o rss= -p #{Process.pid}`.to_i / 1024.0
end

def initialize(host, port, server, concurrent)
@log_queue = Dispatch::Queue.new('log_queue')
@log_group = Dispatch::Group.new

@under_launchd = (port == 0)
@mem_high_water_mark = ENV['CT_MEM_BOUNCE_MB'].to_i || -1 if @under_launchd # in megabytes; negative instructs to not bounce
log "STARTING with pid=#{Process.pid}" + (@mem_high_water_mark ? " and memory bounce point=#{@mem_high_water_mark} MB" : "")

if @under_launchd
# Hash used for to determine if server is idle. Old entries are cleared by a background timer thread (see below).
@auth_sessions = {}

# remove old sessions (not used for 60 seconds) from @auth_sessions, because there is no way for ControlTower to determine that a session is done
clear_old_auth_sessions_interval_seconds = 60
Dispatch::Source.timer(clear_old_auth_sessions_interval_seconds, clear_old_auth_sessions_interval_seconds, 5, Dispatch::Queue.concurrent) do
@now = Time.now
log "Checking for old auth sessions to remove"
@auth_sessions.delete_if { |session_id, last_used|
if @now-last_used > clear_old_auth_sessions_interval_seconds
log "Removing old auth session: #{session_id}"
true
else
false
end
}
# if idle, check high water mark for memory usage
if @auth_sessions.empty?
mem_used = rsize
if @mem_high_water_mark && mem_used >= @mem_high_water_mark
@status = :closed
log "MEMORY THRESHOLD EXCEEDED: Flaggging to not accept new connections; waiting for existing connections (if any) to finish"
@request_group.wait
log "MEMORY THRESHOLD EXCEEDED: All existing connections done."
sleep 5 # give time for more authn'd requests to come in (might happen due to race condition between accepting a connection and adding its session_id to @auth_sessions)
if @auth_sessions.empty? # double-check to see if the race condition was exploited
log "MEMORY THRESHOLD EXCEEDED: Server is confirmed idle; exiting pid=#{Process.pid}."
exit # We're idle, so just quit now
else
log "Opening back up... a request snuck in."
@status = :open # @auth_sessions isn't empty, which means there are current active sessions. Let them finish (and as a side-effect possibly accepting new connections).
sleep 1
end
else
log "empty session list, but not at memory threshold yet (used=#{mem_used}, threshold=#{@mem_high_water_mark})"
end
end
end

log "setup authnd session clearing timer"
end

@app = server.app
@socket = TCPServer.new(host, port)
@socket.listen(50)
@status = :closed # Start closed and give the server time to start
if @under_launchd
@socket = Socket.for_fd($stdin.fileno) # launchd sockets
else
@socket = TCPServer.new(host, port)
@socket.listen(50)
end
@status = :closed # Start closed and give the server time to start <------ IS THIS IMPORTANT? Try it w/ the suicide version.

log "socket setup"

if concurrent
@multithread = true
@request_queue = Dispatch::Queue.concurrent
puts "Caution! Wake turbulance from heavy aircraft landing on parallel runway.\n(Parallel Request Action ENABLED!)"
puts "Control Tower is operating in concurrent mode."
else
@multithread = false
@request_queue = Dispatch::Queue.new('com.apple.ControlTower.rack_socket_queue')
puts "Control Tower is operating in serial mode."
end
@request_group = Dispatch::Group.new

log "initialization complete."
end

def open
log "opening..."

@status = :open
while (@status == :open)
connection = @socket.accept

log "Control Tower: waiting for connection..."
connection, remote_addrinfo_str = @socket.accept

# -------------- PROCESS REQUEST ASYNCHRONOUSLY ----------------

@request_queue.async(@request_group) do
remote_port, remote_ip = Socket.unpack_sockaddr_in(remote_addrinfo_str) if remote_addrinfo_str
log "** new request received at #{Time.new} from #{remote_ip}:#{remote_port}", true

env = { 'rack.errors' => $stderr,
'rack.multiprocess' => false,
'rack.multithread' => @multithread,
Expand All @@ -42,11 +124,17 @@ def open
resp = nil
x_sendfile_header = 'X-Sendfile'
x_sendfile = nil
log "** done setting rack env"
begin
log "** about to parse request"
request_data = parse!(connection, env)
log "** done parsing request" #: request_data=#{request_data.inspect}"
# log "** env[]=#{env}"
if request_data
request_data['REMOTE_ADDR'] = connection.addr[3]
request_data['REMOTE_ADDR'] = remote_ip
log "** about to app.call()"
status, headers, body = @app.call(request_data)
log "** app.call() is done"#; handling response, body=#{body.inspect}"

# If there's an X-Sendfile header, we'll use sendfile(2)
if headers.has_key?(x_sendfile_header)
Expand All @@ -57,6 +145,9 @@ def open
headers['Content-Length'] = x_sendfile_size
end

@auth_sessions[env['rack.session'].session_id] = Time.now if @under_launchd
log "Added/updated session_id (#{env['rack.session'].session_id}) to @auth_sessions" if @under_launchd

# Unless somebody's already set it for us (or we don't need it), set the Content-Length
unless (status == -1 ||
(status >= 100 and status <= 199) ||
Expand Down Expand Up @@ -96,41 +187,42 @@ def open
end

else
$stderr.puts "Error: No request data received!"
log "Error: No request data received!"
end
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL
$stderr.puts "Error: Connection terminated!"
log "Error: Connection terminated!"
rescue Object => e
if resp.nil? && !connection.closed?
connection.write "HTTP/1.1 400\r\n\r\n"
else
# We have a response, but there was trouble sending it:
$stderr.puts "Error: Problem transmitting data -- #{e.inspect}"
$stderr.puts e.backtrace.join("\n")
connection.write "HTTP/1.1 500\r\n\r\n"
end
log "Error: Problem transmitting data -- #{e.inspect}"
$stderr.puts e.backtrace.join("\n")
ensure
# We should clean up after our tempfile, if we used one.
input = env['rack.input']
input.unlink if input.class == Tempfile
connection.close rescue nil
end
end
end
end # while :open
end

def close
puts "Received shutdown signal. Waiting for current requests to complete..."
@status = :close

# You get 30 seconds to empty the request queue and get outa here!
Dispatch::Source.timer(30, 0, 1, Dispatch::Queue.concurrent) do
$stderr.puts "Timed out waiting for connections to close"
exit 1
# 60 seconds to empty the request queue
Dispatch::Source.timer(60, 0, 1, Dispatch::Queue.concurrent) do
puts "Timed out waiting for connections to close. Stopping server with pid=#{Process.pid}."
exit
end

@request_group.wait
@socket.close

puts "All requests completed. Stopping server with pid=#{Process.pid}."
exit
end


private

def parse!(connection, env)
Expand All @@ -143,7 +235,7 @@ def parse!(connection, env)
content_length = 0
content_uploaded = 0
connection_handle = NSFileHandle.alloc.initWithFileDescriptor(connection.fileno)

while (parsing_headers || content_uploaded < content_length) do
# Read the availableData on the socket and give up if there's nothing
incoming_bytes = connection_handle.availableData
Expand All @@ -165,14 +257,14 @@ def parse!(connection, env)
env['rack.input'].appendData(incoming_bytes)
end
end

if content_length > 1024 * 1024
body_file = Tempfile.new('control-tower-request-body-')
NSFileHandle.alloc.initWithFileDescriptor(body_file.fileno).writeData(env['rack.input'])
body_file.rewind
env['rack.input'] = body_file
else
env['rack.input'] = StringIO.new(env['rack.input'])
env['rack.input'] = StringIO.new(env['rack.input'], IO::RDONLY)
end
# Returning what we've got...
return env
Expand Down
2 changes: 1 addition & 1 deletion lib/control_tower/server.rb
Expand Up @@ -24,7 +24,7 @@ def start
private

def parse_options(opt)
@port = (opt[:port] || 8080).to_i
@port = opt[:port].to_i
@host = opt[:host] || `hostname`.chomp
@concurrent = opt[:concurrent]
end
Expand Down

0 comments on commit 021deb6

Please sign in to comment.