Skip to content

Commit

Permalink
JRuby support: pooled application manager (instead of fork) - used if…
Browse files Browse the repository at this point in the history
… fork if not supported
  • Loading branch information
mrbrdo committed Nov 24, 2015
1 parent 354637f commit 239ffb9
Show file tree
Hide file tree
Showing 13 changed files with 521 additions and 184 deletions.
59 changes: 15 additions & 44 deletions lib/spring/application.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
require "spring/boot"
require "set"
require "pty"
require "spring/platform"

module Spring
class Application
if Spring.fork?
require 'spring/application/fork_strategy'
include ForkStrategy
else
require 'spring/application/pool_strategy'
include PoolStrategy
end
attr_reader :manager, :watcher, :spring_env, :original_env

def initialize(manager, original_env)
Expand Down Expand Up @@ -114,13 +122,9 @@ def preload
end
end

def eager_preload
with_pty { preload }
end

def run
state :running
manager.puts
manager.puts Process.pid

loop do
IO.select [manager, @interrupt.first]
Expand All @@ -134,6 +138,7 @@ def run
end

def serve(client)
child_started = [false]
log "got client"
manager.puts

Expand All @@ -153,7 +158,7 @@ def serve(client)
ActionDispatch::Reloader.prepare!
end

pid = fork {
fork_child(client, streams, child_started) {
IGNORE_SIGNALS.each { |sig| trap(sig, "DEFAULT") }
trap("TERM", "DEFAULT")

Expand Down Expand Up @@ -182,24 +187,18 @@ def serve(client)

command.call
}

disconnect_database
reset_streams

log "forked #{pid}"
manager.puts pid

wait pid, streams, client
rescue Exception => e
Kernel.exit if exiting? && e.is_a?(SystemExit)

log "exception: #{e}"
manager.puts unless pid
manager.puts unless child_started[0]

if streams && !e.is_a?(SystemExit)
print_exception(stderr, e)
streams.each(&:close)
end

client.puts(1) if pid
client.puts(1) if child_started[0]
client.close
end

Expand Down Expand Up @@ -280,39 +279,11 @@ def print_exception(stream, error)
rest.each { |line| stream.puts("\tfrom #{line}") }
end

def with_pty
PTY.open do |master, slave|
[STDOUT, STDERR, STDIN].each { |s| s.reopen slave }
Thread.new { master.read }
yield
reset_streams
end
end

def reset_streams
[STDOUT, STDERR].each { |stream| stream.reopen(spring_env.log_file) }
STDIN.reopen("/dev/null")
end

def wait(pid, streams, client)
@mutex.synchronize { @waiting << pid }

# Wait in a separate thread so we can run multiple commands at once
Thread.new {
begin
_, status = Process.wait2 pid
log "#{pid} exited with #{status.exitstatus}"

streams.each(&:close)
client.puts(status.exitstatus)
client.close
ensure
@mutex.synchronize { @waiting.delete pid }
exit_if_finished
end
}
end

private

def active_record_configured?
Expand Down
9 changes: 8 additions & 1 deletion lib/spring/application/boot.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@

require "spring/application"

remote_socket =
if ENV["SPRING_SOCKET"]
UNIXSocket.open(ENV.delete("SPRING_SOCKET"))
else
UNIXSocket.for_fd(3)
end

app = Spring::Application.new(
UNIXSocket.for_fd(3),
remote_socket,
Spring::JSON.load(ENV.delete("SPRING_ORIGINAL_ENV").dup)
)

Expand Down
50 changes: 50 additions & 0 deletions lib/spring/application/fork_strategy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module Spring
class Application
module ForkStrategy
def eager_preload
with_pty { preload }
end

def with_pty
PTY.open do |master, slave|
[STDOUT, STDERR, STDIN].each { |s| s.reopen slave }
Thread.new { master.read }
yield
reset_streams
end
end

def wait(pid, streams, client)
@mutex.synchronize { @waiting << pid }

# Wait in a separate thread so we can run multiple commands at once
Thread.new {
begin
_, status = Process.wait2 pid
log "#{pid} exited with #{status.exitstatus}"

streams.each(&:close)
client.puts(status.exitstatus)
client.close
ensure
@mutex.synchronize { @waiting.delete pid }
exit_if_finished
end
}
end

def fork_child(client, streams, child_started)
pid = fork { yield }
child_started[0] = true

disconnect_database
reset_streams

log "forked #{pid}"
manager.puts pid

wait pid, streams, client
end
end
end
end
30 changes: 30 additions & 0 deletions lib/spring/application/pool_strategy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module Spring
class Application
module PoolStrategy
def eager_preload
reset_streams
preload
end

def fork_child(client, streams, child_started)
child_started[0] = true
exitstatus = 0
manager.puts Process.pid
begin
log "started #{Process.pid}"
yield
rescue SystemExit => ex
exitstatus = ex.status
end

log "#{Process.pid} exited with #{exitstatus}"

streams.each(&:close)
client.puts(exitstatus)
client.close

exit
end
end
end
end
138 changes: 4 additions & 134 deletions lib/spring/application_manager.rb
Original file line number Diff line number Diff line change
@@ -1,137 +1,7 @@
module Spring
class ApplicationManager
attr_reader :pid, :child, :app_env, :spring_env, :status

def initialize(app_env)
@app_env = app_env
@spring_env = Env.new
@mutex = Mutex.new
@state = :running
end

def log(message)
spring_env.log "[application_manager:#{app_env}] #{message}"
end

# We're not using @mutex.synchronize to avoid the weird "<internal:prelude>:10"
# line which messes with backtraces in e.g. rspec
def synchronize
@mutex.lock
yield
ensure
@mutex.unlock
end

def start
start_child
end

def restart
return if @state == :stopping
start_child(true)
end

def alive?
@pid
end

def with_child
synchronize do
if alive?
begin
yield
rescue Errno::ECONNRESET, Errno::EPIPE
# The child has died but has not been collected by the wait thread yet,
# so start a new child and try again.
log "child dead; starting"
start
yield
end
else
log "child not running; starting"
start
yield
end
end
end

# Returns the pid of the process running the command, or nil if the application process died.
def run(client)
with_child do
child.send_io client
child.gets or raise Errno::EPIPE
end

pid = child.gets.to_i

unless pid.zero?
log "got worker pid #{pid}"
pid
end
rescue Errno::ECONNRESET, Errno::EPIPE => e
log "#{e} while reading from child; returning no pid"
nil
ensure
client.close
end

def stop
log "stopping"
@state = :stopping

if pid
Process.kill('TERM', pid)
Process.wait(pid)
end
rescue Errno::ESRCH, Errno::ECHILD
# Don't care
end

private

def start_child(preload = false)
@child, child_socket = UNIXSocket.pair

Bundler.with_clean_env do
@pid = Process.spawn(
{
"RAILS_ENV" => app_env,
"RACK_ENV" => app_env,
"SPRING_ORIGINAL_ENV" => JSON.dump(Spring::ORIGINAL_ENV),
"SPRING_PRELOAD" => preload ? "1" : "0"
},
"ruby",
"-I", File.expand_path("../..", __FILE__),
"-e", "require 'spring/application/boot'",
3 => child_socket
)
end

start_wait_thread(pid, child) if child.gets
child_socket.close
end

def start_wait_thread(pid, child)
Process.detach(pid)

Thread.new {
# The recv can raise an ECONNRESET, killing the thread, but that's ok
# as if it does we're no longer interested in the child
loop do
IO.select([child])
break if child.recv(1, Socket::MSG_PEEK).empty?
sleep 0.01
end

log "child #{pid} shutdown"

synchronize {
if @pid == pid
@pid = nil
restart
end
}
}
end
module ApplicationManager
end
end

require 'spring/application_manager/fork_strategy'
require 'spring/application_manager/pool_strategy'
Loading

0 comments on commit 239ffb9

Please sign in to comment.