Skip to content

Commit

Permalink
Merge pull request #365 from lsst/tickets/DM-28964
Browse files Browse the repository at this point in the history
DM-28964: add progress reporting
  • Loading branch information
TallJimbo committed Mar 18, 2021
2 parents a2726f1 + 7b2956c commit 08db658
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 151 deletions.
5 changes: 4 additions & 1 deletion python/lsst/obs/base/defineVisits.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
DataId,
DimensionGraph,
DimensionRecord,
Progress,
Timespan,
)

Expand Down Expand Up @@ -325,6 +326,7 @@ def __init__(self, config: Optional[DefineVisitsConfig] = None, *, butler: Butle
super().__init__(config, **kwargs)
self.butler = butler
self.universe = self.butler.registry.dimensions
self.progress = Progress("obs.base.DefineVisitsTask")
self.makeSubtask("groupExposures")
self.makeSubtask("computeVisitRegions", butler=self.butler)

Expand Down Expand Up @@ -545,7 +547,8 @@ def run(self, dataIds: Iterable[DataId], *,
zip(definitions, itertools.repeat(collections)))
# Iterate over visits and insert dimension data, one transaction per
# visit. If a visit already exists, we skip all other inserts.
for visitRecords in allRecords:
for visitRecords in self.progress.wrap(allRecords, total=len(definitions),
desc="Computing regions and inserting visits"):
with self.butler.registry.transaction():
if self.butler.registry.syncDimensionData("visit", visitRecords.visit):
self.butler.registry.insertDimensionData("visit_definition",
Expand Down
127 changes: 65 additions & 62 deletions python/lsst/obs/base/gen2to3/calibRepoConverter.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ def _queryGen2CalibRegistry(self, db: sqlite3.Connection, datasetType: DatasetTy
return
yield from results

def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]]):
def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[FileDataset]]],
count: int) -> None:
# Docstring inherited from RepoConverter.
# Read Gen2 calibration repository and extract validity ranges for
# all datasetType + calibDate combinations we ingested.
Expand All @@ -163,67 +164,69 @@ def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[Fil
db = sqlite3.connect(calibFile)
db.row_factory = sqlite3.Row

for datasetType, datasetsByCalibDate in datasets.items():
if not datasetType.isCalibration():
continue
gen2keys = {}
if "detector" in datasetType.dimensions.names:
gen2keys[self.task.config.ccdKey] = int
if "physical_filter" in datasetType.dimensions.names:
gen2keys["filter"] = str
translator = self.instrument.makeDataIdTranslatorFactory().makeMatching(
datasetType.name,
gen2keys,
instrument=self.instrument.getName()
)
for calibDate, datasetsForCalibDate in datasetsByCalibDate.items():
assert calibDate is not None, ("datasetType.isCalibration() is set by "
"the presence of calibDate in the Gen2 template")
# Build a mapping that lets us find DatasetRefs by data ID,
# for this DatasetType and calibDate. We know there is only
# one ref for each data ID (given DatasetType and calibDate as
# well).
refsByDataId = {}
for dataset in datasetsForCalibDate:
refsByDataId.update((ref.dataId, ref) for ref in dataset.refs)
# Query the Gen2 calibration repo for the validity ranges for
# this DatasetType and calibDate, and look up the appropriate
# refs by data ID.
for row in self._queryGen2CalibRegistry(db, datasetType, calibDate):
# For validity times we use TAI as some gen2 repos have
# validity dates very far in the past or future.
timespan = Timespan(
astropy.time.Time(row["validStart"], format="iso", scale="tai"),
astropy.time.Time(row["validEnd"], format="iso", scale="tai"),
)
# Make a Gen2 data ID from query results.
gen2id = {}
if "detector" in datasetType.dimensions.names:
gen2id[self.task.config.ccdKey] = row[self.task.config.ccdKey]
if "physical_filter" in datasetType.dimensions.names:
gen2id["filter"] = row["filter"]
# Translate that to Gen3.
gen3id, _ = translator(gen2id)
dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
ref = refsByDataId.get(dataId)
if ref is not None:
# Validity ranges must not overlap for the same dataID
# datasetType combination. Use that as a primary
# key and store the timespan and ref in a tuple
# as the value for later timespan validation.
timespansByDataId[(ref.dataId, ref.datasetType.name)].append((timespan, ref))
else:
# The Gen2 calib registry mentions this dataset, but it
# isn't included in what we've ingested. This might
# sometimes be a problem, but it should usually
# represent someone just trying to convert a subset of
# the Gen2 repo, so I don't think it's appropriate to
# warn or even log at info, since in that case there
# may be a _lot_ of these messages.
self.task.log.debug(
"Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
datasetType.name, calibDate, dataId
with self.progress.bar(desc="Querying Gen2 calibRegistry", total=count) as progressBar:
for datasetType, datasetsByCalibDate in datasets.items():
if not datasetType.isCalibration():
continue
gen2keys = {}
if "detector" in datasetType.dimensions.names:
gen2keys[self.task.config.ccdKey] = int
if "physical_filter" in datasetType.dimensions.names:
gen2keys["filter"] = str
translator = self.instrument.makeDataIdTranslatorFactory().makeMatching(
datasetType.name,
gen2keys,
instrument=self.instrument.getName()
)
for calibDate, datasetsForCalibDate in datasetsByCalibDate.items():
assert calibDate is not None, ("datasetType.isCalibration() is set by "
"the presence of calibDate in the Gen2 template")
# Build a mapping that lets us find DatasetRefs by data ID,
# for this DatasetType and calibDate. We know there is
# only one ref for each data ID (given DatasetType and
# calibDate as well).
refsByDataId = {}
for dataset in datasetsForCalibDate:
refsByDataId.update((ref.dataId, ref) for ref in dataset.refs)
# Query the Gen2 calibration repo for the validity ranges
# for this DatasetType and calibDate, and look up the
# appropriate refs by data ID.
for row in self._queryGen2CalibRegistry(db, datasetType, calibDate):
# For validity times we use TAI as some gen2 repos have
# validity dates very far in the past or future.
timespan = Timespan(
astropy.time.Time(row["validStart"], format="iso", scale="tai"),
astropy.time.Time(row["validEnd"], format="iso", scale="tai"),
)
# Make a Gen2 data ID from query results.
gen2id = {}
if "detector" in datasetType.dimensions.names:
gen2id[self.task.config.ccdKey] = row[self.task.config.ccdKey]
if "physical_filter" in datasetType.dimensions.names:
gen2id["filter"] = row["filter"]
# Translate that to Gen3.
gen3id, _ = translator(gen2id)
dataId = DataCoordinate.standardize(gen3id, graph=datasetType.dimensions)
ref = refsByDataId.get(dataId)
if ref is not None:
# Validity ranges must not overlap for the same
# dataID datasetType combination. Use that as a
# primary key and store the timespan and ref in a
# tuple as the value for later timespan validation.
timespansByDataId[(ref.dataId, ref.datasetType.name)].append((timespan, ref))
else:
# The Gen2 calib registry mentions this dataset,
# but it isn't included in what we've ingested.
# This might sometimes be a problem, but it should
# usually represent someone just trying to convert
# a subset of the Gen2 repo, so I don't think it's
# appropriate to warn or even log at info, since in
# that case there may be a _lot_ of these messages.
self.task.log.debug(
"Gen2 calibration registry entry has no dataset: %s for calibDate=%s, %s.",
datasetType.name, calibDate, dataId
)
progressBar.update(len(datasetsForCalibDate))

# Analyze the timespans to check for overlap problems
# Gaps of a day should be closed since we assume differing
Expand All @@ -243,7 +246,7 @@ def _finish(self, datasets: Mapping[DatasetType, Mapping[Optional[str], List[Fil
# cache the messages for later.
info_messages = set()
warn_messages = set()
for timespans in timespansByDataId.values():
for timespans in self.progress.wrap(timespansByDataId.values(), desc="Fixing validity ranges"):
# Sort all the timespans and check overlaps
sorted_timespans = sorted(timespans, key=lambda x: x[0])
timespan_prev, ref_prev = sorted_timespans.pop(0)
Expand Down

0 comments on commit 08db658

Please sign in to comment.