# meta_worker.rb (forked from gnufied/backgroundrb)
# 355 lines (325 loc), 10.9 KB
module BackgrounDRb
# Logger stand-in used inside a worker process. It writes nothing itself:
# every message is forwarded through the worker's +send_request+, addressed
# to the +:log_worker+ worker.
class PacketLogger
  # worker - object whose +send_request+ carries messages to :log_worker.
  def initialize(worker)
    @worker = worker
  end

  # info, debug and error are handled identically: only the message payload
  # is forwarded (the severity is not included in the request). Each returns
  # whatever +send_request+ returns.
  [:info, :debug, :error].each do |severity|
    define_method(severity) do |p_data|
      @worker.send_request(:worker => :log_worker, :data => p_data)
    end
  end
end
# One unit of work queued by ThreadPool#defer: +data+ holds the arguments
# given to +defer+ (an Array when built by +defer+), and +block+ holds the
# block to run (nil if none was given).
class WorkData
  attr_accessor :data, :block

  def initialize(args, &block)
    self.data  = args
    self.block = block
  end
end
# Fixed-size pool of threads that run blocks queued with #defer.
class ThreadPool
  attr_accessor :size, :threads, :work_queue, :logger

  # size   - number of pool threads, all started immediately.
  # logger - object responding to +info+; receives the message and backtrace
  #          of any exception raised while running a deferred task.
  def initialize(size, logger)
    @logger = logger
    @size = size
    @threads = []
    @work_queue = Queue.new
    # One entry per task currently executing; only used as a counter by
    # exclusive_run.
    @running_tasks = Queue.new
    @size.times { add_thread }
  end

  # Queues the given block to run on one of the pool threads and returns
  # without waiting for it. +args+ are passed to the block unless the block
  # takes no parameters. Example worker method:
  #
  #   def fetch_url(url)
  #     puts "fetching url #{url}"
  #     thread_pool.defer(url) do |url|
  #       begin
  #         data = Net::HTTP.get(url,'/')
  #         File.open("#{RAILS_ROOT}/log/pages.txt","w") do |fl|
  #           fl.puts(data)
  #         end
  #       rescue
  #         logger.info "Error downloading page"
  #       end
  #     end
  #   end
  #
  # which can be invoked from Rails as:
  #   MiddleMan.ask_work(:worker => :rss_worker, :worker_method => :fetch_url, :data => "www.example.com")
  # assuming the method is defined in rss_worker.
  def defer(*args, &block)
    @work_queue << WorkData.new(args, &block)
  end

  # Starts one pool thread that pops and runs queued tasks forever.
  # Exceptions raised by a task (including a task queued without a block)
  # are logged and do not kill the thread.
  def add_thread
    @threads << Thread.new do
      loop do
        task = @work_queue.pop
        @running_tasks << task
        begin
          # Guarded the same way as MetaWorker#check_for_timer_events so the
          # pool still runs tasks when ActiveRecord is not loaded.
          ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
          # Evaluated inside the begin so a nil block is logged instead of
          # silently killing this thread.
          block = task.block
          block.arity == 0 ? block.call : block.call(*task.data)
        rescue
          logger.info($!.to_s)
          logger.info($!.backtrace.join("\n"))
        ensure
          # Always release the running-task slot, otherwise exclusive_run
          # would keep sleeping forever.
          @running_tasks.pop
        end
      end
    end
  end

  # Gives pool threads a chance to run. When any task is queued or still
  # executing, the calling thread sleeps for 0.05 seconds; otherwise it
  # returns immediately. Always returns nil.
  def exclusive_run
    sleep(0.05) unless @running_tasks.empty? && @work_queue.empty?
    nil
  end
end
# == MetaWorker class
# BackgrounDRb workers are asynchronous reactors that work using events.
# You are free to use threads in your workers, but be reasonable with them.
# The following methods are available to all workers from their parent classes.
#
# * BackgrounDRb::MetaWorker#connect
#
# Connects to an external TCP server and integrates the connection into the
# worker's reactor loop. For example:
#
#   class TimeClient
#     def receive_data(p_data)
#       worker.get_external_data(p_data)
#     end
#
#     def post_init
#       p "***************** : connection completed"
#     end
#   end
#
#   class FooWorker < BackgrounDRb::MetaWorker
#     set_worker_name :foo_worker
#     def create(args = nil)
#       external_connection = nil
#       connect("localhost",11009,TimeClient) { |conn| external_connection = conn }
#     end
#
#     def get_external_data(p_data)
#       puts "And external data is : #{p_data}"
#     end
#   end
#
# * BackgrounDRb::MetaWorker#start_server
#
# Starts a TCP server from your worker; all accepted connections are
# integrated into the worker's event loop. For example:
#
#   class TimeServer
#
#     def receive_data(p_data)
#     end
#
#     def post_init
#       add_periodic_timer(2) { say_hello_world }
#     end
#
#     def connection_completed
#     end
#
#     def say_hello_world
#       p "***************** : invoking hello world #{Time.now}"
#       send_data("Hello World\n")
#     end
#   end
#
#   class ServerWorker < BackgrounDRb::MetaWorker
#     set_worker_name :server_worker
#     def create(args = nil)
#       # start the server when the worker starts
#       start_server("0.0.0.0",11009,TimeServer) do |client_connection|
#         client_connection.say_hello_world
#       end
#     end
#   end
class MetaWorker < Packet::Worker
  attr_accessor :config_file, :my_schedule, :run_time, :trigger_type, :trigger
  attr_accessor :logger, :thread_pool
  iattr_accessor :pool_size
  @pool_size = nil

  # Class-level pool size setting. Called with an argument (e.g. in a worker
  # class body) it stores the size on that class; called without one it
  # returns the stored value, or nil when unset.
  def self.pool_size(size = nil)
    if size
      @pool_size = size
    else
      @pool_size
    end
  end

  # Initializes the worker, then invokes the user-defined +create+ method if
  # the worker class defines one.
  #
  # The method works in this order:
  # - reads config/backgroundrb.yml (through ERB);
  # - connects ActiveRecord (load_rails_env);
  # - creates the PacketLogger and a ThreadPool of +pool_size+ threads (20 when unset);
  # - loads a schedule.
  #
  # The schedule comes from worker_options[:schedule] when +no_auto_load+ is
  # true; otherwise it is this worker's entry under :schedules in the config
  # file, if there is one.
  def worker_init
    @config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/backgroundrb.yml")).result)
    load_rails_env
    @logger = PacketLogger.new(self)
    @thread_pool = ThreadPool.new(pool_size || 20, @logger)
    if worker_options && worker_options[:schedule] && no_auto_load
      load_schedule_from_args
    elsif @config_file[:schedules] && @config_file[:schedules][worker_name.to_sym]
      @my_schedule = @config_file[:schedules][worker_name.to_sym]
      new_load_schedule if @my_schedule
    end
    if respond_to?(:create)
      # worker_options may be nil (it is nil-checked above); pass nil to
      # create in that case instead of raising NoMethodError.
      create_data = worker_options && worker_options[:data]
      method(:create).arity == 0 ? create : create(create_data)
    end
    @logger.info "#{worker_name} started"
    @logger.info "Schedules for worker loaded"
  end

  # Loads the schedule passed in from Rails (worker_options[:schedule]), which
  # lets a caller define trigger arguments dynamically.
  def load_schedule_from_args
    @my_schedule = worker_options[:schedule]
    new_load_schedule if @my_schedule
  end

  # Receives requests/responses from the master process or other workers.
  # A packet whose data is a Hash with :worker_method => :exit terminates the
  # worker. Any other packet is dispatched on p_data[:type]. process_response
  # is not defined in this class; presumably it comes from the parent class
  # or a subclass — confirm.
  def receive_data(p_data)
    data = p_data[:data]
    # Response payloads can be arbitrary objects (e.g. a String result), so
    # only look for the exit command in Hash payloads.
    exit if data.is_a?(Hash) && data[:worker_method] == :exit
    case p_data[:type]
    when :request then process_request(p_data)
    when :response then process_response(p_data)
    end
  end

  # Invokes the public method named by p_data[:data][:worker_method].
  # p_data[:data][:data] is passed as the argument unless the method takes
  # none. The result is sent back with send_response. A nil/false result is
  # replaced by "dummy_result", and a result Marshal cannot dump is not sent.
  #
  # NOTE(review): any public method of the worker (including ones inherited
  # from Object) can be invoked by name here; callers are presumably trusted.
  def process_request(p_data)
    user_input = p_data[:data]
    method_name = user_input[:worker_method]
    logger.info "#{method_name} #{user_input[:data]}"
    if method_name.nil? || !respond_to?(method_name)
      logger.info "Undefined method #{method_name} called on worker #{worker_name}"
      return
    end
    result =
      if method(method_name).arity != 0
        send(method_name, user_input[:data])
      else
        send(method_name)
      end
    result ||= "dummy_result"
    send_response(p_data, result) if can_dump?(result)
  end

  # True when Marshal.dump succeeds on p_object; false if it raises.
  def can_dump?(p_object)
    Marshal.dump(p_object)
    true
  rescue
    false
  end

  # Builds @worker_method_triggers from the key/value pairs in @my_schedule.
  # Each key is a method name and each value holds :trigger_args and optional :data.
  # String trigger_args build a BackgrounDRb::CronTrigger, and Hash trigger_args
  # build a BackgrounDRb::Trigger. :runtime stores the next fire time as an
  # integer (Time#to_i). Entries with any other kind of trigger_args are logged
  # and skipped instead of crashing on a nil trigger.
  def new_load_schedule
    @worker_method_triggers = { }
    @my_schedule.each do |key, value|
      trigger_args = value[:trigger_args]
      trigger =
        case trigger_args
        when String then BackgrounDRb::CronTrigger.new(trigger_args)
        when Hash then BackgrounDRb::Trigger.new(trigger_args)
        end
      unless trigger
        logger.info "Skipping schedule for #{key}: unsupported trigger_args #{trigger_args.inspect}"
        next
      end
      @worker_method_triggers[key] = { :trigger => trigger, :data => value[:data], :runtime => trigger.fire_time_after(Time.now).to_i }
    end
  end

  # Sends p_data to the master as a :status packet. If sending raises, the
  # placeholder "invalid_status_dump_check_log" is sent instead and the error
  # is logged.
  # NOTE(review): the original author noted this may need a lock if it is
  # called from several threads.
  def register_status(p_data)
    begin
      send_data(:type => :status, :data => p_data)
    rescue => e
      send_data(:type => :status, :data => "invalid_status_dump_check_log")
      log_worker_exception(e)
    end
  end

  # Sends p_data as a :result packet; if sending raises, the error is logged
  # and nothing else is sent.
  def register_result(p_data)
    begin
      send_data(:type => :result, :data => p_data)
    rescue => e
      log_worker_exception(e)
    end
  end

  # Turns the request packet +input+ into a :response carrying +output+ and
  # sends it. If sending raises, the error is logged and the placeholder
  # "invalid_result_dump_check_log" is sent instead. Mutates +input+.
  def send_response(input, output)
    input[:data] = output
    input[:type] = :response
    begin
      send_data(input)
    rescue => e
      log_worker_exception(e)
      input[:data] = "invalid_result_dump_check_log"
      send_data(input)
    end
  end

  # No-op connection callbacks.
  def unbind; end
  def connection_completed; end

  # Extends the parent's check_for_timer_events (called via super).
  # Triggers whose end_time has passed are removed. Scheduled methods whose
  # :runtime is in the past are run, and their next runtime is computed.
  # Errors are logged and do not stop the remaining triggers.
  def check_for_timer_events
    begin
      ActiveRecord::Base.verify_active_connections! if defined?(ActiveRecord)
      super
    rescue => e
      log_worker_exception(e)
    end
    return if @worker_method_triggers.nil? || @worker_method_triggers.empty?
    @worker_method_triggers.delete_if do |key, value|
      trigger = value[:trigger]
      # end_time may be nil (no end configured); such triggers never expire.
      trigger.respond_to?(:end_time) && trigger.end_time && trigger.end_time <= Time.now
    end
    @worker_method_triggers.each do |key, value|
      next unless value[:runtime] < Time.now.to_i
      begin
        (t_data = value[:data]) ? send(key, t_data) : send(key)
      rescue => e
        log_worker_exception(e)
      end
      value[:runtime] = value[:trigger].fire_time_after(Time.now).to_i
    end
  end

  # Lets user threads in the pool run for a while (delegates to
  # ThreadPool#exclusive_run).
  def run_user_threads
    @thread_pool.exclusive_run
  end

  private

  # Loads config/database.yml (through ERB) and exports the environment as
  # RAILS_ENV. The environment comes from the :backgroundrb section of the
  # backgroundrb config; it defaults to 'development', including when that
  # section is missing. Then connects ActiveRecord with concurrency allowed.
  def load_rails_env
    db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
    backgroundrb_options = @config_file[:backgroundrb] || {}
    run_env = backgroundrb_options[:environment] || 'development'
    ENV["RAILS_ENV"] = run_env
    RAILS_ENV.replace(run_env) if defined?(RAILS_ENV)
    ActiveRecord::Base.establish_connection(db_config_file[run_env])
    ActiveRecord::Base.allow_concurrency = true
  end

  # Logs an exception's message and backtrace through the worker logger.
  def log_worker_exception(exception)
    logger.info(exception.to_s)
    logger.info(exception.backtrace.join("\n"))
  end
end # end of class MetaWorker
end # end of module BackgrounDRb