Skip to content

Commit

Permalink
reschedule queues and starting on the pool manager
Browse files Browse the repository at this point in the history
  • Loading branch information
collin committed Nov 25, 2009
1 parent cb56fb6 commit ec2fffb
Show file tree
Hide file tree
Showing 17 changed files with 494 additions and 53 deletions.
2 changes: 1 addition & 1 deletion config.ru
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__) + '/lib')
require 'resque/server'

use Rack::ShowExceptions

require 'test-failures/fail'
run Rack::URLMap.new \
"/" => Resque::Server.new
# "/csp" => CSP::Aplication.new
1 change: 0 additions & 1 deletion lib/resque/failure/queue_per_error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def per_queue_failures
def self.count
# Now using the counter.
Resque.redis[FailureCounter].to_i
# Resque.redis.llen(:failed).to_i
end

def self.all(error_queue_name, start = 0, count = 1)
Expand Down
127 changes: 127 additions & 0 deletions lib/resque/pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
require 'dnssd'
require 'eventmachine'
Thread.abort_on_exception = true
module Resque
class Pool
Version = "0.0.0".freeze
Service = "_resquepool._tcp".freeze
Port = 2395

class PoolConnection < EventMachine::Connection
class PoolProtocolError < StandardError; end

AddWorker = "1".freeze
RemoveWorker = "2".freeze
SetOption = "3".freeze

def initialize(pool)
@pool = pool
end

def receive_data(data)
case data.slice!(0,1)
when AddWorker
@pool.add_worker
send_data "OK\n"
when RemoveWorker
@pool.remove_worker
send_data "OK\n"
when SetOption
key, value = *data.split(':')
(key and value) or raise PoolProtocolError
@pool.set_option(key, value)
send_data "OK\n"
else raise PoolProtocolError
end
end

def unbind
@pool.remove_all_workers
end
end

class PoolWorker < ::Resque::Worker
attr_reader :pool

def initialize(pool)
@pool = pool
end

def queues
pool.queues
end
end

attr_reader :workers

def self.start
pool = Pool.new
EventMachine.run do
EventMachine.start_server("127.0.0.1", 2395, PoolConnection, pool)
end
end

def initialize
@workers = []
@config = {:queues => ["*"]}
register_bonjour
end

def add_worker
worker = nil
begin
worker = Resque::Pool::PoolWorker.new(self)
worker.verbose = @config[:logging] || @config[:verbose]
worker.very_verbose = @config[:vverbose]
workers << worker
rescue Resque::NoQueueError
abort "set QUEUE env var, e.g. $ QUEUE=critical,high rake resque:work"
end

child = fork do
puts "*** Starting worker #{worker}"
worker.work(@config[:interval] || 5) # interval, will block
end

Process.detach(child)
end

def queues
@config[:queues]
end

def set_option(key, value)
@config[key].intern = Resque.decode(value.chomp)
end

def remove_worker
worker = workers.shift
worker.shutdown
end

def remove_all_workers
while(workers.any?) do
remove_worker
end
end

def self.list
list = {}
service = DNSSD.browse(Service) do |reply|
list[reply.name] = reply
end
sleep 5
service.stop
list
end

def register_bonjour
tr = DNSSD::TextRecord.new
tr["description"] = "A dynamic Pool of Resque workers"

DNSSD.register("ResquePool-#{`hostname`}:#{Port}", Service, 'local', 2395) do |rr|
# puts "Registered Resque Pool on port 2395. Starting service."
end
end
end
end
16 changes: 13 additions & 3 deletions lib/resque/queue_set.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def failure_queues
end

def key
"#{@queue_name}"
@queue_name
end

def any?
Expand All @@ -31,7 +31,7 @@ def name
alias to_s name

def count
Resque.redis.llen(key).to_i
Resque.size(key)
end
alias size count

Expand Down Expand Up @@ -60,7 +60,17 @@ def backtrace
def unimplemented
raise "Unimplemented"
end
alias schedule unimplemented

def count
Resque.redis.llen(@queue_name).to_i
end

def reschedule
while(failed = Resque.decode(Resque.redis.lpop(@queue_name))) do
Resque.enqueue failed["payload"]["class"].constantize, failed["payload"]["args"]
Resque.redis.decr("failure_counter")
end
end
alias remove unimplemented
end
end
22 changes: 5 additions & 17 deletions lib/resque/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,6 @@ def partial(template, local_vars = {})
@partial = false
end

def poll
if @polling
text = "Last Updated: #{Time.now.strftime("%H:%M:%S")}"
else
text = "<a href='#{url(request.path_info)}.poll' rel='poll'>Live Poll</a>"
end
"<p class='poll'>#{text}</p>"
end

end

def show(page, layout = true)
Expand All @@ -115,14 +106,6 @@ def show(page, layout = true)
show page
end
end

%w( overview workers ).each do |page|
get "/#{page}.poll" do
content_type "text/plain"
@polling = true
show(page.to_sym, false).gsub(/\s{1,}/, ' ')
end
end

get "/failed" do
if Resque::Failure.url
Expand All @@ -137,6 +120,11 @@ def show(page, layout = true)
redirect url('failed')
end

post "/reschedule/:key" do
Resque::FailureQueue.new(params[:key]).reschedule
"OK"
end

get "/stats" do
redirect url("/stats/resque")
end
Expand Down
Binary file removed lib/resque/server/.DS_Store
Binary file not shown.
137 changes: 137 additions & 0 deletions lib/resque/server/public/jquery.event.drag-1.5.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*!
jquery.event.drag.js ~ v1.5 ~ Copyright (c) 2008, Three Dub Media (http://threedubmedia.com)
Liscensed under the MIT License ~ http://threedubmedia.googlecode.com/files/MIT-LICENSE.txt
*/
;(function($){ // secure $ jQuery alias
/*******************************************************************************************/
// Created: 2008-06-04 | Updated: 2009-03-24
/*******************************************************************************************/
// Events: drag, dragstart, dragend
/*******************************************************************************************/

// jquery method
$.fn.drag = function( fn1, fn2, fn3 ){
if ( fn2 ) this.bind('dragstart', fn1 ); // 2+ args
if ( fn3 ) this.bind('dragend', fn3 ); // 3 args
return !fn1 ? this.trigger('drag') // 0 args
: this.bind('drag', fn2 ? fn2 : fn1 ); // 1+ args
};

// local refs
var $event = $.event, $special = $event.special,

// special event configuration
drag = $special.drag = {
not: ':input', // don't begin to drag on event.targets that match this selector
distance: 0, // distance dragged before dragstart
which: 1, // mouse button pressed to start drag sequence
dragging: false, // hold the active target element
setup: function( data ){
data = $.extend({
distance: drag.distance,
which: drag.which,
not: drag.not
}, data || {});
data.distance = squared( data.distance ); // x² + y² = distance²
$event.add( this, "mousedown", handler, data );
if ( this.attachEvent ) this.attachEvent("ondragstart", dontStart ); // prevent image dragging in IE...
},
teardown: function(){
$event.remove( this, "mousedown", handler );
if ( this === drag.dragging ) drag.dragging = drag.proxy = false; // deactivate element
selectable( this, true ); // enable text selection
if ( this.detachEvent ) this.detachEvent("ondragstart", dontStart ); // prevent image dragging in IE...
}
};

// prevent normal event binding...
$special.dragstart = $special.dragend = { setup:function(){}, teardown:function(){} };

// handle drag-releatd DOM events
function handler ( event ){
var elem = this, returned, data = event.data || {};
// mousemove or mouseup
if ( data.elem ){
// update event properties...
elem = event.dragTarget = data.elem; // drag source element
event.dragProxy = drag.proxy || elem; // proxy element or source
event.cursorOffsetX = data.pageX - data.left; // mousedown offset
event.cursorOffsetY = data.pageY - data.top; // mousedown offset
event.offsetX = event.pageX - event.cursorOffsetX; // element offset
event.offsetY = event.pageY - event.cursorOffsetY; // element offset
}
// mousedown, check some initial props to avoid the switch statement
else if ( drag.dragging || ( data.which>0 && event.which!=data.which ) ||
$( event.target ).is( data.not ) ) return;
// handle various events
switch ( event.type ){
// mousedown, left click, event.target is not restricted, init dragging
case 'mousedown':
$.extend( data, $( elem ).offset(), {
elem: elem, target: event.target,
pageX: event.pageX, pageY: event.pageY
}); // store some initial attributes
$event.add( document, "mousemove mouseup", handler, data );
selectable( elem, false ); // disable text selection
drag.dragging = null; // pending state
return false; // prevents text selection in safari
// mousemove, check distance, start dragging
case !drag.dragging && 'mousemove':
if ( squared( event.pageX-data.pageX )
+ squared( event.pageY-data.pageY ) // x² + y² = distance²
< data.distance ) break; // distance tolerance not reached
event.target = data.target; // force target from "mousedown" event (fix distance issue)
returned = hijack( event, "dragstart", elem ); // trigger "dragstart", return proxy element
if ( returned !== false ){ // "dragstart" not rejected
drag.dragging = elem; // activate element
drag.proxy = event.dragProxy = $( returned || elem )[0]; // set proxy
}
// mousemove, dragging
case 'mousemove':
if ( drag.dragging ){
returned = hijack( event, "drag", elem ); // trigger "drag"
if ( $special.drop ){ // manage drop events
$special.drop.allowed = ( returned !== false ); // prevent drop
$special.drop.handler( event ); // "dropstart", "dropend"
}
if ( returned !== false ) break; // "drag" not rejected, stop
event.type = "mouseup"; // helps "drop" handler behave
}
// mouseup, stop dragging
case 'mouseup':
$event.remove( document, "mousemove mouseup", handler ); // remove page events
if ( drag.dragging ){
if ( $special.drop ) $special.drop.handler( event ); // "drop"
hijack( event, "dragend", elem ); // trigger "dragend"
}
selectable( elem, true ); // enable text selection
drag.dragging = drag.proxy = data.elem = false; // deactivate element
break;
}
return true;
};

// set event type to custom value, and handle it
function hijack ( event, type, elem ){
event.type = type; // force the event type
var result = $.event.handle.call( elem, event );
return result===false ? false : result || event.result;
};

// return the value squared
function squared ( value ){ return Math.pow( value, 2 ); };

// suppress default dragstart IE events...
function dontStart(){ return ( drag.dragging === false ); };

// toggles text selection attributes
function selectable ( elem, bool ){
if ( !elem ) return; // maybe element was removed ?
elem.unselectable = bool ? "off" : "on"; // IE
elem.onselectstart = function(){ return bool; }; // IE
//if ( document.selection && document.selection.empty ) document.selection.empty(); // IE
if ( elem.style ) elem.style.MozUserSelect = bool ? "" : "none"; // FF
};

/*******************************************************************************************/
})( jQuery ); // confine scope
Loading

0 comments on commit ec2fffb

Please sign in to comment.