In [None]:
import numpy as np
import scipy as sp
import pandas as pd
import matplotlib.pyplot as plt
from numpy import float64 as f64

In [None]:
# For interactive plots
%matplotlib widget
%cd '/home/jovyan/work/labs/lab-5.5'

### Utilities

In [None]:
label_font = 12
markersize = 8

### Data analysis

In [None]:
names = ["am241", "co60", "cs137", "eu152", "na22"]

In [None]:
def load_dfs():
    return {
        name: df
        for (name, df) in [(name, pd.read_csv(f"data/{name}.csv")) for name in names]
    }

In [None]:
def filter_counts(df: pd.DataFrame, window_length: int = 50, polyorder: int = 5):
    df["Filtered"] = sp.signal.savgol_filter(
        df["Counts"], window_length, polyorder)


def find_peaks(df: pd.DataFrame, **kwargs):
    peaks, properites = sp.signal.find_peaks(df["Filtered"], **kwargs)
    return (peaks, properites)


def gauss(x, a, mu, sigma):
    return a * np.exp(-0.5 * (x - mu) ** 2 / sigma**2.0)


def fit_gaussian(xs, ys) -> tuple:
    return sp.optimize.curve_fit(gauss, xs, ys, p0=[ys.max(), xs.mean(), 1.0])


def get_xs_for_fitting(properties, peak_idx, widen_by: f64 = 2.0):
    left_ip = properties["left_ips"][peak_idx]
    right_ip = properties["right_ips"][peak_idx]
    width = properties["widths"][peak_idx]

    left_ip -= widen_by * width / 2
    right_ip += widen_by * width / 2

    xs = np.array([x for x in range(int(left_ip), int(right_ip))])
    return xs


def fit_peak(df: pd.DataFrame, properties, peak_idx, widen_by: f64 = 2.0):
    xs = get_xs_for_fitting(properties, peak_idx, widen_by)
    ys = df["Filtered"][xs[0]: xs[-1] + 1]
    return fit_gaussian(xs, ys)

In [None]:
def plot_with_peaks(
    ax,
    df,
    peaks: np.array,
    properties: dict,
    markevery: int = 10,
    plot_color: str = "green",
    peaks_mark_color="red",
    plot_linestyle: str = "--",
    plot_marker: str = "o",
    peaks_marker: str = "x",
    xs_name="Channel",
):
    n_arr = df[xs_name]
    signal_arr = df["Filtered"]

    # https://github.com/WenqiJiang/matplotlib-templates/blob/master/plot/plot.py
    plot0 = ax.plot(
        n_arr,
        signal_arr,
        color=plot_color,
        marker=plot_marker,
        markersize=markersize,
        markevery=markevery,
        linestyle=plot_linestyle,
    )

    ax.scatter(peaks, signal_arr[peaks],
               marker=peaks_marker, color=peaks_mark_color)

    ax.hlines(
        y=properties["width_heights"],
        xmin=properties["left_ips"],
        xmax=properties["right_ips"],
        color="C1",
    )

    ax.get_xaxis().set_visible(True)
    ax.grid(True, which="both")
    ax.set_xlabel("$n$", fontsize=label_font)
    ax.set_ylabel("$signal$", fontsize=label_font)

    plt.rcParams.update({"figure.autolayout": True})
    return plot0

In [None]:
def plot_fitted_peaks(
    ax,
    ax_other,
    df: pd.DataFrame,
    peak_indices: np.array,
    widen_peaks_by: np.array,
    properties: dict,
):
    ax.set_xlim(ax_other.get_xlim())
    ax.set_ylim(ax_other.get_ylim())

    for i, widen_by in zip(peak_indices, widen_peaks_by):
        coeff, _ = fit_peak(df, properties, i, widen_by=widen_by)
        _, mean, sigma = coeff
        xs = np.linspace(mean - 3.0 * sigma, mean + 3.0 * sigma)
        ax.plot(xs, np.vectorize(gauss)(xs, *coeff))

    ax.get_xaxis().set_visible(True)
    ax.grid(True, which="both")

In [None]:
dfs = load_dfs()
for material, df in dfs.items():
    filter_counts(df, window_length=25, polyorder=5)

In [None]:
fitargs = {"prominence": 10.0, "width": 10.0, "distance": 10.0, "rel_height": 0.5}

In [None]:
def plot_and_approx(dfs, name, peak_nums, xs_label="Channel", prefix=""):
    df = dfs[name]
    fig, (ax_raw, ax_fitted) = plt.subplots(2, 1, figsize=(10, 6))

    ax_raw.set_title(f"Отфильтрованный спектр {name}")
    ax_fitted.set_title(f"Fitted спектр {name}")

    peaks, properties = find_peaks(df, **fitargs)
    plot0 = plot_with_peaks(
        ax_raw,
        df,
        peaks,
        properties,
        plot_color="blue",
        plot_marker=",",
        peaks_marker="X",
        plot_linestyle="-",
        markevery=100,
        xs_name=xs_label,
    )

    plot_fitted_peaks(
        ax_fitted, ax_raw, df, peak_nums, np.ones(len(peak_nums)), properties
    )

    ax_raw.legend([plot0[0]], ["$\mu_n$"], loc="upper right", fontsize=label_font)
    fig.savefig(f"output/{prefix}fitted-{name}-{xs_label}.pdf")
    return fig

In [None]:
peak_indices_calibrate = {
    "cs137": [6],
    "co60": [12, 13],
    "na22": [4]
}

peak_energies_calibrate = {
    "cs137": [661.7],
    "co60": [1173.2, 1332.5],
    "na22": [1274.0]
}

peak_indices_all = {
    "cs137": [6],
    "co60": [12, 13],
    "na22": [3, 4],
    "am241": [0, 1],
    "eu152": [2, 4, 6],
}

In [None]:
peaks = {name: find_peaks(dfs[name], **fitargs) for name in names}

photopeaks_xs = []
photopeaks_ys = []
for name, values in peak_indices_calibrate.items():
    idxs, _ = peaks[name]
    photopeaks_xs.extend([idxs[i] for i in values])
    photopeaks_ys.extend(peak_energies_calibrate[name])

result = sp.stats.linregress(photopeaks_xs, photopeaks_ys)
xs_fit = np.linspace(min(photopeaks_xs), max(photopeaks_xs), 1000)
ys_fit = result.slope * xs_fit + result.intercept

print(
    f"E = ({result.intercept:e} +- {result.intercept_stderr:e}) + ({result.slope:e} +- {result.stderr:e}) * N"
)

In [None]:
def convert_to_energy(n):
    return n * result.slope + result.intercept


def convert_to_sd_energy(n):
    return n * result.stderr + result.intercept_stderr

In [None]:
for df in dfs.values():
    df["Energy"] = np.vectorize(convert_to_energy)(df["Channel"])

In [None]:
peaks_dict = {
    "Material": [], 
    "N": [],
    "Delta(N)": [], 
    "E": [], 
    "Delta(E)": [], 
    "R": []
}

df_peaks = pd.DataFrame(peaks_dict)

In [None]:
for name in names:
    idxs = peak_indices_all[name]
    peaks, properties = find_peaks(dfs[name], **fitargs)
    widths = properties["widths"]

    for idx in idxs:
        peak = peaks[idx]
        width = widths[idx]

        new_row = {
            "Material": name,
            "N": peak,
            "Delta(N)": width,
            "E": convert_to_energy(peak),
            "Delta(E)": result.slope * width,
            "SD(E)": convert_to_sd_energy(peak),
            "SD(DE)": result.slope * width * result.stderr,
        }

        df_peaks = pd.concat(
            [df_peaks, pd.DataFrame([new_row])], ignore_index=True)

In [None]:
df_peaks["R"] = df_peaks["Delta(E)"] / df_peaks["E"]
df_peaks["SD(R)"] = df_peaks["R"] * df_peaks["SD(E)"] / df_peaks["E"]

In [None]:
def setup_axes(ax: plt.Axes):
    # https://github.com/WenqiJiang/matplotlib-templates/blob/master/plot/plot.py
    ax.get_xaxis().set_visible(True)
    ax.get_yaxis().set_visible(True)
    ax.grid(True, which="both")
    plt.rcParams.update({"figure.autolayout": True})

In [None]:
%%capture
fig, ax = plt.subplots(1, 1, figsize=(10, 6))

ax.scatter(photopeaks_xs, photopeaks_ys)
ax.plot(xs_fit, ys_fit)

setup_axes(ax)
ax.set_xlabel("$N_{channel}$")
ax.set_ylabel("$E$")

fig.set_label("Рис 1. Калибровочная зависимость")
fig.savefig(f"output/calibrating.pdf")

### Plots

In [None]:
%%capture
plot_and_approx(dfs, 'cs137', peak_indices_calibrate['cs137'])
plot_and_approx(dfs, 'co60', peak_indices_calibrate['co60'])
plot_and_approx(dfs, 'na22', peak_indices_calibrate['na22'])

In [None]:
%%capture
plot_and_approx(dfs, "cs137", peak_indices_all["cs137"], prefix="all")
plot_and_approx(dfs, "co60", peak_indices_all["co60"], prefix="all")
plot_and_approx(dfs, "na22", peak_indices_all["na22"], prefix="all")
plot_and_approx(dfs, "am241", peak_indices_all["am241"], prefix="all")
plot_and_approx(dfs, "eu152", peak_indices_all["eu152"], prefix="all")

In [None]:
df_peaks.to_excel("output/peaks.xlsx")

In [None]:
with_dropped = df_peaks.drop(index=[0])

In [None]:
with_dropped

In [None]:
inv_e = 1.0 / with_dropped["E"]
inv_e_sd = 1.0 / with_dropped['E'] ** 2.0 * with_dropped['SD(E)']
r_sq = with_dropped["R"] ** 2.0
r_sq_sd = 2.0 * r_sq *  with_dropped["SD(R)"]
result_r = sp.stats.linregress(inv_e, with_dropped["R"] ** 2.0)
xs_fit_r = np.linspace(min(inv_e), max(inv_e), 1000)
ys_fit_r = result_r.slope * xs_fit_r + result_r.intercept

In [None]:
fig, ax = plt.subplots(1, 1, figsize=(10, 6))

ax.errorbar(inv_e, r_sq, xerr=inv_e_sd, yerr=r_sq_sd, fmt='o')
ax.plot(xs_fit_r, ys_fit_r)

setup_axes(ax)
ax.set_xlabel("$\\frac{1}{E}$")
ax.set_ylabel("$R^2$")

fig.set_label("Рис 2. Зависимость R от E")
fig.savefig("output/r-relation.pdf")