/
purging.rb
157 lines (126 loc) · 4.55 KB
/
purging.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
module Metric::Purging
def self.purge_date(type)
value = ::Settings.performance.history[type]
case value
when Numeric
value.days.ago.utc
when String
value.to_i_with_method.seconds.ago.utc
end
end
def self.purge_all_timer
purge_realtime_timer
purge_rollup_timer
end
def self.purge_rollup_timer
purge_hourly_timer
purge_daily_timer
purge_rollup_task_timer
end
def self.purge_daily_timer(ts = nil)
ts ||= purge_date(:keep_daily_performances)
purge_timer(ts, "daily")
end
def self.purge_hourly_timer(ts = nil)
ts ||= purge_date(:keep_hourly_performances)
purge_timer(ts, "hourly")
end
def self.purge_realtime_timer(ts = nil)
ts ||= purge_date(:keep_realtime_performances)
purge_timer(ts, "realtime")
end
def self.purge_rollup_task_timer
MiqTask.delete_older(4.hours.ago.beginning_of_hour.utc, "name LIKE 'Performance rollup for %'")
end
def self.purge_timer(ts, interval)
MiqQueue.submit_job(
:class_name => name,
:method_name => "purge_#{interval}",
:args => [ts],
:msg_timeout => msg_timeout
)
end
def self.purge_window_size
::Settings.performance.history.purge_window_size
end
def self.msg_timeout
::Settings.performance.history.queue_timeout.to_i_with_method
end
def self.purge_scope(older_than, interval)
scope = interval == 'realtime' ? Metric : MetricRollup.where(:capture_interval_name => interval)
scope.where(scope.arel_table[:timestamp].lteq(older_than))
end
def self.purge_count(older_than, interval)
purge_scope(older_than, interval).count
end
def self.purge_daily(older_than, window = nil, total_limit = nil, &block)
purge_by_date(older_than, "daily", window, total_limit, &block)
end
def self.purge_hourly(older_than, window = nil, total_limit = nil, &block)
purge_by_date(older_than, "hourly", window, total_limit, &block)
end
def self.purge_realtime(older_than, _window = nil, _total_limit = nil)
truncate_child_tables(older_than)
end
# truncate metrics child tables
# Determines hours not being preserved and truncates them
# Used for realtime metrics.
def self.truncate_child_tables(older_than)
target_hours = determine_target_hours(older_than, Time.now.utc)
return if target_hours.blank?
target_hours.each do |hour|
Metric.connection.truncate(Metric.reindex_table_name(hour), "Metric Truncate table #{hour}")
end
end
def self.determine_target_hours(older_than, end_date)
return [] if (end_date - older_than) > 24.hours
start_hour = older_than.utc.hour
end_hour = end_date.utc.hour
end_hour += 24 if start_hour > end_hour
good_hours = (start_hour..end_hour).map { |h| h % 24 }
(0..23).to_a - good_hours.to_a
end
private_class_method :determine_target_hours
def self.purge(older_than, interval, window = nil, total_limit = nil, &block)
purge_by_date(older_than, interval, window, total_limit, &block)
end
def self.purge_by_date(older_than, interval, window = nil, total_limit = nil, &block)
scope = purge_scope(older_than, interval)
window ||= purge_window_size
_log.info("Purging #{total_limit || "all"} #{interval} metrics older than [#{older_than}]...")
total, timings = purge_in_batches(scope, window, 0, total_limit, &block)
_log.info("Purging #{total_limit || "all"} #{interval} metrics older than [#{older_than}]...Complete - " +
"Deleted #{total} records - Timings: #{timings.inspect}")
total
end
def self.purge_in_batches(scope, window, total = 0, total_limit = nil)
query = scope.select(:id).limit(window)
_, timings = Benchmark.realtime_block(:total_time) do
loop do
left_to_delete = total_limit && (total_limit - total)
if total_limit && left_to_delete < window
current_window = left_to_delete
query = query.limit(current_window)
end
if scope.klass == MetricRollup
batch_ids, _ = Benchmark.realtime_block(:query_batch) do
query.pluck(:id)
end
break if batch_ids.empty?
current_window = batch_ids.size
else
batch_ids = query
end
_log.info("Purging #{current_window} metrics.")
count, = Benchmark.realtime_block(:purge_metrics) do
scope.unscoped.where(:id => batch_ids).delete_all
end
break if count == 0
total += count
yield(count, total) if block_given?
break if count < window || (total_limit && (total_limit <= total))
end
end
[total, timings]
end
end