# peak_functions_test

This notebook documents some of the tests in the peak_functions_test module.
authors: J.J. Gomez-Cadenas


In [None]:
import datetime

In [None]:
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

In [None]:
from __future__ import print_function
import sys
import os
from glob import glob
from time import time

In [None]:
%matplotlib inline
%load_ext autoreload
%autoreload 2
import matplotlib.pyplot as plt
import pandas as pd
import tables as tb
import numpy as np
import math


In [None]:
from invisible_cities.database import load_db
from invisible_cities.core.system_of_units_c import SystemOfUnits
import invisible_cities.sierpe.blr as blr
import invisible_cities.core.mpl_functions as mpl
import invisible_cities.core.wfm_functions as wfm
import invisible_cities.core.tbl_functions as tbl
import invisible_cities.core.peak_functions_c as cpf
import invisible_cities.core.peak_functions as pf
import invisible_cities.core.pmaps_functions as pmf
import invisible_cities.core.sensor_functions as sf

In [None]:
from invisible_cities.core.core_functions import define_window

In [None]:
import invisible_cities.core.core_functions as cf

In [None]:
import numpy.testing as npt

In [None]:
from scipy import signal

## List of tests

#### 1. test_csum_zs_blr_cwf()

Previous tests have shown that the BLR waveforms and the corrected waveforms (CWF) are identical within tolerance. Therefore the calibrated sum and the zero-suppressed sum of the BLR and the CWF must also be equal within tolerance. 

The test reads a sample file of 40 keV electrons. 
  

In [None]:
def diff_csum_zs_blr_cwf():
    """Collect per-event relative differences for the first 10 events.

    Reads the 40 keV electron RWF sample file located under $ICDIR and,
    for each event, computes:
     1) the summed difference between the calibrated sum (csum) of the
        deconvolved waveform (CWF) and that of the stored BLR waveform,
        normalised to the total BLR csum (signed value);
     2) the absolute difference between the total CWF csum and the total
        of its zero-suppressed (zs) version, normalised to the total
        CWF csum.

    Returns the two lists (diff_csum, diff_zs), one entry per event.
    """

    rwf_path = (os.environ['ICDIR']
                + '/database/test_data/electrons_40keV_z250_RWF.h5')

    csum_residuals = []
    zs_residuals = []
    with tb.open_file(rwf_path, 'r') as h5rwf:
        pmtrwf, pmtblr, sipmrwf = tbl.get_vectors(h5rwf)

        # Calibration constants from the database, taken in absolute value.
        pmt_db = load_db.DataPMT()
        coeff_c = abs(pmt_db.coeff_c.values)
        coeff_blr = abs(pmt_db.coeff_blr.values)
        adc_to_pes = abs(pmt_db.adc_to_pes.values)

        for evt in range(10):
            # Calibrated sum of the deconvolved raw waveform ...
            cwf = blr.deconv_pmt(pmtrwf[evt], coeff_c, coeff_blr)
            csum_cwf = cpf.calibrated_pmt_sum(cwf, adc_to_pes,
                                              n_MAU=100, thr_MAU=3)
            # ... and of the BLR waveform stored in the file.
            blr_wf = pmtblr[evt].astype(np.float64)
            csum_blr = cpf.calibrated_pmt_sum(blr_wf, adc_to_pes,
                                              n_MAU=100, thr_MAU=3)

            residual = csum_cwf - csum_blr
            csum_residuals.append(np.sum(residual) / np.sum(csum_blr))

            # Zero-suppressed version of the CWF calibrated sum.
            wfzs_ene, wfzs_indx = cpf.wfzs(csum_cwf, threshold=0.5)
            zs_loss = abs(np.sum(csum_cwf) - np.sum(wfzs_ene))
            zs_residuals.append(zs_loss / np.sum(csum_cwf))
    return csum_residuals, zs_residuals


In [None]:
diff_csum, diff_zs = diff_csum_zs_blr_cwf()

In [None]:
# diff_csum holds, per event, the signed relative difference
# sum(csum_cwf - csum_blr) / sum(csum_blr), so the axis label must not say abs().
mpl.histo(diff_csum, nbins=10, 
          title="diff csum BLR-CWF", xlabel="(e[cwf] - e[blr]) / e[blr]", ylabel="Frequency")

In [None]:
# diff_zs holds, per event, abs(sum(csum_cwf) - sum(zs)) / sum(csum_cwf):
# it compares the CWF calibrated sum with its zero-suppressed version, not BLR with CWF.
mpl.histo(diff_zs, nbins=10, 
          title="diff csum-zs (CWF)", xlabel="abs(e[csum] - e[zs]) / e[csum]", ylabel="Frequency")

In **test_csum_zs_blr_cwf()** ($IC/invisible_cities/core/peak_functions_test.py) the tolerance used in the asserts is set to 0.1. 

#### 2. test_csum_python_cython()

This test simply asserts that the calibrated sum is the same for python and cython functions

### Checks with toy model

#### A toy signal

In [None]:
def toy_pmt_signal():
    """Mimic a PMT waveform.

    One period is the concatenation baseline + rise + plateau + fall +
    baseline, built with the core_functions helpers; the returned
    waveform is three such periods back to back.
    """
    baseline = cf.np_constant(200, 1.)
    rise = cf.np_range(1.1, 2.1, 0.1)
    plateau = cf.np_constant(10, 2.)
    fall = cf.np_reverse_range(1.1, 2.1, 0.1)

    period = np.concatenate((baseline, rise, plateau, fall, baseline))
    return np.concatenate([period] * 3)

In [None]:
v = toy_pmt_signal()

In [None]:
plt.plot(v)

#### sum of 10 toy (identical) pmts

In [None]:
def toy_pmt_sum(v, npmt = 10):
    """Return the sum of npmt copies of the waveform v.

    The accumulation starts from a float array of zeros with the length
    of v, so npmt=0 yields an all-zero waveform.
    """
    total = np.zeros(v.shape[0])
    remaining = npmt
    while remaining > 0:
        total = total + v
        remaining -= 1
    return total

In [None]:
# Sum of 10 identical toy PMT waveforms (the helper is named toy_pmt_sum).
vsum = toy_pmt_sum(v, npmt = 10)

In [None]:
plt.plot(vsum)

## Calibrated PMT sum

First, prepare CWF and adc_to_pes vectors needed by csum (set adc_to_pes to 1)

In [None]:
def toy_cwf_and_adc(v, npmt=10):
    """Return the (CWF, adc_to_pes) pair used by the toy example.

    CWF is a 2-D array whose npmt rows are all equal to v; adc_to_pes is
    a vector of ones.
    """
    # NOTE(review): adc_to_pes has one entry per sample of v, although the
    # calibrated_pmt_sum copy in this notebook only reads one entry per PMT.
    unit_gains = np.ones(v.shape[0])
    stacked = np.array([v for _ in range(npmt)])
    return stacked, unit_gains

In [None]:
CWF, adc_to_pes = toy_cwf_and_adc(v, npmt=10)

In [None]:
plt.plot(CWF[9])

In [None]:
NPMT = CWF.shape[0]
NWF  = CWF.shape[1]
print(NPMT,NWF)

The function below is identical to the function in the module, except that it explicitly plots the
MAU

In [None]:
def calibrated_pmt_sum(CWF, adc_to_pes, n_MAU=200, thr_MAU=5):
    """A copy of the calibrated_pmt_sum in peak_functions, but plotting the MAU.

    Parameters
    ----------
    CWF : 2-D array, indexed as CWF[pmt, sample].
    adc_to_pes : sequence indexed by PMT; the retained samples of PMT j
        are divided by adc_to_pes[j].
    n_MAU : number of taps of the moving-average (MAU) filter.
    thr_MAU : a sample is retained only if it is >= MAU + thr_MAU.

    Returns
    -------
    csum : 1-D float64 array with one entry per sample, the sum over PMTs
        of the retained, calibrated samples.

    Side effects: prints the filter taps and, for the first PMT, plots
    the MAU together with the waveform.
    """
    NPMT = CWF.shape[0]
    NWF  = CWF.shape[1]
    # 1.0 / n_MAU forces float division: the notebook only imports
    # print_function from __future__, so under Python 2 "1 / n_MAU" would
    # be 0 for any n_MAU > 1 and the filter would vanish.
    MAU = np.ones(n_MAU, dtype=np.double) * (1.0 / n_MAU)
    print(MAU)
    csum = np.zeros(NWF, dtype=np.double)

    for j in range(NPMT):
        # MAU for each of the PMTs, following the waveform
        MAU_pmt = signal.lfilter(MAU, 1, CWF[j, :])
        if j == 0:
            plt.plot(MAU_pmt)
            plt.plot(CWF[j])
            plt.show()
        # Keep the samples at or above MAU + threshold, zero the rest,
        # then calibrate and accumulate (same PMT order as the explicit loops).
        above = CWF[j, :] >= MAU_pmt + thr_MAU
        csum += np.where(above, CWF[j, :], 0.) / adc_to_pes[j]
    return csum

#### For a toy function and testing purposes, the MAU has to be a scalar (n_MAU = 1)

In [None]:
csum = calibrated_pmt_sum(CWF, adc_to_pes, n_MAU=1, thr_MAU=0) 

#### Otherwise, the behaviour is only approximate!

In [None]:
csum = calibrated_pmt_sum(CWF, adc_to_pes, n_MAU=10, thr_MAU=0) 

In [None]:
csum = calibrated_pmt_sum(CWF, adc_to_pes, n_MAU=50, thr_MAU=0) 

#### n_MAU = 1 yields correct behaviour

In [None]:
csum = calibrated_pmt_sum(CWF, adc_to_pes, n_MAU=1, thr_MAU=0) 

In [None]:
def plot_csum_vsum():
    """Overlay csum (from peak_functions) and the direct toy sum vsum.

    Both are built from the same toy signal with 10 PMTs; with n_MAU=1
    and thr_MAU=0 the two curves must be close.
    """
    toy = toy_pmt_signal()
    direct_sum = toy_pmt_sum(toy, npmt = 10)
    cwf, gains = toy_cwf_and_adc(toy, npmt=10)
    calibrated = pf.calibrated_pmt_sum(cwf, gains, n_MAU=1, thr_MAU=0)

    # csum is drawn first, vsum second, as in the original cell.
    for trace in (calibrated, direct_sum):
        plt.plot(trace)
    plt.show()

In [None]:
plot_csum_vsum()

### Test of zsum

**Direct zsum of vsum**

In [None]:
def vsum_zsum(vsum, threshold=10):
    """Zero-suppress vsum: keep only the samples strictly above threshold."""
    above_threshold = vsum > threshold
    return vsum[above_threshold]

In [None]:
vsum_zs = vsum_zsum(vsum, threshold=10)

In [None]:
plt.plot(vsum_zs)

**And using the function in peak_functions**

In [None]:
wfzs_ene, wfzs_indx = pf.wfzs(csum, threshold=10)

In [None]:
plt.plot(wfzs_ene)

#### The indexes show three different "peaks"

200-229
630-659
1060-1089

In [None]:
wfzs_indx

#### the ZS function in the model and in the module is the same

In [None]:
def plot_csum_vsum_zs(vsum, csum, threshold=10):
    """Overlay the zero-suppressed vsum and the zero-suppressed csum.

    vsum is zero-suppressed with the toy vsum_zsum, csum with
    peak_functions.wfzs; both use the same threshold so the two curves
    are directly comparable.
    """
    vsum_zs = vsum_zsum(vsum, threshold=threshold)
    # Forward the caller's threshold (it used to be hard-coded to 10 here,
    # which silently ignored the argument for the csum curve).
    wfzs_ene, wfzs_indx = pf.wfzs(csum, threshold=threshold)
    plt.plot(vsum_zs)
    plt.plot(wfzs_ene)
    plt.show()

In [None]:
plot_csum_vsum_zs(vsum, csum, threshold=10)

### Testing S12

#### Testing time_from_index: python vs cython

In [None]:
def test_time_from_index(wfzs_indx):
    """Check that time_from_index agrees between the python (pf) and
    cython (cpf) implementations for the given indexes."""
    npt.assert_allclose(pf.time_from_index(wfzs_indx),
                        cpf.time_from_index(wfzs_indx))

In [None]:
test_time_from_index(wfzs_indx)

#### testing rebin_waveform: python vs cython

In [None]:
def test_rebin_waveform(wfzs_ene, wfzs_indx):
    """Check that rebin_waveform (stride=10) agrees between the python
    (pf) and cython (cpf) implementations."""
    times = pf.time_from_index(wfzs_indx)

    rebinned_py = pf.rebin_waveform(times, wfzs_ene, stride=10)
    rebinned_cy = cpf.rebin_waveform(times, wfzs_ene, stride=10)

    t_py, e_py = rebinned_py
    t_cy, e_cy = rebinned_cy
    npt.assert_allclose(t_py, t_cy)
    npt.assert_allclose(e_py, e_cy)

In [None]:
test_rebin_waveform(wfzs_ene, wfzs_indx)

#### testing find_S12: python vs cython

#### The S12L found by the algorithm running over the toy model is correct.
3 separated but identical "peaks"  going from 11 to 20 and then to 11 again

#### python

In [None]:
# Peak search with the python implementation (pf) over the zero-suppressed
# toy waveform, with rebinning switched off.
S12L1 = pf.find_S12(wfzs_ene, wfzs_indx,
             tmin = 0, tmax = 1e+6,
             lmin = 0, lmax = 1000000,
             stride=4, rebin=False, rebin_stride=40)

In [None]:
S12L1

#### cython

In [None]:
# Same peak search and same arguments, using the cython implementation (cpf).
S12L2 = cpf.find_S12(wfzs_ene, wfzs_indx,
             tmin = 0, tmax = 1e+6,
             lmin = 0, lmax = 1000000,
             stride=4, rebin=False, rebin_stride=40)

In [None]:
S12L2

#### Thus a test that S12L is the same in cython and python

In [None]:
def test_S12L(S12L1, S12L2):
    """Test that find_S12 yields the same result in python and cython.

    S12L1 and S12L2 are mappings from peak number to a (times, energies)
    pair. Both must contain the same peak numbers, and for every peak
    the times and the energies must agree within the assert_allclose
    default tolerance.
    """
    # Looping over the keys of S12L1 alone would not notice extra peaks
    # present only in S12L2, so compare the sets of peak numbers first.
    assert set(S12L1.keys()) == set(S12L2.keys())

    for i in S12L1.keys():
        t1, e1 = S12L1[i][0], S12L1[i][1]
        t2, e2 = S12L2[i][0], S12L2[i][1]
        npt.assert_allclose(t1, t2)
        npt.assert_allclose(e1, e2)

In [None]:
test_S12L(S12L1, S12L2)

#### And a test that checks that the result is as expected

In [None]:
def test_S12_S1(wfzs_ene, wfzs_indx):
    """S1-like test of find_S12 on the toy waveform.

    Every peak found by the cython find_S12 must carry the same energy
    vector: a ramp from 11 to 20, a plateau at 20 and a ramp back to 11.
    """
    peaks = cpf.find_S12(wfzs_ene, wfzs_indx,
                         tmin = 0, tmax = 1e+6,
                         lmin = 0, lmax = 1000000,
                         stride=4, rebin=False, rebin_stride=40)
    # The toy model yields 3 identical energy vectors (30 samples each).
    expected = np.array(
        [11., 12., 13., 14., 15., 16., 17., 18., 19., 20.,   # rising edge
         20., 20., 20., 20., 20., 20., 20., 20., 20., 20.,   # plateau
         20., 19., 18., 17., 16., 15., 14., 13., 12., 11.])  # falling edge
    for peak_no in peaks.keys():
        energies = peaks[peak_no][1]
        npt.assert_allclose(energies, expected)

In [None]:
test_S12_S1(wfzs_ene, wfzs_indx)

#### The test can be repeated after rebinning the waveform

In [None]:
# Cython peak search again, this time with rebin=True (rebin_stride=10) and
# stride=10; note that this overwrites the S12L2 computed earlier.
S12L2 = cpf.find_S12(wfzs_ene, wfzs_indx,
             tmin = 0, tmax = 1e+6,
             lmin = 0, lmax = 1000000,
             stride=10, rebin=True, rebin_stride=10)

In [None]:
S12L2

#### all tests in module peak_functions_test