DM-35741: Add InMemoryDatasetHandle #268

Merged 2 commits on Aug 1, 2022
2 changes: 2 additions & 0 deletions doc/changes/DM-35741.feature.rst
@@ -0,0 +1,2 @@
A new class ``InMemoryDatasetHandle`` is now available.
This class provides a variant of ``lsst.daf.butler.DeferredDatasetHandle`` that does not require a butler: it wraps an in-memory object in something that looks like a deferred-load handle, so it can be passed to ``Task.run()`` methods that expect to be able to do deferred loading.
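
A minimal usage sketch (illustrative only; ``run`` below is a hypothetical stand-in for any ``Task.run``-style method that simply calls ``get()`` on the handle it receives)::

    from lsst.pipe.base import InMemoryDatasetHandle

    # A Task.run-style method that only relies on the deferred-load
    # interface, i.e. it calls .get() on whatever handle it is given.
    def run(handle):
        catalog = handle.get()
        return len(catalog)

    # Wrap an ordinary in-memory object so it can be passed where a
    # DeferredDatasetHandle would otherwise be expected.
    handle = InMemoryDatasetHandle([1, 2, 3])
    assert run(handle) == 3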
1 change: 1 addition & 0 deletions python/lsst/pipe/base/__init__.py
@@ -1,4 +1,5 @@
from . import connectionTypes, pipelineIR
from ._dataset_handle import *
from ._instrument import *
from ._status import *
from ._task_metadata import *
160 changes: 160 additions & 0 deletions python/lsst/pipe/base/_dataset_handle.py
@@ -0,0 +1,160 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ["InMemoryDatasetHandle"]

import dataclasses
from typing import Any, Optional

from lsst.daf.butler import DataCoordinate, DimensionUniverse, StorageClass, StorageClassFactory
from lsst.utils.introspection import get_full_type_name


# Use an empty dataID as a default.
def _default_dataId() -> DataCoordinate:
return DataCoordinate.makeEmpty(DimensionUniverse())


@dataclasses.dataclass(frozen=True)
class InMemoryDatasetHandle:
"""An in-memory version of a `~lsst.daf.butler.DeferredDatasetHandle`."""

def get(
self, *, component: Optional[str] = None, parameters: Optional[dict] = None, **kwargs: dict
) -> Any:
"""Retrieves the dataset pointed to by this handle

This handle may be used multiple times, possibly with different
parameters.

Parameters
----------
component : `str` or None
If the deferred object is a component dataset type, this parameter
may specify the name of the component to use in the get operation.
parameters : `dict` or None
The parameters argument will be used to select a subset of the
dataset, in the same way it would be for a butler get method.
It defaults to None. If the value is not None, this dict will be
merged with the parameters dict used to construct this
`InMemoryDatasetHandle`.
**kwargs
This argument is deprecated and only exists to support legacy
gen2 butler code during migration. It is completely ignored
and will be removed in the future.

Returns
-------
return : `object`
The dataset pointed to by this handle. This is the actual object
that was initially stored and not a copy. Modifying this object
will modify the stored object. If the stored object is `None` this
method always returns `None` regardless of any component request or
parameters.
"""
if self.inMemoryDataset is None:
return None

if self.parameters is not None:
mergedParameters = self.parameters.copy()
if parameters is not None:
mergedParameters.update(parameters)
elif parameters is not None:
mergedParameters = parameters
else:
mergedParameters = {}

if component or mergedParameters:
# This requires a storage class look up to locate the delegate
# class.
storageClass = self._getStorageClass()
inMemoryDataset = self.inMemoryDataset

# Parameters for derived components are applied against the
# composite.
if component in storageClass.derivedComponents:
storageClass.validateParameters(mergedParameters)

# Process the parameters (hoping this never modifies the
# original object).
inMemoryDataset = storageClass.delegate().handleParameters(inMemoryDataset, mergedParameters)
mergedParameters = {} # They have now been used

readStorageClass = storageClass.derivedComponents[component]
else:
if component:
readStorageClass = storageClass.components[component]
else:
readStorageClass = storageClass
readStorageClass.validateParameters(mergedParameters)

if component:
inMemoryDataset = storageClass.delegate().getComponent(inMemoryDataset, component)

if mergedParameters:
inMemoryDataset = readStorageClass.delegate().handleParameters(
inMemoryDataset, mergedParameters
)

return inMemoryDataset
else:
# If there are no parameters or component requests, the object
# can be returned as-is.
return self.inMemoryDataset

def _getStorageClass(self) -> StorageClass:
factory = StorageClassFactory()
if self.storageClass:
return factory.getStorageClass(self.storageClass)

# Need to match python type.
pytype = type(self.inMemoryDataset)
for storageClass in factory.values():
# It is possible for a single Python type to be associated with
# multiple storage classes, so this lookup can be fragile.
if storageClass.is_type(pytype):
return storageClass

raise ValueError(
"Unable to find a StorageClass with associated with type "
f"{get_full_type_name(self.inMemoryDataset)}"
)

inMemoryDataset: Any
"""The object to store in this dataset handle for later retrieval.
"""

storageClass: Optional[str] = None
"""The name of the `~lsst.daf.butler.StorageClass` associated with this
dataset.

If `None`, the storage class will be inferred from the Python type of
the stored object using the factory.
"""

parameters: Optional[dict] = None
"""Optional parameters that may be used to specify a subset of the dataset
to be loaded (`dict` or `None`).
"""

dataId: DataCoordinate = dataclasses.field(default_factory=_default_dataId)
"""The `~lsst.daf.butler.DataCoordinate` associated with this dataset
handle.
"""
134 changes: 134 additions & 0 deletions tests/test_dataset_handle.py
@@ -0,0 +1,134 @@
# This file is part of pipe_base.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import unittest

from lsst.daf.butler import DataCoordinate, DimensionUniverse, StorageClassConfig, StorageClassFactory
from lsst.daf.butler.tests import MetricsExample
from lsst.pipe.base import InMemoryDatasetHandle

storageClasses = """
Integer:
pytype: int
StructuredDataTestDict:
pytype: dict
StructuredDataTestList:
pytype: list
delegate: lsst.daf.butler.tests.ListDelegate
parameters:
- slice
derivedComponents:
counter: Integer
StructuredDataTest:
# Data from a simple Python class
pytype: lsst.daf.butler.tests.MetricsExample
delegate: lsst.daf.butler.tests.MetricsDelegate
# Use YAML formatter by default
components:
# Components are those supported by get.
summary: StructuredDataTestDict
output: StructuredDataTestDict
data: StructuredDataTestList
parameters:
- slice
derivedComponents:
counter: Integer
"""


class SpecialThing:
"""Class known not to have associated StorageClass"""


class TestDatasetHandle(unittest.TestCase):
@classmethod
def setUpClass(cls):
config = StorageClassConfig.fromYaml(storageClasses)
factory = StorageClassFactory()
factory.addFromConfig(config)

def test_dataset_handle_basic(self):
inMemoryDataset = 42
hdl = InMemoryDatasetHandle(inMemoryDataset)

self.assertEqual(hdl.get(), inMemoryDataset)

def test_dataset_handle_unknown(self):
inMemoryDataset = SpecialThing()
hdl = InMemoryDatasetHandle(inMemoryDataset)

self.assertEqual(hdl.get(), inMemoryDataset)

with self.assertRaises(ValueError):
# Will not be able to find a matching StorageClass.
hdl.get(parameters={"key": "value"})

def test_dataset_handle_none(self):
hdl = InMemoryDatasetHandle(None)
self.assertIsNone(hdl.get())
self.assertIsNone(hdl.get(component="comp"))
self.assertIsNone(hdl.get(parameters={"something": 42}))

def test_dataset_handle_dataid(self):
hdl = InMemoryDatasetHandle(42)
self.assertEqual(dict(hdl.dataId), {})

dataId = DataCoordinate.makeEmpty(DimensionUniverse())
hdl = InMemoryDatasetHandle(42, dataId=dataId)
self.assertIs(hdl.dataId, dataId)

def test_dataset_handle_metric(self):
metric = MetricsExample(summary={"a": 1, "b": 2}, output={"c": {"d": 5}}, data=[1, 2, 3, 4])

# First with explicit storage class.
hdl = InMemoryDatasetHandle(metric, storageClass="StructuredDataTest")
retrieved = hdl.get()
self.assertEqual(retrieved, metric)

data = hdl.get(component="data")
self.assertEqual(data, metric.data)

# Now with implicit storage class.
hdl = InMemoryDatasetHandle(metric)
data = hdl.get(component="data")
self.assertEqual(data, metric.data)

# Parameters.
data = hdl.get(parameters={"slice": slice(2)})
self.assertEqual(data.summary, metric.summary)
self.assertEqual(data.data, [1, 2])

data = hdl.get(parameters={"slice": slice(2)}, component="data")
self.assertEqual(data, [1, 2])

# Use parameters in constructor and also override.
hdl = InMemoryDatasetHandle(metric, storageClass="StructuredDataTest", parameters={"slice": slice(3)})
self.assertEqual(hdl.get(component="data"), [1, 2, 3])
self.assertEqual(hdl.get(component="counter"), 3)
self.assertEqual(hdl.get(component="data", parameters={"slice": slice(1, 3)}), [2, 3])
self.assertEqual(hdl.get(component="counter", parameters={"slice": slice(1, 3)}), 2)

# Ensure the original has not been modified.
self.assertEqual(len(metric.data), 4)


if __name__ == "__main__":
unittest.main()