Merge branch 'tickets/DM-26526'
kfindeisen committed Sep 9, 2020
2 parents 6fa9fb8 + 5e0639e commit 12e5eb2
Showing 4 changed files with 250 additions and 3 deletions.
63 changes: 60 additions & 3 deletions python/lsst/obs/base/defineVisits.py
@@ -31,12 +31,14 @@

from abc import ABCMeta, abstractmethod
from collections import defaultdict
import itertools
import dataclasses
from typing import Any, Dict, Iterable, List, Optional, Tuple
from multiprocessing import Pool

from lsst.daf.butler import (
Butler,
DataCoordinate,
DataId,
DimensionGraph,
DimensionRecord,
@@ -321,6 +323,22 @@ def __init__(self, config: Optional[DefineVisitsConfig] = None, *, butler: Butle
self.makeSubtask("groupExposures")
self.makeSubtask("computeVisitRegions", butler=self.butler)

@classmethod
# WARNING: this method hardcodes the parameters to pipe.base.Task.__init__.
# Nobody seems to know a way to delegate them to Task code.
def _makeTask(cls, config: DefineVisitsConfig, butler: Butler, name: str, parentTask: Task):
"""Construct a DefineVisitsTask using only positional arguments.
Parameters
----------
All parameters are as for `DefineVisitsTask`.
"""
return cls(config=config, butler=butler, name=name, parentTask=parentTask)

# Overrides Task.__reduce__
def __reduce__(self):
return (self._makeTask, (self.config, self.butler, self._name, self._parentTask))

ConfigClass = DefineVisitsConfig

_DefaultName = "defineVisits"
@@ -405,6 +423,45 @@ def _buildVisitRecords(self, definition: VisitDefinitionData, *,
]
)

def _expandExposureId(self, dataId: DataId) -> DataCoordinate:
"""Return the expanded version of an exposure ID.
A private method to allow ID expansion in a pool without resorting
to local callables.
Parameters
----------
dataId : `dict` or `DataCoordinate`
Exposure-level data ID.
Returns
-------
expanded : `DataCoordinate`
A data ID that includes full metadata for all exposure dimensions.
"""
dimensions = DimensionGraph(self.universe, names=["exposure"])
return self.butler.registry.expandDataId(dataId, graph=dimensions)

def _buildVisitRecordsSingle(self, args) -> _VisitRecords:
"""Build the DimensionRecords associated with a visit and collection.
A wrapper for `_buildVisitRecords` to allow it to be run as part of
a pool without resorting to local callables.
Parameters
----------
args : `tuple` [`VisitDefinitionData`, any]
A tuple consisting of the ``definition`` and ``collections``
arguments to `_buildVisitRecords`, in that order.
Returns
-------
records : `_VisitRecords`
Struct containing DimensionRecords for the visit, including
associated dimension elements.
"""
return self._buildVisitRecords(args[0], collections=args[1])

def run(self, dataIds: Iterable[DataId], *,
pool: Optional[Pool] = None,
processes: int = 1,
@@ -433,8 +490,7 @@ def run(self, dataIds: Iterable[DataId], *,
mapFunc = map if pool is None else pool.imap_unordered
# Normalize, expand, and deduplicate data IDs.
self.log.info("Preprocessing data IDs.")
dimensions = DimensionGraph(self.universe, names=["exposure"])
dataIds = set(mapFunc(lambda d: self.butler.registry.expandDataId(d, graph=dimensions), dataIds))
dataIds = set(mapFunc(self._expandExposureId, dataIds))
if not dataIds:
raise RuntimeError("No exposures given.")
# Extract exposure DimensionRecords, check that there's only one
@@ -475,7 +531,8 @@ def run(self, dataIds: Iterable[DataId], *,
# This is the only parallel step, but it _should_ be the most expensive
# one (unless DB operations are slow).
self.log.info("Computing regions and other metadata for %d visit(s).", len(definitions))
allRecords = mapFunc(lambda d: self._buildVisitRecords(d, collections=collections), definitions)
allRecords = mapFunc(self._buildVisitRecordsSingle,
zip(definitions, itertools.repeat(collections)))
# Iterate over visits and insert dimension data, one transaction per
# visit.
for visitRecords in allRecords:
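The defineVisits.py changes above replace the lambdas previously handed to pool.imap_unordered with the bound methods _expandExposureId and _buildVisitRecordsSingle, and add a __reduce__ hook that rebuilds the task from its constructor arguments. multiprocessing has to pickle both the callable and the object it is bound to, and lambdas (and the task's default attribute-based pickling) do not survive that. The following is a minimal, self-contained sketch of the same pattern; ToyTask, its _lock attribute, and the square method are hypothetical stand-ins invented for illustration, not part of obs_base.

import pickle
import threading
from multiprocessing import Pool


class ToyTask:
    """Stand-in for a task that is not picklable by default (here, because of a lock)."""

    def __init__(self, *, config, name):
        self.config = config
        self.name = name
        self._lock = threading.Lock()   # locks cannot be pickled

    @classmethod
    def _makeTask(cls, config, name):
        # Positional-argument factory, mirroring the _makeTask classmethods above.
        return cls(config=config, name=name)

    def __reduce__(self):
        # Rebuild the instance from its constructor arguments instead of its
        # attribute dict, so the unpicklable lock is recreated fresh on load.
        return (self._makeTask, (self.config, self.name))

    def square(self, value):
        # A bound method pickles as (instance, attribute name), so it can be
        # shipped to pool workers; a lambda defined inside run() could not.
        return value * value


if __name__ == "__main__":
    task = ToyTask(config={"padding": 42}, name="toy")
    copy = pickle.loads(pickle.dumps(task))          # works via __reduce__
    assert copy.config == task.config

    with Pool(processes=2) as pool:
        print(sorted(pool.imap_unordered(task.square, [1, 2, 3])))  # [1, 4, 9]

The same factory-plus-__reduce__ pattern is applied to RawIngestTask in the next file.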
16 changes: 16 additions & 0 deletions python/lsst/obs/base/ingest.py
@@ -197,6 +197,22 @@ def __init__(self, config: Optional[RawIngestConfig] = None, *, butler: Butler,
# have all the relevant metadata translators loaded.
Instrument.importAll(self.butler.registry)

@classmethod
# WARNING: this method hardcodes the parameters to pipe.base.Task.__init__.
# Nobody seems to know a way to delegate them to Task code.
def _makeTask(cls, config: RawIngestConfig, butler: Butler, name: str, parentTask: Task):
"""Construct a RawIngestTask using only positional arguments.
Parameters
----------
All parameters are as for `RawIngestTask`.
"""
return cls(config=config, butler=butler, name=name, parentTask=parentTask)

# Overrides Task.__reduce__
def __reduce__(self):
return (self._makeTask, (self.config, self.butler, self._name, self._parentTask))

def extractMetadata(self, filename: str) -> RawFileData:
"""Extract and process metadata from a single raw file.
87 changes: 87 additions & 0 deletions tests/test_defineVisits.py
@@ -0,0 +1,87 @@
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import pickle
import shutil
import tempfile
import unittest

import lsst.daf.butler as dafButler
import lsst.daf.butler.tests as butlerTests

from lsst.obs.base import DefineVisitsTask


TESTDIR = os.path.dirname(__file__)


class DefineVisitsTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Create a new butler once only."""
cls.root = tempfile.mkdtemp(dir=TESTDIR)

dataIds = {
"instrument": ["DummyCam"],
"physical_filter": ["d-r"],
"exposure": [42, 43, 44],
"visit": [42, 43, 44],
}

cls.creatorButler = butlerTests.makeTestRepo(cls.root, dataIds)

# Create dataset types used by the tests
cls.storageClassFactory = dafButler.StorageClassFactory()
for datasetTypeName, storageClassName in (("raw", "ExposureF"),
):
storageClass = cls.storageClassFactory.getStorageClass(storageClassName)
butlerTests.addDatasetType(cls.creatorButler,
datasetTypeName,
{"instrument", "exposure"},
storageClass)

@classmethod
def tearDownClass(cls):
if cls.root is not None:
shutil.rmtree(cls.root, ignore_errors=True)

def setUp(self):
self.butler = butlerTests.makeTestCollection(self.creatorButler)

self.config = DefineVisitsTask.ConfigClass()
self.config.computeVisitRegions.active.padding = 42 # non-default value
self.task = DefineVisitsTask(config=self.config, butler=self.butler)

def testPickleTask(self):
stream = pickle.dumps(self.task)
copy = pickle.loads(stream)
self.assertEqual(self.task.getFullName(), copy.getFullName())
self.assertEqual(self.task.log.getName(), copy.log.getName())
self.assertEqual(self.task.config, copy.config)
self.assertEqual(self.task.butler._config, copy.butler._config)
self.assertEqual(self.task.butler.collections, copy.butler.collections)
self.assertEqual(self.task.butler.run, copy.butler.run)
self.assertEqual(self.task.universe, copy.universe)


if __name__ == "__main__":
unittest.main()
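As a side note, the round-trip assertions above cover the task's name, logger, config, and butler state; the property that actually matters for DefineVisitsTask.run is that the bound methods handed to Pool.imap_unordered can themselves be pickled. A sketch of an additional check along those lines is below; the testPickleBoundMethod method is an illustrative addition, not part of this commit.

    def testPickleBoundMethod(self):
        # Bound methods pickle by reference to their instance plus the attribute
        # name, so this exercises the same machinery that Pool.imap_unordered
        # relies on in DefineVisitsTask.run.
        method = pickle.loads(pickle.dumps(self.task._expandExposureId))
        self.assertIsInstance(method.__self__, DefineVisitsTask)
        self.assertEqual(method.__self__.config, self.task.config)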
87 changes: 87 additions & 0 deletions tests/test_ingest.py
@@ -0,0 +1,87 @@
# This file is part of obs_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import os
import pickle
import shutil
import tempfile
import unittest

import lsst.daf.butler as dafButler
import lsst.daf.butler.tests as butlerTests

from lsst.obs.base import RawIngestTask


TESTDIR = os.path.dirname(__file__)


class RawIngestTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Create a new butler once only."""
cls.root = tempfile.mkdtemp(dir=TESTDIR)

dataIds = {
"instrument": ["DummyCam"],
"physical_filter": ["d-r"],
"exposure": [42, 43, 44],
}

cls.creatorButler = butlerTests.makeTestRepo(cls.root, dataIds)

# Create dataset types used by the tests
cls.storageClassFactory = dafButler.StorageClassFactory()
for datasetTypeName, storageClassName in (("raw", "ExposureF"),
):
storageClass = cls.storageClassFactory.getStorageClass(storageClassName)
butlerTests.addDatasetType(cls.creatorButler,
datasetTypeName,
{"instrument", "exposure"},
storageClass)

@classmethod
def tearDownClass(cls):
if cls.root is not None:
shutil.rmtree(cls.root, ignore_errors=True)

def setUp(self):
self.butler = butlerTests.makeTestCollection(self.creatorButler)

self.config = RawIngestTask.ConfigClass()
self.config.transfer = "copy" # safe non-default value
self.task = RawIngestTask(config=self.config, butler=self.butler)

def testPickleTask(self):
stream = pickle.dumps(self.task)
copy = pickle.loads(stream)
self.assertEqual(self.task.getFullName(), copy.getFullName())
self.assertEqual(self.task.log.getName(), copy.log.getName())
self.assertEqual(self.task.config, copy.config)
self.assertEqual(self.task.butler._config, copy.butler._config)
self.assertEqual(self.task.butler.collections, copy.butler.collections)
self.assertEqual(self.task.butler.run, copy.butler.run)
self.assertEqual(self.task.universe, copy.universe)
self.assertEqual(self.task.datasetType, copy.datasetType)


if __name__ == "__main__":
unittest.main()
