# All the below expect testdata_ci_hsc and ci_hsc_gen3 to be setup

### Start with a task
PipelineTasks are Tasks that have extra information and methods to allow an executor to supply the data that it will need at the appropriate time. With this in mind, a good place to start and example is with a simple task like the one below. It's job is to do aperture naive aperture photometry on already identified sources.

In [4]:
import lsst.pipe.base as pipeBase
import lsst.pex.config as pexConfig
import lsst.afw.table as afwTable
import lsst.afw.image as afwImage

class ApertureTaskConfig(pexConfig.Config):
    apRad = pexConfig.Field(doc="Radius of aperture", dtype=int, default=4) 

class ApertureTask(pipeBase.Task):

    ConfigClass = ApertureTaskConfig
    _DefaultName = "apertureTask"
    
    def __init__(self, config: pexConfig.Config, *args, **kwargs):
        super().__init__(*args, config=config, **kwargs)
        self.apRad = self.config.apRad
    
        self.outputSchema = afwTable.SourceTable.makeMinimalSchema()
        self.apKey = self.outputSchema.addField("apFlux", type=np.float64, doc="Ap flux measured")
    
        self.outputCatalog = afwTable.SourceCatalog(self.outputSchema)

    def run(self, exposure: afwImage.Exposure, inputCatalog: afwTable.SourceCatalog) -> pipeBase.Struct:
        # Get the dimensions of the exposure
        dimensions = exposure.getDimensions()

        # Get indexes for each pixel
        indy, indx = np.indices((dimensions.getY(), dimensions.getX()))

        # Loop over each record in the catalog
        for source in inputCatalog:
            # Create an aperture and measure the flux
            center = source.getCentroid()
            mask = ((indy - center.getY())**2 + (indx - center.getX())**2)**0.5 < self.apRad
            flux = np.sum(exposure.image.array*mask)

            # Add a record to the output catalog
            tmpRecord = self.outputCatalog.addNew()
            tmpRecord.set(self.apKey, flux)
        
        return self.outputCatalog

## Convertng to a Pipeline task

The first thing a PipelineTask needs is a way for an executor to identify what dataset types the task will consume and produce. Additionally an executor needs to know what unit of work, or Quantum, the task will operate on. This information is defined inside a PipelineTaskConnections class, commonly refered to as a connections class. The following is an example of a connections class for the example ApertureTask that was defined above.

In [5]:
class ApertureTaskConnections(pipeBase.PipelineTaskConnections,
                              dimensions=("visit", "detector", "abstract_filter", "skymap")):
    exposure = pipeBase.connectionTypes.Input(doc="Input exposure to make measurements on",
                                              dimensions=("instrument", "visit", "detector"),
                                              storageClass="ExposureF",
                                              name="calexp")
    inputCatalog = pipeBase.connectionTypes.Input(doc="Input catalog with existing measurements",
                                                  dimensions=("instrument", "visit", "detector",),
                                                  storageClass="SourceCatalog",
                                                  name="src") 
    outputCatalog = pipeBase.connectionTypes.Output(doc="Aperture measurements",
                                                    dimensions=("visit", "detector", "abstract_filter",
                                                                "skymap"),
                                                    storageClass="SourceCatalog",
                                                    name="customAperture")

A connections class is associated with a PipelineTask's Config class as seen below. This association serves not only to link the connections to a PipelineTask, but also provides a way to configure a connections dataset type name.

In [6]:
class ApertureTaskConfig(pipeBase.PipelineTaskConfig,
                         pipelineConnections=ApertureTaskConnections):
    apRad = pexConfig.Field(doc="Radius of aperture", dtype=float, default=4)

Finally the ApertureTask itself is converted, which may be as simple as changing the base class for the Task. In this example the runQuantum method from PipelineTask is also overloaded to demonstrate how it works, and how it can be used to alter arguments before run is called.

In [19]:
# Finally we modify the task to make it a command line task

class ApertureTask(pipeBase.PipelineTask):

    ConfigClass = ApertureTaskConfig
    _DefaultName = "apertureTask"

    def __init__(self, config: pipeBase.PipelineTaskConfig, *args, **kwargs):
        super().__init__(*args, config=config, **kwargs)
        self.apRad = self.config.apRad

        self.outputSchema = afwTable.SourceTable.makeMinimalSchema()
        self.apKey = self.outputSchema.addField("apFlux", type=np.float64, doc="Ap flux measured")

        self.outputCatalog = afwTable.SourceCatalog(self.outputSchema)

    def run(self, exposure: afwImage.Exposure, inputCatalog: afwTable.SourceCatalog, msg: str) -> pipeBase.Struct:
        self.log.info(f"Running aperture phot {msg}")
        # Get the dimensions of the exposure
        dimensions = exposure.getDimensions()

        # Get indexes for each pixel
        indy, indx = np.indices((dimensions.getY(), dimensions.getX()))

        # Loop over each record in the catalog
        for i, source in enumerate(inputCatalog):
            if i%100 == 0:
                self.log.info(f"Processing record {i}")
            # Create an aperture and measure the flux
            center = source.getCentroid()
            mask = ((indy - center.getY())**2 + (indx - center.getX())**2)**0.5 < self.apRad
            flux = np.sum(exposure.image.array*mask)

            # Add a record to the output catalog
            tmpRecord = self.outputCatalog.addNew()
            tmpRecord.set(self.apKey, flux)

        return pipeBase.Struct(outputCatalog=self.outputCatalog)
    
    def runQuantum(self, butlerQC: pipeBase.ButlerQuantumContext,
                   inputRefs: pipeBase.InputQuantizedConnection,
                   outputRefs: pipeBase.OutputQuantizedConnection):
        inputs = butlerQC.get(inputRefs)
        # Add an extra argument that is to be passed to run, this is not
        # part of the base class runQuantum
        inputs['msg'] = str(butlerQC.quantum.dataId)
        outputs = self.run(**inputs)
        butlerQC.put(outputs, outputRefs)

Note that the names of the attributes of the connection class which are inputs match the arguments to the run method. The attribute name that is an output type matches the attribute name in the `Struct` that is returned by the run method

In [21]:
!PYTHONPATH=$PYTHONPATH:$(pwd) pipetask run -j 1 -b "$CI_HSC_GEN3_DIR"/DATA/butler.yaml -i shared/ci_hsc_output -o aptest6 --register-dataset-types -t WritingPipelineTasks.ApertureTask -d "detector = 22 AND visit = 903334 AND abstract_filter = 'r'"

ctrl.mpexec.cmdLineFwk INFO: QuantumGraph contains 1 quanta for 1 tasks
apertureTask INFO: Running aperture phot {'instrument': 'HSC', 'skymap': 'discrete/ci_hsc', 'detector': 22, 'visit': 903334}
apertureTask INFO: Processing record 0
apertureTask INFO: Processing record 100
apertureTask INFO: Processing record 200
apertureTask INFO: Processing record 300
apertureTask INFO: Processing record 400
apertureTask INFO: Processing record 500
apertureTask INFO: Processing record 600
apertureTask INFO: Processing record 700
apertureTask INFO: Processing record 800


### And so much more
There is a lot more this demo does not cover, but an indepth demo that exapnds on what was laied out here will soon be merged into the documentation of pipe_base. This includes things like

* Arguments to init
* Optional datasets
* Shared templates for dataset type names
* Prerequisite inputs
* Controlling the parallelization axis
* Deferred loading of datasets
* Altering connections through configuration
* Validating quanta prior to execution

## Defining a pipeline document
Now that the `ApertureTask` is a `PipelineTask` it can be put into a pipeline. As it is a measurement on a single detector, it makes sense to extend the single detector processing with this task. Below is a pipeline that does this.

In [17]:
!cat DemoPipeline.yaml

description: "A demo for DM BootCamp 2019"

# This pipeline should run against HSC data, this loads relevant overrides
instrument: lsst.obs.subaru.gen3.hsc.instrument.HyperSuprimeCam

# Extend the default ProcessCcd, but exclude isr and characterizeImage tasks
# for the sake of time
inherits:
  location: $PIPE_TASKS_DIR/pipelines/ProcessCcd.yaml
  exclude:
    - isr
    - charImage

# Add in the Task that was just created
tasks:
  example:
    class: WritingPipelineTasks.ApertureTask
    config:
      # Set the radius for the aperture
      apRad: 4.5
      # change the name of the output dataset type
      connections.outputCatalog: extraAperture

contracts:
  # verify that the aperture used in this pipeine is also used in the default
  # circular aperture
  - example.apRad in calibrate.measurement.plugins["base_CircularApertureFlux"].radii


Using python it is possible to see what the expanded pipeline will look like before it is executed

In [22]:
print(pipeBase.Pipeline.fromFile("DemoPipeline.yaml"))

description: A demo for DM BootCamp 2019
instrument: lsst.obs.subaru.gen3.hsc.instrument.HyperSuprimeCam
tasks:
  calibrate:
    class: lsst.pipe.tasks.calibrate.CalibrateTask
  example:
    class: WritingPipelineTasks.ApertureTask
    config:
    - apRad: 4.5
      connections.outputCatalog: extraAperture
contracts:
- contract: example.apRad in calibrate.measurement.plugins["base_CircularApertureFlux"].radii



### Run the pipeline
Pipelines are run very similar to regular tasks, specifying a pipeline instead of, or in addition to, tasks defined on the command line

In [2]:
!PYTHONPATH=$PYTHONPATH:$(pwd) pipetask run -j 1 -b "$CI_HSC_GEN3_DIR"/DATA/butler.yaml -i shared/ci_hsc_output -o aptest-7 --register-dataset-types -p DemoPipeline.yaml -d "detector = 22 AND visit = 903334 AND abstract_filter = 'r'" 

ctrl.mpexec.cmdLineFwk INFO: QuantumGraph contains 2 quanta for 2 tasks
calibrate.detection INFO: Detected 3974 positive peaks in 757 footprints and 8085 negative peaks in 169 footprints to 5 sigma
calibrate.detection INFO: Resubtracting the background after object detection
calibrate.deblend INFO: Deblending 757 sources
calibrate.deblend WARN: Parent 775958091962253313: skipping masked footprint (area: 293)
calibrate.deblend WARN: Parent 775958091962253320: skipping masked footprint (area: 619)
calibrate.deblend WARN: Parent 775958091962253325: skipping masked footprint (area: 2489)
calibrate.deblend WARN: Parent 775958091962253329: skipping masked footprint (area: 668)
calibrate.deblend WARN: Parent 775958091962253338: skipping masked footprint (area: 503)
calibrate.deblend WARN: Parent 775958091962253340: skipping masked footprint (area: 299)
calibrate.deblend WARN: Parent 775958091962253341: skipping masked footprint (area: 331)
calibrate.deblend WARN: Parent 775958091962253343: sk

Read back in the dataset to verify the execution was successful

In [14]:
import lsst.daf.butler as dafButler
from lsst.utils import getPackageDir
import os

# Define path to butler
butler_loc = os.path.join(getPackageDir("ci_hsc_gen3"), "DATA/butler.yaml")

# Create a butler
butler = dafButler.Butler(butler_loc, collection="aptest-7")

# Fetch the dataset
fluxes = butler.get("extraAperture", {"detector": 22, "visit":903334, "instrument":"HSC", "skymap":"discrete/ci_hsc"})

# Print a flux
print(fluxes[100]["apFlux"])

414.15576171875
