Skip to content

Commit

Permalink
Reimplement GirlFriday::WorkQueue to use Celluloid-based GirlFriday::…
Browse files Browse the repository at this point in the history
…Runner to execute background jobs

Tests are still failing... likely need to be rewritten now that the internals
have changed :/
  • Loading branch information
tarcieri committed May 19, 2011
1 parent ec2ac62 commit 8e0026b
Showing 1 changed file with 17 additions and 139 deletions.
156 changes: 17 additions & 139 deletions lib/girl_friday/work_queue.rb
Original file line number Diff line number Diff line change
@@ -1,161 +1,39 @@
module GirlFriday

class WorkQueue
Ready = Struct.new(:this)
Work = Struct.new(:msg, :callback)
Shutdown = Struct.new(:callback)


attr_reader :name
def initialize(name, options={}, &block)
@name = name.to_s
@size = options[:size] || 5
@processor = block
@error_handler = (options[:error_handler] || ErrorHandler.default).new

@shutdown = false
@busy_workers = []
@created_at = Time.now.to_i
@total_processed = @total_errors = @total_queued = 0
@persister = (options[:store] || Store::InMemory).new(name, (options[:store_config] || []))
start
@supervisor = GirlFriday::Runner.supervise_as(@name, @name, options, &block)
GirlFriday.queues << WeakRef.new(self)
end

def push(work, &block)
@supervisor << Work[work, block]
Celluloid::Actor[@name].push(work, &block)
end
alias_method :<<, :push

def status
{ @name => {
:pid => $$,
:pool_size => @size,
:ready => ready_workers.size,
:busy => @busy_workers.size,
:backlog => @persister.size,
:total_queued => @total_queued,
:total_processed => @total_processed,
:total_errors => @total_errors,
:uptime => Time.now.to_i - @created_at,
:created_at => @created_at,
}
}
end

def shutdown
# Runtime state should never be modified by caller thread,
# only the Supervisor thread.
@supervisor << Shutdown[block_given? ? Proc.new : nil]
end

private

def on_ready(who)
@total_processed += 1
if !@shutdown && work = @persister.pop
who.this << work
drain
else
@busy_workers.delete(who.this)
ready_workers << who.this
shutdown_complete if @shutdown && @busy_workers.size == 0
end
rescue => ex
# Redis network error? Log and ignore.
@error_handler.handle(ex)
Celluloid::Actor[@name].status
end

def shutdown_complete
begin
@when_shutdown.call(self) if @when_shutdown
rescue Exception => ex
@error_handler.handle(ex)
end
def shutdown(&block)
@supervisor.terminate(&block)
end

def on_work(work)
@total_queued += 1

if !@shutdown && worker = ready_workers.pop
@busy_workers << worker
worker << work
drain
else
@persister << work
end
rescue => ex
# Redis network error? Log and ignore.
@error_handler.handle(ex)
end

def ready_workers
@ready_workers ||= begin
workers = []
@size.times do |x|
# start N workers
workers << Actor.spawn_link(&@work_loop)
end
workers
end
end

def start
@supervisor = Actor.spawn do
supervisor = Actor.current
@work_loop = Proc.new do
loop do
work = Actor.receive
result = @processor.call(work.msg)
work.callback.call(result) if work.callback
supervisor << Ready[Actor.current]
end
end

Actor.trap_exit = true
begin
loop do
Actor.receive do |f|
f.when(Ready) do |who|
on_ready(who)
end
f.when(Work) do |work|
on_work(work)
end
f.when(Shutdown) do |stop|
@shutdown = true
@when_shutdown = stop.callback
shutdown_complete if @shutdown && @busy_workers.size == 0
end
f.when(Actor::DeadActorError) do |exit|
# TODO Provide current message contents as error context
@total_errors += 1
@busy_workers.delete(exit.actor)
ready_workers << Actor.spawn_link(&@work_loop)
@error_handler.handle(exit.reason)
end
end
end

rescue Exception => ex
$stderr.print "Fatal error in girl_friday: supervisor for #{name} died.\n"
$stderr.print("#{ex}\n")
$stderr.print("#{ex.backtrace.join("\n")}\n")
end
end
end
def inspect
current_status = Celluloid::Actor[@name].inspect

fields = {
:processed => current_status[:total_processed],
:backlog => current_status[:backlog],
:pool => current_status[:pool_size],
:uptime => current_status[:uptime]
}

def drain
# give as much work to as many ready workers as possible
ps = @persister.size
todo = ready_workers.size < ps ? ready_workers.size : ps
todo.times do
worker = ready_workers.pop
@busy_workers << worker
worker << @persister.pop
end
str = "#<GirlFriday::WorkQueue[@name] "
str << fields.map { |k, v| "#{k}=#{v}" }.join(', ')
str << ">"
end

end
Queue = WorkQueue
end

0 comments on commit 8e0026b

Please sign in to comment.