Merge branch 'tickets/DM-34175'
PaulPrice committed Apr 5, 2022
2 parents 7a69c27 + 8ffaabf commit c18b5ea
Showing 3 changed files with 130 additions and 41 deletions.
7 changes: 7 additions & 0 deletions doc/changes/DM-34175.feature.rst
@@ -0,0 +1,7 @@
* Made choice of required ``ObservationInfo`` properties configurable
through ``RawIngestTask.getObservationInfoSubsets``.
* Added the concept of "dependency" records to be added to the registry before
adding the exposure record; this makes it easier to satisfy foreign key
constraints when the exposure relates to dimensions beyond the standard set.
* Added ``RawIngestTask`` methods ``makeExposureRecord`` and ``makeDependencyRecords``
  as hooks for subclasses to provide values for additional columns.
7 changes: 6 additions & 1 deletion python/lsst/obs/base/_instrument.py
@@ -520,7 +520,9 @@ def makeDataIdTranslatorFactory(self) -> TranslatorFactory:
raise NotImplementedError("Must be implemented by derived classes.")


-def makeExposureRecordFromObsInfo(obsInfo: ObservationInfo, universe: DimensionUniverse) -> DimensionRecord:
+def makeExposureRecordFromObsInfo(
+    obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any
+) -> DimensionRecord:
"""Construct an exposure DimensionRecord from
`astro_metadata_translator.ObservationInfo`.
@@ -531,6 +533,8 @@ def makeExposureRecordFromObsInfo(obsInfo: ObservationInfo, universe: DimensionU
the exposure.
universe : `DimensionUniverse`
Set of all known dimensions.
+**kwargs
+    Additional field values for this record.
Returns
-------
@@ -572,6 +576,7 @@ def makeExposureRecordFromObsInfo(obsInfo: ObservationInfo, universe: DimensionU
tracking_dec=dec,
sky_angle=sky_angle,
zenith_angle=zenith_angle,
+**kwargs,
)
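
With the new ``**kwargs`` passthrough, callers can supply values for any
extra exposure columns their data repository defines. A minimal sketch,
assuming ``obsInfo`` and ``universe`` are already in hand and that
``seeing`` is a hypothetical extra column:

    from lsst.obs.base import makeExposureRecordFromObsInfo

    # obsInfo (ObservationInfo) and universe (DimensionUniverse) are assumed
    # to exist already; "seeing" is a hypothetical column on the exposure
    # dimension, used purely for illustration.
    record = makeExposureRecordFromObsInfo(obsInfo, universe, seeing=0.7)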


157 changes: 117 additions & 40 deletions python/lsst/obs/base/ingest.py
@@ -168,15 +168,16 @@ class RawExposureData:
"""Set of all known dimensions.
"""

-record: Optional[DimensionRecord] = None
+record: DimensionRecord
"""The exposure `DimensionRecord` that must be inserted into the
`~lsst.daf.butler.Registry` prior to file-level ingest (`DimensionRecord`).
"""

-def __post_init__(self, universe: DimensionUniverse) -> None:
-    # We don't care which file or dataset we read metadata from, because
-    # we're assuming they'll all be the same; just use the first ones.
-    self.record = makeExposureRecordFromObsInfo(self.files[0].datasets[0].obsInfo, universe)
+dependencyRecords: Dict[str, DimensionRecord]
+"""Additional records that must be inserted into the
+`~lsst.daf.butler.Registry` prior to ingesting the exposure ``record``
+(e.g., to satisfy foreign key constraints), indexed by the dimension name.
+"""


def makeTransferChoiceField(
@@ -459,6 +460,47 @@ def extractMetadata(self, filename: ResourcePath) -> RawFileData:
instrument=instrument,
)

@classmethod
def getObservationInfoSubsets(cls) -> Tuple[Set, Set]:
"""Return subsets of fields in the `ObservationInfo` that we care about
These fields will be used in constructing an exposure record.
Returns
-------
required : `set`
Set of `ObservationInfo` field names that are required.
optional : `set`
Set of `ObservationInfo` field names we will use if they are
available.
"""
required = {
"datetime_begin",
"datetime_end",
"detector_num",
"exposure_id",
"exposure_time",
"instrument",
"observation_id",
"observation_type",
"physical_filter",
}
optional = {
"altaz_begin",
"boresight_rotation_coord",
"boresight_rotation_angle",
"dark_time",
"exposure_group",
"tracking_radec",
"object",
"observation_counter",
"observation_reason",
"observing_day",
"science_program",
"visit_id",
}
return required, optional
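
For illustration, a subclass can adjust these sets without reimplementing
the method. A minimal sketch (``MyCameraRawIngestTask`` is a hypothetical
subclass name):

    from typing import Set, Tuple

    from lsst.obs.base import RawIngestTask


    class MyCameraRawIngestTask(RawIngestTask):
        """Hypothetical ingest task that must know the telescope pointing."""

        @classmethod
        def getObservationInfoSubsets(cls) -> Tuple[Set, Set]:
            # Start from the defaults and promote one property from
            # optional to required.
            required, optional = super().getObservationInfoSubsets()
            required |= {"altaz_begin"}
            optional -= {"altaz_begin"}
            return required, optional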

def _calculate_dataset_info(
self, header: Union[Mapping[str, Any], ObservationInfo], filename: ResourcePath
) -> RawFileDatasetInfo:
@@ -477,41 +519,12 @@ def _calculate_dataset_info(
The dataId, and observation information associated with this
dataset.
"""
-# To ensure we aren't slowed down for no reason, explicitly
-# list here the properties we need for the schema.
-# Use a dict with values a boolean where True indicates
-# that it is required that we calculate this property.
-ingest_subset = {
-    "altaz_begin": False,
-    "boresight_rotation_coord": False,
-    "boresight_rotation_angle": False,
-    "dark_time": False,
-    "datetime_begin": True,
-    "datetime_end": True,
-    "detector_num": True,
-    "exposure_group": False,
-    "exposure_id": True,
-    "exposure_time": True,
-    "instrument": True,
-    "tracking_radec": False,
-    "object": False,
-    "observation_counter": False,
-    "observation_id": True,
-    "observation_reason": False,
-    "observation_type": True,
-    "observing_day": False,
-    "physical_filter": True,
-    "science_program": False,
-    "visit_id": False,
-}
+required, optional = self.getObservationInfoSubsets()
if isinstance(header, ObservationInfo):
obsInfo = header
missing = []
# Need to check the required properties are present.
-for property, required in ingest_subset.items():
-    if not required:
-        continue
+for property in required:
# getattr does not need to be protected because it is using
# the defined list above containing properties that must exist.
value = getattr(obsInfo, property)
@@ -528,8 +541,8 @@ def _calculate_dataset_info(
header,
pedantic=False,
filename=str(filename),
-required={k for k in ingest_subset if ingest_subset[k]},
-subset=set(ingest_subset),
+required=required,
+subset=required | optional,
)

dataId = DataCoordinate.standardize(
@@ -760,10 +773,72 @@ def groupByExposure(self, files: Iterable[RawFileData]) -> List[RawExposureData]
byExposure[f.datasets[0].dataId.subset(exposureDimensions)].append(f)

return [
-RawExposureData(dataId=dataId, files=exposureFiles, universe=self.universe)
+RawExposureData(
+    dataId=dataId,
+    files=exposureFiles,
+    universe=self.universe,
+    record=self.makeExposureRecord(exposureFiles[0].datasets[0].obsInfo, self.universe),
+    dependencyRecords=self.makeDependencyRecords(
+        exposureFiles[0].datasets[0].obsInfo, self.universe
+    ),
+)
for dataId, exposureFiles in byExposure.items()
]

def makeExposureRecord(
self, obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any
) -> DimensionRecord:
"""Construct a registry record for an exposure
This is a method that subclasses will often want to customize. This can
often be done by calling this base class implementation with additional
``kwargs``.
Parameters
----------
obsInfo : `ObservationInfo`
Observation details for (one of the components of) the exposure.
universe : `DimensionUniverse`
Set of all known dimensions.
**kwargs
Additional field values for this record.
Returns
-------
record : `DimensionRecord`
The exposure record that must be inserted into the
`~lsst.daf.butler.Registry` prior to file-level ingest.
"""
return makeExposureRecordFromObsInfo(obsInfo, universe, **kwargs)
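
As the docstring suggests, a subclass will often just forward extra column
values through ``kwargs``. A sketch, continuing the hypothetical subclass
above and assuming the repository's exposure table has an extra
``my_extra`` column:

    from typing import Any

    from astro_metadata_translator import ObservationInfo
    from lsst.daf.butler import DimensionRecord, DimensionUniverse
    from lsst.obs.base import RawIngestTask


    class MyCameraRawIngestTask(RawIngestTask):
        def makeExposureRecord(
            self, obsInfo: ObservationInfo, universe: DimensionUniverse, **kwargs: Any
        ) -> DimensionRecord:
            # "my_extra" is a hypothetical column; a real subclass would use
            # whatever extra exposure fields its dimension universe defines.
            return super().makeExposureRecord(
                obsInfo, universe, my_extra=obsInfo.observation_reason, **kwargs
            )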

def makeDependencyRecords(
self, obsInfo: ObservationInfo, universe: DimensionUniverse
) -> Dict[str, DimensionRecord]:
"""Construct dependency records
These dependency records will be inserted into the
`~lsst.daf.butler.Registry` before the exposure records, because they
are dependencies of the exposure. This allows an opportunity to satisfy
foreign key constraints that exist because of dimensions related to the
exposure.
This is a method that subclasses may want to customize, if they've
added dimensions that relate to an exposure.
Parameters
----------
obsInfo : `ObservationInfo`
Observation details for (one of the components of) the exposure.
universe : `DimensionUniverse`
Set of all known dimensions.
Returns
-------
records : `dict` [`str`, `DimensionRecord`]
The records to insert, indexed by dimension name.
"""
return {}
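
A sketch of an override, assuming a hypothetical ``my_dimension`` dimension
that the exposure table references through a foreign key (the record's
field names are likewise assumptions):

    from typing import Dict

    from astro_metadata_translator import ObservationInfo
    from lsst.daf.butler import DimensionRecord, DimensionUniverse
    from lsst.obs.base import RawIngestTask


    class MyCameraRawIngestTask(RawIngestTask):
        def makeDependencyRecords(
            self, obsInfo: ObservationInfo, universe: DimensionUniverse
        ) -> Dict[str, DimensionRecord]:
            # Build the record for the hypothetical dimension so that the
            # exposure record inserted afterwards satisfies its foreign key.
            element = universe["my_dimension"]
            record = element.RecordClass(
                instrument=obsInfo.instrument,  # assumed key fields
                id=obsInfo.observation_counter,
            )
            return {"my_dimension": record}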

def expandDataIds(self, data: RawExposureData) -> RawExposureData:
"""Expand the data IDs associated with a raw exposure.
@@ -773,7 +848,7 @@ def expandDataIds(self, data: RawExposureData) -> RawExposureData:
----------
exposure : `RawExposureData`
A structure containing information about the exposure to be
-ingested. Must have `RawExposureData.records` populated. Should
+ingested. Must have `RawExposureData.record` populated. Should
be considered consumed upon return.
Returns
@@ -1078,6 +1153,8 @@ def ingestFiles(
)

try:
+for name, record in exposure.dependencyRecords.items():
+    self.butler.registry.syncDimensionData(name, record, update=update_exposure_records)
inserted_or_updated = self.butler.registry.syncDimensionData(
"exposure",
exposure.record,
