Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
rkh committed Jul 7, 2011
0 parents commit 89302a9
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 0 deletions.
25 changes: 25 additions & 0 deletions README.md
@@ -0,0 +1,25 @@
# Rack::AsyncStream

Ever tried to use streaming with Thin? Didn't work? Fear not! Just use this
middleware!

Works with Ruby 1.8.7, 1.9, JRuby, Rubinius, any Rails version since 2.3, any
version of Sinatra, your stand-alone Rack app and probably a lot more libraries,
frameworks and Ruby implementations.

## Usage

``` ruby
# config.ru
class SlowStream
def each
100.times do |i|
yield "We're at #{i}\n"
sleep 0.5
end
end
end

use Rack::AsyncStream
run proc { [200, {'Content-Type' => 'text/plain'}, SlowStream.new] }
```
33 changes: 33 additions & 0 deletions config.ru
@@ -0,0 +1,33 @@
$LOAD_PATH.unshift 'lib'
require 'rack/async_stream'

app = proc do
body = Object.new
def body.each
10.times do |i|
yield "Number #{i}\n"
sleep 0.3
end
end
[200, {'Content-Type' => 'text/plain'}, body]
end

map '/fiber' do
use Rack::AsyncStream, :logging => true, :stream => Rack::AsyncStream::FiberStream
run app
end

map '/callcc' do
use Rack::AsyncStream, :logging => true, :stream => Rack::AsyncStream::ContinuationStream
run app
end

map '/thread' do
use Rack::AsyncStream, :logging => true, :stream => Rack::AsyncStream::ThreadStream
run app
end

map '/' do
use Rack::AsyncStream, :logging => true
run app
end
120 changes: 120 additions & 0 deletions lib/rack/async_stream.rb
@@ -0,0 +1,120 @@
require 'stringio'
require 'eventmachine'

module Rack
class AsyncStream
class Stream
include EM::Deferrable

def self.available
@classes ||= [FiberStream, ContinuationStream, ThreadStream]
end

def self.available?
@available ||= check
end

def self.try_require(lib)
require lib
rescue LoadError
end

def initialize(body)
raise "#{self.class} not available" unless self.class.available?
@body = body
end

def respond_to?(*args)
super or @body.respond_to?(*args)
end

def method_missing(method, *args, &block)
return super unless @body.respond_to? method
@body.send(method, *args, &block)
end
end

class FiberStream < Stream
def self.check
try_require 'fiber'
include Rubinius if defined? Rubinius and not defined? Fiber
defined? Fiber
end

def each(&block)
fiber = Fiber.new do
@body.each do |str|
block.call(str)
EM.next_tick { fiber.resume }
Fiber.yield
end
succeed
end
fiber.resume
end
end

class ContinuationStream < Stream
def self.check
try_require 'continuation'
defined? callcc
end

def each(&block)
callcc do |outer|
@body.each do |str|
block.call(str)
callcc do |inner|
EM.next_tick { inner.call }
outer.call
end
end
succeed
end
end
end

class ThreadStream < Stream
def self.check
require 'thread'
true
end

def each(&block)
EM.defer do
@body.each { |str| EM.next_tick { block.call(str) } }
EM.next_tick { succeed }
end
end
end

def initialize(app, options = {})
@app = app
@dont_wrap = options[:dont_wrap] || [Array, String, IO, File, StringIO]
@stream = options[:stream] || Stream.available.detect(&:available?)
@response = options[:response] || :async
@logging = options[:logging]
end

def log(env, message)
return unless @logging
if logger = env['rack.logger']
logger.info(message)
else
env['rack.errors'].puts message
end
end

def call(env)
status, headers, body = @app.call(env)
if status < 0 or @dont_wrap.include? body.class or !env['async.callback']
[status, headers, body]
else
log env, "Wrapping %p in %p" % [body, @stream]
stream = @stream.new body
EM.next_tick { env['async.callback'].call [status, headers, stream] }
Symbol === @response ? throw(@response) : @response
end
end
end
end
5 changes: 5 additions & 0 deletions spec/async_stream_spec.rb
@@ -0,0 +1,5 @@
require 'rack/async_stream'

describe Rack::AsyncStream do
it 'should be able to stream with Thin'
end

0 comments on commit 89302a9

Please sign in to comment.