# Listing metadata: 149 lines (129 sloc), 4.407 kb.
require 'sidekiq/util'
require 'sidekiq/actor'

require 'sidekiq/middleware/server/active_record'
require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/middleware/server/logging'

module Sidekiq
  ##
  # The Processor receives a message from the Manager and actually
  # processes it. It instantiates the worker, runs the middleware
  # chain and then calls Sidekiq::Worker#perform.
  class Processor
    # To prevent a memory leak, ensure that stats expire. However, they
    # should take up a minimal amount of storage so keep them around for a
    # long time (five 365-day years, expressed in seconds).
    STATS_TIMEOUT = 24 * 60 * 60 * 365 * 5

    include Util
    include Actor

    # Builds the middleware chain used by default on the server side:
    # logging, then job retries, then the ActiveRecord middleware, added
    # in that order.
    def self.default_middleware
      Middleware::Chain.new do |m|
        m.add Middleware::Server::Logging
        m.add Middleware::Server::RetryJobs
        m.add Middleware::Server::ActiveRecord
      end
    end

    # Identifier passed back to the boss together with the current thread
    # (see #process). It is assigned from outside this class.
    attr_accessor :proxy_id

    # boss - the object notified about this processor's thread and about
    #        completed work (receives +async.real_thread+ and
    #        +async.processor_done+).
    def initialize(boss)
      @boss = boss
    end

    # Runs a single unit of work.
    #
    # work - object responding to +message+ (the raw JSON payload),
    #        +queue_name+ and +acknowledge+.
    #
    # The payload is parsed, the class named by msg['class'] is instantiated,
    # and its +perform+ is invoked with a deep copy of msg['args'], wrapped
    # in stats bookkeeping and the server middleware chain.
    #
    # The work is acknowledged in every case except Sidekiq::Shutdown. Any
    # other exception is reported through +handle_exception+ and re-raised,
    # in which case the boss is NOT told the processor is done.
    def process(work)
      msgstr = work.message
      queue = work.queue_name

      # Tell the boss which thread is executing this processor's work.
      @boss.async.real_thread(proxy_id, Thread.current)

      ack = true
      begin
        msg = Sidekiq.load_json(msgstr)
        klass = msg['class'].constantize
        worker = klass.new
        worker.jid = msg['jid']

        stats(worker, msg, queue) do
          Sidekiq.server_middleware.invoke(worker, msg, queue) do
            worker.perform(*cloned(msg['args']))
          end
        end
      rescue Sidekiq::Shutdown
        # Had to force kill this job because it didn't finish
        # within the timeout. Don't acknowledge the work since
        # we didn't properly finish it.
        ack = false
      rescue Exception => ex
        # msg is nil when the payload could not be parsed; fall back to
        # reporting the raw string so the failure can still be diagnosed.
        handle_exception(ex, msg || { :message => msgstr })
        raise
      ensure
        work.acknowledge if ack
      end

      @boss.async.processor_done(current_actor)
    end

    # Short, stable representation based on this object's id (hex).
    def inspect
      "<Processor##{object_id.to_s(16)}>"
    end

    private

    # Base-36 id of the thread that first called this method on this
    # processor. The value is memoized, so later calls return the same
    # string even if made from another thread.
    def thread_identity
      @str ||= Thread.current.object_id.to_s(36)
    end

    # Records "currently working" and processed/failed statistics in Redis
    # around the given block.
    #
    # worker - the worker instance (not used by the bookkeeping itself).
    # msg    - the parsed job payload, stored alongside the queue name.
    # queue  - name of the queue the job came from.
    #
    # Exceptions raised by the block are re-raised after the failure
    # counters have been bumped. The processed counters are bumped in the
    # +ensure+ clause, so they include failed jobs as well.
    def stats(worker, msg, queue)
      # Do not conflate errors from the job with errors caused by updating
      # stats so calling code can react appropriately
      retry_and_suppress_exceptions do
        hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
        Sidekiq.redis do |conn|
          conn.multi do
            conn.hmset("#{identity}:workers", thread_identity, hash)
            # Let the workers hash disappear after an hour if it is never cleaned up.
            conn.expire("#{identity}:workers", 60*60)
          end
        end
      end

      begin
        yield
      rescue Exception
        retry_and_suppress_exceptions do
          Sidekiq.redis do |conn|
            # Time#strftime is core Ruby and yields the same YYYY-MM-DD text
            # as Date#to_s, without depending on the 'date' library being loaded.
            failed = "stat:failed:#{Time.now.utc.strftime('%Y-%m-%d')}"
            result = conn.multi do
              conn.incrby("stat:failed", 1)
              conn.incrby(failed, 1)
            end
            # A value of 1 means the daily key was just created: give it a TTL once.
            conn.expire(failed, STATS_TIMEOUT) if result.last == 1
          end
        end
        raise
      ensure
        retry_and_suppress_exceptions do
          Sidekiq.redis do |conn|
            processed = "stat:processed:#{Time.now.utc.strftime('%Y-%m-%d')}"
            result = conn.multi do
              conn.hdel("#{identity}:workers", thread_identity)
              conn.incrby("stat:processed", 1)
              conn.incrby(processed, 1)
            end
            # A value of 1 means the daily key was just created: give it a TTL once.
            conn.expire(processed, STATS_TIMEOUT) if result.last == 1
          end
        end
      end
    end

    # Singleton classes are not clonable.
    #
    # The integer entries are derived from literals instead of being named:
    # Fixnum and Bignum were unified into Integer in Ruby 2.4 and the old
    # constants were removed in Ruby 3.2, where naming them raises NameError
    # while this class body is being loaded. On older Rubies the list is
    # unchanged; on newer ones +uniq+ collapses both entries into Integer.
    SINGLETON_CLASSES = [ NilClass, TrueClass, FalseClass, Symbol, 1.class, Float, (2**64).class ].uniq.freeze

    # Deep clone the arguments passed to the worker so that if
    # the message fails, what is pushed back onto Redis hasn't
    # been mutated by the worker.
    def cloned(ary)
      Marshal.load(Marshal.dump(ary))
    end

    # If an exception occurs in the block passed to this method, that block
    # will be retried up to max_retries times, sleeping one second before
    # each retry. StandardError and its subclasses are swallowed: retries
    # are logged at debug level and the final failure is passed to
    # +handle_exception+ instead of being raised.
    #
    # max_retries - number of additional attempts after the first failure.
    def retry_and_suppress_exceptions(max_retries = 2)
      retry_count = 0
      begin
        yield
      rescue => e
        retry_count += 1
        if retry_count <= max_retries
          Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"}
          sleep(1)
          retry
        else
          handle_exception(e, { :message => "Exhausted #{max_retries} retries"})
        end
      end
    end
  end
end
# (Web viewer notice, not part of the source: "Something went wrong with that request. Please try again.")