In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

In [2]:
import lsst.sims.maf.metrics as metrics
import lsst.sims.maf.stackers as stackers
import lsst.sims.maf.slicers as slicers
import lsst.sims.maf.plots as plots
import lsst.sims.maf.metricBundles as mb

In [63]:
try:
    del metrics.BaseMetric.registry['__main__.LightcurveInversionMetric']
except:
    pass

def _setVis(ssoObs, snrLimit, snrCol, visCol):
    if snrLimit is not None:
        vis = np.where(ssoObs[snrCol] >= snrLimit)[0]
    else:
        vis = np.where(ssoObs[visCol] > 0)[0]
    return vis


class LightcurveInversionMetric(metrics.BaseMoMetric):
    """
    Determine if the cumulative sum of observations of a target are enough to enable lightcurve
    inversion for shape modeling. For this to be true, multiple conditions need to be
    satisfied:

    1) The SNR-weighted number of observations (each observation is weighted by its SNR, up to a max of 100)
    must be larger than the threshhold weightDet (default 50)
    2) Ecliptic longitudinal coverage needs to be at least 90 degrees, and the absolute deviation
    needs to be at least 1/8th the longitudinal coverage.
    3) The phase angle coverage needs to span at least 5 degrees.

    For evaluation of condition 2, the median ecliptic longitude is subtracted from all longitudes,
    and the modulo 360 of those values is taken. This ensures that the wrap around 360 is handled
    correctly.

    For more information on the above conditions, please see
    https://docs.google.com/document/d/1GAriM7trpTS08uanjUF7PyKALB2JBTjVT7Y6R30i0-8/edit?usp=sharing

    Parameters
    ----------
    weightDet: float, opt
        The SNR-weighted number of detections required (per bandpass in any ONE of the filters in filterlist).
        Default 50.
    snrLimit: float or None, opt
        If snrLimit is set as a float, then requires object to be above snrLimit SNR in the image.
        If snrLimit is None, this uses the probabilistic 'visibility' calculated by the vis stacker,
        which means SNR ~ 5.   Default is None.
    snrMax: float, opt
        Maximum value toward the SNR-weighting to consider. Default 100.
    filterlist: list of str, opt
        The filters which the lightcurve inversion could be based on. Requirements must be met in one of
        these filters.

    Returns
    -------
    int
        0 (could not perform lightcurve inversion) or 1 (could)
    """

    def __init__(self, weightDet=50, snrLimit=None, snrMax=100,
                 filterlist=('u', 'g', 'r', 'i', 'z', 'y'), **kwargs):
        super().__init__(**kwargs)
        self.snrLimit = snrLimit
        self.snrMax = snrMax
        self.weightDet = weightDet
        self.filterlist = filterlist
        self.badval = 0

    def run(self, ssoObs, orb, Hval):
        # Calculate the clipped SNR - ranges from snrLimit / SNR+vis to snrMax.
        clipSnr = np.minimum(ssoObs[self.snrCol], self.snrMax)
        if self.snrLimit is not None:
            clipSnr = np.where(ssoObs[self.snrCol] <= self.snrLimit, 0, clipSnr)
        else:
            clipSnr = np.where(ssoObs[self.visCol] == 0, 0, clipSnr)
        if len(np.where(clipSnr > 0)[0]) == 0:
            return self.badval
        # Check each filter in filterlist: 
        # stop as soon as find a filter that matches requirements.
        inversion_possible = 0
        for f in self.filterlist:
            # Is the SNR-weight sum of observations in this filter high enough?
            match = np.where(ssoObs[self.filterCol] == f)
            snrSum = np.sum(clipSnr[match]) / self.snrMax
            if snrSum < self.weightDet:
                # Do not have enough SNR-weighted observations, so skip on to the next filter.
                continue
            # Is the ecliptic longitude coverage for the visible observations sufficient?
            # Is the phase coverage sufficient?
            vis = np.where(clipSnr[match] > 0)
            ecL = ssoObs['ecLon'][match][vis]
            phaseAngle = ssoObs['phase'][match][vis]
            # Calculate the absolute deviation and range of ecliptic longitude.
            ecL_centred = (ecL - np.median(ecL)) % 360.0
            aDev = np.sum(np.abs(ecL_centred - np.mean(ecL_centred))) / len(ecL_centred)
            dL = np.max(ecL) - np.min(ecL)
            # Calculate the range of the phase angle
            dp = np.max(phaseAngle) - np.min(phaseAngle)
            # Metric requirement is that dL >= 90 deg, absolute deviation is greater than dL/8
            # and then that the phase coverage is more than 5 degrees.
            # Stop as soon as find a case where this is true.
            if dL >= 90.0 and aDev >= dL / 8 and dp >= 5:
                inversion_possible += 1
                break
        return inversion_possible

In [15]:
# Read in some test observations and test orbits
orbitfile = 'mbas_test.des'
obsfile = 'baseline2018a__mbas_test_obs.txt'
Hrange = np.arange(15, 20.5, 5)
s = slicers.MoObjSlicer(Hrange=Hrange)
s.setupSlicer(orbitFile=orbitfile, obsFile=obsfile)

In [76]:
# We can test the metric on a single observation/H value by using it directly.
# But we do still need to run the MoMag stacker to add the apparent magnitude etc to the provided observations.
# We also need to run the ecliptic lat/lon stacker.
magstacker = stackers.MoMagStacker()
eclstacker = stackers.EclStacker()
Hval = 16
i = 0
testobs = magstacker.run(s[i]['obs'], Hval=Hval, Href=s[i]['orbit']['H'])
testobs = eclstacker.run(testobs, Hval=None, Href=None)
# See .. now we have SNR and vis and mag.
testobs['SNR'][0:10], testobs['SNR'].max(), testobs['SNR'].min()

(array([17.29546906, 12.6477188 , 13.33466695, 13.66672833, 13.95747481,
        16.57374877,  5.48605043,  7.00638719, 60.91702622, 57.17374023]),
 227.3946049271624,
 1.393902861084676)

In [77]:
# So now we can test the new metric on a single observation/H value..
m = LightcurveInversionMetric(snrLimit=None)
m.run(testobs, s[i]['orbit'], Hval)

1

In [78]:
# And we can test it on more H values / orbits using the metric bundle & group
bundle = mb.MoMetricBundle(m, s, stackerList=[magstacker, eclstacker])

In [80]:
g = mb.MoMetricBundleGroup({'test': bundle})
g.runAll()

Running metrics ['test']
Calculated and saved all metrics.


In [81]:
bundle.metricValues

masked_array(
  data=[[1.0, --],
        [1.0, --],
        [1.0, --]],
  mask=[[False,  True],
        [False,  True],
        [False,  True]],
  fill_value=0.0)