-
Notifications
You must be signed in to change notification settings - Fork 1
/
parse_ci_monitor_json.py
executable file
·162 lines (135 loc) · 4.94 KB
/
parse_ci_monitor_json.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
import json
import argparse
import re
import sys
import os
import subprocess
import pprint
from datetime import datetime
from typing import List, Set
def argparser():
parser = argparse.ArgumentParser(
prog="This tool helps to group failed tests based on owners, feature tests and testcase ids across multiple nightly or upgrade runs "
)
parser.add_argument(
"-r",
"--runs",
nargs="+",
help="space seperated list of test run ids.",
)
parser.add_argument(
"-o",
"--output",
help="location to write output file",
default=f"./{datetime.today().strftime('%Y%m%d')}.json",
)
parser.add_argument("-v", "--version",
help="Specify OCP version", required=True)
return parser.parse_args()
def get_test_failure_profile(content: str, profile: str):
content = re.search("(http.*)", content)
if content:
linkto_logs = "[Link to logs|" + content.groups()[0] + "]|" + profile
else:
linkto_logs = f"not found|{profile}"
return linkto_logs
def get_automation_script(cfields: List):
for cf in cfields:
if cf["key"] == "automation_script":
script = cf["value"]["content"]
m = re.search("file: (features.*feature)\\n", script)
automation_script = m.groups()[0]
break
return automation_script
def get_owner(ascript: str, testid: str):
BUSHSLICER_HOME = os.getenv("BUSHSLICER_HOME")
dir = f"{BUSHSLICER_HOME}/{ascript}"
owner = None
try:
owner = subprocess.check_output(
f"egrep -B 1 {testid} {dir} | grep author", shell=True
)
owner = owner.decode().rstrip()
owner = re.search("author\s*(.*)@redhat.com", owner)
owner = owner.groups()[0]
except subprocess.CalledProcessError:
owner = "Not found"
return owner
def get_testrun_json(run_id: str):
"""Download the test case json data from polarshift."""
# Call $BUSHSLICER_HOME/tools/polarshift.rb get-run RUN_ID to download the json describing the test run
BUSHSLICER_HOME = os.getenv("BUSHSLICER_HOME")
# use -o to avoid extra non json garbage printed to stdout
cmd = [
f"{BUSHSLICER_HOME}/tools/polarshift.rb",
"get-run",
f"{run_id}",
"-o",
f"{run_id}.json",
]
subprocess.check_output(cmd)
run_json = get_json_from_file(f"{run_id}.json")
return run_json
def get_json_from_file(file_path: str):
"""Read in json from file_path."""
with open(file_path, "r") as f:
content = json.load(f)
return content
def write_output(data: dict, ofile: str):
with open(ofile, "w") as outfile:
json.dump(data, outfile, indent=4, sort_keys=True)
print(f"report written to {ofile}")
def get_existing_report(outputFile: str) -> dict:
if os.path.exists(outputFile):
with open(outputFile, 'r') as fh:
return json.load(fh)
else:
return dict()
def get_ocp_versions(report: dict) -> Set:
if report:
return set(report['version'])
else:
return set()
def main():
args = argparser()
report_struct = get_existing_report(args.output)
versions = get_ocp_versions(report_struct)
versions.add(args.version)
report_struct.update({"version": list(versions)})
for run in args.runs:
output = get_testrun_json(run)
profile = output["title"]
profile = re.search(".* - (.*)$", profile)
profile = profile.groups()[0]
for record in output["records"]["TestRecord"]:
if record["result"] == "Failed":
linkto_logs = get_test_failure_profile(
record["comment"]["content"], profile
)
linkto_logs += f"|{args.version}|{run}"
automation_script = get_automation_script(
record["test_case"]["customFields"]["Custom"]
)
id = record["test_case"]["id"]
owner = get_owner(automation_script, id)
if report_struct.get(owner, 0):
if report_struct[owner].get(automation_script, 0):
if report_struct[owner][automation_script].get(id, 0):
report_struct[owner][automation_script][id].append(
linkto_logs
)
else:
report_struct[owner][automation_script].update(
{id: [linkto_logs]}
)
else:
report_struct[owner].update(
{automation_script: {id: [linkto_logs]}}
)
else:
report_struct[owner] = {
automation_script: {id: [linkto_logs]}}
write_output(report_struct, args.output)
if __name__ == "__main__":
sys.exit(main())