Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
macournoyer committed Feb 16, 2013
1 parent 89d4b23 commit d70fa88
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 134 deletions.
18 changes: 18 additions & 0 deletions README.md
Expand Up @@ -57,6 +57,24 @@ See examples/thin.conf.rb for a sample configuration file.

Run `thin -h` to list available options.

## Configuration

TODO

## Asynchronous Response

First make sure to add `use Thin::Async` in your `thin.conf.rb` or wherever you're setting up your middleware stack.

def call(env)
EM.add_timer(1) do
env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, ['async!']]
end

[100, {'X-Thin-Defer' => 'response'}, []]
end

_Note that this feature is incompatible with Thin v1.x async feature. To fallback to the old behavior, add `use Thin::CatchAsync` after `use Thin::Async` in your middleware stack._

## License
Ruby License, http://www.ruby-lang.org/en/LICENSE.txt.

Expand Down
1 change: 1 addition & 0 deletions lib/thin.rb
Expand Up @@ -7,6 +7,7 @@ module Backends
autoload :SingleProcess, "thin/backends/single_process"
end

autoload :AsyncResponse, "thin/async_response"
autoload :Configurator, "thin/configurator"
autoload :Connection, "thin/connection"
autoload :FastEnumerator, "thin/fast_enumerator"
Expand Down
67 changes: 67 additions & 0 deletions lib/thin/async_response.rb
@@ -0,0 +1,67 @@
module Thin
class AsyncResponse
class Body
def initialize
@queue = []
end

def <<(chunk)
@queue << chunk
schedule_dequeue
end

def each(&block)
@callback = block
schedule_dequeue
end

private
def schedule_dequeue
return unless @callback
EM.next_tick do
next unless chunk = @queue.shift
@callback.call(chunk)
schedule_dequeue unless @queue.empty?
end
end
end

def initialize(env, status=200, headers={})
connection = env['thin.connection']
# Fallback to thin.connection methods
@callback = env['async.callback'] || connection.method(:send_response)
@close = env['async.close'] || connection.method(:close)

@status = status
@headers = headers
@body = Body.new
@head_sent = false

if block_given?
yield self
finish
end
end

def send_head
return if @head_sent
EM.next_tick { @callback.call [@status, @headers, @body] }
@head_sent = true
end

def write(data)
send_head
@body << data
end
alias << write

def done
send_head
EM.next_tick @close
end

def finish
[100, {'X-Thin-Defer' => 'response'}, []]
end
end
end
5 changes: 3 additions & 2 deletions lib/thin/connection.rb
Expand Up @@ -83,7 +83,9 @@ def process

# Send the HTTP response back to the client.
def send_response(response)
@response.close if @response
@response = Response.new(*response)

defer = @response.headers.delete('X-Thin-Defer')

# Defer the entire response. We're going to respond later.
Expand All @@ -101,8 +103,6 @@ def send_response(response)
# Send the head (status & headers)
write @response.head

trigger 'send' and return if defer == 'body'

# Send the body
@response.body.each { |chunk| write chunk }

Expand Down Expand Up @@ -145,6 +145,7 @@ def write(data)

private
def trigger(event)
# TODO should support several callbacks
if callback = @request && @request.env["thin.on_#{event}"]
callback.call
end
Expand Down
147 changes: 29 additions & 118 deletions lib/thin/middlewares/async.rb
@@ -1,146 +1,57 @@
module Thin
class Async
class Callback
def initialize(method, env)
@method = method
def initialize(env, &callback)
@env = env
@callback = callback
end

def call(response)
@method.call(response, @env)
@callback.call(response, @env)
end
end

# Middleware stack for an async response.
# Since the response is already produced here, middleware that modify the request (env)
# won't have any effect.
class Stack
def initialize(&builder)
builder = Rack::Builder.new(&builder)
builder.run(self)
@app = builder.to_app
end

def call(env)
@response
end

def call_with(env, response)
@response = response
@app.call(env)
ensure
@response = nil
end
end

def initialize(app, &builder)
@app = app
@builder = Rack::Builder.new(&builder)
@stack = Stack.new(&builder)
end

def call(env)
# Connection may be closed unless the App#call response was a [-1, ...]
# It should be noted that connection objects will linger until this
# callback is no longer referenced, so be tidy!
env['async.callback'] = Callback.new(method(:async_call), env)
env['async.callback'] = Callback.new(env) { |reponse, env| async_call reponse, env }
env['async.close'] = lambda { env['thin.connection'].close }

@app.call(env)
end

def async_call(response, env)
# TODO refactor this to prevent creating a proc on each call
@builder.run(proc { |env| response })
status, headers, body = *@builder.call(env)
status, headers, body = *@stack.call_with(env, response)

connection = env['thin.connection']
headers['X-Thin-Defer'] = 'close'

close = proc { connection.close }
body.callback(&close) if body.respond_to?(:callback)
body.errback(&close) if body.respond_to?(:errback)

connection.call [status, headers, body]
end
end

# Response whos body is sent asynchronously.
#
# A nice wrapper around Thin's obscure async callback used to send response body asynchronously.
# Which means you can send the response in chunks while allowing Thin to process other requests.
#
# Crazy delicious with em-http-request for file upload, image processing, proxying, etc.
#
# == _WARNING_
# You should not use long blocking operations (Net::HTTP or slow shell calls) with this as it
# will prevent the EventMachine event loop from running and block all other requests.
#
# Also disable the Rack::Lint middleware to use Thin's async feature since it requires sending
# back an invalid status code to the server.
#
# == Usage
# Inside your Rack app #call(env):
#
# response = Thin::AsyncResponse.new(env)
# response.status = 201
# response.headers["X-Muffin-Mode"] = "ACTIVATED!"
#
# response << "this is ... "
#
# EM.add_timer(1) do
# # This will be sent to the client 1 sec later without blocking other requests.
# response << "async!"
# response.done
# end
#
# response.finish
#
class AsyncResponse
include Rack::Response::Helpers

class DeferrableBody
include EM::Deferrable

def initialize
@queue = []
end

def call(body)
@queue << body
schedule_dequeue
end

def each(&blk)
@body_callback = blk
schedule_dequeue
end

private
def schedule_dequeue
return unless @body_callback
EM.next_tick do
next unless body = @queue.shift
body.each do |chunk|
@body_callback.call(chunk)
end
schedule_dequeue unless @queue.empty?
end
end
end

attr_reader :headers, :callback
attr_accessor :status

def initialize(env, status=200, headers={})
@callback = env['async.callback']
@closer = env['thin.close']
@body = DeferrableBody.new
@status = status
@headers = headers
@headers_sent = false

yield self if block_given?
end

def send_headers(response=nil)
return if @headers_sent
@callback.call response || [@status, @headers, @body]
@headers_sent = true
end

def write(body)
send_headers
@body.call(body.respond_to?(:each) ? body : [body])
end
alias :<< :write

# Tell Thin the response is complete and the connection can be closed.
def done(response=nil)
send_headers(response)
EM.next_tick { @closer.close }
end

# Tell Thin the response is gonna be sent asynchronously.
# The status code of -1 is the magic trick here.
def finish
Response::ASYNC
end
end
end
13 changes: 4 additions & 9 deletions lib/thin/middlewares/chunked.rb
Expand Up @@ -14,24 +14,19 @@ def initialize(body)
@body = body
end

def each
term = TERM
def each(&block)
@callback = block
@body.each do |chunk|
size = Rack::Utils.bytesize(chunk)
next if size == 0

chunk = chunk.dup.force_encoding(Encoding::BINARY) if chunk.respond_to?(:force_encoding)
yield [size.to_s(16), term, chunk, term].join
end

if @body.respond_to?(:callback)
@body.callback { yield TAIL }
else
yield TAIL
yield [size.to_s(16), TERM, chunk, TERM].join
end
end

def close
@callback.call TAIL
@body.close if @body.respond_to?(:close)
end
end
Expand Down
4 changes: 2 additions & 2 deletions lib/thin/middlewares/stream_file.rb
Expand Up @@ -16,14 +16,14 @@ def call(env)
headers['Transfer-Encoding'] = 'chunked'
end

headers['X-Thin-Defer'] = 'body'
headers['X-Thin-Defer'] = 'close'

env['thin.on_send'] = proc do
send_file connection, body.to_path, chunked
end
end

[status, headers, body]
[status, headers, []]
end

def chunked?(env)
Expand Down
2 changes: 1 addition & 1 deletion lib/thin/middlewares/streamed.rb
Expand Up @@ -23,7 +23,7 @@ def call(env)
end
tick_loop.on_stop { connection.close }

headers['X-Thin-Defer'] = 'body'
headers['X-Thin-Defer'] = 'close'

[status, headers, body]
end
Expand Down
6 changes: 4 additions & 2 deletions v2.todo
Expand Up @@ -19,7 +19,9 @@ x Threading
- Graceful stop
x Daemonizing
x Add option to turn off async chunked encoding
- Deffered GC: https://github.com/defunkt/unicorn/blob/master/lib/unicorn/oob_gc.rb
- Stats
- Rails plugin for async response

Optimizations:
- Stock 200 OK response (store in frozen const)
- Stock 200 OK response (store in frozen const)
- Deffered GC: https://github.com/defunkt/unicorn/blob/master/lib/unicorn/oob_gc.rb

0 comments on commit d70fa88

Please sign in to comment.