
Commit

Clean ups requested by reviewer
timj committed Jul 2, 2021
1 parent 1bdf59f commit 71fa33d
Showing 2 changed files with 107 additions and 45 deletions.
13 changes: 9 additions & 4 deletions python/lsst/daf/butler/cli/cmd/commands.py
@@ -554,9 +554,10 @@ def collection_chain(**kwargs):
@click.option("--id-generation-mode",
default="UNIQUE",
help="Mode to use for generating dataset IDs. The default creates a unique ID. Other options"
" are: 'DATAID_TYPE' for creating a reproducible ID from the dataID;"
" 'DATAID_TYPE_RUN' for creating a reproducible ID from the dataID and run. The latter is"
" usually used for 'raw'-type data.",
" are: 'DATAID_TYPE' for creating a reproducible ID from the dataID and dataset type;"
" 'DATAID_TYPE_RUN' for creating a reproducible ID from the dataID, dataset type and run."
" The latter is usually used for 'raw'-type data that will be ingested in multiple."
" repositories.",
callback=to_upper,
type=click.Choice(("UNIQUE", "DATAID_TYPE", "DATAID_TYPE_RUN"), case_sensitive=False))
@click.option("--data-id",
@@ -574,7 +575,7 @@ def collection_chain(**kwargs):
def ingest_files(**kwargs):
"""Ingest files from table file.
DATASET_TYPE is the name of the dataset type to associated with these
DATASET_TYPE is the name of the dataset type to be associated with these
files. There can only be one dataset type per invocation of this
command.
@@ -591,5 +592,9 @@ def ingest_files(**kwargs):
type. Relative file URI by default is assumed to be relative to the
current working directory but can be overridden using the ``--prefix``
option.
This command does not create dimension records and so any records must
be created by other means. This command should not be used to ingest
raw camera exposures.
"""
script.ingest_files(**kwargs)
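
As an illustration of the entry point wired up above, the sketch below builds a small table with Astropy and calls the script function directly. The repository path, run name, dataset type, and dimension columns are all hypothetical, and the keyword names follow the docstring in the second file of this commit; treat it as a usage sketch rather than a definitive invocation.

from astropy.table import Table

from lsst.daf.butler.script.ingest_files import ingest_files

# Hypothetical table: the first column is the file URI, the remaining
# columns are dimensions.  One file may appear in more than one row.
table = Table(
    rows=[
        ("image_000.fits", 1001, 0),
        ("image_000.fits", 1001, 1),
        ("image_001.fits", 1002, 0),
    ],
    names=("file", "exposure", "detector"),
)
table.write("files.ecsv", overwrite=True)

# Hypothetical repository, run, and dataset type; the instrument dimension
# is supplied via ``data_id`` rather than as a table column.
ingest_files(
    repo="/path/to/repo",
    dataset_type="my_dataset_type",
    run="u/example/ingest",
    table_file="files.ecsv",
    data_id=("instrument=HSC",),
    formatter=None,
    id_generation_mode="DATAID_TYPE_RUN",
    prefix="/path/to/staging",
    transfer="copy",
)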
139 changes: 98 additions & 41 deletions python/lsst/daf/butler/script/ingest_files.py
@@ -18,19 +18,23 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ("ingest_files",)

import logging
from typing import Optional, Tuple, Dict, Any
from typing import Optional, Tuple, Dict, Any, List, TYPE_CHECKING
from collections import defaultdict

from astropy.table import Table

from lsst.utils import doImport

from .. import Butler, DatasetIdGenEnum
from ..core import FileDataset, DatasetRef, ButlerURI, DataCoordinate
from ..core import FileDataset, DatasetRef, ButlerURI

if TYPE_CHECKING:
from ..core import DimensionUniverse, DatasetType

log = logging.getLogger(__name__)

@@ -49,19 +53,27 @@ def ingest_files(repo: str, dataset_type: str, run: str, table_file: str,
URI string of the Butler repo to use.
dataset_type : `str`
The name of the dataset type for the files to be ingested.
formatter : `str`
Fully-qualified python class name for the `Formatter` to use
to read the ingested files.
run : `str`
The run in which the files should be ingested.
table_file : `str`
Path to a table file to read.
id_generation_mode : `str`
Path to a table file to read. This file can be in any format that
can be read by Astropy so long as Astropy can determine the format
itself.
data_id : `tuple` of `str`
Tuple of strings of the form ``keyword=value`` that can be used
to define dataId elements that are fixed for all ingested files
found in the table file. This allows those columns to be missing
from the table file. Dimensions given here override table columns.
formatter : `str`, optional
Fully-qualified python class name for the `Formatter` to use
to read the ingested files. If `None` the formatter is read from
datastore configuration based on the dataset type.
id_generation_mode : `str`, optional
Mode to use for generating IDs. Should map to `DatasetIdGenEnum`.
prefix : `str`
prefix : `str`, optional
Prefix to use when resolving relative paths in table files. The default
is to use the current working directory.
transfer : `str`
transfer : `str`, optional
Transfer mode to use for ingest.
"""

@@ -72,69 +84,114 @@ def ingest_files(repo: str, dataset_type: str, run: str, table_file: str,
else:
formatter = None

# Force null prefix to None for API compatibility.
# Force empty string prefix (from click) to None for API compatibility.
if not prefix:
prefix = None

# Convert the dataset ID gen more string to enum
# Convert the dataset ID gen mode string to enum.
id_gen_mode = DatasetIdGenEnum.__members__[id_generation_mode]

# Create the butler with the relevant run attached.
butler = Butler(repo, run=run)

datasetType = butler.registry.getDatasetType(dataset_type)

# Convert the k=v strings into a dataId dict.
universe = butler.registry.dimensions
common_data_id = parse_data_id_tuple(data_id, universe)

# Convert any additional k=v strings in the dataId tuple to dict
# form.
common_data_id: Dict[str, Any] = {}
for id_str in data_id:
dimension_str, value = id_str.split("=")
# Read the table assuming that Astropy can work out the format.
table = Table.read(table_file)

try:
dimension = universe.getStaticDimensions()[dimension_str]
except KeyError:
raise ValueError(f"DataID dimension '{dimension_str}' is not known to this universe.") from None
datasets = extract_datasets_from_table(table, common_data_id, datasetType, formatter, prefix)

# Cast the value to the right python type (since they will be
# strings at this point).
value = dimension.primaryKey.getPythonType()(value)
butler.ingest(*datasets, transfer=transfer, run=run, idGenerationMode=id_gen_mode)

common_data_id[dimension_str] = value

# Read the table and determine the dimensions (the first column
# is the file URI).
table = Table.read(table_file)
def extract_datasets_from_table(table: Table, common_data_id: Dict, datasetType: DatasetType,
formatter: Optional[str] = None,
prefix: Optional[str] = None,) -> List[FileDataset]:
"""Extract datasets from the supplied table.
dimensions = table.colnames
dimensions.pop(0)
Parameters
----------
table : `astropy.table.Table`
Table containing the datasets. The first column is assumed to be
the file URI and the remaining columns are dimensions.
common_data_id : `dict`
Data ID values that are common to every row in the table. These
take priority if a dimension in this dataId is also present as
a column in the table.
datasetType : `DatasetType`
The dataset type to be associated with the ingested data.
formatter : `str`, optional
Fully-qualified python class name for the `Formatter` to use
to read the ingested files. If `None` the formatter is read from
datastore configuration based on the dataset type.
prefix : `str`
Prefix to be used for relative paths. Can be `None` for current
working directory.
Returns
-------
datasets : `list` of `FileDataset`
The `FileDataset` objects corresponding to the rows in the table.
The number of elements in this list can be smaller than the number
of rows in the file because one file can appear in multiple rows
with different dataIds.
"""
# The file is the first column and everything else is assumed to
# be dimensions so we need to know the name of that column.
file_column = table.colnames[0]

# Handle multiple dataIds per file by grouping by file.
refs_by_file = defaultdict(list)
n_datasets = 0
n_dataset_refs = 0
for row in table:
dataId: Dict[str, Any] = {k: row[k] for k in dimensions}

# Convert the row to a dataId, remembering to extract the
# path column.
dataId = dict(row)
path = dataId.pop(file_column)

# The command line can override a column.
dataId.update(common_data_id)

standardized = DataCoordinate.standardize(dataId, graph=datasetType.dimensions)

ref = DatasetRef(datasetType, standardized, conform=False)
# Create the dataset ref that is to be ingested.
ref = DatasetRef(datasetType, dataId)

# Convert path to absolute (because otherwise system will
# assume relative to datastore root and that is almost certainly
# never the right default here).
path = ButlerURI(row[0], root=prefix, forceAbsolute=True)
path_uri = ButlerURI(path, root=prefix, forceAbsolute=True)

refs_by_file[path].append(ref)
n_datasets += 1
refs_by_file[path_uri].append(ref)
n_dataset_refs += 1

datasets = [FileDataset(path=file,
datasets = [FileDataset(path=file_uri,
refs=refs,
formatter=formatter,) for file, refs in refs_by_file.items()]
formatter=formatter,) for file_uri, refs in refs_by_file.items()]

log.info("Ingesting %d dataset(s) from %d file(s)", n_datasets, len(datasets))
log.info("Ingesting %d dataset ref(s) from %d file(s)", n_dataset_refs, len(datasets))

butler.ingest(*datasets, transfer=transfer, run=run, idGenerationMode=id_gen_mode)
return datasets
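
The grouping behaviour described in the docstring (fewer `FileDataset` entries than table rows) can be sketched in isolation with plain Python; the file names and dimension values below are made up and no Butler objects are involved:

from collections import defaultdict

# Three (file, dataId) rows, two of which share a file.
rows = [
    ("image_000.fits", {"exposure": 1001, "detector": 0}),
    ("image_000.fits", {"exposure": 1001, "detector": 1}),
    ("image_001.fits", {"exposure": 1002, "detector": 0}),
]

refs_by_file = defaultdict(list)
for path, data_id in rows:
    refs_by_file[path].append(data_id)

# Two groups result from three rows: image_000.fits carries two dataIds and
# image_001.fits carries one, mirroring how one FileDataset can hold
# several DatasetRefs.
for path, data_ids in refs_by_file.items():
    print(path, data_ids)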


def parse_data_id_tuple(data_ids: Tuple[str, ...], universe: DimensionUniverse) -> Dict[str, Any]:
# Convert any additional k=v strings in the dataId tuple to dict
# form.
data_id: Dict[str, Any] = {}
for id_str in data_ids:
dimension_str, value = id_str.split("=")

try:
dimension = universe.getStaticDimensions()[dimension_str]
except KeyError:
raise ValueError(f"DataID dimension '{dimension_str}' is not known to this universe.") from None

# Cast the value to the right python type (since they will be
# strings at this point).
value = dimension.primaryKey.getPythonType()(value)

data_id[dimension_str] = value
return data_id
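
A short usage sketch of this helper, assuming the default dimension universe and that the function is importable from the module; the dimension values are made up:

from lsst.daf.butler import DimensionUniverse
from lsst.daf.butler.script.ingest_files import parse_data_id_tuple

universe = DimensionUniverse()

# "instrument" has a string primary key and "visit" an integer one, so the
# string values from the command line are cast accordingly.
data_id = parse_data_id_tuple(("instrument=HSC", "visit=903334"), universe)
print(data_id)  # {'instrument': 'HSC', 'visit': 903334}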
