-
Notifications
You must be signed in to change notification settings - Fork 27
/
connector_playbook_execution_reporter_task.rb
200 lines (162 loc) · 5.74 KB
/
connector_playbook_execution_reporter_task.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
module InsightsCloud
module Async
class ConnectorPlaybookExecutionReporterTask < ::Actions::EntryAction
include Dynflow::Action::Polling
include ForemanRhCloud::CertAuth
def self.subscribe
Actions::RemoteExecution::RunHostsJob
end
def self.connector_feature_id
@connector_feature_id ||= RemoteExecutionFeature.feature!(:rh_cloud_connector_run_playbook).id
end
def plan(job_invocation)
return unless connector_playbook_job?(job_invocation)
@job_invocation = job_invocation
invocation_inputs = invocation_inputs(job_invocation)
report_url = invocation_inputs['report_url']
report_interval = [invocation_inputs['report_interval'].to_i, 5].max
correlation_id = invocation_inputs['correlation_id']
plan_self(
current_org_id: job_invocation.targeted_hosts.first.organization_id,
report_url: report_url,
report_interval: report_interval,
job_invocation_id: job_invocation.id,
correlation_id: correlation_id
)
end
def run(event = nil)
# Handle skip events
return if event == Dynflow::Action::Skip
super
end
def rescue_strategy_for_self
Dynflow::Action::Rescue::Skip
end
def done?(current_status = invocation_status)
job_invocation.finished? || current_status.map { |_host_id, task_status| task_status['report_done'] }.all?
end
# noop, we don't want to do anything when the polling task starts
def invoke_external_task
poll_external_task
end
def poll_external_task
current_status = invocation_status
report_job_progress(current_status)
# record the current state and increment the sequence for the next invocation
{
invocation_status: current_status,
}
end
def poll_intervals
[report_interval]
end
private
def connector_playbook_job?(job_invocation)
job_invocation&.remote_execution_feature_id == connector_feature_id
end
def connector_feature_id
self.class.connector_feature_id
end
def invocation_inputs(job_invocation)
Hash[
job_invocation.pattern_template_invocations.first.input_values.map do |input_value|
[input_value.template_input.name, input_value.value]
end
]
end
def job_invocation
@job_invocation ||= JobInvocation.find(input['job_invocation_id'])
end
def report_interval
@report_interval ||= input['report_interval']
end
def correlation_id
@correlation_id ||= input['correlation_id']
end
def host_status(host)
external_task&.dig('invocation_status', host)
end
def sequence(host)
host_status(host)&.fetch('sequence', 0).to_i
end
def host_done?(host)
ActiveModel::Type::Boolean.new.cast(host_status(host)&.fetch('report_done', nil))
end
def report_url
input['report_url']
end
def invocation_status
Hash[job_invocation.targeting.hosts.map do |host|
next unless host.insights&.uuid
[
host.insights.uuid,
task_status(job_invocation.sub_task_for_host(host), host.insights.uuid),
]
end.compact]
end
def task_status(host_task, host_name)
unless host_task
return { 'state' => 'unknown' }
end
{
'state' => host_task.state,
'output' => host_task.main_action.live_output.map { |line| line['output'] }.join("\n"),
'exit_status' => host_task.main_action.exit_status,
'sequence' => sequence(host_name),
'report_done' => host_done?(host_name),
}
end
def report_job_progress(invocation_status)
generator = InsightsCloud::Generators::PlaybookProgressGenerator.new(correlation_id)
all_hosts_success = true
invocation_status.each do |host_name, status|
# skip host if the host already reported that it's finished
next if status['report_done']
unless status['state'] == 'unknown'
sequence = status['sequence']
generator.host_progress_message(host_name, status['output'], sequence)
status['sequence'] = sequence + 1 # increase the sequence for each host
end
if status['state'] == 'stopped'
generator.host_finished_message(host_name, status['exit_status'])
status['report_done'] = true
all_hosts_success &&= status['exit_status'] == 0
end
end
generator.job_finished_message(all_hosts_success) if done?(invocation_status)
send_report(generator.generate)
end
def send_report(report)
execute_cloud_request(
organization: current_org,
method: :post,
url: report_url,
content_type: 'application/vnd.redhat.playbook-sat.v3+jsonl',
payload: {
file: wrap_report(report),
multipart: true,
}
)
end
# RestClient has to accept an object that responds to :read, :path and :content_type methods
# to properly generate a multipart message.
# see: https://github.com/rest-client/rest-client/blob/2c72a2e77e2e87d25ff38feba0cf048d51bd5eca/lib/restclient/payload.rb#L161
def wrap_report(report)
obj = StringIO.new(report)
def obj.content_type
'application/vnd.redhat.playbook-sat.v3+jsonl'
end
def obj.path
'file'
end
obj
end
def logger
action_logger
end
def current_org
Organization.find(input[:current_org_id])
end
end
end
end