Permalink
Browse files

Implement redis backend.

  • Loading branch information...
1 parent deca815 commit 974e582f7133fbf96c4cb2afe5a6bdc6fc17ea94 @neglectedvalue committed Feb 7, 2013
Showing with 302 additions and 290 deletions.
  1. +52 −7 lib/yup.rb
  2. +0 −128 lib/yup/request_forwarder.rb
  3. +100 −49 lib/yup/state.rb
  4. +62 −1 test/helper.rb
  5. +0 −105 test/test_persistence_yup.rb
  6. +47 −0 test/test_stateful_yup_with_bdb.rb
  7. +41 −0 test/test_stateful_yup_with_redis.rb
View
@@ -1,3 +1,4 @@
+require 'uri'
require 'rubygems'
require 'eventmachine'
require 'logger'
@@ -7,6 +8,7 @@
require 'yup/version'
require 'yup/request_forwarder'
require 'yup/request_handler'
+require 'yup/state'
module Yup
@@resend_delay = 60.0
@@ -26,8 +28,8 @@ def self.retry_unless_2xx; @@retry_unless_2xx end
def self.retry_unless_2xx=(bool); @@retry_unless_2xx = bool end
def self.run(config)
- host = config[:listen_host] || 'localhost'
- port = config[:listen_port] || 8080
+ host = config[:listen_host] || 'localhost'
+ port = config[:listen_port] || 8080
status_code = config[:status_code] || 200
forward_to = config[:forward_to]
timeout = config[:timeout] || 60
@@ -39,7 +41,18 @@ def self.run(config)
end
def self.run_with_state(config)
- require 'yup/state'
+ case State.state_type(config[:persistent])
+ when :bdb
+ self.run_with_bdb(config)
+ when :redis
+ self.run_with_redis(config)
+ else
+ abort "Unknown scheme of persistent queue."
+ end
+ end
+
+ def self.run_with_bdb(config)
+ require 'yup/state/bdb'
host = config[:listen_host] || 'localhost'
port = config[:listen_port] || 8080
@@ -48,29 +61,61 @@ def self.run_with_state(config)
dbpath = config[:persistent]
timeout = config[:timeout] || 60
feedback_channel = File.join(Dir.tmpdir, "yupd-#{$$}-feedback")
- state = Yup::State.new(dbpath, forward_to, feedback_channel)
+ state = State::BDB.new(dbpath, forward_to, feedback_channel)
pid = Process.fork do
- State::RequestForwarder.new(state, forward_to, timeout).run_loop
+ State::BDB::RequestForwarder.new(state, forward_to, timeout).run_loop
end
if pid
db_closer = proc do
Yup.logger.info { "Terminating consumer #{$$}" }
Process.kill("KILL", pid)
- state.close
+ state.dispose()
exit 0
end
Signal.trap("TERM", &db_closer)
Signal.trap("INT", &db_closer)
end
EM.run do
- EM.start_unix_domain_server(feedback_channel, State::FeedbackHandler, state)
+ EM.start_unix_domain_server(feedback_channel, State::BDB::FeedbackHandler, state)
logger.info { "Feedback through #{feedback_channel}" }
EM.start_server(host, port, RequestHandler, forward_to, status_code, state, timeout)
logger.info { "Listening on #{host}:#{port}" }
end
end
+
+ def self.run_with_redis(config)
+ require 'yup/state/redis'
+
+ host = config[:listen_host] || 'localhost'
+ port = config[:listen_port] || 8080
+ status_code = config[:status_code] || 200
+ forward_to = config[:forward_to]
+ dbpath = config[:persistent]
+ timeout = config[:timeout] || 60
+ state = State::Redis.new(dbpath, forward_to)
+
+ pid = Process.fork do
+ State::Redis::RequestForwarder.new(state, forward_to, timeout).run_loop
+ end
+
+ if pid
+ db_closer = proc do
+ Yup.logger.info { "Terminating consumer #{$$}" }
+ Process.kill("KILL", pid)
+ state.dispose()
+ exit 0
+ end
+ Signal.trap("TERM", &db_closer)
+ Signal.trap("INT", &db_closer)
+ end
+
+ EM.run do
+ EM.start_server(host, port, RequestHandler, forward_to, status_code, state, timeout)
+ logger.info { "Listening on #{host}:#{port}" }
+ end
+ end
end
@@ -67,132 +67,4 @@ def log_response(http)
end
end
end
-
- class State
- class RequestForwarder
- def initialize(state, forward_to, timeout)
- @state = state
- @forward_to = forward_to
- @timeout = timeout
- @logger = Yup.logger.clone
- @logger.progname = "Yup::State::RequestForwarder"
-
- @yajl = Yajl::Parser.new(:symbolize_keys => true)
- @yajl.on_parse_complete = self.method(:make_request)
- end
-
- def run_loop
- loop do
- data = @state.bpop
- begin
- @yajl << data
- rescue Yajl::ParseError
- @logger.error { "Error while parsing \"#{data}\"" }
- end
- end
- end
-
- def make_request(req)
- begin
- @http_method, @request_url, headers, body = req
- headers = Hash[*headers.to_a.flatten.map(&:to_s)]
- headers["Host"] = @forward_to
- headers["Connection"] = "Close"
-
- req = "#{@http_method.upcase} #{@request_url} HTTP/1.1\r\n"
- headers.each do |k, v|
- req << "#{k}: #{v}\r\n"
- end
- req << "\r\n"
- req << body if !body.empty?
- raw_response = send_data(req.to_s, @forward_to)
-
- response_body = ""
- http = Http::Parser.new()
- http.on_body = proc do |chunk|
- response_body << chunk
- end
- http << raw_response
-
- if http.status_code && http.status_code / 100 == 2
- log_response(raw_response, response_body, http)
- @logger.info "Success"
- else
- log_response(raw_response, response_body, http)
- if Yup.retry_unless_2xx
- @logger.info "Fail: got status code #{http.status_code}; will retry after #{Yup.resend_delay} seconds"
- @state.to_feedback(Yajl::Encoder.encode([@http_method.downcase, @request_url, headers, body]))
-
- sleep Yup.resend_delay
- else
- @logger.info "Fail; will not retry"
- end
- end
-
- rescue Exception, Timeout::Error => e
- log_response(raw_response, response_body, http)
- @logger.info "Error: #{e.class}: #{e.message}; will retry after #{Yup.resend_delay} seconds"
-
- @state.to_feedback(Yajl::Encoder.encode([@http_method.downcase, @request_url, headers, body]))
-
- sleep Yup.resend_delay
- end
- end
-
- private
- def log_response(raw_response, body, http)
- @logger.info { "HTTP request: #{@http_method.upcase} #{@request_url} HTTP/1.1" }
- if raw_response && !raw_response.empty?
- @logger.info { "HTTP response: #{raw_response.lines.first.chomp}" }
- @logger.debug { "HTTP response headers" + (http.headers.empty? ? " is empty" : "\n" + http.headers.inspect) }
- @logger.debug { "HTTP response body" + (body.empty? ? " is empty" : "\n" + body.inspect) }
- end
- end
-
- def send_data(data, host)
- host, port = host.split(":")
- addr = Socket.getaddrinfo(host, nil)
- sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
-
- secs = Integer(@timeout)
- usecs = Integer((@timeout - secs) * 1_000_000)
- optval = [secs, usecs].pack("l_2")
- sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval)
- sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval)
-
- resp = Timeout::timeout(@timeout) do
- sock.connect(Socket.pack_sockaddr_in(port, addr[0][3]))
- sock.write(data)
- sock.read()
- end
- return resp
- ensure
- sock.close()
- end
- end
-
- class FeedbackHandler < EM::Connection
- def initialize(state)
- @state = state
-
- @yajl = Yajl::Parser.new(:symbolize_keys => true)
- @yajl.on_parse_complete = method(:on_message)
-
- @logger = Yup.logger.clone
- @logger.progname = "Yup::State::FeedbackHandler"
- end
-
- def receive_data(data)
- begin
- @yajl << data
- rescue Yajl::ParseError
- @logger.error { "Error while parsing \"#{data}\"" }
- end
- end
-
- def on_message(req)
- @state.push(Yajl::Encoder.encode(req))
- end
- end
- end
end
Oops, something went wrong.

0 comments on commit 974e582

Please sign in to comment.