Skip to content

Commit

Permalink
Add a unit test for ScaleVariance and ComputeNoiseCorrelation
Browse files Browse the repository at this point in the history
  • Loading branch information
arunkannawadi committed Sep 19, 2022
1 parent 92e5cc0 commit 497be54
Showing 1 changed file with 144 additions and 0 deletions.
144 changes: 144 additions & 0 deletions tests/test_variance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# This file is part of meas_algorithms.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import itertools
import unittest

import lsst.utils.tests
import numpy as np
from lsst.afw import image as afwImage
from lsst.meas.algorithms import (
ComputeNoiseCorrelationConfig,
ComputeNoiseCorrelationTask,
ScaleVarianceTask,
)


class NoiseVarianceTestCase(lsst.utils.tests.TestCase):
def _generateImage(
self, rho1: float, rho2: float, size: int = 512, seed: int = 12345
):
"""Create a correlated noise field using simple translations.
Y[i,j] = X[i,j] + a1 X[i-1,j] + a2 X[i,j-1]
Var(X[i,j]) = 1
Cov( Y[i,j], V[i-1,j] ) = a1
Cov ( Y[i,j], V[i,j-1] ) = a2
Var(Y[i,j]) = 1 + a1**2 + a2**2
rho_i = a_i / sqrt(1 + a1**2 + a2**2) for i = 1, 2
Parameters
----------
rho1 : float
Correlation coefficient along the horizontal (+x) direction.
rho2 : float
Correlation coefficient along the vertical (+y) direction.
size : int, optional
Size of the noise image to generate.
seed : int, optional
Seed for the random number generator.
Returns
-------
mi: `lsst.afw.image.MaskedImage`
MaskedImage containing the correlated noise field
and a per-pixel variance plane.
"""
np.random.seed(seed)
noise = np.random.randn(size, size).astype(np.float32)
# Solve for the kernel parameters (a1, a2) & generate correlated noise.
r2 = rho1**2 + rho2**2
if r2 > 0:
k = 0.5 * (1 + np.sqrt(1 - 4 * r2)) / r2
a1, a2 = k * rho1, k * rho2
corr_noise = (
noise + a1 * np.roll(noise, 1, axis=0) + a2 * np.roll(noise, 1, axis=1)
)
else:
a1, a2 = 0, 0
corr_noise = noise
image = afwImage.ImageF(array=corr_noise[1:-1, 1:-1])
variance = afwImage.ImageF(size - 2, size - 2, (1 + a1**2 + a2**2))
mi = afwImage.MaskedImageF(image=image, variance=variance)
return mi

@lsst.utils.tests.methodParameters(
rho=((0.0, 0.0), (-0.2, 0.0), (0.0, 0.1), (0.15, 0.25), (0.25, -0.15))
)
def testScaleVariance(self, rho):
"""Test that the ScaleVarianceTask scales the variance plane correctly."""
task = ScaleVarianceTask()
rho1, rho2 = rho
mi = self._generateImage(rho1, rho2)
scaleFactors = task.computeScaleFactors(mi)
# Check for consistency between pixelFactor and imageFactor
self.assertFloatsAlmostEqual(
scaleFactors.pixelFactor, scaleFactors.imageFactor, atol=1e-6
)
# Since the variance plane is adjusted for the correlation, the scale
# factor should be 1.0 within statistical error.
self.assertFloatsAlmostEqual(scaleFactors.pixelFactor, 1.0, rtol=2e-2)

@lsst.utils.tests.methodParametersProduct(
rho=((0.0, 0.0), (0.2, 0.0), (0.0, -0.1), (0.15, 0.25), (-0.25, 0.15)),
scaleEmpircalVariance=(True, False),
subtractEmpiricalMean=(True, False),
)
def testComputeCorrelation(self, rho, scaleEmpircalVariance, subtractEmpiricalMean):
"""Test that the noise correlation coefficients are computed correctly."""
config = ComputeNoiseCorrelationConfig(size=5)
config.scaleEmpiricalVariance = scaleEmpircalVariance
config.subtractEmpiricalMean = subtractEmpiricalMean
task = ComputeNoiseCorrelationTask(config=config)

rho1, rho2 = rho
mi = self._generateImage(rho1, rho2)
corr_matrix = task.run(mi)

# corr_matrix elements should be zero except for (1,0), (0,1) & (0,0).
# Use the other elements to get an estimate of the statistical
# uncertainty in our estimates.
err = np.std(
[
corr_matrix(i, j)
for i, j in itertools.product(range(5), range(5))
if (i + j > 1)
]
)

self.assertLess(abs(corr_matrix(1, 0) / corr_matrix(0, 0) - rho1), 2 * err)
self.assertLess(abs(corr_matrix(0, 1) / corr_matrix(0, 0) - rho2), 2 * err)
self.assertLess(err, 3e-3) # Check that the err is much less than rho.


class MemoryTestCase(lsst.utils.tests.MemoryTestCase):
pass


def setup_module(module):
lsst.utils.tests.init()


if __name__ == "__main__":
lsst.utils.tests.init()
unittest.main()

0 comments on commit 497be54

Please sign in to comment.