Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Controller actions are processed in a separate thread for live

responses.

Processing controller actions in a separate thread allows us to work
around the rack api - we can allow the user to set status and headers,
then block until the first bytes are written.  As soon as the first
bytes are written, the main thread can return the status, headers, and
(essentially) a queue for the body.
  • Loading branch information...
commit 38cfbb8aa7e4aa4d9467e1706c50c3573cc714aa 1 parent 06c9e17
@tenderlove tenderlove authored
View
28 actionpack/lib/action_controller/metal/live.rb
@@ -6,8 +6,7 @@ module Live
class Response < ActionDispatch::Response
class Buffer < ActionDispatch::Response::Buffer # :nodoc:
def initialize(response)
- @response = response
- @buf = Queue.new
+ super(response, Queue.new)
end
def write(string)
@@ -59,5 +58,30 @@ def build_buffer(response, body)
buf
end
end
+
+ def process(name)
+ t1 = Thread.current
+ locals = t1.keys.map { |key| [key, t1[key]] }
+
+ # This processes the action in a child thread. It lets us return the
+ # response code and headers back up the rack stack, and still process
+ # the body in parallel with sending data to the client
+ Thread.new {
+ t2 = Thread.current
+ t2.abort_on_exception = true
+
+ # Since we're processing the view in a different thread, copy the
+ # thread locals from the main thread to the child thread. :'(
+ locals.each { |k,v| t2[k] = v }
+
+ begin
+ super(name)
+ ensure
+ @_response.commit!
+ end
+ }
+
+ @_response.await_commit
+ end
end
end
View
12 actionpack/lib/action_controller/test_case.rb
@@ -517,8 +517,8 @@ def process(action, http_method = 'GET', *args)
end
def setup_controller_request_and_response
- @request = TestRequest.new
- @response = TestResponse.new
+ @request = build_request
+ @response = build_response
@response.request = @request
@controller = nil unless defined? @controller
@@ -539,6 +539,14 @@ def setup_controller_request_and_response
end
end
+ def build_request
+ TestRequest.new
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
included do
include ActionController::TemplateAssertions
include ActionDispatch::Assertions
View
72 actionpack/test/controller/live_stream_test.rb
@@ -1,16 +1,41 @@
require 'abstract_unit'
+require 'active_support/concurrency/latch'
module ActionController
- class StreamingResponseTest < ActionController::TestCase
+ class LiveStreamTest < ActionController::TestCase
class TestController < ActionController::Base
+ include ActionController::Live
+
+ attr_accessor :latch, :tc
+
def self.controller_path
'test'
end
def basic_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ end
+ response.stream.close
+ end
+
+ def blocking_stream
+ response.headers['Content-Type'] = 'text/event-stream'
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ latch.await
+ end
+ response.stream.close
+ end
+
+ def thread_locals
+ tc.assert_equal 'aaron', Thread.current[:setting]
+ tc.refute_equal Thread.current.object_id, Thread.current[:originating_thread]
+
+ response.headers['Content-Type'] = 'text/event-stream'
%w{ hello world }.each do |word|
response.stream.write word
- response.stream.write "\n"
end
response.stream.close
end
@@ -18,9 +43,50 @@ def basic_stream
tests TestController
+ class TestResponse < Live::Response
+ def recycle!
+ initialize
+ end
+ end
+
+ def build_response
+ TestResponse.new
+ end
+
def test_write_to_stream
+ @controller = TestController.new
get :basic_stream
- assert_equal "hello\nworld\n", @response.body
+ assert_equal "helloworld", @response.body
+ assert_equal 'text/event-stream', @response.headers['Content-Type']
+ end
+
+ def test_async_stream
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ parts = ['hello', 'world']
+
+ @controller.request = @request
+ @controller.response = @response
+
+ t = Thread.new(@response) { |resp|
+ resp.stream.each do |part|
+ assert_equal parts.shift, part
+ ol = @controller.latch
+ @controller.latch = ActiveSupport::Concurrency::Latch.new
+ ol.release
+ end
+ }
+
+ @controller.process :blocking_stream
+
+ assert t.join
+ end
+
+ def test_thread_locals_get_copied
+ @controller.tc = self
+ Thread.current[:originating_thread] = Thread.current.object_id
+ Thread.current[:setting] = 'aaron'
+
+ get :thread_locals
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.