/
util.py
160 lines (139 loc) · 5.81 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from __future__ import division
from builtins import str
from past.builtins import basestring
from past.utils import old_div
import click
import itertools
import re
import sys
def natural_sort(l):
convert = lambda text: int(text) if text.isdigit() else text.lower()
alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
return sorted(l, key=alphanum_key)
def flatmap(func, items):
return itertools.chain.from_iterable(map(func, items))
def ascii_encode(non_compatible_string):
"""Primarily used for ensuring terminal display compatibility"""
if non_compatible_string:
return non_compatible_string.encode('ascii', errors='ignore').decode("ascii")
else:
return ""
def pull(nested_dict):
if "type" in nested_dict and "inputs" not in nested_dict:
return nested_dict
else:
inputs = {}
if "type" in nested_dict and "inputs" in nested_dict:
for param, input in list(nested_dict["inputs"].items()):
inputs[str(param)] = pull(input)
return inputs
else:
return nested_dict
def regex_manifest(protocol, input):
"""Special input types, gets updated as more input types are added"""
if "type" in input and input["type"] == "choice":
if "options" in input:
pattern = '\[(.*?)\]'
match = re.search(pattern, str(input["options"]))
if not match:
click.echo("Error in %s: input type \"choice\" options must "
"be in the form of: \n[\n {\n \"value\": "
"<choice value>, \n \"label\": <choice label>\n "
"},\n ...\n]" % protocol['name'])
raise RuntimeError
else:
click.echo("Must have options for 'choice' input type." +
" Error in: " + protocol["name"])
raise RuntimeError
def iter_json(manifest):
all_types = {}
try:
protocol = manifest['protocols']
except TypeError:
raise RuntimeError("Error: Your manifest.json file doesn't contain "
"valid JSON and cannot be formatted.")
for protocol in manifest["protocols"]:
types = {}
for param, input in list(protocol["inputs"].items()):
types[param] = pull(input)
if isinstance(input, dict):
if input["type"] == "group" or input["type"] == "group+":
for i, j in list(input.items()):
if isinstance(j, dict):
for k, l in list(j.items()):
regex_manifest(protocol, l)
else:
regex_manifest(protocol, input)
all_types[protocol["name"]] = types
return all_types
def robotize(well_ref, well_count, col_count):
"""Function referenced from autoprotocol.container_type.robotize()"""
if isinstance(well_ref, list):
return [robotize(well, well_count, col_count) for well in well_ref]
if not isinstance(well_ref, (basestring, int)):
raise TypeError("ContainerType.robotize(): Well reference given "
"is not of type 'str' or 'int'.")
well_ref = str(well_ref)
m = re.match("([a-z])(\d+)$", well_ref, re.I)
if m:
row = ord(m.group(1).upper()) - ord('A')
col = int(m.group(2)) - 1
well_num = row * col_count + col
# Check bounds
if row > (old_div(well_count, col_count)):
raise ValueError("Row given exceeds "
"container dimensions.")
if col > col_count or col < 0:
raise ValueError("Col given exceeds "
"container dimensions.")
if well_num > well_count:
raise ValueError("Well given "
"exceeds container dimensions.")
return well_num
else:
m = re.match("\d+$", well_ref)
if m:
well_num = int(m.group(0))
# Check bounds
if well_num > well_count or well_num < 0:
raise ValueError("Well number "
"given exceeds container dimensions.")
return well_num
else:
raise ValueError("Well must be in "
"'A1' format or be an integer.")
def humanize(well_ref, well_count, col_count):
"""Function referenced from autoprotocol.container_type.humanize()"""
if isinstance(well_ref, list):
return [humanize(well, well_count, col_count) for well in well_ref]
if isinstance(well_ref, str):
try:
well_ref = int(well_ref)
except:
raise ValueError("Well reference (%s) given has to be parseable into int." % well_ref)
if not isinstance(well_ref, int):
raise TypeError("Well reference (%s) given "
"is not of type 'int'." % well_ref)
idx = robotize(well_ref, well_count, col_count)
row, col = (idx // col_count, idx % col_count)
# Check bounds
if well_ref > well_count or well_ref < 0:
raise ValueError("Well reference "
"given exceeds container dimensions.")
return "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[row] + str(col + 1)
def by_well(datasets, well):
return [datasets[reading].props['data'][well][0] for
reading in list(datasets.keys())]
def makedirs(name, mode=None, exist_ok=False):
"""Forward ports `exist_ok` flag for Py2 makedirs. Retains mode defaults"""
from os import makedirs
mode = mode if mode is not None else 0o777
if sys.version_info[0] < 3:
# Note that Py2 makedirs still errors on all errno.EEXIST, unlike Py3
try:
makedirs(name, mode)
except OSError:
if not exist_ok:
raise
else:
makedirs(name, mode, exist_ok)