"""
Tools for connecting to MongoDB.
"""
import os
import io
import copy
import pathlib
import pickle
import struct
import urllib.request
from array import array
import pandas as pd
try:
import dask.bag as daskbag
_mspasspy_has_dask = True
except ImportError:
_mspasspy_has_dask = False
try:
import dask.dataframe as daskdf
except ImportError:
_mspasspy_has_dask = False
try:
import pyspark
_mspasspy_has_pyspark = True
except ImportError:
_mspasspy_has_pyspark = False
import gridfs
import pymongo
import pymongo.errors
import numpy as np
import obspy
from obspy.clients.fdsn import Client
from obspy import Inventory
from obspy import UTCDateTime
import boto3, botocore
import json
import base64
import uuid
from mspasspy.ccore.io import _mseed_file_indexer, _fwrite_to_file, _fread_from_file
from mspasspy.util.converter import Trace2TimeSeries, Stream2Seismogram
from mspasspy.ccore.seismic import (
TimeSeries,
Seismogram,
_CoreSeismogram,
DoubleVector,
TimeSeriesEnsemble,
SeismogramEnsemble,
)
from mspasspy.ccore.utility import (
Metadata,
MsPASSError,
AtomicType,
ErrorSeverity,
dmatrix,
ProcessingHistory,
)
from mspasspy.db.collection import Collection
from mspasspy.db.schema import DatabaseSchema, MetadataSchema
from mspasspy.util.converter import Textfile2Dataframe
class Database(pymongo.database.Database):
"""
    MsPASS core database handle. All MongoDB database operations in MsPASS
should utilize this object. This object is a subclass of the
Database class of pymongo. It extends the class in several ways
fundamental to the MsPASS framework:
1. It abstracts read and write operations for seismic data managed
by the framework. Note reads and writes are for atomic objects.
Use the distributed read and write functions for parallel handling of
complete data sets.
2. It contains methods for managing the most common seismic Metadata
(namely source and receiver geometry and receiver response data).
3. It adds a schema that can (optionally) be used to enforce type
requirements and/or provide aliasing.
4. Manages error logging data.
5. Manages (optional) processing history data
The class currently has only one constructor normally called with
a variant of the following:
db=Database(dbclient,'mydatabase')
where dbclient is either a MongoDB database client instance or
(recommended) the MsPASS DBClient wrapper (a subclass of the
pymongo client). The second argument is the database "name"
passed to the MongoDB server that defines your working database.
Optional parameters are:
:param schema: a :class:`str` of the yaml file name that defines
both the database schema and the metadata schema. If this parameter
is set, it will override the following two.
:param db_schema: Set the name for the database schema to use with this
handle. Default is the MsPASS schema. (See User's Manual for details)
:param md_schema: Set the name for the Metadata schema. Default is
the MsPASS definitions. (see User's Manual for details)
As a subclass of pymongo Database the constructor accepts any
parameters defined for the base class (see pymongo documentation)
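
    Below is a minimal constructor sketch. It assumes a MongoDB server is
    running locally and that the DBClient wrapper is importable from
    mspasspy.db.client; the database name "mydatabase" is arbitrary::

        from mspasspy.db.client import DBClient
        from mspasspy.db.database import Database

        # connect to a local MongoDB server and build the MsPASS handle
        dbclient = DBClient("localhost")
        db = Database(dbclient, "mydatabase")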
"""
def __init__(self, *args, schema=None, db_schema=None, md_schema=None, **kwargs):
super(Database, self).__init__(*args, **kwargs)
if schema:
self.database_schema = DatabaseSchema(schema)
self.metadata_schema = MetadataSchema(schema)
else:
if isinstance(db_schema, DatabaseSchema):
self.database_schema = db_schema
elif isinstance(db_schema, str):
self.database_schema = DatabaseSchema(db_schema)
else:
self.database_schema = DatabaseSchema()
if isinstance(md_schema, MetadataSchema):
self.metadata_schema = md_schema
elif isinstance(md_schema, str):
self.metadata_schema = MetadataSchema(md_schema)
else:
self.metadata_schema = MetadataSchema()
def __getstate__(self):
ret = self.__dict__.copy()
ret["_Database__client"] = self.client.__repr__()
ret["_BaseObject__codec_options"] = self.codec_options.__repr__()
return ret
def __setstate__(self, data):
        # somewhat weird that this import is required here but it won't
        # work without it. Not sure why the symbol MongoClient is needed
        # here, but it is - ignore it if a linting IDE says MongoClient is unused
from pymongo import MongoClient
# The following is also needed for this object to be serialized correctly
# with dask distributed. Otherwise, the deserialized codec_options
# will become a different type unrecognized by pymongo. Not sure why...
from bson.codec_options import CodecOptions, TypeRegistry, DatetimeConversion
from bson.binary import UuidRepresentation
data["_Database__client"] = eval(data["_Database__client"])
data["_BaseObject__codec_options"] = eval(data["_BaseObject__codec_options"])
self.__dict__.update(data)
def __getitem__(self, name):
"""
Get a collection of this database by name.
Raises InvalidName if an invalid collection name is used.
:Parameters:
- `name`: the name of the collection to get
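
        Example (the collection name shown is the default wf collection
        for TimeSeries data and is illustrative only)::

            wf = db["wf_TimeSeries"]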
"""
return Collection(self, name)
def get_collection(
self,
name,
codec_options=None,
read_preference=None,
write_concern=None,
read_concern=None,
):
"""
Get a :class:`mspasspy.db.collection.Collection` with the given name
and options.
Useful for creating a :class:`mspasspy.db.collection.Collection` with
different codec options, read preference, and/or write concern from
this :class:`Database`.
:Parameters:
- `name`: The name of the collection - a string.
- `codec_options` (optional): An instance of
:class:`bson.codec_options.CodecOptions`. If ``None`` (the
default) the :attr:`codec_options` of this :class:`Database` is
used.
- `read_preference` (optional): The read preference to use. If
``None`` (the default) the :attr:`read_preference` of this
:class:`Database` is used. See :mod:`pymongo.read_preferences`
for options.
- `write_concern` (optional): An instance of
:class:`pymongo.write_concern.WriteConcern`. If ``None`` (the
default) the :attr:`write_concern` of this :class:`Database` is
used.
- `read_concern` (optional): An instance of
:class:`pymongo.read_concern.ReadConcern`. If ``None`` (the
default) the :attr:`read_concern` of this :class:`Database` is
used.
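
        A hedged example; the collection name and the read preference
        choice are illustrative only::

            from pymongo import ReadPreference

            col = db.get_collection(
                "wf_TimeSeries", read_preference=ReadPreference.SECONDARY
            )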
"""
return Collection(
self,
name,
False,
codec_options,
read_preference,
write_concern,
read_concern,
)
def create_collection(
self,
name,
codec_options=None,
read_preference=None,
write_concern=None,
read_concern=None,
session=None,
**kwargs
):
"""
Create a new :class:`mspasspy.db.collection.Collection` in this
database.
Normally collection creation is automatic. This method should
only be used to specify options on
creation. :class:`~pymongo.errors.CollectionInvalid` will be
raised if the collection already exists.
:Parameters:
- `name`: the name of the collection to create
- `codec_options` (optional): An instance of
:class:`~bson.codec_options.CodecOptions`. If ``None`` (the
default) the :attr:`codec_options` of this :class:`Database` is
used.
- `read_preference` (optional): The read preference to use. If
``None`` (the default) the :attr:`read_preference` of this
:class:`Database` is used.
- `write_concern` (optional): An instance of
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
default) the :attr:`write_concern` of this :class:`Database` is
used.
- `read_concern` (optional): An instance of
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
default) the :attr:`read_concern` of this :class:`Database` is
used.
- `collation` (optional): An instance of
:class:`~pymongo.collation.Collation`.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`.
- `**kwargs` (optional): additional keyword arguments will
be passed as options for the `create collection command`_
All optional `create collection command`_ parameters should be passed
as keyword arguments to this method. Valid options include, but are not
limited to:
- ``size``: desired initial size for the collection (in
bytes). For capped collections this size is the max
size of the collection.
- ``capped``: if True, this is a capped collection
- ``max``: maximum number of objects if capped (optional)
- ``timeseries``: a document specifying configuration options for
timeseries collections
- ``expireAfterSeconds``: the number of seconds after which a
document in a timeseries collection expires
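
        A hedged example creating a capped collection; the collection
        name and size are illustrative only::

            col = db.create_collection("elog_archive", capped=True, size=2 ** 20)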
"""
with self.__client._tmp_session(session) as s:
# Skip this check in a transaction where listCollections is not
# supported.
if (not s or not s.in_transaction) and name in self.list_collection_names(
filter={"name": name}, session=s
):
raise pymongo.errors.CollectionInvalid(
"collection %s already exists" % name
)
return Collection(
self,
name,
True,
codec_options,
read_preference,
write_concern,
read_concern,
session=s,
**kwargs
)
def set_metadata_schema(self, schema):
"""
        Use this method to change the metadata schema defined for this
        instance of a database handle. This method sets the metadata
        schema (internal namespace). Use set_database_schema to change
        the stored data schema.
        :param schema: an instance of :class:`mspasspy.db.schema.MetadataSchema` or a :class:`str` of the yaml file name.
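
        Example (the yaml file name is hypothetical)::

            db.set_metadata_schema("custom_schema.yaml")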
"""
if isinstance(schema, MetadataSchema):
self.metadata_schema = schema
elif isinstance(schema, str):
self.metadata_schema = MetadataSchema(schema)
else:
raise MsPASSError(
"Error: argument schema is of type {}, which is not supported".format(
type(schema)
),
"Fatal",
)
def set_database_schema(self, schema):
"""
        Use this method to change the database schema defined for this
        instance of a database handle. This method sets the database
        schema (namespace for attributes saved in MongoDB). Use
        set_metadata_schema to change the in-memory namespace.
        :param schema: an instance of :class:`mspasspy.db.schema.DatabaseSchema` or a :class:`str` of the yaml file name.
"""
if isinstance(schema, DatabaseSchema):
self.database_schema = schema
elif isinstance(schema, str):
self.database_schema = DatabaseSchema(schema)
else:
raise MsPASSError(
"Error: argument schema is of type {}, which is not supported".format(
type(schema)
),
"Fatal",
)
def set_schema(self, schema):
"""
        Use this method to change both the database and metadata schema defined for this
        instance of a database handle. This method sets the database
        schema (namespace for attributes saved in MongoDB) and the metadata
        schema (internal namespace).
:param schema: a :class:`str` of the yaml file name.
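
        Example (the yaml file name is hypothetical)::

            db.set_schema("custom_schema.yaml")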
"""
self.database_schema = DatabaseSchema(schema)
self.metadata_schema = MetadataSchema(schema)
def read_data(
self,
object_id,
mode="promiscuous",
normalize=None,
load_history=False,
exclude_keys=None,
collection="wf",
data_tag=None,
alg_name="read_data",
alg_id="0",
define_as_raw=False,
retrieve_history_record=False,
merge_method=0,
merge_fill_value=None,
merge_interpolation_samples=0,
aws_access_key_id=None,
aws_secret_access_key=None,
):
"""
This is the core MsPASS reader for constructing Seismogram or TimeSeries
        objects from data managed with MongoDB through MsPASS. It is the
        core reader for serial processing where a typical algorithm would be:
        query= { ... properly constructed MongoDB query or just '{}'}
cursor=db.collection.find(query) # collection is wf_TimeSeries or wf_Seismogram
for doc in cursor:
id=doc['_id']
d=db.read_data(id)
... # additional processing here
The above loop will construct one Seismogram or TimeSeries (depending on
which collection is referenced) for handling within the for loop.
Use the read_distributed_data function to read a dataset for
parallel processing. This function is designed to handle one atomic
        object at a time. It is, in fact, called in read_distributed_data.
        Optional parameters control read behavior and additional options
        not always needed. An important one for handling reads from a
        dataset saved partway through a workflow is data_tag.
:param object_id: MongoDB object id of the wf document to be constructed from
data defined in the database. The object id is guaranteed unique and provides
        a unique link to a unique document or nothing. In the latter case the
        function will return a None.
        :type object_id: :class:`bson.objectid.ObjectId` (a wf document containing an "_id" key is also accepted)
:param mode: reading mode that controls how the function interacts with
the schema definition for the data type. Must be one of
['promiscuous','cautious','pedantic']. See user's manual for a
detailed description of what the modes mean. Default is 'promiscuous'
which turns off all schema checks and loads all attributes defined for
each object read.
:type mode: :class:`str`
        :param normalize: list of collections that are to be used for data
normalization. (see User's manual and MongoDB documentation for
details on this concept) Briefly normalization means common
metadata like source and receiver geometry are defined in separate
smaller collections that are linked through this mechanism
during reads. Default uses no normalization.
:type normalize: a :class:`list` of :class:`str`
:param load_history: boolean (True or False) switch used to enable or
disable object level history mechanism. When set True each datum
will be tagged with its origin id that defines the leaf nodes of a
history G-tree. See the User's manual for additional details of this
feature. Default is False.
:param exclude_keys: Sometimes it is helpful to remove one or more
attributes stored in the database from the data's Metadata (header)
so they will not cause problems in downstream processing.
:type exclude_keys: a :class:`list` of :class:`str`
:param collection: Specify an alternate collection name to
use for reading the data. The default sets the collection name
based on the data type and automatically loads the correct schema.
The collection listed must be defined in the schema and satisfy
the expectations of the reader. This is an advanced option that
        is intended only to simplify extensions to the reader.
:param data_tag: The definition of a dataset can become ambiguous
when partially processed data are saved within a workflow. A common
example would be windowing long time blocks of data to shorter time
windows around a particular seismic phase and saving the windowed data.
The windowed data can be difficult to distinguish from the original
with standard queries. For this reason we make extensive use of "tags"
for save and read operations to improve the efficiency and simplify
read operations. Default turns this off by setting the tag null (None).
:type data_tag: :class:`str`
        :param alg_name: alg_name is the name of the algorithm to record in the object-level history.
        :type alg_name: :class:`str`
        :param alg_id: alg_id is a unique id that records the usage of this algorithm in the object-level history.
        :type alg_id: :class:`bson.objectid.ObjectId`
        :param define_as_raw: a boolean that controls whether set_as_origin is called when loading the processing history
        :type define_as_raw: :class:`bool`
        :param retrieve_history_record: a boolean that controls whether the processing history record is loaded
        :type retrieve_history_record: :class:`bool`
        :param merge_method: Methodology to handle overlaps/gaps of traces. Defaults to 0.
        See `__add__ <https://docs.obspy.org/packages/autogen/obspy.core.trace.Trace.__add__.html#obspy.core.trace.Trace.__add__>`__ for details on methods 0 and 1,
        see `_cleanup <https://docs.obspy.org/packages/autogen/obspy.core.stream.Stream._cleanup.html#obspy.core.stream.Stream._cleanup>`__ for details on method -1.
        Any merge operation performs a cleanup merge as a first step (method -1).
        :type merge_method: :class:`int`
        :param merge_fill_value: Fill value for gaps. Defaults to None.
        :type merge_fill_value: :class:`int`, :class:`float` or None
        :param merge_interpolation_samples: Used only for merge_method=1. It specifies the number of samples
        which are used to interpolate between overlapping traces. Defaults to 0.
        If set to -1 all overlapping samples are interpolated.
        :type merge_interpolation_samples: :class:`int`
:return: either :class:`mspasspy.ccore.seismic.TimeSeries`
or :class:`mspasspy.ccore.seismic.Seismogram`
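
        A sketch of a typical serial read loop using normalization and a
        data tag; the query, tag, and normalizing collection names are
        illustrative only::

            cursor = db.wf_TimeSeries.find({})
            for doc in cursor:
                d = db.read_data(
                    doc["_id"],
                    mode="cautious",
                    normalize=["source", "channel"],
                    data_tag="raw_windowed",
                )
                if d is not None and d.live:
                    pass  # additional processing here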
"""
try:
wf_collection = self.database_schema.default_name(collection)
except MsPASSError as err:
raise MsPASSError(
"collection {} is not defined in database schema".format(collection),
"Invalid",
) from err
object_type = self.database_schema[wf_collection].data_type()
if object_type not in [TimeSeries, Seismogram]:
raise MsPASSError(
"only TimeSeries and Seismogram are supported, but {} is requested. Please check the data_type of {} collection.".format(
object_type, wf_collection
),
"Fatal",
)
if mode not in ["promiscuous", "cautious", "pedantic"]:
raise MsPASSError(
"only promiscuous, cautious and pedantic are supported, but {} is requested.".format(
mode
),
"Fatal",
)
if normalize is None:
normalize = []
if exclude_keys is None:
exclude_keys = []
# This assumes the name of a metadata schema matches the data type it defines.
read_metadata_schema = self.metadata_schema[object_type.__name__]
        # We temporarily swap the main collection defined by the metadata schema with
        # the wf_collection. This ensures the method works consistently for any
# user-specified collection argument.
metadata_schema_collection = read_metadata_schema.collection("_id")
if metadata_schema_collection != wf_collection:
temp_metadata_schema = copy.deepcopy(self.metadata_schema)
temp_metadata_schema[object_type.__name__].swap_collection(
metadata_schema_collection, wf_collection, self.database_schema
)
read_metadata_schema = temp_metadata_schema[object_type.__name__]
# find the corresponding document according to object id
col = self[wf_collection]
try:
oid = object_id["_id"]
except:
oid = object_id
object_doc = col.find_one({"_id": oid})
if not object_doc:
return None
if data_tag:
if "data_tag" not in object_doc or object_doc["data_tag"] != data_tag:
return None
# 1. build metadata as dict
md = Metadata()
# 1.1 read in the attributes from the document in the database
for k in object_doc:
if k in exclude_keys:
continue
if mode == "promiscuous":
md[k] = object_doc[k]
continue
# FIXME: note that we do not check whether the attributes' type in the database matches the schema's definition.
            # This may or may not be correct. Should test in practice and get user feedback.
if read_metadata_schema.is_defined(k) and not read_metadata_schema.is_alias(
k
):
md[k] = object_doc[k]
# 1.2 read the attributes in the metadata schema
# col_dict is a hashmap used to store the normalized records by the normalized_id in object_doc
col_dict = {}
# log_error_msg is used to record all the elog entries generated during the reading process
# After the mspass_object is created, we would post every elog entry with the messages in the log_error_msg.
log_error_msg = []
for k in read_metadata_schema.keys():
col = read_metadata_schema.collection(k)
# explanation of the 4 conditions in the following if statement
# 1.2.1. col is not None and is a normalized collection name
# 1.2.2. normalized key id exists in the wf document
# 1.2.3. k is not one of the exclude keys
# 1.2.4. col is in the normalize list provided by user
if (
col
and col != wf_collection
and col + "_id" in object_doc
and k not in exclude_keys
and col in normalize
):
# try to find the corresponding record in the normalized collection from the database
if col not in col_dict:
col_dict[col] = self[col].find_one({"_id": object_doc[col + "_id"]})
            # we might be unable to find the normalized document by the normalized_id in the object_doc
            # in that case, we skip reading this attribute
if not col_dict[col]:
continue
# this attribute may be missing in the normalized record we retrieve above
# in this case, we skip reading this attribute
# however, if it is a required attribute for the normalized collection
# we should post an elog entry to the associated wf object created after.
unique_k = self.database_schema[col].unique_name(k)
if not unique_k in col_dict[col]:
if self.database_schema[col].is_required(unique_k):
log_error_msg.append(
"Attribute {} is required in collection {}, but is missing in the document with id={}.".format(
unique_k, col, object_doc[col + "_id"]
)
)
continue
md[k] = col_dict[col][unique_k]
# 1.3 schema check normalized data according to the read mode
is_dead = False
fatal_keys = []
if mode == "cautious":
for k in md:
if read_metadata_schema.is_defined(k):
col = read_metadata_schema.collection(k)
unique_key = self.database_schema[col].unique_name(k)
if not isinstance(md[k], read_metadata_schema.type(k)):
# try to convert the mismatch attribute
try:
# convert the attribute to the correct type
md[k] = read_metadata_schema.type(k)(md[k])
except:
if self.database_schema[col].is_required(unique_key):
fatal_keys.append(k)
is_dead = True
log_error_msg.append(
"cautious mode: Required attribute {} has type {}, forbidden by definition and unable to convert".format(
k, type(md[k])
)
)
elif mode == "pedantic":
for k in md:
if read_metadata_schema.is_defined(k):
if not isinstance(md[k], read_metadata_schema.type(k)):
fatal_keys.append(k)
is_dead = True
log_error_msg.append(
"pedantic mode: {} has type {}, forbidden by definition".format(
k, type(md[k])
)
)
        # 1.4 create a mspass object by passing Metadata
        # if we do not reset the values of the fatal keys, constructing the mspass object would raise a runtime error
for k in fatal_keys:
if read_metadata_schema.type(k) is str:
md[k] = ""
elif read_metadata_schema.type(k) is int:
md[k] = 0
elif read_metadata_schema.type(k) is float:
md[k] = 0.0
elif read_metadata_schema.type(k) is bool:
md[k] = False
elif read_metadata_schema.type(k) is dict:
md[k] = {}
elif read_metadata_schema.type(k) is list:
md[k] = []
elif read_metadata_schema.type(k) is bytes:
md[k] = b"\x00"
else:
md[k] = None
try:
# Note a CRITICAL feature of the Metadata constructors
# for both of these objects is that they allocate the
# buffer for the sample data and initialize it to zero.
# This allows sample data readers to load the buffer without
# having to handle memory management.
if object_type is TimeSeries:
mspass_object = TimeSeries(md)
else:
# api mismatch here. This ccore Seismogram constructor
# had an ancestor that had an option to read data here.
# we never do that here
mspass_object = Seismogram(md, False)
except MsPASSError as merr:
# if the constructor fails mspass_object will be invalid
# To preserve the error we have to create a shell to hold the error
if object_type is TimeSeries:
mspass_object = TimeSeries()
else:
mspass_object = Seismogram()
            # The default constructor leaves the result marked dead so the below should work
mspass_object.elog.log_error(merr)
return mspass_object
        # do not continue with steps 2 & 3 if the mspass object is dead
if is_dead:
mspass_object.kill()
for msg in log_error_msg:
mspass_object.elog.log_error("read_data", msg, ErrorSeverity.Invalid)
else:
            # 2. load data according to the storage mode
mspass_object.live = True
storage_mode = object_doc["storage_mode"]
if storage_mode == "file":
if "format" in object_doc:
self._read_data_from_dfile(
mspass_object,
object_doc["dir"],
object_doc["dfile"],
object_doc["foff"],
nbytes=object_doc["nbytes"],
format=object_doc["format"],
merge_method=merge_method,
merge_fill_value=merge_fill_value,
merge_interpolation_samples=merge_interpolation_samples,
)
else:
self._read_data_from_dfile(
mspass_object,
object_doc["dir"],
object_doc["dfile"],
object_doc["foff"],
merge_method=merge_method,
merge_fill_value=merge_fill_value,
merge_interpolation_samples=merge_interpolation_samples,
)
elif storage_mode == "gridfs":
self._read_data_from_gridfs(mspass_object, object_doc["gridfs_id"])
elif storage_mode == "url":
self._read_data_from_url(
mspass_object,
object_doc["url"],
format=None if "format" not in object_doc else object_doc["format"],
)
elif storage_mode == "s3_continuous":
self._read_data_from_s3_continuous(
mspass_object, aws_access_key_id, aws_secret_access_key
)
elif storage_mode == "s3_lambda":
self._read_data_from_s3_lambda(
mspass_object, aws_access_key_id, aws_secret_access_key
)
elif storage_mode == "fdsn":
self._read_data_from_fdsn(mspass_object)
else:
raise TypeError("Unknown storage mode: {}".format(storage_mode))
# 3.load history
if load_history:
history_obj_id_name = (
self.database_schema.default_name("history_object") + "_id"
)
if history_obj_id_name in object_doc:
self._load_history(
mspass_object,
object_doc[history_obj_id_name],
alg_name,
alg_id,
define_as_raw,
retrieve_history_record,
)
mspass_object.clear_modified()
# 4.post complaint elog entries if any
for msg in log_error_msg:
mspass_object.elog.log_error("read_data", msg, ErrorSeverity.Complaint)
return mspass_object
def save_data(
self,
mspass_object,
mode="promiscuous",
storage_mode="gridfs",
dir=None,
dfile=None,
format=None,
overwrite=False,
exclude_keys=None,
collection=None,
data_tag=None,
alg_name="save_data",
alg_id="0",
):
"""
Use this method to save an atomic data object (TimeSeries or Seismogram)
to be managed with MongoDB. The Metadata are stored as documents in
a MongoDB collection. The waveform data can be stored in a conventional
file system or in MongoDB's gridfs system. At the time this docstring
was written testing was still in progress to establish the relative
performance of file system io versus gridfs, but our working hypothesis
is the answer of which is faster will be configuration dependent. In
either case the goal of this function is to make a save operation as
simple as possible by abstracting the complications involved in
the actual save.
        Any error messages held in the object being saved are always
        written to documents in MongoDB in a special collection defined in
        the schema. Saving object level history is optional.
There are multiple options described below. One worth emphasizing is
"data_tag". Such a tag is essential for intermediate saves of
a dataset if there is no other unique way to distinguish the
        data in its current state from data saved earlier. For example,
        consider a job that did nothing but read waveform segments spanning
        a long time period (e.g. day files), cutting out a shorter time window,
and then saving windowed data. Crafting an unambiguous query to
find only the windowed data in that situation could be challenging
or impossible. Hence, we recommend a data tag always be used for
most saves.
The mode parameter needs to be understood by all users of this
function. All modes enforce a schema constraint for "readonly"
attributes. An immutable (readonly) attribute by definition
should not be changed during processing. During a save
all attributes with a key defined as readonly are tested
with a method in the Metadata container that keeps track of
any Metadata changes. If a readonly attribute is found to
have been changed it will be renamed with the prefix
"READONLYERROR_", saved, and an error posted (e.g. if you try
to alter site_lat (a readonly attribute) in a workflow when
you save the waveform you will find an entry with the key
        READONLYERROR_site_lat.) In the default 'promiscuous' mode
all other attributes are blindly saved to the database as
name value pairs with no safeties. In 'cautious' mode we
add a type check. If the actual type of an attribute does not
        match what the schema expects, this method will try to fix the
type error before saving the data. If the conversion is
successful it will be saved with a complaint error posted
to elog. If it fails, the attribute will not be saved, an
additional error message will be posted, and the save
algorithm continues. In 'pedantic' mode, in contrast, all
type errors are considered to invalidate the data.
Similar error messages to that in 'cautious' mode are posted
but any type errors will cause the datum passed as arg 0
to be killed. The lesson is saves can leave entries that
may need to be examined in elog and when really bad will
cause the datum to be marked dead after the save.
This method can throw an exception but only for errors in
usage (i.e. arguments defined incorrectly)
:param mspass_object: the object you want to save.
:type mspass_object: either :class:`mspasspy.ccore.seismic.TimeSeries` or :class:`mspasspy.ccore.seismic.Seismogram`
:param mode: This parameter defines how attributes defined with
key-value pairs in MongoDB documents are to be handled on reading.
By "to be handled" we mean how strongly to enforce name and type
specification in the schema for the type of object being constructed.
Options are ['promiscuous','cautious','pedantic'] with 'promiscuous'
being the default. See the User's manual for more details on
the concepts and how to use this option.
:type mode: :class:`str`
        :param storage_mode: Must be either "gridfs" or "file". When set to
"gridfs" the waveform data are stored internally and managed by
MongoDB. If set to "file" the data will be stored in a file system
with the dir and dfile arguments defining a file name. The
default is "gridfs".
:type storage_mode: :class:`str`
:param dir: file directory for storage. This argument is ignored if
storage_mode is set to "gridfs". When storage_mode is "file" it
sets the directory in a file system where the data should be saved.
Note this can be an absolute or relative path. If the path is
relative it will be expanded with the standard python library
path functions to a full path name for storage in the database
        document with the attribute "dir". As for any io we remind the
        user that you must have write permission in this directory.
        The writer will also fail if this directory does not already
        exist, i.e. we do not attempt to create it for you.
:type dir: :class:`str`
:param dfile: file name for storage of waveform data. As with dir
this parameter is ignored if storage_mode is "gridfs" and is
required only if storage_mode is "file". Note that this file
        name does not have to be unique. The writer always positions
        the write pointer at the end of the referenced file and sets the
        attribute "foff" to that position. That allows automatic appends to
        files without concerns about unique names.
:type dfile: :class:`str`
:param format: the format of the file. This can be one of the
`supported formats <https://docs.obspy.org/packages/autogen/obspy.core.stream.Stream.write.html#supported-formats>`__
        of the ObsPy writer. The default is the python None, which the method
        assumes means the data should be stored in raw binary form. The default
should normally be used for efficiency. Alternate formats are
primarily a simple export mechanism. See the User's manual for
more details on data export. Used only for "file" storage mode.
:type format: :class:`str`
:param overwrite: If true gridfs data linked to the original
waveform will be replaced by the sample data from this save.
Default is false, and should be the normal use. This option
should never be used after a reduce operator as the parents
are not tracked and the space advantage is likely minimal for
the confusion it would cause. This is most useful for light, stable
preprocessing with a set of map operators to regularize a data
set before more extensive processing. It can only be used when
storage_mode is set to gridfs.
:type overwrite: boolean
:param exclude_keys: Metadata can often become contaminated with
attributes that are no longer needed or a mismatch with the data.
A type example is the bundle algorithm takes three TimeSeries
objects and produces a single Seismogram from them. That process
can, and usually does, leave things like seed channel names and
orientation attributes (hang and vang) from one of the components
        as extraneous baggage. Use this list of keys to prevent such attributes
        from being written to the output documents. Note if the data being
        saved lack these keys nothing happens, so it is safer, albeit slower,
        to make the list as large as necessary to eliminate any potential
        debris.
:type exclude_keys: a :class:`list` of :class:`str`
:param collection: The default for this parameter is the python
None. The default should be used for all but data export functions.
The normal behavior is for this writer to use the object
        data type to determine the schema it should use for any type or
        name enforcement. This parameter allows an alternate collection to
        be used with or without some different name and type restrictions.
        The most common use of anything other than the default is an
        export to a different format.
        :param data_tag: a user specified "data_tag" key. See above and the
        User's manual for guidance on the use of this option.
:type data_tag: :class:`str`
:return: Data object as saved (if killed it will be dead)
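
        A hedged usage sketch; the directory, file name, and tag are
        illustrative and d is assumed to be a live TimeSeries or
        Seismogram::

            d = db.save_data(
                d,
                mode="cautious",
                storage_mode="file",
                dir="./wf_out",
                dfile="example_data.bin",
                data_tag="preprocessed",
            )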
"""
if not isinstance(mspass_object, (TimeSeries, Seismogram)):
raise TypeError("only TimeSeries and Seismogram are supported")
if storage_mode not in ["file", "gridfs"]:
raise TypeError("Unknown storage mode: {}".format(storage_mode))
if mode not in ["promiscuous", "cautious", "pedantic"]:
raise MsPASSError(
"only promiscuous, cautious and pedantic are supported, but {} is requested.".format(
mode
),
"Fatal",
)
# below we try to capture permission issue before writing anything to the database.
# However, in the case that a storage is almost full, exceptions can still be
# thrown, which could mess up the database record.
if storage_mode == "file":
if not dfile and not dir:
# Note the following uses the dir and dfile defined in the data object.
# It will ignore these two keys already in the collection in an update
# transaction, and the dir and dfile in the collection will be replaced.
if ("dir" not in mspass_object) or ("dfile" not in mspass_object):
raise ValueError("dir or dfile is not specified in data object")
dir = os.path.abspath(mspass_object["dir"])
dfile = mspass_object["dfile"]
if dir is None:
dir = os.getcwd()
else:
dir = os.path.abspath(dir)
if dfile is None:
dfile = self._get_dfile_uuid(
format
) # If dfile name is not given, or defined in mspass_object, a new uuid will be generated
fname = os.path.join(dir, dfile)
if os.path.exists(fname):
if not os.access(fname, os.W_OK):
raise PermissionError(
"No write permission to the save file: {}".format(fname)
)
else:
# the following loop finds the top level of existing parents to fname
# and check for write permission to that directory.
for path_item in pathlib.PurePath(fname).parents:
if os.path.exists(path_item):
if not os.access(path_item, os.W_OK | os.X_OK):
raise PermissionError(
"No write permission to the save directory: {}".format(
dir
)
)
break
schema = self.metadata_schema
if isinstance(mspass_object, TimeSeries):
save_schema = schema.TimeSeries
else:
save_schema = schema.Seismogram
        # wf_collection must be defined here because it is needed below even if the mspass_object is dead
if collection:
wf_collection_name = collection
else:
# This returns a string that is the collection name for this atomic data type
# A weird construct
wf_collection_name = save_schema.collection("_id")
wf_collection = self[wf_collection_name]
if mspass_object.live:
if exclude_keys is None:
exclude_keys = []
# FIXME starttime will be automatically created in this function
self._sync_metadata_before_update(mspass_object)
# This method of Metadata returns a list of all
# attributes that were changed after creation of the
# object to which they are attached.
changed_key_list = mspass_object.modified()
copied_metadata = Metadata(mspass_object)
# clear all the aliases
# TODO check for potential bug in handling clear_aliases
# and modified method - i.e. keys returned by modified may be
# aliases
save_schema.clear_aliases(copied_metadata)
# remove any values with only spaces
for k in copied_metadata:
if not str(copied_metadata[k]).strip():
copied_metadata.erase(k)
# remove any defined items in exclude list
for k in exclude_keys:
if k in copied_metadata:
copied_metadata.erase(k)
# the special mongodb key _id is currently set readonly in
            # the mspass schema. It would be cleared in the following loop
# but it is better to not depend on that external constraint.
# The reason is the insert_one used below for wf collections
# will silently update an existing record if the _id key
# is present in the update record. We want this method
# to always save the current copy with a new id and so
# we make sure we clear it
if "_id" in copied_metadata:
copied_metadata.erase("_id")
# Now remove any readonly data
for k in copied_metadata.keys():
if save_schema.is_defined(k):
if save_schema.readonly(k):
if k in changed_key_list:
newkey = "READONLYERROR_" + k
copied_metadata.change_key(k, newkey)
mspass_object.elog.log_error(
"Database.save_data",
"readonly attribute with key="
+ k
+ " was improperly modified. Saved changed value with key="
+ newkey,
ErrorSeverity.Complaint,
)
else:
copied_metadata.erase(k)
# Done editing, now we convert copied_metadata to a python dict
# using this Metadata method or the long version when in cautious or pedantic mode
insertion_dict = dict()
if mode == "promiscuous":
# A python dictionary can use Metadata as a constructor due to
# the way the bindings were defined
insertion_dict = dict(copied_metadata)
else:
# Other modes have to test every key and type of value
# before continuing. pedantic kills data with any problems
# Cautious tries to fix the problem first
# Note many errors can be posted - one for each problem key-value pair
for k in copied_metadata:
if save_schema.is_defined(k):
if isinstance(copied_metadata[k], save_schema.type(k)):
insertion_dict[k] = copied_metadata[k]
else:
if mode == "pedantic":
mspass_object.kill()
message = "pedantic mode error: key=" + k
value = copied_metadata[k]