/
routing_client.py
376 lines (314 loc) · 13.6 KB
/
routing_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Base class for all FDSN routers.
:copyright:
The ObsPy Development Team (devs@obspy.org)
Celso G Reyes, 2017
IRIS-DMC
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA
from multiprocessing.dummy import Pool as ThreadPool
import decorator
import io
import sys
import traceback
import warnings
from obspy.core.compatibility import (urlparse, string_types,
get_reason_from_response)
import obspy
from ...base import HTTPClient
from .. import client
from ..client import raise_on_error
from ..header import FDSNException, URL_MAPPINGS, FDSNNoDataException
def RoutingClient(routing_type, *args, **kwargs):  # NOQA
    """
    Factory returning the routing client matching ``routing_type``.

    :type routing_type: str
    :param routing_type: Which router to create, matched case-insensitively.
        ``"iris-federator"`` returns a
        :class:`~.federator_routing_client.FederatorRoutingClient` and
        ``"eida-routing"`` returns an
        :class:`~.eidaws_routing_client.EIDAWSRoutingClient`.
    :raises NotImplementedError: For any other ``routing_type``.

    All remaining ``args`` and ``kwargs`` are forwarded unchanged to the
    constructor of the selected class. For example, credentials can be
    supplied for all underlying data centers. See
    :meth:`BaseRoutingClient <BaseRoutingClient.__init__>` for details.

    >>> from obspy.clients.fdsn import RoutingClient

    Get an instance of a routing client using the IRIS Federator:

    >>> c = RoutingClient("iris-federator")
    >>> print(type(c))  # doctest: +ELLIPSIS
    <class '...routing.federator_routing_client.FederatorRoutingClient'>

    Or get an instance of a routing client using the EIDAWS routing web
    service:

    >>> c = RoutingClient("eida-routing")
    >>> print(type(c))  # doctest: +ELLIPSIS
    <class '...routing.eidaws_routing_client.EIDAWSRoutingClient'>
    """
    requested = routing_type.lower()
    # The concrete classes are imported lazily, only for the branch taken.
    if requested == "eida-routing":
        from .eidaws_routing_client import EIDAWSRoutingClient
        return EIDAWSRoutingClient(*args, **kwargs)
    if requested == "iris-federator":
        from .federator_routing_client import FederatorRoutingClient
        return FederatorRoutingClient(*args, **kwargs)
    raise NotImplementedError(
        "Routing type '%s' is not implemented. Available types: "
        "`iris-federator`, `eida-routing`" % routing_type)
@decorator.decorator
def _assert_filename_not_in_kwargs(f, *args, **kwargs):
    """
    Decorator rejecting calls that pass a ``filename`` keyword argument.

    Raises :class:`ValueError` in that case; otherwise the wrapped function
    is called with the unchanged arguments and its result is returned.
    """
    if "filename" not in kwargs:
        return f(*args, **kwargs)
    raise ValueError("The `filename` argument is not supported")
@decorator.decorator
def _assert_attach_response_not_in_kwargs(f, *args, **kwargs):
    """
    Decorator rejecting calls that pass an ``attach_response`` keyword.

    Raises :class:`ValueError` in that case; otherwise the wrapped function
    is called with the unchanged arguments and its result is returned.
    """
    if "attach_response" not in kwargs:
        return f(*args, **kwargs)
    raise ValueError("The `attach_response` argument is not supported")
def _try_download_bulk(r):
    """
    Best-effort wrapper around :func:`_download_bulk`.

    Any exception raised while downloading is turned into a warning that
    contains the full traceback, and ``None`` is returned instead. This way
    one failing data center does not make the parallel ``map`` over all
    data centers raise.

    :type r: dict
    :param r: Single download request; ``r["data_type"]`` and
        ``r["endpoint"]`` are used in the warning message.
    """
    try:
        result = _download_bulk(r)
    except Exception:
        details = traceback.format_exc()
        warnings.warn(
            "Failed to download data of type '%s' from '%s' due to: \n%s" % (
                r["data_type"], r["endpoint"], details))
        return None
    return result
def _download_bulk(r):
    """
    Download data from a single FDSN endpoint with one bulk request.

    :type r: dict
    :param r: Request description with the keys ``"endpoint"``,
        ``"debug"``, ``"timeout"``, ``"credentials"``, ``"data_type"``
        (``"waveform"`` or ``"station"``), ``"kwargs"`` and ``"bulk_str"``.
    :returns: The result of the client's bulk method, or ``None`` if the
        client could not be created or the request raised an
        :class:`~obspy.clients.fdsn.header.FDSNException`.
    :raises ValueError: If ``r["data_type"]`` is not supported.
    """
    data_type = r["data_type"]
    # Validate before doing any (network bound) client initialization;
    # previously an unknown type surfaced as an UnboundLocalError.
    if data_type not in ("waveform", "station"):
        raise ValueError("Invalid data type.")

    # Figure out the passed credentials, if any. Two possibilities:
    # (1) User and password, given explicitly for the base URLs (or an
    #     explicitly given `eida_token` key per URL).
    # (2) A global EIDA_TOKEN key. It will be used for all services that
    #     don't have explicit credentials and also support the `/auth` route.
    credentials = r["credentials"].get(urlparse(r["endpoint"]).netloc, {})
    try:
        c = client.Client(r["endpoint"], debug=r["debug"],
                          timeout=r["timeout"], **credentials)
    # This should rarely happen but better safe than sorry.
    except FDSNException as e:  # pragma: no cover
        reason = e.args[0] if e.args else e
        # Separate the original reason from the appended sentence.
        warnings.warn(
            "%s It will not be used for routing. Try again later?" % (
                reason,))
        return None

    # Fall back to the global token only for endpoints without explicit
    # credentials whose client reports EIDA auth support.
    if not credentials and "EIDA_TOKEN" in r["credentials"] and \
            c._has_eida_auth:
        c.set_eida_token(r["credentials"]["EIDA_TOKEN"])

    if data_type == "waveform":
        fct = c.get_waveforms_bulk
        service = c.services["dataselect"]
    else:
        fct = c.get_stations_bulk
        service = c.services["station"]

    # Keep only kwargs that are supported by this particular service and
    # prepend them to the bulk request as ``key=value`` lines.
    header = "".join(
        "%s=%s\n" % (key, str(value))
        for key, value in r["kwargs"].items() if key in service)
    try:
        return fct(header + r["bulk_str"])
    except FDSNException:
        # Errors of a single endpoint are not fatal for the routed request;
        # the caller skips ``None`` results.
        return None
def _strip_protocol(url):
url = urlparse(url)
return url.netloc + url.path
# Does not inherit from the FDSN client as that would be fairly hacky as
# some methods just make no sense for the routing client to have (e.g.
# get_events() but also others).
class BaseRoutingClient(HTTPClient):
    """
    Common functionality of the FDSN routing clients.

    A routing client splits a request across several FDSN web service
    providers, downloads from all of them in parallel and merges the
    results into a single :class:`~obspy.core.stream.Stream` or
    :class:`~obspy.core.inventory.inventory.Inventory`.

    This class uses, but does not define, ``self._url``,
    ``get_waveforms_bulk()`` and ``get_stations_bulk()``; concrete
    subclasses are expected to provide them.
    """
    def __init__(self, debug=False, timeout=120, include_providers=None,
                 exclude_providers=None, credentials=None):
        """
        :type debug: bool
        :param debug: Debug flag, passed on to
            :class:`~obspy.clients.base.HTTPClient`.
        :type timeout: int or float
        :param timeout: Timeout, passed on to
            :class:`~obspy.clients.base.HTTPClient`.
        :type include_providers: str or list of str
        :param include_providers: Get data only from these providers. Can be
            the full HTTP address or one of the shortcuts ObsPy knows about.
        :type exclude_providers: str or list of str
        :param exclude_providers: Get no data from these providers. Can be
            the full HTTP address or one of the shortcuts ObsPy knows about.
        :type credentials: dict
        :param credentials: Credentials for the individual data centers as a
            dictionary that maps the base URL of an FDSN web service to
            either username/password or an EIDA token, e.g.
            ``credentials={
            'geofon.gfz-potsdam.de': {'eida_token': 'my_token_file.txt'},
            'service.iris.edu': {'user': 'me', 'password': 'my_pass'},
            'EIDA_TOKEN': '/path/to/token.txt'
            }``
            The root level ``'EIDA_TOKEN'`` will be applied to all data
            centers that claim to support the ``/auth`` route and don't have
            data center specific credentials.
            You can also use a URL mapping as for the normal FDSN client
            instead of the URL.
        """
        HTTPClient.__init__(self, debug=debug, timeout=timeout)
        self.include_providers = include_providers
        self.exclude_providers = exclude_providers

        # Parse credentials. Keys end up as the network location of the
        # service URL, except for the global "EIDA_TOKEN" key.
        self.credentials = {}
        for key, value in (credentials or {}).items():
            if key == "EIDA_TOKEN":
                self.credentials[key] = value
                continue
            # Map, if necessary.
            if key in URL_MAPPINGS:
                key = URL_MAPPINGS[key]
            # Make sure urlparse works correctly (it needs a scheme to
            # recognize the network location).
            if not key.startswith("http"):
                key = "http://" + key
            # Only use the location.
            self.credentials[urlparse(key).netloc] = value

    @property
    def include_providers(self):
        """
        Providers to exclusively get data from, as normalized by
        :meth:`_expand_providers` (an empty list means no restriction).
        """
        return self.__include_providers

    @include_providers.setter
    def include_providers(self, value):
        self.__include_providers = self._expand_providers(value)

    @property
    def exclude_providers(self):
        """
        Providers to get no data from, as normalized by
        :meth:`_expand_providers`.
        """
        return self.__exclude_providers

    @exclude_providers.setter
    def exclude_providers(self, value):
        self.__exclude_providers = self._expand_providers(value)

    def _expand_providers(self, providers):
        """
        Normalize a provider specification.

        :type providers: None, str or list of str
        :param providers: Provider(s) given either as URLs or as keys of
            ``URL_MAPPINGS``.
        :rtype: list of str
        :returns: The providers with mapping keys resolved and the protocol
            stripped (see :func:`_strip_protocol`). ``None`` results in an
            empty list.
        """
        if providers is None:
            providers = []
        elif isinstance(providers, string_types):
            providers = [providers]
        return [_strip_protocol(URL_MAPPINGS[_i])
                if _i in URL_MAPPINGS
                else _strip_protocol(_i) for _i in providers]

    def _filter_requests(self, split):
        """
        Filter requests based on including and excluding providers.

        :type split: dict
        :param split: A dictionary mapping endpoint URLs to the bulk request
            string for that endpoint.
        :rtype: dict
        :returns: The entries of ``split`` whose protocol-stripped URL is in
            ``include_providers`` (if that is non-empty) and not in
            ``exclude_providers``.
        """
        key_map = {_strip_protocol(url): url for url in split.keys()}

        # Apply both filters.
        f_keys = set(key_map.keys())
        if self.include_providers:
            f_keys = f_keys.intersection(set(self.include_providers))
        f_keys = f_keys.difference(set(self.exclude_providers))

        return {key_map[k]: split[key_map[k]] for k in f_keys}

    def _download_waveforms(self, split, **kwargs):
        """Download and merge waveforms for all endpoints in ``split``."""
        return self._download_parallel(split, data_type="waveform", **kwargs)

    def _download_stations(self, split, **kwargs):
        """Download and merge station metadata for all endpoints."""
        return self._download_parallel(split, data_type="station", **kwargs)

    def _download_parallel(self, split, data_type, **kwargs):
        """
        Download from all (filtered) endpoints in parallel and merge.

        :type split: dict
        :param split: Mapping of endpoint URL to bulk request string.
        :type data_type: str
        :param data_type: ``"waveform"`` or ``"station"``.
        :param kwargs: Extra parameters; each endpoint only receives the
            ones its service supports.
        :rtype: :class:`~obspy.core.stream.Stream` or
            :class:`~obspy.core.inventory.inventory.Inventory`
        :raises FDSNNoDataException: If no endpoint survives the provider
            filters.
        :raises ValueError: For an unknown ``data_type``.
        """
        if data_type not in ["waveform", "station"]:  # pragma: no cover
            raise ValueError("Invalid data type.")

        # Apply the provider filter.
        split = self._filter_requests(split)
        if not split:
            raise FDSNNoDataException(
                "Nothing remains to download after the provider "
                "inclusion/exclusion filters have been applied.")

        # One thread per data center.
        dl_requests = [{
            "debug": self._debug,
            "timeout": self._timeout,
            "endpoint": k,
            "bulk_str": v,
            "data_type": data_type,
            "kwargs": kwargs,
            "credentials": self.credentials} for k, v in split.items()]

        pool = ThreadPool(processes=len(dl_requests))
        try:
            results = pool.map(_try_download_bulk, dl_requests)
        finally:
            # Explicitly close the thread pool as somehow this does not work
            # automatically under linux. See #2342. Done in `finally` so
            # the pool is also released if `map` raises.
            pool.close()

        # Merge all results into a single object.
        if data_type == "waveform":
            collection = obspy.Stream()
        else:
            collection = obspy.Inventory(
                networks=[],
                source="ObsPy FDSN Routing %s" % obspy.__version__)
        for result in results:
            # Falsy results (e.g. ``None`` from failed downloads) are skipped.
            if not result:
                continue
            collection += result
        return collection

    def _handle_requests_http_error(self, r):
        """
        This assumes the same error code semantics as the base fdsnws web
        services.

        Please overwrite this method in a child class if necessary.

        :param r: The failed HTTP response; its ``status_code`` and, if
            present, ``content`` are used.
        """
        # Assumes `get_reason_from_response` returns bytes, as the
        # concatenation with b" -- " below requires.
        reason = get_reason_from_response(r)
        if hasattr(r, "content"):
            c = r.content
            # Encode text content; content that cannot be encoded (presumably
            # already bytes) is used as is.
            try:
                c = c.encode()
            except Exception:
                pass
            reason += b" -- " + c
        with io.BytesIO(reason) as buf:
            raise_on_error(r.status_code, buf)

    @_assert_filename_not_in_kwargs
    @_assert_attach_response_not_in_kwargs
    def get_waveforms(self, starttime, endtime, **kwargs):
        """
        Get waveforms from multiple data centers.

        Arguments are the same as in
        :meth:`obspy.clients.fdsn.client.Client.get_waveforms()`.
        Any additional ``**kwargs`` are passed on to each individual service's
        dataselect service if the service supports them (otherwise they are
        silently ignored for that particular fdsnws endpoint).

        The ``filename`` and ``attach_response`` parameters of the single
        provider FDSN client are not supported.

        This can route on a number of different parameters, depending on the
        service, please see the web site of each individual routing service
        for details.
        """
        # This just calls the bulk downloader to only implement the logic
        # once. Missing SEED identifiers default to the "*" wildcard.
        bulk = [kwargs.pop(key, "*") for key in (
            "network", "station", "location", "channel")]
        bulk.extend([starttime, endtime])
        return self.get_waveforms_bulk([bulk], **kwargs)

    def get_service_version(self):
        """
        Return a semantic version number of the remote service as a string.
        """
        r = self._download(self._url + "/version")
        return r.content.decode() if \
            hasattr(r.content, "decode") else r.content

    @_assert_filename_not_in_kwargs
    def get_stations(self, **kwargs):
        """
        Get stations from multiple data centers.

        It will pass on most parameters to the underlying routed service.
        They will also be passed on to the individual FDSNWS implementations
        if a service supports them.

        The ``filename`` parameter of the single provider FDSN client is not
        supported.

        This can route on a number of different parameters, please see the
        web sites of the
        `IRIS Federator  <https://service.iris.edu/irisws/fedcatalog/1/>`_
        and of the `EIDAWS Routing Service
        <http://www.orfeus-eu.org/data/eida/webservices/routing/>`_ for
        details.
        """
        # Just pass these to the bulk request; missing ones default to "*".
        bulk = [kwargs.pop(key, '*') for key in (
            "network", "station", "location", "channel", "starttime",
            "endtime")]
        return self.get_stations_bulk([bulk], **kwargs)
if __name__ == '__main__':  # pragma: no cover
    # Running this file directly executes the doctests in its docstrings.
    from doctest import testmod
    testmod(exclude_empty=True)