Permalink
Browse files

add streaming

  • Loading branch information...
1 parent aca8d1d commit 34cfd3cbe90a3a2408d679f0e946ff87dc094836 @rkh rkh committed Aug 18, 2011
Showing with 702 additions and 1 deletion.
  1. +3 −1 Gemfile.lock
  2. +1 −0 lib/sinatra/contrib.rb
  3. +267 −0 lib/sinatra/streaming.rb
  4. +1 −0 sinatra-contrib.gemspec
  5. +430 −0 spec/streaming_spec.rb
View
@@ -1,6 +1,6 @@
GIT
remote: git://github.com/sinatra/sinatra
- revision: af6b4c5367f0e05c047186787fe961d3c195c5c8
+ revision: 962d3f549ec6b7a2adbbee8438f03b28cdf80d6f
specs:
sinatra (1.3.0)
rack (~> 1.3)
@@ -11,6 +11,7 @@ PATH
specs:
sinatra-contrib (1.3.0)
backports (>= 2.0)
+ eventmachine
rack-protection
rack-test
sinatra (~> 1.3.0)
@@ -23,6 +24,7 @@ GEM
diff-lcs (1.1.2)
erubis (2.7.0)
escape_utils (0.2.3)
+ eventmachine (0.12.10)
haml (3.1.2)
json (1.5.1)
json (1.5.1-java)
View
@@ -17,6 +17,7 @@ module Common
helpers :EngineTracking
helpers :JSON
helpers :LinkHeader
+ helpers :Streaming
end
##
View
@@ -0,0 +1,267 @@
+require 'sinatra/base'
+require 'eventmachine'
+require 'backports'
+
+module Sinatra
+
+ # = Sinatra::Streaming
+ #
+ # Sinatra 1.3 introduced the +stream+ helper. This addon improves the
+ # streaming API by making the stream object immitate an IO object, turing
+ # it into a real Deferrable and making the body play nicer with middleware
+ # unaware of streaming.
+ #
+ # == IO-like behavior
+ #
+ # This is useful when passing the stream object to a library expecting an
+ # IO or StringIO object.
+ #
+ # get '/' do
+ # stream do |out|
+ # out.puts "Hello World!", "How are you?"
+ # out.write "Written #{out.pos} bytes so far!\n"
+ # out.putc(65) unless out.closed?
+ # out.flush
+ # end
+ # end
+ #
+ # == Proper Deferrable
+ #
+ # Handy when using EventMachine.
+ #
+ # list = []
+ #
+ # get '/' do
+ # stream(false) do |out|
+ # list << out
+ # out.callback { list.delete out }
+ # out.errback do
+ # logger.warn "lost connection"
+ # list.delete out
+ # end
+ # end
+ # end
+ #
+ # == Better Middleware Handling
+ #
+ # Blocks passed to #map! or #map will actually be applied while streaming
+ # (as you might suspect, #map! applies modifications to the current body,
+ # #map creates a new one):
+ #
+ # class StupidMiddleware
+ # def initialize(app) @app = app end
+ #
+ # def call(env)
+ # status, headers, body = @app.call(env)
+ # body.map! { |e| e.upcase }
+ # [status, headers, body]
+ # end
+ # end
+ #
+ # use StupidMiddleware
+ #
+ # get '/' do
+ # stream do |out|
+ # out.puts "still"
+ # sleep 1
+ # out.puts "streaming"
+ # end
+ # end
+ #
+ # Even works if #each is used to generate an Enumerator:
+ #
+ # def call(env)
+ # status, headers, body = @app.call(env)
+ # body = body.each.map { |s| s.upcase }
+ # [status, headers, body]
+ # end
+ #
+ # Note that both examples violate the Rack specification.
+ #
+ # == Setup
+ #
+ # In a classic application:
+ #
+ # require "sinatra"
+ # require "sinatra/streaming"
+ #
+ # In a modular application:
+ #
+ # require "sinatra/base"
+ # require "sinatra/streaming"
+ #
+ # class MyApp < Sinatra::Base
+ # helpers Streaming
+ # end
+ module Streaming
+ def stream(*)
+ stream = super
+ stream.extend Stream
+ stream.app = self
+ env['async.close'].callback { stream.close } if env.key? 'async.close'
+ stream
+ end
+
+ module Stream
+ include EventMachine::Deferrable
+
+ attr_accessor :app, :lineno, :pos, :transformer, :closed
+ alias tell pos
+ alias closed? closed
+
+ def self.extended(obj)
+ obj.closed, obj.lineno, obj.pos = false, 0, 0
+ obj.callback { obj.closed = true }
+ obj.errback { obj.closed = true }
+ end
+
+ def <<(data)
+ raise IOError, 'not opened for writing' if closed?
+ data = data.to_s
+ data = @transformer[data] if @transformer
+ @pos += data.bytesize
+ super(data)
+ end
+
+ def each
+ # that way body.each.map { ... } works
+ return self unless block_given?
+ super
+ end
+
+ def map(&block)
+ # dup would not copy the mixin
+ clone.map!(&block)
+ end
+
+ def map!(&block)
+ if @transformer
+ inner, outer = @transformer, block
+ block = proc { |value| outer[inner[value]] }
+ end
+ @transformer = block
+ self
+ end
+
+ def write(data)
+ self << data
+ data.bytesize
+ end
+
+ alias syswrite write
+ alias write_nonblock write
+
+ def print(*args)
+ args.each { |arg| self << arg }
+ nil
+ end
+
+ def printf(format, *args)
+ print(format.to_s % args)
+ end
+
+ def putc(c)
+ print c.chr
+ end
+
+ def puts(*args)
+ args.each { |arg| self << "#{arg}\n" }
+ nil
+ end
+
+ def close
+ @scheduler.schedule { succeed }
+ nil
+ end
+
+ def close_read
+ raise IOError, "closing non-duplex IO for reading"
+ end
+
+ def closed_read?
+ true
+ end
+
+ def closed_write?
+ closed?
+ end
+
+ def external_encoding
+ Encoding.find settings.default_encoding
+ rescue NameError
+ settings.default_encoding
+ end
+
+ def closed?
+ @closed
+ end
+
+ def settings
+ app.settings
+ end
+
+ def rewind
+ @pos = @lineno = 0
+ end
+
+ def not_open_for_reading(*)
+ raise IOError, "not opened for reading"
+ end
+
+ alias bytes not_open_for_reading
+ alias eof? not_open_for_reading
+ alias eof not_open_for_reading
+ alias getbyte not_open_for_reading
+ alias getc not_open_for_reading
+ alias gets not_open_for_reading
+ alias read not_open_for_reading
+ alias read_nonblock not_open_for_reading
+ alias readbyte not_open_for_reading
+ alias readchar not_open_for_reading
+ alias readline not_open_for_reading
+ alias readlines not_open_for_reading
+ alias readpartial not_open_for_reading
+ alias sysread not_open_for_reading
+ alias ungetbyte not_open_for_reading
+ alias ungetc not_open_for_reading
+ private :not_open_for_reading
+
+ def enum_not_open_for_reading(*)
+ not_open_for_reading if block_given?
+ enum_for(:not_open_for_reading)
+ end
+
+ alias chars enum_not_open_for_reading
+ alias each_line enum_not_open_for_reading
+ alias each_byte enum_not_open_for_reading
+ alias each_char enum_not_open_for_reading
+ alias lines enum_not_open_for_reading
+ undef enum_not_open_for_reading
+
+ def dummy(*) end
+ alias flush dummy
+ alias fsync dummy
+ alias internal_encoding dummy
+ alias pid dummy
+ undef dummy
+
+ def seek(*)
+ 0
+ end
+
+ alias sysseek seek
+
+ def sync
+ true
+ end
+
+ def tty?
+ false
+ end
+
+ alias isatty tty?
+ end
+ end
+
+ helpers Streaming
+end
View
@@ -107,6 +107,7 @@ Gem::Specification.new do |s|
s.add_dependency "tilt", "~> 1.3"
s.add_dependency "rack-test"
s.add_dependency "rack-protection"
+ s.add_dependency "eventmachine"
s.add_development_dependency "rspec", "~> 2.3"
s.add_development_dependency "haml"
Oops, something went wrong.

0 comments on commit 34cfd3c

Please sign in to comment.