Permalink
Browse files

added live responses which can be written and read in separate threads

  • Loading branch information...
1 parent d4433a3 commit af0a9f9eefaee3a8120cfd8d05cbc431af376da3 @tenderlove tenderlove committed Jul 30, 2012
@@ -1,5 +1,6 @@
require 'abstract_controller'
require 'action_dispatch'
+require 'action_controller/metal/live'
module ActionController
extend ActiveSupport::Autoload
@@ -0,0 +1,42 @@
+require 'action_dispatch/http/response'
+
+module ActionController
+ module Live
+ class Response < ActionDispatch::Response
+ class Buffer < ActionDispatch::Response::Buffer # :nodoc:
+ def initialize(response)
+ @response = response
+ @buf = Queue.new
+ end
+
+ def write(string)
+ unless @response.committed?
+ @response.headers["Cache-Control"] = "no-cache"
+ @response.headers.delete("Content-Length")
+ end
+
+ super
+ end
+
+ def each
+ while str = @buf.pop
+ yield str
+ end
+ end
+
+ def close
+ super
+ @buf.push nil
+ end
+ end
+
+ private
+
+ def build_buffer(response, body)
+ buf = Buffer.new response
+ body.each { |part| buf.write part }
+ buf
+ end
+ end
+ end
+end
@@ -0,0 +1,26 @@
+require 'abstract_unit'
+
+module ActionController
+ class StreamingResponseTest < ActionController::TestCase
+ class TestController < ActionController::Base
+ def self.controller_path
+ 'test'
+ end
+
+ def basic_stream
+ %w{ hello world }.each do |word|
+ response.stream.write word
+ response.stream.write "\n"
+ end
+ response.stream.close
+ end
+ end
+
+ tests TestController
+
+ def test_write_to_stream
+ get :basic_stream
+ assert_equal "hello\nworld\n", @response.body
+ end
+ end
+end
@@ -0,0 +1,34 @@
+require 'abstract_unit'
+require 'active_support/concurrency/latch'
+
+module ActionController
+ module Live
+ class ResponseTest < ActiveSupport::TestCase
+ def setup
+ @response = Live::Response.new
+ end
+
+ def test_parallel
+ latch = ActiveSupport::Concurrency::Latch.new
+
+ t = Thread.new {
+ @response.stream.write 'foo'
+ latch.await
+ @response.stream.close
+ }
+
+ @response.each do |part|
+ assert_equal 'foo', part
+ latch.release
+ end
+ assert t.join
+ end
+
+ def test_setting_body_populates_buffer
+ @response.body = 'omg'
+ @response.close
+ assert_equal ['omg'], @response.body_parts
+ end
+ end
+ end
+end
@@ -0,0 +1,27 @@
+require 'thread'
+require 'monitor'
+
+module ActiveSupport
+ module Concurrency
+ class Latch
+ def initialize(count = 1)
+ @count = count
+ @lock = Monitor.new
+ @cv = @lock.new_cond
+ end
+
+ def release
+ @lock.synchronize do
+ @count -= 1 if @count > 0
+ @cv.broadcast if @count.zero?
+ end
+ end
+
+ def await
+ @lock.synchronize do
+ @cv.wait_while { @count > 0 }
+ end
+ end
+ end
+ end
+end

0 comments on commit af0a9f9

Please sign in to comment.