Skip to content

Commit

Permalink
Merge 546dd99 into 41990aa
Browse files Browse the repository at this point in the history
  • Loading branch information
afrase committed Mar 24, 2021
2 parents 41990aa + 546dd99 commit b7033ba
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 76 deletions.
6 changes: 0 additions & 6 deletions lib/sidekiq-scheduler/manager.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
require 'redis'

require 'sidekiq/util'

require 'sidekiq-scheduler/schedule'
require 'sidekiq-scheduler/scheduler'

Expand Down Expand Up @@ -37,10 +35,6 @@ def start
@scheduler_instance.load_schedule!
end

def reset
clear_scheduled_work
end

private

def load_scheduler_options(options)
Expand Down
40 changes: 24 additions & 16 deletions lib/sidekiq-scheduler/redis_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module RedisManager
#
# @return [String] schedule in JSON format
def self.get_job_schedule(name)
hget(:schedules, name)
hget('schedules', name)
end

# Returns the state of a given job
Expand Down Expand Up @@ -44,7 +44,7 @@ def self.get_job_last_time(name)
# @param [String] name The name of the job
# @param [Hash] config The new schedule for the job
def self.set_job_schedule(name, config)
hset(:schedules, name, JSON.generate(config))
hset('schedules', name, JSON.generate(config))
end

# Sets the state for a given job
Expand Down Expand Up @@ -75,7 +75,7 @@ def self.set_job_last_time(name, last_time)
#
# @param [String] name The name of the job
def self.remove_job_schedule(name)
hdel(:schedules, name)
hdel('schedules', name)
end

# Removes the next execution time for a given job
Expand All @@ -89,36 +89,38 @@ def self.remove_job_next_time(name)
#
# @return [Hash] hash with all the job schedules
def self.get_all_schedules
Sidekiq.redis { |r| r.hgetall(:schedules) }
Sidekiq.redis { |r| r.hgetall('schedules') }
end

# Returns boolean value that indicates if the schedules value exists
#
# @return [Boolean] true if the schedules key is set, false otherwise
def self.schedule_exist?
Sidekiq.redis { |r| r.exists(:schedules) }
Sidekiq.redis do |r|
!!(r.respond_to?(:exists?) ? r.exists?('schedules') : r.exists('schedules'))
end
end

# Returns all the schedule changes for a given time range.
#
# @param [Float] from The minimum value in the range
# @param [Float] to The maximum value in the range
#
# @return [Array] array with all the changed job names
# @return [Array] All the changed job names
def self.get_schedule_changes(from, to)
Sidekiq.redis { |r| r.zrangebyscore(:schedules_changed, from, "(#{to}") }
Sidekiq.redis { |r| r.zrangebyscore('schedules_changed', from, "(#{to}") }
end

# Register a schedule change for a given job
#
# @param [String] name The name of the job
def self.add_schedule_change(name)
Sidekiq.redis { |r| r.zadd(:schedules_changed, Time.now.to_f, name) }
Sidekiq.redis { |r| r.zadd('schedules_changed', Time.now.to_f, name) }
end

# Remove all the schedule changes records
def self.clean_schedules_changed
Sidekiq.redis { |r| r.del(:schedules_changed) unless r.type(:schedules_changed) == 'zset' }
Sidekiq.redis { |r| r.del('schedules_changed') unless r.type('schedules_changed') == 'zset' }
end

# Removes a queued job instance
Expand All @@ -130,9 +132,9 @@ def self.clean_schedules_changed
def self.register_job_instance(job_name, time)
job_key = pushed_job_key(job_name)
registered, _ = Sidekiq.redis do |r|
r.pipelined do
r.zadd(job_key, time.to_i, time.to_i)
r.expire(job_key, REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
r.mutli do |multi|
multi.zadd(job_key, time.to_i, time.to_i)
multi.expire(job_key, REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
end
end

Expand All @@ -142,6 +144,8 @@ def self.register_job_instance(job_name, time)
# Removes instances of the job older than 24 hours
#
# @param [String] job_name The name of the job
#
# @return [Integer] Number of members that were removed
def self.remove_elder_job_instances(job_name)
seconds_ago = Time.now.to_i - REGISTERED_JOBS_THRESHOLD_IN_SECONDS

Expand All @@ -154,28 +158,28 @@ def self.remove_elder_job_instances(job_name)
#
# @param [String] job_name The name of the job
#
# @return [String] the pushed job key
# @return [String] The pushed job key
def self.pushed_job_key(job_name)
"sidekiq-scheduler:pushed:#{job_name}"
end

# Returns the key of the Redis hash for job's execution times hash
#
# @return [String] with the key
# @return [String]
def self.next_times_key
'sidekiq-scheduler:next_times'
end

# Returns the key of the Redis hash for job's last execution times hash
#
# @return [String] with the key
# @return [String]
def self.last_times_key
'sidekiq-scheduler:last_times'
end

# Returns the Redis's key for saving schedule states.
#
# @return [String] with the key
# @return [String]
def self.schedules_state_key
'sidekiq-scheduler:states'
end
Expand All @@ -197,6 +201,8 @@ def self.hget(hash_key, field_key)
# @param [String] hash_key The key name of the hash
# @param [String] field_key The key name of the field
# @param [String] value The new value name for the field
#
# @return [Boolean]
def self.hset(hash_key, field_key, value)
Sidekiq.redis { |r| r.hset(hash_key, field_key, value) }
end
Expand All @@ -205,6 +211,8 @@ def self.hset(hash_key, field_key, value)
#
# @param [String] hash_key The key name of the hash
# @param [String] field_key The key name of the field
#
# @return [Integer] The number of fields that were removed from the hash
def self.hdel(hash_key, field_key)
Sidekiq.redis { |r| r.hdel(hash_key, field_key) }
end
Expand Down
39 changes: 31 additions & 8 deletions lib/sidekiq-scheduler/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
require 'thwait'
require 'sidekiq/util'
require 'json'
require 'sidekiq-scheduler/manager'
require 'sidekiq-scheduler/rufus_utils'
require 'sidekiq-scheduler/redis_manager'

Expand Down Expand Up @@ -243,7 +242,15 @@ def new_job(name, interval_type, config, schedule, options)
options = options.merge({ :job => true, :tags => [name] })

rufus_scheduler.send(interval_type, schedule, options) do |job, time|
idempotent_job_enqueue(name, time, SidekiqScheduler::Utils.sanitize_job_config(config)) if job_enabled?(name)
if job_enabled?(name)
conf = SidekiqScheduler::Utils.sanitize_job_config(config)

if job.is_a?(Rufus::Scheduler::CronJob)
idempotent_job_enqueue(name, calc_cron_run_time(job.cron_line, time.utc), conf)
else
idempotent_job_enqueue(name, time, conf)
end
end
end
end

Expand All @@ -257,8 +264,8 @@ def unschedule_job(name)

# Retrieves a schedule state
#
# @param name [String] with the schedule's name
# @return [Hash] with the schedule's state
# @param [String] name The schedule's name
# @return [Hash] The schedule's state
def schedule_state(name)
state = SidekiqScheduler::RedisManager.get_job_state(name)

Expand All @@ -267,19 +274,19 @@ def schedule_state(name)

# Saves a schedule state
#
# @param name [String] with the schedule's name
# @param name [Hash] with the schedule's state
# @param [String] name The schedule's name
# @param [Hash] state The schedule's state
def set_schedule_state(name, state)
SidekiqScheduler::RedisManager.set_job_state(name, state)
end

# Adds a Hash with schedule metadata as the last argument to call the worker.
# It currently returns the schedule time as a Float number representing the milisencods
# It currently returns the schedule time as a Float number representing the milliseconds
# since epoch.
#
# @example with hash argument
# arguments_with_metadata({value: 1}, scheduled_at: Time.now)
# #=> [{value: 1}, {scheduled_at: <miliseconds since epoch>}]
# #=> [{value: 1}, {scheduled_at: <milliseconds since epoch>}]
#
# @param args [Array|Hash]
# @param metadata [Hash]
Expand Down Expand Up @@ -354,5 +361,21 @@ def handle_errors
Sidekiq.logger.info "#{e.class.name}: #{e.message}"
end
end

def calc_cron_run_time(cron, time)
time = time.round # remove sub seconds to prevent rounding errors.
next_t = cron.next_time(time).utc
previous_t = cron.previous_time(time).utc
next_diff = next_t - time
previous_diff = time - previous_t

if next_diff == previous_diff
time
elsif next_diff > previous_diff
time - previous_diff
else
time + next_diff
end
end
end
end
4 changes: 1 addition & 3 deletions lib/sidekiq-scheduler/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
module SidekiqScheduler

VERSION = '3.0.1'

VERSION = '3.1.0'
end
4 changes: 2 additions & 2 deletions spec/sidekiq-scheduler/web_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
end

describe '/recurring-jobs/:name/toggle' do
subject { get "/recurring-jobs/#{URI.escape(enabled_job_name)}/toggle" }
subject { get "/recurring-jobs/#{CGI.escape(enabled_job_name)}/toggle" }

it 'toggles job enabled flag' do
expect { subject }
Expand All @@ -106,7 +106,7 @@
end

describe 'GET /recurring-jobs/:name/enqueue' do
subject { get "/recurring-jobs/#{URI.escape(job_name)}/enqueue" }
subject { get "/recurring-jobs/#{CGI.escape(job_name)}/enqueue" }

let(:job_name) { enabled_job_name }
let(:job) { jobs[job_name] }
Expand Down
2 changes: 1 addition & 1 deletion spec/support/store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def self.zrange(zset_key, from, to)
end

def self.exists(key)
Sidekiq.redis { |r| r.exists(key) }
Sidekiq.redis { |r| !!(r.respond_to?(:exists?) ? r.exists?(key) : r.exists(key)) }
end

def self.hexists(hash_key, field_key)
Expand Down
102 changes: 62 additions & 40 deletions web/views/recurring_jobs.erb
Original file line number Diff line number Diff line change
@@ -1,48 +1,70 @@
<h3><%= t('recurring_jobs') %></h3>

<div class="table_container">
<table class="table table-hover table-bordered table-striped table-white">
<thead>
<tr>
<th><%= t('name') %></th>
<th><%= t('description') %></th>
<th><%= t('interval') %></th>
<th><%= t('class') %></th>
<th><%= t('queue') %></th>
<th><%= t('arguments') %></th>
<th><%= t('last_time') %></th>
<th><%= t('next_time') %></th>
<th></th>
</tr>
</thead>
<style type="text/css">
.recurring-jobs .title {
margin-bottom: 5px;
}

<tbody>
<% @presented_jobs.each do |job| %>
<tr>
<td><%= job.name %></td>
<td><%= job['description'] %></td>
<td><%= job.interval %></td>
<td><%= job['class'] %></td>
<td>
<a href="<%= root_path %>queues/<%= job.queue %>"><%= job.queue %></a>
</td>
<td><%= job['args'] %></td>
<td><%= job.last_time %></td>
<td>
<span style="<%= 'text-decoration:line-through' unless job.enabled? %>">
<%= job.next_time || t('no_next_time') %>
</span>
</td>
<td class="text-center">
<a class="btn btn-warn btn-xs" href="<%= root_path %>recurring-jobs/<%= URI.escape(job.name) %>/enqueue">
.recurring-jobs .title .name {
font-weight: bold;
}

.recurring-jobs .info,
.recurring-jobs .description {
margin-bottom: 5px;
}

.recurring-jobs .actions {
margin-bottom: 5px;
}

.recurring-jobs .status,
.recurring-jobs .description {
font-size: 12px;
}
</style>

<div class="recurring-jobs">
<ul class="list-group">
<% @presented_jobs.sort_by(&:name).each do |job| %>
<li class="list-group-item">
<div class="title">
<div class="row">
<div class="col-xs-6">
<span class="name"><%= job.name %></span>
</div>
<div class="col-xs-6 text-right">
<a href="<%= root_path %>queues/<%= job.queue %>"><%= job.queue %></a>
</div>
</div>
</div>
<div class="description"><%= job['description'] %></div>
<div class="info text-muted">
<div class="row">
<div class="col-md-4 class"><%= job['class'] %></div>
<div class="col-md-4 interval text-left"><%= t('interval') %>: <%= job.interval %></div>
<div class="col-md-4 args"><%= t('arguments') %>: <%= job['args'] %></div>
</div>
</div>
<div class="status row text-muted">
<div class="col-md-4 actions">
<a class="btn btn-warn btn-xs" href="<%= root_path %>recurring-jobs/<%= CGI.escape(job.name) %>/enqueue">
<%= t('enqueue_now') %>
</a>
<a class="btn <%= job.enabled? ? "btn-primary" : "btn-warn"%> btn-xs" href="<%= root_path %>recurring-jobs/<%= URI.escape(job.name) %>/toggle">
<a class="btn <%= job.enabled? ? "btn-primary" : "btn-warn" %> btn-xs" href="<%= root_path %>recurring-jobs/<%= CGI.escape(job.name) %>/toggle">
<%= job.enabled? ? t('disable') : t('enable') %>
</a>
</td>
</tr>
<% end %>
</tbody>
</table>
</div>
<div class="col-md-4">
<span class="last_time"><%= t('last_time') %>: <%= job.last_time %></span>
</div>
<div class="col-md-4">
<span class="next_time text-right" style="<%= 'text-decoration:line-through' unless job.enabled? %>">
<%= t('next_time') %>: <%= job.next_time || t('no_next_time') %>
</span>
</div>
</div>
</li>
<% end %>
</ul>
</div>

0 comments on commit b7033ba

Please sign in to comment.