Permalink
Browse files

new jobs admin area

and some work on delayed_jobs refactoring

refs #4226

Change-Id: I21a91a44368e77aef4a75e0d30cefe252a901691
Reviewed-on: https://gerrit.instructure.com/3640
Tested-by: Hudson <hudson@instructure.com>
Reviewed-by: Zach Wily <zach@instructure.com>
  • Loading branch information...
1 parent f624d59 commit 26032f20466e8edfcfc223d6a766b68fe8a64a7d @codekitchen codekitchen committed Apr 30, 2011
View
209 app/coffeescripts/jobs.coffee
@@ -0,0 +1,209 @@
+class FlavorGrid
+ constructor: (@options, @type_name, @grid_name) ->
+ @data = @options.data
+ @$element = $(@grid_name)
+ @setTimer() if @options.refresh_rate
+ @query = ''
+
+ setTimer: () =>
+ setTimeout (=> @refresh(@setTimer)), @options.refresh_rate
+
+ refresh: (cb) =>
+ @$element.queue () =>
+ $.ajaxJSON @options.url, "GET", { flavor: @options.flavor, q: @query }, (data) =>
+ @data.length = 0
+ @data.push item for item in data[@type_name]
+ if data.total && data.total > @data.length
+ @data.push({}) for i in [@data.length ... data.total]
+ @grid.removeAllRows()
+ @grid.updateRowCount()
+ @grid.render()
+ cb?()
+ @updated?()
+ @$element.dequeue()
+
+ change_flavor: (flavor) =>
+ @options.flavor = flavor
+ @grid.setSelectedRows []
+ @refresh()
+
+ grid_options: () ->
+ { rowHeight: 20 }
+
+ init: () ->
+ @columns = @build_columns()
+
+ @grid = new Slick.Grid(@grid_name, @data, @columns, @grid_options())
+ this
+
+class Jobs extends FlavorGrid
+ constructor: (options, type_name = 'jobs', grid_name = '#jobs-grid') ->
+ Jobs.max_attempts = options.max_attempts if options.max_attempts
+ super(options, type_name, grid_name)
+
+ search: (query) ->
+ @query = query
+ @refresh()
+
+ attempts_formatter: (r,c,d) =>
+ return '' unless @data[r].id
+ max = (@data[r].max_attempts || Jobs.max_attempts)
+ if d == 0
+ klass = ''
+ else if d < max
+ klass = 'has-failed-attempts'
+ else if d == @options.on_hold_attempt_count
+ klass = 'on-hold'
+ d = 'hold'
+ else
+ klass = 'has-failed-max-attempts'
+ out_of = if d == 'hold' then '' else "/ #{max}"
+ "<span class='#{klass}'>#{d}#{out_of}</span>"
+
+ id_formatter: (r,c,d) =>
+ @data[r].id || "<span class='unloaded-id'>-</span>"
+
+ build_columns: () ->
+ [
+ id: 'id'
+ name: 'id'
+ field: 'id'
+ width: 75
+ formatter: @id_formatter
+ ,
+ id: 'tag'
+ name: 'tag'
+ field: 'tag'
+ width: 200
+ ,
+ id: 'attempts'
+ name: 'attempt'
+ field: 'attempts'
+ width: 60
+ formatter: @attempts_formatter
+ ,
+ id: 'priority'
+ name: 'priority'
+ field: 'priority'
+ width: 70
+ ,
+ id: 'run_at'
+ name: 'run at'
+ field: 'run_at'
+ width: 165
+ ]
+
+ init: () ->
+ super()
+ @grid.onSelectedRowsChanged = () =>
+ rows = @grid.getSelectedRows()
+ row = if rows?.length == 1 then rows[0] else -1
+ job = @data[rows[0]] || {}
+ $('#show-job .show-field').each (idx, field) =>
+ field_name = field.id.replace("job-", '')
+ $(field).text(job[field_name])
+ this
+
+ selectAll: () ->
+ @grid.setSelectedRows([0...@data.length])
+ @grid.onSelectedRowsChanged()
+
+ onSelected: (action) ->
+ params =
+ flavor: @options.flavor
+ q: @query
+ update_action: action
+
+ all_jobs = @grid.getSelectedRows().length == @data.length
+
+ if all_jobs && action == 'destroy'
+ return unless confirm("Are you sure you want to delete *all* jobs of this type and matching this query?")
+
+ # special case -- if they've selected all, then don't send the ids so that
+ # we can operate on jobs that match the query but haven't even been loaded
+ # yet
+ unless all_jobs
+ params.job_ids = (@data[row].id for row in @grid.getSelectedRows())
+
+ $.ajaxJSON @options.batch_update_url, "POST", params, @refresh
+ @grid.setSelectedRows []
+
+ updated: () ->
+ $('#jobs-total').text @data.length
+
+class Workers extends Jobs
+ constructor: (options) ->
+ super(options, 'running', '#running-grid')
+
+ build_columns: () ->
+ cols = [
+ id: 'worker'
+ name: 'worker'
+ field: 'locked_by'
+ width: 175
+ ].concat(super())
+ cols.pop()
+ cols
+
+ updated: () ->
+
+class Tags extends FlavorGrid
+ constructor: (options) ->
+ super(options, 'tags', '#tags-grid')
+
+ build_columns: () ->
+ [
+ id: 'tag'
+ name: 'tag'
+ field: 'tag'
+ width: 200
+ ,
+ id: 'count'
+ name: 'count'
+ field: 'count'
+ width: 50
+ ]
+
+ grid_options: () ->
+ $.extend(super(), { enableCellNavigation: false })
+
+$.extend(window,
+ Jobs: Jobs
+ Workers: Workers
+ Tags: Tags
+)
+
+$(document).ready () ->
+ $('#tags-flavor').change () ->
+ window.tags.change_flavor($(this).val())
+ $('#jobs-flavor').change () ->
+ window.jobs.change_flavor($(this).val())
+
+ $('#jobs-refresh').click () ->
+ window.jobs.refresh()
+
+ search_event = if $('#jobs-search')[0].onsearch == undefined then 'change' else 'search'
+ $('#jobs-search').bind search_event, () ->
+ window.jobs.search $(this).val()
+
+ $('#select-all-jobs').click () -> window.jobs.selectAll()
+
+ $('#hold-jobs').click () -> window.jobs.onSelected('hold')
+ $('#un-hold-jobs').click () -> window.jobs.onSelected('unhold')
+ $('#delete-jobs').click () -> window.jobs.onSelected('destroy')
+
+ $('#job-handler-show').click () ->
+ $('#job-handler-wrapper').clone().dialog
+ title: 'Job Handler'
+ width: 900
+ height: 700
+ modal: true
+ false
+
+ $('#job-last_error-show').click () ->
+ $('#job-last_error-wrapper').clone().dialog
+ title: 'Last Error'
+ width: 900
+ height: 700
+ modal: true
+ false
View
82 app/controllers/jobs_controller.rb
@@ -0,0 +1,82 @@
+class JobsController < ApplicationController
+ before_filter :require_site_admin
+ POPULAR_TAG_COUNTS = 10
+
+ def index
+ jobs_scope
+
+ respond_to do |format|
+ format.html do
+ running
+ tags(@jobs)
+ @jobs_count = @jobs.count
+ end
+
+ format.js do
+ result = {}
+ case params[:only]
+ when 'running'
+ result[:running] = running
+ when 'tags'
+ result[:tags] = tags(@jobs)
+ when 'jobs'
+ result[:jobs] = @jobs.all(
+ :limit => params[:limit] || 100,
+ :offset => params[:offset].presence)
+ result[:total] = @jobs.count
+ end
+ render :json => result.to_json(:include_root => false)
+ end
+ end
+ end
+
+ def batch_update
+ jobs_scope
+
+ case params[:update_action]
+ when 'hold'
+ @jobs.find_each { |job| job.hold! }
+ when 'unhold'
+ @jobs.find_each { |job| job.unhold! }
+ when 'destroy'
+ @jobs.find_each { |job| job.destroy }
+ end
+
+ render :json => { :status => 'OK' }
+ end
+
+ protected
+
+ def running
+ @running = Delayed::Job.running.scoped(:order => 'id desc')
+ end
+
+ def tags(scope)
+ @tags = scope.count(:group => 'tag', :limit => POPULAR_TAG_COUNTS, :order => 'count(tag) desc', :select => 'tag').map { |t,c| { :tag => t, :count => c } }
+ end
+
+ def jobs_scope
+ @jobs = Delayed::Job.scoped(:order => 'id desc')
+ @flavor = params[:flavor] || 'current'
+
+ case @flavor
+ when 'future'
+ @jobs = @jobs.future
+ when 'current'
+ @jobs = @jobs.current
+ when 'all'
+ # pass
+ when 'failed'
+ @jobs = @jobs.failed
+ end
+
+ if params[:q].present?
+ @jobs = @jobs.scoped(:conditions => ["handler like ? OR id = ?", "%#{params[:q]}%", params[:q].to_i])
+ end
+
+ if params[:job_ids].present?
+ @jobs = @jobs.scoped(:conditions => { :id => params[:job_ids].map(&:to_i) })
+ end
+ end
+
+end
View
165 app/views/jobs/index.html.erb
@@ -0,0 +1,165 @@
+<% content_for :page_title do %>Jobs CP<% end %>
+<% content_for :stylesheets do %>
+<style type="text/css">
+ #content {
+ position: relative;
+ padding: 10px;
+ }
+ span.has-failed-attempts {
+ color: orange;
+ }
+ span.has-failed-max-attempts {
+ color: red;
+ }
+ dl#job-attributes dt {
+ float:left;
+ clear:left;
+ }
+ dl#job-attributes dd {
+ padding-left: 2em;
+ float:left;
+ }
+ #show-job {
+ margin-left: 25px;
+ width: 450px;
+ float:left;
+ }
+ #show-job dt {
+ font-weight: bold;
+ }
+ h2 {
+ font-size: 16px;
+ }
+ div.grid {
+ border: 2px solid #ccc;
+ margin-bottom: 10px;
+ }
+ #tags-wrapper h2 { float:left; }
+ #tags-flavor {
+ float:right;
+ }
+ #jobs-grid { float:left; margin-top: 6px; }
+ .slick-row.ui-state-active {
+ color: black;
+ font-weight: normal;
+ }
+ .slick-row.ui-state-active .slick-cell {
+ background-color: #ddd;
+ }
+ .show-field-wrapper { display: none; }
+ textarea.show-field { width: 850px; height: 630px; }
+ .unloaded-id { color: #aaa; }
+</style>
+<% end %>
+<% @show_left_side = false %>
+
+<% jammit_css :slickgrid %>
+<% jammit_js :slickgrid, :jobs %>
+
+<div id="running-wrapper" style="float:left">
+ <h2>Running Jobs</h2>
+ <div id="running-grid" style="width:600px; height:250px;" class='grid'>
+ </div>
+</div>
+<div id="tags-wrapper" style="float:right">
+ <h2>Popular Tags</h2>
+ <select id="tags-flavor">
+ <%= options_for_select([["All", 'all'],
+ ["Current", 'current'],
+ ["Future", 'future'],
+ ["Failed", 'failed']], @flavor) %>
+ </select>
+ <div id="tags-grid" style="clear:both; width:250px; height:250px" class='grid'>
+ </div>
+</div>
+<div style="clear:both">
+ <h2>Jobs List</h2>
+ <select id="jobs-flavor">
+ <%= options_for_select([["All", 'all'],
+ ["Current", 'current'],
+ ["Future", 'future'],
+ ["Failed", 'failed']], @flavor) %>
+ </select>
+ <button id='jobs-refresh'>refresh</button>
+ <input type='search' results='15' placeholder='Search Jobs' autosave='canvas-jobs-search' id='jobs-search' name='q' />
+ Total: <span id='jobs-total'><%= @jobs_count %></span>
+ <button id='select-all-jobs'>select all</button>
+ With Selection:
+ <button id='hold-jobs'>hold</button>
+ <button id='un-hold-jobs'>un-hold</button>
+ <button id='delete-jobs'>delete</button>
+ <div id="jobs-grid" style="width:650px; height:300px;" class='grid'>
+ </div>
+<div id="show-job">
+ <h2>Selected Job</h2>
+ <h3>id: <span id="job-id" class="show-field">&nbsp;</span></h3>
+ <h4>tag: <span id='job-tag' class='show-field'>&nbsp;</span></h4>
+ <table id='job-attributes' cellspacing="0">
+ <tr>
+ <td width="150">Priority:</td>
+ <td id="job-priority" class="show-field">&nbsp;</td>
+ </tr>
+ <tr>
+ <td>Attempts:</td>
+ <td id="job-attempts" class="show-field">&nbsp;</td>
+ </tr>
+ <tr>
+ <td>Max Attempts:</td>
+ <td id="job-max_attempts" class="show-field">&nbsp;</td>
+ </tr>
+ <tr>
+ <td>Run At:</td>
+ <td id="job-run_at" class="show-field">&nbsp;</td>
+ </tr>
+ <tr>
+ <td>Failed At:</td>
+ <td id="job-failed_at" class="show-field">&nbsp;</td>
+ </tr>
+ <tr>
+ <td>Handler:</td>
+ <td><a href='#' id='job-handler-show'>(show)</a>
+ <div id='job-handler-wrapper' class='show-field-wrapper'><textarea id='job-handler' class='show-field'></textarea></div>
+ </td>
+ </tr>
+ <tr>
+ <td>Last Error:</td>
+ <td><a href='#' id='job-last_error-show'>(show)</a>
+ <div id='job-last_error-wrapper' class='show-field-wrapper'><textarea id='job-last_error' class='show-field'></textarea></div>
+ </td>
+ </tr>
+ </table>
+</div>
+</div>
+
+<% js_block do
+jobs_data = @jobs.all(:limit => 100)
+jobs_data.length.upto(@jobs_count - 1) { jobs_data << {} }
+jobs_opts = {
+ :data => jobs_data,
+ :max_attempts => Delayed::Worker.max_attempts,
+ :rows => @jobs_count,
+ :flavor => @flavor,
+ :url => jobs_url(:only => :jobs),
+ :batch_update_url => batch_update_jobs_url,
+ :on_hold_attempt_count => Delayed::Job::ON_HOLD_COUNT,
+}
+running_opts = jobs_opts.merge({
+ :data => @running.all,
+ :url => jobs_url(:only => :running),
+ :refresh_rate => 2.seconds * 1000,
+})
+tags_opts = {
+ :data => @tags,
+ :url => jobs_url(:only => :tags),
+ :refresh_rate => 10.seconds * 1000,
+ :flavor => @flavor,
+}
+%>
+<script type="text/javascript">
+$(document).ready(function() {
+ window.jobs = new Jobs(<%= raw jobs_opts.to_json(:include_root => false) %>).init();
+ window.running = new Workers(<%= raw running_opts.to_json(:include_root => false) %>).init();
+ window.tags = new Tags(<%= raw tags_opts.to_json %>).init();
+});
+</script>
+<% end %>
View
2 config/assets.yml
@@ -66,6 +66,8 @@ javascripts:
gradebook_history:
- public/javascripts/jquery.metadata.js
- public/javascripts/gradebook-history.js
+ jobs:
+ - public/javascripts/jobs.js
profile:
- public/javascripts/profile.js
topics:
View
8 config/environment.rb
@@ -28,20 +28,24 @@
# Force all environments to use the same logger level
# (by default production uses :info, the others :debug)
- # config.log_level = :debug
+ # config.log_level = :info
# Make Time.zone default to the specified zone, and make Active Record store time values
# in the database in UTC, and return them converted to the specified local zone.
# Run "rake -D time" for a list of tasks for finding time zone names. Comment line to use default local time.
config.time_zone = 'UTC'
- memcache_servers = (YAML.load_file(RAILS_ROOT + "/config/memcache.yml")[RAILS_ENV] || []) rescue []
+ memcache_servers = (YAML.load_file(Rails.root+"config/memcache.yml")[Rails.env] || []) rescue []
if memcache_servers.empty?
config.cache_store = :nil_store
else
config.cache_store = :mem_cache_store, *memcache_servers
end
+ if ENV['RUNNING_AS_DAEMON'] == 'true'
+ config.log_path = Rails.root+'log/delayed_jobs.log'
+ end
+
# Use SQL instead of Active Record's schema dumper when creating the test database.
# This is necessary if your schema can't be completely dumped by the schema dumper,
# like if you have constraints or database-specific column types
View
6 config/initializers/delayed_job.rb
@@ -1,7 +1,3 @@
-Delayed::Worker.destroy_failed_jobs = false
-Delayed::Worker.max_attempts = 15
-Delayed::Worker.queue = "canvas_queue"
-
module Delayed
HIGH_PRIORITY = 0
NORMAL_PRIORITY = 10
@@ -21,7 +17,7 @@ module Delayed
# We don't want to keep around max_attempts failed jobs that failed because the
# underlying AR object was destroyed.
Delayed::Worker.on_max_failures = proc do |job, err|
- if err.is_a?(Delayed::PerformableMethod::LoadError)
+ if err.is_a?(Delayed::Backend::RecordNotFound)
return true
end
View
4 config/routes.rb
@@ -628,7 +628,9 @@
file.download 'download', :controller => 'files', :action => 'show', :download => '1'
end
end
-
+
+ map.resources :jobs, :only => %w(index), :collection => %w[batch_update]
+
Jammit::Routes.draw(map)
# API routes
View
313 public/javascripts/jobs.js
@@ -0,0 +1,313 @@
+/* DO NOT MODIFY. This file was compiled Fri, 13 May 2011 18:37:45 GMT from
+ * /Users/bpalmer/Programming/canvas/app/coffeescripts/jobs.coffee
+ */
+
+(function() {
+ var FlavorGrid, Jobs, Tags, Workers;
+ var __bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; }, __hasProp = Object.prototype.hasOwnProperty, __extends = function(child, parent) {
+ for (var key in parent) { if (__hasProp.call(parent, key)) child[key] = parent[key]; }
+ function ctor() { this.constructor = child; }
+ ctor.prototype = parent.prototype;
+ child.prototype = new ctor;
+ child.__super__ = parent.prototype;
+ return child;
+ };
+ FlavorGrid = (function() {
+ function FlavorGrid(options, type_name, grid_name) {
+ this.options = options;
+ this.type_name = type_name;
+ this.grid_name = grid_name;
+ this.change_flavor = __bind(this.change_flavor, this);;
+ this.refresh = __bind(this.refresh, this);;
+ this.setTimer = __bind(this.setTimer, this);;
+ this.data = this.options.data;
+ this.$element = $(this.grid_name);
+ if (this.options.refresh_rate) {
+ this.setTimer();
+ }
+ this.query = '';
+ }
+ FlavorGrid.prototype.setTimer = function() {
+ return setTimeout((__bind(function() {
+ return this.refresh(this.setTimer);
+ }, this)), this.options.refresh_rate);
+ };
+ FlavorGrid.prototype.refresh = function(cb) {
+ return this.$element.queue(__bind(function() {
+ return $.ajaxJSON(this.options.url, "GET", {
+ flavor: this.options.flavor,
+ q: this.query
+ }, __bind(function(data) {
+ var i, item, _i, _len, _ref, _ref2, _ref3;
+ this.data.length = 0;
+ _ref = data[this.type_name];
+ for (_i = 0, _len = _ref.length; _i < _len; _i++) {
+ item = _ref[_i];
+ this.data.push(item);
+ }
+ if (data.total && data.total > this.data.length) {
+ for (i = _ref2 = this.data.length, _ref3 = data.total; (_ref2 <= _ref3 ? i < _ref3 : i > _ref3); (_ref2 <= _ref3 ? i += 1 : i -= 1)) {
+ this.data.push({});
+ }
+ }
+ this.grid.removeAllRows();
+ this.grid.updateRowCount();
+ this.grid.render();
+ if (typeof cb == "function") {
+ cb();
+ }
+ if (typeof this.updated == "function") {
+ this.updated();
+ }
+ return this.$element.dequeue();
+ }, this));
+ }, this));
+ };
+ FlavorGrid.prototype.change_flavor = function(flavor) {
+ this.options.flavor = flavor;
+ this.grid.setSelectedRows([]);
+ return this.refresh();
+ };
+ FlavorGrid.prototype.grid_options = function() {
+ return {
+ rowHeight: 20
+ };
+ };
+ FlavorGrid.prototype.init = function() {
+ this.columns = this.build_columns();
+ this.grid = new Slick.Grid(this.grid_name, this.data, this.columns, this.grid_options());
+ return this;
+ };
+ return FlavorGrid;
+ })();
+ Jobs = (function() {
+ __extends(Jobs, FlavorGrid);
+ function Jobs(options, type_name, grid_name) {
+ if (type_name == null) {
+ type_name = 'jobs';
+ }
+ if (grid_name == null) {
+ grid_name = '#jobs-grid';
+ }
+ this.id_formatter = __bind(this.id_formatter, this);;
+ this.attempts_formatter = __bind(this.attempts_formatter, this);;
+ if (options.max_attempts) {
+ Jobs.max_attempts = options.max_attempts;
+ }
+ Jobs.__super__.constructor.call(this, options, type_name, grid_name);
+ }
+ Jobs.prototype.search = function(query) {
+ this.query = query;
+ return this.refresh();
+ };
+ Jobs.prototype.attempts_formatter = function(r, c, d) {
+ var klass, max, out_of;
+ if (!this.data[r].id) {
+ return '';
+ }
+ max = this.data[r].max_attempts || Jobs.max_attempts;
+ if (d === 0) {
+ klass = '';
+ } else if (d < max) {
+ klass = 'has-failed-attempts';
+ } else if (d === this.options.on_hold_attempt_count) {
+ klass = 'on-hold';
+ d = 'hold';
+ } else {
+ klass = 'has-failed-max-attempts';
+ }
+ out_of = d === 'hold' ? '' : "/ " + max;
+ return "<span class='" + klass + "'>" + d + out_of + "</span>";
+ };
+ Jobs.prototype.id_formatter = function(r, c, d) {
+ return this.data[r].id || "<span class='unloaded-id'>-</span>";
+ };
+ Jobs.prototype.build_columns = function() {
+ return [
+ {
+ id: 'id',
+ name: 'id',
+ field: 'id',
+ width: 75,
+ formatter: this.id_formatter
+ }, {
+ id: 'tag',
+ name: 'tag',
+ field: 'tag',
+ width: 200
+ }, {
+ id: 'attempts',
+ name: 'attempt',
+ field: 'attempts',
+ width: 60,
+ formatter: this.attempts_formatter
+ }, {
+ id: 'priority',
+ name: 'priority',
+ field: 'priority',
+ width: 70
+ }, {
+ id: 'run_at',
+ name: 'run at',
+ field: 'run_at',
+ width: 165
+ }
+ ];
+ };
+ Jobs.prototype.init = function() {
+ Jobs.__super__.init.call(this);
+ this.grid.onSelectedRowsChanged = __bind(function() {
+ var job, row, rows;
+ rows = this.grid.getSelectedRows();
+ row = (rows != null ? rows.length : void 0) === 1 ? rows[0] : -1;
+ job = this.data[rows[0]] || {};
+ return $('#show-job .show-field').each(__bind(function(idx, field) {
+ var field_name;
+ field_name = field.id.replace("job-", '');
+ return $(field).text(job[field_name]);
+ }, this));
+ }, this);
+ return this;
+ };
+ Jobs.prototype.selectAll = function() {
+ var _i, _ref, _results;
+ this.grid.setSelectedRows((function() {
+ _results = [];
+ for (var _i = 0, _ref = this.data.length; 0 <= _ref ? _i < _ref : _i > _ref; 0 <= _ref ? _i += 1 : _i -= 1){ _results.push(_i); }
+ return _results;
+ }).apply(this, arguments));
+ return this.grid.onSelectedRowsChanged();
+ };
+ Jobs.prototype.onSelected = function(action) {
+ var all_jobs, params, row;
+ params = {
+ flavor: this.options.flavor,
+ q: this.query,
+ update_action: action
+ };
+ all_jobs = this.grid.getSelectedRows().length === this.data.length;
+ if (all_jobs && action === 'destroy') {
+ if (!confirm("Are you sure you want to delete *all* jobs of this type and matching this query?")) {
+ return;
+ }
+ }
+ if (!all_jobs) {
+ params.job_ids = (function() {
+ var _i, _len, _ref, _results;
+ _ref = this.grid.getSelectedRows();
+ _results = [];
+ for (_i = 0, _len = _ref.length; _i < _len; _i++) {
+ row = _ref[_i];
+ _results.push(this.data[row].id);
+ }
+ return _results;
+ }).call(this);
+ }
+ $.ajaxJSON(this.options.batch_update_url, "POST", params, this.refresh);
+ return this.grid.setSelectedRows([]);
+ };
+ Jobs.prototype.updated = function() {
+ return $('#jobs-total').text(this.data.length);
+ };
+ return Jobs;
+ })();
+ Workers = (function() {
+ __extends(Workers, Jobs);
+ function Workers(options) {
+ Workers.__super__.constructor.call(this, options, 'running', '#running-grid');
+ }
+ Workers.prototype.build_columns = function() {
+ var cols;
+ cols = [
+ {
+ id: 'worker',
+ name: 'worker',
+ field: 'locked_by',
+ width: 175
+ }
+ ].concat(Workers.__super__.build_columns.call(this));
+ cols.pop();
+ return cols;
+ };
+ Workers.prototype.updated = function() {};
+ return Workers;
+ })();
+ Tags = (function() {
+ __extends(Tags, FlavorGrid);
+ function Tags(options) {
+ Tags.__super__.constructor.call(this, options, 'tags', '#tags-grid');
+ }
+ Tags.prototype.build_columns = function() {
+ return [
+ {
+ id: 'tag',
+ name: 'tag',
+ field: 'tag',
+ width: 200
+ }, {
+ id: 'count',
+ name: 'count',
+ field: 'count',
+ width: 50
+ }
+ ];
+ };
+ Tags.prototype.grid_options = function() {
+ return $.extend(Tags.__super__.grid_options.call(this), {
+ enableCellNavigation: false
+ });
+ };
+ return Tags;
+ })();
+ $.extend(window, {
+ Jobs: Jobs,
+ Workers: Workers,
+ Tags: Tags
+ });
+ $(document).ready(function() {
+ var search_event;
+ $('#tags-flavor').change(function() {
+ return window.tags.change_flavor($(this).val());
+ });
+ $('#jobs-flavor').change(function() {
+ return window.jobs.change_flavor($(this).val());
+ });
+ $('#jobs-refresh').click(function() {
+ return window.jobs.refresh();
+ });
+ search_event = $('#jobs-search')[0].onsearch === void 0 ? 'change' : 'search';
+ $('#jobs-search').bind(search_event, function() {
+ return window.jobs.search($(this).val());
+ });
+ $('#select-all-jobs').click(function() {
+ return window.jobs.selectAll();
+ });
+ $('#hold-jobs').click(function() {
+ return window.jobs.onSelected('hold');
+ });
+ $('#un-hold-jobs').click(function() {
+ return window.jobs.onSelected('unhold');
+ });
+ $('#delete-jobs').click(function() {
+ return window.jobs.onSelected('destroy');
+ });
+ $('#job-handler-show').click(function() {
+ $('#job-handler-wrapper').clone().dialog({
+ title: 'Job Handler',
+ width: 900,
+ height: 700,
+ modal: true
+ });
+ return false;
+ });
+ return $('#job-last_error-show').click(function() {
+ $('#job-last_error-wrapper').clone().dialog({
+ title: 'Last Error',
+ width: 900,
+ height: 700,
+ modal: true
+ });
+ return false;
+ });
+ });
+}).call(this);
View
5 script/delayed_job
@@ -1,6 +1,5 @@
#!/usr/bin/env ruby
ENV['RUNNING_AS_DAEMON'] = 'true'
-require File.expand_path(File.join(File.dirname(__FILE__), '..', 'config', 'environment'))
-require 'delayed/command'
-Delayed::Pool.run
+require(File.expand_path("../../vendor/plugins/delayed_job/lib/delayed/pool", __FILE__))
+Delayed::Pool.new.run()
View
5 vendor/plugins/delayed_job/MIT-LICENSE
@@ -1,4 +1,5 @@
-Copyright (c) 2005 Tobias Luetke
+Portions copyright (c) 2005 Tobias Luetke
+Portions copyright (c) 2011 Instructure Inc.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
@@ -17,4 +18,4 @@ MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOa AND
NONINFRINGEMENT. IN NO EVENT SaALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
-WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
View
222 vendor/plugins/delayed_job/README.textile
@@ -1,222 +0,0 @@
-h1. Delayed::Job
-
-Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background.
-
-This fork adds a 'queue' column for each job, and the ability to specify which queue a worker should pull from.
-
-It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks. Amongst those tasks are:
-
-* sending massive newsletters
-* image resizing
-* http downloads
-* updating smart collections
-* updating solr, our search server, after product changes
-* batch imports
-* spam checks
-
-h2. Installation
-
-To install as a gem, add the following to @config/environment.rb@:
-
-<pre>
-config.gem 'delayed_job'
-</pre>
-
-Rake tasks are not automatically loaded from gems, so you'll need to add the following to your Rakefile:
-
-<pre>
-begin
- require 'delayed/tasks'
-rescue LoadError
- STDERR.puts "Run `rake gems:install` to install delayed_job"
-end
-</pre>
-
-To install as a plugin: (main branch)
-
-<pre>
-script/plugin install git://github.com/collectiveidea/delayed_job.git
-</pre>
-
-To install as a plugin: (this fork)
-
-<pre>
-script/plugin install git://github.com/bracken/delayed_job.git
-</pre>
-
-After delayed_job is installed, you will need to setup the backend.
-
-h2. Backends
-
-delayed_job supports multiple backends for storing the job queue. The default is Active Record, which requires a jobs table.
-
-<pre>
-script/generate delayed_job
-rake db:migrate
-</pre>
-
-You can change the backend in an initializer:
-
-<pre>
-# config/initializers/delayed_job.rb
-Delayed::Worker.backend = :mongo
-</pre>
-
-h2. Upgrading to 1.8
-
-If you are upgrading from a previous release, you will need to generate the new @script/delayed_job@:
-
-<pre>
-script/generate delayed_job --skip-migration
-</pre>
-
-Known Issues: script/delayed_job does not work properly with anything besides the Active Record backend. That will be resolved before the next gem release.
-
-h2. Queuing Jobs
-
-Call @#send_later(method, params)@ on any object and it will be processed in the background.
-
-<pre>
-# without delayed_job
-Notifier.deliver_signup(@user)
-
-# with delayed_job
-Notifier.send_later :deliver_signup, @user
-
-# putting it in a specific queue
-Notifier.send_later_with_queue :deliver_signup, "my_queue", @user
-</pre>
-
-If a method should always be run in the background, you can call @#handle_asynchronously@ after the method declaration:
-
-<pre>
-class Device
- def deliver
- # long running method
- end
- handle_asynchronously :deliver
- #handle_asynchronously_with_queue :deliver, "my_queue"
-end
-
-device = Device.new
-device.deliver
-</pre>
-
-h2. Running Jobs
-
-@script/delayed_job@ can be used to manage a background process which will start working off jobs.
-
-<pre>
-$ RAILS_ENV=production script/delayed_job start
-$ RAILS_ENV=production script/delayed_job stop
-
-# Runs two workers in separate processes.
-$ RAILS_ENV=production script/delayed_job -n 2 start
-$ RAILS_ENV=production script/delayed_job stop
-
-# Specify the queue the worker should work out of with -q/--queue=QUEUE_NAME
-$ RAILS_ENV=production script/delayed_job start --queue=my_queue
-$ RAILS_ENV=production script/delayed_job stop
-
-# Specify a name to append to the process name -p/--process-name=NAME
-# the process name will be "delayed_job_#{NAME}"
-$ RAILS_ENV=production script/delayed_job start -p custom_name
-$ RAILS_ENV=production script/delayed_job stop
-</pre>
-
-Workers can be running on any computer, as long as they have access to the database and their clock is in sync. Keep in mind that each worker will check the database at least every 5 seconds.
-
-You can also invoke @RAILS_ENV=development rake jobs:work@ which will start working off jobs. You can cancel the rake task with @CTRL-C@.
-You can specify the queue for the rake task: @RAILS_ENV=development rake jobs:work[my_queue]@, or: @RAILS_ENV=development QUEUE=my_queue rake jobs:work@
-
-h2. Custom Jobs
-
-Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. Job objects are serialized to yaml so that they can later be resurrected by the job runner.
-
-<pre>
-class NewsletterJob < Struct.new(:text, :emails)
- def perform
- emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) }
- end
-end
-
-Delayed::Job.enqueue(NewsletterJob.new('lorem ipsum...', Customers.find(:all).collect(&:email)), :queue=>"my_queue")
-</pre>
-
-h2. Gory Details
-
-The library evolves around a delayed_jobs table which looks as follows:
-
-<pre>
- create_table :delayed_jobs, :force => true do |table|
- table.integer :priority, :default => 0 # Allows some jobs to jump to the front of the queue
- table.integer :attempts, :default => 0 # Provides for retries, but still fail eventually.
- table.text :handler # YAML-encoded string of the object that will do work
- table.text :last_error # reason for last failure (See Note below)
- table.text :queue, :default => nil # The queue that this job is in
- table.datetime :run_at # When to run. Could be Time.zone.now for immediately, or sometime in the future.
- table.datetime :locked_at # Set when a client is working on this object
- table.datetime :failed_at # Set when all retries have failed (actually, by default, the record is deleted instead)
- table.string :locked_by # Who is working on this object (if locked)
- table.timestamps
- end
-</pre>
-
-On failure, the job is scheduled again in 5 seconds + N ** 4, where N is the number of retries.
-
-The default Worker.max_attempts is 25. After this, the job either deleted (default), or left in the database with "failed_at" set.
-With the default of 25 attempts, the last retry will be 20 days later, with the last interval being almost 100 hours.
-
-The default Worker.max_run_time is 4.hours. If your job takes longer than that, another computer could pick it up. It's up to you to
-make sure your job doesn't exceed this time. You should set this to the longest time you think the job could take.
-
-By default, it will delete failed jobs (and it always deletes successful jobs). If you want to keep failed jobs, set
-Delayed::Worker.destroy_failed_jobs = false. The failed jobs will be marked with non-null failed_at.
-
-Here is an example of changing job parameters in Rails:
-
-<pre>
-# config/initializers/delayed_job_config.rb
-Delayed::Worker.destroy_failed_jobs = false
-Delayed::Worker.sleep_delay = 60
-Delayed::Worker.max_attempts = 3
-Delayed::Worker.max_run_time = 5.minutes
-Delayed::Worker.queue = "my_queue"
-</pre>
-
-h3. Cleaning up
-
-You can invoke @rake jobs:clear@ to delete all jobs in the queue.
-To only delete jobs for a specific queue run @rake jobs:clear[queue_name]@.
-
-h2. Mailing List
-
-Join us on the mailing list at http://groups.google.com/group/delayed_job
-
-h2. How to contribute
-
-If you find what looks like a bug:
-
-# Check the GitHub issue tracker to see if anyone else has had the same issue.
- http://github.com/collectiveidea/delayed_job/issues/
-# If you don't see anything, create an issue with information on how to reproduce it.
-
-If you want to contribute an enhancement or a fix:
-
-# Fork the project on github.
- http://github.com/collectiveidea/delayed_job/
-# Make your changes with tests.
-# Commit the changes without making changes to the Rakefile, VERSION, or any other files that aren't related to your enhancement or fix
-# Send a pull request.
-
-h3. Changes
-
-* 1.7.0: Added failed_at column which can optionally be set after a certain amount of failed job attempts. By default failed job attempts are destroyed after about a month.
-
-* 1.6.0: Renamed locked_until to locked_at. We now store when we start a given job instead of how long it will be locked by the worker. This allows us to get a reading on how long a job took to execute.
-
-* 1.5.0: Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking instead of relying on row level locks. This enables us to run as many worker processes as we need to speed up queue processing.
-
-* 1.2.0: Added #send_later to Object for simpler job creation
-
-* 1.0.0: Initial release
View
44 vendor/plugins/delayed_job/lib/delayed/backend/active_record.rb
@@ -28,6 +28,20 @@ class Job < ::ActiveRecord::Base
cattr_accessor :use_row_locking
self.use_row_locking = false
+ named_scope :current, lambda {
+ { :conditions => ["run_at <= ? AND failed_at IS NULL", db_time_now] }
+ }
+
+ named_scope :future, lambda {
+ { :conditions => ["run_at > ? AND failed_at IS NULL", db_time_now] }
+ }
+
+ named_scope :failed, :conditions => ["failed_at IS NOT NULL"]
+
+ named_scope :not_running, :conditions => ["locked_at is NULL"]
+
+ named_scope :running, :conditions => ["locked_at is NOT NULL and locked_by <> 'on hold'"]
+
named_scope :ready_to_run, lambda {|worker_name, max_run_time|
{:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
}
@@ -39,10 +53,12 @@ def self.clear_locks!(worker_name)
end
def self.get_and_lock_next_available(worker_name,
- max_run_time = Worker.max_run_time,
- queue = nil)
+ max_run_time,
+ queue = nil,
+ min_priority = nil,
+ max_priority = nil)
if self.use_row_locking
- scope = self.all_available(worker_name, max_run_time, queue)
+ scope = self.all_available(worker_name, max_run_time, queue, min_priority, max_priority)
::ActiveRecord::Base.silence do
# it'd be a big win to do this in a DB function and avoid the extra
@@ -57,7 +73,7 @@ def self.get_and_lock_next_available(worker_name,
end
end
else
- job = find_available(worker_name, 5, max_run_time, queue).detect do |job|
+ job = find_available(worker_name, 5, max_run_time, queue, min_priority, max_priority).detect do |job|
if job.lock_exclusively!(max_run_time, worker_name)
true
else
@@ -69,18 +85,22 @@ def self.get_and_lock_next_available(worker_name,
end
def self.find_available(worker_name,
- limit = 5,
- max_run_time = Worker.max_run_time,
- queue = nil)
- all_available(worker_name, max_run_time, queue).all(:limit => limit)
+ limit,
+ max_run_time,
+ queue = nil,
+ min_priority = nil,
+ max_priority = nil)
+ all_available(worker_name, max_run_time, queue, min_priority, max_priority).all(:limit => limit)
end
def self.all_available(worker_name,
- max_run_time = Worker.max_run_time,
- queue = nil)
+ max_run_time,
+ queue = nil,
+ min_priority = nil,
+ max_priority = nil)
scope = self.ready_to_run(worker_name, max_run_time)
- scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
- scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
+ scope = scope.scoped(:conditions => ['priority >= ?', min_priority]) if min_priority
+ scope = scope.scoped(:conditions => ['priority <= ?', max_priority]) if max_priority
scope = scope.scoped(:conditions => ['queue = ?', queue]) if queue
scope = scope.scoped(:conditions => ['queue is null']) unless queue
scope.by_priority
View
61 vendor/plugins/delayed_job/lib/delayed/backend/base.rb
@@ -3,11 +3,16 @@ module Backend
class DeserializationError < StandardError
end
+ class RecordNotFound < DeserializationError
+ end
+
module Base
+ ON_HOLD_COUNT = 50
+
def self.included(base)
base.extend ClassMethods
end
-
+
module ClassMethods
# Add a job to the queue
# The first argument should be an object that respond_to?(:perform)
@@ -19,16 +24,14 @@ def enqueue(*args)
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
-
+
options = args.first || {}
options[:priority] ||= self.default_priority
options[:payload_object] = object
options[:queue] ||= Delayed::Worker.queue
self.create(options)
end
end
-
- ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
def failed?
failed_at
@@ -50,6 +53,15 @@ def name
end
end
+ def full_name
+ obj = payload_object rescue nil
+ if obj && obj.respond_to?(:full_name)
+ obj.full_name
+ else
+ name
+ end
+ end
+
def payload_object=(object)
self['handler'] = object.to_yaml
self['tag'] = if object.respond_to?(:tag)
@@ -60,12 +72,12 @@ def payload_object=(object)
"#{object.class}#perform"
end
end
-
+
# Moved into its own method so that new_relic can trace it.
def invoke_job
payload_object.perform
end
-
+
# Unlock this job (note: not saved to DB)
def unlock
self.locked_at = nil
@@ -89,19 +101,28 @@ def reschedule_at
new_time
end
- private
+ def hold!
+ self.locked_by = 'on hold'
+ self.locked_at = Time.now
+ self.attempts = ON_HOLD_COUNT
+ self.failed_at = Time.now
+ self.save!
+ end
- def deserialize(source)
- handler = YAML.load(source) rescue nil
+ def unhold!
+ self.locked_by = nil
+ self.locked_at = nil
+ self.attempts = 0
+ self.run_at = Time.now
+ self.failed_at = nil
+ self.last_error = nil
+ self.save!
+ end
- unless handler.respond_to?(:perform)
- if handler.nil? && source =~ ParseObjectFromYaml
- handler_class = $1
- end
- attempt_to_load(handler_class || handler.class)
- handler = YAML.load(source)
- end
+ private
+ def deserialize(source)
+ handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
raise DeserializationError,
@@ -111,18 +132,12 @@ def deserialize(source)
"Job failed to load: #{e.message}. Try to manually require the required file."
end
- # Constantize the object so that ActiveSupport can attempt
- # its auto loading magic. Will raise LoadError if not successful.
- def attempt_to_load(klass)
- klass.constantize
- end
-
protected
def before_save
self.run_at ||= self.class.db_time_now
end
-
+
end
end
end
View
82 vendor/plugins/delayed_job/lib/delayed/command.rb
@@ -1,82 +0,0 @@
-require 'rubygems'
-require 'daemons'
-require 'optparse'
-
-module Delayed
- class Command
- attr_accessor :worker_count, :process_name
-
- def initialize(args)
- @files_to_reopen = []
- @options = {:quiet => true}
-
- @worker_count = 1
-
- opts = OptionParser.new do |opts|
- opts.banner = "Usage: #{File.basename($0)} [options] start|stop|restart|run"
-
- opts.on('-h', '--help', 'Show this message') do
- puts opts
- exit 1
- end
- opts.on('-e', '--environment=NAME', 'Specifies the environment to run this delayed jobs under (test/development/production).') do |e|
- STDERR.puts "The -e/--environment option has been deprecated and has no effect. Use RAILS_ENV and see http://github.com/collectiveidea/delayed_job/issues/#issue/7"
- end
- opts.on('--min-priority N', 'Minimum priority of jobs to run.') do |n|
- @options[:min_priority] = n
- end
- opts.on('--max-priority N', 'Maximum priority of jobs to run.') do |n|
- @options[:max_priority] = n
- end
- opts.on('-n', '--number_of_workers=workers', "Number of unique workers to spawn") do |worker_count|
- @worker_count = worker_count.to_i rescue 1
- end
- opts.on('-p', '--process-name=NAME', "The name to append to the process name. eg. delayed_job_NAME") do |process_name|
- @process_name = process_name
- end
- opts.on('-q', '--queue=QUEUE_NAME', "The name of the queue for the workers to pull work from") do |queue|
- @options[:queue] = queue
- end
- end
- @args = opts.parse!(args)
- end
-
- def daemonize
- ObjectSpace.each_object(File) do |file|
- @files_to_reopen << file unless file.closed?
- end
-
- worker_count.times do |worker_index|
- base_name = @process_name ? "delayed_job_#{@process_name}" : "delayed_job"
- process_name = worker_count == 1 ? base_name : "#{base_name}.#{worker_index}"
- Daemons.run_proc(process_name, :dir => "#{RAILS_ROOT}/tmp/pids", :dir_mode => :normal, :ARGV => @args) do |*args|
- run process_name
- end
- end
- end
-
- def run(worker_name = nil)
- Dir.chdir(RAILS_ROOT)
-
- # Re-open file handles
- @files_to_reopen.each do |file|
- begin
- file.reopen File.join(RAILS_ROOT, 'log', 'delayed_job.log'), 'a+'
- file.sync = true
- rescue ::Exception
- end
- end
-
- ActiveRecord::Base.connection.reconnect!
-
- worker = Delayed::Worker.new(@options)
- worker.name_prefix = "#{worker_name} "
- worker.start
- rescue => e
- Rails.logger.fatal e
- STDERR.puts e.message
- exit 1
- end
-
- end
-end
View
100 vendor/plugins/delayed_job/lib/delayed/performable_method.rb
@@ -1,113 +1,35 @@
-YAML.add_ruby_type("object:Module") do |type, val|
- val.constantize
-end
-
-YAML.add_ruby_type("object:Class") do |type, val|
- val.constantize
-end
-
-class Module
- def to_yaml(opts = {})
- YAML.quick_emit(self.object_id, opts) do |out|
- out.scalar(taguri, name)
- end
- end
-
- def load_for_delayed_job(arg)
- self
- end
-end
-
-class Class
- def to_yaml(opts = {})
- YAML.quick_emit(self.object_id, opts) do |out|
- out.scalar(taguri, name)
- end
- end
-
- def load_for_delayed_job(arg)
- self
- end
-end
-
-class ActiveRecord::Base
- yaml_as "tag:ruby.yaml.org,2002:ActiveRecord"
-
- def to_yaml(opts = {})
- if id.nil?
- raise("Can't serialize unsaved ActiveRecord object for delayed job: #{self.inspect}")
- end
- YAML.quick_emit(self.object_id, opts) do |out|
- out.scalar(taguri, id.to_s)
- end
- end
-
- def self.yaml_new(klass, tag, val)
- klass.find(val)
- rescue ActiveRecord::RecordNotFound
- raise Delayed::Backend::DeserializationError, "Couldn't find #{klass} with id #{val.inspect}"
- end
-end
-
-class Module
- def yaml_tag_read_class(name)
- name.constantize
- name
- end
-end
-
module Delayed
class PerformableMethod < Struct.new(:object, :method, :args)
- STRING_FORMAT = /^LOAD\;([A-Z][\w\:]+)(?:\;(\w+))?$/
-
attr_accessor :tag
-
- class LoadError < StandardError
- end
def initialize(object, method, args = [])
raise NoMethodError, "undefined method `#{method}' for #{object.inspect}" unless object.respond_to?(method)
- self.object = dump(object)
- self.args = args.map { |a| dump(a) }
+ self.object = object
+ self.args = args
self.method = method.to_sym
self.tag = display_name
end
-
+
def display_name
- if STRING_FORMAT === object
- "#{$1}#{$2 ? '#' : '.'}#{method}"
- elsif object.is_a?(Module)
+ if object.is_a?(Module)
"#{object}.#{method}"
else
"#{object.class}##{method}"
end
end
-
- def perform
- live_object.send(method, *args.map{|a| load(a)})
- end
- def live_object
- @live_object ||= load(object)
+ def perform
+ object.send(method, *args)
end
- private
-
- def load(obj)
- if STRING_FORMAT === obj
- $1.constantize.load_for_delayed_job($2)
- else
- obj
+ def full_name
+ obj_name = object.is_a?(ActiveRecord::Base) ? "#{object.class}.find(#{object.id}).#{method}" : display_name
+ arg_names = args.map do |a|
+ a.is_a?(ActiveRecord::Base) ? "#{a.class}.find(#{a.id})" : a.inspect
end
- rescue => e
- Delayed::Worker.logger.warn "Could not load object for job: #{e.message}"
- raise PerformableMethod::LoadError
- end
-
- def dump(obj)
- obj
+ "#{obj_name}(#{arg_names.join(", ")})"
end
end
end
View
323 vendor/plugins/delayed_job/lib/delayed/pool.rb
@@ -1,181 +1,218 @@
-require 'rubygems'
-require 'daemons'
-
-# The config/delayed_jobs.yml file has a format like:
-#
-# production:
-# workers:
-# - queue: normal
-# workers: 2
-# max_priority: 10
-# - queue: normal
-# workers: 2
-#
-# default:
-# workers:
-# - queue: normal
-# workers: 5
-# - periodic: config/periodic_jobs.rb
-#
-# If a "periodic" worker is not specified, rufus-scheduler is not required
+require 'optparse'
+require 'yaml'
+require 'fileutils'
module Delayed
- class Pool
- attr_accessor :workers
+class Pool
- def self.run
- if GC.respond_to?(:copy_on_write_friendly=)
- GC.copy_on_write_friendly = true
- end
- self.new(Rails.root+"config/delayed_jobs.yml").daemonize
+ attr_reader :options, :workers
+
+ def initialize(args = ARGV)
+ @args = args
+ @workers = {}
+ @options = {
+ :config_file => expand_rails_path("config/delayed_jobs.yml"),
+ :pid_folder => expand_rails_path("tmp/pids"),
+ }
+ end
+
+ def run
+ if GC.respond_to?(:copy_on_write_friendly=)
+ GC.copy_on_write_friendly = true
end
- def logger
- RAILS_DEFAULT_LOGGER
+ OptionParser.new do |opts|
+ opts.banner = "Usage #{$0} <command> <options>"
+ opts.separator %{\nWhere <command> is one of:
+ start start the jobs daemon
+ stop stop the jobs daemon
+ run start and run in the foreground
+ restart stop and then start the jobs daemon
+ status show daemon status
+}
+
+ opts.separator "\n<options>"
+ opts.on("-c", "--config", "Use alternate config file (default #{options[:config_file]})") { |c| options[:config_file] = c }
+ opts.on("-p", "--pid", "Use alternate folder for PID files (default #{options[:pid_folder]})") { |p| options[:pid_folder] = p }
+ opts.on_tail("-h", "--help", "Show this message") { puts opts; exit }
+ end.parse!(@args)
+
+ command = @args.shift
+ case command
+ when 'start'
+ daemonize
+ start
+ when 'stop'
+ stop
+ when 'run'
+ start
+ when 'status'
+ status
+ when 'restart'
+ stop if status(false)
+ sleep(0.5) while status(false)
+ daemonize
+ start
+ else
+ raise("Unknown command: #{command.inspect}")
end
+ end
- def initialize(config_filename)
- @workers = {}
- config = YAML.load_file(config_filename)
- @config = (environment && config[environment]) || config['default']
- # Backwards compatibility from when the config was just an array of queues
- @config = { :workers => @config } if @config.is_a?(Array)
- @config = @config.with_indifferent_access
- unless @config && @config.is_a?(Hash)
- raise ArgumentError,
- "Invalid config file #{config_filename}"
- end
- logger.auto_flushing = true if logger.respond_to?(:auto_flushing)
- Worker::Settings.each do |setting|
- Worker.send("#{setting}=", @config[setting.to_s]) if @config.key?(setting.to_s)
- end
+ protected
+
+ def start
+ load_rails
+ tail_rails_log unless @daemon
+
+ say "Started job master", :info
+ read_config(options[:config_file])
+ spawn_all_workers
+ say "Workers spawned"
+ join
+ say "Shutting down"
+ rescue Interrupt => e
+ say "Signal received, exiting", :info
+ rescue Exception => e
+ say "Job master died with error: #{e.inspect}\n#{e.backtrace.join("\n")}", :fatal
+ raise
+ ensure
+ remove_pid_file
+ end
+
+ def say(msg, level = :debug)
+ if defined?(Rails)
+ Rails.logger.send(level, "[#{Process.pid}]P #{msg}")
+ else
+ puts(msg)
end
+ end
+
+ def load_rails
+ require(expand_rails_path("config/environment.rb"))
+ Dir.chdir(Rails.root)
+ end
- def environment
- RAILS_ENV
+ def spawn_all_workers
+ ActiveRecord::Base.connection_handler.clear_all_connections!
+
+ @config[:workers].each do |worker_config|
+ worker_config = worker_config.with_indifferent_access
+ (worker_config[:workers] || 1).times { spawn_worker(worker_config) }
end
+ end
- def daemonize
- @files_to_reopen = []
- ObjectSpace.each_object(File) do |file|
- @files_to_reopen << file unless file.closed?
- end
- Daemons.run_proc('delayed_jobs_pool',
- :dir => "#{Rails.root}/tmp/pids",
- :dir_mode => :normal) do
-
- Dir.chdir(Rails.root)
- log_path = Rails.root+"log/delayed_job.log"
-
- # Re-open file handles
- @files_to_reopen.each do |file|
- begin
- file.reopen(log_path, 'a+')
- file.sync = true
- rescue ::Exception
- end
- end
-
- if $0 == 'delayed_jobs_pool'
- # TODO: Daemons library doesn't provide a great way to redirect
- # stdout/stderr to an arbitrary log file, and it also doesn't provid
- # a great way to detect if we actually daemonized (or if we're
- # running in the FG).
- STDOUT.reopen(log_path, 'a+')
- STDERR.reopen(STDOUT)
- STDOUT.sync = STDERR.sync = true
- end
-
- ActiveRecord::Base.connection_handler.clear_all_connections!
-
- start
- join
- end
+ def spawn_worker(worker_config)
+ if worker_config[:periodic]
+ return # backwards compat
+ else
+ queue = worker_config[:queue] || Delayed::Worker.queue
+ worker = Delayed::Worker.new(Process.pid, worker_config)
end
- def start
- spawn_all_workers
- say "**** started master at PID: #{Process.pid}"
+ pid = fork do
+ Delayed::Job.connection.reconnect!
+ worker.start
end
+ workers[pid] = worker
+ end
- def join
- begin
- loop do
- child = Process.wait
- if child
- worker = delete_worker(child)
- say "**** child died: #{child}, restarting"
- spawn_worker(worker.config)
- end
- end
- rescue Errno::ECHILD
+ def join
+ loop do
+ child = Process.wait
+ if child
+ worker = workers.delete(child)
+ say "child exited: #{child}, restarting", :info
+ spawn_worker(worker.config)
end
- say "**** all children killed. exiting"
end
+ end
- def spawn_all_workers
- @config[:workers].each do |worker_config|
- worker_config = worker_config.with_indifferent_access
- (worker_config[:workers] || 1).times { spawn_worker(worker_config) }
+ def tail_rails_log
+ Rails.logger.auto_flushing = true if Rails.logger.respond_to?(:auto_flushing=)
+ Thread.new do
+ f = File.open(Rails.configuration.log_path.presence || (Rails.root+"log/#{Rails.env}.log"), 'r')
+ f.seek(0, IO::SEEK_END)
+ loop do
+ content = f.read
+ content.present? ? STDOUT.print(content) : sleep(0.5)
end
end
+ end
- def spawn_worker(worker_config)
- if worker_config[:queue]
- worker_config[:max_priority] ||= nil
- worker_config[:min_priority] ||= nil
- worker = Delayed::PoolWorker.new(worker_config)
- elsif worker_config[:periodic]
- worker = Delayed::PeriodicChildWorker.new(worker_config)
- else
- raise "invalid worker type in config: #{worker_config}"
- end
+ def daemonize
+ FileUtils.mkdir_p(pid_folder)
+ puts "Daemonizing..."
- pid = fork do
- worker.start
- end
- workers[pid] = worker
- end
+ exit if fork
+ Process.setsid
+ exit if fork
- def delete_worker(child)
- worker = workers.delete(child)
- return worker if worker
- say "whoaaa wtf this child isn't known: #{child}"
- end
+ @daemon = true
+ File.open(pid_file, 'wb') { |f| f.write(Process.pid.to_s) }
- def say(text, level = Logger::INFO)
- if logger
- logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}"
- else
- puts text
- end
- end
+ # if we blow up so badly that we can't syslog the error, it has to go to
+ # log/delayed_jobs.log
+ last_ditch_logfile = expand_rails_path("log/delayed_jobs.log")
+ STDIN.reopen("/dev/null")
+ STDOUT.reopen(last_ditch_logfile, 'a')
+ STDERR.reopen(STDOUT)
+ STDOUT.sync = STDERR.sync = true
+ end
+ def pid_folder
+ options[:pid_folder]
end
- module ChildWorker
- # must be called before forking
- def initialize(*args)
- @parent_pid = Process.pid
- super
- end
+ def pid_file
+ File.join(pid_folder, 'delayed_jobs_pool.pid')
+ end
- def exit?
- super || parent_exited?
+ def remove_pid_file
+ return unless @daemon
+ pid = File.read(pid_file) if File.file?(pid_file)
+ if pid.to_i == Process.pid
+ FileUtils.rm(pid_file)
end
+ end
- private
+ def stop
+ pid = File.read(pid_file) if File.file?(pid_file)
+ if pid.to_i > 0
+ puts "Stopping pool #{pid}..."
+ Process.kill('INT', pid.to_i)
+ else
+ status
+ end
+ end
- def parent_exited?
- @parent_pid && @parent_pid != Process.ppid
+ def status(print = true)
+ pid = File.read(pid_file) if File.file?(pid_file)
+ if pid
+ puts "Delayed jobs running, pool PID: #{pid}" if print
+ else
+ puts "No delayed jobs pool running" if print
end
+ pid.to_i > 0 ? pid.to_i : nil
end
- class PoolWorker < Delayed::Worker
- include ChildWorker
+ def read_config(config_filename)
+ config = YAML.load_file(config_filename)
+ @config = config[Rails.env] || config['default']
+ # Backwards compatibility from when the config was just an array of queues
+ @config = { :workers => @config } if @config.is_a?(Array)
+ @config = @config.with_indifferent_access
+ unless @config && @config.is_a?(Hash)
+ raise ArgumentError,
+ "Invalid config file #{config_filename}"
+ end
+ Worker::Settings.each do |setting|
+ Worker.send("#{setting}=", @config[setting.to_s]) if @config.key?(setting.to_s)
+ end
end
- class PeriodicChildWorker < Delayed::PeriodicWorker
- include ChildWorker
+ def expand_rails_path(path)
+ File.expand_path("../../../../../../#{path}", __FILE__)
end
+
+end
end
View
31 vendor/plugins/delayed_job/lib/delayed/recipes.rb
@@ -1,31 +0,0 @@
-# Capistrano Recipes for managing delayed_job
-#
-# Add these callbacks to have the delayed_job process restart when the server
-# is restarted:
-#
-# after "deploy:stop", "delayed_job:stop"
-# after "deploy:start", "delayed_job:start"
-# after "deploy:restart", "delayed_job:restart"
-
-Capistrano::Configuration.instance.load do
- namespace :delayed_job do
- def rails_env
- fetch(:rails_env, false) ? "RAILS_ENV=#{fetch(:rails_env)}" : ''
- end
-
- desc "Stop the delayed_job process"
- task :stop, :roles => :app do
- run "cd #{current_path};#{rails_env} script/delayed_job stop"
- end
-
- desc "Start the delayed_job process"
- task :start, :roles => :app do
- run "cd #{current_path};#{rails_env} script/delayed_job start"
- end
-
- desc "Restart the delayed_job process"
- task :restart, :roles => :app do
- run "cd #{current_path};#{rails_env} script/delayed_job restart"
- end
- end
-end
View
331 vendor/plugins/delayed_job/lib/delayed/worker.rb
@@ -1,259 +1,146 @@
-require 'timeout'
-
module Delayed
- class TimeoutError < RuntimeError; end
- class WorkerBase
- cattr_accessor :logger
- attr_reader :config
+class TimeoutError < RuntimeError; end
- self.logger = if defined?(Merb::Logger)
- Merb.logger
- elsif defined?(RAILS_DEFAULT_LOGGER)
- RAILS_DEFAULT_LOGGER
- end
+class Worker
- def initialize(options={})
- @config = options
- end
+ Settings = [ :max_run_time, :max_attempts ]
+ cattr_accessor :queue, *Settings
- def exit?
- @exit
- end
+ self.max_attempts = 15
+ self.max_run_time = 4.hours
+ self.queue = "canvas_queue"
- def worker_type_string
- ""
- end
+ attr_reader :config, :queue, :min_priority, :max_priority, :sleep_delay
- def say(text, level = Logger::INFO)
- puts text unless @quiet
- logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
- end
+ # Callback to fire when a delayed job fails max_attempts times. If this
+ # callback is defined, then the value of destroy_failed_jobs is ignored, and
+ # the job is destroyed if this block returns true.
+ #
+ # This allows for destroying "uninteresting" failures, while keeping around
+ # interesting failures to be investigated later.
+ #
+ # The block is called with args(job, last_exception)
+ def self.on_max_failures=(block)
+ @@on_max_failures = block
+ end
+ cattr_reader :on_max_failures
+
+ def initialize(parent_pid, options = {})
+ @parent_pid = parent_pid
+ @config = options
+ @exit = false
+ @queue = options[:queue] || self.class.queue
+ @min_priority = options[:min_priority]
+ @max_priority = options[:max_priority]
+ @sleep_delay = Setting.get('delayed_jobs_sleep_delay', '1.0').to_f
+ end
- def procline(string)
- $0 = "#{worker_type_string}:#{string}"
- say "* (#{Process.pid}) #{string}"
- end
+ def name
+ @name ||= "#{Socket.gethostname rescue "X"}:#{Process.pid}"
end
- class Worker < WorkerBase
- Settings = [ :max_attempts, :max_run_time, :sleep_delay, ]
- cattr_accessor *Settings
- cattr_accessor :min_priority, :max_priority, :queue, :cant_fork
- self.sleep_delay = 5
- self.max_attempts = 25
- self.max_run_time = 4.hours
- self.queue = nil
+ def exit?
+ @exit || parent_exited?
+ end
- attr_reader :config
+ def parent_exited?
+ @parent_pid != Process.ppid
+ end
- # By default failed jobs are destroyed after too many attempts. If you want to keep them around
- # (perhaps to inspect the reason for the failure), set this to false.
- cattr_accessor :destroy_failed_jobs
- self.destroy_failed_jobs = true
-
- # name_prefix is ignored if name is set directly
- attr_accessor :name_prefix, :queue
-
- cattr_reader :backend
-
- def self.backend=(backend)
- if backend.is_a? Symbol
- require "delayed/backend/#{backend}"
- backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
- end
- @@backend = backend
- silence_warnings { ::Delayed.const_set(:Job, backend) }
- end
+ def start
+ say "Starting job worker", :info
- # Callback to fire when a delayed job fails max_attempts times. If this
- # callback is defined, then the value of destroy_failed_jobs is ignored, and
- # the job is destroyed if this block returns true.
- #
- # This allows for destroying "uninteresting" failures, while keeping around
- # interesting failures to be investigated later.
- #
- # The block is called with args(job, last_exception)
- def self.on_max_failures=(block)
- @@on_max_failures = block
- end
- cattr_reader :on_max_failures
+ trap('INT') { say 'Exiting'; @exit = true }
- def initialize(options={})
- super
- @quiet = options[:quiet]
- @queue = options[:queue] || self.class.queue
- self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
- self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
- @already_retried = false
+ loop do
+ run
+ break if exit?
end
+ ensure
+ Delayed::Job.clear_locks!(name)
+ end
- # Every worker has a unique name which by default is the pid of the process. There are some
- # advantages to overriding this with something which survives worker retarts: Workers can#
- # safely resume working on tasks which are locked by themselves. The worker will assume that
- # it crashed before.
- def name
- return @name unless @name.nil?
- "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
- end
+ protected
- # Sets the name of the worker.
- # Setting the name to nil will reset the default worker name
- def name=(val)
- @name = val
+ def run
+ job = nil
+ Rails.logger.silence do
+ job = Delayed::Job.get_and_lock_next_available(
+ name,
+ self.class.max_run_time,
+ queue,
+ min_priority,
+ max_priority)
end
- def priority_string
- min_priority = self.class.min_priority || 0
- max_priority = self.class.max_priority
- "#{min_priority}:#{max_priority || "max"}"
- end
-
- def start(exit_when_queues_empty = false)
- enable_gc_optimizations
- @exit = false
-
- say "*** Starting job worker #{name}"
-
- trap('TERM') { say 'Exiting...'; @exit = true }
- trap('INT') { say 'Exiting...'; @exit = true }
-
- waiting = false # avoid logging "waiting for queue" over and over
-
- loop do
- job = Delayed::Job.get_and_lock_next_available(name,
- self.class.max_run_time,
- @queue)
- if job
- waiting = false
- start_time = Time.now
- unless self.class.cant_fork
- @child = fork do
- run(job, start_time)
- end
- procline "watch: #{@child}:#{start_time.to_i}"
- Process.wait
- # establish a new connection to the job queue, since when the child
- # exits it disconnects the parent as well.
- # the parent shouldn't ever access any table but delayed_jobs, so
- # we don't have to worry about other connections here.
- Delayed::Job.connection.reconnect!
- else
- run(job, start_time)
- end
- elsif exit_when_queues_empty
- break
- else
- procline("wait:#{@queue}:#{priority_string}") unless waiting
- waiting = true
- sleep(@@sleep_delay)
- end
-
- break if exit?
- end
-
- ensure
- Delayed::Job.clear_locks!(name)
+ if job
+ perform(job)
+ else
+ sleep(sleep_delay)
end
+ end
- def run(job, start_time = Time.now)
- procline "run: #{job.name}:#{start_time.to_i}"
- runtime = Benchmark.realtime do
- Timeout.timeout(self.class.max_run_time.to_i, Delayed::TimeoutError) { job.invoke_job }
+ def perform(job)
+ start_time = Time.now
+ say_job(job, "Processing #{log_job(job, :long)}", :info)
+ runtime = Benchmark.realtime do
+ Timeout.timeout(self.class.max_run_time.to_i, Delayed::TimeoutError) { job.invoke_job }
+ Rails.logger.silence do
job.destroy
end
- # TODO: warn if runtime > max_run_time ?
- say "* [JOB] #{name} completed after %.4f" % runtime
- return true # did work
- rescue Exception => e
- handle_failed_job(job, e)
- return false # work failed
end
+ say_job(job, "Completed #{log_job(job)} %.0fms" % (runtime * 1000), :info)
+ rescue Exception => e
+ handle_failed_job(job, e)
+ end
- # Reschedule the job in the future (when a job fails).
- # Uses an exponential scale depending on the number of failed attempts.
- def reschedule(job, error = nil, time = nil)
- job.attempts += 1
- if job.attempts >= (job.max_attempts || self.class.max_attempts)
- job.failed_at = Delayed::Job.db_time_now
- if self.class.on_max_failures
- destroy_job = self.class.on_max_failures.call(job, error)
- else
- destroy_job = self.class.destroy_failed_jobs
- end
- if destroy_job
- say "* [JOB] PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
- job.destroy
- return
- end
- end
-
- # still reschedule even if the job has failed max_attempts times -- maybe
- # somebody will increase max_attempts later
- time ||= job.reschedule_at
- job.run_at = time
- job.unlock
- job.save!
- end
+ def handle_failed_job(job, error)
+ job.last_error = error.message + "\n" + error.backtrace.join("\n")
+ say_job(job, "Failed with #{error.class} [#{error.message}] (#{job.attempts} attempts)", :error)
+ reschedule(job, error)
+ end
- # Enables GC Optimizations if you're running REE.
- # http://www.rubyenterpriseedition.com/faq.html#adapt_apps_for_cow
- def enable_gc_optimizations
- if GC.respond_to?(:copy_on_write_friendly=)
- GC.copy_on_write_friendly = true
+ # Reschedule the job in the future (when a job fails).
+ # Uses an exponential scale depending on the number of failed attempts.
+ def reschedule(job, error = nil, time = nil)
+ job.attempts += 1
+ if job.attempts >= (job.max_attempts || self.class.max_attempts)
+ job.failed_at = Delayed::Job.db_time_now
+ destroy_job = true
+ if self.class.on_max_failures
+ destroy_job = self.class.on_max_failures.call(job, error)
+ end
+ if destroy_job
+ say "destroying #{job.name} because of #{job.attempts} consecutive failures.", :info
+ job.destroy
end
end
- protected
-
- def handle_failed_job(job, error)
- job.last_error = error.message + "\n" + error.backtrace.join("\n")
- say "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
- reschedule(job, error)
- end
-
- def worker_type_string
- "delayed"
- end