Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial import of Einhorn

  • Loading branch information...
commit 3cc1e1e452ce8e7a1960b747d57881303b50691b 0 parents
@gdb gdb authored
Showing with 2,751 additions and 0 deletions.
  1. +17 −0 .gitignore
  2. +4 −0 Gemfile
  3. +22 −0 LICENSE
  4. +236 −0 README.md
  5. +79 −0 README.md.in
  6. +19 −0 Rakefile
  7. +284 −0 bin/einhorn
  8. +120 −0 bin/einhornsh
  9. +21 −0 einhorn.gemspec
  10. +19 −0 example/pool_worker.rb
  11. +52 −0 example/thin_example
  12. +48 −0 example/time_server
  13. +282 −0 lib/einhorn.rb
  14. +48 −0 lib/einhorn/client.rb
  15. +336 −0 lib/einhorn/command.rb
  16. +336 −0 lib/einhorn/command/interface.rb
  17. +150 −0 lib/einhorn/event.rb
  18. +132 −0 lib/einhorn/event/abstract_text_descriptor.rb
  19. +20 −0 lib/einhorn/event/ack_timer.rb
  20. +58 −0 lib/einhorn/event/command_server.rb
  21. +45 −0 lib/einhorn/event/connection.rb
  22. +6 −0 lib/einhorn/event/loop_breaker.rb
  23. +23 −0 lib/einhorn/event/persistent.rb
  24. +39 −0 lib/einhorn/event/timer.rb
  25. +3 −0  lib/einhorn/version.rb
  26. +94 −0 lib/einhorn/worker.rb
  27. +56 −0 lib/einhorn/worker_pool.rb
  28. +7 −0 test/test_helper.rb
  29. +38 −0 test/unit/einhorn.rb
  30. +21 −0 test/unit/einhorn/command.rb
  31. +47 −0 test/unit/einhorn/command/interface.rb
  32. +89 −0 test/unit/einhorn/event.rb
17 .gitignore
@@ -0,0 +1,17 @@
+*.gem
+*.rbc
+.bundle
+.config
+.yardoc
+Gemfile.lock
+InstalledFiles
+_yardoc
+coverage
+doc/
+lib/bundler/man
+pkg
+rdoc
+spec/reports
+test/tmp
+test/version_tmp
+tmp
4 Gemfile
@@ -0,0 +1,4 @@
+source 'https://rubygems.org'
+
+# Specify your gem's dependencies in einhorn.gemspec
+gemspec
22 LICENSE
@@ -0,0 +1,22 @@
+Copyright (c) 2012 Stripe (info@stripe.com)
+
+MIT License
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
236 README.md
@@ -0,0 +1,236 @@
+# Einhorn: the language-independent shared socket manager
+
+![Einhorn](https://stripe.com/img/blog/posts/meet-einhorn/einhorn.png)
+
+Let's say you have a server process which processes one request at a
+time. Your site is becoming increasingly popular, and this one process
+is no longer able to handle all of your inbound connections. However,
+you notice that your box's load number is low.
+
+So you start thinking about how to handle more requests. You could
+rewrite your server to use threads, but threads are a pain to program
+against (and maybe you're writing in Python or Ruby where you don't
+have true threads anyway). You could rewrite your server to be
+event-driven, but that'd require a ton of effort, and it wouldn't help
+you go beyond one core. So instead, you decide to just run multiple
+copies of your server process.
+
+Enter Einhorn. Einhorn makes it easy to run (and keep alive) multiple
+copies of a single long-lived process. If that process is a server
+listening on some socket, Einhorn will open the socket in the master
+process so that it's shared among the workers.
+
+Einhorn is designed to be compatible with arbitrary languages and
+frameworks, requiring minimal modification of your
+application. Einhorn is simple to configure and run.
+
+## Installation
+
+Install from Rubygems as:
+
+ $ gem install einhorn
+
+Or build from source by:
+
+ $ gem build einhorn.gemspec
+
+And then install the built gem.
+
+## Usage
+
+Einhorn is the language-independent shared socket manager. Run
+`einhorn -h` to see detailed usage. At a high level, usage looks like
+the following:
+
+ einhorn [options] program
+
+Einhorn will open one or more shared sockets and run multiple copies
+of your process. You can seamlessly reload your code, dynamically
+reconfigure Einhorn, and more.
+
+## Overview
+
+To set Einhorn up as a master process running 3 copies of `sleep 5`:
+
+ $ einhorn -n 3 sleep 5
+
+You can communicate your running Einhorn process via `einhornsh`:
+
+ $ einhornsh
+ Welcome gdb! You are speaking to Einhorn Master Process 11902
+ Enter 'help' if you're not sure what to do.
+
+ Type "quit" or "exit" to quit at any time
+ > help
+ You are speaking to the Einhorn command socket. You can run the following commands:
+ ...
+
+### Server sockets
+
+If your process is a server and listens on one or more sockets,
+Einhorn can open these sockets and pass them to the workers. Program
+arguments of the form
+
+ srv:(IP:PORT)[<,OPT>...]
+ --MY-OPT=srv:(IP:PORT)[<,OPT>...]
+
+Will be interpreted as a request to open a server socket bound to
+IP:PORT. The argument will be replaced with `FD` and `---MY-OPT=FD`,
+respectively, where `FD` is the file descriptor number of the socket.
+Valid opts are:
+
+ r, so_reuseaddr: set SO_REUSEADDR on the server socket
+ n, o_nonblock: set O_NONBLOCK on the server socket
+
+You can for example run:
+
+ $ einhorn -m manual -n 4 example/time_server srv:127.0.0.1:2345,r
+
+Which will run 4 copies of
+
+ example/time_server 6
+
+Where file descriptor 6 is a server socket bound to `127.0.0.1:2345`
+and with `SO_REUSEADDR` set. It is then your application's job to
+figure out how to `accept()` on this file descriptor.
+
+### Command socket
+
+Einhorn opens a UNIX socket to which you can send commands (run
+`help` in `einhornsh` to see what admin commands you can
+run). Einhorn relies on file permissions to ensure that no malicious
+users can gain access. Run with a `-d DIRECTORY` to change the
+directory where the socket will live.
+
+### Seamless upgrades
+
+You can cause your code to be seamlessly reloaded by upgrading the
+worker code on disk and running
+
+ $ einhornsh
+ ...
+ > upgrade
+
+Once the new workers have been spawned, Einhorn will send each old
+worker a SIGUSR2. SIGUSR2 should be interpreted as a request for a
+graceful shutdown.
+
+### ACKs
+
+After Einhorn spawns a worker, it will only consider the worker up
+once it has received an ACK. Currently two ACK mechanisms are
+supported: manual and timer.
+
+#### Manual ACK
+
+A manual ACK (configured by providing a `-m manual`) requires your
+application to send a command to the command socket once it's
+ready. This is the safest ACK mechanism. If you're writing in Ruby,
+just do
+
+ require 'einhorn/worker'
+ Einhorn::Worker.ack!
+
+in your worker code. If you're writing in a different language, or
+don't want to include Einhorn in your namespace, you can send the
+string
+
+ {"command":"worker:ack", "pid":PID}
+
+to the UNIX socket pointed to by the environment variable
+`EINHORN_SOCK_PATH`. (Be sure to include a trailing newline.)
+
+To make things even easier, you can pass a `-b` to Einhorn, in which
+case you just need to `write()` the above message to the open file
+descriptor pointed to by `EINHORN_FD`.
+
+(See `lib/einhorn/worker.rb` for details of these and other socket
+discovery mechanisms.)
+
+#### Timer ACK [default]
+
+By default, Einhorn will use a timer ACK of 1 second. That means that
+if your process hasn't exited after 1 second, it is considered ACK'd
+and healthy. You can modify this timeout to be more appropriate for
+your application (and even set to 0 if desired). Just pass a `-m
+FLOAT`.
+
+### Preloading
+
+If you're running a Ruby process, Einhorn can optionally preload its
+code, so it only has to load the code once per upgrade rather than
+once per worker process. This also saves on memory overhead, since all
+of the code in these processes will be stored only once using your
+operating system's copy-on-write features.
+
+To use preloading, just give Einhorn a `-p PATH_TO_CODE`, and make
+sure you've defined an `einhorn_main` method.
+
+In order to maximize compatibility, we've worked to minimize Einhorn's
+dependencies. At the moment Einhorn imports 'rubygems' and 'json'. (If
+these turn out to be issues, we could probably find a workaround.)
+
+### Command name
+
+You can set the name that Einhorn and your workers show in PS. Just
+pass `-c <name>`.
+
+### Options
+
+ -b, --command-socket-as-fd Leave the command socket open as a file descriptor, passed in the EINHORN_FD environment variable. This allows your worker processes to ACK without needing to know where on the filesystem the command socket lives.
+ -c, --command-name CMD_NAME Set the command name in ps to this value
+ -d, --socket-path PATH Where to open the Einhorn command socket
+ -e, --pidfile PIDFILE Where to write out the Einhorn pidfile
+ -f, --lockfile LOCKFILE Where to store the Einhorn lockfile
+ -h, --help Display this message
+ -k, --kill-children-on-exit If Einhorn exits unexpectedly, gracefully kill all its children
+ -l, --backlog N Connection backlog (assuming this is a server)
+ -m, --ack-mode MODE What kinds of ACK to expect from workers. Choices: FLOAT (number of seconds until assumed alive), manual (process will speak to command socket when ready). Default is MODE=1.
+ -n, --number N Number of copies to spin up
+ -p, --preload PATH Load this code into memory, and fork but do not exec upon spawn. Must define an "einhorn_main" method
+ -q, --quiet Make output quiet (can be reconfigured on the fly)
+ -s, --seconds N Number of seconds to wait until respawning
+ -v, --verbose Make output verbose (can be reconfigured on the fly)
+ --with-state-fd STATE [Internal option] With file descriptor containing state
+ --version Show version
+
+
+## Contributing
+
+Contributions are definitely welcome. To contribute, just follow the
+usual workflow:
+
+1. Fork Einhorn
+2. Create your feature branch (`git checkout -b my-new-feature`)
+3. Commit your changes (`git commit -am 'Added some feature'`)
+4. Push to the branch (`git push origin my-new-feature`)
+5. Create new Github pull request
+
+## History
+
+Einhorn came about when Stripe was investigating seamless code
+upgrading solutions for our API worker processes. We really liked the
+process model of [Unicorn](http://unicorn.bogomips.org/), but didn't
+want to use its HTTP functionality. So Einhorn was born, providing the
+master process functionality of Unicorn (and similar preforking
+servers) to a wider array of applications.
+
+See https://stripe.com/blog/meet-einhorn for more background.
+
+Stripe currently uses Einhorn in production for a number of
+services. Our Thin + EventMachine servers currently require patches to
+both Thin and EventMachine (to support file-descriptor passing). You
+can obtain these patches from our public forks of the
+[respective](https://github.com/stripe/thin)
+[projects](https://github.com/stripe/eventmachine). Check out
+`example/thin_example` for an example of running Thin under Einhorn.
+
+## Compatibility
+
+Einhorn was developed and tested under Ruby 1.8.7.
+
+## About
+
+Einhorn is a project of [Stripe](https://stripe.com), led by [Greg
+Brockman](https://twitter.com/thegdb). Feel free to get in touch at
+info@stripe.com.
79 README.md.in
@@ -0,0 +1,79 @@
+# Einhorn: the language-independent shared socket manager
+
+![Einhorn](https://stripe.com/img/blog/posts/meet-einhorn/einhorn.png)
+
+Let's say you have a server process which processes one request at a
+time. Your site is becoming increasingly popular, and this one process
+is no longer able to handle all of your inbound connections. However,
+you notice that your box's load number is low.
+
+So you start thinking about how to handle more requests. You could
+rewrite your server to use threads, but threads are a pain to program
+against (and maybe you're writing in Python or Ruby where you don't
+have true threads anyway). You could rewrite your server to be
+event-driven, but that'd require a ton of effort, and it wouldn't help
+you go beyond one core. So instead, you decide to just run multiple
+copies of your server process.
+
+Enter Einhorn. Einhorn makes it easy to run (and keep alive) multiple
+copies of a single long-lived process. If that process is a server
+listening on some socket, Einhorn will open the socket in the master
+process so that it's shared among the workers.
+
+Einhorn is designed to be compatible with arbitrary languages and
+frameworks, requiring minimal modification of your
+application. Einhorn is simple to configure and run.
+
+## Installation
+
+Install from Rubygems as:
+
+ $ gem install einhorn
+
+Or build from source by:
+
+ $ gem build einhorn.gemspec
+
+And then install the built gem.
+
+[[usage]]
+
+## Contributing
+
+Contributions are definitely welcome. To contribute, just follow the
+usual workflow:
+
+1. Fork Einhorn
+2. Create your feature branch (`git checkout -b my-new-feature`)
+3. Commit your changes (`git commit -am 'Added some feature'`)
+4. Push to the branch (`git push origin my-new-feature`)
+5. Create new Github pull request
+
+## History
+
+Einhorn came about when Stripe was investigating seamless code
+upgrading solutions for our API worker processes. We really liked the
+process model of [Unicorn](http://unicorn.bogomips.org/), but didn't
+want to use its HTTP functionality. So Einhorn was born, providing the
+master process functionality of Unicorn (and similar preforking
+servers) to a wider array of applications.
+
+See https://stripe.com/blog/meet-einhorn for more background.
+
+Stripe currently uses Einhorn in production for a number of
+services. Our Thin + EventMachine servers currently require patches to
+both Thin and EventMachine (to support file-descriptor passing). You
+can obtain these patches from our public forks of the
+[respective](https://github.com/stripe/thin)
+[projects](https://github.com/stripe/eventmachine). Check out
+`example/thin_example` for an example of running Thin under Einhorn.
+
+## Compatibility
+
+Einhorn was developed and tested under Ruby 1.8.7.
+
+## About
+
+Einhorn is a project of [Stripe](https://stripe.com), led by [Greg
+Brockman](https://twitter.com/thegdb). Feel free to get in touch at
+info@stripe.com.
19 Rakefile
@@ -0,0 +1,19 @@
+#!/usr/bin/env rake
+require 'bundler/gem_tasks'
+require 'rake/testtask'
+
+desc 'Rebuild the README with the latest usage from einhorn'
+task :readme do
+ Dir.chdir(File.dirname(__FILE__))
+ readme = File.read('README.md.in')
+ usage = `bin/einhorn -h`
+ readme.gsub!('[[usage]]', usage)
+ File.open('README.md', 'w') {|f| f.write(readme)}
+end
+
+Rake::TestTask.new do |t|
+ t.libs = ["lib"]
+ # t.warning = true
+ t.verbose = true
+ t.test_files = FileList['test/unit/**/*.rb']
+end
284 bin/einhorn
@@ -0,0 +1,284 @@
+#!/usr/bin/env ruby
+# Author: Greg Brockman <gdb@stripe.com>
+
+require 'rubygems'
+require 'einhorn'
+
+module Einhorn
+ module Executable
+ def self.einhorn_usage(long)
+ usage = <<EOF
+## Usage
+
+Einhorn is the language-independent shared socket manager. Run
+`einhorn -h` to see detailed usage. At a high level, usage looks like
+the following:
+
+ einhorn [options] program
+
+Einhorn will open one or more shared sockets and run multiple copies
+of your process. You can seamlessly reload your code, dynamically
+reconfigure Einhorn, and more.
+EOF
+
+ if long
+ usage << <<EOF
+
+## Overview
+
+To set Einhorn up as a master process running 3 copies of `sleep 5`:
+
+ $ einhorn -n 3 sleep 5
+
+You can communicate your running Einhorn process via `einhornsh`:
+
+ $ einhornsh
+ Welcome gdb! You are speaking to Einhorn Master Process 11902
+ Enter 'help' if you're not sure what to do.
+
+ Type "quit" or "exit" to quit at any time
+ > help
+ You are speaking to the Einhorn command socket. You can run the following commands:
+ ...
+
+### Server sockets
+
+If your process is a server and listens on one or more sockets,
+Einhorn can open these sockets and pass them to the workers. Program
+arguments of the form
+
+ srv:(IP:PORT)[<,OPT>...]
+ --MY-OPT=srv:(IP:PORT)[<,OPT>...]
+
+Will be interpreted as a request to open a server socket bound to
+IP:PORT. The argument will be replaced with `FD` and `---MY-OPT=FD`,
+respectively, where `FD` is the file descriptor number of the socket.
+Valid opts are:
+
+ r, so_reuseaddr: set SO_REUSEADDR on the server socket
+ n, o_nonblock: set O_NONBLOCK on the server socket
+
+You can for example run:
+
+ $ einhorn -m manual -n 4 example/time_server srv:127.0.0.1:2345,r
+
+Which will run 4 copies of
+
+ example/time_server 6
+
+Where file descriptor 6 is a server socket bound to `127.0.0.1:2345`
+and with `SO_REUSEADDR` set. It is then your application's job to
+figure out how to `accept()` on this file descriptor.
+
+### Command socket
+
+Einhorn opens a UNIX socket to which you can send commands (run
+`help` in `einhornsh` to see what admin commands you can
+run). Einhorn relies on file permissions to ensure that no malicious
+users can gain access. Run with a `-d DIRECTORY` to change the
+directory where the socket will live.
+
+### Seamless upgrades
+
+You can cause your code to be seamlessly reloaded by upgrading the
+worker code on disk and running
+
+ $ einhornsh
+ ...
+ > upgrade
+
+Once the new workers have been spawned, Einhorn will send each old
+worker a SIGUSR2. SIGUSR2 should be interpreted as a request for a
+graceful shutdown.
+
+### ACKs
+
+After Einhorn spawns a worker, it will only consider the worker up
+once it has received an ACK. Currently two ACK mechanisms are
+supported: manual and timer.
+
+#### Manual ACK
+
+A manual ACK (configured by providing a `-m manual`) requires your
+application to send a command to the command socket once it's
+ready. This is the safest ACK mechanism. If you're writing in Ruby,
+just do
+
+ require 'einhorn/worker'
+ Einhorn::Worker.ack!
+
+in your worker code. If you're writing in a different language, or
+don't want to include Einhorn in your namespace, you can send the
+string
+
+ {"command":"worker:ack", "pid":PID}
+
+to the UNIX socket pointed to by the environment variable
+`EINHORN_SOCK_PATH`. (Be sure to include a trailing newline.)
+
+To make things even easier, you can pass a `-b` to Einhorn, in which
+case you just need to `write()` the above message to the open file
+descriptor pointed to by `EINHORN_FD`.
+
+(See `lib/einhorn/worker.rb` for details of these and other socket
+discovery mechanisms.)
+
+#### Timer ACK [default]
+
+By default, Einhorn will use a timer ACK of 1 second. That means that
+if your process hasn't exited after 1 second, it is considered ACK'd
+and healthy. You can modify this timeout to be more appropriate for
+your application (and even set to 0 if desired). Just pass a `-m
+FLOAT`.
+
+### Preloading
+
+If you're running a Ruby process, Einhorn can optionally preload its
+code, so it only has to load the code once per upgrade rather than
+once per worker process. This also saves on memory overhead, since all
+of the code in these processes will be stored only once using your
+operating system's copy-on-write features.
+
+To use preloading, just give Einhorn a `-p PATH_TO_CODE`, and make
+sure you've defined an `einhorn_main` method.
+
+In order to maximize compatibility, we've worked to minimize Einhorn's
+dependencies. At the moment Einhorn imports 'rubygems' and 'json'. (If
+these turn out to be issues, we could probably find a workaround.)
+
+### Command name
+
+You can set the name that Einhorn and your workers show in PS. Just
+pass `-c <name>`.
+EOF
+ end
+
+ usage << <<EOF
+
+### Options
+
+EOF
+ end
+ end
+end
+
+# Would be nice if this could be loadable rather than always
+# executing, but when run under gem it's a bit hard to do so.
+if true # $0 == __FILE__
+ Einhorn::TransientState.script_name = $0
+ Einhorn::TransientState.argv = ARGV.dup
+
+ optparse = OptionParser.new do |opts|
+ opts.on('-b', '--command-socket-as-fd', 'Leave the command socket open as a file descriptor, passed in the EINHORN_FD environment variable. This allows your worker processes to ACK without needing to know where on the filesystem the command socket lives.') do
+ Einhorn::State.command_socket_as_fd = true
+ end
+
+ opts.on('-c CMD_NAME', '--command-name CMD_NAME', 'Set the command name in ps to this value') do |cmd_name|
+ Einhorn::TransientState.stateful = false
+ Einhorn::State.cmd_name = cmd_name
+ end
+
+ opts.on('-d PATH', '--socket-path PATH', 'Where to open the Einhorn command socket') do |path|
+ Einhorn::State.socket_path = path
+ end
+
+ opts.on('-e PIDFILE', '--pidfile PIDFILE', 'Where to write out the Einhorn pidfile') do |pidfile|
+ Einhorn::State.pidfile = pidfile
+ end
+
+ opts.on('-f LOCKFILE', '--lockfile LOCKFILE', 'Where to store the Einhorn lockfile') do |lockfile|
+ Einhorn::State.lockfile = lockfile
+ end
+
+ opts.on('-h', '--help', 'Display this message') do
+ opts.banner = Einhorn::Executable.einhorn_usage(true)
+ puts opts
+ exit(1)
+ end
+
+ opts.on('-k', '--kill-children-on-exit', 'If Einhorn exits unexpectedly, gracefully kill all its children') do
+ Einhorn::State.kill_children_on_exit = true
+ end
+
+ opts.on('-l', '--backlog N', 'Connection backlog (assuming this is a server)') do |b|
+ raise "Cannot pass options if stateful" if Einhorn::TransientState.stateful
+ Einhorn::TransientState.stateful = false
+ Einhorn::State.config[:backlog] = b.to_i
+ end
+
+ opts.on('-m MODE', '--ack-mode MODE', 'What kinds of ACK to expect from workers. Choices: FLOAT (number of seconds until assumed alive), manual (process will speak to command socket when ready). Default is MODE=1.') do |mode|
+ # Try manual
+ if mode == 'manual'
+ Einhorn::State.ack_mode = {:type => :manual}
+ next
+ end
+
+ # Try float
+ begin
+ parsed = Float(mode)
+ rescue ArgumentError
+ else
+ Einhorn::State.ack_mode = {:type => :timer, :timeout => parsed}
+ next
+ end
+
+ # Give up
+ raise "Invalid ack-mode #{mode.inspect} (valid modes: FLOAT or manual)"
+ end
+
+ opts.on('-n', '--number N', 'Number of copies to spin up') do |n|
+ raise "Cannot pass options if stateful" if Einhorn::TransientState.stateful
+ Einhorn::TransientState.stateful = false
+ Einhorn::State.config[:number] = n.to_i
+ end
+
+ opts.on('-p PATH', '--preload PATH', 'Load this code into memory, and fork but do not exec upon spawn. Must define an "einhorn_main" method') do |path|
+ Einhorn::TransientState.stateful = false
+ Einhorn::State.path = path
+ end
+
+ opts.on('-q', '--quiet', 'Make output quiet (can be reconfigured on the fly)') do
+ Einhorn::Command.louder(false)
+ end
+
+ opts.on('-s', '--seconds N', 'Number of seconds to wait until respawning') do |b|
+ raise "Cannot pass options if stateful" if Einhorn::TransientState.stateful
+ Einhorn::TransientState.stateful = false
+ Einhorn::State.config[:seconds] = s.to_i
+ end
+
+ opts.on('-v', '--verbose', 'Make output verbose (can be reconfigured on the fly)') do
+ Einhorn::Command.louder(false)
+ end
+
+ opts.on('--with-state-fd STATE', '[Internal option] With file descriptor containing state') do |fd|
+ raise "Cannot be stateful if options are passed" unless Einhorn::TransientState.stateful.nil?
+ Einhorn::TransientState.stateful = true
+
+ read = IO.for_fd(Integer(fd))
+ state = read.read
+ read.close
+
+ Einhorn.restore_state(state)
+ end
+
+ opts.on('--version', 'Show version') do
+ puts Einhorn::VERSION
+ exit
+ end
+ end
+ optparse.order!
+
+ if ARGV.length < 1
+ optparse.banner = Einhorn::Executable.einhorn_usage(false)
+ puts optparse
+ exit(1)
+ end
+
+ ret = Einhorn.run
+ begin
+ exit(ret)
+ rescue TypeError
+ exit(0)
+ end
+end
120 bin/einhornsh
@@ -0,0 +1,120 @@
+#!/usr/bin/env ruby
+require 'logger'
+require 'optparse'
+
+require 'readline'
+
+require 'rubygems'
+require 'einhorn'
+
+module Einhorn
+ class EinhornSH
+ def initialize(path_to_socket)
+ @path_to_socket = path_to_socket
+ reconnect
+ end
+
+ def run
+ puts "Enter 'help' if you're not sure what to do."
+ puts
+ puts 'Type "quit" or "exit" to quit at any time'
+
+ while line = Readline.readline('> ', true)
+ if ['quit', 'exit'].include?(line)
+ puts "Goodbye!"
+ return
+ end
+
+ begin
+ response = @client.command({'command' => line})
+ rescue Errno::EPIPE => e
+ puts "einhornsh: Error communicating with Einhorn: #{e} (#{e.class})"
+ puts "einhornsh: Attempting to reconnect..."
+ reconnect
+
+ retry
+ end
+ puts response['message']
+ end
+ end
+
+ def reconnect
+ begin
+ @client = Einhorn::Client.for_path(@path_to_socket)
+ rescue Errno::ENOENT => e
+ # TODO: The exit here is a biit of a layering violation.
+ puts <<EOF
+Could not connect to Einhorn master process:
+
+ #{e}
+
+HINT: Are you sure you are running an Einhorn master? If so, you
+should pass einhornsh the cmd_name (-c argument) provided to Einhorn.
+EOF
+ exit(1)
+ end
+ ehlo
+ end
+
+ def ehlo
+ response = @client.command('command' => 'ehlo', 'user' => ENV['USER'])
+ puts response['message']
+ end
+ end
+end
+
+def main
+ options = {}
+ optparse = OptionParser.new do |opts|
+ opts.banner = "Usage: #{$0} [options] [cmd_name]
+
+Welcome to Einhornsh: the Einhorn shell.
+
+Pass the cmd_name of the Einhorn master you are connecting to either
+as a positional argument or using `-c`. If you're running your Einhorn
+with a `-d`, provide the same argument here."
+
+ opts.on('-h', '--help', 'Display this message') do
+ puts opts
+ exit(1)
+ end
+
+ opts.on('-c CMD_NAME', '--command-name CMD_NAME', 'Connect to the Einhorn master with this cmd_name') do |cmd_name|
+ options[:cmd_name] = cmd_name
+ end
+
+ opts.on('-d PATH', '--socket-path PATH', 'Path to the Einhorn command socket') do |path|
+ options[:socket_path] = path
+ end
+ end
+ optparse.parse!
+
+ if ARGV.length > 1
+ puts optparse
+ return 1
+ end
+
+ Signal.trap("INT") {puts; exit(0)}
+
+ path_to_socket = options[:socket_path]
+
+ unless path_to_socket
+ cmd_name = options[:cmd_name] || ARGV[0]
+ path_to_socket = Einhorn::Command::Interface.default_socket_path(cmd_name)
+ end
+
+ sh = Einhorn::EinhornSH.new(path_to_socket)
+ sh.run
+ return 0
+end
+
+# Would be nice if this could be loadable rather than always
+# executing, but when run under gem it's a bit hard to do so.
+if true # $0 == __FILE__
+ ret = main
+ begin
+ exit(ret)
+ rescue TypeError
+ exit(0)
+ end
+end
21 einhorn.gemspec
@@ -0,0 +1,21 @@
+# -*- encoding: utf-8 -*-
+require File.expand_path('../lib/einhorn/version', __FILE__)
+
+Gem::Specification.new do |gem|
+ gem.authors = ["Greg Brockman"]
+ gem.email = ["gdb@stripe.com"]
+ gem.summary = "Einhorn: the language-independent shared socket manager"
+ gem.description = "Einhorn makes it easy to run multiple instances of an application server, all listening on the same port. You can also seamlessly restart your workers without dropping any requests. Einhorn requires minimal application-level support, making it easy to use with an existing project."
+ gem.homepage = "https://github.com/stripe/einhorn"
+
+ gem.files = `git ls-files`.split($\)
+ gem.executables = gem.files.grep(%r{^bin/}).map{ |f| File.basename(f) }
+ gem.test_files = gem.files.grep(%r{^(test|spec|features)/})
+ gem.name = "einhorn"
+ gem.require_paths = ["lib"]
+ # maybe swap out for YAML? Then don't need any gems.
+ gem.add_dependency('json')
+ gem.add_development_dependency('shoulda')
+ gem.add_development_dependency('mocha')
+ gem.version = Einhorn::VERSION
+end
19 example/pool_worker.rb
@@ -0,0 +1,19 @@
+#!/usr/bin/env ruby
+#
+# Used as an example of preloading in Einhorn blog post
+# (https://stripe.com/blog/meet-einhorn). Program name ends in .rb in
+# order to make explicit that it's written in Ruby, though this isn't
+# actually necessary for preloading to work.
+#
+# Run as
+#
+# einhorn -p ./pool_worker.rb ./pool_worker.rb
+
+puts "From PID #{$$}: loading #{__FILE__}"
+
+def einhorn_main
+ while true
+ puts "From PID #{$$}: Doing some work"
+ sleep 1
+ end
+end
52 example/thin_example
@@ -0,0 +1,52 @@
+#!/usr/bin/env ruby
+#
+# An example application using our patched Thin and EventMachine. You
+# can obtain these from:
+#
+# https://github.com/stripe/thin.git, and
+# https://github.com/stripe/eventmachine.git
+
+require 'rubygems'
+require 'einhorn'
+
+# Make sure we're using the patched versions.
+gem 'thin', '1.3.2.stripe.0'
+gem 'eventmachine', '1.0.0.beta.4.stripe.0'
+
+require 'thin'
+
+class App
+ def initialize(id)
+ @id = id
+ end
+
+ def call(env)
+ return [200, {}, "[#{$$}] From server instance #{@id}: Got your request!\n"]
+ end
+end
+
+def einhorn_main
+ puts "Called with #{ARGV.inspect}"
+
+ if ARGV.length == 0
+ raise "Need to call with at least one argument. Try running 'einhorn #{$0} srv:127.0.0.1:5000,r,n srv:127.0.0.1:5001,r,n' and then running 'curl 127.0.0.1:5000' or 'curl 127.0.0.1:5001'"
+ end
+
+ Einhorn::Worker.graceful_shutdown do
+ puts "#{$$} is now exiting..."
+ exit(0)
+ end
+ Einhorn::Worker.ack!
+
+ EventMachine.run do
+ ARGV.each_with_index do |arg, i|
+ sock = Integer(arg)
+ srv = Thin::Server.new(sock, App.new(i), :reuse => true)
+ srv.start
+ end
+ end
+end
+
+if $0 == __FILE__
+ einhorn_main
+end
48 example/time_server
@@ -0,0 +1,48 @@
+#!/usr/bin/env ruby
+#
+# A simple example showing how to use Einhorn's shared-socket
+# features. Einhorn translates the srv:(addr:port[,flags...]) spec in
+# the arg string into a file descriptor number.
+#
+# Invoke through Einhorn as
+#
+# einhorn ./time_server srv:127.0.0.1:2345,r
+#
+# or, if you want to try out preloading:
+#
+# einhorn -p ./time_server ./time_server srv:127.0.0.1:2345,r
+
+require 'rubygems'
+require 'einhorn/worker'
+
+def einhorn_main
+ puts "Called with #{ARGV.inspect}"
+
+ if ARGV.length != 1
+ raise "Need to call with a port spec as the first argument. Try running 'einhorn #{$0} srv:127.0.0.1:2345,r' and then running 'nc 127.0.0.1 2345'"
+ end
+
+ socket = Socket.for_fd(Integer(ARGV[0]))
+
+ # Came up successfully, so let's set up graceful handler and ACK the
+ # master.
+ Einhorn::Worker.graceful_shutdown do
+ puts "Goodbye from #{$$}"
+ exit(0)
+ end
+ Einhorn::Worker.ack!
+
+ # Real work happens here.
+ begin
+ while true
+ accepted, _ = socket.accept
+ accepted.write("[#{$$}] The current time is: #{Time.now}!\n")
+ accepted.close
+ end
+ rescue Exception
+ end
+end
+
+if $0 == __FILE__
+ einhorn_main
+end
282 lib/einhorn.rb
@@ -0,0 +1,282 @@
+require 'fcntl'
+require 'optparse'
+require 'pp'
+require 'set'
+require 'socket'
+require 'tmpdir'
+require 'yaml'
+
+require 'rubygems'
+
+module Einhorn
+ module AbstractState
+ def default_state; raise NotImplementedError.new('Override in extended modules'); end
+ def state; @state ||= default_state; end
+ def state=(v); @state = v; end
+
+ def method_missing(name, *args)
+ if (name.to_s =~ /(.*)=$/) && state.has_key?($1.to_sym)
+ state.send(:[]=, $1.to_sym, *args)
+ elsif state.has_key?(name)
+ state[name]
+ else
+ ds = default_state
+ if ds.has_key?(name)
+ ds[name]
+ else
+ super
+ end
+ end
+ end
+ end
+
+ module State
+ extend AbstractState
+ def self.default_state
+ {
+ :children => {},
+ :config => {:number => 1, :backlog => 100, :seconds => 1},
+ :versions => {},
+ :version => 0,
+ :sockets => {},
+ :orig_cmd => nil,
+ :cmd => nil,
+ :script_name => nil,
+ :respawn => true,
+ :upgrading => false,
+ :reloading_for_preload_upgrade => false,
+ :path => nil,
+ :cmd_name => nil,
+ :verbosity => 1,
+ :generation => 0,
+ :last_spinup => nil,
+ :ack_mode => {:type => :timer, :timeout => 1},
+ :kill_children_on_exit => false,
+ :command_socket_as_fd => false,
+ :socket_path => nil,
+ :pidfile => nil,
+ :lockfile => nil
+ }
+ end
+ end
+
+ module TransientState
+ extend AbstractState
+ def self.default_state
+ {
+ :whatami => :master,
+ :preloaded => false,
+ :script_name => nil,
+ :argv => [],
+ :has_outstanding_spinup_timer => false,
+ :stateful => nil,
+ # Holds references so that the GC doesn't go and close your sockets.
+ :socket_handles => Set.new
+ }
+ end
+ end
+
+ def self.restore_state(state)
+ parsed = YAML.load(state)
+ Einhorn::State.state = parsed[:state]
+ Einhorn::Event.restore_persistent_descriptors(parsed[:persistent_descriptors])
+ # Do this after setting state so verbosity is right9
+ Einhorn.log_info("Using loaded state: #{parsed.inspect}")
+ end
+
+ def self.print_state
+ log_info(Einhorn::State.state.pretty_inspect)
+ end
+
+ def self.bind(addr, port, flags)
+ log_info("Binding to #{addr}:#{port} with flags #{flags.inspect}")
+ sd = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
+
+ if flags.include?('r') || flags.include?('so_reuseaddr')
+ sd.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1)
+ end
+
+ sd.bind(Socket.pack_sockaddr_in(port, addr))
+ sd.listen(Einhorn::State.config[:backlog])
+
+ if flags.include?('n') || flags.include?('o_nonblock')
+ fl = sd.fcntl(Fcntl::F_GETFL)
+ sd.fcntl(Fcntl::F_SETFL, fl | Fcntl::O_NONBLOCK)
+ end
+
+ Einhorn::TransientState.socket_handles << sd
+ sd.fileno
+ end
+
+ # Implement these ourselves so it plays nicely with state persistence
+ def self.log_debug(msg)
+ $stderr.puts("#{log_tag} DEBUG: #{msg}") if Einhorn::State.verbosity <= 0
+ end
+ def self.log_info(msg)
+ $stderr.puts("#{log_tag} INFO: #{msg}") if Einhorn::State.verbosity <= 1
+ end
+ def self.log_error(msg)
+ $stderr.puts("#{log_tag} ERROR: #{msg}") if Einhorn::State.verbosity <= 2
+ end
+
+ private
+
+ def self.log_tag
+ case whatami = Einhorn::TransientState.whatami
+ when :master
+ "[MASTER #{$$}]"
+ when :worker
+ "[WORKER #{$$}]"
+ when :state_passer
+ "[STATE_PASSER #{$$}]"
+ else
+ "[UNKNOWN (#{whatami.inspect}) #{$$}]"
+ end
+ end
+
+ public
+
+ def self.which(cmd)
+ if cmd.include?('/')
+ return cmd if File.exists?(cmd)
+ raise "Could not find #{cmd}"
+ else
+ ENV['PATH'].split(':').each do |f|
+ abs = File.join(f, cmd)
+ return abs if File.exists?(abs)
+ end
+ raise "Could not find #{cmd} in PATH"
+ end
+ end
+
+ # Not really a thing, but whatever.
+ def self.is_script(file)
+ File.open(file) do |f|
+ bytes = f.read(2)
+ bytes == '#!'
+ end
+ end
+
+ def self.preload
+ if path = Einhorn::State.path
+ set_argv(Einhorn::State.cmd, false)
+
+ begin
+ # If it's not going to be requireable, then load it.
+ if !path.end_with?('.rb') && File.exists?(path)
+ log_info("Loading #{path} (if this hangs, make sure your code can be properly loaded as a library)")
+ load path
+ else
+ log_info("Requiring #{path} (if this hangs, make sure your code can be properly loaded as a library)")
+ require path
+ end
+ rescue Exception => e
+ log_info("Proceeding with postload -- could not load #{path}: #{e} (#{e.class})\n #{e.backtrace.join("\n ")}")
+ else
+ if defined?(einhorn_main)
+ log_info("Successfully loaded #{path}")
+ Einhorn::TransientState.preloaded = true
+ else
+ log_info("Proceeding with postload -- loaded #{path}, but no einhorn_main method was defined")
+ end
+ end
+ end
+ end
+
+ def self.set_argv(cmd, set_ps_name)
+ # TODO: clean up this hack
+ idx = 0
+ if cmd[0] =~ /(^|\/)ruby$/
+ idx = 1
+ elsif !is_script(cmd[0])
+ log_info("WARNING: Going to set $0 to #{cmd[idx]}, but it doesn't look like a script")
+ end
+
+ if set_ps_name
+ # Note this will mess up $0 if we try using it in our code, but
+ # we don't so that's basically ok. It's a bit annoying that this
+ # is how Ruby exposes changing the output of ps. Note that Ruby
+ # doesn't seem to shrink your cmdline buffer, so ps just ends up
+ # having lots of trailing spaces if we set $0 to something
+ # short. In the future, we could try to not pass einhorn's
+ # state in ARGV.
+ $0 = worker_ps_name
+ end
+
+ ARGV[0..-1] = cmd[idx+1..-1]
+ log_info("Set#{set_ps_name ? " $0 = #{$0.inspect}, " : nil} ARGV = #{ARGV.inspect}")
+ end
+
+ def self.set_master_ps_name
+ $0 = master_ps_name
+ end
+
+ def self.master_ps_name
+ "einhorn: #{worker_ps_name}"
+ end
+
+ def self.worker_ps_name
+ Einhorn::State.cmd_name ? "ruby #{Einhorn::State.cmd_name}" : Einhorn::State.orig_cmd.join(' ')
+ end
+
+ def self.socketify!(cmd)
+ cmd.map! do |arg|
+ if arg =~ /^(.*=|)srv:([^:]+):(\d+)((?:,\w+)*)$/
+ opt = $1
+ host = $2
+ port = $3
+ flags = $4.split(',').select {|flag| flag.length > 0}.map {|flag| flag.downcase}
+ fd = (Einhorn::State.sockets[[host, port]] ||= bind(host, port, flags))
+ "#{opt}#{fd}"
+ else
+ arg
+ end
+ end
+ end
+
+ def self.run
+ Einhorn::Command::Interface.init
+ Einhorn::Event.init
+
+ unless Einhorn::TransientState.stateful
+ if Einhorn::State.config[:number] < 1
+ log_error("You need to spin up at least at least 1 copy of the process")
+ return
+ end
+ Einhorn::Command::Interface.persistent_init
+
+ Einhorn::State.orig_cmd = ARGV.dup
+ Einhorn::State.cmd = ARGV.dup
+ # TODO: don't actually alter ARGV[0]?
+ Einhorn::State.cmd[0] = which(Einhorn::State.cmd[0])
+ socketify!(Einhorn::State.cmd)
+ end
+
+ set_master_ps_name
+ preload
+
+ # In the middle of upgrading
+ if Einhorn::State.reloading_for_preload_upgrade
+ Einhorn::Command.upgrade_workers
+ Einhorn::State.reloading_for_preload_upgrade = false
+ end
+
+ while Einhorn::State.respawn || Einhorn::State.children.size > 0
+ log_debug("Entering event loop")
+ # All of these are non-blocking
+ Einhorn::Command.reap
+ Einhorn::Command.replenish
+ Einhorn::Command.cull
+
+ # Make sure to do this last, as it's blocking.
+ Einhorn::Event.loop_once
+ end
+ end
+end
+
+require 'einhorn/command'
+require 'einhorn/client'
+require 'einhorn/event'
+require 'einhorn/worker'
+require 'einhorn/worker_pool'
+require 'einhorn/version'
48 lib/einhorn/client.rb
@@ -0,0 +1,48 @@
+require 'json'
+require 'set'
+
+module Einhorn
+ class Client
+ @@responseless_commands = Set.new(['worker:ack'])
+
+ def self.for_path(path_to_socket)
+ socket = UNIXSocket.open(path_to_socket)
+ self.new(socket)
+ end
+
+ def self.for_fd(fileno)
+ socket = UNIXSocket.for_fd(fileno)
+ self.new(socket)
+ end
+
+ def initialize(socket)
+ @socket = socket
+ end
+
+ def command(command_hash)
+ command = JSON.generate(command_hash) + "\n"
+ write(command)
+ recvmessage if expect_response?(command_hash)
+ end
+
+ def expect_response?(command_hash)
+ !@@responseless_commands.include?(command_hash['command'])
+ end
+
+ def close
+ @socket.close
+ end
+
+ private
+
+ def write(bytes)
+ @socket.write(bytes)
+ end
+
+ # TODO: use a streaming JSON parser instead?
+ def recvmessage
+ line = @socket.readline
+ JSON.parse(line)
+ end
+ end
+end
336 lib/einhorn/command.rb
@@ -0,0 +1,336 @@
+require 'pp'
+require 'set'
+require 'tmpdir'
+require 'json'
+
+require 'einhorn/command/interface'
+
+module Einhorn
+ module Command
+ def self.reap
+ begin
+ while true
+ Einhorn.log_debug('Going to reap a child process')
+
+ pid = Process.wait(-1, Process::WNOHANG)
+ return unless pid
+ mourn(pid)
+ Einhorn::Event.break_loop
+ end
+ rescue Errno::ECHILD
+ end
+ end
+
+ # Mourn the death of your child
+ def self.mourn(pid)
+ unless spec = Einhorn::State.children[pid]
+ Einhorn.log_error("Could not find any config for exited child #{pid.inspect}! This probably indicates a bug in Einhorn.")
+ return
+ end
+
+ Einhorn::State.children.delete(pid)
+
+ case type = spec[:type]
+ when :worker
+ Einhorn.log_info("===> Exited worker #{pid.inspect}")
+ when :state_passer
+ Einhorn.log_debug("===> Exited state passing process #{pid.inspect}")
+ else
+ Einhorn.log_error("===> Exited process #{pid.inspect} has unrecgonized type #{type.inspect}: #{spec.inspect}")
+ end
+ end
+
+ def self.register_manual_ack(pid)
+ ack_mode = Einhorn::State.ack_mode
+ unless ack_mode[:type] == :manual
+ Einhorn.log_error("Received a manual ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
+ return
+ end
+ Einhorn.log_info("Received a manual ACK from #{pid.inspect}")
+ register_ack(pid)
+ end
+
+ def self.register_timer_ack(time, pid)
+ ack_mode = Einhorn::State.ack_mode
+ unless ack_mode[:type] == :timer
+ Einhorn.log_error("Received a timer ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
+ return
+ end
+
+ unless Einhorn::State.children[pid]
+ # TODO: Maybe cancel pending ACK timers upon death?
+ Einhorn.log_debug("Worker #{pid.inspect} died before its timer ACK happened.")
+ return
+ end
+
+ Einhorn.log_info("Worker #{pid.inspect} has been up for #{time}s, so we are considering it alive.")
+ register_ack(pid)
+ end
+
+ def self.register_ack(pid)
+ unless spec = Einhorn::State.children[pid]
+ Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.")
+ return
+ end
+
+ if spec[:acked]
+ Einhorn.log_error("Pid #{pid.inspect} already ACKed; ignoring new ACK.")
+ return
+ end
+
+ spec[:acked] = true
+ Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs")
+ # Could call cull here directly instead, I believe.
+ Einhorn::Event.break_loop
+ end
+
+ def self.signal_all(signal, children)
+ Einhorn.log_info("Sending #{signal} to #{children.inspect}")
+
+ children.each do |child|
+ unless spec = Einhorn::State.children[child]
+ Einhorn.log_error("Trying to send #{signal} to dead child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.")
+ next
+ end
+
+ if spec[:signaled].include?(child)
+ Einhorn.log_error("Not sending #{signal} to already-signaled child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.")
+ next
+ end
+ spec[:signaled].add(child)
+
+ begin
+ Process.kill(signal, child)
+ rescue Errno::ESRCH
+ end
+ end
+ end
+
+ def self.increment
+ Einhorn::Event.break_loop
+ old = Einhorn::State.config[:number]
+ new = (Einhorn::State.config[:number] += 1)
+ output = "Incrementing number of workers from #{old} -> #{new}"
+ $stderr.puts(output)
+ output
+ end
+
+ def self.decrement
+ if Einhorn::State.config[:number] <= 1
+ output = "Can't decrease number of workers (already at #{Einhorn::State.config[:number]}). Run kill #{$$} if you really want to kill einhorn."
+ $stderr.puts output
+ return output
+ end
+
+ Einhorn::Event.break_loop
+ old = Einhorn::State.config[:number]
+ new = (Einhorn::State.config[:number] -= 1)
+ output = "Decrementing number of workers from #{old} -> #{new}"
+ $stderr.puts(output)
+ output
+ end
+
+ def self.dumpable_state
+ global_state = Einhorn::State.state
+ descriptor_state = Einhorn::Event.persistent_descriptors.map do |descriptor|
+ descriptor.to_state
+ end
+
+ {
+ :state => global_state,
+ :persistent_descriptors => descriptor_state
+ }
+ end
+
+ def self.reload
+ Einhorn.log_info("Reloading einhorn (#{Einhorn::TransientState.script_name})...")
+
+ # In case there's anything lurking
+ $stdout.flush
+
+ # Spawn a child to pass the state through the pipe
+ read, write = IO.pipe
+ fork do
+ Einhorn::TransientState.whatami = :state_passer
+ Einhorn::State.generation += 1
+ Einhorn::State.children[$$] = {
+ :type => :state_passer
+ }
+ read.close
+
+ write.write(YAML.dump(dumpable_state))
+ write.close
+
+ exit(0)
+ end
+ write.close
+
+ Einhorn::Event.uninit
+
+ exec [Einhorn::TransientState.script_name, Einhorn::TransientState.script_name], *(['--with-state-fd', read.fileno.to_s, '--'] + Einhorn::State.cmd)
+ end
+
+ def self.spinup(cmd=nil)
+ cmd ||= Einhorn::State.cmd
+ if Einhorn::TransientState.preloaded
+ pid = fork do
+ Einhorn::TransientState.whatami = :worker
+
+ Einhorn.log_info('About to tear down Einhorn state and run einhorn_main')
+ Einhorn::Command::Interface.uninit
+ Einhorn::Event.close_all_for_worker
+ Einhorn.set_argv(cmd, true)
+
+ pass_command_socket_info
+ einhorn_main
+ end
+ else
+ pid = fork do
+ Einhorn::TransientState.whatami = :worker
+
+ Einhorn.log_info("About to exec #{cmd.inspect}")
+ # Here's the only case where cloexec would help. Since we
+ # have to track and manually close FDs for other cases, we
+ # may as well just reuse close_all rather than also set
+ # cloexec on everything.
+ Einhorn::Event.close_all_for_worker
+
+ pass_command_socket_info
+ exec [cmd[0], cmd[0]], *cmd[1..-1]
+ end
+ end
+
+ Einhorn.log_info("===> Launched #{pid}")
+ Einhorn::State.children[pid] = {
+ :type => :worker,
+ :version => Einhorn::State.version,
+ :acked => false,
+ :signaled => Set.new
+ }
+ Einhorn::State.last_spinup = Time.now
+
+ # Set up whatever's needed for ACKing
+ ack_mode = Einhorn::State.ack_mode
+ case type = ack_mode[:type]
+ when :timer
+ Einhorn::Event::ACKTimer.open(ack_mode[:timeout], pid)
+ when :manual
+ else
+ Einhorn.log_error("Unrecognized ACK mode #{type.inspect}")
+ end
+ end
+
+ def self.pass_command_socket_info
+ # This is run from the child
+ ENV['EINHORN_MASTER_PID'] = Process.ppid.to_s
+ ENV['EINHORN_SOCK_PATH'] = Einhorn::Command::Interface.socket_path
+ if Einhorn::State.command_socket_as_fd
+ socket = UNIXSocket.open(Einhorn::Command::Interface.socket_path)
+ Einhorn::TransientState.socket_handles << socket
+ ENV['EINHORN_FD'] = socket.fileno.to_s
+ end
+ end
+
+ def self.full_upgrade
+ if Einhorn::State.path && !Einhorn::State.reloading_for_preload_upgrade
+ reload_for_preload_upgrade
+ else
+ upgrade_workers
+ end
+ end
+
+ def self.reload_for_preload_upgrade
+ Einhorn::State.reloading_for_preload_upgrade = true
+ reload
+ end
+
+ def self.upgrade_workers
+ if Einhorn::State.upgrading
+ Einhorn.log_info("Currently upgrading (#{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} ACKs; bumping version and starting over)...")
+ else
+ Einhorn::State.upgrading = true
+ Einhorn.log_info("Starting upgrade to #{Einhorn::State.version}...")
+ end
+
+ Einhorn::State.version += 1
+ replenish_immediately
+ end
+
+ def self.cull
+ acked = Einhorn::WorkerPool.ack_count
+ target = Einhorn::WorkerPool.ack_target
+
+ if Einhorn::State.upgrading && acked >= target
+ Einhorn::State.upgrading = false
+ Einhorn.log_info("Upgrade to version #{Einhorn::State.version} complete.")
+ end
+
+ old_workers = Einhorn::WorkerPool.old_workers
+ if !Einhorn::State.upgrading && old_workers.length > 0
+ Einhorn.log_info("Killing off #{old_workers.length} old workers.")
+ signal_all("USR2", old_workers)
+ end
+
+ if acked > target
+ excess = Einhorn::WorkerPool.acked_unsignaled_modern_workers[0...(acked-target)]
+ Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.")
+ signal_all("USR2", excess)
+ end
+ end
+
+ def self.replenish
+ return unless Einhorn::State.respawn
+
+ if !Einhorn::State.last_spinup
+ replenish_immediately
+ else
+ replenish_gradually
+ end
+ end
+
+ def self.replenish_immediately
+ missing = Einhorn::WorkerPool.missing_worker_count
+ if missing <= 0
+ Einhorn.log_error("Missing is currently #{missing.inspect}, but should always be > 0 when replenish_immediately is called. This probably indicates a bug in Einhorn.")
+ return
+ end
+ Einhorn.log_info("Launching #{missing} new workers")
+ missing.times {spinup}
+ end
+
+ def self.replenish_gradually
+ return if Einhorn::TransientState.has_outstanding_spinup_timer
+ return unless Einhorn::WorkerPool.missing_worker_count > 0
+
+ spinup_interval = Einhorn::State.config[:seconds]
+ seconds_ago = (Time.now - Einhorn::State.last_spinup).to_f
+
+ if seconds_ago > spinup_interval
+ Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}, so spinning up a new process")
+ spinup
+ else
+ Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}, so not spinning up a new process")
+ end
+
+ Einhorn::TransientState.has_outstanding_spinup_timer = true
+ Einhorn::Event::Timer.open(spinup_interval) do
+ Einhorn::TransientState.has_outstanding_spinup_timer = false
+ replenish
+ end
+ end
+
+ def self.quieter(log=true)
+ Einhorn::State.verbosity += 1 if Einhorn::State.verbosity < 2
+ output = "Verbosity set to #{Einhorn::State.verbosity}"
+ Einhorn.log_info(output) if log
+ output
+ end
+
+ def self.louder(log=true)
+ Einhorn::State.verbosity -= 1 if Einhorn::State.verbosity > 0
+ output = "Verbosity set to #{Einhorn::State.verbosity}"
+ Einhorn.log_info(output) if log
+ output
+ end
+ end
+end
336 lib/einhorn/command/interface.rb
@@ -0,0 +1,336 @@
+require 'tmpdir'
+require 'socket'
+
+module Einhorn::Command
+ module Interface
+ @@commands = {}
+ @@command_server = nil
+
+ def self.command_server=(server)
+ raise "Command server already set" if @@command_server && server
+ @@command_server = server
+ end
+
+ def self.command_server
+ @@command_server
+ end
+
+ def self.init
+ install_handlers
+ at_exit do
+ if Einhorn::TransientState.whatami == :master
+ to_remove = [pidfile]
+ # Don't nuke socket_path if we never successfully acquired it
+ to_remove << socket_path if @@command_server
+ to_remove.each do |file|
+ begin
+ File.unlink(file)
+ rescue Errno::ENOENT
+ end
+ end
+ end
+ end
+ end
+
+ def self.persistent_init
+ socket = open_command_socket
+ Einhorn::Event::CommandServer.open(socket)
+
+ # Could also rewrite this on reload. Might be useful in case
+ # someone goes and accidentally clobbers/deletes. Should make
+ # sure that the clobber is atomic if we we were do do that.
+ write_pidfile
+ end
+
+ def self.open_command_socket
+ path = socket_path
+
+ with_file_lock do
+ # Need to avoid time-of-check to time-of-use bugs in blowing
+ # away and recreating the old socketfile.
+ destroy_old_command_socket(path)
+ UNIXServer.new(path)
+ end
+ end
+
+ # Lock against other Einhorn workers. Unfortunately, have to leave
+ # this lockfile lying around forever.
+ def self.with_file_lock(&blk)
+ path = lockfile
+ File.open(path, 'w', 0600) do |f|
+ unless f.flock(File::LOCK_EX|File::LOCK_NB)
+ raise "File lock already acquired by another Einhorn process. This likely indicates you tried to run Einhorn masters with the same cmd_name at the same time. This is a pretty rare race condition."
+ end
+
+ blk.call
+ end
+ end
+
+ def self.destroy_old_command_socket(path)
+ # Socket isn't actually owned by anyone
+ begin
+ sock = UNIXSocket.new(path)
+ rescue Errno::ECONNREFUSED
+ # This happens with non-socket files and when the listening
+ # end of a socket has exited.
+ rescue Errno::ENOENT
+ # Socket doesn't exist
+ return
+ else
+ # Rats, it's still active
+ sock.close
+ raise Errno::EADDRINUSE.new("Another process (probably another Einhorn) is listening on the Einhorn command socket at #{path}. If you'd like to run this Einhorn as well, pass a `-d PATH_TO_SOCKET` to change the command socket location.")
+ end
+
+ # Socket should still exist, so don't need to handle error.
+ stat = File.stat(path)
+ unless stat.socket?
+ raise Errno::EADDRINUSE.new("Non-socket file present at Einhorn command socket path #{path}. Either remove that file and restart Einhorn, or pass a `-d PATH_TO_SOCKET` to change the command socket location.")
+ end
+
+ Einhorn.log_info("Blowing away old Einhorn command socket at #{path}. This likely indicates a previous Einhorn worker which exited uncleanly.")
+ # Whee, blow it away.
+ File.unlink(path)
+ end
+
+ def self.write_pidfile
+ file = pidfile
+ Einhorn.log_info("Writing PID to #{file}")
+ File.open(file, 'w') {|f| f.write($$)}
+ end
+
+ def self.uninit
+ remove_handlers
+ end
+
+ def self.socket_path
+ Einhorn::State.socket_path || default_socket_path
+ end
+
+ def self.default_socket_path(cmd_name=nil)
+ cmd_name ||= Einhorn::State.cmd_name
+ if cmd_name
+ filename = "einhorn-#{cmd_name}.sock"
+ else
+ filename = "einhorn.sock"
+ end
+ File.join(Dir.tmpdir, filename)
+ end
+
+ def self.lockfile
+ Einhorn::State.lockfile || default_lockfile_path
+ end
+
+ def self.default_lockfile_path(cmd_name=nil)
+ cmd_name ||= Einhorn::State.cmd_name
+ if cmd_name
+ filename = "einhorn-#{cmd_name}.lock"
+ else
+ filename = "einhorn.lock"
+ end
+ File.join(Dir.tmpdir, filename)
+ end
+
+ def self.pidfile
+ Einhorn::State.pidfile || default_pidfile
+ end
+
+ def self.default_pidfile(cmd_name=nil)
+ cmd_name ||= Einhorn::State.cmd_name
+ if cmd_name
+ filename = "einhorn-#{cmd_name}.pid"
+ else
+ filename = "einhorn.pid"
+ end
+ File.join(Dir.tmpdir, filename)
+ end
+
+ ## Signals
+ def self.install_handlers
+ Signal.trap("INT") do
+ Einhorn::Command.signal_all("USR2", Einhorn::State.children.keys)
+ Einhorn::State.respawn = false
+ end
+ Signal.trap("TERM") do
+ Einhorn::Command.signal_all("TERM", Einhorn::State.children.keys)
+ Einhorn::State.respawn = false
+ end
+ # Note that quit is a bit different, in that it will actually
+ # make Einhorn quit without waiting for children to exit.
+ Signal.trap("QUIT") do
+ Einhorn::Command.signal_all("QUIT", Einhorn::State.children.keys)
+ Einhorn::State.respawn = false
+ exit(1)
+ end
+ Signal.trap("HUP") {Einhorn::Command.reload}
+ Signal.trap("ALRM") {Einhorn::Command.full_upgrade}
+ Signal.trap("CHLD") {Einhorn::Event.break_loop}
+ Signal.trap("USR2") do
+ Einhorn::Command.signal_all("USR2", Einhorn::State.children.keys)
+ Einhorn::State.respawn = false
+ end
+ at_exit do
+ if Einhorn::State.kill_children_on_exit && Einhorn::TransientState.whatami == :master
+ Einhorn::Command.signal_all("USR2", Einhorn::State.children.keys)
+ Einhorn::State.respawn = false
+ end
+ end
+ end
+
+ def self.remove_handlers
+ %w{INT TERM QUIT HUP ALRM CHLD USR2}.each do |signal|
+ Signal.trap(signal, "DEFAULT")
+ end
+ end
+
+ ## Commands
+ def self.command(name, description=nil, &code)
+ @@commands[name] = {:description => description, :code => code}
+ end
+
+ def self.process_command(conn, command)
+ response = generate_response(conn, command)
+ if !response.nil?
+ send_message(conn, response)
+ else
+ conn.log_debug("Got back nil response, so not responding to command.")
+ end
+ end
+
+ def self.send_message(conn, response)
+ if response.kind_of?(String)
+ response = {'message' => response}
+ end
+ message = pack_message(response)
+ conn.write(message)
+ end
+
+ def self.generate_response(conn, command)
+ begin
+ request = JSON.parse(command)
+ rescue JSON::ParserError => e
+ return {
+ 'message' => "Could not parse command: #{e}"
+ }
+ end
+
+ unless command_name = request['command']
+ return {
+ 'message' => 'No "command" parameter provided; not sure what you want me to do.'
+ }
+ end
+
+ if command_spec = @@commands[command_name]
+ conn.log_debug("Received command: #{command.inspect}")
+ begin
+ return command_spec[:code].call(conn, request)
+ rescue StandardError => e
+ msg = "Error while processing command #{command_name.inspect}: #{e} (#{e.class})\n #{e.backtrace.join("\n ")}"
+ conn.log_error(msg)
+ return msg
+ end
+ else
+ conn.log_debug("Received unrecognized command: #{command.inspect}")
+ return unrecognized_command(conn, request)
+ end
+ end
+
+ def self.pack_message(message_struct)
+ begin
+ JSON.generate(message_struct) + "\n"
+ rescue JSON::GeneratorError => e
+ response = {
+ 'message' => "Error generating JSON message for #{message_struct.inspect} (this indicates a bug): #{e}"
+ }
+ JSON.generate(response) + "\n"
+ end
+ end
+
+ def self.command_descriptions
+ command_specs = @@commands.select do |_, spec|
+ spec[:description]
+ end.sort_by {|name, _| name}
+
+ command_specs.map do |name, spec|
+ "#{name}: #{spec[:description]}"
+ end.join("\n")
+ end
+
+ def self.unrecognized_command(conn, request)
+ <<EOF
+Unrecognized command: #{request['command'].inspect}
+
+#{command_descriptions}
+EOF
+ end
+
+ # Used by workers
+ command 'worker:ack' do |conn, request|
+ if pid = request['pid']
+ Einhorn::Command.register_manual_ack(pid)
+ else
+ conn.log_error("Invalid request (no pid): #{request.inspect}")
+ end
+ # Throw away this connection in case the application forgets to
+ conn.close
+ nil
+ end
+
+ # Used by einhornsh
+ command 'ehlo' do |conn, request|
+ <<EOF
+Welcome #{request['user']}! You are speaking to Einhorn Master Process #{$$}#{Einhorn::State.cmd_name ? " (#{Einhorn::State.cmd_name})" : ''}
+EOF
+ end
+
+ command 'help', 'Print out available commands' do
+"You are speaking to the Einhorn command socket. You can run the following commands:
+
+#{command_descriptions}
+"
+ end
+
+ command 'state', "Get a dump of Einhorn's current state" do
+ Einhorn::Command.dumpable_state.pretty_inspect
+ end
+
+ command 'reload', 'Reload Einhorn' do |conn, _|
+ # TODO: make reload actually work (command socket reopening is
+ # an issue). Would also be nice if user got a confirmation that
+ # the reload completed, though that's not strictly necessary.
+
+ # In the normal case, this will do a write
+ # synchronously. Otherwise, the bytes will be stuck into the
+ # buffer and lost upon reload.
+ send_message(conn, 'Reloading, as commanded')
+ Einhorn::Command.reload
+
+ # Reload should not return
+ raise "Not reachable"
+ end
+
+ command 'inc', 'Increment the number of Einhorn child processes' do
+ Einhorn::Command.increment
+ end
+
+ command 'dec', 'Decrement the number of Einhorn child processes' do
+ Einhorn::Command.decrement
+ end
+
+ command 'quieter', 'Decrease verbosity' do
+ Einhorn::Command.quieter
+ end
+
+ command 'louder', 'Increase verbosity' do
+ Einhorn::Command.louder
+ end
+
+ command 'upgrade', 'Upgrade all Einhorn workers. This may result in Einhorn reloading its own code as well.' do |conn, _|
+ # TODO: send confirmation when this is done
+ send_message(conn, 'Upgrading, as commanded')
+ # This or may not return
+ Einhorn::Command.full_upgrade
+ nil
+ end
+ end
+end
150 lib/einhorn/event.rb
@@ -0,0 +1,150 @@
+require 'set'
+
+module Einhorn
+ module Event
+ @@loopbreak_reader = nil
+ @@loopbreak_writer = nil
+ @@readable = {}
+ @@writeable = {}
+ @@timers = {}
+
+ def self.init
+ readable, writeable = IO.pipe
+ @@loopbreak_reader = LoopBreaker.open(readable)
+ @@loopbreak_writer = writeable
+ end
+
+ def self.uninit
+ # These don't need to persist across Einhorn reloads, so let's not keep.
+ @@loopbreak_reader.close
+ @@loopbreak_writer.close
+ end
+
+ def self.close_all
+ uninit
+ (@@readable.values + @@writeable.values).each do |descriptors|
+ descriptors.each do |descriptor|
+ descriptor.close
+ end
+ end
+ end
+
+ def self.close_all_for_worker
+ close_all
+ end
+
+ def self.persistent_descriptors
+ descriptor_sets = @@readable.values + @@writeable.values + @@timers.values
+ descriptors = descriptor_sets.inject {|a, b| a | b}
+ descriptors.select {|descriptor| Einhorn::Event::Persistent.persistent?(descriptor)}
+ end
+
+ def self.restore_persistent_descriptors(persistent_descriptors)
+ persistent_descriptors.each do |descriptor_state|
+ Einhorn::Event::Persistent.from_state(descriptor_state)
+ end
+ end
+
+ def self.register_readable(reader)
+ @@readable[reader.to_io] ||= Set.new
+ @@readable[reader.to_io] << reader
+ end
+
+ def self.deregister_readable(reader)
+ readers = @@readable[reader.to_io]
+ readers.delete(reader)
+ @@readable.delete(reader.to_io) if readers.length == 0
+ end
+
+ def self.readable_fds
+ readers = @@readable.keys
+ Einhorn.log_debug("Readable fds are #{readers.inspect}")
+ readers
+ end
+
+ def self.register_writeable(writer)
+ @@writeable[writer.to_io] ||= Set.new
+ @@writeable[writer.to_io] << writer
+ end
+
+ def self.deregister_writeable(writer)
+ writers = @@writeable[writer.to_io]
+ writers.delete(writer)
+ @@readable.delete(writer.to_io) if writers.length == 0
+ end
+
+ def self.writeable_fds
+ writers = @@writeable.select do |io, writers|
+ writers.any? {|writer| writer.write_pending?}
+ end.map {|io, writers| io}
+ Einhorn.log_debug("Writeable fds are #{writers.inspect}")
+ writers
+ end
+
+ def self.register_timer(timer)
+ @@timers[timer.expires_at] ||= Set.new
+ @@timers[timer.expires_at] << timer
+ end
+
+ def self.deregister_timer(timer)
+ timers = @@timers[timer.expires_at]
+ timers.delete(timer)
+ @@timers.delete(timer.expires_at) if timers.length == 0
+ end
+
+ def self.loop_once
+ run_selectables
+ run_timers
+ end
+
+ def self.timeout
+ # (expires_at of the next timer) - now
+ if expires_at = @@timers.keys.sort[0]
+ expires_at - Time.now
+ else
+ nil
+ end
+ end
+
+ def self.run_selectables
+ time = timeout
+ Einhorn.log_debug("Loop timeout is #{time.inspect}")
+ # Time's already up
+ return if time && time < 0
+
+ readable, writeable, _ = IO.select(readable_fds, writeable_fds, nil, time)
+ (readable || []).each do |io|
+ @@readable[io].each {|reader| reader.notify_readable}
+ end
+
+ (writeable || []).each do |io|
+ @@writeable[io].each {|writer| writer.notify_writeable}
+ end
+ end
+
+ def self.run_timers
+ @@timers.select {|expires_at, _| expires_at <= Time.now}.each do |expires_at, timers|
+ # Going to be modifying the set, so let's dup it.
+ timers.dup.each {|timer| timer.ring!}
+ end
+ end
+
+ def self.break_loop
+ Einhorn.log_debug("Breaking the loop")
+ begin
+ @@loopbreak_writer.write_nonblock('a')
+ rescue Errno::EWOULDBLOCK, Errno::EAGAIN
+ Einhorn.log_error("Loop break pipe is full -- probably means that we are quite backlogged")
+ end
+ end
+ end
+end
+
+require 'einhorn/event/persistent'
+require 'einhorn/event/timer'
+
+require 'einhorn/event/abstract_text_descriptor'
+require 'einhorn/event/ack_timer'
+require 'einhorn/event/command_server'
+require 'einhorn/event/connection'
+require 'einhorn/event/loop_breaker'
132 lib/einhorn/event/abstract_text_descriptor.rb
@@ -0,0 +1,132 @@
+module Einhorn::Event
+ class AbstractTextDescriptor
+ attr_accessor :read_buffer, :write_buffer
+ attr_reader :client_id
+
+ @@instance_counter = 0
+
+ def self.open(sock)
+ self.new(sock)
+ end
+
+ def initialize(sock)
+ @@instance_counter += 1
+
+ @socket = sock
+ @client_id = "#{@@instance_counter}:#{sock.fileno}"
+
+ @read_buffer = ""
+ @write_buffer = ""
+
+ @closed = false
+
+ register!
+ end
+
+ def close
+ @closed = true
+ deregister!
+ @socket.close
+ end
+
+ # API method
+ def read(&blk)
+ raise "Already registered a read block" if @read_blk
+ raise "No block provided" unless blk
+ raise "Must provide a block that accepts two arguments" unless blk.arity == 2
+
+ @read_blk = blk
+ notify_readable # Read what you can
+ end
+
+ def notify_readable
+ while true
+ begin
+ return if @closed
+ chunk = @socket.read_nonblock(1024)
+ rescue Errno::EAGAIN
+ break
+ rescue EOFError, Errno::EPIPE
+ close
+ break
+ else
+ log_debug("read #{chunk.length} bytes (#{chunk.inspect[0..20]})")
+ @read_buffer << chunk
+ process_read_buffer
+ end
+ end
+ end
+
+ # API method
+ def write(data)
+ @write_buffer << data
+ notify_writeable # Write what you can
+ end
+
+ def write_pending?
+ @write_buffer.length > 0
+ end
+
+ def notify_writeable
+ begin
+ return if @closed
+ written = @socket.write_nonblock(@write_buffer)
+ rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR
+ rescue Errno::EPIPE
+ close
+ else
+ log_debug("wrote #{written} bytes")
+ @write_buffer = @write_buffer[written..-1]
+ end
+ end
+
+ def to_io
+ @socket
+ end
+
+ def register!
+ Einhorn::Event.register_readable(self)
+ Einhorn::Event.register_writeable(self)
+ end
+
+ def deregister!
+ Einhorn::Event.deregister_readable(self)
+ Einhorn::Event.deregister_writeable(self)
+ end
+
+ def process_read_buffer
+ while true
+ if @read_buffer.length > 0
+ break unless split = parse_record
+ record, remainder = split
+ log_debug("Read a record of #{record.length} bytes.")
+ @read_buffer = remainder
+ consume_record(record)
+ else
+ break
+ end
+ end
+ end
+
+ # Override in subclass. This lets you do streaming reads.
+ def parse_record
+ [@read_buffer, '']
+ end
+
+ def consume_record(record)
+ raise NotImplementedError.new
+ end
+
+ def log_debug(msg)
+ Einhorn.log_debug("[client #{client_id}] #{msg}")
+ end
+
+ def log_info(msg)
+ Einhorn.log_info("[client #{client_id}] #{msg}")
+ end
+
+ def log_error(msg)
+ Einhorn.log_error("[client #{client_id}] #{msg}")
+ end
+ end
+end
20 lib/einhorn/event/ack_timer.rb
@@ -0,0 +1,20 @@
+module Einhorn::Event
+ class ACKTimer < Timer
+ include Persistent
+
+ def initialize(time, pid, start=nil)
+ super(time, start) do
+ Einhorn::Command.register_timer_ack(time, pid)
+ end
+ @pid = pid
+ end
+
+ def to_state
+ {:class => self.class.to_s, :time => @time, :start => @start, :pid => @pid}
+ end
+
+ def self.from_state(state)
+ self.open(state[:time], state[:pid], state[:start])
+ end
+ end
+end
58 lib/einhorn/event/command_server.rb
@@ -0,0 +1,58 @@
+module Einhorn::Event
+ class CommandServer
+ include Persistent
+
+ def self.open(server)
+ self.new(server)
+ end
+
+ def initialize(server)
+ @server = server
+
+ @closed = false
+
+ register!
+ end
+
+ def notify_readable
+ begin
+ while true
+ return if @closed
+ sock = @server.accept_nonblock
+ Connection.open(sock)
+ end
+ rescue Errno::EAGAIN
+ end
+ end
+
+ def to_io
+ @server
+ end
+
+ def to_state
+ {:class => self.class.to_s, :server => @server.fileno}
+ end
+
+ def self.from_state(state)
+ fd = state[:server]
+ socket = UNIXServer.for_fd(fd)
+ self.open(socket)
+ end
+
+ def close
+ @closed = true
+ deregister!
+ @server.close
+ end
+
+ def register!
+ Einhorn::Command::Interface.command_server = self
+ Einhorn::Event.register_readable(self)
+ end
+
+ def deregister!
+ Einhorn::Command::Interface.command_server = nil
+ Einhorn::Event.deregister_readable(self)
+ end
+ end
+end
45 lib/einhorn/event/connection.rb
@@ -0,0 +1,45 @@
+module Einhorn::Event
+ class Connection < AbstractTextDescriptor
+ include Persistent
+
+ def parse_record
+ split = @read_buffer.split("\n", 2)
+ if split.length > 1
+ split
+ else
+ nil
+ end
+ end
+
+ def consume_record(command)
+ Einhorn::Command::Interface.process_command(self, command)
+ end
+
+ def to_state
+ state = {:class => self.class.to_s, :socket => @socket.fileno}
+ # Don't include by default because it's not that pretty
+ state[:read_buffer] = @read_buffer if @read_buffer.length > 0
+ state[:write_buffer] = @write_buffer if @write_buffer.length > 0
+ state
+ end
+
+ def self.from_state(state)
+ fd = state[:socket]
+ socket = Socket.for_fd(fd)
+ conn = self.open(socket)
+ conn.read_buffer = state[:read_buffer] if state[:read_buffer]
+ conn.write_buffer = state[:write_buffer] if state[:write_buffer]
+ conn
+ end
+
+ def register!
+ log_info("client connected")
+ super
+ end
+
+ def deregister!
+ log_info("client disconnected") if Einhorn::TransientState.whatami == :master
+ super
+ end
+ end
+end
6 lib/einhorn/event/loop_breaker.rb
@@ -0,0 +1,6 @@
+# TODO: set lots of cloexecs
+module Einhorn::Event
+ class LoopBreaker < AbstractTextDescriptor
+ def consume_record(record); end
+ end
+end