Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add EventMachine adapter

  • Loading branch information...
commit b04706456862866960b6a4c2ece5ee9ae978d509 1 parent 4878051
@mislav mislav authored
Showing with 190 additions and 0 deletions.
  1. +2 −0  lib/faraday/adapter.rb
  2. +188 −0 lib/faraday/adapter/em_http.rb
View
2  lib/faraday/adapter.rb
@@ -10,6 +10,7 @@ class Adapter < Middleware
:NetHttp => 'net_http',
:Typhoeus => 'typhoeus',
:EMSynchrony => 'em_synchrony',
+ :EMHttp => 'em_http',
:Patron => 'patron',
:Excon => 'excon',
:Test => 'test'
@@ -21,6 +22,7 @@ class Adapter < Middleware
:typhoeus => :Typhoeus,
:patron => :Patron,
:em_synchrony => :EMSynchrony,
+ :em_http => :EMHttp,
:excon => :Excon
module Parallelism
View
188 lib/faraday/adapter/em_http.rb
@@ -0,0 +1,188 @@
+module Faraday
+ class Adapter
+ # EventMachine adapter is useful for either asynchronous requests
+ # when in EM reactor loop or for making parallel requests in
+ # synchronous code.
+ class EMHttp < Faraday::Adapter
+
+ dependency 'em-http'
+
+ self.supports_parallel = true
+
+ def self.setup_parallel_manager(options = nil)
+ Manager.new
+ end
+
+ def call(env)
+ super
+ perform_request env
+ @app.call env
+ end
+
+ def perform_request(env)
+ if parallel?(env)
+ manager = env[:parallel_manager]
+ manager.add {
+ perform_single_request(env).
+ callback { env[:response].finish(env) }
+ }
+ else
+ unless EventMachine.reactor_running?
+ error = nil
+ # start EM, block until request is completed
+ EventMachine.run do
+ perform_single_request(env).
+ callback { EventMachine.stop }.
+ errback { |client|
+ error = client.error || "connection failed"
+ EventMachine.stop
+ }
+ end
+ raise Faraday::Error::ClientError, error if error
+ else
+ # EM is running: instruct upstream that this is an async request
+ env[:parallel_manager] = true
+ perform_single_request(env).
+ callback { env[:response].finish(env) }.
+ errback {
+ # TODO: no way to communicate the error in async mode
+ raise NotImplementedError
+ }
+ end
+ end
+ end
+
+ # TODO: reuse the connection to support pipelining
+ def perform_single_request(env)
+ req = EventMachine::HttpRequest.new(env[:url], connection_config(env))
+ req.setup_request(env[:method], request_config(env)).callback { |client|
+ save_response(env, client.response_header.status, client.response) do |resp_headers|
+ client.response_header.each do |name, value|
+ resp_headers[name.to_sym] = value
+ end
+ end
+ }
+ end
+
+ def connection_config(env)
+ options = {}
+ configure_ssl(options, env)
+ configure_proxy(options, env)
+ configure_timeout(options, env)
+ options
+ end
+
+ def request_config(env)
+ options = {
+ :body => read_body(env),
+ :head => env[:request_headers],
+ # :keepalive => true,
+ # :file => 'path/to/file', # stream data off disk
+ }
+ configure_compression(options, env)
+ # configure_proxy_auth
+ # :proxy => {:authorization => [user, pass]}
+ # proxy[:username] && proxy[:password]
+ options
+ end
+
+ def read_body(env)
+ body = env[:body]
+ body.respond_to?(:read) ? body.read : body
+ end
+
+ def configure_ssl(options, env)
+ if ssl = env[:ssl]
+ # :ssl => {
+ # :private_key_file => '/tmp/server.key',
+ # :cert_chain_file => '/tmp/server.crt',
+ # :verify_peer => false
+ end
+ end
+
+ def configure_proxy(options, env)
+ if proxy = request_options(env)[:proxy]
+ options[:proxy] = {
+ :host => proxy[:uri].host,
+ :port => proxy[:uri].port
+ }
+ end
+ end
+
+ def configure_timeout(options, env)
+ timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
+ options[:connect_timeout] = options[:inactivity_timeout] = timeout
+ options[:connect_timeout] = open_timeout if open_timeout
+ end
+
+ def configure_compression(options, env)
+ if env[:method] == :get and not options[:head].key? 'accept-encoding'
+ options[:head]['accept-encoding'] = 'gzip, compressed'
+ end
+ end
+
+ def request_options(env)
+ env[:request]
+ end
+
+ def parallel?(env)
+ !!env[:parallel_manager]
+ end
+
+ # The parallel manager is designed to start an EventMachine loop
+ # and block until all registered requests have been completed.
+ class Manager
+ def initialize
+ reset
+ end
+
+ def reset
+ @registered_procs = []
+ @num_registered = 0
+ @num_succeeded = 0
+ @errors = []
+ @running = false
+ end
+
+ def running?() @running end
+
+ def add
+ if running?
+ perform_request { yield }
+ else
+ @registered_procs << Proc.new
+ end
+ @num_registered += 1
+ end
+
+ def run
+ if @num_registered > 0
+ @running = true
+ EventMachine.run do
+ @registered_procs.each do |proc|
+ perform_request(&proc)
+ end
+ end
+ if @errors.size > 0
+ raise Faraday::Error::ClientError, @errors.first || "connection failed"
+ end
+ end
+ ensure
+ reset
+ end
+
+ def perform_request
+ client = yield
+ client.callback { @num_succeeded += 1; check_finished }
+ client.errback { @errors << client.error; check_finished }
+ end
+
+ def check_finished
+ if @num_succeeded + @errors.size == @num_registered
+ EventMachine.stop
+ end
+ end
+ end
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.