from abc import ABC, abstractmethod
from mspasspy.ccore.utility import MsPASSError, ErrorSeverity, Metadata
from mspasspy.util.error_logger import PyErrorLogger
from mspasspy.ccore.seismic import (
TimeSeries,
Seismogram,
TimeSeriesEnsemble,
SeismogramEnsemble,
)
from mspasspy.db.database import Database
from mspasspy.util.decorators import mspass_func_wrapper
from bson import ObjectId
from obspy import UTCDateTime
import pymongo
import copy
import pandas as pd
import dask.dataframe
import numpy as np
# type aliases used for isinstance tests on pandas and dask dataframes
type_pdd = pd.core.frame.DataFrame
type_ddd = dask.dataframe.core.DataFrame
class BasicMatcher(ABC):
"""
Base class defining the api for a generic matching capability for
MongoDB normalization. The base class is a skeleton that
defines on required abstract methods and initializes a set of
universal attributes all matchers need. It cannot be instatiated directly.
Matching is defined as one of two things: (1) a one-to-one
match algorithm is guaranteed to have each search yield either
exactly one match or none. That is defined through a find_one
method following the same concept in MongoDB. (2) some
matches are not unique and yield more than one document.
For that case use the find method. Unlike the MongoDB
find method, however, find in this context returns a list of
Metadata containers holding the set of attributes requested
    in lists defined at construction.
Another way of viewing this interface, in fact, is an abstraction of
the find and find_one methods of MongoDB to a wider class of
algorithms that may or may not utilize MongoDB directly.
In particular, intermediate level classes defined below that implement
    different cache data structures allow input either by
    loading data from a MongoDB collection or from a pandas DataFrame.
    That can support a wide variety of applications matching
    data against tabular data contained in files loaded into
    pandas by any of a long list of standard DataFrame read methods.
    Examples are any SQL database or Antelope raw tables or views
    loaded as text files.
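
    As a hedged sketch of typical use through a concrete subclass
    (the class name ConcreteMatcher is a placeholder for one of the
    implementations defined in this module; the collection and key
    names are illustrative only):

        matcher = ConcreteMatcher(
            db,
            "channel",
            attributes_to_load=["_id", "lat", "lon", "elev"],
        )
        [md, elog] = matcher.find_one(d)  # d is any MsPASS data object
        if md is not None:
            for k in md.keys():
                d[k] = md[k]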
"""
def __init__(
self,
attributes_to_load=None,
load_if_defined=None,
aliases=None,
):
"""
Base class constructor sets only attributes considered necessary for all
subclasses. Most if not all subclasses will want to call this
constructor driven by an arg list passed to the subclass constructor.
:param attributes_to_load: is a list of keys (strings) that are to
be loaded from the normalizing table/collection. Default for
this base class is None, but subclasses should normally define
a default list. It is important to realize that all subclasses
should normally treat this list as a set of required attributes.
Optional values should appear in the load_if_defined list.
:param load_if_defined: is a secondary list of keys (strings) that
should be loaded only if they are defined. Default is None here,
subclass should set their own default values.
:param aliases: should be a python dictionary used to define
alternative keys to access a data object's Metadata from that
defining the same attribute in the collection/table being
matched. Note carefully the key of the dictionary is the
collection/table attribute name and the value associated with
that key is the alias to use to fetch Metadata. When matchers
scan the attributes_to_load and load_if_defined list they
should treat missing entries in alias as meaning the key in
        the collection/table and Metadata are identical.  Default is
        None, which this constructor treats as a signal to create an
        empty dictionary, meaning there are no aliases.
:type aliases: python dictionary
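
        For example, to honor the MsPASS convention where the channel
        collection "_id" value is stored in a datum with the key
        "channel_id", one would pass (illustrative only):

            aliases = {"_id": "channel_id"}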
"""
if attributes_to_load is None:
self.attributes_to_load = []
else:
self.attributes_to_load = attributes_to_load
if load_if_defined is None:
self.load_if_defined = []
else:
self.load_if_defined = load_if_defined
if aliases is None:
self.aliases = dict()
elif isinstance(aliases, dict):
self.aliases = aliases
else:
raise TypeError(
"BasicMatcher constructor: aliases argument must be either None or define a python dictionary"
)
def __call__(self, d, *args, **kwargs):
"""
Convenience method that allows the shorthand of the find_one method
using the standard python meaning of this symbol. e.g. if
        we have a concrete instance of a subclass of this base class
        for matching against ObjectIds (the IDMatcher implementation
        defined below) assigned to the symbol "matcher", we can
        get a Metadata container of the matching content for a MsPASS
        data object with symbol d using md = matcher(d) instead of
        md = matcher.find_one(d).  All subclasses have this interface
        defined because it is a (nonvirtual) base class method.
"""
return self.find_one(d, *args, **kwargs)
@abstractmethod
def find(self, mspass_object, *args, **kwargs) -> tuple:
"""
Abstraction of the MongoDB database find method with the
matching criteria defined when a concrete instance is instantiated.
Like the MongoDB method implementations it should return a list
of containers matching the keys found in the data passed through
the mspass_object. A key difference from MongoDB, however, is
that instead of a MongoDB cursor we return a python list of Metadata
containers. In some instances that is a
direct translation of a MongoDB cursor to a list of Metadata
objects. The abstraction is useful to allow small collections
to be accessed faster with a generic cache algorithm (see below)
and loading of tables of data through a file-based subclass
of this base. All can be treated through a common interface.
WE STRESS STRONGLY that the abstraction assumes returns are
always small enough to not cause a memory bloating problem.
If you need the big-memory model of a cursor use it directly.
All subclasses must implement this virtual
method to be concrete or they cannot be instantiated.
If the matching algorithm implemented is always expected to
        be a unique one-to-one match, implementations may want to have this
        method throw an exception as a usage error.  That case should
        use the find_one interface defined below.
All implementations should return a pair (2 component tuple).
0 is expected to hold a list of Metadata containers and
        1 is expected to contain either a None type or a PyErrorLogger
object. The PyErrorLogger is a convenient way to pass error
messages back to the caller in a manner that is easier to handle
with the MsPASS error system than an exception mechanism.
Callers should handle four cases that are possible for a return
(Noting [] means an empty list and [...] a list with data)
        1. [] None - no match found
        2. [] ErrorLog - failure with an informational message in the
           ErrorLog that should be preserved.  The presence of an error
           should imply something went wrong, not simply a null result.
3. [...] None - all is good with no detected errors
4. [...] ErrorLog - valid data returned but there is a warning
or informational message posted. In this case handlers
may want to examine the ErrorSeverity components of the log
and handle different levels differently (e.g. Fatal and
Informational should always be treated differently)
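
        A hedged sketch of caller-side handling of the four cases
        (matcher is a concrete subclass instance and d is a MsPASS
        data object):

            [mdlist, elog] = matcher.find(d)
            if mdlist is None:
                # cases 1 and 2:  no match;  examine elog when not None
                ...
            else:
                # cases 3 and 4:  valid matches;  elog may hold warnings
                for md in mdlist:
                    ...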
"""
@abstractmethod
def find_one(self, mspass_object, *args, **kwargs) -> tuple:
"""
Abstraction of the MongoDB database find_one method with the
matching criteria defined when a concrete instance is instantiated.
        Like the MongoDB method, implementations should return the
unique document that the keys in mspass_object are expected
to define with the matching criteria defined by the instance.
        A typical example of an always unique match is ObjectIds.
When a match is found the result should be returned in a
Metadata container. The attributes returned are normally
a subset of the document and are defined by the base class
attributes "attributes_to_load" and "load_if_defined".
For database instances this is little more than copying
desired attributes from the matching document returned by
MongoDB, but for abstraction more may be involved.
e.g., implemented below is a generic cached algorithm that
stores a collection to be matched in memory for efficiency.
That implementation allows the "collection" to be loaded from
MongoDB or a pandas DataFrame.
All implementations should return a pair (2 component tuple).
        0 is expected to hold the Metadata container yielded by
the match. It should be returned as None if there is no match.
        1 is expected to contain either a None type or a PyErrorLogger
object. The PyErrorLogger is a convenient way to pass error
messages back to the caller in a manner that is easier to handle
with the MsPASS error system than an exception mechanism.
Callers should handle four cases that are possible for a return:
        1. None None - no match found
        2. None ErrorLog - failure with an informational message in the
           ErrorLog that the caller may want to preserve or convert to
an exception.
3. Metadata None - all is good with no detected errors
4. Metadata ErrorLog - valid data was returned but there is a warning
or informational message posted. In this case handlers
may want to examine the ErrorSeverity components of the log
and handle different levels differently (e.g. Fatal and
Informational should always be treated differently)
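
        A hedged sketch of caller-side handling (matcher is a concrete
        subclass instance and d is a MsPASS data object):

            [md, elog] = matcher.find_one(d)
            if md is None:
                # cases 1 and 2:  no match;  examine elog when not None
                ...
            else:
                # cases 3 and 4:  copy the matching attributes to the datum
                for k in md.keys():
                    d[k] = md[k]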
"""
pass
class DatabaseMatcher(BasicMatcher):
"""
Matcher using direct database queries to MongoDB. Each call to
the find method of this class constructs a query, calls the MongoDB
database find method with that query, and extracts desired attributes
from the return in the form of a Metadata container. The query
construction is abstracted by a virtual method called query_generator.
This is an intermediate class that cannot be instantiated directly
    because it contains a virtual method.  Users should consult
    docstrings for constructors of subclasses of this intermediate class.
"""
def __init__(
self,
db,
collection,
attributes_to_load=None,
load_if_defined=None,
aliases=None,
require_unique_match=False,
prepend_collection_name=False,
):
"""
Constructor for this intermediate class. It should not be
used except by subclasses as this intermediate class
is not concrete.
"""
super().__init__(
attributes_to_load=attributes_to_load,
load_if_defined=load_if_defined,
aliases=aliases,
)
if not isinstance(collection, str):
raise TypeError(
"{} constructor: required arg1 must be a collection name - received invalid type".format(
self.__class__.__name__
)
)
self.collection = collection
if isinstance(db, pymongo.database.Database):
self.dbhandle = db[collection]
else:
message = "DatabaseMatcher constructor: db argument is not a valid Database handle\n"
message += "Actual type of db arg={}".format(str(type(db)))
raise TypeError(message)
self.require_unique_match = require_unique_match
self.prepend_collection_name = prepend_collection_name
@abstractmethod
def query_generator(self, mspass_object) -> dict:
"""
Subclasses of this intermediate class MUST implement this
method. It should extract content from mspass_object and
use that content to generate a MongoDB query that is
passed directly to the find method of the MongoDB database
handle stored within this object (self) during
the class construction. Since pymongo uses a
python dict for that purpose it must return a valid
        query dict.  Implementations should return None if no
        query could be generated; that is common, for example, when a key
        required to generate the query is missing from mspass_object.
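
        A hedged sketch of a minimal implementation (the key names
        "net" and "sta" are illustrative only):

            def query_generator(self, mspass_object) -> dict:
                if mspass_object.is_defined("net") and mspass_object.is_defined(
                    "sta"
                ):
                    return {
                        "net": mspass_object["net"],
                        "sta": mspass_object["sta"],
                    }
                return None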
"""
pass
def find(self, mspass_object):
"""
Generic database implementation of the find method for this
abstraction. It returns what the base class api specifies.
That is, normally it returns a tuple with component 0 being
a python list of Metadata containers. Each container holds
the subset of attributes defined by attributes_to_load and
(if present) load_if_defined. The list is the set of all
documents matching the query, which at this level of the class
structure is abstract.
The method dogmatically requires data for all keys
defined by attributes_to_load. It will throw a MsPASSError
exception with a Fatal tag if any of the required attributes
are not defined in any of the documents. The return matches
the API specification for BasicMatcher.
It also handles failures of the abstract query_generator
through the mechanism the base class api specified: a None
return means the method could not create a valid query.
Failures in the query will always post a message to elog
tagging the result as "Invalid".
        It also handles the common problem of dead data or accidentally
        receiving invalid data like a None.  The latter may cause other
        algorithms to abort, but here we return [None, elog] with a
        posted message for invalid input and [None, None] when the
        datum is marked dead.
"""
        if not _input_is_valid(mspass_object):
            elog = PyErrorLogger()
            message = "received invalid data.  Arg0 must be a valid MsPASS data object"
            elog.log_error(message, ErrorSeverity.Invalid)
            return [None, elog]
        if mspass_object.dead():
            return [None, None]
query = self.query_generator(mspass_object)
if query is None:
elog = PyErrorLogger()
message = "query_generator method failed to generate a valid query - required attributes are probably missing"
elog.log_error(message, ErrorSeverity.Invalid)
return [None, elog]
number_hits = self.dbhandle.count_documents(query)
if number_hits <= 0:
elog = PyErrorLogger()
message = "query = " + str(query) + " yielded no documents"
elog.log_error(message, ErrorSeverity.Complaint)
return [None, elog]
cursor = self.dbhandle.find(query)
elog = PyErrorLogger()
metadata_list = []
for doc in cursor:
try:
md = _extractData2Metadata(
doc,
self.attributes_to_load,
self.aliases,
self.prepend_collection_name,
self.collection,
self.load_if_defined,
)
metadata_list.append(md)
except MsPASSError as e:
raise MsPASSError("DatabaseMatcher.find: " + e.message, e.severity)
if elog.size() <= 0:
return [metadata_list, None]
else:
return [metadata_list, elog]
def find_one(self, mspass_object):
"""
Generic database implementation of the find_one method. The tacit
assumption is that if you call find_one you are expecting a unique
match to the algorithm implemented. The actual behavior for a
nonunique match is controlled by the class attribute
require_unique_match. Subclasses that want to dogmatically enforce
uniqueness (appropriate for example with ObjectIds) should
set require_unique_match True. In that case if a match is not
unique the method will throw an exception. When False, which is
the default, an informational message is posted and the method
returns the first list element returned by find. This method is
actually little more than a wrapper around find to handle that
uniqueness issue.
"""
find_output = self.find(mspass_object)
if find_output[0] is None:
return [None, find_output[1]]
mdlist_length = len(find_output[0])
if mdlist_length == 1:
return [find_output[0][0], find_output[1]]
else:
            # somewhat messy logic to handle different situations.
            # We throw an exception if the constructor set require_unique_match
            # True.  Otherwise we need to handle the distinction of whether or
            # not the return from find had a PyErrorLogger defined with data.
if self.require_unique_match:
message = "query of collection {col} did not yield a unique match. Found {n} matching documents. Aborting".format(
col=self.collection, n=mdlist_length
)
raise MsPASSError(
"DatabaseMatcher.find_one: " + message, ErrorSeverity.Fatal
)
else:
if find_output[1] is None:
elog = PyErrorLogger()
else:
elog = find_output[1]
message = "query of collection {col} did not yield a unique match. Found {n} matching documents. Using first one in list".format(
col=self.collection, n=mdlist_length
)
elog.log_error(
"DatabaseMatcher.find_one", message, ErrorSeverity.Complaint
)
return [find_output[0][0], elog]
class DictionaryCacheMatcher(BasicMatcher):
"""
Matcher implementing a caching method based on a python dictionary.
This is an intermediate class for instances where the database collection
to be matched is small enough that the in-memory model is appropriate.
It should also only be used if the matching algorithm can be reduced
to a single string that can serve as a unique id for each tuple.
    The class defines a generic dictionary cache with a string key.  The way that
    key is defined is abstracted through two virtual methods:
(1) The cache_id method creates a match key from a mspass data object.
That is normally from the Metadata container but it is not
restricted to that. e.g. start time for TimeSeries or
Seismogram objects can be obtained from the t0 attribute
directly.
(2) The db_make_cache_id is called by the internal method of this
intermediate class (method name is _load_normalization_cache)
to build the cache index from MongoDB documents scanned to
construct the cache.
Two different methods to define the cache index are necessary as a
    generic way to implement aliases.  A typical example is the mspass use
of names like "channel_id" to refer to the ObjectId of a specific
document in the channel collection. When loading channel the name
key is "_id" but data objects would normally have that same data
defined with the key "channel_id". Similarly, if data have had
aliases applied a key in the data may not match the name in a
    collection to be matched.  The dark side of this is that it is very
    easy when running subclasses of this to get null results for
    all members of a dataset.  As always, testing with a subset of
data is strongly recommended before running versions of this on
a large dataset.
This class cannot be instantiated because it is not concrete
(has abstract - virtual - methods that must be defined by subclasses)
See implementations for constructor argument definitions.
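
    A hedged sketch of the two methods a concrete subclass must
    supply (the channel_id key follows the MsPASS naming convention
    noted above;  otherwise names are illustrative only):

        def db_make_cache_id(self, doc) -> str:
            # build the index from a normalizing collection document
            if "_id" in doc:
                return str(doc["_id"])
            return None

        def cache_id(self, mspass_object) -> str:
            # build the same index from a datum, honoring the alias
            # channel_id -> channel collection "_id"
            if mspass_object.is_defined("channel_id"):
                return str(mspass_object["channel_id"])
            return None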
"""
def __init__(
self,
db_or_df,
collection,
query=None,
attributes_to_load=None,
aliases=None,
load_if_defined=None,
require_unique_match=False,
prepend_collection_name=False,
use_dataframe_index_as_cache_id=False,
):
"""
Constructor for this intermediate class. It should not be
used except by subclasses as this intermediate class
is not concrete. It calls the base class constructor
and then loads two internal attributes: query and collection.
It then creates the normalization python dict that applies the
abstract cache_id method. Note that only works for
concrete subclasses of this intermediate class.
"""
super().__init__(
attributes_to_load=attributes_to_load,
load_if_defined=load_if_defined,
aliases=aliases,
)
if not isinstance(collection, str):
raise TypeError(
"{} constructor: required arg1 must be a collection name - received invalid type".format(
self.__class__.__name__
)
)
        if query is None:
self.query = dict()
elif isinstance(query, dict):
self.query = query
else:
raise TypeError(
"{} constructor: query argument must be a python dict container".format(
self.__class__.__name__
)
)
self.collection = collection
self.require_unique_match = require_unique_match
self.prepend_collection_name = prepend_collection_name
self.use_dataframe_index_as_cache_id = use_dataframe_index_as_cache_id
# This is a redundant initialization but a minor cost for stability
self.normcache = dict()
        # Reference the pymongo base class to avoid a type error.
        # This is an oddity of MsPASS using the same name, Database,
        # as pymongo for its database handle class.
if isinstance(db_or_df, pymongo.database.Database):
self._db_load_normalization_cache(db_or_df, collection)
elif isinstance(db_or_df, (type_ddd, type_pdd)):
self._df_load_normalization_cache(db_or_df, collection)
else:
message = "{} constructor: required arg0 must be a Database handle or panda Dataframe\n".format(
self.__class__.__name__
)
message += "Actual type = {}".format(str(type(db_or_df)))
raise TypeError(message)
@abstractmethod
def cache_id(self, mspass_object) -> str:
"""
Concrete implementations must implement this method to define
how a mspass data object, mspass_object, is to be used to
construct the key to the cache dict container. It is
distinct from db_make_cache_id to allow differences in naming
or even the algorithm used to construct the key from a datum
relative to the database. This complicates the interface but
makes it more generic.
:param mspass_object: is expected to be a MsPASS object.
Any type restrictions should be implemented in subclasses
that implement the method.
        :return: should normally return a valid string and never throw
          an exception.  If the algorithm fails the implementation should
          return None.
"""
pass
@abstractmethod
def db_make_cache_id(self, doc) -> str:
"""
Concrete implementation must implement this method to define how
the cache index is to be created from database documents passed
through the arg doc, which pymongo always returns as a python dict.
It is distinct from cache_id to allow differences in naming or
the algorithm for loading the cache compared to accessing it
using attributes of a data object. If the id string cannot
be created from doc an implementation should return None.
        The generic loaders in this class, _db_load_normalization_cache
        and _df_load_normalization_cache, handle that situation cleanly,
        but a subclass that overrides the load methods should handle
        such errors itself.  "Cleanly" in this case means they throw an
        exception, which is appropriate since they are run during construction
        and any invalid key is not acceptable in that situation.
"""
pass
def find_one(self, mspass_object) -> tuple:
"""
        Implementation of find_one for the generic cached method.  It uses the cache_id
method to create the indexing string from mspass_object and then returns
a match to the cache stored in self. Only subclasses of this
intermediate class can work because the cache_id method is
defined as a pure virtual method in this intermediate class.
That construct is used to simplify writing additional matcher
classes. All extensions need to do is define the cache_id
and db_make_cache_id algorithms to build that index.
:param mspass_object: Any valid MsPASS data object. That means
TimeSeries, Seismogram, TimeSeriesEnsemble, or SeismogramEnsemble.
This datum is passed to the (abstract) cache_id method to
create an index string and the result is used to fetch the
Metadata container matching that key. What is required of the
input is dependent on the subclass implementation of cache_id.
        :return: 2-component tuple following the API specification in BasicMatcher.
          The two most common results from this implementation are:
          None ErrorLog - failure with an error message that can be passed on
          if desired or printed.
          Metadata None - all is good with no detected errors.  The Metadata
          container holds all attributes_to_load and any defined
          load_if_defined values.
          A nonunique match with require_unique_match False yields
          Metadata ErrorLog with a posted complaint message.
"""
find_output = self.find(mspass_object)
if find_output[0] is None:
return find_output
elif len(find_output[0]) == 1:
return [find_output[0][0], find_output[1]]
else:
# as with the database version we use require_unique_match
# to define if we should be dogmatic or not
if self.require_unique_match:
message = "query does not yield a unique match and require_unique_match is set true"
raise MsPASSError(
"DictionaryCacheMatcher.find: " + message, ErrorSeverity.Fatal
)
else:
message = "encountered a nonunique match calling find_one - returning contents of first matching document found"
if find_output[1] is None:
elog = PyErrorLogger()
else:
elog = find_output[1]
elog.log_error(
"DictionaryCacheMatcher.find: ", message, ErrorSeverity.Complaint
)
return [find_output[0][0], elog]
def find(self, mspass_object) -> tuple:
"""
Generic implementation of find method for cached tables/collections.
This method is a generalization of the MongoDB database find method.
It differs in two ways. First, it creates the "query" directly from
a MsPASS data object (pymongo find requires a dict as input).
        Second, the result is returned as a python list of Metadata containers
containing what is (usually) a subset of the data stored in the
original collection (table). In contrast pymongo database find
returns a database "Cursor" object which is their implementation of
a large list that may exceed the size of memory. A key point is
the model here makes sense only if the original table itself is small
enough to not cause a memory problem. Further, find calls that
        yield a long list may cause efficiency problems as subclasses that
build on this usually will need to do a linear search through the
list if they need to find a particular instance (e.g. call to find_one).
:param mspass_object: data object to match against
data in cache (i.e. query).
:type mspass_object: must be a valid MsPASS data object.
currently that means TimeSeries, Seismogram, TimeSeriesEnsemble,
or SeismogramEnsemble. If it is anything else (e.g. None)
this method will return a tuple [None, elog] with elog being
a PyErrorLogger with a posted message.
:return: tuple with two elements. 0 is either a list of valid Metadata
container(s) or None and 1 is either None or an PyErrorLogger object.
          There are three possible returns from this method:
          [None, None] - the input datum was marked dead.
          [None, elog] - find failed.  See/save elog for why it failed.
[ [md1, md2, ..., mdn], None] - success with 0 a list of Metadata
containing attributes_to_load and load_if_defined
(if appropriate) in each component.
"""
if not _input_is_valid(mspass_object):
elog = PyErrorLogger()
elog.log_error(
"Received datum that was not a valid MsPASS data object",
ErrorSeverity.Invalid,
)
return [None, elog]
if mspass_object.dead():
return [None, None]
thisid = self.cache_id(mspass_object)
        # this should perhaps generate two different messages as
        # they imply slightly different things - the current message
        # is accurate though
        if (thisid is None) or (thisid not in self.normcache):
error_message = "cache_id method found no match for this datum"
elog = PyErrorLogger()
elog.log_error(error_message, ErrorSeverity.Invalid)
return [None, elog]
else:
return [self.normcache[thisid], None]
def _db_load_normalization_cache(self, db, collection):
"""
This private method abstracts the process of loading a cached
version of a normalizing collection. It creates a python
dict stored internally with the name self.normcache. The
container is keyed by a string created by
the virtual method cache_id. The value associated with each
dict key is a python list of Metadata containers.
Each component is constructed from any document matching the
algorithm defined by cache_id. The list is constructed by
essentially appending a new Metadata object whenever a
matching cache_id is returned.
The Metadata containers normally contain only a subset of
the original attributes in the collection. The list
attributes_to_load is treated as required and this method
will throw a MsPASSError exception if any of them are missing
        from any document parsed.  Use load_if_defined for attributes
that are not required for your workflow.
This method will throw a MsPASS fatal error in two situations:
1. If the collection following the (optional) query is empty
2. If any attribute in the self.attributes_to_load is not
defined in any document loaded. In all BasicMatcher
subclasses attributes_to_load are considered required.
        :param db:  MsPASS Database class MongoDB database handle.  Note
          it can be the MsPASS subclass of the pymongo base class handle,
          as MsPASS extensions to the handle are not used in this method.
:param collection: string defining the MongoDB collection that is
to be loaded and indexed - the normalizing collection target.
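
        The resulting cache is a dict keyed by the cache_id string with
        each value a list of Metadata containers, e.g. (schematically;
        the keys and container count are illustrative only):

            self.normcache = {
                "key1": [md1],
                "key2": [md2, md3],
            }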
"""
dbhandle = db[collection]
self.collection_size = dbhandle.count_documents(self.query)
if self.collection_size <= 0:
message = "Query={} of collection {} yielded no documents\n".format(
str(self.query), collection
)
message += "Cannot construct a zero length object"
raise MsPASSError(
"DictionaryCacheMatcher._load_normalization_cache: " + message,
ErrorSeverity.Fatal,
)
cursor = dbhandle.find(self.query)
self.normcache = dict()
count = 0
for doc in cursor:
cache_key = self.db_make_cache_id(doc)
# This error trap may not be necessary but the api requires us
# to handle a None return
            if cache_key is None:
                raise MsPASSError(
                    "DictionaryCacheMatcher._db_load_normalization_cache: "
+ "db_make_cache_id failed - coding problem or major problem with collection="
+ collection,
ErrorSeverity.Fatal,
)
try:
md = _extractData2Metadata(
doc,
self.attributes_to_load,
self.aliases,
self.prepend_collection_name,
collection,
self.load_if_defined,
)
if cache_key in self.normcache:
self.normcache[cache_key].append(md)
else:
self.normcache[cache_key] = [md]
count += 1
except MsPASSError as e:
raise MsPASSError(
e.message
+ " in document number {n} of collection {col}".format(
n=count, col=collection
),
e.severity,
)
def _df_load_normalization_cache(self, df, collection):
"""
        This method does the same thing as _db_load_normalization_cache.  The
        only difference is that the normalizing data are loaded from a
        pandas or dask dataframe instead of a MongoDB collection.
        :param df: a pandas/dask dataframe from which the cache is loaded
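
        As a hedged illustration, a self.query dict of {"sta": "AAK"}
        (the key and value are illustrative only) reduces the dataframe
        with the equivalent of:

            df[df["sta"].values == "AAK"]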
"""
query_result = df
if self.query is not None and len(self.query) > 0:
# Create a query
# There are multiple ways of querying in a dataframe, according to
# the experiments in https://stackoverflow.com/a/46165056/11138718
# We pick the following approach:
sub_conds = [df[key].values == val for key, val in self.query.items()]
cond = np.logical_and.reduce(sub_conds)
query_result = df[cond]
if len(query_result.index) <= 0:
message = (
"Query={query_str} of dataframe={dataframe_str}"
" yielded 0 documents - cannot construct this object".format(
query_str=str(self.query), dataframe_str=str(df)
)
)
raise MsPASSError(
"DictionaryCacheMatcher._load_normalization_cache: " + message,
ErrorSeverity.Fatal,
)
self.normcache = dict()
count = 0
for index, doc in query_result.iterrows():
cache_key = index
if not self.use_dataframe_index_as_cache_id:
cache_key = self.db_make_cache_id(doc)
# This error trap may not be necessary but the api requires us
# to handle a None return
            if cache_key is None:
                raise MsPASSError(
                    "DictionaryCacheMatcher._df_load_normalization_cache: "
+ "db_make_cache_id failed - coding problem or major problem with collection="
+ collection,
ErrorSeverity.Fatal,
)
try:
md = _extractData2Metadata(
doc,
self.attributes_to_load,
self.aliases,
self.prepend_collection_name,
collection,
self.load_if_defined,
)
if cache_key in self.normcache:
self.normcache[cache_key].append(md)
else:
self.normcache[cache_key] = [md]
count += 1
except MsPASSError as e:
raise MsPASSError(
e.message
+ " in document number {n} of collection {col}".format(
n=count, col=collection
),
e.severity,
)
class DataFrameCacheMatcher(BasicMatcher):
"""
    Matcher implementing a caching method based on a pandas DataFrame.
This is an intermediate class for instances where the database collection
to be matched is small enough that the in-memory model is appropriate.
It should be used when the matching algorithm is readily cast into the
subsetting api of a pandas DataFrame.
The constructor of this intermediate class first calls the BasicMatcher
    (base class) constructor to initialize some common attributes including
the critical lists of attributes to be loaded. This constructor then
creates the internal DataFrame cache by one of two methods.
If arg0 is a MongoDB database handle it loads the data in the
named collection to a DataFrame created during construction. If the
input is a DataFrame already it is simply copied selecting only
columns defined by the attributes_to_load and load_if_defined lists.
There is also an optional parameter, custom_null_values, that is a
python dictionary defining values in a field that should be treated
    as a definition of a Null for that field.  The constructor converts
such values to a standard pandas null field value.
This class implements generic find and find_one methods.
Subclasses of this class must implement a "subset" method to be
concrete. A subset method is the abstract algorithm that defines
a match for that instance expressed as a pandas subset operation.
(For most algorithms there are multiple ways to skin that cat or is
it a panda?) See concrete subclasses for examples.
This class cannot be instantiated because it is not concrete
(has abstract - virtual - methods that must be defined by subclasses)
See implementations for constructor argument definitions.
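
    A hedged sketch of the subset method a concrete subclass must
    supply (the column name "sta" and the cache attribute name
    self.cache are illustrative only, not part of this class):

        def subset(self, mspass_object) -> pd.DataFrame:
            # return the rows of the cache matching this datum;
            # an empty dataframe means no match was found
            if not mspass_object.is_defined("sta"):
                return pd.DataFrame()
            return self.cache[self.cache["sta"] == mspass_object["sta"]]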
"""
def __init__(
self,
db_or_df,
collection=None,
attributes_to_load=None,
load_if_defined=None,
aliases=None,
require_unique_match=False,
prepend_collection_name=False,
custom_null_values=None,
):
"""
Constructor for this intermediate class. It should not be
used except by subclasses as this intermediate class
is not concrete.
"""
self.prepend_collection_name = prepend_collection_name
self.require_unique_match = require_unique_match
self.custom_null_values = custom_null_values
# this is a necessary sanity check
if collection is None:
raise TypeError(
"DataFrameCacheMatcher constructor: collection name must be defined when prepend_collection_name is set True"
)
elif isinstance(collection, str):
self.collection = collection
else:
raise TypeError(
"DataFrameCacheMatcher constructor: collection argument must be a string type"
)
# We have to reference the base class pymongo.database.Database here because
        # the MsPASS subclass name is also Database.  That makes this
# conditional fail in some uses. Using the base class is totally
# appropriate here anyway as no MsPASS extension methods are used
if not isinstance(db_or_df, (type_pdd, type_ddd, pymongo.database.Database)):
raise TypeError(
"DataFrameCacheMatcher constructor: required arg0 must be either a pandas, dask Dataframe, or database handle"
)
        if attributes_to_load is None:
            if load_if_defined is not None:
                raise MsPASSError(
                    "DataFrameCacheMatcher constructor: usage error.  Cannot use the default of attributes_to_load (which triggers loading all columns) and also define a list of names for argument load_if_defined"
                )
aload = list()
for key in db_or_df.columns:
aload.append(key)
else:
aload = attributes_to_load
super().__init__(
attributes_to_load=aload,
load_if_defined=load_if_defined,
aliases=aliases,
)
df = (
db_or_df
if isinstance(db_or_df, (type_pdd, type_ddd))
else pd.DataFrame(list(db_or_df[self.collection].find({})))
)
self._load_dataframe_cache(df)
def find(self, mspass_object) -> tuple:
"""
DataFrame generic implementation of find method.
This method uses content in any part of the mspass_object
        (data object) to subset the internal DataFrame cache,
        returning the subset of tuples matching a condition
        computed through the abstract (virtual) method subset.
It then copies entries in attributes_to_load and when not
null load_if_defined into one Metadata container for each
row of the returned DataFrame.
"""
if not _input_is_valid(mspass_object):
            elog = PyErrorLogger()
            elog.log_error(
                "DataFrameCacheMatcher.find",
                "Received datum that was not a valid MsPASS data object",
                ErrorSeverity.Invalid,
            )
return [None, elog]
subset_df = self.subset(mspass_object)
# assume all implementations will return a 0 length dataframe
# if the subset failed.
if len(subset_df) <= 0:
error_message = "subset method found no match for this datum"
elog = PyErrorLogger()
elog.log_error(error_message, ErrorSeverity.Invalid)
return [None, elog]
else:
# This loop cautiously fills one or more Metadata
# containers with each row of the DataFrame generating
# one Metadata container.
mdlist = list()
elog = None
for index, row in subset_df.iterrows():
md = Metadata()
notnulltest = row.notnull()
for k in self.attributes_to_load:
if notnulltest[k]:
if k in self.aliases:
key = self.aliases[k]
else:
key = k
if self.prepend_collection_name:
if key == "_id":
mdkey = self.collection + key
else:
mdkey = self.collection + "_" + key
else:
mdkey = key
                        # the row is indexed by the collection attribute
                        # name k, not the (possibly aliased) Metadata key
                        md[mdkey] = row[k]
else:
if elog is None:
elog = PyErrorLogger()
error_message = "Encountered Null value for required attribute {key} - repairs of the input DataFrame are required".format(
key=k
)
elog.log_error(
error_message,
ErrorSeverity.Invalid,
)
return [None, elog]
for k in self.load_if_defined:
if notnulltest[k]:
if k in self.aliases:
key = self.aliases[k]
else:
key = k
if self.prepend_collection_name:
if key == "_id":
mdkey = self.collection + key
else:
mdkey = self.collection + "_" + key
else:
mdkey = key
                        md[mdkey] = row[k]
mdlist.append(md)
return [mdlist, None]
def find_one(self, mspass_object) -> tuple:
"""
DataFrame implementation of the find_one method.
This method is mostly a wrapper around the find method.
        It calls the find method and then does one of two things
        depending upon the value of self.require_unique_match.
        When that boolean is True a nonunique match is treated as
        a usage error and the method raises a MsPASSError exception.
        If self.require_unique_match is False and the match is
        ambiguous, it creates a PyErrorLogger, posts a message to
        the log, and returns the first container in the list
        returned by find as component 0 of the pair.
"""
findreturn = self.find(mspass_object)
mdlist = findreturn[0]
if mdlist is None:
return findreturn
elif len(mdlist) == 1:
return [mdlist[0], findreturn[1]]
elif len(mdlist) > 1:
if self.require_unique_match:
raise MsPASSError(
"DataFrameCacheMatcher.find_one: found {n} matches when require_unique_match was set true".format(
n=len(mdlist)
),