Skip to content

Commit

Permalink
Support new visit record metadata
Browse files Browse the repository at this point in the history
Adds a new exposure grouping class that groups by
seq_start/seq_end and also creates visit for each
exposure. This can only work with updated registry.

The old scheme still works for old registry although
the default grouping algorithm is not dynamic based
on the schema version.
  • Loading branch information
timj committed Apr 22, 2022
1 parent eebaf3c commit 9a3b746
Showing 1 changed file with 224 additions and 29 deletions.
253 changes: 224 additions & 29 deletions python/lsst/obs/base/defineVisits.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
"VisitDefinitionData",
]

import cmath
import dataclasses
import math
import operator
from abc import ABCMeta, abstractmethod
from collections import defaultdict
Expand Down Expand Up @@ -81,6 +83,9 @@ class VisitDefinitionData:
"""Dimension records for the exposures that are part of this visit.
"""

visit_system_mask: int = 0
"""Mask of all the visit systems associated with this visit."""


@dataclasses.dataclass
class _VisitRecords:
Expand All @@ -99,6 +104,9 @@ class _VisitRecords:
of a 'visit' and a 'detector' with a region on the sky.
"""

visit_system_membership: List[DimensionRecord]
"""Records relating visits to an associated visit system."""


class GroupExposuresConfig(Config):
pass
Expand Down Expand Up @@ -153,17 +161,17 @@ def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionDat
raise NotImplementedError()

@abstractmethod
def getVisitSystem(self) -> Tuple[int, str]:
def getVisitSystems(self) -> List[Tuple[int, str]]:
"""Return identifiers for the 'visit_system' dimension this
algorithm implements.
Returns
-------
id : `int`
Integer ID for the visit system (given an instrument).
name : `str`
Unique string identifier for the visit system (given an
instrument).
visit_systems : `List[Tuple[int, str]]`
The visit systems used by this algorithm. The first value
in each tuple is the integer ID of the visit system (for
a given instrument) and the second element is the string
identifier for the visit system.
"""
raise NotImplementedError()

Expand Down Expand Up @@ -271,7 +279,7 @@ def compute(
class DefineVisitsConfig(Config):
groupExposures = GroupExposuresTask.registry.makeField(
doc="Algorithm for grouping exposures into visits.",
default="one-to-one",
default="one-to-one-and-by-counter",
)
computeVisitRegions = ComputeVisitRegionsTask.registry.makeField(
doc="Algorithm from computing visit and visit+detector regions.",
Expand Down Expand Up @@ -359,7 +367,7 @@ def _buildVisitRecords(
Parameters
----------
definition : `VisitDefinition`
definition : `VisitDefinitionData`
Struct with identifiers for the visit and records for its
constituent exposures.
collections : Any, optional
Expand All @@ -374,6 +382,11 @@ def _buildVisitRecords(
Struct containing DimensionRecords for the visit, including
associated dimension elements.
"""
dimension = self.universe["visit"]

# Some registries support additional items.
supported = {meta.name for meta in dimension.metadata}

# Compute all regions.
visitRegion, visitDetectorRegions = self.computeVisitRegions.compute(
definition, collections=collections
Expand Down Expand Up @@ -406,9 +419,61 @@ def _buildVisitRecords(
if zenith_angle is not None:
zenith_angle /= len(definition.exposures)

# New records that may not be supported.
extras: Dict[str, Any] = {}
if "seq_num" in supported:
extras["seq_num"] = _reduceOrNone(min, (e.seq_num for e in definition.exposures))
if "azimuth" in supported:
# Must take into account 0/360 problem.
extras["azimuth"] = _calc_mean_angle([e.azimuth for e in definition.exposures])

# visit_system handling changed. This is the current schema logic.
if "visit_system_mask" in supported:
extras["visit_system_mask"] = definition.visit_system_mask

# Map visit to exposure.
visit_definition = [
self.universe["visit_definition"].RecordClass(
instrument=definition.instrument,
visit=definition.id,
exposure=exposure.id,
)
for exposure in definition.exposures
]

# Map visit to visit system (by unpacking the mask).
visit_system_membership = []
for visit_system_id, _ in self.groupExposures.getVisitSystems():
if 2**visit_system_id & definition.visit_system_mask:
record = self.universe["visit_system_membership"].RecordClass(
instrument=definition.instrument,
visit=definition.id,
visit_system=visit_system_id,
)
visit_system_membership.append(record)

else:
# The old approach can only handle one visit system at a time.
visit_system_id = self.groupExposures.getVisitSystems()[0][0]
extras["visit_system"] = visit_system_id

# The old visit_definition included visit system.
visit_definition = [
self.universe["visit_definition"].RecordClass(
instrument=definition.instrument,
visit=definition.id,
exposure=exposure.id,
visit_system=visit_system_id,
)
for exposure in definition.exposures
]

# This concept does not exist in old schema.
visit_system_membership = []

# Construct the actual DimensionRecords.
return _VisitRecords(
visit=self.universe["visit"].RecordClass(
visit=dimension.RecordClass(
instrument=definition.instrument,
id=definition.id,
name=definition.name,
Expand All @@ -418,23 +483,16 @@ def _buildVisitRecords(
observation_reason=observation_reason,
day_obs=observing_day,
zenith_angle=zenith_angle,
visit_system=self.groupExposures.getVisitSystem()[0],
exposure_time=exposure_time,
timespan=timespan,
region=visitRegion,
# TODO: no seeing value in exposure dimension records, so we
# can't set that here. But there are many other columns that
# both dimensions should probably have as well.
**extras,
),
visit_definition=[
self.universe["visit_definition"].RecordClass(
instrument=definition.instrument,
visit=definition.id,
exposure=exposure.id,
visit_system=self.groupExposures.getVisitSystem()[0],
)
for exposure in definition.exposures
],
visit_definition=visit_definition,
visit_system_membership=visit_system_membership,
visit_detector_region=[
self.universe["visit_detector_region"].RecordClass(
instrument=definition.instrument,
Expand Down Expand Up @@ -515,11 +573,12 @@ def run(
(instrument,) = instruments
# Ensure the visit_system our grouping algorithm uses is in the
# registry, if it wasn't already.
visitSystemId, visitSystemName = self.groupExposures.getVisitSystem()
self.log.info("Registering visit_system %d: %s.", visitSystemId, visitSystemName)
self.butler.registry.syncDimensionData(
"visit_system", {"instrument": instrument, "id": visitSystemId, "name": visitSystemName}
)
visitSystems = self.groupExposures.getVisitSystems()
for visitSystemId, visitSystemName in visitSystems:
self.log.info("Registering visit_system %d: %s.", visitSystemId, visitSystemName)
self.butler.registry.syncDimensionData(
"visit_system", {"instrument": instrument, "id": visitSystemId, "name": visitSystemName}
)
# Group exposures into visits, delegating to subtask.
self.log.info("Grouping %d exposure(s) into visits.", len(exposures))
definitions = list(self.groupExposures.group(exposures))
Expand Down Expand Up @@ -548,6 +607,10 @@ def run(
self.butler.registry.insertDimensionData(
"visit_definition", *visitRecords.visit_definition
)
if visitRecords.visit_system_membership:
self.butler.registry.insertDimensionData(
"visit_system_membership", *visitRecords.visit_system_membership
)
# [Re]Insert visit_detector_region records for both inserts
# and updates, because we do allow updating to affect the
# region calculations.
Expand Down Expand Up @@ -580,6 +643,29 @@ def _value_if_equal(a: _T, b: _T) -> Optional[_T]:
return a if a == b else None


def _calc_mean_angle(angles: List[float]) -> float:
"""Calculate the mean angle, taking into account 0/360 wrapping.
Parameters
----------
angles : `list` [`float`]
Angles to average together, in degrees.
Returns
-------
average : `float`
Average angle in degrees.
"""
# Save on all the math if we only have one value.
if len(angles) == 1:
return angles[0]

# Convert polar coordinates of unit circle to complex values.
# Average the complex values.
# Convert back to a phase angle.
return math.degrees(cmath.phase(sum(cmath.rect(1.0, math.radians(d)) for d in angles) / len(angles)))


class _GroupExposuresOneToOneConfig(GroupExposuresConfig):
visitSystemId = Field(
doc="Integer ID of the visit_system implemented by this grouping algorithm.",
Expand Down Expand Up @@ -609,11 +695,12 @@ def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionDat
id=exposure.id,
name=exposure.obs_id,
exposures=[exposure],
visit_system_mask=2**self.config.visitSystemId,
)

def getVisitSystem(self) -> Tuple[int, str]:
def getVisitSystems(self) -> List[Tuple[int, str]]:
# Docstring inherited from GroupExposuresTask.
return (self.config.visitSystemId, self.config.visitSystemName)
return [(self.config.visitSystemId, self.config.visitSystemName)]


class _GroupExposuresByGroupMetadataConfig(GroupExposuresConfig):
Expand Down Expand Up @@ -655,12 +742,120 @@ def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionDat
e.group_id == visitId for e in exposuresInGroup
), "Grouping by exposure.group_name does not yield consistent group IDs"
yield VisitDefinitionData(
instrument=instrument, id=visitId, name=visitName, exposures=exposuresInGroup
instrument=instrument,
id=visitId,
name=visitName,
exposures=exposuresInGroup,
visit_system_mask=2**self.config.visitSystemId,
)

def getVisitSystem(self) -> Tuple[int, str]:
def getVisitSystems(self) -> List[Tuple[int, str]]:
# Docstring inherited from GroupExposuresTask.
return [(self.config.visitSystemId, self.config.visitSystemName)]


class _GroupExposuresByCounterAndExposuresConfig(GroupExposuresConfig):
visitSystemId = Field(
doc="Integer ID of the visit_system implemented by this grouping algorithm.",
dtype=int,
default=5, # Not appliccable. One-to-one ID OR by-group-counter ID.
)
visitSystemName = Field(
doc="String name of the visit_system implemented by this grouping algorithm.",
dtype=str,
default="by-counter-and-exposures", # Not real name.
)


@registerConfigurable("one-to-one-and-by-counter", GroupExposuresTask.registry)
class _GroupExposuresByCounterAndExposuresTask(GroupExposuresTask, metaclass=ABCMeta):
"""An exposure grouping algorithm that uses the sequence start and
sequence end metadata but also creates one-to-one
This algorithm uses the exposure.seq_start and
exposure.seq_end fields to collect related snaps.
It also groups single exposures.
"""

ConfigClass = _GroupExposuresByCounterAndExposuresConfig
_visit_systems = {"one-to-one": 0, "by-group-counter": 2}

def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
groups = defaultdict(list)
for exposure in exposures:
groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
for visit_key, exposures_in_group in groups.items():
instrument = exposures_in_group[0].instrument

# It is possible that the first exposure in a visit has not
# been ingested. This can be determined and if that is the case
# we can not reliably define the multi-exposure visit.
skip_multi = False
sorted_exposures = sorted(exposures_in_group, key=lambda e: e.seq_num)
first = sorted_exposures.pop(0)
if first.seq_num != first.seq_start:
# Special case seq_num == 0 since that implies that the instrument
# has no counters and therefore no multi-exposure visits.
if first.seq_num == 0:
self.log.warning(
"First exposure for visit %s is not present. Skipping the multi-snap definition.",
visit_key,
)
skip_multi = True

# Define the one-to-one visits.
num_exposures = len(exposures_in_group)
for exposure in exposures_in_group:
# Default is to use the exposure ID and name unless
# this is the first exposure in a multi-exposure visit.
visit_name = exposure.obs_id
visit_id = exposure.id
visit_system_mask = 2 ** self._visit_systems["one-to-one"]

if num_exposures == 1:
# This is also a by-counter visit.
# It will use the same visit_name and visit_id.
visit_system_mask |= 2 ** self._visit_systems["by-group-counter"]

elif num_exposures > 1 and not skip_multi and exposure == first:
# This is the first legitimate exposure in a multi-exposure
# visit. It therefore needs a modified visit name and ID
# so it does not clash with the multi-exposure visit
# definition.
visit_name = f"{visit_name}_first"
visit_id = int(f"9{visit_id}")

yield VisitDefinitionData(
instrument=instrument,
id=visit_id,
name=visit_name,
exposures=[exposure],
visit_system_mask=visit_system_mask,
)

# Multi-exposure visit.
if not skip_multi and num_exposures > 1:
# Define the visit using the first exposure
visit_name = first.obs_id
visit_id = first.id
visit_system_mask = 2 ** self._visit_systems["by-group-counter"]

yield VisitDefinitionData(
instrument=instrument,
id=visit_id,
name=visit_name,
exposures=exposures_in_group,
visit_system_mask=visit_system_mask,
)

def getVisitSystems(self) -> List[Tuple[int, str]]:
# Docstring inherited from GroupExposuresTask.
return (self.config.visitSystemId, self.config.visitSystemName)
# Using a Config for this is difficult because what this grouping
# algorithm is doing is using two visit systems.
# One is using metadata (but not by-group) and the other is the
# one-to-one. For now hard-code in class.
return [(v, k) for k, v in self._visit_systems.items()]


class _ComputeVisitRegionsFromSingleRawWcsConfig(ComputeVisitRegionsConfig):
Expand Down

0 comments on commit 9a3b746

Please sign in to comment.