Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 0cd2379abd
Fetching contributors…

Cannot retrieve contributors at this time

file 115 lines (100 sloc) 3.78 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
module Yup
  module State
    def self.queue_type(str)
      uri = URI.parse(str)
      case uri.scheme
      when "bdb"
        :bdb
      when "redis"
        :redis
      end
    end

    class RequestForwarder
      def initialize(state, forward_to, timeout)
        @state = state
        @forward_to = forward_to
        @timeout = timeout
        @logger = Yup.logger.clone
        @logger.progname = "Yup::State::RequestForwarder"

        @yajl = Yajl::Parser.new(:symbolize_keys => true)
        @yajl.on_parse_complete = self.method(:make_request)
      end

      def run_loop
        loop do
          data = @state.bpop
          begin
            @yajl << data
          rescue Yajl::ParseError
            @logger.error { "Error while parsing \"#{data}\"" }
          end
        end
      end

      def make_request(req)
        begin
          @http_method, @request_url, headers, body = req
          headers = Hash[*headers.to_a.flatten.map(&:to_s)]
          headers["Host"] = @forward_to
          headers["Connection"] = "Close"

          req = "#{@http_method.upcase} #{@request_url} HTTP/1.1\r\n"
          headers.each do |k, v|
            req << "#{k}: #{v}\r\n"
          end
          req << "\r\n"
          req << body if !body.empty?
          raw_response = send_data(req.to_s, @forward_to)

          response_body = ""
          http = Http::Parser.new()
          http.on_body = proc do |chunk|
            response_body << chunk
          end
          http << raw_response

          if http.status_code && http.status_code / 100 == 2
            log_response(raw_response, response_body, http)
            @logger.info "Success"
          else
            log_response(raw_response, response_body, http)
            if Yup.retry_unless_2xx
              @logger.info "Fail: got status code #{http.status_code}; will retry after #{Yup.resend_delay} seconds"
              @state.pushback(Yajl::Encoder.encode([@http_method.downcase, @request_url, headers, body]))

              sleep Yup.resend_delay
            else
              @logger.info "Fail; will not retry"
            end
          end

        rescue Exception, Timeout::Error => e
          log_response(raw_response, response_body, http)
          @logger.info "Error: #{e.class}: #{e.message}; will retry after #{Yup.resend_delay} seconds"

          @state.pushback(Yajl::Encoder.encode([@http_method.downcase, @request_url, headers, body]))

          sleep Yup.resend_delay
        end
      end

    private
      def log_response(raw_response, body, http)
        @logger.info { "HTTP request: #{@http_method.upcase} #{@request_url} HTTP/1.1" }
        if raw_response && !raw_response.empty?
          @logger.info { "HTTP response: #{raw_response.lines.first.chomp}" }
          @logger.debug { "HTTP response headers" + (http.headers.empty? ? " is empty" : "\n" + http.headers.inspect) }
          @logger.debug { "HTTP response body" + (body.empty? ? " is empty" : "\n" + body.inspect) }
        end
      end

      def send_data(data, host)
        host, port = host.split(":")
        addr = Socket.getaddrinfo(host, nil)
        sock = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)

        secs = Integer(@timeout)
        usecs = Integer((@timeout - secs) * 1_000_000)
        optval = [secs, usecs].pack("l_2")
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, optval)
        sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, optval)

        resp = Timeout::timeout(@timeout) do
          sock.connect(Socket.pack_sockaddr_in(port, addr[0][3]))
          sock.write(data)
          sock.read()
        end
        return resp
      ensure
        sock.close()
      end
    end
  end
end
Something went wrong with that request. Please try again.