forked from tra/spawnling
/
spawn.rb
119 lines (107 loc) · 3.71 KB
/
spawn.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
module Spawn
# default to forking (unless windows or jruby)
@@method = (RUBY_PLATFORM =~ /(win32|java)/) ? :thread : :fork
# socket to close in child process
@@resources = []
# add calls to this in your environment.rb to set your configuration, for example,
# to use forking everywhere except your 'development' environment:
# Spawn::method :fork
# Spawn::method :thread, 'development'
def self.method(method, env = nil)
if !env || env == RAILS_ENV
@@method = method
end
RAILS_DEFAULT_LOGGER.debug "spawn> method = #{@@method}" if defined? RAILS_DEFAULT_LOGGER
end
# set the resource to disconnect from in the child process (when forking)
def self.resource_to_close(resource)
@@resources << resource
end
# close all the resources added by calls to resource_to_close
def self.close_resources
@@resources.each do |resource|
resource.close if resource && resource.respond_to?(:close) && !resource.closed?
end
@@resources = []
end
# Spawns a long-running section of code and returns the ID of the spawned process.
# By default the process will be a forked process. To use threading, pass
# :method => :thread or override the default behavior in the environment by setting
# 'Spawn::method :thread'.
def spawn(options = {})
options.symbolize_keys!
# setting options[:method] will override configured value in @@method
if options[:method] == :yield || @@method == :yield
yield
elsif options[:method] == :thread || (options[:method] == nil && @@method == :thread)
if ActiveRecord::Base.allow_concurrency
thread_it(options) { yield }
else
logger.error("spawn(:method=>:thread) only allowed when allow_concurrency=true")
raise "spawn requires config.active_record.allow_concurrency=true when used with :method=>:thread"
end
else
fork_it(options) { yield }
end
end
def wait(sids = [])
# wait for all threads and/or forks
sids.to_a.each do |sid|
if sid.type == :thread
sid.handle.join()
else
begin
Process.wait(sid.handle)
rescue
# if the process is already done, ignore the error
end
end
end
# clean up connections from expired threads
ActiveRecord::Base.verify_active_connections!()
end
class SpawnId
attr_accessor :type
attr_accessor :handle
def initialize(t, h)
self.type = t
self.handle = h
end
end
protected
def fork_it(options)
# The problem with rails is that it only has one connection (per class),
# so when we fork a new process, we need to reconnect.
logger.debug "spawn> parent PID = #{Process.pid}"
child = fork do
start = Time.now
# disconnect from the listening socket, et al
Spawn.close_resources
# get a new connection so the parent can keep the original one
ActiveRecord::Base.spawn_reconnect
begin
# run the block of code that takes so long
logger.debug "spawn> child PID = #{Process.pid}"
yield
ensure
ActiveRecord::Base.connection.disconnect!
ActiveRecord::Base.remove_connection
end
logger.info "spawn> child[#{Process.pid}] took #{Time.now - start} sec"
# this form of exit doesn't call at_exit handlers
exit!(0)
end
# detach from child process (parent may still wait for detached process if they wish)
Process.detach(child)
return SpawnId.new(:fork, child)
end
def thread_it(options)
# clean up stale connections from previous threads
ActiveRecord::Base.verify_active_connections!()
thr = Thread.new do
# run the long-running code block
yield
end
return SpawnId.new(:thread, thr)
end
end