In [1]:
import numpy as np
import pandas as pd
import sys
# Add root directory to sys.path to import ExtractModule
# NOTE(review): this is an absolute, user-specific Windows path; it will
# generally not exist on another machine, in which case the Extract_module
# import below depends on the package being importable some other way.
# Consider a path relative to the notebook or an environment variable.
sys.path.append(r"C:\Users\10552\OneDrive - Redlen Technologies\Code\H3D-app-deploy")
from Extract_module import ExtractModule, TransformDf
from icecream import ic
from spectrum_peak_finder import PeakFinder

# Prints a fixed greeting; reaching this line means the imports above did not raise.
print("Hello, World!")

Hello, World!


In [2]:
import os
def find_csv_files(directory):
    """Recursively collect the paths of all ``.csv`` files under ``directory``.

    Args:
        directory: Root folder to walk (sub-folders are included).

    Returns:
        A list of paths, each built by joining the containing folder with the
        file name, in the order ``os.walk`` yields them. Only names ending in
        the exact, case-sensitive suffix ``.csv`` are kept.
    """
    return [
        os.path.join(folder, name)
        for folder, _subfolders, names in os.walk(directory)
        for name in names
        if name.endswith('.csv')
    ]

In [3]:
def crop_roi(array, crop_center, crop_halfwidth=40):
    """Return the window of ``array`` centred on ``crop_center``.

    The window spans bins ``crop_center - crop_halfwidth`` (inclusive) to
    ``crop_center + crop_halfwidth`` (exclusive).

    Args:
        array: 1-D sequence/array of per-bin values.
        crop_center: Bin index at the centre of the window.
        crop_halfwidth: Number of bins kept on each side of the centre.

    Returns:
        The sliced window. It is shorter than ``2 * crop_halfwidth`` when the
        window reaches past either end of ``array``.
    """
    # Clamp at 0: a negative start would be interpreted as "count from the
    # end" and silently return an empty or unrelated window.
    start = max(crop_center - crop_halfwidth, 0)
    stop = max(crop_center + crop_halfwidth, 0)
    return array[start:stop]

def find_peak_bin(array, crop_center, crop_halfwidth=40):
    """Return the index (in ``array`` coordinates) of the maximum in the window.

    Args:
        array: 1-D sequence/array of per-bin values.
        crop_center: Bin index at the centre of the search window.
        crop_halfwidth: Number of bins searched on each side of the centre.

    Returns:
        Index of the first maximum inside the window, expressed as an index
        into the full ``array``.

    Raises:
        ValueError: If the window is empty (raised by ``np.argmax``).
    """
    cropped_array = crop_roi(array, crop_center, crop_halfwidth)
    # The offset must be the *clamped* window start used by crop_roi, otherwise
    # the returned bin is shifted whenever the window touches the array start.
    window_start = max(crop_center - crop_halfwidth, 0)
    return np.argmax(cropped_array) + window_start

def find_peak_height(array, crop_center, crop_halfwidth=40):
    """Return the maximum value of ``array`` inside the window around ``crop_center``.

    Args:
        array: 1-D sequence/array of per-bin values.
        crop_center: Bin index at the centre of the search window.
        crop_halfwidth: Number of bins searched on each side of the centre.

    Returns:
        The largest value found in the window.

    Raises:
        ValueError: If the window is empty (raised by ``np.max``).
    """
    cropped_array = crop_roi(array, crop_center, crop_halfwidth)
    return np.max(cropped_array)

In [4]:
def calculate_peak_count(array, peak_bin, peak_halfwidth=25):
    """Sum the values of ``array`` in a window centred on ``peak_bin``.

    The window spans bins ``peak_bin - peak_halfwidth`` (inclusive) to
    ``peak_bin + peak_halfwidth`` (exclusive) and is truncated at the array
    boundaries.

    Args:
        array: 1-D sequence/array of per-bin values.
        peak_bin: Bin index at the centre of the summed window.
        peak_halfwidth: Number of bins summed on each side of the centre.

    Returns:
        The sum of the values inside the window (0 for an empty window).
    """
    # Clamp at 0 so a peak close to the first bin does not turn the start into
    # a negative (from-the-end) index, which would yield an empty/wrong slice.
    start = max(peak_bin - peak_halfwidth, 0)
    stop = max(peak_bin + peak_halfwidth, 0)
    return np.sum(array[start:stop])

In [5]:
def extract_voltage(filename: str) -> int:
    """Parse the voltage out of a file name.

    Only the text before the first ``_`` is considered. It is split on ``-``
    and the third field, with every ``V`` removed, is converted to ``int``
    (e.g. ``'Am241-30min-1000V_Am241.csv'`` -> ``1000``).

    Raises:
        IndexError: If the prefix has fewer than three ``-``-separated fields.
        ValueError: If the third field is not an integer once ``V`` is removed.
    """
    prefix, _, _ = filename.partition('_')
    fields = prefix.split('-')
    voltage_field = fields[2]
    return int(voltage_field.replace('V', ''))

In [6]:
def avg_neighbor_counts(df, x_index, y_index, count_type='peak_count'):
    """Average ``count_type`` over the 4-connected neighbours of a pixel.

    Neighbours are the pixels at (x-1, y), (x+1, y), (x, y-1) and (x, y+1).
    Only neighbours that actually exist as a row of ``df`` contribute, so
    edge and corner pixels are averaged over fewer values.

    Args:
        df: Frame with ``x_index`` and ``y_index`` columns plus ``count_type``.
        x_index: x coordinate of the centre pixel.
        y_index: y coordinate of the centre pixel.
        count_type: Name of the column to average.

    Returns:
        Mean of ``count_type`` over the existing neighbours, or NaN when the
        pixel has no neighbour in ``df``.
    """
    neighbor_offsets = [(-1, 0), (1, 0), (0, -1), (0, 1)]
    # Pull the columns out once instead of re-reading them per neighbour.
    xs = df['x_index'].values
    ys = df['y_index'].values
    counts = df[count_type].values

    neighbor_values = []
    for dx, dy in neighbor_offsets:
        # Test the (x, y) pair jointly: checking x and y membership separately
        # accepts coordinates that never occur together and then fails on the
        # empty lookup.
        is_neighbor = (xs == x_index + dx) & (ys == y_index + dy)
        if is_neighbor.any():
            # First match, as in a lookup of a unique pixel coordinate.
            neighbor_values.append(counts[is_neighbor][0])

    if not neighbor_values:
        # No neighbour present: report "undefined" rather than dividing by zero.
        return float('nan')
    return sum(neighbor_values) / len(neighbor_values)

In [7]:
# def leaking_ratio(row):
#     return row['peak_count'] / row['avg_neighbor_peak_count']

In [8]:
# Bin index, per source name, used as the crop centre when searching a
# spectrum for that source's peak.
source_peak_bins = dict(
    Cs137=1578,
    Co57=244,
    Am241=57,
)

In [9]:
directory = r"module_voltage_data"
csv_files = find_csv_files(directory)
# print(csv_files)

# (x_index, y_index) coordinates of the pixels flagged as treated in every file.
treated_pixels = [(9, 3), (3, 9), (9, 9)]

# One processed per-pixel frame per CSV file; concatenated in a later cell.
df_list = []
for csv_file in csv_files:
    filename = os.path.basename(csv_file)
    filename_no_ext = os.path.splitext(filename)[0]
    print(f"{filename_no_ext = }")
    voltage = extract_voltage(filename)

    # Infer the source from the file name. A name containing both "Cs137" and
    # "Co57" is treated as Co57, and any name without "Cs137" falls back to
    # Am241.
    if "Cs137" not in filename_no_ext:
        source = "Am241"
    elif "Co57" in filename_no_ext:
        source = "Co57"
    else:
        source = "Cs137"
    expected_peak_bin = source_peak_bins[source]

    EM = ExtractModule(csv_file)
    df_h3d = EM.extract_module2df(module_number=0)
    TD = TransformDf()
    df = TD.transform_df(df_h3d)

    df["source"] = source
    df["voltage"] = voltage
    df["is_treated"] = False
    for treated_x, treated_y in treated_pixels:
        df.loc[
            (df["x_index"] == treated_x) & (df["y_index"] == treated_y), "is_treated"
        ] = True

    # Module-average spectrum, used to locate the peak bin shared by all pixels.
    # Divide by the actual number of pixel rows rather than a hard-coded 121 so
    # the average stays correct if a file yields a different pixel count.
    avg_array_bins = df["array_bins"].sum(axis=0) / len(df)
    avg_peak_bin = find_peak_bin(avg_array_bins, expected_peak_bin)
    df['avg_peak_bin'] = avg_peak_bin
    df['avg_peak_height'] = find_peak_height(avg_array_bins, expected_peak_bin)

    # Per-pixel peak position and height, searched around the same expected bin.
    df['peak_bin_per_pixel'] = df.apply(
        lambda row: find_peak_bin(row['array_bins'], expected_peak_bin), axis=1
    )
    df['peak_height_per_pixel'] = df.apply(
        lambda row: find_peak_height(row['array_bins'], expected_peak_bin), axis=1
    )

    # NOTE(review): add_peak_counts is defined outside this notebook; judging by
    # its use below it adds a 'peak_count' column — confirm what bin_width means.
    df = TD.add_peak_counts(df, bin_peak=avg_peak_bin, bin_width=25)

    df["avg_neighbor_peak_count"] = df.apply(
        lambda row: avg_neighbor_counts(
            df, row["x_index"], row["y_index"], "peak_count"
        ),
        axis=1,
    )

    # Ratio of a pixel's peak count to the mean peak count of its neighbours.
    df["leaking_ratio"] = df.apply(
        lambda row: row["peak_count"] / row["avg_neighbor_peak_count"], axis=1
    )

    df_list.append(df)


filename_no_ext = 'Am241-30min-1000V_Am241'
filename_no_ext = 'Am241-30min-1500V_Am241'


filename_no_ext = 'Am241-30min-2000V_Am241'
filename_no_ext = 'Co57-30min-1000V_Cs137'
filename_no_ext = 'Co57-30min-1500V_Cs137'
filename_no_ext = 'Co57-30min-2000V_Cs137'
filename_no_ext = 'Cs137-30min-1000V_Cs137'
filename_no_ext = 'Cs137-30min-1500V_Cs137'
filename_no_ext = 'Cs137-30min-2000V_Cs137'


In [10]:
# df['array_bins'].values
# Ad-hoc spot check of a single pixel. `df` here is whatever frame the loading
# loop assigned last, i.e. state left over from the previous cell.
# NOTE(review): positional row 120 is presumably the last pixel of a 121-pixel
# module — confirm.
row = df.iloc[120]
print(row['array_bins'].shape)
print(row['array_bins'])
# 97 is a hand-picked crop centre; it is not one of the source_peak_bins values.
print(find_peak_bin(row['array_bins'], 97))
print(find_peak_height(row['array_bins'], 97))


(2002,)
[ 0  0  0 ...  0 11 11]
135
11


In [11]:
# Add a per-pixel 'peak_bin' column to every loaded frame.
# Writing only to the `df` / `source` left over from the loading loop would
# populate the column for the last file alone, leaving NaN for all other files
# once the frames are concatenated. Each frame is therefore processed with its
# own source's expected peak bin.
for file_df in df_list:
    # 'source' is constant within a frame, so the first row identifies it.
    file_peak_center = source_peak_bins[file_df['source'].iloc[0]]
    file_df['peak_bin'] = file_df.apply(
        lambda row: find_peak_bin(row['array_bins'], file_peak_center), axis=1
    )

# print(df.head())

In [12]:
# Stack the per-file frames into one long frame. Each file keeps its own row
# labels, so index values repeat across files.
full_df = pd.concat(df_list)
print(full_df.columns)
print(full_df.shape)
# DataFrame.info() writes its report to stdout and returns None, so it is
# called directly; wrapping it in print() appends a stray "None" line.
full_df.info()

Index(['x_index', 'y_index', 'pixel_id', 'array_bins', 'total_count',
       'total_counts_norm', 'is_edge', 'source', 'voltage', 'is_treated',
       'avg_peak_bin', 'avg_peak_height', 'peak_bin_per_pixel',
       'peak_height_per_pixel', 'peak_count', 'non_peak_count',
       'avg_neighbor_peak_count', 'leaking_ratio', 'peak_bin'],
      dtype='object')
(1089, 19)
<class 'pandas.core.frame.DataFrame'>
Index: 1089 entries, 1 to 121
Data columns (total 19 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   x_index                  1089 non-null   int32  
 1   y_index                  1089 non-null   int32  
 2   pixel_id                 1089 non-null   int64  
 3   array_bins               1089 non-null   object 
 4   total_count              1089 non-null   int32  
 5   total_counts_norm        1089 non-null   float64
 6   is_edge                  1089 non-null   bool   
 7   source                   1089 non-null 

### Saving the Dataframe
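TODO: consider a file format other than CSV (for example Parquet), so that the per-pixel `array_bins` arrays do not have to be dropped before saving.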

In [13]:
# remove 'array_bins' column
# full_df_2 = full_df_2.drop(columns=['array_bins'])
# full_df_2.to_csv("full_df_2.csv", index=False)
# full_df_2.to_csv("full_df.csv", index=False)

## Spectrum Analysis

In [14]:
import plotly.express as px
import plotly.graph_objects as go

In [15]:
# Average spectrum of treated vs untreated pixels for one source, pooled over
# every row of that source in full_df.
source_filter = 'Cs137'
source_df = full_df.loc[full_df['source'].eq(source_filter)]
untreated_df = source_df.loc[source_df['is_treated'].eq(False)]
treated_df = source_df.loc[source_df['is_treated'].eq(True)]

# Element-wise sum of the per-pixel spectra divided by the number of pixels.
untreated_avg_array = untreated_df['array_bins'].sum(axis=0) / len(untreated_df)
treated_avg_array = treated_df['array_bins'].sum(axis=0) / len(treated_df)


In [16]:
# Pick the source to plot (uncomment exactly one).
# source_filter = 'Am241'
# source_filter = 'Co57'
source_filter = 'Cs137'
source_df = full_df.loc[full_df['source'].eq(source_filter)]

fig = go.Figure()

# One colour per voltage; untreated spectra are dashed, treated ones use the
# default line style, so each voltage shows as a same-coloured pair.
line_colors = ['red', 'green', 'blue']
for i, voltage in enumerate([1000, 1500, 2000]):

    # Mean spectrum of the untreated pixels at this voltage.
    untreated_df = source_df.loc[
        source_df['is_treated'].eq(False) & source_df['voltage'].eq(voltage)
    ]
    untreated_avg_array = untreated_df['array_bins'].sum(axis=0) / len(untreated_df)
    fig.add_trace(
        go.Scatter(
            name=f"Untreated @ {voltage}V",
            x=np.arange(len(untreated_avg_array)) + 1,  # bin numbers start at 1
            y=untreated_avg_array,
            mode='lines',
            line={'dash': 'dash'},
            marker={'color': line_colors[i]},
        )
    )

    # Mean spectrum of the treated pixels at this voltage.
    treated_df = source_df.loc[
        source_df['is_treated'].eq(True) & source_df['voltage'].eq(voltage)
    ]
    treated_avg_array = treated_df['array_bins'].sum(axis=0) / len(treated_df)
    fig.add_trace(
        go.Scatter(
            name=f"Treated @ {voltage}V",
            x=np.arange(len(treated_avg_array)) + 1,  # bin numbers start at 1
            y=treated_avg_array,
            mode='lines',
            marker={'color': line_colors[i]},
        )
    )

fig.update_layout(
    title=f"Treated vs Untreated Pixel Spectrums {source_filter}",
    xaxis_title="Bin Number",
    yaxis_title="Counts",
    showlegend=True,
)

fig.show()