Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #10 from stripe/gdb

Improve the Einhorn interface
  • Loading branch information...
commit d3bf0399170e8661e901d9179934992c0630866b 2 parents e9496c0 + b9f91ff
@gdb gdb authored
View
15 History.txt
@@ -0,0 +1,15 @@
+=== 0.4.0 2012-09-26
+
+* Switch the command-socket protocol from line-oriented JSON to
+ line-oriented YAML. If you've written your own client to communicate
+ with the einhorn command-socket, you will need to update it. (The
+ bundled einhorn/client is already updated.)
+* Have the 'state' command return a YAML'd state rather than a #pretty_inspect'd
+ state
+* Made einhornsh script-friendly
+* Switched over to address specification via -b option and environment variables;
+ deprecated but didn't remove old interface.
+* Allow einhorn to signal a given worker multiple times
+* Add 'signal' and 'die' commands to Einhornsh
+* Add exponential backoff to spinup if new processes are dying before being acked
+* Add last_upgraded field to State
View
8 README.md
@@ -102,6 +102,10 @@ run). Einhorn relies on file permissions to ensure that no malicious
users can gain access. Run with a `-d DIRECTORY` to change the
directory where the socket will live.
+Note that the command socket uses a line-oriented YAML protocol, and
+you should ensure you trust clients to send arbitrary YAML messages
+into your process.
+
### Seamless upgrades
You can cause your code to be seamlessly reloaded by upgrading the
@@ -167,8 +171,8 @@ To use preloading, just give Einhorn a `-p PATH_TO_CODE`, and make
sure you've defined an `einhorn_main` method.
In order to maximize compatibility, we've worked to minimize Einhorn's
-dependencies. At the moment Einhorn imports 'rubygems' and 'json'. (If
-these turn out to be issues, we could probably find a workaround.)
+dependencies. It has no dependencies outside of the Ruby standard
+library.
### Command name
View
66 bin/einhorn
@@ -44,15 +44,25 @@ You can communicate your running Einhorn process via `einhornsh`:
### Server sockets
If your process is a server and listens on one or more sockets,
-Einhorn can open these sockets and pass them to the workers. Program
-arguments of the form
+Einhorn can open these sockets and pass them to the workers. You can
+specify the addresses to bind by passing one or more `-b ADDR`
+arguments:
- srv:(IP:PORT)[<,OPT>...]
- --MY-OPT=srv:(IP:PORT)[<,OPT>...]
+ einhorn -b 127.0.0.1:1234 my-command
+ einhorn -b 127.0.0.1:1234,r -b 127.0.0.1:1235 my-command
+
+Each address is specified as an ip/port pair, possibly accompanied by options:
+
+ ADDR := (IP:PORT)[<,OPT>...]
+
+In the worker process, the opened file descriptors will be represented
+as a space-separated list of file descriptor numbers in the
+EINHORN_FDS environment variable (respecting the order that the `-b`
+options were provided in):
+
+ EINHORN_FDS="6" # 127.0.0.1:1234
+ EINHORN_FDS="6 7" # 127.0.0.1:1234,r 127.0.0.1:1235
-Will be interpreted as a request to open a server socket bound to
-IP:PORT. The argument will be replaced with `FD` and `---MY-OPT=FD`,
-respectively, where `FD` is the file descriptor number of the socket.
Valid opts are:
r, so_reuseaddr: set SO_REUSEADDR on the server socket
@@ -60,11 +70,11 @@ Valid opts are:
You can for example run:
- $ einhorn -m manual -n 4 example/time_server srv:127.0.0.1:2345,r
+ $ einhorn -b 127.0.0.1:2345,r -m manual -n 4 -- example/time_server
Which will run 4 copies of
- example/time_server 6
+ EINHORN_FDS=6 example/time_server
Where file descriptor 6 is a server socket bound to `127.0.0.1:2345`
and with `SO_REUSEADDR` set. It is then your application's job to
@@ -78,6 +88,10 @@ run). Einhorn relies on file permissions to ensure that no malicious
users can gain access. Run with a `-d DIRECTORY` to change the
directory where the socket will live.
+Note that the command socket uses a line-oriented YAML protocol, and
+you should ensure you trust clients to send arbitrary YAML messages
+into your process.
+
### Seamless upgrades
You can cause your code to be seamlessly reloaded by upgrading the
@@ -116,9 +130,9 @@ string
to the UNIX socket pointed to by the environment variable
`EINHORN_SOCK_PATH`. (Be sure to include a trailing newline.)
-To make things even easier, you can pass a `-b` to Einhorn, in which
+To make things even easier, you can pass a `-g` to Einhorn, in which
case you just need to `write()` the above message to the open file
-descriptor pointed to by `EINHORN_FD`.
+descriptor pointed to by `EINHORN_SOCK_FD`.
(See `lib/einhorn/worker.rb` for details of these and other socket
discovery mechanisms.)
@@ -143,8 +157,8 @@ To use preloading, just give Einhorn a `-p PATH_TO_CODE`, and make
sure you've defined an `einhorn_main` method.
In order to maximize compatibility, we've worked to minimize Einhorn's
-dependencies. At the moment Einhorn imports 'rubygems' and 'json'. (If
-these turn out to be issues, we could probably find a workaround.)
+dependencies. It has no dependencies outside of the Ruby standard
+library.
### Command name
@@ -170,12 +184,18 @@ if true # $0 == __FILE__
Einhorn::TransientState.environ = ENV.to_hash
optparse = OptionParser.new do |opts|
- opts.on('-b', '--command-socket-as-fd', 'Leave the command socket open as a file descriptor, passed in the EINHORN_FD environment variable. This allows your worker processes to ACK without needing to know where on the filesystem the command socket lives.') do
- Einhorn::State.command_socket_as_fd = true
+ opts.on('-b ADDR', '--bind ADDR', 'Bind an address and add the corresponding FD to EINHORN_FDS') do |addr|
+ unless addr =~ /\A([^:]+):(\d+)((?:,\w+)*)\Z/
+ raise "Invalid value for #{addr.inspect}: bind address must be of the form address:port[,flags...]"
+ end
+
+ host = $1
+ port = Integer($2)
+ flags = $3.split(',').select {|flag| flag.length > 0}.map {|flag| flag.downcase}
+ Einhorn::State.bind << [host, port, flags]
end
opts.on('-c CMD_NAME', '--command-name CMD_NAME', 'Set the command name in ps to this value') do |cmd_name|
- Einhorn::TransientState.stateful = false
Einhorn::State.cmd_name = cmd_name
end
@@ -191,6 +211,10 @@ if true # $0 == __FILE__
Einhorn::State.lockfile = lockfile
end
+ opts.on('-g', '--command-socket-as-fd', 'Leave the command socket open as a file descriptor, passed in the EINHORN_SOCK_FD environment variable. This allows your worker processes to ACK without needing to know where on the filesystem the command socket lives.') do
+ Einhorn::State.command_socket_as_fd = true
+ end
+
opts.on('-h', '--help', 'Display this message') do
opts.banner = Einhorn::Executable.einhorn_usage(true)
puts opts
@@ -202,8 +226,6 @@ if true # $0 == __FILE__
end
opts.on('-l', '--backlog N', 'Connection backlog (assuming this is a server)') do |b|
- raise "Cannot pass options if stateful" if Einhorn::TransientState.stateful
- Einhorn::TransientState.stateful = false
Einhorn::State.config[:backlog] = b.to_i
end
@@ -228,13 +250,10 @@ if true # $0 == __FILE__
end
opts.on('-n', '--number N', 'Number of copies to spin up') do |n|
- raise "Cannot pass options if stateful" if Einhorn::TransientState.stateful
- Einhorn::TransientState.stateful = false
Einhorn::State.config[:number] = n.to_i
end
opts.on('-p PATH', '--preload PATH', 'Load this code into memory, and fork but do not exec upon spawn. Must define an "einhorn_main" method') do |path|
- Einhorn::TransientState.stateful = false
Einhorn::State.path = path
end
@@ -243,8 +262,6 @@ if true # $0 == __FILE__
end
opts.on('-s', '--seconds N', 'Number of seconds to wait until respawning') do |b|
- raise "Cannot pass options if stateful" if Einhorn::TransientState.stateful
- Einhorn::TransientState.stateful = false
Einhorn::State.config[:seconds] = s.to_i
end
@@ -253,9 +270,6 @@ if true # $0 == __FILE__
end
opts.on('--with-state-fd STATE', '[Internal option] With file descriptor containing state') do |fd|
- raise "Cannot be stateful if options are passed" unless Einhorn::TransientState.stateful.nil?
- Einhorn::TransientState.stateful = true
-
read = IO.for_fd(Integer(fd))
state = read.read
read.close
View
61 bin/einhornsh
@@ -3,6 +3,7 @@ require 'logger'
require 'optparse'
require 'readline'
+require 'shellwords'
require 'rubygems'
require 'einhorn'
@@ -15,35 +16,46 @@ module Einhorn
end
def run
- puts "Enter 'help' if you're not sure what to do."
- puts
- puts 'Type "quit" or "exit" to quit at any time'
+ emit("Enter 'help' if you're not sure what to do.")
+ emit
+ emit('Type "quit" or "exit" to quit at any time')
while line = Readline.readline('> ', true)
- if ['quit', 'exit'].include?(line)
- puts "Goodbye!"
+ command, args = parse_command(line)
+ if ['quit', 'exit'].include?(command)
+ emit("Goodbye!")
return
end
begin
- response = @client.command({'command' => line})
+ response = @client.command({'command' => command, 'args' => args})
rescue Errno::EPIPE => e
- puts "einhornsh: Error communicating with Einhorn: #{e} (#{e.class})"
- puts "einhornsh: Attempting to reconnect..."
+ emit("einhornsh: Error communicating with Einhorn: #{e} (#{e.class})")
+ emit("einhornsh: Attempting to reconnect...")
reconnect
retry
end
- puts response['message']
+
+ if response.kind_of?(Hash)
+ puts response['message']
+ else
+ puts "Invalid response type #{response.class}: #{response.inspect}"
+ end
end
end
+ def parse_command(line)
+ command, *args = Shellwords.shellsplit(line)
+ [command, args]
+ end
+
def reconnect
begin
@client = Einhorn::Client.for_path(@path_to_socket)
rescue Errno::ENOENT => e
# TODO: The exit here is a biit of a layering violation.
- puts <<EOF
+ Einhorn::EinhornSH.emit(<<EOF, true)
Could not connect to Einhorn master process:
#{e}
@@ -53,12 +65,28 @@ should pass einhornsh the cmd_name (-c argument) provided to Einhorn.
EOF
exit(1)
end
- ehlo
+ ehlo if interactive?
end
def ehlo
response = @client.command('command' => 'ehlo', 'user' => ENV['USER'])
- puts response['message']
+ emit(response['message'])
+ end
+
+ def self.emit(message=nil, force=false)
+ $stderr.puts(message || '') if interactive? || force
+ end
+
+ def self.interactive?
+ $stdin.isatty
+ end
+
+ def emit(*args)
+ self.class.emit(*args)
+ end
+
+ def interactive?
+ self.class.interactive?
end
end
end
@@ -75,7 +103,7 @@ as a positional argument or using `-c`. If you're running your Einhorn
with a `-d`, provide the same argument here."
opts.on('-h', '--help', 'Display this message') do
- puts opts
+ Einhorn::EinhornSH.emit(opts, true)
exit(1)
end
@@ -90,11 +118,14 @@ with a `-d`, provide the same argument here."
optparse.parse!
if ARGV.length > 1
- puts optparse
+ Einhorn::EinhornSH.emit(optparse, true)
return 1
end
- Signal.trap("INT") {puts; exit(0)}
+ Signal.trap("INT") do
+ Einhorn::EinhornSH.emit
+ exit(0)
+ end
path_to_socket = options[:socket_path]
View
4 einhorn.gemspec
@@ -13,8 +13,8 @@ Gem::Specification.new do |gem|
gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
gem.name = "einhorn"
gem.require_paths = ["lib"]
- # maybe swap out for YAML? Then don't need any gems.
- gem.add_dependency('json')
+
+ gem.add_development_dependency('rake')
gem.add_development_dependency('shoulda')
gem.add_development_dependency('mocha')
gem.version = Einhorn::VERSION
View
9 example/thin_example
@@ -28,8 +28,10 @@ end
def einhorn_main
puts "Called with #{ARGV.inspect}"
- if ARGV.length == 0
- raise "Need to call with at least one argument. Try running 'einhorn #{$0} srv:127.0.0.1:5000,r,n srv:127.0.0.1:5001,r,n' and then running 'curl 127.0.0.1:5000' or 'curl 127.0.0.1:5001'"
+ einhorn_fds = Einhorn::Worker.einhorn_fds
+
+ unless einhorn_fds
+ raise "Need to call with at least one bound socket. Try running 'einhorn -b 127.0.0.1:5000,r,n -b 127.0.0.1:5001,r,n #{$0}' and then running 'curl 127.0.0.1:5000' or 'curl 127.0.0.1:5001'"
end
Einhorn::Worker.graceful_shutdown do
@@ -39,8 +41,7 @@ def einhorn_main
Einhorn::Worker.ack!
EventMachine.run do
- ARGV.each_with_index do |arg, i|
- sock = Integer(arg)
+ einhorn_fds.each_with_index do |sock, i|
srv = Thin::Server.new(sock, App.new(i), :reuse => true)
srv.start
end
View
18 example/time_server
@@ -1,28 +1,24 @@
#!/usr/bin/env ruby
#
# A simple example showing how to use Einhorn's shared-socket
-# features. Einhorn translates the srv:(addr:port[,flags...]) spec in
-# the arg string into a file descriptor number.
+# features. Einhorn translates the (addr:port[,flags...]) bind spec in
+# into a file descriptor number in the EINHORN_FDS environment variable.
#
# Invoke through Einhorn as
#
-# einhorn ./time_server srv:127.0.0.1:2345,r
+# einhorn -b 127.0.0.1:2345,r ./time_server
#
# or, if you want to try out preloading:
#
-# einhorn -p ./time_server ./time_server srv:127.0.0.1:2345,r
-
+# einhorn -b 127.0.0.1:2345,r -p ./time_server ./time_server
require 'rubygems'
require 'einhorn/worker'
def einhorn_main
- puts "Called with #{ARGV.inspect}"
-
- if ARGV.length != 1
- raise "Need to call with a port spec as the first argument. Try running 'einhorn #{$0} srv:127.0.0.1:2345,r' and then running 'nc 127.0.0.1 2345'"
- end
+ puts "Called with ENV['EINHORN_FDS']: #{ENV['EINHORN_FDS']}"
- socket = Socket.for_fd(Integer(ARGV[0]))
+ fd_num = Einhorn::Worker.socket!
+ socket = Socket.for_fd(fd_num)
# Came up successfully, so let's set up graceful handler and ACK the
# master.
View
19 lib/einhorn.rb
@@ -6,8 +6,6 @@
require 'tmpdir'
require 'yaml'
-require 'rubygems'
-
module Einhorn
module AbstractState
def default_state; raise NotImplementedError.new('Override in extended modules'); end
@@ -40,6 +38,8 @@ def self.default_state
:version => 0,
:sockets => {},
:orig_cmd => nil,
+ :bind => [],
+ :bind_fds => [],
:cmd => nil,
:script_name => nil,
:respawn => true,
@@ -55,7 +55,9 @@ def self.default_state
:command_socket_as_fd => false,
:socket_path => nil,
:pidfile => nil,
- :lockfile => nil
+ :lockfile => nil,
+ :consecutive_deaths_before_ack => 0,
+ :last_upgraded => nil
}
end
end
@@ -220,9 +222,19 @@ def self.worker_ps_name
Einhorn::State.cmd_name ? "ruby #{Einhorn::State.cmd_name}" : Einhorn::State.orig_cmd.join(' ')
end
+ def self.socketify_env!
+ Einhorn::State.bind.each do |host, port, flags|
+ fd = bind(host, port, flags)
+ Einhorn::State.bind_fds << fd
+ end
+ end
+
+ # This duplicates some code from the environment path, but is
+ # deprecated so that's ok.
def self.socketify!(cmd)
cmd.map! do |arg|
if arg =~ /^(.*=|)srv:([^:]+):(\d+)((?:,\w+)*)$/
+ log_error("Using deprecated command-line configuration for Einhorn; should upgrade to the environment variable interface.")
opt = $1
host = $2
port = $3
@@ -250,6 +262,7 @@ def self.run
Einhorn::State.cmd = ARGV.dup
# TODO: don't actually alter ARGV[0]?
Einhorn::State.cmd[0] = which(Einhorn::State.cmd[0])
+ socketify_env!
socketify!(Einhorn::State.cmd)
end
View
45 lib/einhorn/client.rb
@@ -1,8 +1,34 @@
-require 'json'
require 'set'
+require 'uri'
+require 'yaml'
module Einhorn
class Client
+ # Keep this in this file so client can be loaded entirely
+ # standalone by user code.
+ module Transport
+ def self.send_message(socket, message)
+ line = serialize_message(message)
+ socket.write(line)
+ end
+
+ def self.receive_message(socket)
+ line = socket.readline
+ deserialize_message(line)
+ end
+
+ def self.serialize_message(message)
+ serialized = YAML.dump(message)
+ escaped = URI.escape(serialized, "%\n")
+ escaped + "\n"
+ end
+
+ def self.deserialize_message(line)
+ serialized = URI.unescape(line)
+ YAML.load(serialized)
+ end
+ end
+
@@responseless_commands = Set.new(['worker:ack'])
def self.for_path(path_to_socket)
@@ -20,9 +46,8 @@ def initialize(socket)
end
def command(command_hash)
- command = JSON.generate(command_hash) + "\n"
- write(command)
- recvmessage if expect_response?(command_hash)
+ Transport.send_message(@socket, command_hash)
+ Transport.receive_message(@socket) if expect_response?(command_hash)
end
def expect_response?(command_hash)
@@ -32,17 +57,5 @@ def expect_response?(command_hash)
def close
@socket.close
end
-
- private
-
- def write(bytes)
- @socket.write(bytes)
- end
-
- # TODO: use a streaming JSON parser instead?
- def recvmessage
- line = @socket.readline
- JSON.parse(line)
- end
end
end
View
73 lib/einhorn/command.rb
@@ -1,7 +1,6 @@
require 'pp'
require 'set'
require 'tmpdir'
-require 'json'
require 'einhorn/command/interface'
@@ -30,9 +29,17 @@ def self.mourn(pid)
Einhorn::State.children.delete(pid)
+ # Unacked worker
+ if spec[:type] == :worker && !spec[:acked]
+ Einhorn::State.consecutive_deaths_before_ack += 1
+ extra = ' before it was ACKed'
+ else
+ extra = nil
+ end
+
case type = spec[:type]
when :worker
- Einhorn.log_info("===> Exited worker #{pid.inspect}")
+ Einhorn.log_info("===> Exited worker #{pid.inspect}#{extra}")
when :state_passer
Einhorn.log_debug("===> Exited state passing process #{pid.inspect}")
else
@@ -78,13 +85,23 @@ def self.register_ack(pid)
return
end
+ if Einhorn::State.consecutive_deaths_before_ack > 0
+ extra = ", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying"
+ else
+ extra = nil
+ end
+ Einhorn::State.consecutive_deaths_before_ack = 0
+
spec[:acked] = true
- Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs")
+ Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs#{extra}")
# Could call cull here directly instead, I believe.
Einhorn::Event.break_loop
end
- def self.signal_all(signal, children)
+ def self.signal_all(signal, children=nil, record=true)
+ children ||= Einhorn::WorkerPool.workers
+
+ signaled = []
Einhorn.log_info("Sending #{signal} to #{children.inspect}")
children.each do |child|
@@ -93,17 +110,22 @@ def self.signal_all(signal, children)
next
end
- if spec[:signaled].include?(child)
- Einhorn.log_error("Not sending #{signal} to already-signaled child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.")
- next
+ if record
+ if spec[:signaled].include?(signal)
+ Einhorn.log_error("Re-sending #{signal} to already-signaled child #{child.inspect}. It may be slow to spin down, or it may be swallowing #{signal}s.")
+ end
+ spec[:signaled].add(signal)
end
- spec[:signaled].add(child)
begin
Process.kill(signal, child)
rescue Errno::ESRCH
+ else
+ signaled << child
end
end
+
+ "Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.inspect}"
end
def self.increment
@@ -118,7 +140,7 @@ def self.increment
def self.decrement
if Einhorn::State.config[:number] <= 1
output = "Can't decrease number of workers (already at #{Einhorn::State.config[:number]}). Run kill #{$$} if you really want to kill einhorn."
- $stderr.puts output
+ $stderr.puts(output)
return output
end
@@ -190,7 +212,7 @@ def self.spinup(cmd=nil)
Einhorn::Event.close_all_for_worker
Einhorn.set_argv(cmd, true)
- pass_command_socket_info
+ prepare_child_environment
einhorn_main
end
else
@@ -204,7 +226,7 @@ def self.spinup(cmd=nil)
# cloexec on everything.
Einhorn::Event.close_all_for_worker
- pass_command_socket_info
+ prepare_child_environment
exec [cmd[0], cmd[0]], *cmd[1..-1]
end
end
@@ -229,15 +251,19 @@ def self.spinup(cmd=nil)
end
end
- def self.pass_command_socket_info
+ def self.prepare_child_environment
# This is run from the child
ENV['EINHORN_MASTER_PID'] = Process.ppid.to_s
ENV['EINHORN_SOCK_PATH'] = Einhorn::Command::Interface.socket_path
if Einhorn::State.command_socket_as_fd
socket = UNIXSocket.open(Einhorn::Command::Interface.socket_path)
Einhorn::TransientState.socket_handles << socket
- ENV['EINHORN_FD'] = socket.fileno.to_s
+ ENV['EINHORN_SOCK_FD'] = socket.fileno.to_s
end
+ # Try to match Upstart's internal support for space-separated FD
+ # lists. (I don't think anyone actually uses that functionality,
+ # but seems reasonable enough.)
+ ENV['EINHORN_FDS'] = Einhorn::State.bind_fds.map(&:to_s).join(' ')
end
def self.full_upgrade
@@ -261,6 +287,12 @@ def self.upgrade_workers
Einhorn.log_info("Starting upgrade to #{Einhorn::State.version}...")
end
+ # Reset this, since we've just upgraded to a new universe (I'm
+ # not positive this is the right behavior, but it's not
+ # obviously wrong.)
+ Einhorn::State.consecutive_deaths_before_ack = 0
+ Einhorn::State.last_upgraded = Time.now
+
Einhorn::State.version += 1
replenish_immediately
end
@@ -311,14 +343,23 @@ def self.replenish_gradually
return if Einhorn::TransientState.has_outstanding_spinup_timer
return unless Einhorn::WorkerPool.missing_worker_count > 0
- spinup_interval = Einhorn::State.config[:seconds]
+ # Exponentially backoff automated spinup if we're just having
+ # things die before ACKing
+ spinup_interval = Einhorn::State.config[:seconds] * (1.5 ** Einhorn::State.consecutive_deaths_before_ack)
seconds_ago = (Time.now - Einhorn::State.last_spinup).to_f
if seconds_ago > spinup_interval
- Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}, so spinning up a new process")
+ msg = "Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so spinning up a new process"
+
+ if Einhorn::State.consecutive_deaths_before_ack > 0
+ Einhorn.log_info("#{msg} (there have been #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked worker deaths)")
+ else
+ Einhorn.log_debug(msg)
+ end
+
spinup
else
- Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}, so not spinning up a new process")
+ Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so not spinning up a new process")
end
Einhorn::TransientState.has_outstanding_spinup_timer = true
View
99 lib/einhorn/command/interface.rb
@@ -148,17 +148,17 @@ def self.default_pidfile(cmd_name=nil)
## Signals
def self.install_handlers
Signal.trap("INT") do
- Einhorn::Command.signal_all("USR2", Einhorn::State.children.keys)
+ Einhorn::Command.signal_all("USR2", Einhorn::WorkerPool.workers)
Einhorn::State.respawn = false
end
Signal.trap("TERM") do
- Einhorn::Command.signal_all("TERM", Einhorn::State.children.keys)
+ Einhorn::Command.signal_all("TERM", Einhorn::WorkerPool.workers)
Einhorn::State.respawn = false
end
# Note that quit is a bit different, in that it will actually
# make Einhorn quit without waiting for children to exit.
Signal.trap("QUIT") do
- Einhorn::Command.signal_all("QUIT", Einhorn::State.children.keys)
+ Einhorn::Command.signal_all("QUIT", Einhorn::WorkerPool.workers)
Einhorn::State.respawn = false
exit(1)
end
@@ -166,12 +166,12 @@ def self.install_handlers
Signal.trap("ALRM") {Einhorn::Command.full_upgrade}
Signal.trap("CHLD") {Einhorn::Event.break_loop}
Signal.trap("USR2") do
- Einhorn::Command.signal_all("USR2", Einhorn::State.children.keys)
+ Einhorn::Command.signal_all("USR2", Einhorn::WorkerPool.workers)
Einhorn::State.respawn = false
end
at_exit do
if Einhorn::State.kill_children_on_exit && Einhorn::TransientState.whatami == :master
- Einhorn::Command.signal_all("USR2", Einhorn::State.children.keys)
+ Einhorn::Command.signal_all("USR2", Einhorn::WorkerPool.workers)
Einhorn::State.respawn = false
end
end
@@ -201,14 +201,13 @@ def self.send_message(conn, response)
if response.kind_of?(String)
response = {'message' => response}
end
- message = pack_message(response)
- conn.write(message)
+ Einhorn::Client::Transport.send_message(conn, response)
end
def self.generate_response(conn, command)
begin
- request = JSON.parse(command)
- rescue JSON::ParserError => e
+ request = Einhorn::Client::Transport.deserialize_message(command)
+ rescue ArgumentError => e
return {
'message' => "Could not parse command: #{e}"
}
@@ -235,17 +234,6 @@ def self.generate_response(conn, command)
end
end
- def self.pack_message(message_struct)
- begin
- JSON.generate(message_struct) + "\n"
- rescue JSON::GeneratorError => e
- response = {
- 'message' => "Error generating JSON message for #{message_struct.inspect} (this indicates a bug): #{e}"
- }
- JSON.generate(response) + "\n"
- end
- end
-
def self.command_descriptions
command_specs = @@commands.select do |_, spec|
spec[:description]
@@ -291,7 +279,7 @@ def self.unrecognized_command(conn, request)
end
command 'state', "Get a dump of Einhorn's current state" do
- Einhorn::Command.dumpable_state.pretty_inspect
+ YAML.dump(Einhorn::Command.dumpable_state)
end
command 'reload', 'Reload Einhorn' do |conn, _|
@@ -332,5 +320,74 @@ def self.unrecognized_command(conn, request)
Einhorn::Command.full_upgrade
nil
end
+
+ command 'signal', 'Send one or more signals to all workers (args: SIG1 [SIG2 ...])' do |conn, request|
+ args = request['args']
+ if message = validate_args(args)
+ next message
+ end
+
+ args = normalize_signals(args)
+
+ if message = validate_signals(args)
+ next message
+ end
+
+ results = args.map do |signal|
+ Einhorn::Command.signal_all(signal, nil, false)
+ end
+
+ results.join("\n")
+ end
+
+ command 'die', 'Send SIGNAL (default: SIGUSR2) to all workers, stop spawning new ones, and exit once all workers die (args: [SIGNAL])' do |conn, request|
+ # TODO: dedup this code with signal
+ args = request['args']
+ if message = validate_args(args)
+ next message
+ end
+
+ args = normalize_signals(args)
+
+ if message = validate_signals(args)
+ next message
+ end
+
+ signal = args[0] || "USR2"
+
+ response = Einhorn::Command.signal_all(signal, Einhorn::WorkerPool.workers)
+ Einhorn::State.respawn = false
+
+ "Einhorn is going down! #{response}"
+ end
+
+ def self.validate_args(args)
+ return 'No args provided' unless args
+ return 'Args must be an array' unless args.kind_of?(Array)
+
+ args.each do |arg|
+ return "Argument is a #{arg.class}, not a string: #{arg.inspect}" unless arg.kind_of?(String)
+ end
+
+ nil
+ end
+
+ def self.validate_signals(args)
+ args.each do |signal|
+ unless Signal.list.include?(signal)
+ return "Invalid signal: #{signal.inspect}"
+ end
+ end
+
+ nil
+ end
+
+ def self.normalize_signals(args)
+ args.map do |signal|
+ signal = signal.upcase
+ signal = $1 if signal =~ /\ASIG(.*)\Z/
+ signal
+ end
+ end
end
end
View
31 lib/einhorn/worker.rb
@@ -38,8 +38,8 @@ def self.ack(*args)
#
# @discovery: How to discover the master process's command socket.
# :env: Discover the path from ENV['EINHORN_SOCK_PATH']
- # :fd: Just use the file descriptor in ENV['EINHORN_FD'].
- # Must run the master with the -b flag. This is mostly
+ # :fd: Just use the file descriptor in ENV['EINHORN_SOCK_FD'].
+ # Must run the master with the -g flag. This is mostly
# useful if you don't have a nice library like Einhorn::Worker.
# Then @arg being true causes the FD to be left open after ACK;
# otherwise it is closed.
@@ -57,7 +57,7 @@ def self.ack!(discovery=:env, arg=nil)
socket = ENV['EINHORN_SOCK_PATH']
client = Einhorn::Client.for_path(socket)
when :fd
- raise "No EINHORN_FD provided in environment. Did you run einhorn with the -b flag?" unless fd_str = ENV['EINHORN_FD']
+ raise "No EINHORN_SOCK_FD provided in environment. Did you run einhorn with the -g flag?" unless fd_str = ENV['EINHORN_SOCK_FD']
fd = Integer(fd_str)
client = Einhorn::Client.for_fd(fd)
@@ -78,6 +78,31 @@ def self.ack!(discovery=:env, arg=nil)
true
end
+ def self.socket(number=0)
+ fds = einhorn_fds
+ fds ? fds[number] : nil
+ end
+
+ def self.socket!(number=0)
+ unless fds = einhorn_fds
+ raise "No EINHORN_FDS provided in environment. Are you running under Einhorn?"
+ end
+
+ unless number < fds.length
+ raise "Only #{fds.length} FDs available, but FD #{number} was requested"
+ end
+
+ fds[number]
+ end
+
+ def self.einhorn_fds
+ unless raw_fds = ENV['EINHORN_FDS']
+ return nil
+ end
+
+ raw_fds.split(' ').map {|fd| Integer(fd)}
+ end
+
# Call this to handle graceful shutdown requests to your app.
def self.graceful_shutdown(&blk)
Signal.trap('USR2', &blk)
View
4 lib/einhorn/worker_pool.rb
@@ -1,5 +1,9 @@
module Einhorn
module WorkerPool
+ def self.workers
+ Einhorn::State.children.keys
+ end
+
def self.unsignaled_workers
Einhorn::State.children.select do |pid, spec|
spec[:signaled].length == 0
View
1  test/test_helper.rb
@@ -1,4 +1,5 @@
require 'rubygems'
+require 'bundler/setup'
require 'test/unit'
require 'mocha'
View
49 test/unit/einhorn/client.rb
@@ -0,0 +1,49 @@
+require File.expand_path(File.join(File.dirname(__FILE__), '../../test_helper'))
+
+require 'einhorn'
+
+class ClientTest < Test::Unit::TestCase
+ def message
+ {:foo => ['%bar', '%baz']}
+ end
+
+ def serialized
+ "--- %0A:foo: %0A- \"%25bar\"%0A- \"%25baz\"%0A\n"
+ end
+
+ context "when sending a message" do
+ should "write a serialized line" do
+ socket = mock
+ socket.expects(:write).with(serialized)
+ Einhorn::Client::Transport.send_message(socket, message)
+ end
+ end
+
+ context "when receiving a message" do
+ should "deserialize a single line" do
+ socket = mock
+ socket.expects(:readline).returns(serialized)
+ result = Einhorn::Client::Transport.receive_message(socket)
+ assert_equal(result, message)
+ end
+ end
+
+ context "when {de,}serializing a message" do
+ should "serialize and escape a message as expected" do
+ actual = Einhorn::Client::Transport.serialize_message(message)
+ assert_equal(serialized, actual)
+ end
+
+ should "deserialize and unescape a message as expected" do
+ actual = Einhorn::Client::Transport.deserialize_message(serialized)
+ assert_equal(message, actual)
+ end
+
+ should "raise an error when deserializing invalid YAML" do
+ invalid_serialized = "-%0A\t-"
+ assert_raises(ArgumentError) do
+ Einhorn::Client::Transport.deserialize_message(invalid_serialized)
+ end
+ end
+ end
+end
View
10 test/unit/einhorn/command/interface.rb
@@ -9,14 +9,16 @@ class InterfaceTest < Test::Unit::TestCase
should "call that command" do
conn = stub(:log_debug => nil)
conn.expects(:write).once.with do |message|
- parsed = JSON.parse(message)
+ # Remove trailing newline
+ message = message[0...-1]
+ parsed = YAML.load(URI.unescape(message))
parsed['message'] =~ /Welcome gdb/
end
request = {
'command' => 'ehlo',
'user' => 'gdb'
}
- Interface.process_command(conn, JSON.generate(request))
+ Interface.process_command(conn, YAML.dump(request))
end
end
@@ -27,7 +29,7 @@ class InterfaceTest < Test::Unit::TestCase
request = {
'command' => 'made-up',
}
- Interface.process_command(conn, JSON.generate(request))
+ Interface.process_command(conn, YAML.dump(request))
end
end
@@ -41,7 +43,7 @@ class InterfaceTest < Test::Unit::TestCase
'pid' => 1234
}
Einhorn::Command.expects(:register_manual_ack).once.with(1234)
- Interface.process_command(conn, JSON.generate(request))
+ Interface.process_command(conn, YAML.dump(request))
end
end
end
View
2  test/unit/einhorn/event.rb
@@ -14,7 +14,7 @@ def self.reset
end
class EventTest < Test::Unit::TestCase
- context "when run the event loop" do
+ context "when running the event loop" do
setup do
Einhorn::Event.reset
end
Please sign in to comment.
Something went wrong with that request. Please try again.