Skip to content

Commit

Permalink
Modify data-structures to aid ref counting
Browse files Browse the repository at this point in the history
It is desirable that variables get cleaned up when they go out of
scope, meaning that their ref count must go to zero. Some objects
kept closures or circular references which means they must be
cleaned by the garbage collector which can happen at unpredictable
times. Make these objects able to be dropped with reference counting.
  • Loading branch information
natelust committed Aug 25, 2021
1 parent 507161b commit 7d5fc2f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 34 deletions.
53 changes: 24 additions & 29 deletions python/lsst/pipe/base/butlerQuantumContext.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@

__all__ = ("ButlerQuantumContext",)

import types
import typing

from .connections import InputQuantizedConnection, OutputQuantizedConnection, DeferredDatasetRef
from .struct import Struct
from lsst.daf.butler import DatasetRef, Butler, Quantum

from typing import Any, Union, List, Sequence


class ButlerQuantumContext:
"""A Butler-like class specialized for a single quantum
Expand Down Expand Up @@ -79,30 +78,24 @@ def __init__(self, butler: Butler, quantum: Quantum):
for refs in quantum.outputs.values():
for ref in refs:
self.allOutputs.add((ref.datasetType, ref.dataId))
self.__butler = butler

def _get(self, ref: DatasetRef) -> Any:
# Butler methods below will check for unresolved DatasetRefs and
# raise appropriately, so no need for us to do that here.
if isinstance(ref, DeferredDatasetRef):
self._checkMembership(ref.datasetRef, self.allInputs)
return self.__butler.getDirectDeferred(ref.datasetRef)

else:
self._checkMembership(ref, self.allInputs)
return self.__butler.getDirect(ref)

def _put(self, value: Any, ref: DatasetRef):
self._checkMembership(ref, self.allOutputs)
self.__butler.put(value, ref)

# Create closures over butler to discourage anyone from directly
# using a butler reference
def _get(self, ref):
# Butler methods below will check for unresolved DatasetRefs and
# raise appropriately, so no need for us to do that here.
if isinstance(ref, DeferredDatasetRef):
self._checkMembership(ref.datasetRef, self.allInputs)
return butler.getDirectDeferred(ref.datasetRef)

else:
self._checkMembership(ref, self.allInputs)
return butler.getDirect(ref)

def _put(self, value, ref):
self._checkMembership(ref, self.allOutputs)
butler.put(value, ref)

self._get = types.MethodType(_get, self)
self._put = types.MethodType(_put, self)

def get(self, dataset: typing.Union[InputQuantizedConnection,
typing.List[DatasetRef],
DatasetRef]) -> object:
def get(self, dataset: Union[InputQuantizedConnection, List[DatasetRef], DatasetRef]) -> Any:
"""Fetches data from the butler
Parameters
Expand Down Expand Up @@ -151,8 +144,8 @@ def get(self, dataset: typing.Union[InputQuantizedConnection,
else:
raise TypeError("Dataset argument is not a type that can be used to get")

def put(self, values: typing.Union[Struct, typing.List[typing.Any], object],
dataset: typing.Union[OutputQuantizedConnection, typing.List[DatasetRef], DatasetRef]):
def put(self, values: Union[Struct, List[Any], Any],
dataset: Union[OutputQuantizedConnection, List[DatasetRef], DatasetRef]):
"""Puts data into the butler
Parameters
Expand Down Expand Up @@ -197,6 +190,8 @@ def put(self, values: typing.Union[Struct, typing.List[typing.Any], object],
else:
self._put(valuesAttribute, refs)
elif isinstance(dataset, list):
if not isinstance(values, Sequence):
raise ValueError("Values to put must be a sequence")
if len(dataset) != len(values):
raise ValueError("There must be a common number of references and values to put")
for i, ref in enumerate(dataset):
Expand All @@ -206,7 +201,7 @@ def put(self, values: typing.Union[Struct, typing.List[typing.Any], object],
else:
raise TypeError("Dataset argument is not a type that can be used to put")

def _checkMembership(self, ref: typing.Union[typing.List[DatasetRef], DatasetRef], inout: set):
def _checkMembership(self, ref: Union[List[DatasetRef], DatasetRef], inout: set):
"""Internal function used to check if a DatasetRef is part of the input
quantum
Expand Down
18 changes: 13 additions & 5 deletions python/lsst/pipe/base/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@

import contextlib
import logging
from typing import Optional

from lsst.pex.config import ConfigurableField
import lsst.daf.base as dafBase
from .timer import logInfo
from .task_logging import getTaskLogger

import weakref

try:
import lsstDebug
except ImportError:
Expand Down Expand Up @@ -137,7 +140,8 @@ class Task:

def __init__(self, config=None, name=None, parentTask=None, log=None):
self.metadata = dafBase.PropertyList()
self._parentTask = parentTask
self.__parentTask: Optional[weakref.ReferenceType]
self.__parentTask = parentTask if parentTask is None else weakref.ref(parentTask)

if parentTask is not None:
if name is None:
Expand Down Expand Up @@ -170,13 +174,17 @@ def __init__(self, config=None, name=None, parentTask=None, log=None):
self._display = lsstDebug.Info(self.__module__).display
else:
self._display = None
self._taskDict[self._fullName] = self
self._taskDict[self._fullName] = weakref.ref(self)

@property
def _parentTask(self) -> Optional['Task']:
return self.__parentTask if self.__parentTask is None else self.__parentTask()

def emptyMetadata(self):
"""Empty (clear) the metadata for this Task and all sub-Tasks.
"""
for subtask in self._taskDict.values():
subtask.metadata = dafBase.PropertyList()
subtask().metadata = dafBase.PropertyList()

def getSchemaCatalogs(self):
"""Get the schemas generated by this task.
Expand Down Expand Up @@ -232,7 +240,7 @@ def getAllSchemaCatalogs(self):
"""
schemaDict = self.getSchemaCatalogs()
for subtask in self._taskDict.values():
schemaDict.update(subtask.getSchemaCatalogs())
schemaDict.update(subtask().getSchemaCatalogs())
return schemaDict

def getFullMetadata(self):
Expand All @@ -259,7 +267,7 @@ def getFullMetadata(self):
"""
fullMetadata = dafBase.PropertySet()
for fullName, task in self.getTaskDict().items():
fullMetadata.set(fullName.replace(".", ":"), task.metadata)
fullMetadata.set(fullName.replace(".", ":"), task().metadata)
return fullMetadata

def getFullName(self):
Expand Down

0 comments on commit 7d5fc2f

Please sign in to comment.