Permalink
Browse files

add #stream helper

  • Loading branch information...
1 parent 9f69232 commit 49d4c90052ba2138ae4a79afd1fe1b7eeb4cd44c @rkh rkh committed Aug 17, 2011
Showing with 219 additions and 4 deletions.
  1. +54 −0 README.rdoc
  2. +64 −3 lib/sinatra/base.rb
  3. +1 −1 test/response_test.rb
  4. +100 −0 test/streaming_test.rb
View
@@ -188,6 +188,9 @@ That way we can, for instance, easily implement a streaming example:
get('/') { Stream.new }
+You can also use the +stream+ helper method (described below) to reduce boiler
+plate and embed the streaming logic in the route.
+
=== Custom Route Matchers
As shown above, Sinatra ships with built-in support for using String patterns
@@ -813,6 +816,54 @@ Similar to the body, you can also set the status code and headers:
Like +body+, +headers+ and +status+ with no arguments can be used to access
their current values.
+=== Streaming Responses
+
+Sometimes you want to start sending out data while still generating parts of
+the response body. In extreme examples, you want to keep sending data until
+the client closes the connection. You can use the +stream+ helper to avoid
+creating your own wrapper:
+
+ get '/' do
+ stream do |out|
+ out << "It's gonna be legen -\n"
+ sleep 0.5
+ out << " (wait for it) \n"
+ sleep 1
+ out << "- dary!\n"
+ end
+ end
+
+This allows you to implement streaming APIs,
+{Server Sent Events}[http://dev.w3.org/html5/eventsource/] and can be used as
+basis for {WebSockets}[http://en.wikipedia.org/wiki/WebSocket]. It can also be
+used to increase throughput if some but not all content depends on a slow
+resource.
+
+Note that the streaming behavior, especially the number of concurrent request,
+highly depends on the web server used to serve the application. Some servers,
+like WEBRick, might not even support streaming at all. If the server does not
+support streaming, the body will be sent all at once after the block passed to
++stream+ finished executing.
+
+If the optional parameter is set to +false+, it will not call +close+ on the
+stream object, allowing you to close it at any later point in the execution
+flow. This only works on evented servers, like Thin and Rainbows. Other
+servers will still close the stream.
+
+ set :server, :thin
+ connections = []
+
+ get '/' do
+ # keep stream open
+ stream(false) { |out| connections << out }
+ end
+
+ post '/' do
+ # write to all open streams
+ connections.each { |out| out << params[:message] << "\n" }
+ "message sent"
+ end
+
=== Logging
In the request scope, the +logger+ helper exposes a +Logger+ instance:
@@ -1243,6 +1294,9 @@ You can access those options via <tt>settings</tt>:
Use an explicit array when setting multiple values:
<tt>set :static_cache_control, [:public, :max_age => 300]</tt>
+[threaded] If set to +true+, will tell Thin to use
+ <tt>EventMachine.defer</tt> for processing the request.
+
[views] views folder.
== Error Handling
View
@@ -78,7 +78,11 @@ def finish
elsif Array === body and not [204, 304].include?(status.to_i)
headers["Content-Length"] = body.inject(0) { |l, p| l + Rack::Utils.bytesize(p) }.to_s
end
- super
+
+ # Rack::Response#finish sometimes returns self as response body. We don't want that.
+ status, headers, result = super
+ result = body if result == self
+ [status, headers, result]
end
end
@@ -225,6 +229,61 @@ def send_file(path, opts={})
not_found
end
+ # Class of the response body in case you use #stream.
+ #
+ # Three things really matter: The front and back block (back being the
+ # blog generating content, front the one sending it to the client) and
+ # the scheduler, integrating with whatever concurrency feature the Rack
+ # handler is using.
+ #
+ # Scheduler has to respond to defer and schedule.
+ class Stream
+ def self.schedule(*) yield end
+ def self.defer(*) yield end
+
+ def initialize(scheduler = self.class, close = true, &back)
+ @back, @scheduler, @callback, @close = back.to_proc, scheduler, nil, close
+ end
+
+ def close
+ @scheduler.schedule { @callback.call if @callback }
+ end
+
+ def each(&front)
+ @front = front
+ @scheduler.defer do
+ begin
+ @back.call(self)
+ rescue Exception => e
+ @scheduler.schedule { raise e }
+ end
+ close if @close
+ end
+ end
+
+ def <<(data)
+ @scheduler.schedule { @front.call(data.to_s) }
+ self
+ end
+
+ def callback(&block)
+ @callback = block
+ end
+
+ alias errback callback
+ end
+
+ # Allows to start sending data to the client even though later parts of
+ # the response body have not yet been generated.
+ #
+ # The close parameter specifies whether Stream#close should be called
+ # after the block has been executed. This is only relevant for evented
+ # servers like Thin or Rainbows.
+ def stream(close = true, &block)
+ scheduler = env['async.callback'] ? EventMachine : Stream
+ body Stream.new(scheduler, close, &block)
+ end
+
# Specify response freshness policy for HTTP caches (Cache-Control header).
# Any number of non-value directives (:public, :private, :no_cache,
# :no_store, :must_revalidate, :proxy_revalidate) may be passed along with
@@ -1204,8 +1263,9 @@ def run!(options={})
"on #{port} for #{environment} with backup from #{handler_name}"
end
[:INT, :TERM].each { |sig| trap(sig) { quit!(server, handler_name) } }
+ server.threaded = settings.threaded if server.respond_to? :threaded=
set :running, true
- yield handler if block_given?
+ yield server if block_given?
end
rescue Errno::EADDRINUSE => e
$stderr.puts "== Someone is already performing on port #{port}!"
@@ -1277,7 +1337,7 @@ def detect_rack_handler
servers = Array(server)
servers.each do |server_name|
begin
- return Rack::Handler.get(server_name)
+ return Rack::Handler.get(server_name.to_s)
rescue LoadError
rescue NameError
end
@@ -1406,6 +1466,7 @@ class << self
set :views, Proc.new { root && File.join(root, 'views') }
set :reload_templates, Proc.new { development? }
set :lock, false
+ set :threaded, true
set :public_folder, Proc.new { root && File.join(root, 'public') }
set :static, Proc.new { public_folder && File.exist?(public_folder) }
@@ -37,7 +37,7 @@ class ResponseTest < Test::Unit::TestCase
@response.body = ['Hello', 'World!', '']
status, headers, body = @response.finish
assert_equal '14', headers['Content-Length']
- assert_equal @response.body, body.body
+ assert_equal @response.body, body
end
it 'does not call #to_ary or #inject on the body' do
View
@@ -0,0 +1,100 @@
+require File.expand_path('../helper', __FILE__)
+
+class StreamingTest < Test::Unit::TestCase
+ Stream = Sinatra::Helpers::Stream
+
+ it 'returns the concatinated body' do
+ mock_app do
+ get '/' do
+ stream do |out|
+ out << "Hello" << " "
+ out << "World!"
+ end
+ end
+ end
+
+ get('/')
+ assert_body "Hello World!"
+ end
+
+ it 'always yields strings' do
+ stream = Stream.new { |out| out << :foo }
+ stream.each { |str| assert_equal 'foo', str }
+ end
+
+ it 'postpones body generation' do
+ step = 0
+
+ stream = Stream.new do |out|
+ 10.times do
+ out << step
+ step += 1
+ end
+ end
+
+ stream.each do |s|
+ assert_equal s, step.to_s
+ step += 1
+ end
+ end
+
+ it 'calls the callback after it is done' do
+ step = 0
+ final = 0
+ stream = Stream.new { |o| 10.times { step += 1 }}
+ stream.callback { final = step }
+ stream.each { |str| }
+ assert_equal 10, final
+ end
+
+ it 'does not trigger the callback if close is set to false' do
+ step = 0
+ final = 0
+ stream = Stream.new(Stream, false) { |o| 10.times { step += 1 } }
+ stream.callback { final = step }
+ stream.each { |str| }
+ assert_equal 0, final
+ end
+
+ class MockScheduler
+ def initialize(*) @schedule, @defer = [], [] end
+ def schedule(&block) @schedule << block end
+ def defer(&block) @defer << block end
+ def schedule!(*) @schedule.pop.call until @schedule.empty? end
+ def defer!(*) @defer.pop.call until @defer.empty? end
+ end
+
+ it 'allows dropping in another scheduler' do
+ scheduler = MockScheduler.new
+ processing = sending = done = false
+
+ stream = Stream.new(scheduler) do |out|
+ processing = true
+ out << :foo
+ end
+
+ stream.each { sending = true}
+ stream.callback { done = true }
+
+ scheduler.schedule!
+ assert !processing
+ assert !sending
+ assert !done
+
+ scheduler.defer!
+ assert processing
+ assert !sending
+ assert !done
+
+ scheduler.schedule!
+ assert sending
+ assert done
+ end
+
+ it 'schedules exceptions to be raised on the main thread/event loop/...' do
+ scheduler = MockScheduler.new
+ Stream.new(scheduler) { fail 'should be caught' }.each { }
+ scheduler.defer!
+ assert_raise(RuntimeError) { scheduler.schedule! }
+ end
+end

0 comments on commit 49d4c90

Please sign in to comment.