Skip to content

Commit

Permalink
Merge pull request #1 from lookout/wip/http
Browse files Browse the repository at this point in the history
Add support for HTTP transport (connector and Rack server)
  • Loading branch information
R. Tyler Croy committed Jun 6, 2014
2 parents 3e87a6c + 8486b9a commit a75c661
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 14 deletions.
2 changes: 1 addition & 1 deletion lib/protobuf.rb
Expand Up @@ -12,7 +12,7 @@
module Protobuf

# See Protobuf#connector_type documentation.
CONNECTORS = [ :socket, :zmq ].freeze
CONNECTORS = [ :socket, :zmq, :http ].freeze

# Default is Socket as it has no external dependencies.
DEFAULT_CONNECTOR = :socket
Expand Down
14 changes: 14 additions & 0 deletions lib/protobuf/cli.rb
Expand Up @@ -3,6 +3,7 @@
require 'protobuf/logger'
require 'protobuf/rpc/servers/socket_runner'
require 'protobuf/rpc/servers/zmq_runner'
require 'protobuf/rpc/servers/http_runner'

module Protobuf
class CLI < ::Thor
Expand All @@ -26,6 +27,7 @@ class CLI < ::Thor

option :socket, :type => :boolean, :aliases => %w(-s), :desc => 'Socket Mode for server and client connections.'
option :zmq, :type => :boolean, :aliases => %w(-z), :desc => 'ZeroMQ Socket Mode for server and client connections.'
option :http, :type => :boolean, :aliases => %w(), :desc => 'HTTP Server Mode for server and client connections.'

option :beacon_interval, :type => :numeric, :desc => 'Broadcast beacons every N seconds. (default: 5)'
option :beacon_port, :type => :numeric, :desc => 'Broadcast beacons to this port (default: value of ServiceDirectory.port)'
Expand Down Expand Up @@ -109,12 +111,16 @@ def configure_runner_mode
@runner_mode = :socket
elsif options.zmq?
@runner_mode = :zmq
elsif options.http?
@runner_mode = :http
else
case server_type = ENV["PB_SERVER_TYPE"]
when nil, /socket/i
@runner_mode = :socket
when /zmq/i
@runner_mode = :zmq
when /http/i
@runner_mode = :http
else
say "WARNING: You have provided incorrect option 'PB_SERVER_TYPE=#{server_type}'. Defaulting to socket mode.", :yellow
@runner_mode = :socket
Expand Down Expand Up @@ -148,6 +154,8 @@ def create_runner
create_zmq_runner
when :socket
create_socket_runner
when :http
create_http_runner
else
say_and_exit("Unknown runner mode: #{@runner_mode}")
end
Expand Down Expand Up @@ -211,6 +219,12 @@ def create_zmq_runner
@runner = ::Protobuf::Rpc::ZmqRunner.new(runner_options)
end

def create_http_runner
require 'protobuf/http'

@runner = ::Protobuf::Rpc::HttpRunner.new(runner_options)
end

def shutdown_server
::Protobuf::Logger.info { 'RPC Server shutting down...' }
@runner.try(:stop)
Expand Down
20 changes: 20 additions & 0 deletions lib/protobuf/http.rb
@@ -0,0 +1,20 @@
##
## HTTP Mode
##
#
# Require this file if you wish to run your server and/or client RPC
# with the HTTP handlers.
#
# To run with rpc_server specify the switch `http`:
#
# rpc_server --http myapp.rb
#
# To run for client-side only override the require in your Gemfile:
#
# gem 'protobuf', :require => 'protobuf/http'
#
require 'protobuf'
Protobuf.connector_type = :http

require 'protobuf/rpc/servers/http/server'
require 'protobuf/rpc/connectors/http'
2 changes: 2 additions & 0 deletions lib/protobuf/rpc/connector.rb
Expand Up @@ -9,6 +9,8 @@ def self.connector_for_client
case ::Protobuf.connector_type
when :zmq then
::Protobuf::Rpc::Connectors::Zmq
when :http then
::Protobuf::Rpc::Connectors::Http
else
::Protobuf::Rpc::Connectors::Socket
end
Expand Down
86 changes: 86 additions & 0 deletions lib/protobuf/rpc/connectors/http.rb
@@ -0,0 +1,86 @@
require 'protobuf/rpc/connectors/base'
require 'cgi'
require 'faraday'

module Protobuf
module Rpc
module Connectors
class Http < Base
include Protobuf::Rpc::Connectors::Common
include Protobuf::Logger::LogMethods

def send_request
timeout_wrap do
setup_connection
post_init
end
end

def log_signature
@_log_signature ||= "[http-client-#{self.class.name}]"
end

# private

def close_connection
log_debug { sign_message('Connector closed') }
end

# Method to determine error state, must be used with Connector api
def error?
log_debug { sign_message("Error state : #{@error}") }
if @error
true
else
false
end
end

def host
'http://' + options[:host] + ':' + options[:port].to_s
end

def client
@_client ||= Faraday.new(:url => host)
end

def send_data
rpc_request = ::Protobuf::Socketrpc::Request.decode(@request_data)

http_response = client.post do |http_request|
path = '/' + CGI.escape(rpc_request[:service_name]) + '/' + CGI.escape(rpc_request[:method_name])
http_request.url path
http_request.headers['Content-Type'] = 'application/x-protobuf'
http_request.headers['X-Protobuf-Caller'] = rpc_request[:caller] || ''
http_request.body = rpc_request[:request_proto]
end

# Server returns protobuf response with no error
if http_response.status == 200 and http_response.headers['x-protobuf-error'].nil?
rpc_response = Protobuf::Socketrpc::Response.new(
:response_proto => http_response.body
)
# Server returns protobuf error
elsif http_response.status != 200 and not http_response.headers['x-protobuf-error'].nil?
rpc_response = Protobuf::Socketrpc::Response.new(
:response_proto => http_response.body,
:error => http_response.headers['x-protobuf-error'],
:error_reason => http_response.headers['x-protobuf-error-reason'].to_i
)
# Server didn't return a response or error
else
rpc_response = Protobuf::Socketrpc::Response.new(
:response_proto => http_response.body,
:error => "Bad response from the server.",
:error_reason => Protobuf::Socketrpc::ErrorReason::BAD_RESPONSE_PROTO
)
end

@response_data = rpc_response.encode()

parse_response
end
end
end
end
end
90 changes: 90 additions & 0 deletions lib/protobuf/rpc/servers/http/server.rb
@@ -0,0 +1,90 @@
require 'protobuf/rpc/server'

require 'rack'
require 'rack/server'

module Protobuf
module Rpc
module Http
class Server
include ::Protobuf::Rpc::Server
include ::Protobuf::Logger::LogMethods

# TODO: more comprehensive mapping?
HTTP_STATUSES = {
Protobuf::Socketrpc::ErrorReason::BAD_REQUEST_DATA => 400,
Protobuf::Socketrpc::ErrorReason::BAD_REQUEST_PROTO => 400,
Protobuf::Socketrpc::ErrorReason::SERVICE_NOT_FOUND => 404,
Protobuf::Socketrpc::ErrorReason::METHOD_NOT_FOUND => 404,
Protobuf::Socketrpc::ErrorReason::RPC_ERROR => 500,
Protobuf::Socketrpc::ErrorReason::RPC_FAILED => 500,
Protobuf::Socketrpc::ErrorReason::INVALID_REQUEST_PROTO => 500,
Protobuf::Socketrpc::ErrorReason::BAD_RESPONSE_PROTO => 500,
Protobuf::Socketrpc::ErrorReason::UNKNOWN_HOST => 500,
Protobuf::Socketrpc::ErrorReason::IO_ERROR => 500
}

def initialize(options = {})
@options = options
end

def log_signature
@_log_signature ||= "[http-server-#{self.class.name}]"
end

def call(env)
path_components = env['PATH_INFO'].split("/").map{ |x| CGI::unescape(x) }.compact.reject{ |x| x.empty? }
if path_components.length != 2
headers = {
'Content-Type' => 'application/x-protobuf',
'X-Protobuf-Error' => "Expected 2 path components, got #{path_components.length.to_s}",
'X-Protobuf-Error-Reason' => Protobuf::Socketrpc::ErrorReason::INVALID_REQUEST_PROTO.to_s
}
return [400, headers, []]
end

rpc_request = ::Protobuf::Socketrpc::Request.new(
:service_name => path_components[0],
:method_name => path_components[1],
:request_proto => env['rack.input'].read,
:caller => env['HTTP_X_PROTOBUF_CALLER'] || ''
)

encoded_request = rpc_request.encode()
encoded_response = handle_request(encoded_request)

rpc_response = Protobuf::Socketrpc::Response.decode(encoded_response)

status = 200
headers = {
'Content-Type' => 'application/x-protobuf',
}

if rpc_response[:error].length > 0
headers['X-Protobuf-Error'] = rpc_response[:error]
headers['X-Protobuf-Error-Reason'] = rpc_response[:error_reason].to_s
status = HTTP_STATUSES[rpc_response[:error_reason]] or 500
end

[status, headers, [rpc_response['response_proto']]]
end

def run
Rack::Server.start(
:app => self,
:Host => @options[:host],
:Port => @options[:port]
)
@running = true
end

def running?
!!@running
end

def stop
end
end
end
end
end
34 changes: 34 additions & 0 deletions lib/protobuf/rpc/servers/http_runner.rb
@@ -0,0 +1,34 @@
module Protobuf
module Rpc
class HttpRunner

def initialize(options)
@options = case
when options.is_a?(OpenStruct) then
options.marshal_dump
when options.is_a?(Hash) then
options
when options.respond_to?(:to_hash) then
options.to_hash
else
raise "Cannot parse HTTP Server - server options"
end

@server = ::Protobuf::Rpc::Http::Server.new(@options)
end

def run
yield if block_given?
@server.run
end

def running?
@server.running?
end

def stop
@server.stop
end
end
end
end
4 changes: 3 additions & 1 deletion protobuf.gemspec
Expand Up @@ -24,10 +24,12 @@ require "protobuf/version"
s.add_dependency 'multi_json'
s.add_dependency 'thor'

s.add_development_dependency 'rack'
s.add_development_dependency 'faraday'
s.add_development_dependency 'ffi-rzmq'
s.add_development_dependency 'pry-nav'
s.add_development_dependency 'rake'
s.add_development_dependency 'rspec'
s.add_development_dependency 'rspec', '2.99.0'
s.add_development_dependency 'simplecov'
s.add_development_dependency 'yard'
s.add_development_dependency 'timecop'
Expand Down
13 changes: 13 additions & 0 deletions spec/lib/protobuf/rpc/connectors/connector_spec.rb
@@ -0,0 +1,13 @@
require 'spec_helper'

shared_examples "a Protobuf Connector" do
subject{ described_class.new({}) }

context "API" do
# Check the API
specify{ subject.respond_to?(:send_request, true).should be_true }
specify{ subject.respond_to?(:post_init, true).should be_true }
specify{ subject.respond_to?(:close_connection, true).should be_true }
specify{ subject.respond_to?(:error?, true).should be_true }
end
end
52 changes: 52 additions & 0 deletions spec/lib/protobuf/rpc/connectors/http_spec.rb
@@ -0,0 +1,52 @@
require 'spec_helper'
require 'protobuf/http'
require 'faraday'

describe ::Protobuf::Rpc::Connectors::Http do
subject { described_class.new({}) }

it_behaves_like "a Protobuf Connector"

specify{ described_class.include?(Protobuf::Rpc::Connectors::Common).should be_true }

let(:client_double) do
Faraday.new do |builder|
builder.adapter :test do |stub|
stub.post("/Foo%3A%3AUserService/find") {[ 200, {}, "\n\n\n\x03foo\x12\x03bar" ]}
stub.post("/Foo%3A%3AUserService/foo1") {[ 404, {
'x-protobuf-error' => "Foo::UserService#foo1 is not a defined RPC method.",
'x-protobuf-error-reason' => Protobuf::Socketrpc::ErrorReason::METHOD_NOT_FOUND.to_s
}, "" ]}
stub.post("/Foo%3A%3AUserService/foo2") {[ 500, {}, "" ]}
end
end
end

describe "#send_data" do
before do
subject.stub(:client) { client_double }
subject.stub(:parse_response) {}
end

it "handles RPC success correctly" do
subject.stub(:request_bytes) { "\n\x10Foo::UserService\x12\x04find\x1A\r\n\vfoo@bar.com\"\rabcdefghijklm" }
subject.send(:setup_connection)
subject.send(:send_data)
subject.instance_variable_get(:@response_data).should eq "\n\f\n\n\n\x03foo\x12\x03bar"
end

it "handles RPC error correctly" do
subject.stub(:request_bytes) { "\n\x10Foo::UserService\x12\x04foo1\x1A\r\n\vfoo@bar.com\"\rabcdefghijklm" }
subject.send(:setup_connection)
subject.send(:send_data)
subject.instance_variable_get(:@response_data).should eq "\n\x00\x122Foo::UserService#foo1 is not a defined RPC method. \x03"
end

it "handles server error correctly" do
subject.stub(:request_bytes) { "\n\x10Foo::UserService\x12\x04foo2\x1A\r\n\vfoo@bar.com\"\rabcdefghijklm" }
subject.send(:setup_connection)
subject.send(:send_data)
subject.instance_variable_get(:@response_data).should eq "\n\x00\x12\x1DBad response from the server. \x07"
end
end
end

0 comments on commit a75c661

Please sign in to comment.