Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial commit

  • Loading branch information...
commit 2585f0d11d2dd23d7a0e0bb94e51095ff8af574f 0 parents
Jerry Cheung jch authored
3  .gitignore
@@ -0,0 +1,3 @@
+.yardoc
+doc
+Gemfile.lock
2  .rspec
@@ -0,0 +1,2 @@
+--color
+--format=progress
17 .travis.yml
@@ -0,0 +1,17 @@
+bundler_args:
+ --without development debug darwin
+
+before_script:
+ - "bundle exec thin start -R spec/integration/server.ru -p 8888"
+ - "sleep 2"
+
+env:
+ - SERVER=http://localhost:8888
+
+language: ruby
+
+script: "bundle exec rake spec:integration && bundle exec rake spec"
+
+rvm:
+ - 1.9.2
+ - 1.9.3
1  .yardopts
@@ -0,0 +1 @@
+--no-private --protected - LICENSE
38 Gemfile
@@ -0,0 +1,38 @@
+source 'http://rubygems.org'
+
+gemspec
+
+group :development do
+ gem 'yard'
+ gem 'guard'
+ gem 'guard-rspec'
+ gem 'guard-bundler'
+end
+
+group :development, :test do
+ gem 'rake'
+ gem 'rack-test'
+ gem 'rspec', '~> 2.9'
+ gem 'bundler'
+ gem 'pry'
+ gem 'faraday'
+ gem 'thin'
+
+ # integration
+ gem 'capybara-webkit'
+ gem 'em-eventsource'
+ gem 'em-http-request'
+ gem 'sinatra'
+ gem 'sinatra-synchrony'
+end
+
+# debugger for 1.9 only
+group :debug do
+ gem 'debugger'
+end
+
+# Mac specific
+group :darwin do
+ gem 'rb-fsevent'
+ gem 'growl'
+end
11 Guardfile
@@ -0,0 +1,11 @@
+guard 'bundler' do
+ watch 'Gemfile'
+ watch 'rack-stream.gemspec'
+end
+
+guard 'rspec', :version => 2 do
+ watch(%r{^spec/.+_spec\.rb$})
+ watch(%r{^lib/(.+)\.rb$}) { |m| "spec/lib/#{m[1]}_spec.rb" }
+ watch('spec/spec_helper.rb') { "spec" }
+end
+
8 LICENSE
@@ -0,0 +1,8 @@
+Copyright (c) 2012, Jerry Cheung
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
282 README.md
@@ -0,0 +1,282 @@
+# rack-stream
+
+## Overview
+
+rack-stream is middleware for building multi-protocol streaming rack endpoints.
+
+## Example
+
+```ruby
+# config.ru
+require 'rack/stream'
+
+class App
+ include Rack::Stream::DSL
+
+ def call(env)
+ after_open do
+ count = 0
+ EM.add_periodic_timer(1) do
+ if count != 3
+ chunk "chunky #{count}\n"
+ count += 1
+ else
+ # Connection isn't closed until #close is called.
+ # Useful if you're building a firehose API
+ close
+ end
+ end
+ end
+
+ before_close do
+ chunk "monkey!\n"
+ end
+
+ [200, {'Content-Type' => 'application/json'}, []]
+ end
+end
+
+Rack::Builder.app do
+ use Rack::Stream
+ run App
+end
+```
+
+To run the example:
+
+```
+> thin start -R config.ru -p 3000
+> curl -i -N http://localhost:3000/
+>> HTTP/1.1 200 OK
+>> Content-Type: text/plain
+>> Transfer-Encoding: chunked
+>>
+>> chunky 0
+>> chunky 1
+>> chunky 2
+>> monkey
+```
+
+This same endpoint can be accessed via WebSockets or EventSource, see
+'Multi-Protocol Support' below.
+
+## Connection Lifecycle
+
+When using rack-stream, downstream apps can access the
+`Rack::Stream::App` instance via `env['rack.stream']`. This object is
+used to control when the connection is closed, and what is streamed.
+`Rack::Stream::DSL` delegates access methods to `env['rack.stream']`
+on the downstream rack app.
+
+`Rack::Stream::App` instances are in one of the follow states:
+
+* new
+* open
+* closed
+* errored
+
+Each state is described below.
+
+### new
+
+When a request first comes in, rack-stream processes any downstream
+rack apps and uses their status and headers for its response. Any
+downstream response bodies are queued for streaming once the headers
+and status have been sent.
+
+```ruby
+use Rack::Stream
+
+# once Rack::Stream instance is :open, 'Chunky Monkey' will be streamed out
+run lambda {|env| [200, {'Content-Type' => 'text/plain'}, ['Chunky Monkey']]}
+```
+
+### open
+
+Before the status and headers are sent in the response, they are
+frozen and cannot be further modified. Attempting to modify these
+fields will put the instance into an `:errored` state.
+
+After the status and headers are sent, registered `:after_open`
+callbacks will be called. If no `:after_open` callbacks are defined,
+the instance will close the connection after flushing any queued
+chunks.
+
+If any `:after_open` callbacks are defined, it's the callback's
+responsibility to call `#close` when the connection should be
+closed. This allows you to build firehose streaming APIs with full
+control of when to close connections.
+
+```ruby
+use Rack::Stream
+
+run lambda {|env|
+ stream = env['rack.stream']
+ stream.after_open do
+ stream.chunk "Chunky"
+ stream.chunk "Monkey"
+ stream.close # <-- It's your responsibility to close the connection
+ end
+ [200, {'Content-Type' => 'text/plain'}, ['Hello', 'World']] # <-- downstream response bodies are also streamed
+}
+```
+
+There are no `:before_open` callbacks. If you want something to be
+done before streaming is started, simply return it as part of your
+downstream response.
+
+### closed
+
+An instance enters the `:closed` state after the method `#close` is
+called on it. By default, any remainined queued content to be streamed
+will be flushed before the connection is closed.
+
+```ruby
+use Rack::Stream
+
+run lambda {|env|
+ # to save typing, access the Rack::Stream instance with #instance_eval
+ env['rack.stream'].instance_eval do
+ before_close do
+ chunk "Goodbye!" # chunks can still be sent
+ end
+
+ after_close do
+ # any additional cleanup. Calling #chunk here will result in an error.
+ end
+ end
+ [200, {}, []]
+}
+```
+
+### errored
+
+An instance enters the `:errored` state if an illegal action is
+performed in one of the states. Legal actions for the different states
+are:
+
+* **new** - `#status=`, `#headers=`
+* **open** - `#chunk`, `#close`
+
+All other actions are considered illegal. Manipulating headers after
+`:new` is also illegal. The connection is closed immediately, and the
+error is written to `env['rack.error']`
+
+## Manipuating Content
+
+When a connection is open and streaming content, you can define
+`:before_chunk` callbacks to manipulate the content before it's sent
+out.
+
+```ruby
+use Rack::Stream
+
+run lambda {|env|
+ env['rack.stream'].instance_eval do
+ after_open do
+ chunk "chunky", "monkey"
+ end
+
+ before_chunk do |chunks|
+ # return the manipulated chunks of data to be sent
+ # this will stream MONKEYCHUNKY
+ chunks.map(&:upcase).reverse
+ end
+ end
+}
+```
+
+## Multi-Protocol Support
+
+`Rack::Stream` allows you to write an API endpoint that automatically
+responds to different protocols based on the incoming request. This
+allows you to write a single rack endpoint that can respond to normal
+HTTP, WebSockets, or EventSource.
+
+Assuming that rack-stream endpoint is running on port 3000. You can
+access it with the following:
+
+### HTTP
+
+```
+# -i prints headers, -N immediately displays output instead of buffering
+curl -i -N http://localhost:3000/
+```
+
+### WebSockets
+
+With Ruby:
+
+```ruby
+require 'eventmachine'
+require 'faye/websocket'
+
+EM.run {
+ socket = Faye::WebSocket::Client.new('ws://localhost:3000/)
+ socket.onmessage = lambda {|e| puts e.data} # puts each streamed chunk
+ socket.onclose = lambda {|e| EM.stop}
+}
+```
+
+With Javascript:
+
+```js
+var socket = new WebSocket("ws://localhost:3000/");
+socket.onmessage = function(m) {console.log(m);}
+socket.onclose = function() {console.log('socket closed');}
+```
+
+### EventSource
+
+From Wikipedia:
+
+> Server-sent events is a technology for providing push notifications
+> from a server to a browser client in the form of DOM events. The
+> Server-Sent Events EventSource API is now being standardized as part
+> of HTML5 by the W3C.
+
+With Ruby:
+
+```ruby
+require 'em-eventsource'
+
+EM.run do
+ source = EventMachine::EventSource.new("http://example.com/streaming")
+ source.message do |m|
+ puts m
+ end
+ source.start
+end
+```
+
+With Javascript:
+
+```js
+var source = new EventSource('/');
+source.addEventListener('message', function(e) {
+ console.log(e.data);
+});
+```
+
+## Supported Runtimes
+
+* 1.9.2
+* 1.9.3
+
+If a runtime is not listed above, it may still work. It just means I
+haven't tried it yet.
+
+## Roadmap
+
+* more protocols / custom protocols http://en.wikipedia.org/wiki/HTTP_Streaming
+* integrate into [grape](http://github.com/intridea/grape)
+* add sinatra example that serves page that uses JS to connect
+
+## Further Reading
+
+* [Stream Updates With Server-Sent Events](http://www.html5rocks.com/en/tutorials/eventsource/basics/)
+* [thin_async](https://github.com/macournoyer/thin_async) was where I got started
+* [thin-async-test](https://github.com/phiggins/thin-async-test) used to simulate thin in tests
+* [thin](https://github.com/macournoyer/thin)
+* [faye-websocket-ruby](https://github.com/faye/faye-websocket-ruby) used for testing and handling different protocols
+* [rack-chunked](http://rack.rubyforge.org/doc/Rack/Chunked.html)
15 Rakefile
@@ -0,0 +1,15 @@
+#!/usr/bin/env rake
+require "bundler/gem_tasks"
+require "rspec/core/rake_task"
+
+Bundler::GemHelper.install_tasks
+
+RSpec::Core::RakeTask.new('spec') do |t|
+ t.rspec_opts = '--tag ~integration'
+end
+
+RSpec::Core::RakeTask.new('spec:integration') do |t|
+ t.pattern = 'spec/integration/*_spec.rb'
+end
+
+task :default => :spec
7 examples/basic.ru
@@ -0,0 +1,7 @@
+require 'rack/stream'
+
+use Rack::Stream
+
+run lambda {|env|
+ [200, {'Content-Type' => 'text/plain'}, ['hello', ' ', 'world']]
+}
20 examples/loop.ru
@@ -0,0 +1,20 @@
+require 'rack/stream'
+
+use Rack::Stream
+
+run lambda {|env|
+ env["rack.stream"].instance_eval do
+ count = 0
+ after_open do
+ timer = EM::PeriodicTimer.new(0.1) do
+ if count > 10
+ timer.cancel
+ close
+ end
+ chunk "Chunky\n"
+ count += 1
+ end
+ end
+ end
+ [200, {}, []]
+}
4 examples/no_stream.ru
@@ -0,0 +1,4 @@
+use Rack::Chunked
+run lambda {|env|
+ [200, {'Content-Type' => 'text/plain'}, ['hello', 'world']]
+}
15 examples/websocket_client.rb
@@ -0,0 +1,15 @@
+require 'faye/websocket'
+
+EM.run {
+ ws = Faye::WebSocket::Client.new('ws://localhost:3000/')
+ ws.onopen = lambda do |event|
+ ws.send("hello world")
+ end
+ ws.onmessage = lambda do |event|
+ puts "message: #{event.data}"
+ end
+ ws.onclose = lambda do |event|
+ puts "websocket closed"
+ EM.stop
+ end
+}
20 lib/rack/stream.rb
@@ -0,0 +1,20 @@
+require 'eventmachine'
+require 'faye/websocket'
+
+require 'rack/stream/handlers'
+require 'rack/stream/deferrable_body'
+require 'rack/stream/app'
+require 'rack/stream/dsl'
+
+module Rack
+ # Middleware for building multi-protocol streaming rack endpoints.
+ class Stream
+ def initialize(app, options={})
+ @app = app
+ end
+
+ def call(env)
+ App.new(@app).call(env)
+ end
+ end
+end
136 lib/rack/stream/app.rb
@@ -0,0 +1,136 @@
+module Rack
+ class Stream
+ class App
+ class UnsupportedServerError < StandardError; end
+ class StateConstraintError < StandardError; end
+
+ # The state of the connection
+ # :new
+ # :open
+ # :closed
+ attr_reader :state
+
+ # @private
+ attr_reader :env
+
+ attr_reader :status, :headers
+
+ def initialize(app, options={})
+ @app = app
+ @state = :new
+ @callbacks = Hash.new {|h,k| h[k] = []}
+ end
+
+ def call(env)
+ @env = env
+ @env['rack.stream'] = self
+
+ app_status, app_headers, app_body = @app.call(@env)
+
+ @status = app_status
+ @headers = app_headers
+ @handler = Handlers.find(self)
+
+ # apply before_chunk to any response bodies
+ @callbacks[:after_open].unshift(lambda {chunk(*app_body)})
+
+ # By default, close a connection if no :after_open is specified
+ after_open {close} if @callbacks[:after_open].size == 1
+
+ EM.next_tick {
+ open!
+ }
+ ASYNC_RESPONSE
+ end
+
+ def status=(code)
+ require_state :new
+ @status = code
+ end
+
+ def headers=(hash)
+ require_state :new
+ @headers = hash
+ end
+
+ def chunk(*chunks)
+ require_state :open
+ run_callbacks(:chunk, chunks) {|mutated_chunks|
+ @handler.chunk(*mutated_chunks)
+ }
+ end
+ alias :<< :chunk
+
+ def close(flush = true)
+ require_state :open
+
+ # run in the next tick since it's more natural to call #chunk right
+ # before #close
+ EM.next_tick {
+ run_callbacks(:close) {
+ @state = :closed
+ @handler.close!(flush)
+ }
+ }
+ end
+
+ def new?; @state == :new end
+ def open?; @state == :open end
+ def closed?; @state == :closed end
+ def errored?; @state == :errored end
+
+ private
+ ASYNC_RESPONSE = [-1, {}, []].freeze
+
+ def require_state(*allowed_states)
+ unless allowed_states.include?(@state)
+ action = caller[0]
+ raise StateConstraintError.new("\nCalled\n '#{caller[0]}'\n Allowed :#{allowed_states * ','}\n Current :#{@state}")
+ end
+ end
+
+ # Transition state from :new to :open
+ #
+ # Freezes headers to prevent further modification
+ def open! #(server)
+ raise UnsupportedServerError.new "missing async.callback. run within thin or rainbows" unless @env['async.callback']
+ run_callbacks(:open) {
+ @state = :open
+ @headers.freeze
+ @handler.open!
+ }
+ end
+
+ # Skips any remaining chunks, and immediately closes the connection
+ def error!(e)
+ @env['rack.errors'].puts(e.message)
+ @status = 500 if new?
+ @state = :errored
+ @handler.close!(false)
+ end
+
+ def self.define_callbacks(name, *types)
+ types.each do |type|
+ callback_name = "#{type}_#{name.to_s}"
+ define_method callback_name do |&blk|
+ @callbacks[callback_name.to_sym] << blk
+ self
+ end
+ end
+ end
+ define_callbacks :open, :after
+ define_callbacks :chunk, :before, :after
+ define_callbacks :close, :before, :after
+
+ def run_callbacks(name, *args)
+ modified = @callbacks["before_#{name}".to_sym].inject(args) do |memo, cb|
+ [cb.call(*memo)]
+ end
+ yield(*modified) if block_given?
+ @callbacks["after_#{name}".to_sym].each {|cb| cb.call(*args)}
+ rescue StateConstraintError => e
+ error!(e)
+ end
+ end
+ end
+end
50 lib/rack/stream/deferrable_body.rb
@@ -0,0 +1,50 @@
+module Rack
+ class Stream
+ # From [thin_async](https://github.com/macournoyer/thin_async)
+ class DeferrableBody
+ include EM::Deferrable
+
+ # @param chunks - object that responds to each. holds initial chunks of content
+ def initialize(chunks = [])
+ @queue = []
+ chunks.each {|c| chunk(c)}
+ end
+
+ # Enqueue a chunk of content to be flushed to stream at a later time
+ def chunk(*chunks)
+ @queue += chunks
+ schedule_dequeue
+ end
+
+ # When rack attempts to iterate over `body`, save the block,
+ # and execute at a later time when `@queue` has elements
+ def each(&blk)
+ @body_callback = blk
+ schedule_dequeue
+ end
+
+ def empty?
+ @queue.empty?
+ end
+
+ def close!(flush = true)
+ EM.next_tick {
+ succeed if !flush
+ succeed if flush && empty?
+ schedule_dequeue if flush && !empty?
+ }
+ end
+
+ private
+
+ def schedule_dequeue
+ return unless @body_callback
+ EM.next_tick do
+ next unless c = @queue.shift
+ @body_callback.call(c)
+ schedule_dequeue unless empty?
+ end
+ end
+ end
+ end
+end
18 lib/rack/stream/dsl.rb
@@ -0,0 +1,18 @@
+require 'forwardable'
+
+module Rack
+ class Stream
+ module DSL
+ def self.included(base)
+ base.class_eval do
+ extend Forwardable
+ def_delegators :rack_stream, :after_open, :before_chunk, :chunk, :after_chunk, :before_close, :close, :after_close
+
+ def rack_stream
+ env['rack.stream']
+ end
+ end
+ end
+ end
+ end
+end
98 lib/rack/stream/handlers.rb
@@ -0,0 +1,98 @@
+module Rack
+ class Stream
+ module Handlers
+ def find(app)
+ if Faye::WebSocket.websocket?(app.env)
+ WebSocket.new(app)
+ elsif Faye::EventSource.eventsource?(app.env)
+ EventSource.new(app)
+ else
+ Http.new(app)
+ end
+ end
+ module_function :find
+
+ class AbstractHandler
+ def initialize(app)
+ @app = app
+ end
+
+ def chunk(*chunks)
+ raise NotImplementedError
+ end
+
+ def open!
+ raise NotImplementedError
+ end
+
+ def close!(flush = true)
+ raise NotImplementedError
+ end
+ end
+
+ class Http < AbstractHandler
+ TERM = "\r\n".freeze
+ TAIL = "0#{TERM}#{TERM}".freeze
+
+ def initialize(app)
+ super
+ @app.headers['Transfer-Encoding'] = 'chunked'
+ @app.headers.delete('Content-Length')
+ @body = DeferrableBody.new # swap this out for different body types
+ end
+
+ def chunk(*chunks)
+ @body.chunk(*chunks.map {|c| encode_chunk(c)})
+ end
+
+ def open!
+ @app.env['async.callback'].call [@app.status, @app.headers, @body]
+ end
+
+ def close!(flush = true)
+ @body.chunk(TAIL) # tail is special and already encoded
+ @body.close!(flush)
+ end
+
+ private
+ def encode_chunk(c)
+ return nil if c.nil?
+
+ size = Rack::Utils.bytesize(c) # Rack::File?
+ return nil if size == 0
+ c.dup.force_encoding(Encoding::BINARY) if c.respond_to?(:force_encoding)
+ [size.to_s(16), TERM, c, TERM].join
+ end
+ end
+
+ class WebSocket < AbstractHandler
+ def chunk(*chunks)
+ # this is not called until after #open!, so @ws is always defined
+ chunks.each {|c| @ws.send(c)}
+ end
+
+ def close!(flush = true)
+ @ws.close(@app.status)
+ end
+
+ def open!
+ @ws = Faye::WebSocket.new(@app.env)
+ end
+ end
+
+ class EventSource < WebSocket
+ def chunk(*chunks)
+ chunks.each {|c| @es.send(c)}
+ end
+
+ def close!(flush = true)
+ @es.close
+ end
+
+ def open!
+ @es = Faye::EventSource.new(@app.env)
+ end
+ end
+ end
+ end
+end
24 rack-stream.gemspec
@@ -0,0 +1,24 @@
+$:.push File.expand_path("../lib", __FILE__)
+
+Gem::Specification.new do |s|
+ s.name = "rack-stream"
+ s.version = "0.0.1"
+ s.platform = Gem::Platform::RUBY
+ s.authors = ["Jerry Cheung"]
+ s.email = ["jerry@intridea.com"]
+ s.homepage = "https://github.com/jch/rack-stream"
+ s.summary = %q{Rack middleware for building multi-protocol streaming rack endpoints}
+ s.description = %q{Rack middleware for building multi-protocol streaming rack endpoints}
+ s.license = "BSD"
+
+ s.rubyforge_project = "rack-stream"
+
+ s.add_runtime_dependency 'rack'
+ s.add_runtime_dependency 'eventmachine'
+ s.add_runtime_dependency 'faye-websocket'
+
+ s.files = `git ls-files`.split("\n")
+ s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
+ s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
+ s.require_paths = ["lib"]
+end
32 spec/integration/server.ru
@@ -0,0 +1,32 @@
+require 'rack/stream'
+
+use Rack::Stream
+
+run lambda {|env|
+ env['rack.stream'].instance_eval do
+ after_open do
+ chunk "Chunky", "Monkey"
+ EM.next_tick do
+ chunk "Brownie", "Batter"
+ close
+ end
+ end
+
+ before_chunk do |chunks|
+ chunks.map(&:upcase)
+ end
+
+ after_chunk do
+ # TODO: how to test this
+ end
+
+ before_close do
+ chunk "closing"
+ end
+
+ after_close do
+ # TODO: how to test this in integration?
+ end
+ end
+ [200, {'Content-Type' => 'text/plain'}, ['Hello', ' ', 'World']]
+}
78 spec/integration/server_spec.rb
@@ -0,0 +1,78 @@
+# CI test. see .travis.yml for config variables
+if ENV['SERVER']
+ require 'bundler/setup'
+ require 'rack'
+ require 'rack/stream'
+ require 'rspec'
+
+ require 'capybara/rspec'
+ require 'faraday'
+ require 'em-eventsource'
+ require 'capybara'
+ require 'capybara-webkit'
+
+ describe 'Integration', :integration => true, :type => :request, :driver => :webkit do
+ EXPECTED = 'HELLO WORLDCHUNKYMONKEYBROWNIEBATTERCLOSING'.freeze
+ let(:uri) {URI.parse(ENV['SERVER'])}
+
+ before :all do
+ Capybara.app_host = uri.to_s
+ Capybara.run_server = false
+ end
+
+ describe 'HTTP' do
+ it 'should stream with chunked transfer encoding' do
+ http = Faraday.new uri.to_s
+ 2.times.map do
+ res = http.get '/'
+ res.status.should == 200
+ res.headers['content-type'].should == 'text/plain'
+ res.headers['transfer-encoding'].should == 'chunked'
+ res.body.should == EXPECTED
+ end
+ end
+ end
+
+ describe 'WebSocket' do
+ it 'should stream with websockets' do
+ uri.scheme = 'ws'
+ EM.run {
+ ws = Faye::WebSocket::Client.new(uri.to_s)
+ # ws.onopen = lambda {|e| puts 'opened'}
+ $ws_chunks = []
+ ws.onmessage = lambda {|e| $ws_chunks << e.data}
+ ws.onclose = lambda do |e|
+ EM.stop
+ $ws_chunks.join('').should == EXPECTED
+ $ws_chunks = nil
+ end
+ }
+ end
+ end
+
+ describe 'EventSource' do
+ # em-eventsource needs to send 'Accept' => 'text/event-stream'
+ # not sure if the gem isn't working or if its rack-stream. a web integration spec would be nice
+ # it 'should stream with eventsource' do
+ # @chunks = ""
+ # source = EventMachine::EventSource.new(uri.to_s)
+ # source.message do |message|
+ # puts message
+ # @chunks << message
+ # source.stop if @chunks == EXPECTED
+ # end
+ # source.start
+ # end
+ end
+
+ describe 'Javascript', :type => :request, :driver => :webkit do
+ it 'should work from a js client' do
+ visit '/capybara'
+ # capybara doesn't distinguish between " " and ""
+ all('#ws li').map(&:text).should =~ ["socket opened", "HELLO", "", "WORLD", "CHUNKY", "MONKEY", "BROWNIE", "BATTER", "CLOSING", "socket closed"]
+
+ all('#es li').map(&:text).should =~ ["HELLO", "", "WORLD", "CHUNKY", "MONKEY", "BROWNIE", "BATTER", "CLOSING"]
+ end
+ end
+ end
+end
37 spec/integration/sinatra.ru
@@ -0,0 +1,37 @@
+require 'sinatra/base'
+require 'sinatra/synchrony'
+require 'rack/stream'
+
+use Rack::Stream
+
+class App < Sinatra::Base
+ include Rack::Stream::DSL
+
+ get '/capybara' do
+ erb :index
+ end
+
+ get '/' do
+ after_open do
+ chunk "Chunky", "Monkey"
+ EM.next_tick do
+ chunk "Brownie", "Batter"
+ close
+ end
+ end
+
+ before_chunk do |chunks|
+ chunks.map(&:upcase)
+ end
+
+ before_close do
+ chunk "closing"
+ end
+
+ status 200
+ headers 'Content-Type' => 'text/plain'
+ ['Hello', ' ', 'World']
+ end
+end
+
+run App
36 spec/integration/views/index.erb
@@ -0,0 +1,36 @@
+<html>
+<head></head>
+<body>
+ <h1>Rack::Stream Sinatra Example</h1>
+ <h2>Websocket</h2>
+ <ul id='ws'></ul>
+
+ <h2>EventSource</h2>
+ <ul id='es'></ul>
+
+ <script src="https://ajax.googleapis.com/ajax/libs/jquery/1.7.2/jquery.min.js"></script>
+ <script type='text/javascript'>
+ var socket, source;
+
+ function ws(m) {
+ $('<li>').text(m).appendTo('#ws');
+ }
+
+ function es(m) {
+ $('<li>').text(m).appendTo('#es');
+ }
+
+ $(function() {
+ socket = new WebSocket("ws://localhost:8888/");
+ socket.onopen = function() {ws('socket opened');}
+ socket.onmessage = function(m) {ws(m.data);}
+ socket.onclose = function() {ws('socket closed');}
+
+ source = new EventSource('http://localhost:8888/');
+ source.addEventListener('message', function(e) {
+ es(e.data);
+ });
+ });
+ </script>
+</body>
+</html>
125 spec/lib/rack/stream_spec.rb
@@ -0,0 +1,125 @@
+require 'spec_helper'
+
+describe Rack::Stream do
+ def app
+ b = Rack::Builder.new
+ b.use Support::MockServer
+ b.use Rack::Stream
+ b.run endpoint
+ b
+ end
+
+ shared_examples_for 'invalid action' do
+ it "should raise invalid state" do
+ get '/'
+ last_response.errors.should =~ /Invalid action/
+ last_response.status.should == 500
+ end
+ end
+
+ let(:endpoint) {
+ lambda {|env| [201, {'Content-Type' => 'text/plain', 'Content-Length' => 11}, ["Hello world"]]}
+ }
+
+ before {get '/'}
+
+ context "defaults" do
+ it "should close connection with status" do
+ last_response.status.should == 201
+ end
+
+ it "should set headers" do
+ last_response.headers['Content-Type'].should == 'text/plain'
+ end
+
+ it "should not error" do
+ last_response.errors.should == ""
+ end
+
+ it "should remove Content-Length header" do
+ last_response.headers['Content-Length'].should be_nil
+ end
+
+ it "should use chunked transfer encoding" do
+ last_response.headers['Transfer-Encoding'].should == 'chunked'
+ end
+ end
+
+ context "basic streaming" do
+ let(:endpoint) {
+ lambda {|env|
+ env['rack.stream'].instance_eval do
+ after_open do
+ chunk "Chunky "
+ chunk "Monkey"
+ close
+ end
+ end
+ [200, {'Content-Length' => 0}, ['']]
+ }
+ }
+
+ it "should stream and close" do
+ last_response.status.should == 200
+ # last_response.body.should == "Chunky Monkey"
+ last_response.body.should == "7\r\nChunky \r\n6\r\nMonkey\r\n0\r\n\r\n"
+ end
+ end
+
+ context "before chunk" do
+ let(:endpoint) {
+ lambda {|env|
+ env['rack.stream'].instance_eval do
+ after_open do
+ chunk "Chunky", "Monkey"
+ close
+ end
+
+ before_chunk {|chunks| chunks.map(&:upcase)}
+ before_chunk {|chunks| chunks.reverse}
+ end
+ [200, {}, []]
+ }
+ }
+
+ it 'should allow modification of queued chunks' do
+ last_response.body.should == "6\r\nMONKEY\r\n6\r\nCHUNKY\r\n0\r\n\r\n"
+ end
+ end
+
+ context "before close" do
+ let(:endpoint) {
+ lambda {|env|
+ env['rack.stream'].instance_eval do
+ before_close do
+ chunk "Chunky "
+ chunk "Monkey"
+ end
+ end
+ [200, {}, []]
+ }
+ }
+
+ it "should stream and close" do
+ last_response.body.should == "7\r\nChunky \r\n6\r\nMonkey\r\n0\r\n\r\n"
+ end
+ end
+
+ context "after close" do
+ let(:endpoint) {
+ lambda {|env|
+ env['rack.stream'].instance_eval do
+ after_close do
+ $after_close_called = true
+ end
+ end
+ [200, {}, []]
+ }
+ }
+
+ it "should allow cleanup" do
+ $after_close_called.should be_true
+ $after_close_called = nil
+ end
+ end
+end
35 spec/spec_helper.rb
@@ -0,0 +1,35 @@
+require 'bundler/setup'
+require 'rack'
+require 'rack/stream'
+require 'rack/test'
+require 'rspec'
+
+Dir[File.expand_path('../support/**/*.rb', __FILE__)].each {|f| require f}
+
+# Used by tests to untangle evented code, but not required for use w/ lib
+require 'fiber'
+require 'timeout'
+
+# TODO: swap this with em-spec or something else
+# Patch rspec to run examples in a reactor
+# based on em-rspec, but with synchrony pattern and does not auto stop the reactor
+RSpec::Core::Example.class_eval do
+ alias ignorant_run run
+
+ def run(example_group_instance, reporter)
+ EM.run do
+ Fiber.new do
+ EM.add_timer(2) {
+ raise Timeout::Error.new("aborting test due to timeout")
+ EM.stop
+ }
+ @ignorant_success = ignorant_run example_group_instance, reporter
+ end.resume
+ end
+ @ignorant_success
+ end
+end
+
+RSpec.configure do |c|
+ c.include Rack::Test::Methods
+end
35 spec/support/mock_server.rb
@@ -0,0 +1,35 @@
+module Support
+ class MockServer
+ class Callback
+ attr_reader :status, :headers, :body
+
+ def initialize(&blk)
+ @succeed_callback = blk
+ end
+
+ def call(args)
+ @status, @headers, deferred_body = args
+ @body = []
+ deferred_body.each do |s|
+ @body << s
+ end
+ deferred_body.callback {@succeed_callback.call}
+ deferred_body.callback {EM.stop}
+ end
+ end
+
+ def initialize(app)
+ @app = app
+ end
+
+ def call(env)
+ f = Fiber.current
+ callback = Callback.new do
+ f.resume [callback.status, callback.headers, callback.body]
+ end
+ env['async.callback'] = callback
+ @app.call(env)
+ Fiber.yield # wait until deferred body is succeeded
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.