#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2016-2019 Satpy developers
#
# This file is part of satpy.
#
# satpy is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# satpy is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# satpy. If not, see <http://www.gnu.org/licenses/>.
"""Base classes and utilities for all readers configured by YAML files."""
import glob
import itertools
import logging
import os
import warnings
from abc import ABCMeta, abstractmethod
from collections import OrderedDict, deque
from contextlib import suppress
from fnmatch import fnmatch
from weakref import WeakValueDictionary
import numpy as np
import xarray as xr
import yaml
try:
from yaml import UnsafeLoader
except ImportError:
from yaml import Loader as UnsafeLoader # type: ignore
from pyresample.boundary import AreaDefBoundary, Boundary
from pyresample.geometry import AreaDefinition, StackedAreaDefinition, SwathDefinition
from trollsift.parser import globify, parse
from satpy import DatasetDict
from satpy.aux_download import DataDownloadMixin
from satpy.dataset import DataID, DataQuery, get_key
from satpy.dataset.dataid import default_co_keys_config, default_id_keys_config, get_keys_from_config
from satpy.resample import add_crs_xy_coords, get_area_def
from satpy.utils import recursive_dict_update
logger = logging.getLogger(__name__)
def listify_string(something):
"""Take *something* and make it a list.
*something* is either a list of strings or a string, in which case the
function returns a list containing the string.
If *something* is None, an empty list is returned.
"""
if isinstance(something, str):
return [something]
if something is not None:
return list(something)
return list()
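# Illustrative sketch (not from the original module): listify_string normalizes
# config entries that may be given either as a single string or as a list,
# e.g. a reader's 'sensors' entry:
#
#     listify_string("avhrr-3")            # -> ["avhrr-3"]
#     listify_string(["viirs", "modis"])   # -> ["viirs", "modis"]
#     listify_string(None)                 # -> []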
def _get_filebase(path, pattern):
"""Get the end of *path* of same length as *pattern*."""
# convert any `/` on Windows to `\\`
path = os.path.normpath(path)
# A pattern can include directories
tail_len = len(pattern.split(os.path.sep))
return os.path.join(*str(path).split(os.path.sep)[-tail_len:])
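# Illustrative sketch (hypothetical pattern and path): _get_filebase keeps as
# many trailing path components as the pattern has, so patterns that include
# directories can still be matched against full paths:
#
#     pattern = os.path.join("{platform_name}", "img_{start_time:%Y%m%d}.nc")
#     _get_filebase("/data/in/NOAA-20/img_20200101.nc", pattern)
#     # -> "NOAA-20/img_20200101.nc" on POSIX systems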
def _match_filenames(filenames, pattern):
"""Get the filenames matching *pattern*."""
matching = set()
glob_pat = globify(pattern)
for filename in filenames:
if fnmatch(_get_filebase(filename, pattern), glob_pat):
matching.add(filename)
return matching
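# Illustrative sketch (hypothetical names): the trollsift pattern is globified
# before matching, so its format fields act as wildcards:
#
#     files = {"/tmp/img_20200101_1200.nc", "/tmp/unrelated.txt"}
#     _match_filenames(files, "img_{start_time:%Y%m%d_%H%M}.nc")
#     # -> {"/tmp/img_20200101_1200.nc"}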
def _verify_reader_info_assign_config_files(config, config_files):
try:
reader_info = config['reader']
except KeyError:
raise KeyError(
"Malformed config file {}: missing reader 'reader'".format(
config_files))
else:
reader_info['config_files'] = config_files
def load_yaml_configs(*config_files, loader=UnsafeLoader):
"""Merge a series of YAML reader configuration files.
Args:
*config_files (str): One or more pathnames
to YAML-based reader configuration files that will be merged
to create a single configuration.
loader: Yaml loader object to load the YAML with. Defaults to
`UnsafeLoader`.
Returns: dict
Dictionary representing the entire YAML configuration with the
addition of `config['reader']['config_files']` (the list of
YAML pathnames that were merged).
"""
config = {}
logger.debug('Reading %s', str(config_files))
for config_file in config_files:
with open(config_file, 'r', encoding='utf-8') as fd:
config = recursive_dict_update(config, yaml.load(fd, Loader=loader))
_verify_reader_info_assign_config_files(config, config_files)
return config
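# Illustrative sketch (hypothetical file names): configurations are merged with
# recursive_dict_update, so keys from later files override earlier ones and a
# small local YAML can tweak a packaged reader configuration:
#
#     config = load_yaml_configs("abi_l1b.yaml", "abi_l1b_local.yaml")
#     config["reader"]["config_files"]
#     # -> ("abi_l1b.yaml", "abi_l1b_local.yaml")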
class AbstractYAMLReader(metaclass=ABCMeta):
"""Base class for all readers that use YAML configuration files.
This class should only be used in rare cases. Its child class
`FileYAMLReader` should be used in most cases.
"""
def __init__(self, config_dict):
"""Load information from YAML configuration file about how to read data files."""
if isinstance(config_dict, str):
raise ValueError("Passing config files to create a Reader is "
"deprecated. Use ReaderClass.from_config_files "
"instead.")
self.config = config_dict
self.info = self.config['reader']
self.name = self.info['name']
self.file_patterns = []
for file_type, filetype_info in self.config['file_types'].items():
filetype_info.setdefault('file_type', file_type)
# correct separator if needed
file_patterns = [os.path.join(*pattern.split('/'))
for pattern in filetype_info['file_patterns']]
filetype_info['file_patterns'] = file_patterns
self.file_patterns.extend(file_patterns)
if 'sensors' in self.info and not isinstance(self.info['sensors'], (list, tuple)):
self.info['sensors'] = [self.info['sensors']]
self.datasets = self.config.get('datasets', {})
self._id_keys = self.info.get('data_identification_keys', default_id_keys_config)
self._co_keys = self.info.get('coord_identification_keys', default_co_keys_config)
self.info['filenames'] = []
self.all_ids = {}
self.load_ds_ids_from_config()
@classmethod
def from_config_files(cls, *config_files, **reader_kwargs):
"""Create a reader instance from one or more YAML configuration files."""
config_dict = load_yaml_configs(*config_files)
return config_dict['reader']['reader'](config_dict, **reader_kwargs)
@property
def sensor_names(self):
"""Names of sensors whose data is being loaded by this reader."""
return self.info['sensors'] or []
@property
def all_dataset_ids(self):
"""Get DataIDs of all datasets known to this reader."""
return self.all_ids.keys()
@property
def all_dataset_names(self):
"""Get names of all datasets known to this reader."""
# remove duplicates arising from the various calibrations and resolutions
return set(ds_id['name'] for ds_id in self.all_dataset_ids)
@property
def available_dataset_ids(self):
"""Get DataIDs that are loadable by this reader."""
logger.warning(
"Available datasets are unknown, returning all datasets...")
return self.all_dataset_ids
@property
def available_dataset_names(self):
"""Get names of datasets that are loadable by this reader."""
return (ds_id['name'] for ds_id in self.available_dataset_ids)
@property
@abstractmethod
def start_time(self):
"""Start time of the reader."""
@property
@abstractmethod
def end_time(self):
"""End time of the reader."""
@abstractmethod
def filter_selected_filenames(self, filenames):
"""Filter provided filenames by parameters in reader configuration.
Returns: iterable of usable files
"""
@abstractmethod
def load(self, dataset_keys):
"""Load *dataset_keys*."""
def supports_sensor(self, sensor):
"""Check if *sensor* is supported.
Returns True if *sensor* is None.
"""
if sensor and not (set(self.info.get("sensors")) &
set(listify_string(sensor))):
return False
return True
def select_files_from_directory(
self, directory=None, fs=None):
"""Find files for this reader in *directory*.
If directory is None or '', look in the current directory.
Searches the local file system by default. Can search on a remote
filesystem by passing an instance of a suitable implementation of
``fsspec.spec.AbstractFileSystem``.
Args:
directory (Optional[str]): Path to search.
fs (Optional[FileSystem]): fsspec FileSystem implementation to use.
Defaults to None, using local file
system.
Returns:
list of strings describing matching files
"""
filenames = set()
if directory is None:
directory = ''
# all the glob patterns that we are going to look at
all_globs = {os.path.join(directory, globify(pattern))
for pattern in self.file_patterns}
# custom filesystem or not
if fs is None:
matcher = glob.iglob
else:
matcher = fs.glob
# get all files matching these patterns
for glob_pat in all_globs:
filenames.update(matcher(glob_pat))
return filenames
def select_files_from_pathnames(self, filenames):
"""Select the files from *filenames* this reader can handle."""
selected_filenames = []
filenames = set(filenames) # make a copy of the inputs
for pattern in self.file_patterns:
matching = _match_filenames(filenames, pattern)
filenames -= matching
for fname in matching:
if fname not in selected_filenames:
selected_filenames.append(fname)
if len(selected_filenames) == 0:
logger.warning("No filenames found for reader: %s", self.name)
return selected_filenames
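    # Illustrative sketch (hypothetical paths): only filenames matching one of
    # the reader's file_patterns are kept; everything else is ignored:
    #
    #     reader.select_files_from_pathnames([
    #         "/data/img_20200101_1200.nc",   # matches a pattern -> kept
    #         "/data/readme.txt",             # no match -> dropped
    #     ])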
def get_dataset_key(self, key, **kwargs):
"""Get the fully qualified `DataID` matching `key`.
See `satpy.readers.get_key` for more information about kwargs.
"""
return get_key(key, self.all_ids.keys(), **kwargs)
def load_ds_ids_from_config(self):
"""Get the dataset ids from the config."""
ids = []
for dataset in self.datasets.values():
# xarray doesn't like concatenating attributes that are lists
# https://github.com/pydata/xarray/issues/2060
if 'coordinates' in dataset and \
isinstance(dataset['coordinates'], list):
dataset['coordinates'] = tuple(dataset['coordinates'])
id_keys = get_keys_from_config(self._id_keys, dataset)
# Build each permutation/product of the dataset
id_kwargs = self._build_id_permutations(dataset, id_keys)
for id_params in itertools.product(*id_kwargs):
dsid = DataID(id_keys, **dict(zip(id_keys, id_params)))
ids.append(dsid)
# create dataset infos specifically for this permutation
ds_info = dataset.copy()
for key in dsid.keys():
if isinstance(ds_info.get(key), dict):
with suppress(KeyError):
# KeyError is suppressed in case the key does not represent interesting metadata,
# eg a custom type
ds_info.update(ds_info[key][dsid.get(key)])
# this is important for wavelength which was converted
# to a tuple
ds_info[key] = dsid.get(key)
self.all_ids[dsid] = ds_info
return ids
def _build_id_permutations(self, dataset, id_keys):
"""Build each permutation/product of the dataset."""
id_kwargs = []
for key, idval in id_keys.items():
val = dataset.get(key, idval.get('default') if idval is not None else None)
val_type = None
if idval is not None:
val_type = idval.get('type')
if val_type is not None and issubclass(val_type, tuple):
# special case: wavelength can be [min, nominal, max]
# but is still considered 1 option
id_kwargs.append((val,))
elif isinstance(val, (list, tuple, set)):
# this key has multiple choices
# (ex. 250 meter, 500 meter, 1000 meter resolutions)
id_kwargs.append(val)
elif isinstance(val, dict):
id_kwargs.append(val.keys())
else:
# this key only has one choice so make it a one
# item iterable
id_kwargs.append((val,))
return id_kwargs
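# Illustrative sketch (hypothetical dataset entry): load_ds_ids_from_config and
# _build_id_permutations expand list-valued identification keys into the
# Cartesian product of DataIDs, so a YAML entry such as
#
#     datasets:
#       vis_06:
#         name: vis_06
#         resolution: [500, 1000]
#         calibration: [reflectance, radiance]
#         file_type: hrit_vis
#
# would produce four DataIDs (500/reflectance, 500/radiance, 1000/reflectance,
# 1000/radiance), each with its own copy of the dataset info dictionary.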
class FileYAMLReader(AbstractYAMLReader, DataDownloadMixin):
"""Primary reader base class that is configured by a YAML file.
This class uses the idea of per-file "file handler" objects to read file
contents and determine what is available in the file. This differs from
the base :class:`AbstractYAMLReader` which does not depend on individual
file handler objects. In almost all cases this class should be used over
its base class; it can be used as a reader by itself and requires no
subclassing.
"""
def __init__(self,
config_dict,
filter_parameters=None,
filter_filenames=True,
**kwargs):
"""Set up initial internal storage for loading file data."""
super(FileYAMLReader, self).__init__(config_dict)
self.file_handlers = {}
self.available_ids = {}
self.filter_filenames = self.info.get('filter_filenames', filter_filenames)
self.filter_parameters = filter_parameters or {}
self.coords_cache = WeakValueDictionary()
self.register_data_files()
@property
def sensor_names(self):
"""Names of sensors whose data is being loaded by this reader."""
if not self.file_handlers:
return self.info['sensors']
file_handlers = (handlers[0] for handlers in
self.file_handlers.values())
sensor_names = set()
for fh in file_handlers:
try:
sensor_names.update(fh.sensor_names)
except NotImplementedError:
continue
if not sensor_names:
return self.info['sensors']
return sorted(sensor_names)
@property
def available_dataset_ids(self):
"""Get DataIDs that are loadable by this reader."""
return self.available_ids.keys()
@property
def start_time(self):
"""Start time of the earlier file used by this reader."""
if not self.file_handlers:
raise RuntimeError("Start time unknown until files are selected")
return min(x[0].start_time for x in self.file_handlers.values())
@property
def end_time(self):
"""End time of the latest file used by this reader."""
if not self.file_handlers:
raise RuntimeError("End time unknown until files are selected")
return max(x[-1].end_time for x in self.file_handlers.values())
@staticmethod
def check_file_covers_area(file_handler, check_area):
"""Check if the file covers the current area.
If the file doesn't provide any bounding box information or 'area'
was not provided in `filter_parameters`, the check returns True.
"""
try:
gbb = Boundary(*file_handler.get_bounding_box())
except NotImplementedError as err:
logger.debug("Bounding box computation not implemented: %s",
str(err))
else:
abb = AreaDefBoundary(get_area_def(check_area), frequency=1000)
intersection = gbb.contour_poly.intersection(abb.contour_poly)
if not intersection:
return False
return True
def find_required_filehandlers(self, requirements, filename_info):
"""Find the necessary file handlers for the given requirements.
We assume here requirements are available.
Raises:
KeyError, if no handler for the given requirements is available.
RuntimeError, if there is a handler for the given requirements,
but it doesn't match the filename info.
"""
req_fh = []
filename_info = set(filename_info.items())
if requirements:
for requirement in requirements:
for fhd in self.file_handlers[requirement]:
if set(fhd.filename_info.items()).issubset(filename_info):
req_fh.append(fhd)
break
else:
raise RuntimeError("No matching requirement file of type "
"{}".format(requirement))
# break everything and continue to next
# filetype!
return req_fh
def sorted_filetype_items(self):
"""Sort the instance's filetypes in using order."""
processed_types = []
file_type_items = deque(self.config['file_types'].items())
while len(file_type_items):
filetype, filetype_info = file_type_items.popleft()
requirements = filetype_info.get('requires')
if requirements is not None:
# requirements have not been processed yet -> wait
missing = [req for req in requirements
if req not in processed_types]
if missing:
file_type_items.append((filetype, filetype_info))
continue
processed_types.append(filetype)
yield filetype, filetype_info
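    # Illustrative sketch (hypothetical file types): a file type that lists other
    # types under 'requires' is deferred until those types have been yielded, e.g.
    #
    #     file_types:
    #       hrit_vis:
    #         requires: [hrit_prologue]
    #       hrit_prologue: {}
    #
    # makes the generator yield 'hrit_prologue' before 'hrit_vis'.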
@staticmethod
def filename_items_for_filetype(filenames, filetype_info):
"""Iterate over the filenames matching *filetype_info*."""
if not isinstance(filenames, set):
# we perform set operations later on to improve performance
filenames = set(filenames)
for pattern in filetype_info['file_patterns']:
matched_files = set()
matches = _match_filenames(filenames, pattern)
for filename in matches:
try:
filename_info = parse(
pattern, _get_filebase(filename, pattern))
except ValueError:
logger.debug("Can't parse %s with %s.", filename, pattern)
continue
matched_files.add(filename)
yield filename, filename_info
filenames -= matched_files
def _new_filehandler_instances(self, filetype_info, filename_items, fh_kwargs=None):
"""Generate new filehandler instances."""
requirements = filetype_info.get('requires')
filetype_cls = filetype_info['file_reader']
if fh_kwargs is None:
fh_kwargs = {}
for filename, filename_info in filename_items:
try:
req_fh = self.find_required_filehandlers(requirements,
filename_info)
except KeyError as req:
msg = "No handler for reading requirement {} for {}".format(
req, filename)
warnings.warn(msg)
continue
except RuntimeError as err:
warnings.warn(str(err) + ' for {}'.format(filename))
continue
yield filetype_cls(filename, filename_info, filetype_info, *req_fh, **fh_kwargs)
def time_matches(self, fstart, fend):
"""Check that a file's start and end time mtach filter_parameters of this reader."""
start_time = self.filter_parameters.get('start_time')
end_time = self.filter_parameters.get('end_time')
fend = fend or fstart
if start_time and fend and fend < start_time:
return False
if end_time and fstart and fstart > end_time:
return False
return True
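    # Illustrative sketch: with filter_parameters such as
    # {'start_time': datetime(2020, 1, 1, 12), 'end_time': datetime(2020, 1, 1, 13)},
    # a file covering 11:50-12:10 passes because it overlaps the window, while a
    # file covering 10:00-11:00 is rejected since its end time precedes the
    # requested start time.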
def metadata_matches(self, sample_dict, file_handler=None):
"""Check that file metadata matches filter_parameters of this reader."""
# special handling of start/end times
if not self.time_matches(
sample_dict.get('start_time'), sample_dict.get('end_time')):
return False
for key, val in self.filter_parameters.items():
if key != 'area' and key not in sample_dict:
continue
if key in ['start_time', 'end_time']:
continue
elif key == 'area' and file_handler:
if not self.check_file_covers_area(file_handler, val):
logger.info('Filtering out %s based on area',
file_handler.filename)
break
elif key in sample_dict and val != sample_dict[key]:
# don't use this file
break
else:
# all the metadata keys are equal
return True
return False
def filter_filenames_by_info(self, filename_items):
"""Filter out file using metadata from the filenames.
Currently only uses start and end time. If only start time is available
from the filename, keep all the filenames that have a start time before
the requested end time.
"""
for filename, filename_info in filename_items:
fend = filename_info.get('end_time')
fstart = filename_info.setdefault('start_time', fend)
if fend and fend < fstart:
# correct for filenames with 1 date and 2 times
fend = fend.replace(year=fstart.year,
month=fstart.month,
day=fstart.day)
filename_info['end_time'] = fend
if self.metadata_matches(filename_info):
yield filename, filename_info
def filter_fh_by_metadata(self, filehandlers):
"""Filter out filehandlers using provide filter parameters."""
for filehandler in filehandlers:
filehandler.metadata['start_time'] = filehandler.start_time
filehandler.metadata['end_time'] = filehandler.end_time
if self.metadata_matches(filehandler.metadata, filehandler):
yield filehandler
def filter_selected_filenames(self, filenames):
"""Filter provided files based on metadata in the filename."""
if not isinstance(filenames, set):
# we perform set operations later on to improve performance
filenames = set(filenames)
for _, filetype_info in self.sorted_filetype_items():
filename_iter = self.filename_items_for_filetype(filenames,
filetype_info)
if self.filter_filenames:
filename_iter = self.filter_filenames_by_info(filename_iter)
for fn, _ in filename_iter:
yield fn
def _new_filehandlers_for_filetype(self, filetype_info, filenames, fh_kwargs=None):
"""Create filehandlers for a given filetype."""
filename_iter = self.filename_items_for_filetype(filenames,
filetype_info)
if self.filter_filenames:
# preliminary filter of filenames based on start/end time
# to reduce the number of files to open
filename_iter = self.filter_filenames_by_info(filename_iter)
filehandler_iter = self._new_filehandler_instances(filetype_info,
filename_iter,
fh_kwargs=fh_kwargs)
filtered_iter = self.filter_fh_by_metadata(filehandler_iter)
return list(filtered_iter)
def create_filehandlers(self, filenames, fh_kwargs=None):
"""Organize the filenames into file types and create file handlers."""
filenames = list(OrderedDict.fromkeys(filenames))
logger.debug("Assigning to %s: %s", self.info['name'], filenames)
self.info.setdefault('filenames', []).extend(filenames)
filename_set = set(filenames)
created_fhs = {}
# load files that we know about by creating the file handlers
for filetype, filetype_info in self.sorted_filetype_items():
filehandlers = self._new_filehandlers_for_filetype(filetype_info,
filename_set,
fh_kwargs=fh_kwargs)
if filehandlers:
created_fhs[filetype] = filehandlers
self.file_handlers[filetype] = sorted(
self.file_handlers.get(filetype, []) + filehandlers,
key=lambda fhd: (fhd.start_time, fhd.filename))
# load any additional dataset IDs determined dynamically from the file
# and update any missing metadata that only the file knows
self.update_ds_ids_from_file_handlers()
return created_fhs
def _file_handlers_available_datasets(self):
"""Generate a series of available dataset information.
This is done by chaining the file handlers'
:meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets`
methods together. See that method's documentation for more information.
Returns:
Generator of (bool, dict) where the boolean tells whether the
current dataset is available from any of the file handlers. The
boolean can also be None in the case where no loaded file handler
is configured to load the dataset. The
dictionary is the metadata provided either by the YAML
configuration files or by the file handler itself if it is a new
dataset. The file handler may have also supplemented or modified
the information.
"""
# flatten all file handlers into one list
flat_fhs = (fh for fhs in self.file_handlers.values() for fh in fhs)
id_values = list(self.all_ids.values())
configured_datasets = ((None, ds_info) for ds_info in id_values)
for fh in flat_fhs:
# chain the 'available_datasets' methods together by calling the
# current file handler's method with the previous ones result
configured_datasets = fh.available_datasets(configured_datasets=configured_datasets)
return configured_datasets
def update_ds_ids_from_file_handlers(self):
"""Add or modify available dataset information.
Each file handler is consulted on whether or not it can load the
dataset with the provided information dictionary.
See
:meth:`satpy.readers.file_handlers.BaseFileHandler.available_datasets`
for more information.
"""
avail_datasets = self._file_handlers_available_datasets()
new_ids = {}
for is_avail, ds_info in avail_datasets:
# especially from the yaml config
coordinates = ds_info.get('coordinates')
if isinstance(coordinates, list):
# xarray doesn't like concatenating attributes that are
# lists: https://github.com/pydata/xarray/issues/2060
ds_info['coordinates'] = tuple(ds_info['coordinates'])
ds_info.setdefault('modifiers', tuple()) # default to no mods
# Create DataID for this dataset
ds_id = DataID(self._id_keys, **ds_info)
# all datasets
new_ids[ds_id] = ds_info
# available datasets
# False == we have the file type but it doesn't have this dataset
# None == we don't have the file type object to ask
if is_avail:
self.available_ids[ds_id] = ds_info
self.all_ids = new_ids
@staticmethod
def _load_dataset(dsid, ds_info, file_handlers, dim='y', **kwargs):
"""Load only a piece of the dataset."""
slice_list = []
failure = True
for fh in file_handlers:
try:
projectable = fh.get_dataset(dsid, ds_info)
if projectable is not None:
slice_list.append(projectable)
failure = False
except KeyError:
logger.warning("Failed to load {} from {}".format(dsid, fh),
exc_info=True)
if failure:
raise KeyError(
"Could not load {} from any provided files".format(dsid))
if dim not in slice_list[0].dims:
return slice_list[0]
res = xr.concat(slice_list, dim=dim)
combined_info = file_handlers[0].combine_info(
[p.attrs for p in slice_list])
res.attrs = combined_info
return res
def _load_dataset_data(self, file_handlers, dsid, **kwargs):
ds_info = self.all_ids[dsid]
proj = self._load_dataset(dsid, ds_info, file_handlers, **kwargs)
# FIXME: areas could be concatenated here
# Update the metadata
proj.attrs['start_time'] = file_handlers[0].start_time
proj.attrs['end_time'] = file_handlers[-1].end_time
proj.attrs['reader'] = self.name
return proj
def _preferred_filetype(self, filetypes):
"""Get the preferred filetype out of the *filetypes* list.
At the moment, it just returns the first filetype that has been loaded.
"""
if not isinstance(filetypes, list):
filetypes = [filetypes]
# look through the file types and use the first one that we have loaded
for filetype in filetypes:
if filetype in self.file_handlers:
return filetype
return None
def _load_area_def(self, dsid, file_handlers, **kwargs):
"""Load the area definition of *dsid*."""
return _load_area_def(dsid, file_handlers)
def _get_coordinates_for_dataset_key(self, dsid):
"""Get the coordinate dataset keys for *dsid*."""
ds_info = self.all_ids[dsid]
cids = []
for cinfo in ds_info.get('coordinates', []):
if not isinstance(cinfo, dict):
cinfo = {'name': cinfo}
for key in self._co_keys:
if key == 'name':
continue
if key in ds_info:
if ds_info[key] is not None:
cinfo[key] = ds_info[key]
cid = DataQuery.from_dict(cinfo)
cids.append(self.get_dataset_key(cid))
return cids
def _get_coordinates_for_dataset_keys(self, dsids):
"""Get all coordinates."""
coordinates = {}
for dsid in dsids:
cids = self._get_coordinates_for_dataset_key(dsid)
coordinates.setdefault(dsid, []).extend(cids)
return coordinates
def _get_file_handlers(self, dsid):
"""Get the file handler to load this dataset."""
ds_info = self.all_ids[dsid]
filetype = self._preferred_filetype(ds_info['file_type'])
if filetype is None:
logger.warning("Required file type '%s' not found or loaded for "
"'%s'", ds_info['file_type'], dsid['name'])
else:
return self.file_handlers[filetype]
def _make_area_from_coords(self, coords):
"""Create an appropriate area with the given *coords*."""
if len(coords) == 2:
lons, lats = self._get_lons_lats_from_coords(coords)
sdef = self._make_swath_definition_from_lons_lats(lons, lats)
return sdef
if len(coords) != 0:
raise NameError("Don't know what to do with coordinates " + str(
coords))
def _get_lons_lats_from_coords(self, coords):
"""Get lons and lats from the coords list."""
lons, lats = None, None
for coord in coords:
if coord.attrs.get('standard_name') == 'longitude':
lons = coord
elif coord.attrs.get('standard_name') == 'latitude':
lats = coord
if lons is None or lats is None:
raise ValueError('Missing longitude or latitude coordinate: ' + str(coords))
return lons, lats
def _make_swath_definition_from_lons_lats(self, lons, lats):
"""Make a swath definition instance from lons and lats."""
key = None
try:
key = (lons.data.name, lats.data.name)
sdef = self.coords_cache.get(key)
except AttributeError:
sdef = None
if sdef is None:
sdef = SwathDefinition(lons, lats)
sensor_str = '_'.join(self.info['sensors'])
shape_str = '_'.join(map(str, lons.shape))
sdef.name = "{}_{}_{}_{}".format(sensor_str, shape_str,
lons.attrs.get('name', lons.name),
lats.attrs.get('name', lats.name))
if key is not None:
self.coords_cache[key] = sdef
return sdef
def _load_dataset_area(self, dsid, file_handlers, coords, **kwargs):
"""Get the area for *dsid*."""
try:
return self._load_area_def(dsid, file_handlers, **kwargs)
except NotImplementedError:
if any(x is None for x in coords):
logger.warning(
"Failed to load coordinates for '{}'".format(dsid))
return None
area = self._make_area_from_coords(coords)
if area is None:
logger.debug("No coordinates found for %s", str(dsid))
return area
def _load_dataset_with_area(self, dsid, coords, **kwargs):
"""Load *dsid* and its area if available."""
file_handlers = self._get_file_handlers(dsid)
if not file_handlers:
return
try:
ds = self._load_dataset_data(file_handlers, dsid, **kwargs)
except (KeyError, ValueError) as err:
logger.exception("Could not load dataset '%s': %s", dsid, str(err))
return None
coords = self._assign_coords_from_dataarray(coords, ds)
area = self._load_dataset_area(dsid, file_handlers, coords, **kwargs)
if area is not None:
ds.attrs['area'] = area
ds = add_crs_xy_coords(ds, area)
return ds
@staticmethod
def _assign_coords_from_dataarray(coords, ds):
"""Assign coords from the *ds* dataarray if needed."""
if not coords:
coords = []
for coord in ds.coords.values():
if coord.attrs.get('standard_name') in ['longitude', 'latitude']:
coords.append(coord)
return coords
def _load_ancillary_variables(self, datasets, **kwargs):
"""Load the ancillary variables of `datasets`."""
all_av_ids = self._gather_ancillary_variables_ids(datasets)
loadable_av_ids = [av_id for av_id in all_av_ids if av_id not in datasets]
if not all_av_ids:
return
if loadable_av_ids:
self.load(loadable_av_ids, previous_datasets=datasets, **kwargs)
for dataset in datasets.values():
new_vars = []
for av_id in dataset.attrs.get('ancillary_variables', []):
if isinstance(av_id, DataID):
new_vars.append(datasets[av_id])
else:
new_vars.append(av_id)
dataset.attrs['ancillary_variables'] = new_vars
def _gather_ancillary_variables_ids(self, datasets):
"""Gather ancillary variables' ids.
This adds/modifies the dataset's `ancillary_variables` attr.
"""
all_av_ids = set()
for dataset in datasets.values():
ancillary_variables = dataset.attrs.get('ancillary_variables', [])
if not isinstance(ancillary_variables, (list, tuple, set)):
ancillary_variables = ancillary_variables.split(' ')
av_ids = []
for key in ancillary_variables:
try:
av_ids.append(self.get_dataset_key(key))
except KeyError:
logger.warning("Can't load ancillary dataset %s", str(key))
all_av_ids |= set(av_ids)
dataset.attrs['ancillary_variables'] = av_ids
return all_av_ids
def get_dataset_key(self, key, available_only=False, **kwargs):
"""Get the fully qualified `DataID` matching `key`.
This will first search through available DataIDs, datasets that
should be possible to load, and fallback to "known" datasets, those
that are configured but aren't loadable from the provided files.
Providing ``available_only=True`` will stop this fallback behavior
and raise a ``KeyError`` exception if no available dataset is found.
Args:
key (str, float, DataID, DataQuery): Key to search for in this reader.
available_only (bool): Search only loadable datasets for the
provided key. Loadable datasets are always searched first,
but if ``available_only=False`` (default) then all known
datasets will be searched.
kwargs: See :func:`satpy.readers.get_key` for more information about
kwargs.
Returns:
Best matching DataID to the provided ``key``.
Raises:
KeyError: if no key match is found.
"""
try:
return get_key(key, self.available_dataset_ids, **kwargs)
except KeyError:
if available_only:
raise
return get_key(key, self.all_dataset_ids, **kwargs)
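    # Illustrative sketch (hypothetical dataset name): the query is resolved
    # against the loadable datasets first and only then against everything the
    # YAML configuration knows about:
    #
    #     reader.get_dataset_key("C07")                        # any known dataset
    #     reader.get_dataset_key("C07", available_only=True)   # KeyError if not loadable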
def load(self, dataset_keys, previous_datasets=None, **kwargs):
"""Load `dataset_keys`.
If `previous_datasets` is provided, do not reload those.
"""
all_datasets = previous_datasets or DatasetDict()
datasets = DatasetDict()
# Include coordinates in the list of datasets to load
dsids = [self.get_dataset_key(ds_key) for ds_key in dataset_keys]
coordinates = self._get_coordinates_for_dataset_keys(dsids)
all_dsids = list(set().union(*coordinates.values())) + dsids
for dsid in all_dsids:
if dsid in all_datasets:
continue
coords = [all_datasets.get(cid, None)
for cid in coordinates.get(dsid, [])]
ds = self._load_dataset_with_area(dsid, coords, **kwargs)
if ds is not None:
all_datasets[dsid] = ds
if dsid in dsids:
datasets[dsid] = ds
self._load_ancillary_variables(all_datasets, **kwargs)
return datasets
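# Illustrative sketch (hypothetical paths and dataset name) of the typical life
# cycle of a FileYAMLReader; in normal use the Scene object drives these steps:
#
#     reader = FileYAMLReader.from_config_files("readers/abi_l1b.yaml")
#     usable = reader.select_files_from_pathnames(glob.glob("/data/*.nc"))
#     reader.create_filehandlers(usable)
#     datasets = reader.load(["C07"])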
def _load_area_def(dsid, file_handlers):
"""Load the area definition of *dsid*."""
area_defs = [fh.get_area_def(dsid) for fh in file_handlers]
area_defs = [area_def for area_def in area_defs
if area_def is not None]
final_area = StackedAreaDefinition(*area_defs)
return final_area.squeeze()
def _set_orientation(dataset, upper_right_corner):
"""Set the orientation of geostationary datasets.
Allows flipping of geostationary imagery when loading the datasets.
Example call: scn.load(['VIS008'], upper_right_corner='NE')
Args:
dataset: Dataset to be flipped.
upper_right_corner (str): Direction of the upper right corner of the image after flipping.
Possible options are 'NW', 'NE', 'SW', 'SE', or 'native'.
The common upright image orientation corresponds to 'NE'.
Defaults to 'native' (no flipping is applied).
"""
# do some checks and early returns
if upper_right_corner == 'native':
logger.debug("Requested orientation for Dataset {} is 'native' (default). "
"No flipping is applied.".format(dataset.attrs.get('name')))
return dataset
if upper_right_corner not in ['NW', 'NE', 'SE', 'SW', 'native']: