Permalink
Browse files

Add multiple acceptor threads

  • Loading branch information...
1 parent c2e6206 commit dafc94ee58c6e7641bdcc45c3ab918eb4da87eda @evanphx evanphx committed Jan 30, 2012
Showing with 112 additions and 64 deletions.
  1. +17 −0 lib/puma/accept.rb
  2. +68 −43 lib/puma/server.rb
  3. +27 −21 lib/puma/thread_pool.rb
View
@@ -0,0 +1,17 @@
+require 'socket'
+
+module Puma
+ if TCPServer.method_defined? :accept_nonblock2
+ def self.try_accept(sock)
+ sock.accept_nonblock2
+ end
+ else
+ def self.try_accept(sock)
+ begin
+ sock.accept_nonblock
+ rescue Errno::EAGAIN
+ return nil
+ end
+ end
+ end
+end
View
@@ -5,6 +5,7 @@
require 'puma/const'
require 'puma/events'
require 'puma/null_io'
+require 'puma/accept'
require 'puma/puma_http11'
@@ -38,15 +39,17 @@ def initialize(app, events=Events::DEFAULT)
@app = app
@events = events
- @check, @notify = IO.pipe
- @ios = [@check]
+ @check = Queue.new
+ @sockets = []
@status = :stop
@min_threads = 0
@max_threads = 16
@auto_trim_time = 1
+ @acceptors = 2
+
@thread = nil
@thread_pool = nil
@@ -105,7 +108,7 @@ def add_tcp_listener(host, port, optimize_for_latency=true, backlog=1024)
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
s.listen backlog
- @ios << s
+ @sockets << s
end
def add_ssl_listener(host, port, ctx, optimize_for_latency=true, backlog=1024)
@@ -114,13 +117,13 @@ def add_ssl_listener(host, port, ctx, optimize_for_latency=true, backlog=1024)
s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
end
s.listen backlog
- @ios << OpenSSL::SSL::SSLServer.new(s, ctx)
+ @sockets << OpenSSL::SSL::SSLServer.new(s, ctx)
end
# Tell the server to listen on +path+ as a UNIX domain socket.
#
def add_unix_listener(path)
- @ios << UNIXServer.new(path)
+ @sockets << UNIXServer.new(path)
end
def backlog
@@ -131,6 +134,46 @@ def running
@thread_pool and @thread_pool.spawned
end
+ class Acceptor
+ def initialize(events, sockets, pool, close)
+ @events = events
+ @sockets = sockets
+ @pool = pool
+ @close = close
+ @thread = nil
+ end
+
+ attr_reader :thread
+
+ def run
+ @thread = Thread.new do
+ sockets = @sockets + [@close]
+ pool = @pool
+ close = @close
+
+ run = true
+
+ begin
+ while run
+ ios = IO.select sockets
+ ios.first.each do |sock|
+ if sock == close
+ run = false
+ break
+ end
+
+ if client = Puma.try_accept(sock)
+ pool << client
+ end
+ end
+ end
+ rescue Object => e
+ @events.unknown_error self, e, "Listen loop"
+ end
+ end
+ end
+ end
+
# Runs the server. It returns the thread used so you can join it.
# The thread is always available via #thread to be join'd
#
@@ -149,53 +192,35 @@ def run
@thread = Thread.new do
begin
- check = @check
- sockets = @ios
- pool = @thread_pool
+ r, w = IO.pipe
+
+ accepts = []
+
+ @acceptors.times do
+ a = Acceptor.new(@events, @sockets, @thread_pool, r)
+ a.run
+ accepts << a
+ end
while @status == :run
- begin
- ios = IO.select sockets
- ios.first.each do |sock|
- if sock == check
- break if handle_check
- else
- pool << sock.accept
- end
- end
- rescue Errno::ECONNABORTED
- # client closed the socket even before accept
- client.close rescue nil
- rescue Object => e
- @events.unknown_error self, e, "Listen loop"
- end
+ @status = @check.pop
end
- graceful_shutdown if @status == :stop
+ if @status == :stop
+ w << "!"
+ accepts.each { |a| a.thread.join }
+ graceful_shutdown
+ end
ensure
- @ios.each { |i| i.close }
+ @sockets.each { |i| i.close }
+ r.close
+ w.close
end
end
return @thread
end
- # :nodoc:
- def handle_check
- cmd = @check.read(1)
-
- case cmd
- when STOP_COMMAND
- @status = :stop
- return true
- when HALT_COMMAND
- @status = :halt
- return true
- end
-
- return false
- end
-
# Given a connection on +client+, handle the incoming requests.
#
# This method support HTTP Keep-Alive so it may, depending on if the client
@@ -559,14 +584,14 @@ def graceful_shutdown
#
def stop(sync=false)
@persistent_wakeup.close
- @notify << STOP_COMMAND
+ @check << :stop
@thread.join if @thread && sync
end
def halt(sync=false)
@persistent_wakeup.close
- @notify << HALT_COMMAND
+ @check << :halt
@thread.join if @thread && sync
end
View
@@ -47,29 +47,35 @@ def spawn_thread
end
th = Thread.new do
- todo = @todo
- block = @block
-
- while true
- work = todo.pop
-
- case work
- when Stop
- break
- when Trim
- @mutex.synchronize do
- @trim_requested -= 1
+ begin
+ todo = @todo
+ block = @block
+
+ tid = Thread.current.__id__
+
+ while true
+ work = todo.pop
+
+ case work
+ when Stop
+ break
+ when Trim
+ @mutex.synchronize do
+ @trim_requested -= 1
+ end
+
+ break
+ else
+ block.call work
end
-
- break
- else
- block.call work
end
- end
-
- @mutex.synchronize do
- @spawned -= 1
- @workers.delete th
+ rescue Exception => e
+ p e
+ ensure
+ @mutex.synchronize do
+ @spawned -= 1
+ @workers.delete th
+ end
end
end

0 comments on commit dafc94e

Please sign in to comment.