This repository has been archived by the owner on Apr 1, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 122
/
storage.rb
111 lines (100 loc) · 3.89 KB
/
storage.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
module Sidekiq::Status::Storage
RESERVED_FIELDS=%w(status stop update_time).freeze
BATCH_LIMIT = 500
protected
# Stores multiple values into a job's status hash,
# sets last update time
# @param [String] id job id
# @param [Hash] status_updates updated values
# @param [Integer] expiration optional expire time in seconds
# @param [ConnectionPool] redis_pool optional redis connection pool
# @return [String] Redis operation status code
def store_for_id(id, status_updates, expiration = nil, redis_pool=nil)
redis_connection(redis_pool) do |conn|
conn.multi do
conn.hmset id, 'update_time', Time.now.to_i, *(status_updates.to_a.flatten(1))
conn.expire id, (expiration || Sidekiq::Status::DEFAULT_EXPIRY)
conn.publish "status_updates", id
end[0]
end
end
# Stores job status and sets expiration time to it
# only in case of :failed or :stopped job
# @param [String] id job id
# @param [Symbol] job status
# @param [Integer] expiration optional expire time in seconds
# @param [ConnectionPool] redis_pool optional redis connection pool
# @return [String] Redis operation status code
def store_status(id, status, expiration = nil, redis_pool=nil)
store_for_id id, {status: status}, expiration, redis_pool
end
# Unschedules the job and deletes the Status
# @param [String] id job id
# @param [Num] job_unix_time, unix timestamp for the scheduled job
def delete_and_unschedule(job_id, job_unix_time = nil)
Sidekiq.redis do |conn|
scan_options = {offset: 0, conn: conn, start: (job_unix_time || '-inf'), end: (job_unix_time || '+inf')}
while not (jobs = schedule_batch(scan_options)).empty?
match = scan_scheduled_jobs_for_jid jobs, job_id
unless match.nil?
conn.zrem "schedule", match
conn.del job_id
return true # Done
end
scan_options[:offset] += BATCH_LIMIT
end
end
false
end
# Gets a single valued from job status hash
# @param [String] id job id
# @param [String] Symbol field fetched field name
# @return [String] Redis operation status code
def read_field_for_id(id, field)
Sidekiq.redis do |conn|
conn.hmget(id, field)[0]
end
end
# Gets the whole status hash from the job status
# @param [String] id job id
# @return [Hash] Hash stored in redis
def read_hash_for_id(id)
Sidekiq.redis do |conn|
conn.hgetall id
end
end
private
# Gets the batch of scheduled jobs based on input options
# Uses Redis zrangebyscore for log(n) search, if unix-time is provided
# @param [Hash] options, options hash containing (REQUIRED) keys:
# - conn: Redis connection
# - start: start score (i.e. -inf or a unix timestamp)
# - end: end score (i.e. +inf or a unix timestamp)
# - offset: current progress through (all) jobs (e.g.: 100 if you want jobs from 100 to BATCH_LIMIT)
def schedule_batch(options)
options[:conn].zrangebyscore "schedule", options[:start], options[:end], {limit: [options[:offset], BATCH_LIMIT]}
end
# Searches the jobs Array for the job_id
# @param [Array] scheduled_jobs, results of Redis schedule key
# @param [String] id job id
def scan_scheduled_jobs_for_jid(scheduled_jobs, job_id)
# A Little skecthy, I know, but the structure of these internal JSON
# is predefined in such a way where this will not catch unintentional elements,
# and this is notably faster than performing JSON.parse() for every listing:
scheduled_jobs.each { |job_listing| (return job_listing) if job_listing.include?("\"jid\":\"#{job_id}") }
nil
end
# Yields redis connection. Uses redis pool if available.
# @param [ConnectionPool] redis_pool optional redis connection pool
def redis_connection(redis_pool=nil)
if redis_pool
redis_pool.with do |conn|
yield conn
end
else
Sidekiq.redis do |conn|
yield conn
end
end
end
end