/
parallel_batch.rb
67 lines (53 loc) · 1.51 KB
/
parallel_batch.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# encoding: utf-8
# Batch job whose progress is persisted in the +offset+ attribute (the id of the
# last record handed out). Workers started through ParallelBatch.start each load
# the same row via find_or_create! and pull successive batches from it.
#
# Subclasses must implement #scope and #perform, and may override #batch_size.
class ParallelBatch < ActiveRecord::Base
  #####################
  ### Class methods ###
  #####################

  # Returns the first existing record of this class, creating one when there is none.
  def self.find_or_create!
    first || create!
  # When starting many batches at the same time we are pretty sure to get a MySQL
  # error reporting a duplicated entry. That's why we are retrying one time only.
  rescue ActiveRecord::StatementInvalid
    first || create!
  end

  # Forks +concurrency+ child processes (default 1), each running start_fork.
  # The children are detached, so the caller does not wait for them.
  def self.start(concurrency = 1)
    concurrency.times { Process.detach(fork { start_fork }) }
  end

  # Entry point of a forked worker: announces itself, re-opens the database
  # connection, daemonizes, then runs the batch.
  def self.start_fork
    # NOTE(review): Process.daemon forks again, so the pid of the daemonized
    # worker may differ from the one printed here -- confirm if the pid matters.
    puts "#{self} has started with pid #{Process.pid}"
    ActiveRecord::Base.connection.reconnect!
    Process.daemon(false)
    find_or_create!.run
  end

  # Clears the stored offset so the next batch starts from the lowest id again.
  def self.reset
    find_or_create!.update_attributes!(offset: nil)
  end

  ########################
  ### Instance methods ###
  ########################

  # Returns the next +batch_size+ records of +scope+ ordered by id, restricted to
  # ids greater than +offset+ when an offset has been stored.
  def find_records
    batch = scope.order(:id).limit(batch_size)
    offset ? batch.where('id > ?', offset) : batch
  end

  # Reserves the next batch inside a transaction: reloads this row with a lock,
  # fetches the next records and stores the id of the last one as the new offset.
  #
  # Returns the records, or nil when there is nothing left to process.
  def next_batch
    transaction do
      reload(lock: true)
      records = find_records
      last_record = records.last
      # `next` leaves the transaction block with nil, which ends the loop in #run.
      next unless last_record
      update_attributes!(offset: last_record.id)
      records
    end
  end

  # Processes batches until next_batch returns nil. A StandardError raised while
  # performing one record is logged (when a logger is configured) and does not
  # stop the remaining records. NotImplementedError is not a StandardError, so a
  # missing #perform or #scope override is not swallowed here.
  def run
    while (records = next_batch)
      records.each do |record|
        begin
          perform(record)
        rescue StandardError => e
          log = self.class.logger
          log.error("#{self.class} failed on record #{record.id}: #{e.class}: #{e.message}") if log
        end
      end
    end
  end

  # Hook called by #run for every record; must be overridden by subclasses.
  #
  # Raises NotImplementedError unless overridden.
  def perform(record)
    raise NotImplementedError, 'You must override this method to perform your batch.'
  end
  # Kept for backward compatibility: the hook used to be defined under this
  # misspelled name, while #run has always called #perform.
  alias_method :perfom, :perform

  # Hook returning the relation to iterate over; must be overridden by subclasses.
  #
  # Raises NotImplementedError unless overridden.
  def scope
    raise NotImplementedError, 'You must override this method to scope your records.'
  end

  # Number of records fetched per batch. Override to tune.
  def batch_size
    100
  end
end