/
runner.rb
305 lines (253 loc) · 7.89 KB
/
runner.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
require 'json'
command = ARGV[0]
out_file = ARGV[2]
module DigdagEnv
in_file = ARGV[1]
in_data = JSON.parse(File.read(in_file))
params = in_data['params']
# TODO include indifferent access like Embulk::DataSource
PARAMS = params
SUBTASK_CONFIG = {}
EXPORT_PARAMS = {}
STORE_PARAMS = {}
STATE_PARAMS = {}
end
# should this be a digdag.gem so that users can unit-test a command without running digdag?
module Digdag
class Env
def initialize
@params = DigdagEnv::PARAMS
@subtask_config = DigdagEnv::SUBTASK_CONFIG
@export_params = DigdagEnv::EXPORT_PARAMS
@store_params = DigdagEnv::STORE_PARAMS
@state_params = DigdagEnv::STATE_PARAMS
@subtask_index = 0
end
attr_reader :params
attr_reader :subtask_config
attr_reader :export_params
attr_reader :store_params
attr_reader :state_params
def set_state(**params)
@state_params.merge!(params)
end
def export(**params)
@export_params.merge!(params)
end
def store(**params)
@store_params.merge!(params)
end
# add_subtask(params)
# add_subtask(singleton_method_name, params={})
# add_subtask(klass, instance_method_name, params={})
def add_subtask(*args)
if args.length == 1 && args[0].is_a?(Hash)
# add_subtask(params)
config = args[0]
elsif args.length == 1 || (args.length == 2 && args[1].is_a?(Hash))
# add_subtask(singleton_method_name, params={})
method_name = args[0]
params = Hash(args[1])
begin
method_name = method_name.to_sym
rescue NameError, ArgumentError
raise ArgumentError, "Second argument must be a Symbol but got #{method_name.inspect}"
end
if method_name.to_s.include?(".")
raise ArgumentError, "Method name can't include '.'"
end
config = params.dup
config["rb>"] = method_name.to_s
elsif args.length == 2 || (args.length == 3 && args[2].is_a?(Hash))
# add_subtask(klass, instance_method_name, params={})
klass = args[0]
method_name = args[1]
params = Hash(args[2])
begin
method_name = method_name.to_sym
rescue NameError, ArgumentError
raise ArgumentError, "Second argument must be a Symbol but got #{method_name.inspect}"
end
if method_name.to_s.include?(".")
raise ArgumentError, "Method name can't include '.'"
end
if klass.is_a?(Class)
class_name = klass.name
else
begin
class_name = klass.to_sym.to_s
rescue NameError, ArgumentError
raise ArgumentError, "First argument must be a Class or Symbol but got #{klass.inspect}"
end
end
# validation
begin
klass = Kernel.const_get(class_name) # const_get with String (not Symbol) searches nested constants
rescue NameError
raise ArgumentError, "Could not find class named #{class_name}"
end
unless klass.respond_to?(method_name) || klass.public_instance_methods.include?(method_name)
raise ArgumentError, "Class #{klass} does not have method #{method_name.inspect}"
end
config = params.dup
config["rb>"] = "::#{klass}.#{method_name}"
else
raise ArgumentError, "wrong number of arguments (#{args.length} for 1..3 with the last argument is a Hash)"
end
begin
JSON.dump(config)
rescue => e
raise ArgumentError, "Parameters must be serializable using JSON: #{e}"
end
@subtask_config["+subtask#{@subtask_index}"] = config
@subtask_index += 1
nil
end
end
DIGDAG_ENV = Env.new
private_constant :DIGDAG_ENV
def self.env
DIGDAG_ENV
end
end
# add the archive path to LOAD_PATH
$LOAD_PATH << File.expand_path(Dir.pwd)
def digdag_inspect_command(command)
fragments = command.split(".")
method_name = fragments.pop.to_sym
if fragments.empty?
# method
return nil, method_name, false
else
# Name::Space::Class.method
class_name = fragments.join(".")
klass = Kernel.const_get(class_name)
is_instance_method = klass.public_instance_methods.include?(method_name)
return klass, method_name, is_instance_method
end
end
def digdag_inspect_arguments(receiver, method_name, params)
if receiver
parameters = receiver.method(method_name).parameters
if method_name == :new && parameters == [[:rest]]
# This is Object.new that forwards all arguments to #initialize
begin
parameters = receiver.instance_method(:initialize).parameters
rescue NameError => e
end
end
else
parameters = method(method_name).parameters
end
args = []
keywords = nil
parameters.each do |kind,name|
key = name.to_s
case kind
when :req
# required argument like a
unless params.has_key?(key)
if receiver.is_a?(Class)
raise ArgumentError, "Method '#{receiver}.#{method_name}' requires parameter '#{key}' but not set"
else
raise ArgumentError, "Method '#{receiver.class}##{method_name}' requires parameter '#{key}' but not set"
end
end
args << params[key]
when :opt
# optional argument like a=nil
if params.has_key?(key)
args << params[key]
else
# use the default value.
end
when :rest
# variable-length arguments like *a
# there're really we can do here to keep consistency with :opt.
# should this be an error?
when :keyreq
# required keyword argument like a:
unless params.has_key?(key)
if receiver.is_a?(Class)
raise ArgumentError, "Method '#{receiver}.#{method_name}' requires parameter '#{key}' but not set"
else
raise ArgumentError, "Method '#{receiver.class}##{method_name}' requires parameter '#{key}' but not set"
end
end
if keywords.nil?
keywords = {}
args << keywords
end
keywords[name] = params[key]
when :key
# optional keyword argument like a: nil
if params.has_key?(key)
if keywords.nil?
keywords = {}
args << keywords
end
keywords[name] = params[key]
else
# use the default value.
end
when :keyrest
# rest-of-keywords argument like **a
# symbolize keys otherwise method call causes error:
# "TypeError: wrong argument type String (expected Symbol)"
if keywords.nil?
keywords = {}
args << keywords
end
keywords.merge!(digdag_symbolize_keys(params))
end
end
return args
end
def digdag_symbolize_keys(hash)
built = {}
hash.each_pair do |k,v|
if v.is_a?(Hash)
v = digdag_symbolize_keys(v)
end
built[k.to_s.to_sym] = v
end
return built
end
klass, method_name, is_instance_method = digdag_inspect_command(command)
error = nil
if klass.nil?
method_args = digdag_inspect_arguments(nil, method_name, DigdagEnv::PARAMS)
begin
result = send(method_name, *method_args)
rescue => error
end
elsif is_instance_method
new_args = digdag_inspect_arguments(klass, :new, DigdagEnv::PARAMS)
instance = klass.new(*new_args)
method_args = digdag_inspect_arguments(instance, method_name, DigdagEnv::PARAMS)
begin
result = instance.send(method_name, *method_args)
rescue => error
end
else
method_args = digdag_inspect_arguments(klass, method_name, DigdagEnv::PARAMS)
begin
result = klass.send(method_name, *method_args)
rescue => error
end
end
out = {
'subtask_config' => DigdagEnv::SUBTASK_CONFIG,
'export_params' => DigdagEnv::EXPORT_PARAMS,
'store_params' => DigdagEnv::STORE_PARAMS,
#'state_params' => DigdagEnv::STATE_PARAMS, # only for retrying
}
if error
out['error'] = {
'class' => error.class.to_s,
'message' => error.message,
'backtrace' => error.backtrace,
}
end
File.open(out_file, "w") {|f| f.write out.to_json }
raise error if error