Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-36395: Support incremental redefinition of multi-snap visits #447

Merged
merged 11 commits into from
May 4, 2023
3 changes: 3 additions & 0 deletions doc/changes/DM-36395.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added support for defining visits incrementally as exposures are ingested.
This allows files from the telescope to be ingested one at a time whilst redefining the existing visits.
Additionally ``--update-records`` and ``--incremental`` have been added to the ``butler define-visits`` command-line.
14 changes: 14 additions & 0 deletions python/lsst/obs/base/cli/cmd/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,20 @@ def convert(*args, **kwargs):
metavar=typeStrAcceptsMultiple,
)
@where_option()
@click.option(
"--update-records/--no-update-records",
default=False,
help="Use this option to force updates to the visit definition record. "
"Should only be used if you know that there has been a change to the "
"exposure records, such as a change to the metadata translator.",
)
@click.option(
"--incremental/--no-incremental",
default=False,
help="Use this option to force updates to the visit definition record "
"when multi-snap visits are being ingested incrementally and so you "
"might encounter partial visits. Implies --update-records.",
)
@options_file_option()
def define_visits(*args, **kwargs):
"""Define visits from exposures in the butler registry.
Expand Down
173 changes: 162 additions & 11 deletions python/lsst/obs/base/defineVisits.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,50 @@ def __init__(self, config: GroupExposuresConfig, **kwargs: Any):
configBaseType=GroupExposuresConfig,
)

@abstractmethod
def find_missing(
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
) -> list[DimensionRecord]:
"""Determine, if possible, which exposures might be missing.

Parameters
----------
exposures : `list` of `lsst.daf.butler.DimensionRecord`
The exposure records to analyze.
registry : `lsst.daf.butler.Registry`
A butler registry that contains these exposure records.

Returns
-------
missing : `list` of `lsst.daf.butler.DimensionRecord`
Any exposure records present in registry that were related to
the given exposures but were missing from that list and deemed
to be relevant.

Notes
-----
Only some grouping schemes are able to find missing exposures. It
is acceptable to return an empty list.
"""
raise NotImplementedError()

@abstractmethod
def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
"""Group the exposures in a way most natural for this visit definition.

Parameters
----------
exposures : `list` of `lsst.daf.butler.DimensionRecord`
The exposure records to group.

Returns
-------
groups : `dict` [Any, `list` of `DimensionRecord`]
Groupings of exposure records. The key type is relevant to the
specific visit definition and could be a string or a tuple.
"""
raise NotImplementedError()

@abstractmethod
def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
"""Group the given exposures into visits.
Expand Down Expand Up @@ -596,6 +640,7 @@ def run(
*,
collections: Optional[str] = None,
update_records: bool = False,
incremental: bool = False,
) -> None:
"""Add visit definitions to the registry for the given exposures.

Expand All @@ -616,6 +661,12 @@ def run(
OPTION THAT SHOULD ONLY BE USED TO FIX REGIONS AND/OR METADATA THAT
ARE KNOWN TO BE BAD, AND IT CANNOT BE USED TO REMOVE EXPOSURES OR
DETECTORS FROM A VISIT.
incremental : `bool`, optional
If `True` indicate that exposures are being ingested incrementally
and visit definition will be run on partial visits. This will
force ``update_records`` to `True`. If there is any risk that
files are being ingested incrementally it is critical that this
parameter is set to `True` and not to rely on ``updated_records``.

Raises
------
Expand All @@ -631,6 +682,8 @@ def run(
}
if not data_id_set:
raise RuntimeError("No exposures given.")
if incremental:
update_records = True
# Extract exposure DimensionRecords, check that there's only one
# instrument in play, and check for non-science exposures.
exposures = []
Expand Down Expand Up @@ -663,8 +716,17 @@ def run(
for visitSystem in visitSystems:
self.log.info("Registering visit_system %d: %s.", visitSystem.value, visitSystem)
self.butler.registry.syncDimensionData(
"visit_system", {"instrument": instrument, "id": visitSystem.value, "name": str(visitSystem)}
"visit_system",
{"instrument": instrument, "id": visitSystem.value, "name": str(visitSystem)},
)

# In true incremental we will be given the second snap on its
# own on the assumption that the previous snap was already handled.
# For correct grouping we need access to the other exposures in the
# visit.
if incremental:
exposures.extend(self.groupExposures.find_missing(exposures, self.butler.registry))

# Group exposures into visits, delegating to subtask.
self.log.info("Grouping %d exposure(s) into visits.", len(exposures))
definitions = list(self.groupExposures.group(exposures))
Expand Down Expand Up @@ -697,6 +759,20 @@ def run(
self.butler.registry.insertDimensionData(
"visit_system_membership", *visitRecords.visit_system_membership
)
elif incremental and len(visitRecords.visit_definition) > 1:
# The visit record was modified. This could happen
# if a multi-snap visit was redefined with an
# additional snap so play it safe and allow for the
# visit definition to be updated. We use update=False
# here since there should not be any rows updated,
# just additional rows added. update=True does not work
# correctly with multiple records. In incremental mode
# we assume that the caller wants the visit definition
# to be updated and has no worries about provenance
# with the previous definition.
for definition in visitRecords.visit_definition:
self.butler.registry.syncDimensionData("visit_definition", definition)

# [Re]Insert visit_detector_region records for both inserts
# and updates, because we do allow updating to affect the
# region calculations.
Expand Down Expand Up @@ -790,6 +866,16 @@ class _GroupExposuresOneToOneTask(GroupExposuresTask, metaclass=ABCMeta):

ConfigClass = _GroupExposuresOneToOneConfig

def find_missing(
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
) -> list[DimensionRecord]:
# By definition no exposures can be missing.
return []

def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
# No grouping.
return {exposure.id: [exposure] for exposure in exposures}

def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
visit_systems = {VisitSystem.from_name("one-to-one")}
Expand Down Expand Up @@ -836,12 +922,37 @@ class _GroupExposuresByGroupMetadataTask(GroupExposuresTask, metaclass=ABCMeta):

ConfigClass = _GroupExposuresByGroupMetadataConfig

def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
visit_systems = {VisitSystem.from_name("by-group-metadata")}
def find_missing(
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
) -> list[DimensionRecord]:
groups = self.group_exposures(exposures)
missing_exposures: list[DimensionRecord] = []
for exposures_in_group in groups.values():
# We can not tell how many exposures are expected to be in the
# visit so we have to query every time.
first = exposures_in_group[0]
records = set(
registry.queryDimensionRecords(
"exposure",
where="exposure.group_name = group",
bind={"group": first.group_name},
instrument=first.instrument,
)
)
records.difference_update(set(exposures_in_group))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems kind of a shame to have this by group, then merge it all into one big list, then group again. But I guess that keeps the interfaces simple.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it does, although I think there is a proposal to remove this subclass completely in #453 so it's not worth trying to make it more efficient.

missing_exposures.extend(list(records))
return missing_exposures

def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
groups = defaultdict(list)
for exposure in exposures:
groups[exposure.group_name].append(exposure)
return groups

def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
visit_systems = {VisitSystem.from_name("by-group-metadata")}
groups = self.group_exposures(exposures)
for visitName, exposuresInGroup in groups.items():
instrument = exposuresInGroup[0].instrument
visitId = exposuresInGroup[0].group_id
Expand Down Expand Up @@ -889,14 +1000,49 @@ class _GroupExposuresByCounterAndExposuresTask(GroupExposuresTask, metaclass=ABC

ConfigClass = _GroupExposuresByCounterAndExposuresConfig

def find_missing(
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
) -> list[DimensionRecord]:
"""Analyze the exposures and return relevant exposures known to
registry.
"""
groups = self.group_exposures(exposures)
missing_exposures: list[DimensionRecord] = []
for exposures_in_group in groups.values():
sorted_exposures = sorted(exposures_in_group, key=lambda e: e.seq_num)
first = sorted_exposures[0]

# Only need to look for the seq_nums that we don't already have.
seq_nums = set(range(first.seq_start, first.seq_end + 1))
seq_nums.difference_update({exp.seq_num for exp in sorted_exposures})

if seq_nums:
# Missing something. Check registry.
records = list(
registry.queryDimensionRecords(
"exposure",
where="exposure.seq_start = seq_start AND exposure.seq_end = seq_end AND "
"exposure.seq_num IN (seq_nums)",
bind={"seq_start": first.seq_start, "seq_end": first.seq_end, "seq_nums": seq_nums},
instrument=first.instrument,
)
)
missing_exposures.extend(records)

return missing_exposures

def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
groups = defaultdict(list)
for exposure in exposures:
groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
return groups

def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
# Docstring inherited from GroupExposuresTask.
system_one_to_one = VisitSystem.from_name("one-to-one")
system_seq_start_end = VisitSystem.from_name("by-seq-start-end")

groups = defaultdict(list)
for exposure in exposures:
groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
groups = self.group_exposures(exposures)
for visit_key, exposures_in_group in groups.items():
instrument = exposures_in_group[0].instrument

Expand All @@ -917,21 +1063,26 @@ def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionDat
)
skip_multi = True

multi_exposure = False
if first.seq_start != first.seq_end:
# This is a multi-exposure visit regardless of the number
# of exposures present.
multi_exposure = True

# Define the one-to-one visits.
num_exposures = len(exposures_in_group)
for exposure in exposures_in_group:
# Default is to use the exposure ID and name unless
# this is the first exposure in a multi-exposure visit.
visit_name = exposure.obs_id
visit_id = exposure.id
visit_systems = {system_one_to_one}

if num_exposures == 1:
if not multi_exposure:
# This is also a by-counter visit.
# It will use the same visit_name and visit_id.
visit_systems.add(system_seq_start_end)

elif num_exposures > 1 and not skip_multi and exposure == first:
elif not skip_multi and exposure == first:
# This is the first legitimate exposure in a multi-exposure
# visit. It therefore needs a modified visit name and ID
# so it does not clash with the multi-exposure visit
Expand All @@ -948,7 +1099,7 @@ def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionDat
)

# Multi-exposure visit.
if not skip_multi and num_exposures > 1:
if not skip_multi and multi_exposure:
# Define the visit using the first exposure
visit_name = first.obs_id
visit_id = first.id
Expand Down
20 changes: 14 additions & 6 deletions python/lsst/obs/base/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ def expandDataIds(self, data: RawExposureData) -> RawExposureData:
return data

def prep(
self, files: Iterable[ResourcePath], *, pool: Optional[PoolType] = None, processes: int = 1
self, files: Iterable[ResourcePath], *, pool: Optional[PoolType] = None
) -> Tuple[Iterator[RawExposureData], List[ResourcePath]]:
"""Perform all non-database-updating ingest preprocessing steps.

Expand All @@ -919,8 +919,6 @@ def prep(
pool : `multiprocessing.Pool`, optional
If not `None`, a process pool with which to parallelize some
operations.
processes : `int`, optional
The number of processes to use. Ignored if ``pool`` is not `None`.

Returns
-------
Expand All @@ -930,8 +928,6 @@ def prep(
bad_files : `list` of `str`
List of all the files that could not have metadata extracted.
"""
if pool is None and processes > 1:
pool = Pool(processes)
mapFunc = map if pool is None else pool.imap_unordered

def _partition_good_bad(
Expand Down Expand Up @@ -1157,7 +1153,19 @@ def ingestFiles(
Number of exposures that failed when ingesting raw datasets.
"""

exposureData, bad_files = self.prep(files, pool=pool, processes=processes)
created_pool = False
if pool is None and processes > 1:
pool = Pool(processes)
created_pool = True

try:
exposureData, bad_files = self.prep(files, pool=pool)
finally:
if created_pool and pool:
# The pool is not needed any more so close it if we created
# it to ensure we clean up resources.
pool.close()
pool.join()

# Up to this point, we haven't modified the data repository at all.
# Now we finally do that, with one transaction per exposure. This is
Expand Down
20 changes: 19 additions & 1 deletion python/lsst/obs/base/script/defineVisits.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,16 @@
log = logging.getLogger("lsst.obs.base.defineVisits")


def defineVisits(repo, config_file, collections, instrument, where=None, raw_name="raw"):
def defineVisits(
repo,
config_file,
collections,
instrument,
where=None,
update_records=False,
incremental=False,
raw_name="raw",
):
"""Implements the command line interface `butler define-visits` subcommand,
should only be called by command line tools and unit test code that tests
this function.
Expand All @@ -49,6 +58,13 @@ def defineVisits(repo, config_file, collections, instrument, where=None, raw_nam
where : `str`, optional
Query clause to use when querying for dataIds. Can be used to limit
the relevant exposures.
update_records : `bool`, optional
Control whether recalculated visit definitions will be accepted or
not.
incremental : `bool`, optional
Declare that the visit definitions are being run in a situation
where data from multi-snap visits are being ingested incrementally
and so the visit definition could change as new data arrive.
raw_name : `str`, optional
Name of the raw dataset type name. Defaults to 'raw'.

Expand Down Expand Up @@ -101,4 +117,6 @@ def defineVisits(repo, config_file, collections, instrument, where=None, raw_nam
where=where,
).expanded(),
collections=collections,
update_records=update_records,
incremental=incremental,
)