In [10]:
import os
import re
import datetime as dt
from collections import namedtuple
from math import inf
from pathlib import Path
from tempfile import TemporaryDirectory
from xml.etree import ElementTree
from zipfile import ZipFile

from swarmclient.client import SwarmClient, SwarmFile
from swarmclient.config import ClientConfig

In [2]:
def get_orbit_data(satellite, batch_size=50, max_files=None):
    
    if satellite not in ('A', 'B', 'C'):
        raise ValueError(
            'Satellite ID must be A, B or C, not %s' % satellite
        )
    if max_files is None:
        max_files = inf

    cc = ClientConfig()
    cln = SwarmClient(cc.default_site, *cc.get_site(cc.default_site).values())

    path = f'Auxiliary/Sat_{satellite}/MPL{satellite}ORBPRE'

    files = [elm for n, elm in enumerate(cln.search(path, file_type=f'MPL{satellite}ORBPRE')) if n < max_files]
    
    with TemporaryDirectory() as tmp:
        print('downloading files to %s' % tmp)
        while len(files):
            cln.download(files[:batch_size], tmp)
            files = files[batch_size:]
        print('done')
            
    

In [3]:
try:
    get_orbit_data('A', max_files=50)
except Exception as err:
    print(err)

downloading files to /var/folders/kr/0v3bs2p53wdd98x4103pjmdm0000gn/T/tmpkbdqn1by


SW_OPER_MPLAORBPRE_20131122T131347_20131207T131347_0014.ZIP: 100%|██████████| 17.8k/17.8k [00:00<00:00, 79.8kB/s]
SW_OPER_MPLAORBPRE_20131122T131447_20131202T133247_0016.ZIP: 100%|██████████| 12.4k/12.4k [00:00<00:00, 66.5kB/s]
SW_OPER_MPLAORBPRE_20131122T131447_20131202T232900_0017.ZIP: 100%|██████████| 12.9k/12.9k [00:00<00:00, 70.8kB/s]
SW_OPER_MPLAORBPRE_20131122T131447_20131203T002900_0018.ZIP: 100%|██████████| 13.0k/13.0k [00:00<00:00, 69.8kB/s]
SW_OPER_MPLAORBPRE_20131122T131447_20131203T002900_0019.ZIP: 100%|██████████| 13.0k/13.0k [00:00<00:00, 70.1kB/s]
SW_OPER_MPLAORBPRE_20131122T131447_20131203T055900_0020.ZIP: 100%|██████████| 13.2k/13.2k [00:00<00:00, 70.0kB/s]
SW_OPER_MPLAORBPRE_20131122T131447_20131203T055900_0021.ZIP: 100%|██████████| 13.0k/13.0k [00:00<00:00, 71.7kB/s]
SW_OPER_MPLAORBPRE_20131122T131447_20140102T235900_0021.ZIP: 100%|██████████| 47.2k/47.2k [00:00<00:00, 173kB/s] 
SW_OPER_MPLAORBPRE_20131122T131447_20140102T235900_0022.ZIP: 100%|██████████| 47.2k/47.2

done





In [4]:
datadir = Path('~/Workspace/data').expanduser()

In [5]:
testfile = Path(datadir, 'SW_OPER_MPLAORBPRE_20190101T000000_20190205T000000_1866.ZIP')

In [12]:
Position = namedtuple('Position', ['x', 'y', 'z'])

In [13]:
p = Position(1, 2, 3)

In [15]:
p[0] == p.x

True

In [16]:
Velocity = namedtuple('Velocity', ['vx', 'vy', 'vz'])

In [17]:
OSV = namedtuple('OSV', ['tai', 'utc', 'ut1', 'absolute_orbit', 'position', 'velocity', 'quality'])

In [51]:
class PredictedOrbitFile:
    """Class for Swarm Predicted Orbit Files.
    :param str file_class: 4 characters File Class.
    :param str file_type: 10 characters File Type: `MPLAORBPRE`, `MPLBORBPRE`,
        `MPLCORBPRE`.
    :param str validity_start: Validity Start of the file as `yyyymmddThhmmss`
        string.
    :param str validity_stop: Validity Stop of the file as `yyyymmddThhmmss`
        string.
    :param str version: file version as 4 characters string.
    :param str suffix: file suffix: `.EEF`, `.ZIP`. If empty, the file is a
        logical file.
    :param str parent: parent directory. Default: current directory.
    :type parent: str or os.PathLike
    :raises ValueError: in case of invalid parameters.
    """

    def __init__(self, file_class, file_type, validity_start, validity_stop,
                 version, suffix='', parent=''):
        self.file_class = file_class
        self.file_type = file_type
        self.validity_start = validity_start
        self.validity_stop = validity_stop
        self.version = version
        self.suffix = suffix
        self.parent = parent

    def __eq__(self, other):
        if not isinstance(other, PredictedOrbitFile):
            raise TypeError(
                "can't compare PredictedOrbitFile with %s" %
                type(other).__name__
            )
        return (
            self.file_class == other.file_class and
            self.file_type == other.file_type and
            self.validity_start == other.validity_start and
            self.validity_stop == other.validity_stop and
            self.version == other.version and
            self.suffix == other.suffix and
            self.parent == other.parent
        )

    def __hash__(self):
        return hash(
            (
                self.mission_id,
                self.file_class,
                self.file_type,
                self.validity_start,
                self.validity_stop,
                self.version,
                self.suffix,
                self.parent
            )
        )

    def __repr__(self):
        return (
            f'<PredictedOrbitFile({self.mission_id}, {self.file_class}, '
            f'{self.file_type}, {self.validity_start}, {self.validity_stop}, '
            f'{self.version}, {self.suffix})>'
        )

    def __str__(self):
        return str(self.path)

    @property
    def mission_id(self):
        """Mission code always equal to `SW`."""
        return 'SW'

    @property
    def file_class(self):
        """File class as 4 characters string."""
        return self.__file_class

    @file_class.setter
    def file_class(self, value):
        value = str(value).upper()
        if not re.match(r'\w{4}$', value):
            raise ValueError('File Class not valid: %s' % value)
        self.__file_class = value

    @property
    def file_type(self):
        """File type as 10 characters string."""
        return self.__file_type

    @file_type.setter
    def file_type(self, value):
        value = str(value).upper()
        if value not in ('MPLAORBPRE', 'MPLBORBPRE', 'MPLCORBPRE'):
            raise ValueError('File Type not valid: %s' % value)
        self.__file_type = value

    @property
    def validity_start(self):
        """Validity start as `yyyymmddThhmmss` string."""
        return self.__validity_start

    @validity_start.setter
    def validity_start(self, value):
        value = str(value)
        if not re.match(r'\d{8}T\d{6}', value):
            raise ValueError(
                'datetime value does not match pattern yyyymmddThhmmss'
            )
        self.__validity_start = value

    @property
    def validity_stop(self):
        """Validity stop as `yyyymmddThhmmss` string."""
        return self.__validity_stop

    @validity_stop.setter
    def validity_stop(self, value):
        value = str(value)
        if not re.match(r'\d{8}T\d{6}', value):
            raise ValueError(
                'datetime value does not match pattern yyyymmddThhmmss'
            )
        self.__validity_stop = value

    @property
    def version(self):
        """File version as 4 characters string."""
        return self.__version

    @version.setter
    def version(self, value):
        value = int(value)
        if not 0 <= value <= 9999:
            raise ValueError(
                f'version is not between 0 and 9999: {value}'
            )
        self.__version = f'{value:04d}'

    @property
    def suffix(self):
        """File suffix as string."""
        return self.__suffix

    @suffix.setter
    def suffix(self, value):
        self.__suffix = str(value)

    @property
    def name(self):
        """File name as string."""
        return (
            f'{self.mission_id}_{self.file_class}_{self.file_type}_'
            f'{self.validity_start}_{self.validity_stop}_{self.version}'
            f'{self.suffix}'
        )

    @property
    def path(self):
        """File path as `pathlib.Path`."""
        return self.parent / self.name

    @property
    def satellite_id(self):
        """Satellite ID."""
        return self.file_type[2]

    @property
    def parent(self):
        """Parent directory"""
        return self.__parent

    @parent.setter
    def parent(self, value):
        self.__parent = value if isinstance(value, os.PathLike) \
            else Path(value)

    @property
    def creation_date(self):
        """Get file creation date."""
        if self.suffix.upper().endswith('ZIP'):
            zf = ZipFile(self.path)
            with zf.open(zf.namelist()[0]) as fh:
                et = ElementTree.parse(fh)
        else:
            et = ElementTree.parse(self.path)
        namespaces = {'ns': 'http://eop-cfi.esa.int/CFI'}
        return dt.datetime.strptime(
            et.findtext('.//ns:Creation_Date', namespaces=namespaces),
            'UTC=%Y-%m-%dT%H:%M:%S'
        ).replace(tzinfo=dt.timezone.utc)
    
    @property
    def orbits(self):
        """Iterator yielding orbit data."""
        if self.suffix.upper().endswith('ZIP'):
            zf = ZipFile(self.path)
            with zf.open(zf.namelist()[0]) as fh:
                et = ElementTree.parse(fh)
        else:
            et = ElementTree.parse(self.path)
        namespaces = {'ns': 'http://eop-cfi.esa.int/CFI'}
        for osv in et.iterfind('.//ns:OSV', namespaces):
            yield OSV(
                tai=dt.datetime.strptime(
                    osv.findtext('ns:TAI', namespaces=namespaces),
                    'TAI=%Y-%m-%dT%H:%M:%S.%f'
                ).replace(tzinfo=dt.timezone.utc),
                utc=dt.datetime.strptime(
                    osv.findtext('ns:UTC', namespaces=namespaces),
                    'UTC=%Y-%m-%dT%H:%M:%S.%f'
                ).replace(tzinfo=dt.timezone.utc),
                ut1=dt.datetime.strptime(
                    osv.findtext('ns:UT1', namespaces=namespaces),
                    'UT1=%Y-%m-%dT%H:%M:%S.%f'
                ).replace(tzinfo=dt.timezone.utc),
                absolute_orbit=int(
                    osv.findtext('ns:Absolute_Orbit', namespaces=namespaces)
                ),
                Position(
                    x=float(
                        osv.findtext('ns:X', namespaces=namespaces)
                    ),
                    y=float(
                        osv.findtext('ns:Y', namespaces=namespaces)
                    ),
                    z=float(
                        osv.findtext('ns:Z', namespaces=namespaces)
                    ),
                ),
                Velocity(
                    vx=float(
                        osv.findtext('ns:VX', namespaces=namespaces)
                    ),
                    vy=float(
                        osv.findtext('ns:VY', namespaces=namespaces)
                    ),
                    vz=float(
                        osv.findtext('ns:VZ', namespaces=namespaces)
                    ),
                ),
                quality=int(
                    osv.findtext('ns:Quality', namespaces=namespaces)
                )
            )

    def intersects(self, begin_date, end_date):
        """Check if the temporal validity of the Swarm file intersects the time
        window given by [`begin_date`,`end_date`].
        :param datetime.datetime begin_date: lower bound of the time window.
        :param datetime.datetime end_date: upper bound of the time window.
        :return bool: ``True`` if the temporal validity of the file intersects
            the time given time window. ``False`` otherwise.
        """
        def _tofloat(value):
            if value == '00000000T000000':
                return -inf
            elif value == '99999999T999999':
                return inf
            elif isinstance(value, str):
                return datetime.datetime.strptime(value, '%Y%m%dT%H%M%S') \
                    .replace(tzinfo=datetime.timezone.utc).timestamp()
            elif isinstance(value, datetime.date):
                if not hasattr(value, 'time'):
                    value = datetime.datetime.combine(value, datetime.time())
                return value.replace(tzinfo=datetime.timezone.utc).timestamp()
            elif isinstance(value, (int, float)):
                return value
            else:
                raise TypeError(f'type not valid for {value}')

        return (
            _tofloat(self.validity_stop) >= _tofloat(begin_date) and
            _tofloat(self.validity_start) <= _tofloat(end_date)
        )

    @classmethod
    def frompath(cls, filename):
        """Return an instance of :class:`PredictedOrbitFile` from a path-like object.
        :param filename: file name.
        :type filename: str or pathlib.PathLike
        :return: file name as a :class:`PredictedOrbitFile` object.
        :rtype: PredictedOrbitFile
        :raises ValueError: if the file is not valid.
        """
        pattern = re.compile(
            r'SW_(?P<file_class>\w{4})_(?P<file_type>MPL[ABC]ORBPRE)_'
            r'(?P<validity_start>\d{8}T\d{6})_(?P<validity_stop>\d{8}T\d{6})_'
            r'(?P<version>\d{4})(?P<suffix>[\.\w]+)'
        )
        if not isinstance(filename, os.PathLike):
            filename = Path(filename)
        match = pattern.match(filename.name)
        if match is None:
            raise ValueError(f'file not valid: {filename}')
        args = match.groupdict()
        args.update(parent=filename.parent)
        return PredictedOrbitFile(**args)

SyntaxError: positional argument follows keyword argument (<ipython-input-51-20693b3a6025>, line 219)

In [48]:
pof.creation_date

datetime.datetime(2019, 1, 1, 14, 8, 41, tzinfo=datetime.timezone.utc)

In [49]:
list(pof.orbits)

[OSV(tai=datetime.datetime(2019, 1, 1, 0, 5, 3, 630862, tzinfo=datetime.timezone.utc), utc=datetime.datetime(2019, 1, 1, 0, 4, 26, 630862, tzinfo=datetime.timezone.utc), ut1=datetime.datetime(2019, 1, 1, 0, 4, 26, 594430, tzinfo=datetime.timezone.utc), absolute_orbit=28693, position=Position(x=-4929641.913, y=-4707974.333, z=10235.203), velocity=Velocity(vx=-82.979842, vy=118.58567, vz=7640.677723), quality=0),
 OSV(tai=datetime.datetime(2019, 1, 1, 1, 38, 25, 35934, tzinfo=datetime.timezone.utc), utc=datetime.datetime(2019, 1, 1, 1, 37, 48, 35934, tzinfo=datetime.timezone.utc), ut1=datetime.datetime(2019, 1, 1, 1, 37, 47, 999445, tzinfo=datetime.timezone.utc), absolute_orbit=28694, position=Position(x=-6395082.933, y=-2359926.982, z=10236.103), velocity=Velocity(vx=-29.159589, vy=141.964413, vz=7640.700815), quality=0),
 OSV(tai=datetime.datetime(2019, 1, 1, 3, 11, 46, 401228, tzinfo=datetime.timezone.utc), utc=datetime.datetime(2019, 1, 1, 3, 11, 9, 401228, tzinfo=datetime.timezone.u