Utilized vectorized APIs in Gen3 ingest and gen2to3.
This also switches to using daf.butler.FileDataset instead of tuples
in the RepoConverter APIs.
TallJimbo authored and yalsayyad committed Oct 26, 2019
1 parent 0529541 commit 4805b01
Showing 4 changed files with 29 additions and 31 deletions.
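
For orientation, the sketch below restates the calling pattern this commit adopts: build one FileDataset per file and hand the whole batch to a single Butler.ingest call, instead of ingesting file by file. It is a minimal, hypothetical helper rather than code from the diff; the function name ingestFiles and the (path, data ID) iterable are invented, and it assumes a daf_butler version whose Butler.ingest accepts multiple FileDataset arguments and a transfer keyword, as the changed code below relies on.

    import os.path
    from typing import Iterable, List, Tuple

    from lsst.daf.butler import Butler, DatasetRef, DatasetType, FileDataset


    def ingestFiles(butler: Butler, datasetType: DatasetType,
                    filesWithDataIds: Iterable[Tuple[str, dict]],
                    transfer: str = "symlink") -> List[DatasetRef]:
        """Hypothetical helper illustrating the vectorized ingest pattern.

        Builds one FileDataset per input file, then ingests them all with a
        single Butler.ingest call rather than one call per file.
        """
        datasets = [FileDataset(path=os.path.abspath(fileName),
                                ref=DatasetRef(datasetType, dataId))
                    for fileName, dataId in filesWithDataIds]
        # One vectorized call for the whole batch of this dataset type.
        butler.ingest(*datasets, transfer=transfer)
        return [dataset.ref for dataset in datasets]
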
27 changes: 13 additions & 14 deletions python/lsst/obs/base/gen2to3/repoConverter.py
@@ -29,7 +29,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Generic, TypeVar, List, Tuple, Optional, Iterator, Set, Any, Callable, Dict

from lsst.daf.butler import DatasetRef, Butler as Butler3, DataCoordinate
from lsst.daf.butler import DatasetRef, Butler as Butler3, DataCoordinate, FileDataset
from lsst.sphgeom import RangeSet, Region

from .filePathParser import FilePathParser
@@ -344,7 +344,7 @@ def makeDataIdExtractor(self, datasetTypeName: str, parser: FilePathParser,
"""
raise NotImplementedError()

def iterDatasets(self) -> Iterator[Tuple[str, DatasetRef]]:
def iterDatasets(self) -> Iterator[FileDataset]:
"""Iterate over all datasets in the repository that should be
ingested into the Gen3 repository.
@@ -353,8 +353,9 @@ class implementation at some point in their own logic.
Yields
------
fileNameInRoot : `str`
Name of the file to be ingested, relative to the repository root.
dataset : `FileDataset`
Structures representing datasets to be ingested. Paths should be
absolute.
ref : `lsst.daf.butler.DatasetRef`
Reference for the Gen3 datasets, including a complete `DatasetType`
and data ID.
Expand All @@ -381,7 +382,7 @@ def isRepoRoot(dirName):
ref = self._extractDatasetRef(fileNameInRoot)
if ref is not None:
if self.subset is None or self.subset.isRelated(ref.dataId):
yield fileNameInRoot, ref
yield FileDataset(path=os.path.join(self.root, fileNameInRoot), ref=ref)
else:
self._handleUnrecognizedFile(fileNameInRoot)

@@ -453,25 +454,23 @@ class implementation at some point in their own logic. More often,
`insertDimensionData`.
"""
self.task.log.info("Finding datasets in repo %s.", self.root)
datasets = defaultdict(list)
for fileNameInRoot, ref in self.iterDatasets():
datasets[ref.datasetType].append((fileNameInRoot, ref))
for datasetType, toIngest in datasets.items():
datasetsByType = defaultdict(list)
for dataset in self.iterDatasets():
datasetsByType[dataset.ref.datasetType].append(dataset)
for datasetType, datasetsForType in datasetsByType.items():
self.task.registry.registerDatasetType(datasetType)
self.task.log.info("Ingesting %s %s datasets.", len(toIngest), datasetType.name)
self.task.log.info("Ingesting %s %s datasets.", len(datasetsForType), datasetType.name)
try:
butler3, collections = self.getButler(datasetType.name)
except LookupError as err:
self.task.log.warn(str(err))
continue
try:
refs = [butler3.ingest(os.path.join(self.root, fileNameInRoot), ref,
transfer=self.task.config.transfer)
for fileNameInRoot, ref in toIngest]
butler3.ingest(*datasetsForType, transfer=self.task.config.transfer)
except LookupError as err:
raise LookupError(f"Error expanding data ID for dataset type {datasetType.name}.") from err
for collection in collections:
self.task.registry.associate(collection, refs)
self.task.registry.associate(collection, [dataset.ref for dataset in datasetsForType])

def getButler(self, datasetTypeName: str) -> Tuple[Butler3, List[str]]:
"""Create a new Gen3 Butler appropriate for a particular dataset type.
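
Under the new iterDatasets contract shown above, converters yield FileDataset instances with absolute paths instead of (fileNameInRoot, ref) tuples, and RepoConverter.ingest groups them by dataset type before a single vectorized Butler.ingest call. A hypothetical subclass override might look like the sketch below; the _extraDatasets attribute is invented for illustration, and the class omits the other abstract methods a real converter would implement.

    import os.path
    from typing import Iterator

    from lsst.daf.butler import FileDataset
    from lsst.obs.base.gen2to3.repoConverter import RepoConverter


    class ExampleRepoConverter(RepoConverter):
        """Hypothetical converter sketching the new iterDatasets contract."""

        def iterDatasets(self) -> Iterator[FileDataset]:
            # Hypothetical attribute holding (relative path, DatasetRef) pairs
            # discovered by some repository-specific scan.
            for fileNameInRoot, ref in self._extraDatasets:
                # Paths on the yielded FileDataset must be absolute, so join
                # each relative name with the repository root.
                yield FileDataset(path=os.path.join(self.root, fileNameInRoot),
                                  ref=ref)
            yield from super().iterDatasets()
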
10 changes: 6 additions & 4 deletions python/lsst/obs/base/gen2to3/rootRepoConverter.py
@@ -27,7 +27,7 @@
import itertools
from typing import TYPE_CHECKING, Iterator, Tuple, List

from lsst.daf.butler import DatasetType, DatasetRef
from lsst.daf.butler import DatasetType, DatasetRef, FileDataset
from .calibRepoConverter import CURATED_CALIBRATION_DATASET_TYPES
from .standardRepoConverter import StandardRepoConverter

@@ -120,7 +120,7 @@ def insertDimensionData(self):
records[dimension].extend(recordsForDimension)
self.task.raws.insertDimensionData(records)

def iterDatasets(self) -> Iterator[Tuple[str, DatasetRef]]:
def iterDatasets(self) -> Iterator[FileDataset]:
# Docstring inherited from RepoConverter.
# Iterate over reference catalog files.
for refCat, dimension in self._refCats:
@@ -133,11 +133,13 @@ def iterDatasets(self) -> Iterator[Tuple[str, DatasetRef]]:
if m is not None:
htmId = int(m.group(1))
dataId = self.task.registry.expandDataId({dimension: htmId})
yield os.path.join("ref_cats", refCat, fileName), DatasetRef(datasetType, dataId)
yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat, fileName),
ref=DatasetRef(datasetType, dataId))
else:
for htmId in self.subset.skypix[dimension]:
dataId = self.task.registry.expandDataId({dimension: htmId})
yield os.path.join("ref_cats", refCat, f"{htmId}.fits"), DatasetRef(datasetType, dataId)
yield FileDataset(path=os.path.join(self.root, "ref_cats", refCat, f"{htmId}.fits"),
ref=DatasetRef(datasetType, dataId))
yield from super().iterDatasets()

def ingest(self):
7 changes: 4 additions & 3 deletions python/lsst/obs/base/gen2to3/standardRepoConverter.py
@@ -22,13 +22,14 @@

__all__ = ["StandardRepoConverter"]

import os.path
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterator, Tuple

from lsst.log import Log
from lsst.log.utils import temporaryLogLevel
from lsst.daf.persistence import Butler as Butler2
from lsst.daf.butler import DatasetType, DatasetRef, DataCoordinate
from lsst.daf.butler import DatasetType, DatasetRef, DataCoordinate, FileDataset
from .repoConverter import RepoConverter
from .filePathParser import FilePathParser
from .dataIdExtractor import DataIdExtractor
@@ -160,11 +161,11 @@ def makeDataIdExtractor(self, datasetTypeName: str, parser: FilePathParser,
skyMapName=struct.name if struct is not None else None,
)

def iterDatasets(self) -> Iterator[Tuple[str, DatasetRef]]:
def iterDatasets(self) -> Iterator[FileDataset]:
# Docstring inherited from RepoConverter.
for struct in self._foundSkyMapsByCoaddName.values():
if self.task.isDatasetTypeIncluded(struct.ref.datasetType.name):
yield struct.filename, struct.ref
yield FileDataset(path=os.path.join(self.root, struct.filename), ref=struct.ref)
yield from super().iterDatasets()

# Class attributes that will be shadowed by public instance attributes;
16 changes: 6 additions & 10 deletions python/lsst/obs/base/ingest.py
@@ -38,6 +38,7 @@
DatasetRef,
DatasetType,
DimensionRecord,
FileDataset,
)
from lsst.daf.butler.instrument import makeExposureRecordFromObsInfo, makeVisitRecordFromObsInfo
from lsst.geom import Box2D
@@ -480,19 +481,14 @@ def ingestExposureDatasets(self, exposure: RawExposureData, butler: Optional[But
refs : `list` of `lsst.daf.butler.DatasetRef`
Dataset references for ingested raws.
"""
# TODO: once Butler has the ability to do bulk inserts of
# dataset rows (or at least avoid per-dataset savepoints),
# use that.
refs = []
if butler is None:
butler = self.butler
for file in exposure.files:
path = os.path.abspath(file.filename)
ref = butler.ingest(path, self.datasetType, file.dataId,
transfer=self.config.transfer,
datasets = [FileDataset(path=os.path.abspath(file.filename),
ref=DatasetRef(self.datasetType, file.dataId),
formatter=file.FormatterClass)
refs.append(ref)
return refs
for file in exposure.files]
butler.ingest(*datasets, transfer=self.config.transfer)
return [dataset.ref for dataset in datasets]

def run(self, files, pool: Optional[Pool] = None, processes: int = 1):
"""Ingest files into a Butler data repository.
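
The RawIngestTask change above follows the same pattern per exposure, with one extra detail: each FileDataset also carries that file's FormatterClass, so per-file formatter selection survives the move to a single vectorized call. Restated as a self-contained, hypothetical helper (the items in files are assumed to expose filename, dataId, and FormatterClass attributes, mirroring the per-file structures used in the diff):

    import os.path
    from typing import Iterable, List

    from lsst.daf.butler import Butler, DatasetRef, DatasetType, FileDataset


    def ingestRawFiles(butler: Butler, datasetType: DatasetType,
                       files: Iterable, transfer: str) -> List[DatasetRef]:
        """Hypothetical helper mirroring the new ingestExposureDatasets body."""
        datasets = [FileDataset(path=os.path.abspath(file.filename),
                                ref=DatasetRef(datasetType, file.dataId),
                                formatter=file.FormatterClass)
                    for file in files]
        # Single vectorized ingest for the whole exposure, replacing the
        # per-file loop (and per-dataset savepoint) noted in the removed TODO.
        butler.ingest(*datasets, transfer=transfer)
        return [dataset.ref for dataset in datasets]
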
