1 | | -from subprocess import check_output |
2 | | -import pandas as pd |
3 | 1 | from datetime import datetime, timedelta |
4 | 2 | from io import StringIO |
5 | 3 | from json import loads |
6 | 4 | from os.path import join |
| 5 | +from subprocess import check_output |
| 6 | + |
| 7 | +import pandas as pd |
7 | 8 |
8 | | -from qiita_db.util import MaxRSS_helper |
9 | 9 | from qiita_db.exceptions import QiitaDBUnknownIDError |
10 | 10 | from qiita_db.processing_job import ProcessingJob |
11 | 11 | from qiita_db.software import Software |
12 | | - |
| 12 | +from qiita_db.util import MaxRSS_helper |
13 | 13 |
14 | 14 | all_commands = [c for s in Software.iter(False) for c in s.commands] |
15 | 15 |
16 | 16 | # retrieving only the numeric external_id means that we are only focusing
17 | 17 | # on barnacle2/slurm jobs |
18 | | -main_jobs = [j for c in all_commands for j in c.processing_jobs |
19 | | - if j.status == 'success' and j.external_id.isnumeric()] |
20 | | - |
21 | | -sacct = ['sacct', '-p', '--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,MaxRSS,' |
22 | | - 'CPUTimeRAW,ReqMem,AllocCPUs,AveVMSize', '-j'] |
| 18 | +main_jobs = [ |
| 19 | + j |
| 20 | + for c in all_commands |
| 21 | + for j in c.processing_jobs |
| 22 | + if j.status == "success" and j.external_id.isnumeric() |
| 23 | +] |
| 24 | + |
| 25 | +sacct = [ |
| 26 | + "sacct", |
| 27 | + "-p", |
| 28 | + "--format=JobID,ElapsedRaw,MaxRSS,Submit,Start,MaxRSS," |
| 29 | + "CPUTimeRAW,ReqMem,AllocCPUs,AveVMSize", |
| 30 | + "-j", |
| 31 | +] |
23 | 32 |
24 | 33 | data = [] |
25 | 34 | for i, j in enumerate(main_jobs): |
26 | 35 | if i % 1000 == 0: |
27 | | - print(f'{i}/{len(main_jobs)}') |
| 36 | + print(f"{i}/{len(main_jobs)}") |
28 | 37 | eid = j.external_id |
29 | | - extra_info = '' |
30 | | - rvals = StringIO(check_output(sacct + [eid]).decode('ascii')) |
31 | | - _d = pd.read_csv(rvals, sep='|') |
32 | | - _d['QiitaID'] = j.id |
| 38 | + extra_info = "" |
| 39 | + rvals = StringIO(check_output(sacct + [eid]).decode("ascii")) |
| 40 | + _d = pd.read_csv(rvals, sep="|") |
| 41 | + _d["QiitaID"] = j.id |
33 | 42 | cmd = j.command |
34 | 43 | s = j.command.software |
35 | 44 | try: |
|
43 | 52 | except TypeError as e: |
44 | 53 | # similar to the except above, except that for these commands, we have
45 | 54 | # the study_id as None |
46 | | - if cmd.name in {'create_sample_template', 'delete_sample_template', |
47 | | - 'list_remote_files'}: |
| 55 | + if cmd.name in { |
| 56 | + "create_sample_template", |
| 57 | + "delete_sample_template", |
| 58 | + "list_remote_files", |
| 59 | + }: |
48 | 60 | continue |
49 | 61 | else: |
50 | 62 | raise e |
51 | 63 |
52 | 64 | sname = s.name |
53 | 65 |
54 | | - if cmd.name == 'release_validators': |
55 | | - ej = ProcessingJob(j.parameters.values['job']) |
| 66 | + if cmd.name == "release_validators": |
| 67 | + ej = ProcessingJob(j.parameters.values["job"]) |
56 | 68 | extra_info = ej.command.name |
57 | 69 | samples, columns, input_size = ej.shape |
58 | | - elif cmd.name == 'complete_job': |
59 | | - artifacts = loads(j.parameters.values['payload'])['artifacts'] |
| 70 | + elif cmd.name == "complete_job": |
| 71 | + artifacts = loads(j.parameters.values["payload"])["artifacts"] |
60 | 72 | if artifacts is not None: |
61 | | - extra_info = ','.join({ |
62 | | - x['artifact_type'] for x in artifacts.values() |
63 | | - if 'artifact_type' in x}) |
64 | | - elif cmd.name == 'Validate': |
65 | | - input_size = sum([len(x) for x in loads( |
66 | | - j.parameters.values['files']).values()]) |
| 73 | + extra_info = ",".join( |
| 74 | + {x["artifact_type"] for x in artifacts.values() if "artifact_type" in x} |
| 75 | + ) |
| 76 | + elif cmd.name == "Validate": |
| 77 | + input_size = sum([len(x) for x in loads(j.parameters.values["files"]).values()]) |
67 | 78 | sname = f"{sname} - {j.parameters.values['artifact_type']}" |
68 | | - elif cmd.name == 'Alpha rarefaction curves [alpha_rarefaction]': |
| 79 | + elif cmd.name == "Alpha rarefaction curves [alpha_rarefaction]": |
69 | 80 | extra_info = j.parameters.values[ |
70 | | - ('The number of rarefaction depths to include between min_depth ' |
71 | | - 'and max_depth. (steps)')] |
72 | | - |
73 | | - _d['external_id'] = eid |
74 | | - _d['sId'] = s.id |
75 | | - _d['sName'] = sname |
76 | | - _d['sVersion'] = s.version |
77 | | - _d['cId'] = cmd.id |
78 | | - _d['cName'] = cmd.name |
79 | | - _d['samples'] = samples |
80 | | - _d['columns'] = columns |
81 | | - _d['input_size'] = input_size |
82 | | - _d['extra_info'] = extra_info |
83 | | - _d.drop(columns=['Unnamed: 10'], inplace=True) |
| 81 | + ( |
| 82 | + "The number of rarefaction depths to include between min_depth " |
| 83 | + "and max_depth. (steps)" |
| 84 | + ) |
| 85 | + ] |
| 86 | + |
| 87 | + _d["external_id"] = eid |
| 88 | + _d["sId"] = s.id |
| 89 | + _d["sName"] = sname |
| 90 | + _d["sVersion"] = s.version |
| 91 | + _d["cId"] = cmd.id |
| 92 | + _d["cName"] = cmd.name |
| 93 | + _d["samples"] = samples |
| 94 | + _d["columns"] = columns |
| 95 | + _d["input_size"] = input_size |
| 96 | + _d["extra_info"] = extra_info |
| 97 | + _d.drop(columns=["Unnamed: 10"], inplace=True) |
84 | 98 | data.append(_d) |
85 | 99 |
86 | 100 | data = pd.concat(data) |
|
100 | 114 | # external_id but separate from external_id.batch. |
101 | 115 | # Here we are going to merge all this info into a single row + some |
102 | 116 | # other columns |
103 | | -date_fmt = '%Y-%m-%dT%H:%M:%S' |
| 117 | +date_fmt = "%Y-%m-%dT%H:%M:%S" |
104 | 118 |
105 | 119 | df = [] |
106 | | -for eid, __df in data.groupby('external_id'): |
| 120 | +for eid, __df in data.groupby("external_id"): |
107 | 121 | tmp = __df.iloc[1].copy() |
108 | 122 |     # Calculating WaitTime, basically how long the job took to start;
109 | 123 | # this is useful for some general profiling |
110 | | - tmp['WaitTime'] = datetime.strptime( |
111 | | - __df.iloc[0].Start, date_fmt) - datetime.strptime( |
112 | | - __df.iloc[0].Submit, date_fmt) |
| 124 | + tmp["WaitTime"] = datetime.strptime( |
| 125 | + __df.iloc[0].Start, date_fmt |
| 126 | + ) - datetime.strptime(__df.iloc[0].Submit, date_fmt) |
113 | 127 | df.append(tmp) |
114 | 128 | df = pd.DataFrame(df) |
115 | 129 |
116 | 130 | # This is important as we are transforming the MaxRSS to a raw value,
117 | 131 | # so we need to confirm that there are no other suffixes
118 | | -print('Make sure that only 0/K/M exist', set( |
119 | | - df.MaxRSS.apply(lambda x: str(x)[-1]))) |
| 132 | +print("Make sure that only 0/K/M exist", set(df.MaxRSS.apply(lambda x: str(x)[-1]))) |
120 | 133 |
121 | 134 | # Generating new columns |
122 | | -df['MaxRSSRaw'] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x))) |
123 | | -df['ElapsedRawTime'] = df.ElapsedRaw.apply( |
124 | | - lambda x: timedelta(seconds=float(x))) |
| 135 | +df["MaxRSSRaw"] = df.MaxRSS.apply(lambda x: MaxRSS_helper(str(x))) |
| 136 | +df["ElapsedRawTime"] = df.ElapsedRaw.apply(lambda x: timedelta(seconds=float(x))) |
125 | 137 |
126 | 138 | # Thu, Apr 27, 2023 was the first time Jeff and I changed the old allocations |
127 | 139 | # (from barnacle) to a better allocation, so we use job 1265533 as the
128 | 140 | # before/after cutoff and only use the latest for the newest version
129 | | -df['updated'] = df.external_id.apply( |
130 | | - lambda x: 'after' if int(x) >= 1265533 else 'before') |
| 141 | +df["updated"] = df.external_id.apply( |
| 142 | + lambda x: "after" if int(x) >= 1265533 else "before" |
| 143 | +) |
131 | 144 |
132 | | -fn = join('/panfs', 'qiita', f'jobs_{df.Start.max()[:10]}.tsv.gz') |
133 | | -df.to_csv(fn, sep='\t', index=False) |
| 145 | +fn = join("/panfs", "qiita", f"jobs_{df.Start.max()[:10]}.tsv.gz") |
| 146 | +df.to_csv(fn, sep="\t", index=False) |
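For readers following the script itself rather than the formatting change: the two pieces this diff leaves functionally untouched are parsing the pipe-delimited `sacct -p` output with pandas and converting the `MaxRSS` strings to raw byte counts via `qiita_db.util.MaxRSS_helper`. Below is a minimal sketch of both steps; `maxrss_to_bytes`, the sample sacct rows, and the K=1e3 / M=1e6 multipliers are illustrative assumptions, not the qiita_db implementation.

```python
from io import StringIO

import pandas as pd


# Illustrative stand-in for qiita_db.util.MaxRSS_helper (the real multipliers
# may differ): turn sacct MaxRSS strings like "0", "1234K" or "56M" into bytes.
def maxrss_to_bytes(value: str) -> float:
    suffixes = {"K": 1e3, "M": 1e6}
    value = str(value)
    if value and value[-1] in suffixes:
        return float(value[:-1]) * suffixes[value[-1]]
    return float(value)


# Made-up, truncated `sacct -p` output: pipe-delimited with a trailing "|",
# which is why pandas creates an empty trailing column ("Unnamed: N") that
# the script later drops.
fake_sacct = StringIO(
    "JobID|ElapsedRaw|MaxRSS|\n"
    "1265533|120|1234K|\n"
    "1265533.batch|120|56M|\n"
)
df = pd.read_csv(fake_sacct, sep="|")
print(df.columns[-1])                    # "Unnamed: 3" -> the column to drop
print(df.MaxRSS.apply(maxrss_to_bytes))  # 1234000.0 and 56000000.0
```

The real script runs this parse once per job, attaches the Qiita metadata columns, and then collapses the `JobID` / `JobID.batch` rows into a single record per `external_id` before writing the TSV.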