Skip to content

Commit

Permalink
rename: src -> ext. also lots of refactoring in ebb.rb for async
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Aug 9, 2008
1 parent 539b81d commit ce2e110
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 84 deletions.
20 changes: 10 additions & 10 deletions Rakefile
Expand Up @@ -18,11 +18,11 @@ libev_url = File.join(libev_dist, libev_release)
LIBEBBFILES = ['ebb.c', 'ebb.h',
'ebb_request_parser.rl', 'ebb_request_parser.c', 'ebb_request_parser.h',
'rbtree.c', 'rbtree.h']
SRCEBBFILES = LIBEBBFILES.map { |f| "src" / f }
SRCEBBFILES = LIBEBBFILES.map { |f| "ext" / f }

DISTFILES = FileList.new('libev', 'lib/**/*.rb', 'src/*.{rb,rl,c,h}', 'bin/*', 'README', 'Rakefile') + SRCEBBFILES
DISTFILES = FileList.new('libev', 'lib/**/*.rb', 'ext/*.{rb,rl,c,h}', 'bin/*', 'README', 'Rakefile') + SRCEBBFILES
CLEAN.add ["**/*.{o,bundle,so,obj,pdb,lib,def,exp}", "benchmark/*.dump", 'site/index.html']
CLOBBER.add ['src/Makefile', 'src/mkmf.log'] + SRCEBBFILES
CLOBBER.add ['ext/Makefile', 'ext/mkmf.log'] + SRCEBBFILES

Rake::TestTask.new do |t|
t.test_files = FileList.new("test/*.rb")
Expand All @@ -31,15 +31,15 @@ end

LIBEBBFILES.each do |f|
file(".libebb"/f => ".libebb")
file("src"/f => ".libebb"/f) do |t|
sh "cp .libebb/#{f} src/#{f}"
file("ext"/f => ".libebb"/f) do |t|
sh "cp .libebb/#{f} ext/#{f}"
end
end

task(:default => [:compile])

task(:compile => ['src/Makefile','libev'] + SRCEBBFILES) do
sh "cd src && make"
task(:compile => ['ext/Makefile','libev'] + SRCEBBFILES) do
sh "cd ext && make"
end

file "libev" do
Expand All @@ -58,8 +58,8 @@ file ".libebb" do
sh "git clone git://github.com/ry/libebb.git .libebb"
end

file('src/Makefile' => 'src/extconf.rb') do
sh "cd src && ruby extconf.rb"
file('ext/Makefile' => 'ext/extconf.rb') do
sh "cd ext && ruby extconf.rb"
end

file(".libebb/ebb_request_parser.c" => '.libebb/ebb_request_parser.rl') do
Expand Down Expand Up @@ -114,7 +114,7 @@ spec = Gem::Specification.new do |s|
s.required_ruby_version = '>= 1.8.4'

s.require_path = 'ruby_lib'
s.extensions = 'src/extconf.rb'
s.extensions = 'ext/extconf.rb'
s.bindir = 'bin'
s.executables = %w(ebb_rails)

Expand Down
File renamed without changes.
File renamed without changes.
196 changes: 122 additions & 74 deletions lib/ebb.rb
Expand Up @@ -6,7 +6,7 @@ module Ebb
VERSION = "0.3.0"
LIBDIR = File.dirname(__FILE__)
autoload :Runner, LIBDIR + '/ebb/runner'
autoload :FFI, LIBDIR + '/../src/ebb_ffi'
autoload :FFI, LIBDIR + '/../ext/ebb_ffi'

def self.running?
FFI::server_open?
Expand All @@ -33,22 +33,22 @@ def self.start_server(app, options={})

if options.has_key?(:ssl_cert) and options.has_key?(:ssl_key)
unless FFI.respond_to?(:server_set_secure)
puts "ebb compiled without ssl support. get gnutls"
log.puts "ebb compiled without ssl support. get gnutls"
else
cert_file = options[:ssl_cert]
key_file = options[:ssl_key]
if FileTest.readable?(cert_file) and FileTest.readable?(cert_file)
FFI::server_set_secure(cert_file, key_file);
else
puts "error opening certificate or key file"
log.puts "error opening certificate or key file"
end
end
end

log.puts "Ebb PID #{Process.pid}"

@running = true
Connection.reset_write_queues
Connection.reset_responses
trap('INT') { stop_server }

while @running
Expand All @@ -65,97 +65,145 @@ def self.start_server(app, options={})
end

def self.process(app, req)
res = req.response
req.connection.responses << res

# p req.env
status, headers, body = app.call(req.env)
res = Response.new(status, headers, body)
res.last = !req.keep_alive?

# FIXME
unless body.respond_to?(:shift)
if body.kind_of?(String)
body = [body]
else
b = []
body.each { |chunk| b << chunk }
body = b
end
status = headers = body = nil
catch(:async) do
status, headers, body = app.call(req.env)
end

# TODO chunk encode the response if have chunked encoding
# James Tucker's async response scheme
# check out
# http://github.com/raggi/thin/tree/async_for_rack/example/async_app.ru
res.call(status, headers, body) if status != 0
# if status == 0 then the application promises to call
# env['async.callback'].call(status, headers, body)
# later on...

response_queue = Connection.write_queues[req.connection]
response_queue << res
req.connection.start_writing if response_queue.length == 1
end

class Connection
def self.reset_write_queues
@@write_queues = {}
def self.reset_responses
@@responses = {} # used for memory management :|
end

def self.write_queues
@@write_queues
def self.responses
@@responses
end

def start_writing
res = queue.first
FFI::connection_write(self, res.chunk)
end

def queue
@@write_queues[self]
def responses
@@responses[self]
end

# called from c
def append_request(req)
@requests.push req
end

def on_open
@requests = []
@@write_queues[self] = []
@@responses[self] = []
end

def on_close
# garbage collection !
@requests.each { |req| req.connection = nil }
@@write_queues.delete(self)
responses.each { |res| res.connection = nil }
@@responses.delete(self)
end

def writing?
! @being_written.nil?
end

def write
return if writing?
return unless res = responses.first
return if res.output.empty?
# NOTE: connection_write does not buffer!
chunk = res.output.shift
@being_written = chunk # need to store this so ruby doesn't gc it
FFI::connection_write(self, chunk)
end

# called after FFI::connection_write if complete
def on_writable
if queue.empty?
@@write_queues.delete(self)
else
res = queue.first
if chunk = res.shift
FFI::connection_write(self, chunk)
else
if res.last
FFI::connection_schedule_close(self)
@@write_queues.delete(self)
else
queue.shift # write nothing. we'll get'em next time
end
@being_written = nil
return unless res = responses.first
if res.finished?
responses.shift
if res.last
FFI::connection_schedule_close(self)
return
end
end
end
write
end
end

class Response
attr_reader :chunk
attr_accessor :last
def initialize(status, headers, body)
@body = body
@chunk = "HTTP/1.1 #{status} #{HTTP_STATUS_CODES[status.to_i]}\r\n"
headers.each { |field, value| @chunk << "#{field}: #{value}\r\n" }
@chunk << "\r\n#{@body.shift}"
@last = false
attr_reader :output
attr_accessor :last, :connection
def initialize(connection, last)
@connection = connection
@last = last
@output = []
@finished = false
@chunked = false
end

def call(status, headers, body)
@head = "HTTP/1.1 #{status} #{HTTP_STATUS_CODES[status.to_i]}\r\n"
headers.each { |field, value| @head << "#{field}: #{value}\r\n" }
@head << "\r\n"

# XXX i would prefer to do
# @chunked = true unless body.respond_to?(:length)
@chunked = true if headers["Transfer-Encoding"] == "chunked"
# I also don't like this
@last = true if headers["Connection"] == "close"

# Note: not setting Content-Length. do it yourself.

body.each do |chunk|
if @head.nil?
write(chunk)
else
write(@head + chunk)
@head = nil
end
@connection.write
end

body.on_error { close } if body.respond_to?(:on_error)

if body.respond_to?(:on_eof)
body.on_eof { finish }
else
finish
end

# deferred requests SHOULD NOT respond to close
body.close if body.respond_to?(:close)
end

# if returns nil, there is nothing else to write
# otherwise returns then next chunk needed to write.
# on writable call connection.write(response.shift)
def shift
@chunk = @body.shift
def finished?
@output.empty? and @finished
end

def finish
@finished = true
if @chunked
write("")
@connection.write
end
end

def write(chunk)
encoded = @chunked ? "#{chunk.length.to_s(16)}\r\n#{chunk}\r\n" : chunk
@output << encoded
end

HTTP_STATUS_CODES = {
Expand Down Expand Up @@ -223,31 +271,31 @@ class Request
'rack.multiprocess' => false,
'rack.run_once' => false
}

def keep_alive?
FFI::request_should_keep_alive?(self)
end

def env
@env ||= begin
env = @env_ffi.update(BASE_ENV)
env['CONTENT_LENGTH'] = env['HTTP_CONTENT_LENGTH']
env = BASE_ENV.merge(@env_ffi)
env['rack.input'] = self
env['CONTENT_LENGTH'] = env['HTTP_CONTENT_LENGTH']
env['async.callback'] = response
env
end
end

def read(want = 1024)
FFI::request_read(self, want)
def keep_alive?
FFI::request_should_keep_alive?(self)
end

def should_keep_alive?
if env['HTTP_VERSION'] == 'HTTP/1.0'
return env['HTTP_CONNECTION'] =~ /Keep-Alive/i
else
return env['HTTP_CONNECTION'] !~ /close/i
def response
@response ||= begin
last = !keep_alive? # this is the last response if the request isnt keep-alive
Response.new(@connection, last)
end
end

def read(want = 1024)
FFI::request_read(self, want)
end
end
end

Expand Down

0 comments on commit ce2e110

Please sign in to comment.