-
Notifications
You must be signed in to change notification settings - Fork 1
/
task-worker.rb
195 lines (166 loc) · 5.84 KB
/
task-worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
module Pione
module Agent
# TaskWorker is an agent to process tasks
class TaskWorker < TupleSpaceClient
set_agent_type :task_worker, self
#
# instance methods
#
# tuple_space: the tuple space handed to the constructor
# execution_thread: the thread that runs the rule engine for the current task
attr_reader :tuple_space, :execution_thread
# once: when true, the agent goes to termination after finalizing a task
# instead of taking the next one
attr_accessor :once
# Create a task worker agent.
#
# @param [Pione::TupleSpace::TupleSpaceServer] tuple_space
#   tuple space
# @param [String] features
#   features that the task worker has
# @param [Object, nil] env
#   an environment object; when omitted (nil), the environment is read from
#   the tuple space by #get_environment
# @raise [TupleSpaceError]
#   if env is not given and the tuple space has no "env" tuple
#
# Session ID, request origin and client UI are not constructor arguments;
# #transit_to_init reads them from the tuple space attributes.
def initialize(tuple_space, features, env=nil)
super(tuple_space)
@tuple_space = tuple_space
@features = features
# fall back to the environment stored in the tuple space
@env = env || get_environment
end
#
# activity definitions
#
# transitions that this agent defines itself
[:take_task, :init_task, :execute_task, :finalize_task, :connection_error].each do |name|
  define_transition(name)
end

# normal flow: init -> take_task -> init_task -> execute_task -> finalize_task
chain init: :take_task
chain take_task: :init_task
chain init_task: :execute_task
chain execute_task: :finalize_task
# after finalization a one-shot agent (once == true) terminates, otherwise
# the agent takes the next task
chain finalize_task: lambda { |agent, _result| agent.once ? :terminate : :take_task }
chain connection_error: :terminate

# Restart sends the agent back to take_task; a dRuby connection error is
# routed to the connection_error transition
define_exception_handler Restart => :take_task
define_exception_handler DRb::DRbConnError => :connection_error
#
# transitions
#
# Cache the "request_from", "session_id" and "client_ui" attributes of the
# tuple space in instance variables of the same names, then continue with
# the inherited init transition.
def transit_to_init
  %w[request_from session_id client_ui].each do |name|
    instance_variable_set("@#{name}", @tuple_space.attribute(name))
  end
  super
end
# Take a task tuple that matches this worker's features and pass it on to
# the next transition.
def transit_to_take_task
  query = TupleSpace::TaskTuple.new(features: @features)
  take(query)
end
# Initialize the task by writing its working and foreground flag tuples.
#
# Raises Restart when the working flag is already present, or when a write
# raises Rinda::RedundantTupleError. Otherwise returns the task for the next
# transition.
def transit_to_init_task(task)
  working_flag = TupleSpace::WorkingTuple.new(task.domain_id, task.digest)
  foreground_flag = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)

  # the task is being worked on already, so discard it
  raise Restart.new if read!(working_flag)

  # mark the task as working and in the foreground
  write(working_flag)
  write(foreground_flag)

  task
rescue Rinda::RedundantTupleError
  raise Restart.new
end
# Execute the task: build a rule engine for it, run the engine in a separate
# thread and wait for that thread to finish. Returns the task for the next
# transition.
def transit_to_execute_task(task)
  rule_engine = make_engine(task)

  # run the engine in its own thread; a falsy result from #handle makes the
  # worker call #terminate
  @execution_thread = Thread.new { rule_engine.handle || terminate }

  # flow rules get child task workers while the engine is running
  spawn_child_task_worker(task) if rule_engine.rule_definition.rule_type == "flow"

  # wait until the engine ends
  @execution_thread.join

  task
end
# Finalize the task by taking its working flag and then its foreground flag
# out of the tuple space. Returns whatever the second take! returns.
def transit_to_finalize_task(task)
  working_flag = TupleSpace::WorkingTuple.new(task.domain_id, task.digest)
  take!(working_flag)

  foreground_flag = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)
  take!(foreground_flag)
end
# Report the connection error by writing a warning to the system log. The
# exception argument itself is not used.
def transit_to_connection_error(e)
  message = "task worker agent was disconnected from tuple space unexpectedly, goes to termination."
  Log::SystemLog.warn(message)
end
#
# helper methods
#
# Get the environment object from the tuple space.
#
# Reads an "env" tuple and returns its obj; raises TupleSpaceError when no
# such tuple is found.
def get_environment
  env_tuple = read!(TupleSpace::EnvTuple.new)
  return env_tuple.obj if env_tuple

  raise TupleSpaceError.new("the tuple space is invalid because \"env\" tuple not found.")
end
# Make a rule engine for the task.
#
# The engine parameters combine this worker's tuple space, environment and
# cached session attributes with the rule information carried by the task.
# They are handed to RuleEngine.make as a single hash.
def make_engine(task)
  engine_params = {
    tuple_space: @tuple_space,
    env: @env,
    package_id: task.package_id,
    rule_name: task.rule_name,
    inputs: task.inputs,
    param_set: task.param_set,
    domain_id: task.domain_id,
    caller_id: task.caller_id,
    request_from: @request_from,
    session_id: @session_id,
    client_ui: @client_ui
  }
  RuleEngine.make(engine_params)
end
# Spawn child task workers while the rule execution thread is alive.
#
# Each time the execution thread is found in "sleep" status and there is no
# active child, this creates a one-shot child agent of the same class
# (once = true), takes this task's foreground tuple out of the tuple space,
# starts the child and waits until it has terminated. When the thread is
# alive but not sleeping, the loop checks again after 0.1 seconds. After the
# thread has finished, the foreground tuple is written back unless it is
# already present.
#
# @param [Object] task
#   task whose domain_id and digest identify the foreground tuple
def spawn_child_task_worker(task)
child_agent = nil
foreground = TupleSpace::ForegroundTuple.new(task.domain_id, task.digest)
# child worker loop
while @execution_thread.alive? do
# Thread#status is "sleep" when the thread is sleeping or waiting on I/O;
# presumably the engine is then blocked waiting for its subtasks -- TODO confirm
if @execution_thread.status == "sleep"
# NOTE(review): when a child exists and has not terminated, this branch does
# nothing and the loop iterates again without sleeping; whether that can
# happen depends on wait_until_terminated -- confirm
if child_agent.nil? or child_agent.terminated?
# when there isn't an active child agent
child_agent = self.class.new(tuple_space_server, @features, @env)
# the child is a one-shot agent
child_agent.once = true
# make log record that links this agent (parent) to the child
record = Log::CreateChildTaskWorkerProcessRecord.new.tap do |x|
x.parent = uuid
x.child = child_agent.uuid
end
# spawn child agent with logging
with_process_log(record) do
# turn background: take this task's foreground tuple out of the tuple space
take!(foreground)
# start child agent
child_agent.start
end
# wait until the child agent completes the task
child_agent.wait_until_terminated(nil)
end
else
sleep 0.1 # FIXME: replace this polling sleep with a more sophisticated mechanism
end
end
# turn foreground again unless the foreground tuple is already present
write(foreground) unless read!(foreground)
end
end
end
end