-
Notifications
You must be signed in to change notification settings - Fork 8
/
io_utils.py
246 lines (204 loc) · 7.25 KB
/
io_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
#!/usr/bin/env python
"""
This module provides utility classes for io operations.
"""
__author__ = "Shyue Ping Ong, Rickard Armiento, Anubhav Jain"
__copyright__ = "Copyright 2011, The Materials Project"
__version__ = "1.0"
__maintainer__ = "Shyue Ping Ong"
__email__ = "shyue@mit.edu"
__status__ = "Production"
__date__ = "Sep 23, 2011"
import re
import numpy
import os
def zopen(filename, *args, **kwargs):
    """
    Open a plain, gzipped or bzipped file through a single entry point.

    The opener is selected from the text after the last "." in filename,
    compared case-insensitively: "bz2" uses bz2.BZ2File, "gz" and "z" use
    gzip.GzipFile, and anything else uses the builtin open.

    Args:
        filename:
            Path of the file to open.
        args:
            Positional arguments forwarded unchanged to the selected opener,
            e.g. 'r' for read, 'w' for write.
        kwargs:
            Keyword arguments forwarded unchanged to the selected opener.

    Returns:
        File handler produced by the selected opener.
    """
    suffix = filename.rsplit(".", 1)[-1].upper()
    # The compression modules are imported lazily, only when needed.
    if suffix == "BZ2":
        import bz2
        opener = bz2.BZ2File
    elif suffix in ("GZ", "Z"):
        import gzip
        opener = gzip.GzipFile
    else:
        opener = open
    return opener(filename, *args, **kwargs)
def zpath(filename):
    """
    Resolve a file name to whichever variant of it exists on disk.

    The candidates are tried in this order: the name itself, the name with
    '.gz' appended, the name with '.bz2' appended. The first one that exists
    wins.

    Args:
        filename:
            filename without zip extension

    Returns:
        The first existing candidate, or filename unmodified when none of
        the candidates exists.
    """
    candidates = [filename + suffix for suffix in ('', '.gz', '.bz2')]
    return next((c for c in candidates if os.path.exists(c)), filename)
def clean_lines(string_list, remove_empty_lines=True):
    """
    Lazily clean up an iterable of strings.

    For every string, everything from the first '#' onwards is discarded and
    the remainder is stripped of surrounding whitespace (including newline
    and carriage-return characters).

    Args:
        string_list:
            List of strings
        remove_empty_lines:
            Set to True to skip lines which are empty after stripping.

    Returns:
        Generator yielding the cleaned strings in their original order.
    """
    for raw in string_list:
        # partition() returns the whole string as the head when no '#' exists.
        text = raw.partition('#')[0].strip()
        if remove_empty_lines and text == '':
            continue
        yield text
def micro_pyawk(filename, search, results=None, debug=None, postdebug=None):
    """
    Small awk-mimicking search routine.

    'filename' is the file to search through (opened with zopen).
    'search' is the "search program", a list of lists/tuples with 3 elements;
    i.e. [[regex,test,run],[regex,test,run],...]
    'results' is an object that your search program will have access to for
    storing results.

    Here regex is either a Regex object, or a string that we compile into a
    Regex. test and run are callable objects.

    This function goes through each line in filename, and if regex matches
    that line *and* test(results,line) is true (or test is None) we execute
    run(results,match), where match is the match object from running
    regex.search on the line.

    The default results is an empty dictionary. Passing a results object lets
    you interact with it in run() and test(). Hence, on many occasions it is
    clever to use results=self.

    Author: Rickard Armiento

    Args:
        filename:
            Path of the file to scan.
        search:
            Sequence of (regex, test, run) entries as described above. The
            entries are not modified.
        results:
            Object handed to test, run, debug and postdebug. Defaults to a
            new empty dict.
        debug:
            Optional callable invoked as debug(results, match) right before
            run.
        postdebug:
            Optional callable invoked as postdebug(results, match) right
            after run.

    Returns:
        results
    """
    if results is None:
        results = {}

    # Compile strings into regexes in a private list, so that tuple entries
    # work and the caller's search program is left untouched.
    program = []
    for entry in search:
        regex, test, run = entry[0], entry[1], entry[2]
        if isinstance(regex, str):
            regex = re.compile(regex)
        program.append((regex, test, run))

    with zopen(filename) as f:
        for line in f:
            for regex, test, run in program:
                match = regex.search(line)
                # A missing test means "always run"; otherwise the test
                # decides whether run is executed for this match.
                if match and (test is None or test(results, line)):
                    if debug is not None:
                        debug(results, match)
                    run(results, match)
                    if postdebug is not None:
                        postdebug(results, match)
    return results
def clean_json(input_json, strict=False):
    """
    This method cleans an input json-like object, either a list or a dict,
    nested or otherwise, by converting all non-string dictionary keys (such
    as int and float) to strings.

    Lists, tuples and numpy arrays become lists of cleaned items, dicts
    become dicts with str keys and cleaned values, and int/float values are
    returned unchanged.

    Args:
        input_json:
            Input json-like object (dict, list, tuple, numpy array or
            scalar).
        strict:
            This parameter sets the behavior when clean_json encounters an
            object it does not understand. If strict is True, strings are
            converted with str(), None becomes the string 'None', and for
            anything else clean_json will clean the object's to_dict
            attribute. If no such attribute is found, an AttributeError
            will be thrown. If strict is False, clean_json will simply call
            str(object) to convert the object to a string representation.

    Returns:
        Sanitized object that can be json serialized.
    """
    if isinstance(input_json, (list, numpy.ndarray, tuple)):
        return [clean_json(item, strict=strict) for item in input_json]
    if isinstance(input_json, dict):
        return {str(k): clean_json(v, strict=strict)
                for k, v in input_json.items()}
    if isinstance(input_json, (int, float)):
        return input_json
    if not strict:
        return str(input_json)

    # basestring only exists on Python 2; fall back to str so that strict
    # mode does not raise NameError on Python 3.
    try:
        string_types = basestring  # noqa: F821
    except NameError:
        string_types = str

    if isinstance(input_json, string_types):
        return str(input_json)
    if input_json is None:
        return 'None'
    # to_dict is read as an attribute (not called), as in the original code.
    return clean_json(input_json.to_dict, strict=strict)
def which(program):
    """
    Returns the full path to an executable.

    If program contains a directory component it is checked as given.
    Otherwise every directory listed in the PATH environment variable is
    tried in order.

    Args:
        program:
            Name of, or path to, the executable to look up.

    Returns:
        Path of the first match that is a regular file with execute
        permission, or None when there is no such file.
    """
    def _runnable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory = os.path.split(program)[0]
    if directory:
        # An explicit location was given: PATH is not consulted.
        return program if _runnable(program) else None

    for folder in os.environ["PATH"].split(os.pathsep):
        candidate = os.path.join(folder, program)
        if _runnable(candidate):
            return candidate
    return None
def reverse_readline(m_file, blk_size=4096):
    """
    Generator method to read a file line-by-line, but backwards. This allows
    one to efficiently get data at the end of a file.

    Based on code by Peter Astrand <astrand@cendio.se>, using modifications by
    Raymond Hettinger and Kevin German.
    http://code.activestate.com/recipes/439045-read-a-text-file-backwards-yet-another-implementat/

    The file is consumed from its end in chunks of at most blk_size, and the
    text after the last newline of the buffered data is yielded each time.

    Args:
        m_file:
            File stream to read (backwards). It must support seek, tell and
            read, and read must return str.
        blk_size:
            The buffer size. Defaults to 4096.

    Returns:
        Generator that returns lines from the file. Similar behavior to the
        file.readline() method, except the lines are returned from the back
        of the file.
    """
    pending = ""
    m_file.seek(0, 2)
    # NOTE(review): this read happens right after seeking to the end of the
    # file, so it presumably always returns "" and the flag stays False -
    # confirm whether seek(-1, 2) was intended.
    ends_with_newline = m_file.read(1) == "\n"

    while True:
        cut = pending.rfind("\n")
        offset = m_file.tell()

        if cut != -1:
            # Found a newline: emit everything after it.
            tail = pending[cut + 1:]
            pending = pending[:cut]
            if offset or cut or ends_with_newline:
                tail += "\n"
            yield tail
            continue

        if not offset:
            # Start-of-file reached and nothing left in the buffer.
            return

        # Need to fill buffer with the chunk that precedes the current offset.
        chunk = min(blk_size, offset)
        start = offset - chunk
        m_file.seek(start, 0)
        pending = m_file.read(chunk) + pending
        m_file.seek(start, 0)
        if offset == chunk:
            # The whole file is buffered now; the sentinel newline makes the
            # first line of the file get emitted like any other.
            pending = "\n" + pending