Skip to content

Commit

Permalink
Merge branch 'tickets/DM-23681'
Browse files Browse the repository at this point in the history
  • Loading branch information
plazas committed Mar 30, 2020
2 parents fb54480 + fb8dc78 commit a90c110
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 54 deletions.
9 changes: 5 additions & 4 deletions python/lsst/cp/pipe/ptc.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ def __init__(self, ampNames):
self.__dict__["nonLinearity"] = {ampName: [] for ampName in ampNames}
self.__dict__["nonLinearityError"] = {ampName: [] for ampName in ampNames}
self.__dict__["nonLinearityResiduals"] = {ampName: [] for ampName in ampNames}
self.__dict__["coefficientLinearizeSquared"] = {ampName: [] for ampName in ampNames}

# final results
self.__dict__["gain"] = {ampName: -1. for ampName in ampNames}
Expand Down Expand Up @@ -356,7 +355,8 @@ def runDataRef(self, dataRef, visitPairs):
# Fit PTC and (non)linearity of signal vs time curve.
# Fill up PhotonTransferCurveDataset object.
# Fill up array for LUT linearizer.
dataset = self.fitPtcAndNonLinearity(dataset, lookupTableArray, ptcFitType=self.config.ptcFitType)
dataset = self.fitPtcAndNonLinearity(dataset, tableArray=lookupTableArray,
ptcFitType=self.config.ptcFitType)

if self.config.makePlots:
self.plot(dataRef, dataset, ptcFitType=self.config.ptcFitType)
Expand Down Expand Up @@ -658,8 +658,8 @@ def calculateLinearityResidualAndLinearizers(self, exposureTimeVector, meanSigna
# Use linear part to get time at wich signal is maxAduForLookupTableLinearizer ADU
tMax = (self.config.maxAduForLookupTableLinearizer - parsFit[0])/parsFit[1]
timeRange = np.linspace(0, tMax, self.config.maxAduForLookupTableLinearizer)
signalIdeal = parsFit[0] + parsFit[1]*timeRange
signalUncorrected = self.funcPolynomial(parsFit, timeRange)
signalIdeal = (parsFit[0] + parsFit[1]*timeRange).astype(int)
signalUncorrected = (self.funcPolynomial(parsFit, timeRange)).astype(int)
linearizerTableRow = signalIdeal - signalUncorrected # LinearizerLookupTable has corrections

# Use quadratic and linear part of fit to produce c0 for LinearizeSquared
Expand Down Expand Up @@ -793,6 +793,7 @@ def errFunc(p, x, y):
dataset.nonLinearity[ampName] = np.nan
dataset.nonLinearityError[ampName] = np.nan
dataset.nonLinearityResiduals[ampName] = np.nan
dataset.coefficientLinearizeSquared[ampName] = np.nan
continue

# Fit the PTC
Expand Down
160 changes: 110 additions & 50 deletions tests/test_ptc.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,13 @@ def setUp(self):
self.flatExp2.image.array[:] = flatData2

# create fake PTC data to see if fit works, for one amp ('amp')
flux = 1000 # ADU/sec
self.flux = 1000 # ADU/sec
timeVec = np.arange(1., 201.)
muVec = flux*timeVec # implies that signal-chain non-linearity is zero
self.k2NonLinearity = -5e-6
muVec = self.flux*timeVec + self.k2NonLinearity*timeVec**2 # quadratic signal-chain non-linearity
# Assumes parameter linResidualTimeIndex = 2 (default) in PTC task
self.inputNonLinearityResiduals = 100*(1 - ((muVec[2]/timeVec[2])/(muVec/timeVec)))

self.gain = 1.5 # e-/ADU
self.c1 = 1./self.gain
self.noiseSq = 5*self.gain # 7.5 (e-)^2
Expand All @@ -95,61 +99,117 @@ def setUp(self):
self.dataset.rawExpTimes[ampName] = timeVec
self.dataset.rawMeans[ampName] = muVec

def test_ptcFitQuad(self):
localDataset = copy.copy(self.dataset)
for ampName in self.ampNames:
localDataset.rawVars[ampName] = [self.noiseSq + self.c1*mu + self.c2*mu**2 for
mu in localDataset.rawMeans[ampName]]

config = copy.copy(self.defaultConfig)
config.polynomialFitDegree = 2
task = cpPipe.ptc.MeasurePhotonTransferCurveTask(config=config)

task.fitPtcAndNonLinearity(localDataset, ptcFitType='POLYNOMIAL')

for ampName in self.ampNames:
self.assertAlmostEqual(self.gain, localDataset.gain[ampName])
self.assertAlmostEqual(np.sqrt(self.noiseSq)*self.gain, localDataset.noise[ampName])
# Linearity residual should be zero
self.assertTrue(localDataset.nonLinearityResiduals[ampName].all() == 0)

def test_ptcFitCubic(self):
def ptcFitAndCheckPtc(self, order=None, fitType='', doTableArray=False):
localDataset = copy.copy(self.dataset)
for ampName in self.ampNames:
localDataset.rawVars[ampName] = [self.noiseSq + self.c1*mu + self.c2*mu**2 + self.c3*mu**3 for
mu in localDataset.rawMeans[ampName]]

config = copy.copy(self.defaultConfig)
config.polynomialFitDegree = 3

if fitType == 'POLYNOMIAL':
if order not in [2, 3]:
RuntimeError("Enter a valid polynomial order for this test: 2 or 3")
if order == 2:
for ampName in self.ampNames:
localDataset.rawVars[ampName] = [self.noiseSq + self.c1*mu + self.c2*mu**2 for
mu in localDataset.rawMeans[ampName]]
config.polynomialFitDegree = 2
if order == 3:
for ampName in self.ampNames:
localDataset.rawVars[ampName] = [self.noiseSq + self.c1*mu + self.c2*mu**2 + self.c3*mu**3
for mu in localDataset.rawMeans[ampName]]
config.polynomialFitDegree = 3
elif fitType == 'ASTIERAPPROXIMATION':
g = self.gain
for ampName in self.ampNames:
localDataset.rawVars[ampName] = [(0.5/(self.a00*g**2)*(np.exp(2*self.a00*mu*g)-1) +
self.noiseSq/(g*g)) for mu in localDataset.rawMeans[ampName]]
else:
RuntimeError("Enter a fit function type: 'POLYNOMIAL' or 'ASTIERAPPROXIMATION'")

config.maxAduForLookupTableLinearizer = 200000 # Max ADU in input mock flats
task = cpPipe.ptc.MeasurePhotonTransferCurveTask(config=config)

task.fitPtcAndNonLinearity(localDataset, ptcFitType='POLYNOMIAL')

if doTableArray:
numberAmps = len(self.ampNames)
numberAduValues = config.maxAduForLookupTableLinearizer
lookupTableArray = np.zeros((numberAmps, numberAduValues), dtype=np.int)
returnedDataset = task.fitPtcAndNonLinearity(localDataset, ptcFitType=fitType,
tableArray=lookupTableArray)
else:
returnedDataset = task.fitPtcAndNonLinearity(localDataset, ptcFitType=fitType)

if doTableArray:
# check that the linearizer table has been filled out properly
for i in np.arange(numberAmps):
tMax = (config.maxAduForLookupTableLinearizer)/self.flux
timeRange = np.linspace(0., tMax, config.maxAduForLookupTableLinearizer)
signalIdeal = timeRange*self.flux
signalUncorrected = task.funcPolynomial(np.array([0.0, self.flux, self.k2NonLinearity]),
timeRange)
linearizerTableRow = signalIdeal.astype(int) - signalUncorrected.astype(int)
self.assertEqual(len(linearizerTableRow), len(lookupTableArray[i, :]))
for j in np.arange(len(linearizerTableRow)):
self.assertAlmostEqual(linearizerTableRow[j], lookupTableArray[i, :][j], places=6)

# check entries in localDataset, which was modified by the function
for ampName in self.ampNames:
self.assertEqual(fitType, localDataset.ptcFitType[ampName])
self.assertAlmostEqual(self.gain, localDataset.gain[ampName])
self.assertAlmostEqual(np.sqrt(self.noiseSq)*self.gain, localDataset.noise[ampName])
# Linearity residual should be zero
self.assertTrue(localDataset.nonLinearityResiduals[ampName].all() == 0)

def test_ptcFitAstier(self):
localDataset = copy.copy(self.dataset)
g = self.gain # next line is too long without this shorthand!
for ampName in self.ampNames:
localDataset.rawVars[ampName] = [(0.5/(self.a00*g**2)*(np.exp(2*self.a00*mu*g)-1) +
self.noiseSq/(g*g)) for mu in localDataset.rawMeans[ampName]]

config = copy.copy(self.defaultConfig)
task = cpPipe.ptc.MeasurePhotonTransferCurveTask(config=config)

task.fitPtcAndNonLinearity(localDataset, ptcFitType='ASTIERAPPROXIMATION')

if fitType == 'POLYNOMIAL':
self.assertAlmostEqual(self.c1, localDataset.ptcFitPars[ampName][1])
self.assertAlmostEqual(np.sqrt(self.noiseSq)*self.gain, localDataset.noise[ampName])
else:
self.assertAlmostEqual(self.a00, localDataset.ptcFitPars[ampName][0])
# noise already in electrons for 'ASTIERAPPROXIMATION' fit
self.assertAlmostEqual(np.sqrt(self.noiseSq), localDataset.noise[ampName])
# Nonlinearity fit parameters
self.assertAlmostEqual(0.0, localDataset.nonLinearity[ampName][0])
self.assertAlmostEqual(self.flux, localDataset.nonLinearity[ampName][1])
self.assertAlmostEqual(self.k2NonLinearity, localDataset.nonLinearity[ampName][2])

# Non-linearity coefficient for quadratic linearizer
self.assertAlmostEqual(-self.k2NonLinearity/(self.flux**2),
localDataset.coefficientLinearizeSquared[ampName])

# Linearity residuals
self.assertEqual(len(localDataset.nonLinearityResiduals[ampName]),
len(self.inputNonLinearityResiduals[localDataset.visitMask[ampName]]))

for i in np.arange(len(localDataset.nonLinearityResiduals[ampName])):
self.assertAlmostEqual(localDataset.nonLinearityResiduals[ampName][i],
self.inputNonLinearityResiduals[i])

# check entries in returned dataset (should be the same as localDataset after calling the funciton)
for ampName in self.ampNames:
self.assertAlmostEqual(self.gain, localDataset.gain[ampName])
# noise already comes out of the fit in electrons with Astier
self.assertAlmostEqual(np.sqrt(self.noiseSq), localDataset.noise[ampName])
# Linearity residual should be zero
self.assertTrue(localDataset.nonLinearityResiduals[ampName].all() == 0)
self.assertEqual(fitType, returnedDataset.ptcFitType[ampName])
self.assertAlmostEqual(self.gain, returnedDataset.gain[ampName])
if fitType == 'POLYNOMIAL':
self.assertAlmostEqual(self.c1, returnedDataset.ptcFitPars[ampName][1])
self.assertAlmostEqual(np.sqrt(self.noiseSq)*self.gain, returnedDataset.noise[ampName])
else:
self.assertAlmostEqual(self.a00, returnedDataset.ptcFitPars[ampName][0])
# noise already in electrons for 'ASTIERAPPROXIMATION' fit
self.assertAlmostEqual(np.sqrt(self.noiseSq), returnedDataset.noise[ampName])

# Nonlinearity fit parameters
self.assertAlmostEqual(0.0, localDataset.nonLinearity[ampName][0])
self.assertAlmostEqual(self.flux, localDataset.nonLinearity[ampName][1])
self.assertAlmostEqual(self.k2NonLinearity, localDataset.nonLinearity[ampName][2])

# Non-linearity coefficient for linearizer
self.assertAlmostEqual(-self.k2NonLinearity/(self.flux**2),
localDataset.coefficientLinearizeSquared[ampName])

# Linearity residuals
self.assertEqual(len(localDataset.nonLinearityResiduals[ampName]),
len(self.inputNonLinearityResiduals[localDataset.visitMask[ampName]]))

for i in np.arange(len(localDataset.nonLinearityResiduals[ampName])):
self.assertAlmostEqual(localDataset.nonLinearityResiduals[ampName][i],
self.inputNonLinearityResiduals[i])

def test_ptcFit(self):
for createArray in [True, False]:
for typeAndOrder in [('POLYNOMIAL', 2), ('POLYNOMIAL', 3), ('ASTIERAPPROXIMATION', None)]:
self.ptcFitAndCheckPtc(fitType=typeAndOrder[0], order=typeAndOrder[1],
doTableArray=createArray)

def test_meanVarMeasurement(self):
task = self.defaultTask
Expand Down

0 comments on commit a90c110

Please sign in to comment.