/
core.py
1610 lines (1419 loc) · 66 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Functions dealing with reading and writing StationXML.
:copyright:
Lion Krischer (krischer@geophysik.uni-muenchen.de), 2013
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA
import copy
import inspect
import io
import math
import os
import re
import warnings
from lxml import etree
from numpy import double
import obspy
from obspy.core import compatibility
from obspy.core.util import AttribDict
from obspy.core.util.obspy_types import (ComplexWithUncertainties,
FloatWithUncertaintiesAndUnit)
from obspy.core.inventory import (CoefficientsTypeResponseStage,
CoefficientWithUncertainties,
FilterCoefficient, FIRResponseStage,
PolesZerosResponseStage,
PolynomialResponseStage,
ResponseListResponseStage, ResponseStage)
from obspy.core.inventory import (Angle, Azimuth, ClockDrift, Dip, Distance,
Frequency, Latitude, Longitude, SampleRate)
# Define some constants for writing StationXML files.
SOFTWARE_MODULE = "ObsPy %s" % obspy.__version__
SOFTWARE_URI = "https://www.obspy.org"
SCHEMA_VERSION = "1.1"
READABLE_VERSIONS = ("1.0", "1.1")
def _get_version_from_xmldoc(xmldoc):
"""
Return StationXML version string or ``None`` if parsing fails.
"""
root = xmldoc.getroot()
try:
match = re.match(
r'{http://www.fdsn.org/xml/station/[0-9]+}FDSNStationXML',
root.tag)
assert match is not None
except Exception:
return None
try:
version = root.attrib["schemaVersion"]
except KeyError:
return None
return version
def _is_stationxml(path_or_file_object):
"""
Simple function checking if the passed object contains a valid StationXML
1.0 or StationXML 1.1 file. Returns True of False.
The test is not exhaustive - it only checks the root tag but that should
be good enough for most real world use cases. If the schema is used to
test for a StationXML file, many real world files are false negatives as
they don't adhere to the standard.
:param path_or_file_object: File name or file like object.
"""
if hasattr(path_or_file_object, "tell") and hasattr(path_or_file_object,
"seek"):
current_position = path_or_file_object.tell()
try:
if isinstance(path_or_file_object, etree._Element):
xmldoc = path_or_file_object
else:
try:
xmldoc = etree.parse(path_or_file_object)
except etree.XMLSyntaxError:
return False
version = _get_version_from_xmldoc(xmldoc)
if version is None:
return False
if version not in READABLE_VERSIONS:
warnings.warn("The StationXML file has version %s, ObsPy can "
"read versions (%s). Proceed with caution." % (
version, ", ".join(READABLE_VERSIONS)))
return True
finally:
# Make sure to reset file pointer position.
try:
path_or_file_object.seek(current_position, 0)
except Exception:
pass
def validate_stationxml(path_or_object):
"""
Checks if the given path is a valid StationXML file.
Returns a tuple. The first item is a boolean describing if the validation
was successful or not. The second item is a list of all found validation
errors, if existent.
:param path_or_object: File name or file like object. Can also be an etree
element.
"""
if isinstance(path_or_object, etree._Element):
xmldoc = path_or_object
else:
try:
xmldoc = etree.parse(path_or_object)
except etree.XMLSyntaxError:
return (False, ("Not a XML file.",))
version = _get_version_from_xmldoc(xmldoc)
# Get the schema location.
schema_location = os.path.dirname(inspect.getfile(inspect.currentframe()))
schema_location = os.path.join(schema_location, "data",
"fdsn-station-%s.xsd" % version)
if not os.path.exists(schema_location):
msg = "No schema file found to validate StationXML version '%s'"
raise ValueError(msg % version)
xmlschema = etree.XMLSchema(etree.parse(schema_location))
valid = xmlschema.validate(xmldoc)
# Pretty error printing if the validation fails.
if valid is not True:
return (False, xmlschema.error_log)
return (True, ())
def _read_stationxml(path_or_file_object):
"""
Function reading a StationXML file.
:param path_or_file_object: File name or file like object.
"""
root = etree.parse(path_or_file_object).getroot()
# Fix the namespace as its not always the default namespace. Will need
# to be adjusted if the StationXML format gets another revision!
namespace = "http://www.fdsn.org/xml/station/1"
def _ns(tagname):
return "{%s}%s" % (namespace, tagname)
# Source and Created field must exist in a StationXML.
source = root.find(_ns("Source")).text
created = obspy.UTCDateTime(root.find(_ns("Created")).text)
# These are optional
sender = _tag2obj(root, _ns("Sender"), str)
module = _tag2obj(root, _ns("Module"), str)
module_uri = _tag2obj(root, _ns("ModuleURI"), str)
networks = []
for network in root.findall(_ns("Network")):
networks.append(_read_network(network, _ns))
inv = obspy.core.inventory.Inventory(networks=networks, source=source,
sender=sender, created=created,
module=module, module_uri=module_uri)
_read_extra(root, inv) # read extra tags from root element
return inv
def _read_base_node(element, object_to_write_to, _ns):
"""
Reads the base node structure from element and saves it in
object_to_write_to.
Reads everything except the 'code' attribute.
"""
object_to_write_to.start_date = \
_attr2obj(element, "startDate", obspy.UTCDateTime)
object_to_write_to.end_date = \
_attr2obj(element, "endDate", obspy.UTCDateTime)
object_to_write_to.restricted_status = \
_attr2obj(element, "restrictedStatus", str)
object_to_write_to.alternate_code = \
_attr2obj(element, "alternateCode", str)
object_to_write_to.historical_code = \
_attr2obj(element, "historicalCode", str)
object_to_write_to.description = \
_tag2obj(element, _ns("Description"), str)
object_to_write_to.identifier = \
_tag2obj(element, _ns("Identifier"), str)
object_to_write_to.comments = []
for comment in element.findall(_ns("Comment")):
object_to_write_to.comments.append(_read_comment(comment, _ns))
# Availability.
data_availability = element.find(_ns("DataAvailability"))
if data_availability is not None:
object_to_write_to.data_availability = \
_read_data_availability(data_availability, _ns)
_read_extra(element, object_to_write_to)
def _read_network(net_element, _ns):
network = obspy.core.inventory.Network(net_element.get("code"))
_read_base_node(net_element, network, _ns)
network.total_number_of_stations = \
_tag2obj(net_element, _ns("TotalNumberStations"), int)
network.selected_number_of_stations = \
_tag2obj(net_element, _ns("SelectedNumberStations"), int)
for operator in net_element.findall(_ns("Operator")):
network.operators.append(_read_operator(operator, _ns))
stations = []
for station in net_element.findall(_ns("Station")):
stations.append(_read_station(station, _ns))
network.stations = stations
return network
def _read_station(sta_element, _ns):
longitude = _read_floattype(sta_element, _ns("Longitude"), Longitude,
datum=True)
latitude = _read_floattype(sta_element, _ns("Latitude"), Latitude,
datum=True)
elevation = _read_floattype(sta_element, _ns("Elevation"), Distance,
unit=True)
station = obspy.core.inventory.Station(code=sta_element.get("code"),
latitude=latitude,
longitude=longitude,
elevation=elevation)
station.site = _read_site(sta_element.find(_ns("Site")), _ns)
_read_base_node(sta_element, station, _ns)
# water level only for schemas > 1.0
station.water_level = _read_floattype(sta_element, _ns("WaterLevel"),
FloatWithUncertaintiesAndUnit,
unit=True)
station.vault = _tag2obj(sta_element, _ns("Vault"), str)
station.geology = _tag2obj(sta_element, _ns("Geology"), str)
for equipment in sta_element.findall(_ns("Equipment")):
station.equipments.append(_read_equipment(equipment, _ns))
for operator in sta_element.findall(_ns("Operator")):
station.operators.append(_read_operator(operator, _ns))
station.creation_date = \
_tag2obj(sta_element, _ns("CreationDate"), obspy.UTCDateTime)
station.termination_date = \
_tag2obj(sta_element, _ns("TerminationDate"), obspy.UTCDateTime)
station.selected_number_of_channels = \
_tag2obj(sta_element, _ns("SelectedNumberChannels"), int)
station.total_number_of_channels = \
_tag2obj(sta_element, _ns("TotalNumberChannels"), int)
for ref in sta_element.findall(_ns("ExternalReference")):
station.external_references.append(_read_external_reference(ref, _ns))
channels = []
for channel in sta_element.findall(_ns("Channel")):
# Skip empty channels.
if not channel.items() and not channel.attrib:
continue
cha = _read_channel(channel, _ns)
# Might be None in case the channel could not be parsed.
if cha is None:
# This is None if, and only if, one of the coordinates could not
# be set.
msg = ("Channel %s.%s of station %s does not have a complete set "
"of coordinates and thus it cannot be read. It will not be "
"part of the final inventory object." % (
channel.get("locationCode"), channel.get("code"),
sta_element.get("code")))
warnings.warn(msg, UserWarning)
else:
channels.append(cha)
station.channels = channels
return station
def _read_floattype(parent, tag, cls, unit=False, datum=False,
additional_mapping={}):
elem = parent.find(tag)
if elem is None:
return None
# Catch non convertible numbers.
try:
convert = float(elem.text)
except Exception:
warnings.warn(
"'%s' could not be converted to a float. Will be skipped. Please "
"contact to report this issue." % etree.tostring(elem),
UserWarning)
return None
# Catch NaNs.
if math.isnan(convert):
warnings.warn("Tag '%s' has a value of NaN. It will be skipped." %
tag, UserWarning)
return None
obj = cls(convert)
if unit:
obj.unit = elem.attrib.get("unit")
if datum:
obj.datum = elem.attrib.get("datum")
obj.lower_uncertainty = elem.attrib.get("minusError")
obj.upper_uncertainty = elem.attrib.get("plusError")
for key1, key2 in additional_mapping.items():
setattr(obj, key1, elem.attrib.get(key2))
return obj
def _read_floattype_list(parent, tag, cls, unit=False, datum=False,
additional_mapping={}):
elems = parent.findall(tag)
objs = []
for elem in elems:
obj = cls(float(elem.text))
if unit:
obj.unit = elem.attrib.get("unit")
if datum:
obj.datum = elem.attrib.get("datum")
obj.lower_uncertainty = elem.attrib.get("minusError")
obj.upper_uncertainty = elem.attrib.get("plusError")
for key1, key2 in additional_mapping.items():
setattr(obj, key2, elem.attrib.get(key1))
objs.append(obj)
return objs
def _read_channel(cha_element, _ns):
"""
Returns either a :class:`~obspy.core.inventory.channel.Channel` object or
``None``.
It should return ``None`` if and only if it did not manage to
successfully create a :class:`~obspy.core.inventory.channel.Channel`
object which can only happen if one of the coordinates is not set. All the
others are optional. If either the location or channel code is not set it
raises but that is fine as that would deviate too much from the StationXML
standard to be worthwhile to recover from.
"""
code = cha_element.get("code")
location_code = cha_element.get("locationCode")
longitude = _read_floattype(cha_element, _ns("Longitude"), Longitude,
datum=True)
latitude = _read_floattype(cha_element, _ns("Latitude"), Latitude,
datum=True)
elevation = _read_floattype(cha_element, _ns("Elevation"), Distance,
unit=True)
depth = _read_floattype(cha_element, _ns("Depth"), Distance, unit=True)
# All of these must be given, otherwise it is an invalid station.
if None in [longitude, latitude, elevation, depth]:
return None
channel = obspy.core.inventory.Channel(
code=code, location_code=location_code, latitude=latitude,
longitude=longitude, elevation=elevation, depth=depth)
_read_base_node(cha_element, channel, _ns)
channel.azimuth = _read_floattype(cha_element, _ns("Azimuth"), Azimuth)
channel.dip = _read_floattype(cha_element, _ns("Dip"), Dip)
# water level only for schemas > 1.0
channel.water_level = _read_floattype(cha_element, _ns("WaterLevel"),
FloatWithUncertaintiesAndUnit,
unit=True)
# Add all types.
for type_element in cha_element.findall(_ns("Type")):
channel.types.append(type_element.text)
# Add all external references.
channel.external_references = \
[_read_external_reference(ext_ref, _ns)
for ext_ref in cha_element.findall(_ns("ExternalReference"))]
channel.sample_rate = _read_floattype(cha_element, _ns("SampleRate"),
SampleRate)
# Parse the optional sample rate ratio.
sample_rate_ratio = cha_element.find(_ns("SampleRateRatio"))
if sample_rate_ratio is not None:
channel.sample_rate_ratio_number_samples = \
_tag2obj(sample_rate_ratio, _ns("NumberSamples"), int)
channel.sample_rate_ratio_number_seconds = \
_tag2obj(sample_rate_ratio, _ns("NumberSeconds"), int)
channel.storage_format = _tag2obj(cha_element, _ns("StorageFormat"),
str)
# The clock drift is one of the few examples where the attribute name is
# different from the tag name. This improves clarity.
channel.clock_drift_in_seconds_per_sample = \
_read_floattype(cha_element, _ns("ClockDrift"), ClockDrift)
# The sensor.
calibunits = cha_element.find(_ns("CalibrationUnits"))
if calibunits is not None:
channel.calibration_units = _tag2obj(calibunits, _ns("Name"), str)
channel.calibration_units_description = \
_tag2obj(calibunits, _ns("Description"), str)
# The sensor.
sensor = cha_element.find(_ns("Sensor"))
if sensor is not None:
channel.sensor = _read_equipment(sensor, _ns)
# The pre-amplifier
pre_amplifier = cha_element.find(_ns("PreAmplifier"))
if pre_amplifier is not None:
channel.pre_amplifier = _read_equipment(pre_amplifier, _ns)
# The data logger
data_logger = cha_element.find(_ns("DataLogger"))
if data_logger is not None:
channel.data_logger = _read_equipment(data_logger, _ns)
# Other equipment
equipment = cha_element.find(_ns("Equipment"))
if equipment is not None:
channel.equipment = _read_equipment(equipment, _ns)
# Finally parse the response.
response = cha_element.find(_ns("Response"))
if response is not None:
channel.response = _read_response(response, _ns)
channel.response._attempt_to_fix_units()
return channel
def _read_response(resp_element, _ns):
response = obspy.core.inventory.response.Response()
response.resource_id = resp_element.attrib.get('resourceId')
if response.resource_id is not None:
response.resource_id = str(response.resource_id)
instrument_sensitivity = resp_element.find(_ns("InstrumentSensitivity"))
if instrument_sensitivity is not None:
response.instrument_sensitivity = \
_read_instrument_sensitivity(instrument_sensitivity, _ns)
instrument_polynomial = resp_element.find(_ns("InstrumentPolynomial"))
if instrument_polynomial is not None:
response.instrument_polynomial = \
_read_instrument_polynomial(instrument_polynomial, _ns)
# Now read all the stages.
for stage in resp_element.findall(_ns("Stage")):
if not len(stage):
continue
response.response_stages.append(_read_response_stage(stage, _ns))
_read_extra(resp_element, response)
return response
def _read_response_stage(stage_elem, _ns):
"""
This parses all ResponseStageTypes. It will return a different object
depending on the actual response type.
"""
# The stage sequence number is required!
stage_sequence_number = int(stage_elem.get("number"))
resource_id = stage_elem.attrib.get('resourceId')
if resource_id is not None:
resource_id = str(resource_id)
# All stages contain a stage gain and potentially a decimation.
gain_elem = stage_elem.find(_ns("StageGain"))
stage_gain = _tag2obj(gain_elem, _ns("Value"), float)
stage_gain_frequency = _tag2obj(gain_elem, _ns("Frequency"), float)
# Parse the decimation.
decim_elem = stage_elem.find(_ns("Decimation"))
if decim_elem is not None:
decimation_input_sample_rate = \
_read_floattype(decim_elem, _ns("InputSampleRate"), Frequency)
decimation_factor = _tag2obj(decim_elem, _ns("Factor"), int)
decimation_offset = _tag2obj(decim_elem, _ns("Offset"), int)
decimation_delay = _read_floattype(decim_elem, _ns("Delay"),
FloatWithUncertaintiesAndUnit,
unit=True)
decimation_correction = \
_read_floattype(decim_elem, _ns("Correction"),
FloatWithUncertaintiesAndUnit, unit=True)
else:
decimation_input_sample_rate = None
decimation_factor = None
decimation_offset = None
decimation_delay = None
decimation_correction = None
# Now determine which response type it actually is and return the
# corresponding object.
poles_zeros_elem = stage_elem.find(_ns("PolesZeros"))
coefficients_elem = stage_elem.find(_ns("Coefficients"))
response_list_elem = stage_elem.find(_ns("ResponseList"))
fir_elem = stage_elem.find(_ns("FIR"))
polynomial_elem = stage_elem.find(_ns("Polynomial"))
type_elems = [poles_zeros_elem, coefficients_elem, response_list_elem,
fir_elem, polynomial_elem]
# iterate and check for an response element and create alias
for elem in type_elems:
if elem is not None:
break
else:
# Nothing more to parse for gain only blockettes, create minimal
# ResponseStage and return
if stage_gain is not None and stage_gain_frequency is not None:
return obspy.core.inventory.ResponseStage(
stage_sequence_number=stage_sequence_number,
stage_gain=stage_gain,
stage_gain_frequency=stage_gain_frequency,
resource_id=resource_id, input_units=None, output_units=None)
# Raise if none of the previous ones has been found.
msg = "Could not find a valid Response Stage Type."
raise ValueError(msg)
# Now parse all elements the different stages share.
input_units_ = elem.find(_ns("InputUnits"))
input_units = _tag2obj(input_units_, _ns("Name"), str)
input_units_description = _tag2obj(input_units_, _ns("Description"),
str)
output_units_ = elem.find(_ns("OutputUnits"))
output_units = _tag2obj(output_units_, _ns("Name"), str)
output_units_description = _tag2obj(output_units_, _ns("Description"),
str)
description = _tag2obj(elem, _ns("Description"), str)
name = elem.attrib.get("name")
if name is not None:
name = str(name)
resource_id2 = elem.attrib.get('resourceId')
if resource_id2 is not None:
resource_id2 = str(resource_id2)
# Now collect all shared kwargs to be able to pass them to the different
# constructors..
kwargs = {"stage_sequence_number": stage_sequence_number,
"input_units": input_units,
"output_units": output_units,
"input_units_description": input_units_description,
"output_units_description": output_units_description,
"resource_id": resource_id, "resource_id2": resource_id2,
"stage_gain": stage_gain,
"stage_gain_frequency": stage_gain_frequency, "name": name,
"description": description,
"decimation_input_sample_rate": decimation_input_sample_rate,
"decimation_factor": decimation_factor,
"decimation_offset": decimation_offset,
"decimation_delay": decimation_delay,
"decimation_correction": decimation_correction}
# Handle Poles and Zeros Response Stage Type.
if elem is poles_zeros_elem:
pz_transfer_function_type = \
_tag2obj(elem, _ns("PzTransferFunctionType"), str)
normalization_factor = \
_tag2obj(elem, _ns("NormalizationFactor"), float)
normalization_frequency = \
_read_floattype(elem, _ns("NormalizationFrequency"), Frequency)
# Read poles and zeros to list of imaginary numbers.
def _tag2pole_or_zero(element):
real = _tag2obj(element, _ns("Real"), float)
imag = _tag2obj(element, _ns("Imaginary"), float)
if real is not None or imag is not None:
real = real or 0
imag = imag or 0
x = ComplexWithUncertainties(real, imag)
real = _attr2obj(element.find(_ns("Real")), "minusError", float)
imag = _attr2obj(element.find(_ns("Imaginary")), "minusError",
float)
if any([value is not None for value in (real, imag)]):
real = real or 0
imag = imag or 0
x.lower_uncertainty = complex(real, imag)
real = _attr2obj(element.find(_ns("Real")), "plusError", float)
imag = _attr2obj(element.find(_ns("Imaginary")), "plusError",
float)
if any([value is not None for value in (real, imag)]):
real = real or 0
imag = imag or 0
x.upper_uncertainty = complex(real, imag)
x.number = _attr2obj(element, "number", int)
return x
zeros = [_tag2pole_or_zero(el) for el in elem.findall(_ns("Zero"))]
poles = [_tag2pole_or_zero(el) for el in elem.findall(_ns("Pole"))]
obj = obspy.core.inventory.PolesZerosResponseStage(
pz_transfer_function_type=pz_transfer_function_type,
normalization_frequency=normalization_frequency,
normalization_factor=normalization_factor, zeros=zeros,
poles=poles, **kwargs)
_read_extra(elem, obj)
return obj
# Handle the coefficients Response Stage Type.
elif elem is coefficients_elem:
cf_transfer_function_type = \
_tag2obj(elem, _ns("CfTransferFunctionType"), str)
numerator = \
_read_floattype_list(elem, _ns("Numerator"),
FloatWithUncertaintiesAndUnit, unit=True)
denominator = \
_read_floattype_list(elem, _ns("Denominator"),
FloatWithUncertaintiesAndUnit, unit=True)
obj = obspy.core.inventory.CoefficientsTypeResponseStage(
cf_transfer_function_type=cf_transfer_function_type,
numerator=numerator, denominator=denominator, **kwargs)
_read_extra(elem, obj)
return obj
# Handle the response list response stage type.
elif elem is response_list_elem:
rlist_elems = []
for item in elem.findall(_ns("ResponseListElement")):
freq = _read_floattype(item, _ns("Frequency"), Frequency)
amp = _read_floattype(item, _ns("Amplitude"),
FloatWithUncertaintiesAndUnit, unit=True)
phase = _read_floattype(item, _ns("Phase"), Angle)
rlist_elems.append(
obspy.core.inventory.response.ResponseListElement(
frequency=freq, amplitude=amp, phase=phase))
obj = obspy.core.inventory.ResponseListResponseStage(
response_list_elements=rlist_elems, **kwargs)
_read_extra(elem, obj)
return obj
# Handle the FIR response stage type.
elif elem is fir_elem:
symmetry = _tag2obj(elem, _ns("Symmetry"), str)
coeffs = _read_floattype_list(elem, _ns("NumeratorCoefficient"),
FilterCoefficient,
additional_mapping={'i': "number"})
obj = obspy.core.inventory.FIRResponseStage(
coefficients=coeffs, symmetry=symmetry, **kwargs)
_read_extra(elem, obj)
return obj
# Handle polynomial instrument responses.
elif elem is polynomial_elem:
appr_type = _tag2obj(elem, _ns("ApproximationType"), str)
f_low = _read_floattype(elem, _ns("FrequencyLowerBound"), Frequency)
f_high = _read_floattype(elem, _ns("FrequencyUpperBound"), Frequency)
appr_low = _tag2obj(elem, _ns("ApproximationLowerBound"), double)
appr_high = _tag2obj(elem, _ns("ApproximationUpperBound"), double)
max_err = _tag2obj(elem, _ns("MaximumError"), float)
coeffs = _read_floattype_list(elem, _ns("Coefficient"),
CoefficientWithUncertainties,
additional_mapping={"number": "number"})
obj = obspy.core.inventory.PolynomialResponseStage(
approximation_type=appr_type, frequency_lower_bound=f_low,
frequency_upper_bound=f_high, approximation_lower_bound=appr_low,
approximation_upper_bound=appr_high, maximum_error=max_err,
coefficients=coeffs, **kwargs)
_read_extra(elem, obj)
return obj
def _read_instrument_sensitivity(sensitivity_element, _ns):
value = _tag2obj(sensitivity_element, _ns("Value"), float)
frequency = _tag2obj(sensitivity_element, _ns("Frequency"), float)
input_units_ = sensitivity_element.find(_ns("InputUnits"))
output_units_ = sensitivity_element.find(_ns("OutputUnits"))
sensitivity = obspy.core.inventory.response.InstrumentSensitivity(
value=value, frequency=frequency,
input_units=_tag2obj(input_units_, _ns("Name"), str),
output_units=_tag2obj(output_units_, _ns("Name"), str))
sensitivity.input_units_description = \
_tag2obj(input_units_, _ns("Description"), str)
sensitivity.output_units_description = \
_tag2obj(output_units_, _ns("Description"), str)
sensitivity.frequency_range_start = \
_tag2obj(sensitivity_element, _ns("FrequencyStart"), float)
sensitivity.frequency_range_end = \
_tag2obj(sensitivity_element, _ns("FrequencyEnd"), float)
sensitivity.frequency_range_db_variation = \
_tag2obj(sensitivity_element, _ns("FrequencyDBVariation"), float)
_read_extra(sensitivity_element, sensitivity)
return sensitivity
def _read_instrument_polynomial(element, _ns):
# XXX duplicated code, see reading of PolynomialResponseStage
input_units_ = element.find(_ns("InputUnits"))
input_units = _tag2obj(input_units_, _ns("Name"), str)
input_units_description = _tag2obj(input_units_, _ns("Description"),
str)
output_units_ = element.find(_ns("OutputUnits"))
output_units = _tag2obj(output_units_, _ns("Name"), str)
output_units_description = _tag2obj(output_units_, _ns("Description"),
str)
description = _tag2obj(element, _ns("Description"), str)
resource_id = element.attrib.get("resourceId", None)
name = element.attrib.get("name", None)
appr_type = _tag2obj(element, _ns("ApproximationType"), str)
f_low = _read_floattype(element, _ns("FrequencyLowerBound"), Frequency)
f_high = _read_floattype(element, _ns("FrequencyUpperBound"), Frequency)
appr_low = _tag2obj(element, _ns("ApproximationLowerBound"), float)
appr_high = _tag2obj(element, _ns("ApproximationUpperBound"), float)
max_err = _tag2obj(element, _ns("MaximumError"), float)
coeffs = _read_floattype_list(element, _ns("Coefficient"),
CoefficientWithUncertainties,
additional_mapping={"number": "number"})
obj = obspy.core.inventory.response.InstrumentPolynomial(
approximation_type=appr_type, frequency_lower_bound=f_low,
frequency_upper_bound=f_high, approximation_lower_bound=appr_low,
approximation_upper_bound=appr_high, maximum_error=max_err,
coefficients=coeffs, input_units=input_units,
input_units_description=input_units_description,
output_units=output_units,
output_units_description=output_units_description,
description=description, resource_id=resource_id, name=name)
_read_extra(element, obj)
return obj
def _read_external_reference(ref_element, _ns):
uri = _tag2obj(ref_element, _ns("URI"), str)
description = _tag2obj(ref_element, _ns("Description"), str)
obj = obspy.core.inventory.ExternalReference(uri=uri,
description=description)
_read_extra(ref_element, obj)
return obj
def _read_operator(operator_element, _ns):
agencies = [_i.text for _i in operator_element.findall(_ns("Agency"))]
contacts = []
for contact in operator_element.findall(_ns("Contact")):
contacts.append(_read_person(contact, _ns))
website = _tag2obj(operator_element, _ns("WebSite"), str)
obj = obspy.core.inventory.Operator(agencies=agencies, contacts=contacts,
website=website)
_read_extra(operator_element, obj)
return obj
def _read_data_availability(avail_element, _ns):
extent = avail_element.find(_ns("Extent"))
# Recovery from empty Extent tags.
if extent is None:
return extent
start = obspy.UTCDateTime(extent.get("start"))
end = obspy.UTCDateTime(extent.get("end"))
obj = obspy.core.inventory.util.DataAvailability(start=start, end=end)
_read_extra(avail_element, obj)
return obj
def _read_equipment(equip_element, _ns):
resource_id = equip_element.get("resourceId")
type = _tag2obj(equip_element, _ns("Type"), str)
description = _tag2obj(equip_element, _ns("Description"), str)
manufacturer = _tag2obj(equip_element, _ns("Manufacturer"), str)
vendor = _tag2obj(equip_element, _ns("Vendor"), str)
model = _tag2obj(equip_element, _ns("Model"), str)
serial_number = _tag2obj(equip_element, _ns("SerialNumber"), str)
installation_date = \
_tag2obj(equip_element, _ns("InstallationDate"), obspy.UTCDateTime)
removal_date = \
_tag2obj(equip_element, _ns("RemovalDate"), obspy.UTCDateTime)
calibration_dates = \
[obspy.core.UTCDateTime(_i.text)
for _i in equip_element.findall(_ns("CalibrationDate"))]
obj = obspy.core.inventory.Equipment(
resource_id=resource_id, type=type, description=description,
manufacturer=manufacturer, vendor=vendor, model=model,
serial_number=serial_number, installation_date=installation_date,
removal_date=removal_date, calibration_dates=calibration_dates)
_read_extra(equip_element, obj)
return obj
def _read_site(site_element, _ns):
name = _tag2obj(site_element, _ns("Name"), str)
description = _tag2obj(site_element, _ns("Description"), str)
town = _tag2obj(site_element, _ns("Town"), str)
county = _tag2obj(site_element, _ns("County"), str)
region = _tag2obj(site_element, _ns("Region"), str)
country = _tag2obj(site_element, _ns("Country"), str)
obj = obspy.core.inventory.Site(name=name, description=description,
town=town, county=county, region=region,
country=country)
_read_extra(site_element, obj)
return obj
def _read_comment(comment_element, _ns):
value = _tag2obj(comment_element, _ns("Value"), str)
begin_effective_time = \
_tag2obj(comment_element, _ns("BeginEffectiveTime"), obspy.UTCDateTime)
end_effective_time = \
_tag2obj(comment_element, _ns("EndEffectiveTime"), obspy.UTCDateTime)
authors = []
id = _attr2obj(comment_element, "id", int)
for author in comment_element.findall(_ns("Author")):
authors.append(_read_person(author, _ns))
obj = obspy.core.inventory.Comment(
value=value, begin_effective_time=begin_effective_time,
end_effective_time=end_effective_time, authors=authors, id=id)
_read_extra(comment_element, obj)
return obj
def _read_person(person_element, _ns):
names = _tags2obj(person_element, _ns("Name"), str)
agencies = _tags2obj(person_element, _ns("Agency"), str)
emails = _tags2obj(person_element, _ns("Email"), str)
phones = []
for phone in person_element.findall(_ns("Phone")):
phones.append(_read_phone(phone, _ns))
obj = obspy.core.inventory.Person(names=names, agencies=agencies,
emails=emails, phones=phones)
_read_extra(person_element, obj)
return obj
def _read_phone(phone_element, _ns):
country_code = _tag2obj(phone_element, _ns("CountryCode"), int)
area_code = _tag2obj(phone_element, _ns("AreaCode"), int)
phone_number = _tag2obj(phone_element, _ns("PhoneNumber"), str)
description = phone_element.get("description")
obj = obspy.core.inventory.PhoneNumber(
country_code=country_code, area_code=area_code,
phone_number=phone_number, description=description)
_read_extra(phone_element, obj)
return obj
def _write_stationxml(inventory, file_or_file_object, validate=False,
nsmap=None, level="response", **kwargs):
"""
Writes an inventory object to a buffer.
:type inventory: :class:`~obspy.core.inventory.Inventory`
:param inventory: The inventory instance to be written.
:param file_or_file_object: The file or file-like object to be written to.
:type validate: bool
:param validate: If True, the created document will be validated with the
StationXML schema before being written. Useful for debugging or if you
don't trust ObsPy. Defaults to False.
:type nsmap: dict
:param nsmap: Additional custom namespace abbreviation mappings
(e.g. `{"edb": "http://erdbeben-in-bayern.de/xmlns/0.1"}`).
"""
if nsmap is None:
nsmap = {}
elif None in nsmap:
msg = ("Custom namespace mappings do not allow redefinition of "
"default StationXML namespace (key `None`). "
"Use other namespace abbreviations for custom namespace tags.")
raise ValueError(msg)
nsmap[None] = "http://www.fdsn.org/xml/station/1"
attrib = {"schemaVersion": SCHEMA_VERSION}
# Check if any of the channels has a data availability element. In that
# case the namespaces need to be adjusted.
data_availability = False
for net in inventory:
for sta in net:
for cha in sta:
if cha.data_availability is not None:
data_availability = True
break
else:
continue
break
else:
continue
break
if data_availability:
attrib["{http://www.w3.org/2001/XMLSchema-instance}"
"schemaLocation"] = (
"http://www.fdsn.org/xml/station/1 "
"http://www.fdsn.org/xml/station/fdsn-station+"
"availability-1.0.xsd")
if "xsi" in nsmap:
msg = ("Custom namespace mappings do not allow redefinition of "
"StationXML availability namespace (key `xsi`). Use other "
"namespace abbreviations for custom namespace tags.")
raise ValueError(msg)
nsmap["xsi"] = "http://www.w3.org/2001/XMLSchema-instance"
root = etree.Element("FDSNStationXML", attrib=attrib, nsmap=nsmap)
etree.SubElement(root, "Source").text = inventory.source
if inventory.sender:
etree.SubElement(root, "Sender").text = inventory.sender
# Undocumented flag that does not write the module flags. Useful for
# testing. It is undocumented because it should not be used publicly.
if kwargs.get("_suppress_module_tags", False):
pass
else:
etree.SubElement(root, "Module").text = inventory.module
etree.SubElement(root, "ModuleURI").text = inventory.module_uri
etree.SubElement(root, "Created").text = str(inventory.created)
if level not in ["network", "station", "channel", "response"]:
raise ValueError("Requested stationXML write level is unsupported.")
for network in inventory.networks:
_write_network(root, network, level)
# Add custom namespace tags to root element
_write_extra(root, inventory)
tree = root.getroottree()
# The validation has to be done after parsing once again so that the
# namespaces are correctly assembled.
if validate is True:
buf = io.BytesIO()
tree.write(buf)
buf.seek(0)
validates, errors = validate_stationxml(buf)
buf.close()
if validates is False:
msg = "The created file fails to validate.\n"
for err in errors:
msg += "\t%s\n" % err
raise Exception(msg)
# Register all namespaces with the tree. This allows for
# additional namespaces to be added to an inventory that
# was not created by reading a StationXML file.
for prefix, ns in nsmap.items():
if prefix and ns:
etree.register_namespace(prefix, ns)
tree.write(file_or_file_object, pretty_print=True, xml_declaration=True,
encoding="UTF-8")
def _get_base_node_attributes(element):
attributes = {"code": element.code}
if element.start_date:
attributes["startDate"] = str(element.start_date)
if element.end_date:
attributes["endDate"] = str(element.end_date)
if element.restricted_status:
attributes["restrictedStatus"] = element.restricted_status
if element.alternate_code:
attributes["alternateCode"] = element.alternate_code
if element.historical_code:
attributes["historicalCode"] = element.historical_code
return attributes
def _write_base_node(element, object_to_read_from):
if object_to_read_from.identifier:
etree.SubElement(element, "Identifier", {"type": "DOI"}).text = \
object_to_read_from.identifier
if object_to_read_from.description:
etree.SubElement(element, "Description").text = \
object_to_read_from.description
for comment in object_to_read_from.comments:
_write_comment(element, comment)
_write_extra(element, object_to_read_from)
def _write_network(parent, network, level):
"""
Helper function converting a Network instance to an etree.Element.
"""
attribs = _get_base_node_attributes(network)
network_elem = etree.SubElement(parent, "Network", attribs)
_write_base_node(network_elem, network)
# Add the two, network specific fields.
if network.total_number_of_stations is not None:
etree.SubElement(network_elem, "TotalNumberStations").text = \
str(network.total_number_of_stations)
if network.selected_number_of_stations is not None:
etree.SubElement(network_elem, "SelectedNumberStations").text = \
str(network.selected_number_of_stations)
for operator in network.operators:
operator_elem = etree.SubElement(network_elem, "Operator")
for agency in operator.agencies:
etree.SubElement(operator_elem, "Agency").text = agency
if len(operator.agencies) > 1:
warnings.warn("The StationXML file contains more than one "
"Agency for a single Operator. StationXML "
"schemas > 1.0 only support a single Agency "
"per Operator. Only the first agency will "
"be used.")
break # skip other operators for schemas > 1.0
for contact in operator.contacts:
_write_person(operator_elem, contact, "Contact")
etree.SubElement(operator_elem, "WebSite").text = operator.website
_write_extra(operator_elem, operator)
if level == "network":
return
for station in network.stations:
_write_station(network_elem, station, level)
def _write_floattype(parent, obj, attr_name, tag, additional_mapping={},
cls=None):
attribs = {}
obj_ = getattr(obj, attr_name)
if obj_ is None:
return
if cls and not isinstance(obj_, cls):
obj_ = cls(obj_)