DM-24638: Convert TransformSourceTableTask and friends to Gen3 #426

Merged: 3 commits, Feb 24, 2021
Changes from all commits
6 changes: 5 additions & 1 deletion pipelines/_Coaddition.yaml
@@ -1,6 +1,10 @@
description: Coaddition
tasks:
  makeWarp: lsst.pipe.tasks.makeCoaddTempExp.MakeWarpTask
  makeWarp:
    class: lsst.pipe.tasks.makeCoaddTempExp.MakeWarpTask
    config:
      makePsfMatched: true
      doWriteEmptyWarps: true
  assembleCoadd: lsst.pipe.tasks.assembleCoadd.CompareWarpAssembleCoaddTask
subsets:
  coaddition:
1 change: 0 additions & 1 deletion pipelines/_Forced.yaml
@@ -8,4 +8,3 @@ subsets:
      - forcedPhotCcd
      - forcedPhotCoadd
    description: A set of tasks to run when doing forced measurements
Contributor:
Does this pass GitHub checks? I thought yaml lint required a newline at the end (but maybe I have that backwards).

Contributor Author:
lint DOES require a newline at the end. You had two newlines before: https://github.com/lsst/pipe_tasks/blob/master/pipelines/_Forced.yaml


18 changes: 18 additions & 0 deletions pipelines/_SingleFrame.yaml
@@ -3,10 +3,28 @@ tasks:
  isr: lsst.ip.isr.IsrTask
  characterizeImage: lsst.pipe.tasks.characterizeImage.CharacterizeImageTask
  calibrate: lsst.pipe.tasks.calibrate.CalibrateTask
  writeSourceTable: lsst.pipe.tasks.postprocess.WriteSourceTableTask
  transformSourceTable: lsst.pipe.tasks.postprocess.TransformSourceTableTask
Contributor:
You might want to define another subset (or put them in Eli's consolidate group). With the current subset, someone who runs processCcd will only run the top 3 tasks and not your two new ones (which might be what we want), but if you want a grouping that encompasses your new tasks, maybe something like:

subsets:
  singleFrame:
    subset:
      - isr
      - characterizeImage
      - calibrate
      - writeSourceTable
      - transformSourceTable
    description: Single frame processing that includes table transformations
  processCcd:
      ....

These subsets let people quickly run a select group of tasks from the command line.

Contributor Author (@yalsayyad, Feb 17, 2021):
That makes sense to mirror what we had before. Can subsets refer to each other? e.g.

subsets:
  processCcd:
      - isr
      - characterizeImage
      - calibrate
  singleFrame:
    subset:
      - processCcd
      - writeSourceTable
      - transformSourceTable
    description: Single frame processing that includes table transformations

Edit: They can't.

Contributor Author:
Couldn't get the DECam repo going in time to add for all cameras, so @natelust take a look at https://github.com/lsst/obs_subaru/pull/341/files

Contributor:
So currently this is the only place where labeled subsets can't be used as a substitute for labels. This is because, at the time, we didn't think it was worth adding code and complexity to track and handle cyclical definitions just to save a few lines in a (mostly) static file. If you think this is a good feature to have, we can add it.
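For context on the bookkeeping being weighed, resolving nested subsets would require tracking the expansion path to reject cycles. A rough, purely illustrative Python sketch (not pipe_base code; all names are made up):

def expandSubset(label, subsets, _path=()):
    # Expand a subset label into plain task labels, rejecting cyclic definitions.
    if label in _path:
        raise ValueError(f"Cyclic subset definition involving {label!r}")
    if label not in subsets:
        return [label]  # a plain task label
    expanded = []
    for member in subsets[label]:
        expanded.extend(expandSubset(member, subsets, _path + (label,)))
    return expanded

subsets = {"processCcd": ["isr", "characterizeImage", "calibrate"],
           "singleFrame": ["processCcd", "writeSourceTable", "transformSourceTable"]}
print(expandSubset("singleFrame", subsets))
# ['isr', 'characterizeImage', 'calibrate', 'writeSourceTable', 'transformSourceTable']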

  consolidateSourceTable: lsst.pipe.tasks.postprocess.ConsolidateSourceTableTask
subsets:
  processCcd:
    subset:
      - isr
      - characterizeImage
      - calibrate
    description: A set of tasks to run when doing single frame processing
  sourceTable:
    subset:
      - writeSourceTable
      - transformSourceTable
      - consolidateSourceTable
    description: Set of tasks to generate parquet Source Tables from output of processCcd subset.
  singleFrame:
    subset:
      - isr
      - characterizeImage
      - calibrate
      - writeSourceTable
      - transformSourceTable
      - consolidateSourceTable
    description: Set of tasks for complete single frame processing. Analogous to SingleFrameDriver.
Contributor:
I'm not sure we should mention SingleFrameDriver in the docs; it does not help anyone unfamiliar with pipe_drivers, and it somewhat ties this package to that one, potentially even after the latter is retired. It's up to your judgment though. If you feel it should be there, feel free to leave it.

32 changes: 22 additions & 10 deletions python/lsst/pipe/tasks/functors.py
@@ -431,17 +431,22 @@ def multilevelColumns(self, data, **kwargs):
        )

    def __call__(self, data, **kwargs):
        """Apply the functor to the data table

        Parameters
        ----------
        data : `lsst.daf.butler.DeferredDatasetHandle`,
               `lsst.pipe.tasks.parquetTable.MultilevelParquetTable`,
               `lsst.pipe.tasks.parquetTable.ParquetTable`,
               or `pandas.DataFrame`.
            The table or a pointer to a table on disk from which columns can
            be accessed
        """
        columnIndex = self._get_columnIndex(data)

        # First, determine whether data has a multilevel index (either gen2 or gen3)
        is_multiLevel = isinstance(data, MultilevelParquetTable) or isinstance(columnIndex, pd.MultiIndex)

        # Simple single-level column index, gen2
        if isinstance(data, ParquetTable) and not is_multiLevel:
            columns = self.columns
            df = data.toDataFrame(columns=columns)
            valDict = {k: f._func(df) for k, f in self.funcDict.items()}

        # Multilevel index, gen2 or gen3
        if is_multiLevel:
            columns = self.multilevelColumns(data, columnIndex=columnIndex)
@@ -463,10 +468,17 @@ def __call__(self, data, **kwargs):
                except Exception:
                    valDict[k] = f.fail(subdf)

        # non-multilevel, gen3 (TODO: this should work, but this case is not tested in test_functors.py)
        elif isinstance(data, DeferredDatasetHandle):
            columns = self.columns
            df = data.get(parameters={"columns": columns})
        else:
            if isinstance(data, DeferredDatasetHandle):
                # input if Gen3 deferLoad=True
                df = data.get(columns=self.columns)
            elif isinstance(data, pd.DataFrame):
                # input if Gen3 deferLoad=False
                df = data
            else:
Contributor:
What would the parq instance be here? I think it's worth making this explicit and raising on else.

Contributor:
Meanwhile, does the __call__ need a docstring that explicitly says what parq can be? There are a lot of datatypes here, but not all possible datatypes that this supports.

Contributor Author:
The parq instance is for the Gen3 case if deferLoad=False. The input datatypes are pd.DataFrames after loading, and parq.toDataFrame is the original (Gen2) path. I'll add inline comments.

                # Original Gen2 input is type ParquetTable and the fallback
                df = data.toDataFrame(columns=self.columns)

            valDict = {k: f._func(df) for k, f in self.funcDict.items()}

        try:
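To illustrate the dispatch above: after this change a CompositeFunctor accepts a plain pandas DataFrame (the Gen3 deferLoad=False case) in addition to a DeferredDatasetHandle or a ParquetTable. A minimal sketch, assuming Column is the basic column-selection functor in functors.py; the column names are made up and this is untested against the branch:

import pandas as pd
from lsst.pipe.tasks.functors import CompositeFunctor, Column

# Toy in-memory table standing in for a loaded `source` catalog.
df_in = pd.DataFrame({
    "coord_ra": [0.1, 0.2],
    "coord_dec": [-0.5, -0.6],
    "base_PsfFlux_instFlux": [1000.0, 2000.0],
})

# One functor per output column; here each simply copies a column through.
funcs = CompositeFunctor({
    "ra": Column("coord_ra"),
    "dec": Column("coord_dec"),
    "psfFlux": Column("base_PsfFlux_instFlux"),
})

# __call__ now dispatches on the input type: a DataFrame is used as-is,
# a DeferredDatasetHandle is read with only the needed columns, and a
# ParquetTable falls back to toDataFrame().
df_out = funcs(df_in)
print(df_out.columns.tolist())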
141 changes: 121 additions & 20 deletions python/lsst/pipe/tasks/postprocess.py
@@ -196,7 +196,25 @@ def writeMetadata(self, dataRefList):
        pass


class WriteSourceTableConfig(pexConfig.Config):
class WriteSourceTableConnections(pipeBase.PipelineTaskConnections,
                                  dimensions=("instrument", "visit", "detector")):

    catalog = connectionTypes.Input(
        doc="Input full-depth catalog of sources produced by CalibrateTask",
        name="src",
        storageClass="SourceCatalog",
        dimensions=("instrument", "visit", "detector")
    )
    outputCatalog = connectionTypes.Output(
        doc="Catalog of sources, `src` in Parquet format",
        name="source",
        storageClass="DataFrame",
        dimensions=("instrument", "visit", "detector")
    )

Contributor:
So for DM-27164 (and this will need to be rebased of course), I defined from lsst.pipe.base import connectionTypes so you can use just connectionTypes here.

Contributor Author:
It's not obvious to me that connectionTypes is more readable than pipeBase.connectionTypes when there are other pipeBase objects abounding, but sure, I'll change them all.

Contributor:
I don't want to makework! But we don't have any standard convention on this, and that's unpleasant.
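For reference, the two spellings being weighed look roughly like this (a sketch; it assumes, per DM-27164, that the connectionTypes module is importable from lsst.pipe.base):

# Module-style import keeps the package prefix visible:
import lsst.pipe.base as pipeBase
# ...connections then read pipeBase.connectionTypes.Input(...) / pipeBase.connectionTypes.Output(...)

# Name import, as on DM-27164, shortens each declaration:
from lsst.pipe.base import connectionTypes
# ...connections then read connectionTypes.Input(...) / connectionTypes.Output(...)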


class WriteSourceTableConfig(pipeBase.PipelineTaskConfig,
                             pipelineConnections=WriteSourceTableConnections):
    doApplyExternalPhotoCalib = pexConfig.Field(
        dtype=bool,
        default=False,
@@ -211,7 +229,7 @@ class WriteSourceTableConfig(pexConfig.Config):
    )


class WriteSourceTableTask(CmdLineTask):
class WriteSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
    """Write source table to parquet
    """
    _DefaultName = "writeSourceTable"
@@ -226,6 +244,13 @@ def runDataRef(self, dataRef):
        result = self.run(src, ccdVisitId=ccdVisitId)
        dataRef.put(result.table, 'source')

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        inputs['ccdVisitId'] = butlerQC.quantum.dataId.pack("visit_detector")
        result = self.run(**inputs).table
        outputs = pipeBase.Struct(outputCatalog=result.toDataFrame())
        butlerQC.put(outputs, outputRefs)

    def run(self, catalog, ccdVisitId=None):
        """Convert `src` catalog to parquet

@@ -242,7 +267,7 @@ def run(self, catalog, ccdVisitId=None):
        ``table``
            `ParquetTable` version of the input catalog
        """
        self.log.info("Generating parquet table from src catalog")
        self.log.info("Generating parquet table from src catalog %s", ccdVisitId)
        df = catalog.asAstropy().to_pandas().set_index('id', drop=True)
        df['ccdVisitId'] = ccdVisitId
        return pipeBase.Struct(table=ParquetTable(dataFrame=df))
@@ -428,7 +453,24 @@ def compute(self, dropna=False, pool=None):
        return self._df


class TransformCatalogBaseConfig(pexConfig.Config):
class TransformCatalogBaseConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=()):
    """Expected Connections for subclasses of TransformCatalogBaseTask.

    Must be subclassed.
    """
    inputCatalog = connectionTypes.Input(
        name="",
Contributor: Same here for connectionTypes.

storageClass="DataFrame",
)
outputCatalog = connectionTypes.Output(
name="",
storageClass="DataFrame",
)


class TransformCatalogBaseConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=TransformCatalogBaseConnections):
functorFile = pexConfig.Field(
dtype=str,
doc='Path to YAML file specifying functors to be computed',
@@ -437,7 +479,7 @@ class TransformCatalogBaseConfig(pexConfig.Config):
    )


class TransformCatalogBaseTask(CmdLineTask):
class TransformCatalogBaseTask(CmdLineTask, pipeBase.PipelineTask):
    """Base class for transforming/standardizing a catalog

    by applying functors that convert units and apply calibrations.
@@ -520,10 +562,32 @@ def inputDataset(self):
    def ConfigClass(self):
        raise NotImplementedError('Subclass must define "ConfigClass" attribute')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if self.config.functorFile:
            self.log.info('Loading tranform functor definitions from %s',
                          self.config.functorFile)
            self.funcs = CompositeFunctor.from_file(self.config.functorFile)
            self.funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        else:
            self.funcs = None

    def runQuantum(self, butlerQC, inputRefs, outputRefs):
        inputs = butlerQC.get(inputRefs)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run Task as a PipelineTask.")
        result = self.run(parq=inputs['inputCatalog'], funcs=self.funcs,
Contributor:
If self.config.functorFile is False, does that mean self.funcs is never defined? If so I think this line will raise an exception.

Contributor Author:
self.config.functorFile is optional in the sense that you should be able to instantiate the task in the notebook and run any functors you want (not necessarily from a yaml); and there's no default set of functors that make sense for the base class.

self.config.functorFile is not optional when running the subclasses as a commandlineTask or pipelineTask. I'll add an appropriate validation error message somewhere.

                          dataId=outputRefs.outputCatalog.dataId.full)
        outputs = pipeBase.Struct(outputCatalog=result)
        butlerQC.put(outputs, outputRefs)

    def runDataRef(self, dataRef):
        parq = dataRef.get()
        funcs = self.getFunctors()
        df = self.run(parq, funcs=funcs, dataId=dataRef.dataId)
        if self.funcs is None:
            raise ValueError("config.functorFile is None. "
                             "Must be a valid path to yaml in order to run as a CommandlineTask.")
        df = self.run(parq, funcs=self.funcs, dataId=dataRef.dataId)
        self.write(df, dataRef)
        return df

@@ -556,14 +620,11 @@ def run(self, parq, funcs=None, dataId=None, band=None):
        return df

    def getFunctors(self):
        funcs = CompositeFunctor.from_file(self.config.functorFile)
        funcs.update(dict(PostprocessAnalysis._defaultFuncs))
        return funcs
        return self.funcs
Contributor: same

    def getAnalysis(self, parq, funcs=None, band=None):
        # Avoids disk access if funcs is passed
        if funcs is None:
            funcs = self.getFunctors()
            funcs = self.funcs
Contributor: same
        analysis = PostprocessAnalysis(parq, funcs, filt=band)
        return analysis

@@ -765,7 +826,27 @@ def writeMetadata(self, dataRef):
        pass


class TransformSourceTableConfig(TransformCatalogBaseConfig):
class TransformSourceTableConnections(pipeBase.PipelineTaskConnections,
                                      dimensions=("instrument", "visit", "detector")):

    inputCatalog = connectionTypes.Input(
        doc="Wide input catalog of sources produced by WriteSourceTableTask",
Contributor: Same here about connectionTypes.

name="source",
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector"),
deferLoad=True
)
outputCatalog = connectionTypes.Output(
doc="Narrower, per-detector Source Table transformed and converted per a "
"specified set of functors",
name="sourceTable",
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector")
)


class TransformSourceTableConfig(TransformCatalogBaseConfig,
pipelineConnections=TransformSourceTableConnections):
pass


@@ -778,11 +859,6 @@ class TransformSourceTableTask(TransformCatalogBaseTask):
    inputDataset = 'source'
    outputDataset = 'sourceTable'

    def writeMetadata(self, dataRef):
        """No metadata to write.
        """
        pass

    @classmethod
    def _makeArgumentParser(cls):
        parser = ArgumentParser(name=cls._DefaultName)
@@ -1017,11 +1093,29 @@ def makeDataRefList(self, namespace):
        self.refList = outputRefList


class ConsolidateSourceTableConfig(pexConfig.Config):
class ConsolidateSourceTableConnections(pipeBase.PipelineTaskConnections,
                                        dimensions=("instrument", "visit")):
    inputCatalogs = connectionTypes.Input(
        doc="Input per-detector Source Tables",
Contributor: And here.

name="sourceTable",
storageClass="DataFrame",
dimensions=("instrument", "visit", "detector"),
multiple=True
)
outputCatalog = connectionTypes.Output(
doc="Per-visit concatenation of Source Table",
name="sourceTable_visit",
storageClass="DataFrame",
dimensions=("instrument", "visit")
)


class ConsolidateSourceTableConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=ConsolidateSourceTableConnections):
pass


class ConsolidateSourceTableTask(CmdLineTask):
class ConsolidateSourceTableTask(CmdLineTask, pipeBase.PipelineTask):
"""Concatenate `sourceTable` list into a per-visit `sourceTable_visit`
"""
_DefaultName = 'consolidateSourceTable'
Expand All @@ -1030,6 +1124,13 @@ class ConsolidateSourceTableTask(CmdLineTask):
inputDataset = 'sourceTable'
outputDataset = 'sourceTable_visit'

def runQuantum(self, butlerQC, inputRefs, outputRefs):
inputs = butlerQC.get(inputRefs)
self.log.info("Concatenating %s per-detector Source Tables",
len(inputs['inputCatalogs']))
df = pd.concat(inputs['inputCatalogs'])
butlerQC.put(pipeBase.Struct(outputCatalog=df), outputRefs)

def runDataRef(self, dataRefList):
self.log.info("Concatenating %s per-detector Source Tables", len(dataRefList))
df = pd.concat([dataRef.get().toDataFrame() for dataRef in dataRefList])
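As a small illustration of the consolidation step, the per-visit table is simply the row-wise concatenation of the per-detector tables. A toy sketch with made-up columns and values:

import pandas as pd

# Stand-ins for two per-detector sourceTable DataFrames from the same visit.
det0 = pd.DataFrame({"ccdVisitId": [1000042, 1000042], "psfFlux": [1.0, 2.0]})
det1 = pd.DataFrame({"ccdVisitId": [1000043], "psfFlux": [3.0]})

# Equivalent to the pd.concat call in runQuantum above.
sourceTable_visit = pd.concat([det0, det1])
print(len(sourceTable_visit))  # 3 rows, one per source in the visit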