# NOTE: the lines that stood here were residue from a repository web viewer
# ("Skip to content", "Branch: master", "Find file / Copy path",
# "Fetching contributors…", "370 lines (275 sloc) 12 KB") and are not part
# of the Python module.
import os
import glob
from collections import OrderedDict
import warnings
import numpy as np
from astropy.wcs import WCS
import sunpy
from import GenericMap, MapMetaValidationError
from import CompositeMap
from import MapSequence
from import read_file
from import FileHeader
from import download_file
from sunpy.util import expand_list
from sunpy.util.metadata import MetaDict
from sunpy.util.config import get_and_create_download_dir
from sunpy.util.exceptions import SunpyDeprecationWarning
from sunpy.util.datatype_factory_base import BasicRegistrationFactory
from sunpy.util.datatype_factory_base import NoMatchError
from sunpy.util.datatype_factory_base import MultipleMatchError
from sunpy.util.datatype_factory_base import ValidationFunctionError
from urllib.request import urlopen
# Array types accepted as the "data" half of a data/header pair.
# NOTE(review): the initial tuple is not visible in this file as received;
# ``np.ndarray`` is assumed as the baseline type -- confirm.
SUPPORTED_ARRAY_TYPES = (np.ndarray,)
try:
    # dask is optional: its array type is only accepted when it is installed.
    import dask.array
    SUPPORTED_ARRAY_TYPES += (dask.array.Array,)
except ImportError:
    pass
# Module authorship metadata.
__authors__ = ["Russell Hewett, Stuart Mumford"]
__email__ = ""
# Make a mock DatabaseEntry class if sqlalchemy is not installed
from sunpy.database.tables import DatabaseEntry
except ImportError:
class DatabaseEntry(object):
# Names exported by a star-import of this module.
__all__ = ['Map', 'MapFactory']
class MapFactory(BasicRegistrationFactory):
    """
    Map(\\*args, \\*\\*kwargs)

    Map factory class. Used to create a variety of Map objects. Valid map types
    are specified by registering them with the factory.

    Examples
    --------
    >>> import sunpy.map
    >>> from astropy.io import fits
    >>> import sunpy.data.sample  # doctest: +REMOTE_DATA
    >>> mymap = sunpy.map.Map(sunpy.data.sample.AIA_171_IMAGE)  # doctest: +REMOTE_DATA

    The SunPy Map factory accepts a wide variety of inputs for creating maps

    * Preloaded tuples of (data, header) pairs

    >>> mymap = sunpy.map.Map((data, header))  # doctest: +SKIP

    headers are some base of `dict` or `collections.OrderedDict`, including
    `sunpy.io.header.FileHeader` or `sunpy.util.metadata.MetaDict` classes.

    * data, header pairs, not in tuples

    >>> mymap = sunpy.map.Map(data, header)  # doctest: +SKIP

    * data, wcs object, in tuple

    >>> from astropy.wcs import WCS
    >>> wcs = WCS(sunpy.data.sample.AIA_171_ROLL_IMAGE)  # doctest: +REMOTE_DATA
    >>> data = fits.getdata(sunpy.data.sample.AIA_171_ROLL_IMAGE)  # doctest: +REMOTE_DATA
    >>> mymap = sunpy.map.Map((data, wcs))  # doctest: +REMOTE_DATA

    * data, wcs object, not in tuple

    >>> from astropy.wcs import WCS
    >>> wcs = WCS(sunpy.data.sample.AIA_171_ROLL_IMAGE)  # doctest: +REMOTE_DATA
    >>> data = fits.getdata(sunpy.data.sample.AIA_171_ROLL_IMAGE)  # doctest: +REMOTE_DATA
    >>> mymap = sunpy.map.Map(data, wcs)  # doctest: +REMOTE_DATA

    * File names

    >>> mymap = sunpy.map.Map('file1.fits')  # doctest: +SKIP

    * All fits files in a directory by giving a directory

    >>> mymap = sunpy.map.Map('local_dir/sub_dir')  # doctest: +SKIP

    * Some regex globs

    >>> mymap = sunpy.map.Map('eit_*.fits')  # doctest: +SKIP

    * URLs

    >>> mymap = sunpy.map.Map(url_str)  # doctest: +SKIP

    * DatabaseEntry

    >>> mymap = sunpy.map.Map(database_entry)  # doctest: +SKIP

    * Lists of any of the above

    >>> mymap = sunpy.map.Map(['file1.fits', 'file2.fits', 'file3.fits', 'directory1/'])  # doctest: +SKIP

    * Any mixture of the above not in a list

    >>> mymap = sunpy.map.Map(((data, header), data2, header2, 'file1.fits', url_str, 'eit_*.fits'))  # doctest: +SKIP
    """  # noqa
def _read_file(self, fname, **kwargs):
    """
    Read a file and return the list of (data, meta) pairs it contains.

    Parameters
    ----------
    fname : `str`
        Name of the file to read.
    **kwargs
        Passed through unchanged to ``read_file``.

    Returns
    -------
    `list`
        ``(data, MetaDict)`` tuples, one for every pair in the file whose
        data has more than one dimension. Other pairs are dropped.
    """
    # File gets read here. This needs to be generic enough to seamlessly
    # call a fits file or a jpeg2k file, etc
    pairs = read_file(fname, **kwargs)

    new_pairs = []
    for filedata, filemeta in pairs:
        assert isinstance(filemeta, FileHeader)
        # This tests that the data is more than 1D
        if len(np.shape(filedata)) > 1:
            new_pairs.append((filedata, MetaDict(filemeta)))
    return new_pairs
def _validate_meta(self, meta):
    """
    Validate a meta argument.

    Parameters
    ----------
    meta : `object`
        Candidate header/metadata object.

    Returns
    -------
    `bool`
        `True` if ``meta`` is a `dict` (or subclass) or a FITS header
        object, `False` otherwise.
    """
    if isinstance(meta, dict):
        return True
    # NOTE(review): the class named on the original ``isinstance(meta,`` line
    # is cut off in this file as received. The astropy FITS header type is
    # assumed here, imported locally because no module-level import of it is
    # visible -- confirm against the upstream source.
    from astropy.io.fits import Header
    return isinstance(meta, Header)
def _parse_args(self, *args, **kwargs):
    """
    Parses an args list for data-header pairs.

    ``args`` can contain any mixture of the following entries:

    * tuples of data,header
    * data, header not in a tuple
    * data, wcs object in a tuple
    * data, wcs object not in a tuple
    * filename, which will be read
    * directory, from which all files will be read
    * glob, from which all files will be read
    * url, which will be downloaded and read
    * lists containing any of the above.

    Example
    -------
    self._parse_args(data, header,
                     (data, header),
                     ['file1', 'file2', 'file3'])

    Parameters
    ----------
    *args
        The inputs described above.
    **kwargs
        Passed through to ``self._read_file`` for every file that is read.

    Returns
    -------
    data_header_pairs : `list`
        ``(data, header)`` tuples collected from the arguments.
    already_maps : `list`
        Arguments that were already `GenericMap` instances.

    Raises
    ------
    ValueError
        If an argument matches none of the supported input kinds.
    """
    data_header_pairs = []
    already_maps = []

    # Account for nested lists of items
    args = expand_list(args)

    # For each of the arguments, handle each of the cases.
    # A manual index is used because a data array consumes the argument
    # that follows it (its header or WCS).
    i = 0
    while i < len(args):
        arg = args[i]

        # Data-header or data-WCS pair
        if isinstance(arg, SUPPORTED_ARRAY_TYPES):
            arg_header = args[i+1]
            if isinstance(arg_header, WCS):
                arg_header = arg_header.to_header()

            if self._validate_meta(arg_header):
                pair = (arg, OrderedDict(arg_header))
                data_header_pairs.append(pair)
                i += 1  # an extra increment to account for the data-header pairing

        # File name
        elif (isinstance(arg, str) and
              os.path.isfile(os.path.expanduser(arg))):
            path = os.path.expanduser(arg)
            pairs = self._read_file(path, **kwargs)
            data_header_pairs += pairs

        # Directory
        elif (isinstance(arg, str) and
              os.path.isdir(os.path.expanduser(arg))):
            path = os.path.expanduser(arg)
            files = [os.path.join(path, elem) for elem in os.listdir(path)]
            for afile in files:
                data_header_pairs += self._read_file(afile, **kwargs)

        # Glob
        elif (isinstance(arg, str) and '*' in arg):
            files = glob.glob(os.path.expanduser(arg))
            for afile in files:
                data_header_pairs += self._read_file(afile, **kwargs)

        # Already a Map
        elif isinstance(arg, GenericMap):
            already_maps.append(arg)

        # A URL
        elif (isinstance(arg, str) and
              _is_url(arg)):
            url = arg
            path = download_file(url, get_and_create_download_dir())
            pairs = self._read_file(path, **kwargs)
            data_header_pairs += pairs

        # A database Entry
        elif isinstance(arg, DatabaseEntry):
            data_header_pairs += self._read_file(arg.path, **kwargs)

        else:
            raise ValueError("File not found or invalid input")

        i += 1

    # In the end, if there are already maps it should be put in the same
    # order as the input, currently they are not.
    return data_header_pairs, already_maps
def __call__(self, *args, composite=False, sequence=False, silence_errors=False, **kwargs):
    """ Method for running the factory. Takes arbitrary arguments and
    keyword arguments and passes them to a sequence of pre-registered types
    to determine which is the correct Map-type to build.

    Arguments args and kwargs are passed through to the validation
    function and to the constructor for the final type. For Map types,
    validation function must take a data-header pair as an argument.

    Parameters
    ----------
    composite : `bool`, optional
        Indicates if collection of maps should be returned as a `sunpy.map.CompositeMap`.
        Default is `False`.

    sequence : `bool`, optional
        Indicates if collection of maps should be returned as a `sunpy.map.MapSequence`.
        Default is `False`.

    silence_errors : `bool`, optional
        If set, ignore data-header pairs which cause an exception.
        Default is ``False``.

    Returns
    -------
    A ``MapSequence`` if ``sequence`` is set, otherwise a ``CompositeMap``
    if ``composite`` is set, otherwise the single map when exactly one was
    built, otherwise a `list` of maps.

    Raises
    ------
    RuntimeError
        If no maps could be built from the inputs.

    Notes
    -----
    Extra keyword arguments are passed through to `sunpy.io.read_file` such
    as `memmap` for FITS files.
    """
    data_header_pairs, already_maps = self._parse_args(*args, **kwargs)

    new_maps = []

    # Loop over each registered type and check to see if WidgetType
    # matches the arguments. If it does, use that type.
    for data, header in data_header_pairs:
        meta = MetaDict(header)

        try:
            new_map = self._check_registered_widgets(data, meta, **kwargs)
        except (NoMatchError, MultipleMatchError, ValidationFunctionError):
            # Type-resolution failures are only ignored on request.
            if not silence_errors:
                raise
        except MapMetaValidationError as e:
            warnings.warn(f"One of the data, header pairs failed to validate with: {e}")
        else:
            # Any other exception propagates to the caller unchanged.
            new_maps.append(new_map)

    new_maps += already_maps

    if not new_maps:
        raise RuntimeError('No maps loaded')

    # If the list is meant to be a sequence, instantiate a map sequence
    if sequence:
        return MapSequence(new_maps, **kwargs)

    # If the list is meant to be a composite map, instantiate one
    if composite:
        return CompositeMap(new_maps, **kwargs)

    if len(new_maps) == 1:
        return new_maps[0]

    return new_maps
def _check_registered_widgets(self, data, meta, **kwargs):
    """
    Build an instance of the one registered type that accepts the inputs.

    Every validation function in ``self.registry`` is called with
    ``(data, meta, **kwargs)``; the keys whose function returns a truthy
    value are the candidate types.

    Parameters
    ----------
    data, meta
        The data/metadata pair handed to each validation function and to
        the chosen type's constructor.
    **kwargs
        Passed to the validation functions and to the constructor.

    Returns
    -------
    An instance of the single matching type, or of
    ``self.default_widget_type`` when nothing matched.

    Raises
    ------
    NoMatchError
        If nothing matched and ``self.default_widget_type`` is `None`.
    MultipleMatchError
        If more than one registered type matched.
    """
    # Call the registered validation function for each registered class
    candidate_widget_types = [
        key for key in self.registry
        if self.registry[key](data, meta, **kwargs)
    ]

    n_matches = len(candidate_widget_types)

    if n_matches == 0:
        if self.default_widget_type is None:
            raise NoMatchError("No types match specified arguments and no default is set.")
        candidate_widget_types = [self.default_widget_type]
    elif n_matches > 1:
        raise MultipleMatchError(
            f"Too many candidate types identified ({n_matches}). "
            "Specify enough keywords to guarantee unique type identification."
        )

    # Only one is found
    WidgetType = candidate_widget_types[0]

    return WidgetType(data, meta, **kwargs)
def _is_url(arg):
    """
    Return `True` if ``arg`` can be opened with ``urlopen``, else `False`.

    Any exception raised while opening (malformed URL, unreachable host,
    non-string input, ...) is treated as "not a URL".
    """
    try:
        # Only reachability matters, so close the response straight away
        # instead of leaving the connection open.
        with urlopen(arg):
            pass
    except Exception:
        return False
    return True
class InvalidMapInput(ValueError):
    """Exception to raise when input variable is not a Map instance and does
    not point to a valid Map input file."""
class InvalidMapType(ValueError):
    """Exception to raise when an invalid type of map is requested with Map
    """
class NoMapsFound(ValueError):
    """Exception to raise when input does not point to any valid maps or files
    """
# Module-level factory instance exported as ``Map``.
# NOTE(review): the original call is cut off after the trailing comma in this
# file as received. The final keyword below follows the upstream SunPy source
# and is not visible here -- confirm before merging.
Map = MapFactory(registry=GenericMap._registry, default_widget_type=GenericMap,
                 additional_validation_functions=['is_datasource_for'])
# NOTE: web-viewer residue, not part of the module: "You can’t perform that action at this time."