In [None]:
# No guess of lost waveform;
# charge noises applied on points above thresholds

In [None]:
# -*- coding: utf-8 -*-
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import urllib.request

np.random.seed(20)

use_warning = False

def gauss(x, mu, sigma):
    return 1/(sigma * np.sqrt(2*np.pi)) * np.exp(-(x-mu)**2/sigma/sigma/2)

def gauss_filter_freq(freq, sigma):
    return np.exp(-0.5 * (freq / sigma)**2)

def ensure_response_file(npz_path="response_44_v2a_full.npz"):
    npz_path = Path(npz_path)
    if npz_path.exists():
        return npz_path

    url = "https://raw.githubusercontent.com/DUNE/larnd-sim/develop/larndsim/bin/response_44_v2a_full.npz"
    print(f"Downloading {url} -> {npz_path} ...")
    urllib.request.urlretrieve(url, npz_path.as_posix())
    return npz_path

def deconv(wf, k):
    deconvq = np.fft.ifft(np.fft.fft(wf, n=len(wf)) / np.fft.fft(k, n=len(wf))).real
    return deconvq[:len(wf)+1-len(k)]

def gauss_filter(wf, width, dt):
    '''width
    '''
    wf_fft = np.fft.fft(wf, n=len(wf))
    kt = np.arange(len(wf), dtype=np.float64)
    freq = np.fft.fftfreq(len(wf), d=dt)
    gfilter = gauss_filter_freq(freq, width)

    return np.fft.ifft(wf_fft * gfilter).real

def deconv_filter(wf, kernel, width, dt):
    '''width
    T is the sampling rate, which is not used here.
    '''
    wf_fft = np.fft.fft(wf, n=len(wf))
    kernel_fft = np.fft.fft(kernel, n=len(wf))
    kt = np.arange(len(wf), dtype=np.float64)
    freq = np.fft.fftfreq(len(wf), d=dt)
    gfilter = gauss_filter_freq(freq, width)

    return np.fft.ifft(wf_fft / kernel_fft * gfilter).real[:len(wf)-len(kernel)+2]

def field_response(npz_path="response_44_v2a_full.npz"):
    path = ensure_response_file(npz_path)
    return np.load(path)

def trigger(wf, thres, noise=0):
    return np.argmax((np.cumsum(wf)+noise * np.random.normal(0, 1, wf.shape))>thres)

def integrate_k(wf, k):
    if wf.shape[0] % k:
        # raise ValueError('wf.shape[0] % k != 0')
        wf = np.pad(wf, (0, k - wf.shape[0] % k))
        if use_warning:
            print('warning: length of wf is now', len(wf))
    return wf.reshape(-1, k).sum(axis=-1)

def downsample_cum(wf_fg, k, start_idx_fg, noise):
    # print(start_idx_fg, k)
    wf_cum_k = np.cumsum(wf_fg)[start_idx_fg::k]
    wf_cum_noise = noise * np.random.normal(0, 1, wf_cum_k.shape)
    return wf_cum_k + wf_cum_noise, np.sum(wf_cum_noise)

def add_noise(wf, amp):
    noise = amp*np.random.normal(0, 1, wf.shape)
    return wf + noise, np.sum(noise)

# integrate every k ticks
def trigger_integrate_k(wf, k, start_idx):
    if wf[start_idx:].shape[0] % k:
        # raise ValueError('wf.shape[0] % k != 0')
        wf = np.pad(wf[start_idx:], (0, k - wf[start_idx:].shape[0] % k))
        if use_warning:
            print('warning: length of wf is now', len(wf))
    else:
        wf = wf[start_idx:]
    return wf.reshape(-1, k).sum(axis=-1), start_idx

def scale_q_test(qscale, shift=0, kticks=30, thres=5, noise=1, use_filter: None|dict = None, show: bool =True):
    q = gauss(np.arange(-10, 10, 0.1), 0, 0.5)
    q /= np.sum(q)
    qunit = gauss(np.arange(-10, 10, 0.1), 0, 0.5)
    qunit /= np.sum(qunit)

    q *= qscale
    q = np.roll(q, shift)
    # if show:
    #     plt.figure()
    #     plt.title('Charge distribution')
    #     plt.plot(q)
    fr0 = field_response()['response'][0,0]
    fr0 = fr0.reshape(-1, 2).sum(axis=-1)[-1801:]
    fr0 *= 0.05

    wf = np.convolve(q, fr0, mode='full')

    start_idx_fg = trigger(wf, thres, noise=0)
    wf_full_k = integrate_k(wf, kticks)
    wf_cum_above_threshold, total_noise = downsample_cum(wf, kticks, start_idx_fg//kticks*kticks, noise)
    wf_full_k[start_idx_fg//kticks:-1] = np.diff(wf_cum_above_threshold)
    # print('compare-------------', 'slice', wf_full_k[start_idx_fg//kticks], 'cum above', wf_cum_above_threshold[1])

    # print(np.sum(wf_full_k), np.sum(wf), np.sum(q))

    fr0_k = integrate_k(fr0, kticks)

    if use_filter:
        qdeconv2 = deconv_filter(wf_full_k, fr0_k, width=use_filter['width'], dt=kticks)
        qsmear = gauss_filter(q, width=use_filter['width'], dt=1)
    else:
        qdeconv2 = deconv(wf_full_k, fr0_k)

    if show:
        plt.figure()
        plt.plot(np.arange(len(wf_full_k))*kticks, wf_full_k, label='guess')
        plt.plot(wf*kticks, label='wf true')
        plt.legend()
        plt.show()

    if show:
        plt.figure()
        plt.plot(np.arange(len(qdeconv2))*kticks, qdeconv2/kticks, label='average')
        plt.plot(q, label='true')
        if use_filter:
            plt.plot(qsmear, label=f'smeared true, sigma={1/(2*np.pi*use_filter['width']):.3f} ticks')
        plt.legend()
        plt.title(f'Injected charge {np.sum(q):.1f}')
        plt.show()

    return np.sum(qdeconv2) - np.sum(q), np.argmax(qdeconv2)*kticks - np.argmax(q), total_noise

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def fit_line_band(x, y, k=2.0, use_vertical_residual=True):
    """
    """
    x = np.asarray(x).ravel()
    y = np.asarray(y).ravel()
    if x.size != y.size:
        raise ValueError("len(x) must be equal to len(y)")

    # 1) y = a*x + b
    a, b = np.polyfit(x, y, deg=1)

    # 2) residue
    y_hat = a * x + b
    r = y - y_hat

    if use_vertical_residual:
        # vertical residue
        sigma = np.sqrt(np.mean(r**2))
    else:
        # from point to lineï¼š|ax - y + b| / sqrt(a^2 + 1)
        d = np.abs(a * x - y + b) / np.sqrt(a**2 + 1.0)
        sigma = np.sqrt(np.mean(d**2))

    band_half = k * sigma
    return a, b, sigma, band_half

def plot_line_band(x, y, a, b, band_half, title=None, labels={}):
    x = np.asarray(x).ravel()
    y = np.asarray(y).ravel()

    # sorted
    xs = np.linspace(x.min(), x.max(), 300)
    ys = a * xs + b

    plt.figure(figsize=(7, 5))
    plt.scatter(x, y, s=25, alpha=0.8, label=labels.get('data', 'data'))
    plt.plot(xs, ys, linewidth=2, label=f"fit: y={a:.3f}x+{b:.3f}")
    plt.fill_between(xs, ys - band_half, ys + band_half, alpha=0.2,
                     label=f"band: +/-{band_half:.3f}")

    plt.xlabel(labels.get('xaxis', 'x'))
    plt.ylabel(labels.get('yaxis', 'y'))
    if title:
        plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
delta_q, delta_t, total_noise = scale_q_test(50, show=True, noise=1, use_filter={'width':0.02})
print('delta_q -----------------------:', delta_q, 'total_noise', total_noise)

In [None]:
delta_qs2 = []
delta_ts2 = []
total_noises2 = []
width_in_true = 0.1
kticks=30
for qscale in np.arange(6, 100, 0.5):
    delta_q, delta_t, total_noise = scale_q_test(qscale, show=False, kticks=kticks, use_filter={'width': width_in_true})
    delta_qs2.append(delta_q)
    delta_ts2.append(delta_t)
    total_noises2.append(total_noise)

In [None]:
a2, b2, sigma, band_half = fit_line_band(np.arange(6, 100, 0.5), delta_qs2, k=1.0, use_vertical_residual=True)
plot_line_band(np.arange(6, 100, 0.5), delta_qs2, a2, b2, band_half=band_half,
               title=f"Gaussian filter, width = {width_in_true}"
               )
plt.figure()
plt.scatter(total_noises2, delta_qs2)
plt.xlabel('added noise')
plt.ylabel('delta Q')
# plt.ylim(-15, 15)

In [None]:
delta_qs = []
delta_ts = []
total_noises = []
kticks = 30
for qscale in np.arange(6, 100, 0.5):
    delta_q, delta_t, delta_noise = scale_q_test(qscale, kticks=kticks, noise=1.0, show=False)
    delta_qs.append(delta_q)
    delta_ts.append(delta_t)
    total_noises.append(delta_noise)

In [None]:
a, b, sigma, band_half = fit_line_band(np.arange(6, 100, 0.5), delta_qs, k=1.0, use_vertical_residual=True)
plot_line_band(np.arange(6, 100, 0.5), delta_qs, a, b, band_half=band_half, labels={'data':'delta Q'})

plt.figure()
plt.scatter(total_noises, delta_qs)
plt.xlabel('added noise')
plt.ylabel('delta Q')

In [None]:
plt.figure()
plt.scatter(total_noises, delta_qs, label='without filter')
plt.scatter(total_noises2, delta_qs2, label='with filter')
plt.legend()
plt.xlabel('added noise')
plt.ylabel('delta Q')

In [None]:
delta_q, delta_t, total_noise = scale_q_test(50, show=True, noise=0, use_filter={'width':0.02})
print('delta_q -----------------------:', delta_q, 'total_noise', total_noise)