/
client.py
320 lines (272 loc) · 11.2 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Client for accessing the Nominal Response Library (http://ds.iris.edu/NRL/).
:copyright:
Lloyd Carothers IRIS/PASSCAL, 2016
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(http://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA
import codecs
import io
import os
import sys
import warnings
import requests
import obspy
from obspy.core.compatibility import configparser
from obspy.core.inventory.util import _textwrap
# Simple cache for remote NRL access. The total data amount will always be
# fairly small so I don't think it needs any cache eviction for now.
_remote_nrl_cache = {}
class NRL(object):
"""
NRL client base class for accessing the Nominal Response Library.
http://ds.iris.edu/NRL/
Created with a URL for remote access or filesystem accessing a local copy.
"""
_index = 'index.txt'
def __new__(cls, root=None):
# Check if its a folder on the file-system.
if root and os.path.isdir(root):
return super(NRL, cls).__new__(LocalNRL)
# Otherwise delegate to the remote NRL client to deal with all kinds
# of remote resources (currently only HTTP).
return super(NRL, cls).__new__(RemoteNRL)
def __init__(self):
sensor_index = self._join(self.root, 'sensors', self._index)
self.sensors = self._parse_ini(sensor_index)
datalogger_index = self._join(self.root, 'dataloggers', self._index)
self.dataloggers = self._parse_ini(datalogger_index)
def __str__(self):
info = ['NRL library at ' + self.root]
if self.sensors is None:
info.append(' Sensors not parsed yet.')
else:
info.append(
' Sensors: {} manufacturers'.format(len(self.sensors)))
if len(self.sensors):
keys = [key for key in sorted(self.sensors)]
lines = _textwrap("'" + "', '".join(keys) + "'",
initial_indent=' ',
subsequent_indent=' ')
info.extend(lines)
if self.dataloggers is None:
info.append(' Dataloggers not parsed yet.')
else:
info.append(' Dataloggers: {} manufacturers'.format(
len(self.dataloggers)))
if len(self.dataloggers):
keys = [key for key in sorted(self.dataloggers)]
lines = _textwrap("'" + "', '".join(keys) + "'",
initial_indent=' ',
subsequent_indent=' ')
info.extend(lines)
return '\n'.join(_i.rstrip() for _i in info)
def _repr_pretty_(self, p, cycle): # pragma: no cover
p.text(str(self))
def _choose(self, choice, path):
# Should return either a path or a resp
cp = self._get_cp_from_ini(path)
options = cp.options(choice)
if 'path' in options:
newpath = cp.get(choice, 'path')
elif 'resp' in options:
newpath = cp.get(choice, 'resp')
# Strip quotes of new path
newpath = self._clean_str(newpath)
path = os.path.dirname(path)
return self._join(path, newpath)
def _parse_ini(self, path):
nrl_dict = NRLDict(self)
cp = self._get_cp_from_ini(path)
for section in cp.sections():
options = sorted(cp.options(section))
if section.lower() == 'main':
if options not in (['question'],
['detail', 'question']): # pragma: no cover
msg = "Unexpected structure of NRL file '{}'".format(path)
raise NotImplementedError(msg)
nrl_dict._question = self._clean_str(cp.get(section,
'question'))
continue
else:
if options == ['path']:
nrl_dict[section] = NRLPath(self._choose(section, path))
continue
# sometimes the description field is named 'description', but
# sometimes also 'descr'
elif options in (['description', 'resp'], ['descr', 'resp'],
['resp']):
if 'descr' in options:
descr = cp.get(section, 'descr')
elif 'description' in options:
descr = cp.get(section, 'description')
else:
descr = '<no description>'
descr = self._clean_str(descr)
resp_path = self._choose(section, path)
nrl_dict[section] = (descr, resp_path)
continue
else: # pragma: no cover
msg = "Unexpected structure of NRL file '{}'".format(path)
raise NotImplementedError(msg)
return nrl_dict
def _clean_str(self, string):
return string.strip('\'"')
def get_datalogger_response(self, datalogger_keys):
"""
Get the datalogger response.
:type datalogger_keys: list of str
:rtype: :class:`~obspy.core.inventory.response.Response`
"""
datalogger = self.dataloggers
for key in datalogger_keys:
datalogger = datalogger[key]
# Parse to an inventory object and return a response object.
with io.BytesIO(self._read_resp(datalogger[1]).encode()) as buf:
buf.seek(0, 0)
return obspy.read_inventory(buf, format="RESP")[0][0][0].response
def get_sensor_response(self, sensor_keys):
"""
Get the sensor response.
:type sensor_keys: list of str
:rtype: :class:`~obspy.core.inventory.response.Response`
"""
sensor = self.sensors
for key in sensor_keys:
sensor = sensor[key]
# Parse to an inventory object and return a response object.
with io.BytesIO(self._read_resp(sensor[1]).encode()) as buf:
buf.seek(0, 0)
return obspy.read_inventory(buf, format="RESP")[0][0][0].response
def get_response(self, datalogger_keys, sensor_keys):
"""
Get Response from NRL tree structure
:param datalogger_keys: List of data-loggers.
:type datalogger_keys: list[str]
:param sensor_keys: List of sensors.
:type sensor_keys: list[str]
:rtype: :class:`~obspy.core.inventory.response.Response`
>>> nrl = NRL()
>>> response = nrl.get_response(
... sensor_keys=['Nanometrics', 'Trillium Compact', '120 s'],
... datalogger_keys=['REF TEK', 'RT 130 & 130-SMA', '1', '200'])
>>> print(response) # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
Channel Response
From M/S (Velocity in Meters per Second) to COUNTS (Digital Counts)
Overall Sensitivity: 4.74576e+08 defined at 1.000 Hz
10 stages:
Stage 1: PolesZerosResponseStage from M/S to V, gain: 754.3
Stage 2: ResponseStage from V to V, gain: 1
Stage 3: Coefficients... from V to COUNTS, gain: 629129
Stage 4: Coefficients... from COUNTS to COUNTS, gain: 1
Stage 5: Coefficients... from COUNTS to COUNTS, gain: 1
Stage 6: Coefficients... from COUNTS to COUNTS, gain: 1
Stage 7: Coefficients... from COUNTS to COUNTS, gain: 1
Stage 8: Coefficients... from COUNTS to COUNTS, gain: 1
Stage 9: Coefficients... from COUNTS to COUNTS, gain: 1
Stage 10: Coefficients... from COUNTS to COUNTS, gain: 1
"""
dl_resp = self.get_datalogger_response(datalogger_keys)
sensor_resp = self.get_sensor_response(sensor_keys)
# Combine both by replace stage one in the data logger with stage
# one of the sensor.
dl_resp.response_stages.pop(0)
dl_resp.response_stages.insert(0, sensor_resp.response_stages[0])
try:
dl_resp.recalculate_overall_sensitivity()
except ValueError:
msg = "Failed to recalculate overall sensitivity."
warnings.warn(msg)
return dl_resp
class NRLDict(dict):
def __init__(self, nrl):
self._nrl = nrl
def __str__(self):
if len(self):
if self._question:
info = ['{} ({} items):'.format(self._question, len(self))]
else:
info = ['{} items:'.format(len(self))]
texts = ["'{}'".format(k) for k in sorted(self.keys())]
info.extend(_textwrap(", ".join(texts), initial_indent=' ',
subsequent_indent=' '))
return '\n'.join(_i.rstrip() for _i in info)
else:
return '0 items.'
def _repr_pretty_(self, p, cycle): # pragma: no cover
p.text(str(self))
def __getitem__(self, name):
value = super(NRLDict, self).__getitem__(name)
# if encountering a not yet parsed NRL Path, expand it now
if isinstance(value, NRLPath):
value = self._nrl._parse_ini(value)
self[name] = value
return value
class NRLPath(str):
pass
class LocalNRL(NRL):
"""
Subclass of NRL for accessing local copy NRL.
"""
def __init__(self, root):
self.root = root
self._join = os.path.join
super(self.__class__, self).__init__()
def _get_cp_from_ini(self, path):
"""
Returns a configparser from a path to an index.txt
"""
cp = configparser.ConfigParser()
with codecs.open(path, mode='r', encoding='UTF-8') as f:
if sys.version_info.major == 2: # pragma: no cover
cp.readfp(f)
else: # pragma: no cover
cp.read_file(f)
return cp
def _read_resp(self, path):
# Returns Unicode string of RESP
with open(path, 'r') as f:
return f.read()
class RemoteNRL(NRL):
"""
Subclass of NRL for accessing remote copy of NRL.
"""
def __init__(self, root='http://ds.iris.edu/NRL'):
self.root = root
super(self.__class__, self).__init__()
def _download(self, url):
"""
Download service with basic cache.
"""
if url not in _remote_nrl_cache:
response = requests.get(url)
_remote_nrl_cache[url] = response.text
return _remote_nrl_cache[url]
def _join(self, *paths):
url = paths[0]
for path in paths[1:]:
url = requests.compat.urljoin(url + '/', path)
return url
def _get_cp_from_ini(self, path):
'''
Returns a configparser from a path to an index.txt
'''
cp = configparser.SafeConfigParser()
with io.StringIO(self._download(path)) as buf:
if sys.version_info.major == 2: # pragma: no cover
cp.readfp(buf)
else: # pragma: no cover
cp.read_file(buf)
return cp
def _read_resp(self, path):
return self._download(path)
if __name__ == "__main__": # pragma: no cover
import doctest
doctest.testmod(exclude_empty=True)