#!/usr/bin/env python
__author__ = "Mike McCann"
__copyright__ = "Copyright 2011, MBARI"
__credits__ = ["Chander Ganesan, Open Technology Group"]
__license__ = "GPL"
__version__ = "$Revision: 1.1 $".split()[1]
__maintainer__ = "Mike McCann"
__email__ = "mccann at mbari.org"
__status__ = "Development"
'''
The DAPloaders module contains classes for reading data from OPeNDAP servers and
loading them into the STOQS database. It assumes that all data are on the 4 spatial-
temporal dimensions as defined in the COARDS/CF conventions. There are custom
derived classes here that understand Mooring (Station and StationProfile), AUV
and Glider (Trajectory) CDM Data Types.
Mike McCann
MBARI Dec 29, 2011
@var __date__: Date of last svn commit
@undocumented: __doc__ parser
@author: __author__
@status: __status__
@license: __license__
'''
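# A minimal usage sketch (hypothetical loader subclass, URL, and variable names;
# the real entry points are the concrete loader classes defined in this module):
#
#   loader = SomePlatformLoader(
#       activityName='ExampleSurvey (stride=10)',
#       platformName='example_auv',
#       url='http://odss.mbari.org/thredds/dodsC/example/survey.nc',
#       dbAlias='default',
#       stride=10)
#   loader.include_names = ['temperature', 'salinity']
#   loader.initDB()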
# Force lookup of models to THE specific stoqs module.
import os
import re
import sys
from argparse import Namespace
from django.contrib.gis.geos import LineString, Point
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../")) # config is one dir up
os.environ['DJANGO_SETTINGS_MODULE'] = 'config.settings.local'
from django.conf import settings
from django.db.models import Max
from django.db.utils import IntegrityError, DatabaseError
from django.db import transaction
from jdcal import gcal2jd, jd2gcal
from stoqs import models as m
from datetime import datetime, timedelta
import pytz
from pydap.client import open_url
import pydap.model
import math
from coards import to_udunits, from_udunits, ParserError
import logging
import socket
import seawater.eos80 as sw
from utils.utils import mode, simplify_points
from loaders import STOQS_Loader, SkipRecord, HasMeasurement, MEASUREDINSITU, FileNotFound
from loaders.SampleLoaders import get_closest_instantpoint, ClosestTimeNotFoundException
import numpy as np
from collections import defaultdict
# Set up logging
logger = logging.getLogger(__name__)
# Logging level set in stoqs/config/common.py, but may override here
##logger.setLevel(logging.INFO)
# When settings.DEBUG is True Django will fill up a hash with stats on every insert done to the database.
# "Monkey patch" the CursorWrapper to prevent this. Otherwise we can't load large amounts of data.
# See http://stackoverflow.com/questions/7768027/turn-off-sql-logging-while-keeping-settings-debug
from django.db.backends.base.base import BaseDatabaseWrapper
from django.db.backends.utils import CursorWrapper
if settings.DEBUG:
BaseDatabaseWrapper.make_debug_cursor = lambda self, cursor: CursorWrapper(cursor, self)
class ParameterNotFound(Exception):
pass
class NoValidData(Exception):
pass
class AuxCoordMissingStandardName(Exception):
pass
class VariableMissingCoordinatesAttribute(Exception):
pass
class InvalidSliceRequest(Exception):
pass
class OpendapError(Exception):
pass
class Base_Loader(STOQS_Loader):
'''
A base class for data load operations. This shouldn't be instantiated directly;
instead, a loader for a particular platform should inherit from it. Since
each platform could have its own parameters, etc., each platform (at a minimum)
should declare the overridden names, ignored names, etc.
The time bounds of an Activity can be specified in two ways:
1. By specifying startDatetime and endDatetime. This is handy for extracting a subset
of data from an OPeNDAP data source, e.g. aggregated Mooring data, to populate a
campaign specific database
2. By setting startDatetime and endDatetime to None, in which case the start and end
times are defined by the start and end of the data in the specified url
A third time parameter (dataStartDatetime) can be specified. This is used for when
data is to be appended to an existing activity, such as for the realtime tethys loads
as done by the monitorLrauv.py script in the realtime folder. This
use has not been fully tested.
'''
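# Sketch of the time-bounding options described above (values illustrative only):
#
#   from datetime import datetime
#   # 1. Extract a temporal subset from the OPeNDAP data source:
#   #      startDatetime=datetime(2011, 4, 1), endDatetime=datetime(2011, 5, 1)
#   # 2. Load the full extent of the data in the url:
#   #      startDatetime=None, endDatetime=None
#   # Appending to an existing Activity (not fully tested):
#   #      pass dataStartDatetime, or set appendFlag=True to have it looked up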
def __init__(self, activityName, platformName, url, dbAlias='default', campaignName=None, campaignDescription=None,
activitytypeName=None, platformColor=None, platformTypeName=None,
startDatetime=None, endDatetime=None, dataStartDatetime=None, auxCoords=None, stride=1,
grdTerrain=None, appendFlag=False, backfill_timedelta=None ):
'''
Given a URL, open it and store the dataset as an attribute of the object,
then build a set of standard names using the dataset.
The activity is saved, as all the data loaded will be a set of instantpoints
that use the specified activity.
stride is used to speed up loads by skipping data.
@param activityName: A string describing this activity
@param platformName: A string that is the name of the platform.
If that name for a Platform exists in the DB, it will be used.
@param platformColor: An RGB hex string representing the color of the platform.
@param url: The OPeNDAP URL for the data source
@param dbAlias: The name of the database alias as defined in settings.py
@param campaignName: A string describing the Campaign in which this activity belongs.
If that name for a Campaign exists in the DB, it will be used.
@param campaignDescription: A string expanding on the campaignName.
It should be a short phrase expressing the where and why of a campaign.
@param activitytypeName: A string such as 'mooring deployment' or 'AUV mission' describing the type of
activity. If that name for an ActivityType exists in the DB, it will be used.
@param platformTypeName: A string describing the type of platform, e.g.: 'mooring', 'auv'.
If that name for a PlatformType exists in the DB, it will be used.
@param startDatetime: A Python datetime.datetime object specifying the start date time of data to load
@param endDatetime: A Python datetime.datetime object specifying the end date time of data to load
@param dataStartDatetime: A Python datetime.datetime object specifying the start date time of data
to append to an existing Activity
@param appendFlag: If true then a dataStartDatetime value will be set by looking up the last
timevalue in the database for the Activity returned by getActivityName().
A True value will override the passed parameter dataStartDatetime.
@param backfill_timedelta: Some appendFlag datastreams may have missing data records at the
end of previous loads, e.g. M1. Set this to a datetime.timedelta
to backfill those records.
@param auxCoords: a dictionary of coordinate standard_names (time, latitude, longitude, depth)
pointing to exact names of those coordinates. Used for variables missing the
coordinates attribute.
@param stride: The stride/step size used to retrieve data from the url.
'''
self.campaignName = campaignName
self.campaignDescription = campaignDescription
self.activitytypeName = activitytypeName
self.platformName = platformName
self.platformColor = platformColor
self.dbAlias = dbAlias
self.platformTypeName = platformTypeName
self.activityName = activityName
self.requested_startDatetime = startDatetime
self.startDatetime = startDatetime
self.requested_endDatetime = endDatetime
self.endDatetime = endDatetime
self.dataStartDatetime = dataStartDatetime # For when we append data to an existing Activity
self.auxCoords = auxCoords
self.stride = stride
self.grdTerrain = grdTerrain
self.appendFlag = appendFlag
self.backfill_timedelta = backfill_timedelta
self.url = url
self.varsLoaded = []
try:
self.ds = open_url(url)
except (socket.error, pydap.exceptions.ServerError, pydap.exceptions.ClientError):
message = 'Failed in attempt to open_url("%s")' % url
logger.warn(message)
# Give calling routine the option of catching and ignoring
raise OpendapError(message)
except Exception:
logger.error('Failed in attempt to open_url("%s")', url)
raise
self.ignored_names = list(self.global_ignored_names) # Start with copy of list of global ignored names
self.build_standard_names()
def _getStartAndEndTimeFromDS(self):
'''
Examine all possible time coordinates for include_names and set the overall min and max time for the dataset.
To be used for setting Activity startDatetime and endDatetime.
'''
# TODO: Refactor to simplify. McCabe MC0001 pylint complexity warning issued.
# TODO: Parse EPIC time and time2 variables
minDT = {}
maxDT = {}
for v in self.include_names:
try:
ac = self.getAuxCoordinates(v)
except ParameterNotFound as e:
logger.warn(str(e))
continue
if self.getFeatureType() == 'trajectory' or self.getFeatureType() == 'trajectoryprofile':
logger.debug('Getting trajectory min and max times for v = %s', v)
logger.debug("self.ds[ac['time']][0] = %s", self.ds[ac['time']][0])
try:
minDT[v] = from_udunits(self.ds[ac['time']].data[0][0], self.ds[ac['time']].attributes['units'])
maxDT[v] = from_udunits(self.ds[ac['time']].data[-1][0], self.ds[ac['time']].attributes['units'])
except ParserError as e:
logger.warn("%s. Trying to fix up time units", e)
# Tolerate units like 1970-01-01T00:00:00Z - which is found on the IOOS Glider DAC
if self.ds[ac['time']].attributes['units'] == 'seconds since 1970-01-01T00:00:00Z':
minDT[v] = from_udunits(self.ds[ac['time']].data[0][0], 'seconds since 1970-01-01 00:00:00')
maxDT[v] = from_udunits(self.ds[ac['time']].data[-1][0], 'seconds since 1970-01-01 00:00:00')
elif self.getFeatureType() == 'timeseries' or self.getFeatureType() == 'timeseriesprofile':
logger.debug('Getting timeseries start time for v = %s', v)
minDT[v] = from_udunits(self.ds[v][ac['time']].data[0][0], self.ds[ac['time']].attributes['units'])
maxDT[v] = from_udunits(self.ds[v][ac['time']].data[-1][0], self.ds[ac['time']].attributes['units'])
else:
# Perhaps a strange file like LOPC size class data along a trajectory
minDT[v] = from_udunits(self.ds[ac['time']].data[0][0], self.ds[ac['time']].attributes['units'])
maxDT[v] = from_udunits(self.ds[ac['time']].data[-1][0], self.ds[ac['time']].attributes['units'])
logger.debug('minDT = %s', minDT)
logger.debug('maxDT = %s', maxDT)
# STOQS does not deal with data in the future and in B.C.
startDatetime = datetime.utcnow()
endDatetime = datetime(1,1,1)
for v, dt in minDT.iteritems():
try:
if dt < startDatetime:
startDatetime = dt
except NameError:
startDatetime = dt
for v, dt in maxDT.iteritems():
try:
if dt > endDatetime:
endDatetime = dt
except NameError:
endDatetime = dt
if not maxDT or not minDT:
raise NoValidData('No valid dates')
logger.info('Activity startDatetime = %s, endDatetime = %s', startDatetime, endDatetime)
return startDatetime, endDatetime
def initDB(self):
'''
Do the initial database activities that are required before the data are processed: getPlatform and createActivity.
Can be overridden by a subclass. An overriding method can do such things as setting startDatetime and endDatetime.
'''
if self.checkForValidData():
self.platform = self.getPlatform(self.platformName, self.platformTypeName)
self.addParameters(self.ds)
# Ensure that startDatetime and endDatetime are defined as they are required fields of Activity
if not self.startDatetime or not self.endDatetime:
self.startDatetime, self.endDatetime = self._getStartAndEndTimeFromDS()
self.createActivity()
else:
raise NoValidData('No valid data in url %s' % (self.url))
def getmissing_value(self, var):
'''
Return the missing_value attribute for netCDF variable var
'''
try:
mv = self.ds[var].attributes['missing_value']
except KeyError:
logger.debug('Cannot get attribute missing_value for variable %s from url %s', var, self.url)
except AttributeError as e:
logger.debug(str(e))
else:
return mv
def get_FillValue(self, var):
'''
Return the _FillValue attribute for netCDF variable var
'''
try:
fv = self.ds[var].attributes['_FillValue']
except KeyError:
logger.debug('Cannot get attribute _FillValue for variable %s from url %s', var, self.url)
try:
# Fred's L_662 and other glider data files have the 'FillValue' attribute, not '_FillValue'
fv = self.ds[var].attributes['FillValue']
except Exception:
logger.debug('Cannot get attribute FillValue for variable %s from url %s', var, self.url)
return None
except AttributeError as e:
logger.debug(str(e))
else:
return fv
def get_shape_length(self, pname):
'''Works for both pydap 3.1.1 and 3.2.0
'''
try:
shape_length = len(self.ds[pname].shape)
except AttributeError:
# Likely using pydap 3.2+
shape_length = len(self.ds[pname].array.shape)
return shape_length
def getActivityName(self):
'''Return actual Activity name that will be in the database accounting
for permutations of startDatetime and stride values per NetCDF file name.
'''
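# Illustrative example (hypothetical names): an activityName of
# 'Dorado389_2012_256_00 (stride=10)' with a requested_startDatetime of
# 2012-09-12 00:00:00 yields
# 'Dorado389_2012_256_00 starting at 2012-09-12 00:00:00 (stride=10)'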
# Modify Activity name if temporal subset extracted from NetCDF file
newName = self.activityName
if self.requested_startDatetime and self.requested_endDatetime:
if '(stride' in self.activityName:
first_part = self.activityName[:self.activityName.find('(stride')]
last_part = self.activityName[self.activityName.find('(stride'):]
else:
first_part = self.activityName
last_part = ''
newName = '{} starting at {} {}'.format(first_part.strip(), self.requested_startDatetime, last_part)
return newName
def getFeatureType(self):
'''
Return string of featureType from table at http://cf-pcmdi.llnl.gov/documents/cf-conventions/1.6/ch09.html.
Accommodate previous concepts of this attribute and convert to the new discrete sampling geometry conventions in CF-1.6.
Possible return values (always lowercase): 'trajectory', 'trajectoryprofile', 'timeseries', 'timeseriesprofile'.
'''
conventions = ''
try:
nc_global_keys = self.ds.attributes['NC_GLOBAL']
except KeyError:
logger.warn('Dataset does not have an NC_GLOBAL attribute! Setting featureType to "trajectory" assuming that this is an old Tethys file')
return 'trajectory'
if 'Conventions' in nc_global_keys:
conventions = self.ds.attributes['NC_GLOBAL']['Conventions'].lower()
elif 'Convention' in nc_global_keys:
conventions = self.ds.attributes['NC_GLOBAL']['Convention'].lower()
elif 'conventions' in nc_global_keys:
conventions = self.ds.attributes['NC_GLOBAL']['conventions'].lower()
else:
conventions = ''
if 'cf-1.6' in conventions.lower():
featureType = self.ds.attributes['NC_GLOBAL']['featureType']
else:
# Accept earlier versions of the concept of this attribute that may be in legacy data sets
if 'cdm_data_type' in nc_global_keys:
featureType = self.ds.attributes['NC_GLOBAL']['cdm_data_type']
elif 'thredds_data_type' in nc_global_keys:
featureType = self.ds.attributes['NC_GLOBAL']['thredds_data_type']
elif 'CF%3afeatureType' in nc_global_keys:
featureType = self.ds.attributes['NC_GLOBAL']['CF%3afeatureType']
elif 'CF_featureType' in nc_global_keys:
featureType = self.ds.attributes['NC_GLOBAL']['CF_featureType']
elif 'CF:featureType' in nc_global_keys: # Seen in lrauv/*/realtime/sbdlogs files
featureType = self.ds.attributes['NC_GLOBAL']['CF:featureType']
elif 'featureType' in nc_global_keys: # Seen in roms.nc file from JPL
featureType = self.ds.attributes['NC_GLOBAL']['featureType']
else:
featureType = ''
if featureType.lower() == 'station':
# Used in elvis' TDS mooring data aggregation, it's really 'timeSeriesProfile'
featureType = 'timeSeriesProfile'
if featureType.lower() == 'trajectory':
# Normalize case variants such as 'Trajectory' seen in some files
featureType = 'trajectory'
# Put the CF-1.6 proper featureType into NC_GLOBAL so that addResources will put it into the database
self.ds.attributes['NC_GLOBAL']['featureType'] = featureType
return featureType.lower()
def _getCoordinates(self, from_variables):
'''Return tuple of (Dictionary of geospatial/temporal standard_names keyed by variable name,
Dictionary of variable names keyed by geospatial/temporal standard_names).
'''
coordSN = {}
snCoord = {}
for k in from_variables:
if 'standard_name' in self.ds[k].attributes:
if self.ds[k].attributes['standard_name'] in ('time', 'latitude', 'longitude', 'depth'):
coordSN[k] = self.ds[k].attributes['standard_name']
snCoord[self.ds[k].attributes['standard_name']] = k
return coordSN, snCoord
def getAuxCoordinates(self, variable):
'''
Return a dictionary of a variable's auxiliary coordinates mapped to the standard_names of 'time', 'latitude',
'longitude', and 'depth'. Accommodate previous ways of associating these variables and convert to the new
CF-1.6 conventions as outlined in Chapter 5 of the document. If an auxCoord dictionary is passed to the
Loader then that dictionary will be returned for variables that do not have a valid coordinates attribute;
this is handy for datasets that are not yet compliant.
Requirements for compliance: variables have a coordinates attribute listing the 4 geospatial/temporal
coordinates, the coordinate variables have standard_names of 'time', 'latitude', 'longitude', 'depth'.
Example return value: {'time': 'esecs', 'depth': 'DEPTH', 'latitude': 'lat', 'longitude': 'lon'}
'''
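# For a non-compliant dataset an auxCoords dictionary, keyed by variable name, may be
# passed to the Loader constructor (names here are illustrative):
#
#   auxCoords = {'temperature': {'time': 'esecs', 'depth': 'DEPTH',
#                                'latitude': 'lat', 'longitude': 'lon'}}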
# Match items in coordinate attribute, via coordinate standard_name to coordinate name
if variable not in self.ds:
raise ParameterNotFound('Variable %s is not in dataset %s' % (variable, self.url))
coordDict = {}
if 'coordinates' in self.ds[variable].attributes:
coords = self.ds[variable].attributes['coordinates'].split()
coordSN, snCoord = self._getCoordinates(coords)
for coord in coords:
logger.debug(coord)
try:
logger.debug(snCoord)
coordDict[coordSN[coord]] = coord
except KeyError as e:
raise AuxCoordMissingStandardName(e)
else:
logger.warn('Variable %s is missing coordinates attribute, checking if loader has specified it in auxCoords', variable)
if variable in self.auxCoords:
# Try getting it from overridden values provided
for coordSN, coord in self.auxCoords[variable].iteritems():
try:
coordDict[coordSN] = coord
except KeyError as e:
raise AuxCoordMissingStandardName(e)
else:
logger.warn('%s not in auxCoords' % variable)
# Check for all 4 coordinates needed for spatial-temporal location - if any are missing raise exception with suggestion
reqCoords = set(('time', 'latitude', 'longitude', 'depth'))
logger.debug('coordDict = %s', coordDict)
if set(coordDict.keys()) != reqCoords:
logger.warn('Required coordinate(s) %s missing in NetCDF file. Consider overriding by setting an'
' auxCoords dictionary in your Loader.',
list(reqCoords - set(coordDict.keys())))
if not self.auxCoords:
raise VariableMissingCoordinatesAttribute('%s: %s missing coordinates attribute' % (self.url, variable,))
logger.debug('coordDict = %s', coordDict)
if not coordDict:
if self.auxCoords:
if variable in self.auxCoords:
# Simply return self.auxCoords if specified in the constructor
logger.debug('Returning auxCoords for variable %s that were specified in the constructor: %s', variable, self.auxCoords[variable])
return self.auxCoords[variable]
else:
raise ParameterNotFound('auxCoords is specified, but variable requested (%s) is not in %s' % (variable, self.auxCoords))
else:
return coordDict
def getNominalLocation(self):
'''
For timeSeries and timeSeriesProfile data return nominal location as a tuple of (depth, latitude, longitude) as
expressed in the coordinate variables of the mooring or station. For timeSeries features depth will be a scalar,
for timeSeriesProfile depth will be an array of depths.
'''
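# Illustrative return values for a timeseriesprofile file with one include_name 'u':
#   depths = {'u': [5.0, 10.0, 15.0]}   # array of profile depths; scalar for timeseries
#   lats = {'u': 36.76}                 # scalar nominal latitude
#   lons = {'u': -122.03}               # scalar nominal longitude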
depths = {}
lats = {}
lons = {}
for v in self.include_names:
logger.debug('Calling self.getAuxCoordinates() for v = %s', v)
try:
ac = self.getAuxCoordinates(v)
except ParameterNotFound as e:
logger.warn('Skipping include_name = %s: %s', v, e)
continue
# depth may be single-valued or an array
if self.getFeatureType() == 'timeseries':
logger.debug('Initializing depths list for timeseries, ac = %s', ac)
try:
depths[v] = self.ds[v][ac['depth']].data[:][0]
except KeyError:
logger.warn('No depth coordinate found for %s. Assuming EPIC scalar and assigning depth from first element', v)
depths[v] = self.ds[ac['depth']].data[0]
elif self.getFeatureType() == 'timeseriesprofile':
logger.debug('Initializing depths list for timeseriesprofile, ac = %s', ac)
try:
depths[v] = self.ds[v][ac['depth']].data[:]
except KeyError:
# Likely a 'timeseries' variable in a 'timeseriesprofile' file (e.g. heading in ADCP file)
# look elsewhere for a nominal depth
depths[v] = [float(self.ds.attributes['NC_GLOBAL']['nominal_sensor_depth'])]
elif self.getFeatureType() == 'trajectoryprofile':
logger.debug('Initializing depths list for trajectoryprofile, ac = %s', ac)
depths[v] = self.ds[v][ac['depth']].data[:]
try:
lons[v] = self.ds[v][ac['longitude']].data[:][0]
except KeyError:
if len(self.ds[ac['longitude']][:]) == 1:
lons[v] = self.ds[ac['longitude']].data[:][0]
else:
logger.warn('Variable %s has longitude auxiliary coordinate of length %d, expecting it to be 1.',
v, len(self.ds[ac['longitude']].data[:]))
try:
lats[v] = self.ds[v][ac['latitude']].data[:][0]
except KeyError:
if len(self.ds[ac['latitude']].data[:]) == 1:
lats[v] = self.ds[ac['latitude']].data[:][0]
else:
logger.warn('Variable %s has latitude auxiliary coordinate of length %d, expecting it to be 1.',
v, len(self.ds[ac['latitude']].data[:]))
# All variables must have the same nominal location
if len(set(lats.values())) != 1 or len(set(lons.values())) != 1:
raise Exception('Invalid file coordinates structure. All variables must have'
' identical nominal lat & lon, lats = %s, lons = %s' % (lats, lons))
return depths, lats, lons
def getTimeBegEndIndices(self, timeAxis):
'''
Return beginning and ending indices for the corresponding time axis indices
'''
isEPIC = False
try:
isEPIC = 'EPIC' in self.ds.attributes['NC_GLOBAL']['Conventions'].upper()
except KeyError:
# No 'Conventions' key on 'NC_GLOBAL', check another way, e.g.
# http://dods.mbari.org/opendap/data/CCE_Archive/MS1/20151006/CTOBSTrans9m/MBCCE_MS1_CTOBSTrans9m_20151006.nc
# does not have a Conventions global attribute, so also check for time, time2 and the units
isEPIC = 'time' in self.ds.keys() and 'time2' in self.ds.keys() and self.ds['time'].attributes['units'] == 'True Julian Day'
if isEPIC:
logger.warn("%s does not have 'Conventions', yet appears to be EPIC from its time/time2 variables", self.url)
if isEPIC:
# True Julian dates are at noon, so take int() to match EPIC's time axis values and to satisfy:
# datum: Time (UTC) in True Julian Days: 2440000 = 0000 h on May 23, 1968
# NOTE: Decimal Julian day [days] = time [days] + ( time2 [msec] / 86400000 [msec/day] )
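# Worked example with a well-known epoch: gcal2jd(2000, 1, 1) returns (2400000.5, 51544.0);
# summing gives 2451544.5, and adding 0.5 yields 2451545.0, the True Julian Day
# at noon on 2000-01-01, matching EPIC's noon-valued time axis.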
jbd = int(sum(gcal2jd(self.startDatetime.year, self.startDatetime.month, self.startDatetime.day)) + 0.5)
jed = int(sum(gcal2jd(self.endDatetime.year, self.endDatetime.month, self.endDatetime.day)) + 0.5)
t_indx = np.where((jbd <= timeAxis) & (timeAxis <= jed))[0]
if not t_indx.any():
raise NoValidData('No data from %s for time values between %s and %s. Skipping.' % (self.url,
self.startDatetime, self.endDatetime))
# Refine indices with fractional portion of the day (ms since midnight) as represented in the time2 variable
bms = 0
if self.startDatetime.hour or self.startDatetime.minute or self.startDatetime.second:
bms = self.startDatetime.hour * 3600000 + self.startDatetime.minute * 60000 + self.startDatetime.second * 1000
ems = 86400000
if self.endDatetime.hour or self.endDatetime.minute or self.endDatetime.second:
ems = self.endDatetime.hour * 3600000 + self.endDatetime.minute * 60000 + self.endDatetime.second * 1000
# Tolerate datasets that begin or end inside the limits of self.startDatetime and self.endDatetime
beg_day_indices = np.where(jbd == timeAxis)[0]
t2_indx_beg = 0
if beg_day_indices.any():
time2_axis_beg = self.ds['time2']['time2'][beg_day_indices[0]:beg_day_indices[-1]]
t2_indx_beg = np.where(bms <= time2_axis_beg)[0][0]
end_day_indices = np.where(jed == timeAxis)[0]
t2_indx_end = 0
if end_day_indices.any():
time2_axis_end = self.ds['time2']['time2'][end_day_indices[0]:end_day_indices[-1]]
t2_indx_end = len(time2_axis_end) - np.where(ems >= time2_axis_end)[0][-1]
indices = t_indx[0] + t2_indx_beg, t_indx[-1] - t2_indx_end
return indices
timeAxisUnits = timeAxis.units.lower()
timeAxisUnits = timeAxisUnits.replace('utc', 'UTC') # coards requires UTC to be upper case
if timeAxis.units == 'seconds since 1970-01-01T00:00:00Z' or timeAxis.units == 'seconds since 1970/01/01 00:00:00Z':
timeAxisUnits = 'seconds since 1970-01-01 00:00:00' # coards doesn't like ISO format
if self.startDatetime:
logger.debug('self.startDatetime, timeAxis.units = %s, %s', self.startDatetime, timeAxis.units)
s = to_udunits(self.startDatetime, timeAxisUnits)
logger.debug("For startDatetime = %s, the udnits value is %f", self.startDatetime, s)
if self.dataStartDatetime:
# Override s if self.dataStartDatetime is specified
logger.debug('self.dataStartDatetime, timeAxis.units = %s, %s', self.dataStartDatetime, timeAxis.units)
s = to_udunits(self.dataStartDatetime, timeAxisUnits)
logger.debug("For dataStartDatetime = %s, the udnits value is %f", self.dataStartDatetime, s)
if self.requested_endDatetime:
# endDatetime may be None, in which case just read until the end
e = to_udunits(self.endDatetime, timeAxisUnits)
logger.debug("For endDatetime = %s, the udnits value is %f", self.endDatetime, e)
else:
e = timeAxis[-1]
logger.debug("requested_endDatetime not given, using the last value of timeAxis = %f", e)
tf = (s <= timeAxis) & (timeAxis <= e)
logger.debug('tf = %s', tf)
tIndx = np.nonzero(tf)[0]
if tIndx.size == 0:
raise NoValidData('No data from %s for time values between %s and %s. Skipping.' % (self.url, s, e))
# For python slicing add 1 to the end index
logger.debug('tIndx = %s', tIndx)
try:
indices = (tIndx[0], tIndx[-1] + 1)
except IndexError:
raise NoValidData('Could not get first and last indexes from tIndex = %s. Skipping.' % (tIndx))
logger.debug('Start and end indices are: %s', indices)
if tIndx[-1] <= tIndx[0]:
raise InvalidSliceRequest('Cannot issue OPeNDAP temporal constraint expression with length 0 or less.')
return indices
def getTotalRecords(self):
'''
For the url count all the records that are to be loaded from all the include_names and return it.
Computes the sum of the product of the time slice and the rest of the elements of the shape.
'''
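# Worked example (illustrative shape): a variable of shape (74040, 11, 1, 1) with
# time indices (1000, 2000) and stride 2 contributes
# np.prod((11, 1, 1) + (1000,)) / 2 = 11000 / 2 = 5500 records to the count.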
count = 0
numDerived = 0
trajSingleParameterCount = 0
for name in self.include_names:
logger.info('Counting valid data for parameter: %s', name)
try:
tIndx = self.getTimeBegEndIndices(self.ds[self.getAuxCoordinates(name)['time']])
except ParameterNotFound:
logger.warn('Ignoring parameter: %s', name)
continue
except InvalidSliceRequest:
logger.warn('No valid data for parameter: %s', name)
continue
except KeyError as e:
logger.warn("%s: Skipping", e)
continue
try:
if self.getFeatureType() == 'trajectory':
try:
trajSingleParameterCount = np.prod(self.ds[name].shape[1:] + (np.diff(tIndx)[0],))
except AttributeError:
# Likely using pydap 3.2+
trajSingleParameterCount = np.prod(self.ds[name].array.shape[1:] + (np.diff(tIndx)[0],))
try:
count += (np.prod(self.ds[name].shape[1:] + (np.diff(tIndx)[0],)) / self.stride)
except AttributeError:
# Likely using pydap 3.2+
count += (np.prod(self.ds[name].array.shape[1:] + (np.diff(tIndx)[0],)) / self.stride)
except KeyError as e:
if self.getFeatureType() == 'trajectory':
# Assume that it's a derived variable and add the same count as a measured trajectory parameter
logger.debug("%s: Assuming it's a derived parameter", e)
numDerived += 1
logger.debug('Adding %d derived parameters of length %d to the count', numDerived, trajSingleParameterCount / self.stride)
if trajSingleParameterCount:
count += (numDerived * trajSingleParameterCount / self.stride)
return count
def _genTimeSeriesGridType(self):
'''
Generator of TimeSeriesProfile (tzyx where z is multi-valued) and TimeSeries (tzyx where z is single-valued) data.
Using terminology from CF-1.6 assume data is from a discrete sampling geometry type of timeSeriesProfile or timeSeries.
Yields a uniform values dictionary for inserting rows into the database.
'''
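# Each yielded dictionary carries the parameter's datavalue plus uniform keys,
# e.g. (illustrative values):
#   {'TEMP': 14.2, 'time': 1349049600.0, 'depth': 10.0,
#    'latitude': 36.76, 'longitude': -122.03,
#    'timeUnits': 'seconds since 1970-01-01 00:00:00',
#    'nomDepth': 10.0, 'nomLat': 36.76, 'nomLon': -122.03}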
data = {}
times = {}
depths = {}
latitudes = {}
longitudes = {}
timeUnits = {}
nomLats = {}
nomLons = {}
# TODO: Refactor to simplify. McCabe MC0001 pylint complexity warning issued.
# Read the data from the OPeNDAP url into arrays keyed on parameter name - these arrays may take a bit of memory
# The reads here take advantage of OPeNDAP access mechanisms to efficiently transfer data across the network
for pname in self.include_names:
if pname not in self.ds:
continue # Quietly skip over parameters not in ds: allows combination of variables and files in same loader
# Peek at the shape and pull apart the data from its grid coordinates
logger.info('Reading data from %s: %s %s', self.url, pname, type(self.ds[pname]))
if isinstance(self.ds[pname], pydap.model.GridType):
# On tzyx grid - default for all OS formatted station data COARDS coordinate ordering conventions
# E.g. for http://elvis.shore.mbari.org/thredds/dodsC/agg/OS_MBARI-M1_R_TS, shape = (74040, 11, 1, 1)
# or http://elvis.shore.mbari.org/thredds/dodsC/agg/OS_MBARI-M1_R_TS, shape = (74850, 1, 1, 1)
try:
tIndx = self.getTimeBegEndIndices(self.ds[self.ds[pname].keys()[1]])
except KeyError as e:
logger.warn("%s: Skipping", e)
continue
try:
# Subselect along the time axis, get all z values
logger.info("From: %s", self.url)
logger.info("Using constraints: ds['%s']['%s'][%d:%d:%d,:,0,0]", pname, pname, tIndx[0], tIndx[-1], self.stride)
v = self.ds[pname][pname].data[tIndx[0]:tIndx[-1]:self.stride,:,0,0]
except ValueError as err:
message = ('\nGot error "%s" reading data from URL: %s.\n'
'If it is: "string size must be a multiple of element size"'
' and the URL is a TDS aggregation then the cache files must'
' be removed and the tomcat hosting TDS restarted.' % (err, self.url))
logger.error(message)
raise OpendapError(message)
except pydap.exceptions.ServerError as e:
if self.stride > 1:
logger.warn('%s and stride > 1. Skipping dataset %s', e, self.url)
continue
else:
logger.exception('%s', e)
sys.exit(-1)
if len(v.shape) == 1:
logger.info("len(v.shape) = 1; likely EPIC timeseries data - reshaping to add a 'depth' dimension")
v = v.reshape(v.shape[0], 1)
# The STOQS datavalue
data[pname] = iter(v) # Iterator on time axis delivering all z values in an array with .next()
# CF (nee COARDS) has tzyx coordinate ordering, time is at index [1] and depth is at [2]
times[pname] = self.ds[self.ds[pname].keys()[1]].data[tIndx[0]:tIndx[-1]:self.stride]
# Try and get the depths array, first by CF/COARDS coordinate rules, then by EPIC conventions
try:
depths[pname] = self.ds[self.ds[pname].keys()[2]].data[:] # TODO lookup more precise depth from conversion from pressure
except IndexError:
logger.warn('Variable %s has fewer than 2 coordinates: %s', pname, self.ds[pname].keys())
depths[pname] = []
if len(depths[pname]) == 1:
try:
logger.info('Attempting to set nominal depth from EPIC Convention sensor_depth variable attribute')
depths[pname] = [self.ds[pname].attributes['sensor_depth']]
except KeyError:
logger.info('Variable %s does not have a sensor_depth attribute', pname)
if not list(depths[pname]):
logger.warn('Depth coordinate not found at index [2]. Looking for nominal position from EPIC Convention global attributes.')
try:
depths[pname] = [self.ds.attributes['NC_GLOBAL']['nominal_instrument_depth']]
nomLats[pname] = self.ds.attributes['NC_GLOBAL']['latitude']
nomLons[pname] = self.ds.attributes['NC_GLOBAL']['longitude']
except KeyError:
logger.warn('EPIC nominal position not found in global attributes. Assigning from variables.')
depths[pname] = self.ds['depth'].data[0]
nomLats[pname] = self.ds['lat'].data[0][0]
nomLons[pname] = self.ds['lon'].data[0][0]
timeUnits[pname] = self.ds[self.ds[pname].keys()[1]].units.lower()
if timeUnits[pname] == 'true julian day':
# Create COARDS time from EPIC data
time2s = self.ds['time2']['time2'].data[tIndx[0]:tIndx[-1]:self.stride]
timeUnits[pname] = 'seconds since 1970-01-01 00:00:00'
epoch_secs = []
for jd, ms in zip(times[pname], time2s):
gcal = jd2gcal(jd - 0.5, ms / 86400000.0)
gcal_datetime = datetime(*gcal[:3]) + timedelta(days=gcal[3])
epoch_secs.append(to_udunits(gcal_datetime, timeUnits[pname]))
times[pname] = epoch_secs
timeUnits[pname] = timeUnits[pname].replace('utc', 'UTC') # coards requires UTC in uppercase
if self.ds[self.ds[pname].keys()[1]].units == 'seconds since 1970-01-01T00:00:00Z':
timeUnits[pname] = 'seconds since 1970-01-01 00:00:00' # coards 1.0.4 and earlier doesn't like ISO format
if depths and nomLats and nomLons:
logger.info('Nominal position assigned from EPIC Convention global attributes')
nomDepths = depths
else:
nomDepths, nomLats, nomLons = self.getNominalLocation() # Possible to have both precise and nominal locations with this approach
shape_length = self.get_shape_length(pname)
if shape_length == 4:
logger.info('%s has shape of 4, assume that singleton dimensions are used for nominal latitude and longitude', pname)
# Assumes COARDS coordinate ordering
latitudes[pname] = float(self.ds[self.ds[pname].keys()[3]].data[0]) # TODO lookup more precise gps lat via coordinates pointing to a vector
longitudes[pname] = float(self.ds[self.ds[pname].keys()[4]].data[0]) # TODO lookup more precise gps lon via coordinates pointing to a vector
elif shape_length == 3 and 'EPIC' in self.ds.attributes['NC_GLOBAL']['Conventions'].upper():
# Special fix for USGS EPIC ADCP variables missing depth coordinate, but having nominal sensor depth metadata
latitudes[pname] = float(self.ds[self.ds[pname].keys()[2]].data[0]) # TODO lookup more precise gps lat via coordinates pointing to a vector
longitudes[pname] = float(self.ds[self.ds[pname].keys()[3]].data[0]) # TODO lookup more precise gps lon via coordinates pointing to a vector
depths[pname] = nomDepths[pname]
elif shape_length == 2:
logger.info('%s has shape of 2, assuming no latitude and longitude singleton'
' dimensions. Using nominal location read from auxiliary coordinates', pname)
longitudes[pname] = nomLons[pname]
latitudes[pname] = nomLats[pname]
elif shape_length == 1:
logger.info('%s has shape of 1, assuming no latitude, longitude, and'
' depth singleton dimensions. Using nominal location read'
' from auxiliary coordinates', pname)
longitudes[pname] = nomLons[pname]
latitudes[pname] = nomLats[pname]
depths[pname] = nomDepths[pname]
else:
raise Exception('{} has shape length of {}. Can handle only shape lengths of 1, 2, 4, and 3 (EPIC)'.format(pname, shape_length))
else:
logger.warn('Variable %s is not of type pydap.model.GridType. It is of type %s; skipping.',
pname, type(self.ds[pname]))
# Deliver the data harmonized as rows as an iterator so that they are fed as needed to the database
for pname in data.keys():
logger.info('Delivering rows of data for %s', pname)
l = 0
for depthArray in data[pname]:
k = 0
##logger.debug('depthArray = %s', depthArray)
##logger.debug('nomDepths = %s', nomDepths)
values = {}
for dv in depthArray:
values[pname] = float(dv)
values['time'] = times[pname][l]
values['depth'] = depths[pname][k]
values['latitude'] = latitudes[pname]
values['longitude'] = longitudes[pname]
values['timeUnits'] = timeUnits[pname]
try:
values['nomDepth'] = nomDepths[pname][k]
except IndexError:
values['nomDepth'] = nomDepths[pname]
values['nomLat'] = nomLats[pname]
values['nomLon'] = nomLons[pname]
yield values
k = k + 1
l = l + 1
def _genTrajectory(self):
'''
Generator of trajectory data. The data values are a function of time and the coordinates attribute
identifies the depth, latitude, and longitude from where the measurement was made.
Using terminology from CF-1.6 assume data is from a discrete geometry type of trajectory.
Provides a uniform dictionary that contains attributes and their associated values without the need
to individualize code for each data source.
'''
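# Each yielded dictionary follows the same uniform pattern as _genTimeSeriesGridType,
# e.g. (illustrative values):
#   {'temperature': 14.2, 'time': 1349049600.0, 'depth': 10.0,
#    'latitude': 36.76, 'longitude': -122.03,
#    'timeUnits': 'seconds since 1970-01-01 00:00:00'}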
ac = {}
data = {}
times = {}
depths = {}
latitudes = {}
longitudes = {}
timeUnits = {}
measurements = {}
# TODO: Refactor to simplify. McCabe MC0001 pylint complexity warning issued. Too many local variables.
# Read the data from the OPeNDAP url into arrays keyed on parameter name - these arrays may take a bit of memory
# The reads here take advantage of OPeNDAP access mechanisms to efficiently transfer data across the network
for pname in self.include_names:
if pname not in self.ds.keys():
logger.warn('include_name %s not in dataset %s', pname, self.url)
continue
# Peek at the shape and pull apart the data from its grid coordinates
# Only single trajectories are allowed
shape_length = self.get_shape_length(pname)
logger.info('Reading data from %s: %s %s %s', self.url, pname, shape_length, type(self.ds[pname]))
if shape_length == 1 and isinstance(self.ds[pname], pydap.model.BaseType):
# Legacy Dorado data need to be processed as BaseType; Example data:
# dsdorado = open_url('http://odss.mbari.org/thredds/dodsC/CANON_september2012/dorado/Dorado389_2012_256_00_256_00_decim.nc')
# dsdorado['temperature'].shape = (12288,)
ac[pname] = self.getAuxCoordinates(pname)
try:
logger.debug("ac[pname]['time'] = %s", ac[pname]['time'])
tIndx = self.getTimeBegEndIndices(self.ds[ac[pname]['time']])
except (NoValidData, InvalidSliceRequest) as e:
logger.warn('Skipping this parameter. %s', e)
continue
try:
# Subselect along the time axis
logger.info("Using constraints: ds['%s'][%d:%d:%d]", pname, tIndx[0], tIndx[-1], self.stride)
v = self.ds[pname].data[tIndx[0]:tIndx[-1]:self.stride]
except ValueError as err:
logger.error('\nGot error "%s" reading data from URL: %s.\n'
'If it is: "string size must be a multiple of element size"'
' and the URL is a TDS aggregation then the cache files must'
' be removed and the tomcat hosting TDS restarted.', err, self.url)
sys.exit(1)
except pydap.exceptions.ServerError as e:
logger.exception('%s', e)
sys.exit(-1)
# The STOQS datavalue
data[pname] = iter(v) # Iterator on time axis delivering all values in an array with .next()
# Peek at coordinate attribute to get depth, latitude, longitude values from the other BaseTypes
logger.info('ac = %s', ac)
times[pname] = self.ds[ac[pname]['time']].data[tIndx[0]:tIndx[-1]:self.stride]
try:
depths[pname] = self.ds[ac[pname]['depth']].data[tIndx[0]:tIndx[-1]:self.stride]
except KeyError:
# Allow for variables with no depth coordinate to be loaded at the depth specified in auxCoords
if isinstance(ac[pname]['depth'], float):
depths[pname] = ac[pname]['depth'] * np.ones(len(times[pname]))
latitudes[pname] = self.ds[ac[pname]['latitude']].data[tIndx[0]:tIndx[-1]:self.stride]
longitudes[pname] = self.ds[ac[pname]['longitude']].data[tIndx[0]:tIndx[-1]:self.stride]
timeUnits[pname] = self.ds[ac[pname]['time']].units.lower()
timeUnits[pname] = timeUnits[pname].replace('utc', 'UTC') # coards requires UTC in uppercase
if self.ds[ac[pname]['time']].units == 'seconds since 1970-01-01T00:00:00Z':
timeUnits[pname] = 'seconds since 1970-01-01 00:00:00' # coards doesn't like ISO format
elif shape_length == 1 and isinstance(self.ds[pname], pydap.model.GridType):
# LRAUV data need to be processed as GridType
ac[pname] = self.getAuxCoordinates(pname)
tIndx = self.getTimeBegEndIndices(self.ds[ac[pname]['time']])
try:
# Subselect along the time axis
logger.info("Using constraints: ds['%s']['%s'][%d:%d:%d]", pname, pname, tIndx[0], tIndx[-1], self.stride)
v = self.ds[pname][pname].data[tIndx[0]:tIndx[-1]:self.stride]
except ValueError as err:
if str(err) == 'need more than 1 value to unpack':
# Likely stride is larger than length of array; report and skip
logger.error('%s', err)
continue
else:
logger.error('''\nGot error '%s' reading data from URL: %s.
If it is: 'string size must be a multiple of element size' and the URL is a TDS aggregation
then the cache files must be removed and the tomcat hosting TDS restarted.''', err, self.url)
sys.exit(1)
except pydap.exceptions.ServerError as e:
logger.error('%s', e)
continue
# The STOQS datavalue
data[pname] = iter(v) # Iterator on time axis delivering all values in an array with .next()
# Peek at coordinate attribute to get depth, latitude, longitude values from the other BaseTypes
logger.info('ac = %s', ac)
times[pname] = self.ds[ac[pname]['time']].data[tIndx[0]:tIndx[-1]:self.stride]
depths[pname] = self.ds[ac[pname]['depth']][ac[pname]['depth']].data[tIndx[0]:tIndx[-1]:self.stride]
latitudes[pname] = self.ds[ac[pname]['latitude']][ac[pname]['latitude']].data[tIndx[0]:tIndx[-1]:self.stride]
longitudes[pname] = self.ds[ac[pname]['longitude']][ac[pname]['longitude']].data[tIndx[0]:tIndx[-1]:self.stride]
timeUnits[pname] = self.ds[ac[pname]['time']].units.lower()
elif shape_length == 2 and isinstance(self.ds[pname], pydap.model.GridType):
# Customized for accepting LOPC data from Dorado - has only time coordinate
# LOPC data has only time, therefore we look up the closest InstantPoint and
# assign its corresponding Measurement from other already loaded Dorado data.
ac[pname] = self.getAuxCoordinates(pname)
tIndx = self.getTimeBegEndIndices(self.ds[ac[pname]['time']])
try:
# Subselect along the time axis
logger.info("Using constraints: ds['%s']['%s'][%d:%d:%d]", pname, pname, tIndx[0], tIndx[-1], self.stride)
v = self.ds[pname][pname].data[tIndx[0]:tIndx[-1]:self.stride]
except ValueError as err:
logger.error('\nGot error "%s" reading data from URL: %s.\n'
'If it is: "string size must be a multiple of element size"'
' and the URL is a TDS aggregation then the cache files must'
' be removed and the tomcat hosting TDS restarted.', err, self.url)
sys.exit(1)
except pydap.exceptions.ServerError as e:
logger.error('%s', e)
continue
# The STOQS MeasureParameter dataarray - v is a list of Arrays
data[pname] = iter(v) # Iterator on time axis delivering all arrays in an array with .next()
times[pname] = self.ds[ac[pname]['time']].data[tIndx[0]:tIndx[-1]:self.stride]
timeUnits[pname] = self.ds[ac[pname]['time']].units.lower()
# Add LOPC's bin array to the Parameter's domain
# TODO: save its attributes as Resources
self.getParameterByName(pname).domain = list(self.ds['bin'][:])
self.getParameterByName(pname).save(using=self.dbAlias)
measurements[pname] = []
for udtime in times[pname]:
tv = from_udunits(udtime, timeUnits[pname])
try:
ip, secs_diff = get_closest_instantpoint(self.associatedActivityName, tv, self.dbAlias)
max_secs_diff = 2
if secs_diff > max_secs_diff:
logger.warn('LOPC data at %s more than %s secs different from existing measurement: %s',
tv, max_secs_diff, secs_diff)
except ClosestTimeNotFoundException as e:
logger.error('Could not find corresponding measurement for LOPC data measured at %s', tv)
continue
else:
measurements[pname].append(m.Measurement.objects.using(self.dbAlias).get(instantpoint=ip))
else:
logger.warn('Variable %s is not of type pydap.model.GridType with a '
'shape length of 1 or 2 (for LOPC/LISST-100 data). It '
'is type %s with shape length = %d.',
pname, type(self.ds[pname]), shape_length)
# Deliver the data harmonized as rows as an iterator so that they are fed as needed to the database
for pname in data.keys():
logger.debug('Delivering rows of data for %s', pname)
l = 0
values = {}
for dv in data[pname]:
values[pname] = dv
values['time'] = times[pname][l]