# Report: RMHPATH01_v0.13
How to use:  
1. Drop this report into the experiment's /data folder  
2. Run the first code cell. Select your quant_model from the dropdown  
3. Run the rest of the report  
4. Hide code cells then save report output

In [None]:
import ipywidgets as widgets
from IPython.display import display

# Threshold/readout names that recur across several model labels
_LOG2N0 = 'rdml_log2N0 (mean eff) - no plateau - stat efficiency'
_DEBUG_THRESHOLDS = (
    'rdml_Cq (indiv eff - for debug use)',
    'rdml_Cq with group threshold (indiv eff - for debug use)',
)
_CURVE_FITS = ('4PL', '5PL')
_X_TRANSFORMS = ('linear(x)', 'log10(x)')


def _model_label(fit, x_transform, threshold):
    """Build one '<fit>; <x transform>; exc_std0; <threshold>' model label."""
    return f'{fit}; {x_transform}; exc_std0; {threshold}'


# Model labels grouped by category (insertion order is the display order)
options = {
    'Recommended': [
        _model_label('SLR', 'log10(x)', threshold)
        for threshold in ('ct', _LOG2N0)
    ],
    'Recommended-debug': [
        _model_label('SLR', 'log10(x)', threshold)
        for threshold in _DEBUG_THRESHOLDS
    ],
    'Developmental': [
        _model_label(fit, x_transform, _LOG2N0)
        for fit in _CURVE_FITS
        for x_transform in _X_TRANSFORMS
    ],
    'Developmental-debug': [
        _model_label(fit, x_transform, threshold)
        for fit in _CURVE_FITS
        for x_transform in _X_TRANSFORMS
        for threshold in _DEBUG_THRESHOLDS
    ],
}

# Flatten into (label, value) pairs; each group starts with a header entry
# whose value is None so that it can be told apart from a real model.
dropdown_options = []
for group, items in options.items():
    dropdown_options += [(f'--- {group} ---', None), *zip(items, items)]

# Dropdown starts on value None, i.e. no model selected yet
quant_model_dropdown = widgets.Dropdown(
    description='Model:',
    options=dropdown_options,
    value=None,
    layout={'width': 'auto'},
    style={'description_width': 'initial'},
)

# Function to handle selection changes
def on_value_change(change):
    """Store the newly selected model label in the global ``quant_model``.

    ``change`` is the change dictionary handed to the observer; only its
    ``'new'`` entry is read. A new value of ``None`` (used for the group
    header entries) is ignored.
    """
    global quant_model
    selected = change['new']
    if selected is None:
        return
    quant_model = selected
    print(f"quant_model has been set to: {quant_model}")

# Register the callback; names='value' restricts it to changes of the
# dropdown's `value` trait
quant_model_dropdown.observe(on_value_change, names='value')

# Display the widget as this cell's output
display(quant_model_dropdown)

In [None]:
import ipywidgets as widgets
from IPython.display import display, HTML, Javascript

class CodeToggler:
    """Button that shows/hides the notebook's code input areas.

    The hiding itself is done in the browser by the JavaScript held in
    ``js_code``; this class injects that script, displays the button and
    keeps a Python-side copy of the visibility flag for the button label.

    NOTE(review): the script selects ``div.input`` elements, which presumably
    matches the classic Notebook front end only - confirm before relying on
    it elsewhere.
    """

    def __init__(self):
        # Python-side mirror of the visibility flag (the script keeps its own)
        self.is_visible = True

        self.button = widgets.Button(
            description="Toggle Code Cells",
            tooltip="Click to show/hide code cells",
            layout=widgets.Layout(width='250px'),
        )
        self.button.on_click(self._handle_click)

        # Browser-side toggler: flips the display style of every 'div.input'
        # element and remembers the flag in localStorage.
        self.js_code = """
        var jupyterCodeToggler = {
            isVisible: true,
            
            toggleCodeCells: function() {
                // Get all input cells including those before the button
                var codeCells = document.querySelectorAll('div.input');
                var newDisplay = this.isVisible ? 'none' : 'block';
                
                // Update all cells
                codeCells.forEach(function(cell) {
                    cell.style.display = newDisplay;
                });
                
                // Toggle state
                this.isVisible = !this.isVisible;
                
                // Store state in localStorage for persistence
                localStorage.setItem('jupyterCodeTogglerState', this.isVisible);
            },
            
            initializeState: function() {
                // Restore state from localStorage or default to visible
                var savedState = localStorage.getItem('jupyterCodeTogglerState');
                this.isVisible = savedState === null ? true : savedState === 'true';
                
                // Apply initial state
                var codeCells = document.querySelectorAll('div.input');
                var display = this.isVisible ? 'block' : 'none';
                codeCells.forEach(function(cell) {
                    cell.style.display = display;
                });
            }
        };
        
        // Initialize state when the notebook loads
        jupyterCodeToggler.initializeState();
        """

    def _handle_click(self, button):
        """Run the browser-side toggle, then flip the flag and relabel the button."""
        display(Javascript("jupyterCodeToggler.toggleCodeCells();"))
        self.is_visible = not self.is_visible
        if self.is_visible:
            button.description = "Hide Code Cells"
        else:
            button.description = "Show Code Cells"

    def initialize(self):
        """Inject the script into the output area, then display the button."""
        display(HTML("<script>" + self.js_code + "</script>"))
        display(self.button)


# Build the toggler and put it on the page
toggler = CodeToggler()
toggler.initialize()

In [None]:
# Find base_path and the ProxiPal module (no need to install ProxiPal or assign to environment PATH)
from pathlib import Path
import os
import sys

# Find base path first: the nearest ancestor of the working directory that
# contains every one of the required sub-folders.
required_folders = {'data', 'python', 'samples', 'templates', 'quality'}
cwd = Path.cwd()
base_path = next(
    (
        p for p in cwd.parents
        if required_folders <= {d.name for d in p.iterdir() if d.is_dir()}
    ),
    None,
)

# Fail here with a readable message instead of a TypeError on `None / 'python'`
if base_path is None:
    raise FileNotFoundError(
        f"No parent folder of {cwd} contains all of the required folders: "
        f"{', '.join(sorted(required_folders))}"
    )

# Add the python directory to the system path (only once, so re-running the
# cell does not pile up duplicate entries)
python_dir = str(base_path / 'python')
if python_dir not in sys.path:
    sys.path.append(python_dir)

# Now you can import ProxiPal
# from ProxiPal import *

## Working Directory
Displays where the files are located, time of analysis and runtime environment

In [None]:
from IPython.display import HTML, display
from contextlib import contextmanager
import sys
import io

@contextmanager
def small_print():
    """Capture everything printed inside the ``with`` body and show it small.

    While the body runs, ``sys.stdout`` is replaced by an in-memory buffer.
    On exit (also when the body raises) the original stream is restored and
    the captured text is displayed as 11px monospace HTML. Newlines become
    ``<br>`` and tabs become four non-breaking spaces.

    The captured text is HTML-escaped first, so characters such as ``<``,
    ``>`` and ``&`` are shown literally instead of being parsed as markup.
    """
    import html  # local import: stdlib module, distinct from IPython's HTML class

    captured = io.StringIO()
    previous_stdout = sys.stdout
    sys.stdout = captured
    try:
        yield
    finally:
        sys.stdout = previous_stdout
        # Escape before inserting the <br>/&nbsp; markup so that only our own
        # tags reach the browser.
        text = html.escape(captured.getvalue(), quote=False)
        formatted_output = text.replace('\n', '<br>').replace('\t', '&nbsp;&nbsp;&nbsp;&nbsp;')
        display(HTML(f'<div style="font-size: 11px; font-family: monospace; white-space: pre;">{formatted_output}</div>'))

# Use it like this:
with small_print():
    from ProxiPal import *


In [None]:
# Load the py_metatable CSV (processed by ProxiPal) from ./exports
metatable_csv = cwd / 'exports' / 'py_metatable.csv'
py_metatable = pd.read_csv(metatable_csv, low_memory=False)

## Instrument Parameters
All instrument parameters are available; just pick what you need.

In [None]:
# Metatable columns to summarise, in display order
columns_of_interest = [
    'ExperimentName',
    'filepath_txt', 'Kit_#', 'Operator(s)',
    'Comment',
    'InstrumentType',
    'InstrumentSerialNumber',
    'ExperimentRunStartTime',
    'Stage/CyclewhereAnalysisisperformed',
    'Chemistry', 'PassiveReference', 'BlockType',
    'CalibrationBackgroundisexpired',
    'CalibrationPureDyeROXisexpired',
    'CalibrationPureDyeSYBRisexpired',
    'CalibrationROIisexpired',
    'CalibrationUniformityisexpired',
]

# One row per parameter. A parameter is expected to hold a single value for
# the whole plate; anything else is reported as a warning.
# Rows are collected in a list and the frame is built once, rather than
# concatenating onto an empty DataFrame on every iteration.
summary_rows = []
for col in columns_of_interest:
    if col not in py_metatable.columns:
        # Report the gap in the table instead of aborting with a KeyError
        status = "WARNING: column not found in py_metatable"
    else:
        unique_vals = py_metatable[col].unique()
        if len(unique_vals) == 1:
            status = str(unique_vals[0])
        else:
            status = f"WARNING: {len(unique_vals)} different values found: {', '.join(map(str, unique_vals))}"
    summary_rows.append({'Parameter': col, 'Status': status})

# Explicit columns keep the frame well-formed even if the list above is empty
result_df = pd.DataFrame(summary_rows, columns=['Parameter', 'Status'])

display(result_df.style.set_properties(subset=['Status'], **{'width': '650px'}))

## Plate Plan

In [None]:
    # Build the plate-plan figure from the metatable
    # (create_plate_visualization is not defined in this file; presumably it
    # comes from the ProxiPal star import - confirm).
    fig1 = create_plate_visualization(py_metatable, palette = None, font_size = 10)
    # Close every pyplot figure, then make fig1 the current figure again and
    # show it. The order of these three calls matters.
    # NOTE(review): presumably this prevents the figure from being rendered
    # twice in the notebook output - confirm.
    plt.close('all')
    plt.figure(fig1)
    plt.show()

## Plate Results  

In [None]:
# Columns shown per well: v1 = threshold/cycle readout (4th part of the model
# label), v2 = raw ng/L for the selected model, v3 = per-reaction efficiency
v1 = quant_model.split('; ')[3]
v2 = f'{quant_model}; raw_ng/L'
v3 = 'rdml_indiv PCR eff'

# Work on a copy so the rounding below never touches py_metatable
plot_metatable = py_metatable.copy()

# Non-numeric entries are coerced to NaN before rounding
plot_metatable[v1] = pd.to_numeric(plot_metatable[v1], errors='coerce').round(1)

# ng/L is shown as whole numbers; nullable Int64 keeps missing wells as <NA>
temp_v2 = pd.to_numeric(plot_metatable[v2], errors='coerce')
plot_metatable[v2] = temp_v2.round().astype("Int64")

plot_metatable[v3] = pd.to_numeric(plot_metatable[v3], errors='coerce').round(2)

fig2 = create_plate_visualization(
    plot_metatable,
    plate_format=96,
    palette=None,
    font_size=9,
    value1=(v1, 'cycle: '),
    value2=(v2, 'ng/L: '),
    heatmap=True,
    heatmap_palette="vlag",
    heatmap_value=(v3, 'eff: '),
)

# Close all pyplot figures, re-activate fig2 and show it (order matters)
plt.close('all')
plt.figure(fig2)
plt.show()

## Standard Curve
Simple linear regression has been the most performant to date. 4PL and 5PL are available but are prone to overfitting. All statistics for either model are available; if you want a statistic that is missing, let me know.

In [None]:
# The model label has the form '<fit>; <x transform>; <std0 status>; <threshold>'.
# Split it once and reuse the parts below.
model_parts = quant_model.split('; ')
fit_model = model_parts[0]
x_transform = model_parts[1]
std0_status = model_parts[2]
threshold_type = model_parts[3]

print("\nStandards Plot:")
plt.figure(figsize=(8, 6))  # Create figure explicitly
slr_plot = plot_slr_standards(metatable=py_metatable,
                              threshold_type=threshold_type,
                              std0_status=std0_status,
                              figsize=(8, 6),
                              separate_plots=False)
# pyplot.show() takes no figure argument (its only parameter is the keyword
# `block`), so the plot object must not be passed positionally.
plt.show()

SLR_experiment_tables = extract_experiment_tables(
    df=py_metatable,
    filepath_csv=py_metatable['filepath_csv'].unique()[0],  # Use filepath from metatable
    quant_model=fit_model,
    threshold_type=threshold_type,
    transform_x=x_transform,
    std0_status=std0_status,
    sample_type='standards',
    simple_headers=True
)

# Display every table whose key contains 'report_table'
for key, table in SLR_experiment_tables.items():
    if 'report_table' in key:
        print(f"\n{key}:")
        display(table)

## QC Check
This is a rudimentary placeholder table displaying QC failures.  
Notation for raw ng/L failures = [position;raw_ng/L;n.stdev from mean]; notation for mean ng/L failures = positions [mean_ng/L, n.stdev from mean]

In [None]:
def process_qc_data(qc_df, metatable_df, model_name=None):
    """
    Flag QC measurements that lie more than two standard deviations from
    the expected mean.

    For every row of ``qc_df`` the matching wells are taken from
    ``metatable_df`` (wells whose ``sample_id`` starts with the QC
    ``sample_id`` up to its first '['). Two checks are made against the
    row's ``mean`` and ``std``:

    * each well's '<model>; raw_ng/L' value, reported as
      '[position;value;n_std]' and written to 'fail raw_ng/L';
    * each replicate group's '<model>; mean_ng/L' value (grouped by
      ``rep_id``, first value of the group), reported as
      'pos1, pos2 [value, n_std]' and written to 'fail mean_ng/L'.

    Several failures for one QC row are joined with '; '. Both fail columns
    are always present in the result (NaN where nothing failed), so callers
    can read them without first checking that they exist.

    Parameters:
    qc_df (pd.DataFrame): QC table with 'sample_id', 'mean' and 'std' columns
    metatable_df (pd.DataFrame): Well-level data with 'sample_id', 'position',
        'rep_id' and the two ng/L columns of the selected model
    model_name (str, optional): Model label used as the prefix of the ng/L
        column names. Defaults to the global ``quant_model``.

    Returns:
    pd.DataFrame: Copy of ``qc_df`` with the two fail columns filled in
    """
    if model_name is None:
        # Fall back to the model selected at notebook level
        model_name = quant_model
    raw_column = model_name + '; raw_ng/L'
    mean_column = model_name + '; mean_ng/L'

    # Create a copy of the input DataFrame to avoid modifying the original
    qc_df = qc_df.copy()

    # Make sure both fail columns exist and can hold strings (object dtype),
    # even when every QC passes.
    for fail_column in ('fail raw_ng/L', 'fail mean_ng/L'):
        if fail_column in qc_df.columns:
            qc_df[fail_column] = qc_df[fail_column].astype(object)
        else:
            qc_df[fail_column] = pd.Series(index=qc_df.index, dtype=object)

    def is_out_of_range(value, mean, std):
        """True when value lies outside mean +/- 2*std"""
        return (value < (mean - 2 * std)) or (value > (mean + 2 * std))

    def calculate_deviations(value, mean, std):
        """Calculate number of standard deviations from mean"""
        if std == 0:
            # Any departure from the mean is infinitely many deviations away
            return float('inf')
        return abs(value - mean) / std

    def check_failures(sample_data, mean_val, std_val, value_column):
        """Check each well's value in value_column"""
        fails = []
        for _, pos_row in sample_data.iterrows():
            value = pos_row[value_column]
            # Skip if value is NaN or empty
            if pd.isna(value):
                continue
            if is_out_of_range(value, mean_val, std_val):
                dev_calc = calculate_deviations(value, mean_val, std_val)
                fails.append(f"[{pos_row['position']};{round(value)};{dev_calc:.1f}]")
        return fails

    def check_mean_failures(sample_data, mean_val, std_val, value_column):
        """Check one value per rep_id group in value_column"""
        fails = []
        for _, group in sample_data.groupby('rep_id'):
            # All values in a group should be the same, so the first is used
            mean_ng_L = group[value_column].iloc[0]
            # Skip if value is NaN or empty
            if pd.isna(mean_ng_L):
                continue
            if is_out_of_range(mean_ng_L, mean_val, std_val):
                dev_calc = calculate_deviations(mean_ng_L, mean_val, std_val)
                # Format as "position1, position2, ... [mean_value, dev_calc]"
                positions_str = ", ".join(group['position'].tolist())
                fails.append(f"{positions_str} [{round(mean_ng_L)}, {dev_calc:.1f}]")
        return fails

    # Process each QC sample
    for idx, row in qc_df.iterrows():
        # Extract the base sample ID (remove the bracketed value)
        base_sample_id = row['sample_id'].split('[')[0]
        mean_val = row['mean']
        std_val = row['std']

        # Wells belonging to this QC; na=False keeps wells without a
        # sample_id out of the mask instead of making it non-boolean.
        sample_data = metatable_df[
            metatable_df['sample_id'].str.startswith(base_sample_id, na=False)
        ]

        raw_fails = check_failures(sample_data, mean_val, std_val, raw_column)
        if raw_fails:
            qc_df.at[idx, 'fail raw_ng/L'] = '; '.join(raw_fails)

        mean_fails = check_mean_failures(sample_data, mean_val, std_val, mean_column)
        if mean_fails:
            qc_df.at[idx, 'fail mean_ng/L'] = '; '.join(mean_fails)

    return qc_df

In [None]:
# Prefiltered mastertable used for the imprecision analysis
# (quality_folder is not defined in this file; presumably it comes from the
# ProxiPal star import - confirm)
imprecision_csv = quality_folder / 'imprecision_tables' / 'NfL_imprecision_nov24.csv'
NfL_imprecision_nov24 = pd.read_csv(imprecision_csv, low_memory=False)

# Analyse for imprecision
print("Reporting Excel calculated statistics")
stats_results = analyze_sample_stats(
    NfL_imprecision_nov24,
    model_type='usr_raw_ng/L',
    decimal_places='integer',
    format_timing='before',
)

# Keep only the three QC levels, renumber the rows and drop the columns that
# are not shown in the report
qc_levels = ['NFL-QC-H', 'NFL-QC-M', 'NFL-QC-L']
stats_results = (
    stats_results[stats_results['sample_id'].isin(qc_levels)]
    .reset_index(drop=True)
    .drop(columns=['n_samples', 'std1_range', 'std3_range'])
)
qc_df = process_qc_data(stats_results, py_metatable)

# Display copy: the '; ' separators between failures become HTML line breaks
pretty_table = qc_df.copy()
for fail_column in ('fail raw_ng/L', 'fail mean_ng/L'):
    if fail_column in pretty_table.columns:
        pretty_table[fail_column] = pretty_table[fail_column].str.replace('; ', '<br>')

# Render unescaped so the <br> tags act as line breaks; NaN cells are blanked
from IPython.display import display, HTML
display(HTML(pretty_table.replace(np.nan, '').to_html(escape=False)))

## Samples 
/ separates multiple duplicate groups

In [None]:
# Builds `sample_table`: one row per (sample_id, tube_id) for this experiment,
# taken from the most recent master table, excluding ignored wells and any
# sample_id that contains a bracketed part.

# Set pandas option to prevent downcasting warning
pd.set_option('future.no_silent_downcasting', True)
# Define the mean_ng column name
mean_ng_col = quant_model + '; mean_ng/L'
# Load master table (first element of what the loader returns)
master_table = load_most_recent_mastertable(base_path / 'exports')[0]
# Create a copy of the master table
df = master_table.copy()
# Process numeric columns: coerce to numbers, round to whole numbers and show
# missing values as ''. -999 is a temporary sentinel that lets NaN pass
# through astype(int); note that a genuine -999 would be blanked as well.
numeric_columns = [mean_ng_col, 'age']
for col in numeric_columns:
    if col in df.columns:
        df[col] = (pd.to_numeric(df[col], errors='coerce')
                  .round(0)
                  .fillna(-999)
                  .astype(int)
                  .replace(-999, ''))
# Get path query: the filepath_csv of this experiment (first unique value)
path_query = py_metatable['filepath_csv'].unique()[0]
# Select and filter columns
columns_to_keep = [
    'sample_id', 'tube_id', mean_ng_col,
    'med_abbrev', 'age', 'sex', 'specimen_type', 
    'collection_type', 'comments (anyone can use)'
]
# Filter the dataframe to this experiment's rows
filtered_df = df.loc[df['filepath_csv'] == path_query, columns_to_keep].copy()
# Handle NA values with explicit type conversion: everything becomes a string
# and the literal 'nan' is blanked
filtered_df = filtered_df.astype(str).replace('nan', '')
# Apply ignore and standards filters - keeping exact original regex.
# The usr_ignore mask is read from df with the same row selection as
# filtered_df, so both masks share filtered_df's index.
filtered_df = filtered_df[
    (df.loc[df['filepath_csv'] == path_query, 'usr_ignore'] != 1) & 
#     (~filtered_df['sampleid'].str.contains(r'\[(.*?)\]', regex=True, na=False))
    (~filtered_df['sample_id'].str.contains(r'\[(?:.*?)\]', regex=True, na=False))
]
# Rename mean_ng column
filtered_df = filtered_df.rename(columns={mean_ng_col: 'mean_ng/L'})
# Group and aggregate: keep a column's value when the group agrees on it,
# otherwise join the distinct values with '/'
sample_table = filtered_df.groupby(['sample_id', 'tube_id']).agg(
    lambda x: x.iloc[0] if x.nunique() == 1 else '/'.join(x.unique())
).reset_index()
# Print info
print("Default mean_ng/L = " + quant_model + "; mean_ng/L")
print("To change mean_ng/L model, change the code directly")
# Display result
display(sample_table)

## Replicates
The printout at the top indicates that some column names are abbreviated; in this case the parameters for the quantitation model have been removed from the ng/L column headers.

In [None]:
# Model label parts: '<fit>; <x transform>; <std0 status>; <threshold>'
model_parts = quant_model.split('; ')
threshold_col = model_parts[3]

rep_dict = extract_experiment_tables(
    df=py_metatable.copy(),
    filepath_csv=py_metatable['filepath_csv'].unique()[0],
    quant_model=model_parts[0],
    threshold_type=threshold_col,
    transform_x=model_parts[1],
    std0_status=model_parts[2],
    sample_type='samples',
    simple_headers=True,
    custom_columns=[
        'sample_id', 'tube_id', 'position',
        'dilution',
        threshold_col,
        threshold_col + '; mean',
        quant_model + '; raw_ng/L',
        quant_model + '; mean_ng/L',
    ],
)

# Blank the literal string 'nan' in tube_id, then show the threshold column
# under the shorter header 'cycle'
sample_report = rep_dict['sample_report_table']
sample_report['tube_id'] = sample_report['tube_id'].replace('nan', '')
rep_dict['sample_report_table'] = sample_report.rename(columns={threshold_col: 'cycle'})

rep_dict['sample_report_table']

## Wells  
"usr_ignore" indicates that a reaction has been manually disqualified, reasons are given in "comments". "rep_id" indicates a replicate group.  
"PCR eff" here refers to reaction efficiency in a single reaction (it is cited and used in the heatmap above). This metric is only available where instrument data has also been analysed in linreg (which we do automatically). PCR reaction efficiency here is calculated from the amplification curve slope of an individual reaction; Standard Curve based PCR reaction efficiency (above) is a serial-dilution-based efficiency estimate that relies on multiple reactions.  
  
Tracking individual reaction efficiency might indicate thermal inconsistencies in the instrument or sample interferences. If it doesn't show a drift down across the plate then poor optics in some well positions might better explain the issues we see in lane 12. That may be identifiable by pre-pcr or ROX value. 

In [None]:
# Model label parts: '<fit>; <x transform>; <std0 status>; <threshold>'
model_parts = quant_model.split('; ')
threshold_col = model_parts[3]

well_dict = extract_experiment_tables(
    df=py_metatable.copy(),
    filepath_csv=py_metatable['filepath_csv'].unique()[0],
    quant_model=model_parts[0],
    threshold_type=threshold_col,
    transform_x=model_parts[1],
    std0_status=model_parts[2],
    sample_type='wells',
    simple_headers=True,
    custom_columns=[
        'sample_id', 'tube_id', 'position', 'usr_ignore',
        threshold_col,
        'tm',
        quant_model + '; raw_ng/L',
        'rdml_indiv PCR eff',
        'usr_comments',
    ],
)

pd.set_option('display.max_rows', 100)  # shows up to 100 rows

# Fill the gaps: a missing tube_id reads 'n/a', a missing comment is blank
wells = well_dict['wells_table']
wells['tube_id'] = wells['tube_id'].replace(np.nan, 'n/a')
wells['usr_comments'] = wells['usr_comments'].replace(np.nan, '')

# Shorter headers for the report
well_dict['wells_table'] = wells.rename(
    columns={'rdml_indiv PCR eff': 'PCR_eff', threshold_col: 'cycle'}
)
well_dict['wells_table']

# Export CSV results for RMH Pathology

In [None]:
def _first_unique(column):
    """Return the first distinct value of a py_metatable column."""
    return py_metatable[column].unique()[0]


def _qc_mean_status(sample_id):
    """Return the 'fail mean_ng/L' entry of one QC row, or "pass" if empty.

    If qc_df has no 'fail mean_ng/L' column at all, every QC counts as
    passed. As before, `.item()` raises unless exactly one row of qc_df
    matches `sample_id`.
    """
    if 'fail mean_ng/L' in qc_df.columns:
        failures = qc_df['fail mean_ng/L']
    else:
        failures = pd.Series(index=qc_df.index, dtype=object)
    return failures[qc_df['sample_id'] == sample_id].fillna("pass").item()


# Create a single-row dataframe with all the info
Plate_QC_summary = pd.DataFrame({
    'Date': [_first_unique('ExperimentRunStartTime')],
    '# Samples': [len(sample_table)],
    'Plate #': [""],
    'Plate Batch': [_first_unique('Kit_#')],
    # NOTE(review): the serial number is appended as the string form of the
    # whole unique() array (brackets included) - confirm this is intended.
    'Instrument': [_first_unique('InstrumentType') + str(py_metatable['InstrumentSerialNumber'].unique())],
    'Operator(s)': [_first_unique('Operator(s)')],
    'QC-H mean': [qc_df.loc[qc_df['sample_id'] == 'NFL-QC-H', 'mean'].item()],
    'QC-H status': [_qc_mean_status('NFL-QC-H')],
    'QC-M status': [_qc_mean_status('NFL-QC-M')],
    'QC-L status': [_qc_mean_status('NFL-QC-L')],
})

Plate_QC_summary

In [None]:
def export_qc_summary(button):
    """Write Plate_QC_summary to '<expt_folder_csv>_Plate_QC_summary.csv'.

    `button` is the widget that was clicked. It is disabled while the export
    runs and always re-enabled afterwards. Errors raised while writing the
    file are printed rather than propagated.
    """
    from datetime import datetime  # local import: not imported anywhere in this notebook

    # Act on the clicked button itself, not on the global `button_export`,
    # which a later cell rebinds to a different button.
    button.disabled = True
    try:
        # Built outside the f-string: reusing the same quote type inside an
        # f-string is a syntax error before Python 3.12.
        expt_folder = py_metatable['expt_folder_csv'].unique()[0]
        filename = f'{expt_folder}_Plate_QC_summary.csv'

        # Export to CSV
        try:
            Plate_QC_summary.to_csv(filename, index=False)
            print(f"QC Summary exported successfully to: {filename}")
            print("Export completed on: ", datetime.now().strftime("%A %d/%m/%y %H:%M"))
        except Exception as e:
            print(f"Error exporting file: {str(e)}")
    finally:
        # Re-enable the button even if something above raised
        button.disabled = False

# Create a button
button_export = widgets.Button(
    description="Export QC Summary",
    layout=widgets.Layout(width='250px')
)

# Assign the function to the button's on_click event
button_export.on_click(export_qc_summary)

# Display the button
display(button_export)

In [None]:
def export_sample_table(button):
    """Write sample_table to '<expt_folder_csv>_Sample_Table.csv'.

    `button` is the widget that was clicked. It is disabled while the export
    runs and always re-enabled afterwards. Errors raised while writing the
    file are printed rather than propagated.
    """
    from datetime import datetime  # local import: not imported anywhere in this notebook

    # Act on the clicked button itself rather than on the global
    # `button_export`, a name that more than one cell assigns.
    button.disabled = True
    try:
        # Built outside the f-string: reusing the same quote type inside an
        # f-string is a syntax error before Python 3.12.
        expt_folder = py_metatable['expt_folder_csv'].unique()[0]
        filename = f'{expt_folder}_Sample_Table.csv'

        # Export to CSV
        try:
            sample_table.to_csv(filename, index=False)
            print(f"Sample Table exported successfully to: {filename}")
            print("Export completed on: ", datetime.now().strftime("%A %d/%m/%y %H:%M"))
        except Exception as e:
            print(f"Error exporting file: {str(e)}")
    finally:
        # Re-enable the button even if something above raised
        button.disabled = False

# Create a button
button_export = widgets.Button(
    description="Export Sample Table",
    layout=widgets.Layout(width='250px')
)

# Assign the function to the button's on_click event
button_export.on_click(export_sample_table)

# Display the button
display(button_export)