In [None]:
! pip install numpy
! pip install scipy
! pip install pandas
! pip install matplotlib
! pip install scikit-learn
! pip install openpyxl



In [1]:
import numpy as np
import pandas as pd
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
import os
from openpyxl import load_workbook

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [16]:
os.chdir('/content/drive/My Drive/jj/projects/2025/sidraAjmal/data')
! ls

low_TXA_conc_modified.xlsx  low_TXA_conc.xlsx


In [23]:
def detect_peaks_savgol(data, window=11, poly=2, threshold=3.0, min_peak_height=0.05, min_rel_height=0.10):
    """
    Detect peaks using Savitzky-Golay filter with strict threshold enforcement

    Parameters:
    -----------
    data : list or array
        The fluorescence intensity data
    window : int
        Window size for Savitzky-Golay filter (must be odd)
    poly : int
        Polynomial order for the filter
    threshold : float
        Number of standard deviations a point must exceed to be considered a peak
    min_peak_height : float
        Minimum absolute height a peak must have above the smoothed trendline
        (as a fraction of the mean signal value)
    min_rel_height : float
        Minimum relative height compared to the data range to be considered a peak

    Returns:
    --------
    peaks : list
        Indices of detected peaks
    smoothed : array
        Smoothed data from Savitzky-Golay filter
    residuals : array
        Difference between original data and smoothed trend
    """
    import numpy as np
    from scipy.signal import savgol_filter

    # Handle edge case with small data
    if len(data) < window:
        window = len(data) if len(data) % 2 == 1 else len(data) - 1
    if window < 3:
        window = 3

    # Adjust polynomial order based on window size
    if poly >= window:
        poly = window - 1

    # Apply Savitzky-Golay filter to get smoothed data
    smoothed = savgol_filter(data, window_length=window, polyorder=poly)

    # Calculate residuals (difference between actual and smoothed)
    residuals = np.array(data) - smoothed

    # Calculate statistics for peak detection
    std_residuals = np.std(residuals)
    mean_residuals = np.mean(residuals)

    # Calculate the actual threshold value
    threshold_value = mean_residuals + threshold * std_residuals

    # Debug print the threshold values
    # print(f"Mean residual: {mean_residuals}, Std: {std_residuals}, Threshold: {threshold_value}")

    # Set absolute minimum peak height as a fraction of mean signal
    mean_signal = np.mean(data)
    min_absolute_height = mean_signal * min_peak_height

    # Set minimum peak height relative to data range
    data_range = np.max(data) - np.min(data)
    min_relative_height = data_range * min_rel_height

    # Identify ONLY points that strictly exceed threshold
    peaks = []
    for i in range(len(data)):
        # Residual must exceed threshold value (strict comparison)
        residual_value = residuals[i]
        if residual_value <= threshold_value:
            continue

        # Absolute difference from smoothed trend
        absolute_diff = data[i] - smoothed[i]
        if absolute_diff <= min_absolute_height:
            continue

        # Relative height check compared to data range
        if absolute_diff <= min_relative_height:
            continue

        # If it passed all checks, add to peaks
        peaks.append(i)

    return peaks, smoothed, residuals

def process_excel_file(file_path, output_dir=None, debug_mode=True,
                       threshold=3.0, min_peak_height=0.05, min_rel_height=0.10):
    """
    Process Excel file with fluorescence data using improved peak detection

    Parameters:
    -----------
    file_path : str
        Path to the Excel file
    output_dir : str
        Directory to save results, default is fluorescence_results_YYYYMMDD_HHMMSS
    debug_mode : bool
        If True, save plots for all analyzed pairs
    threshold : float
        Statistical threshold for peak detection (standard deviations)
    min_peak_height : float
        Minimum height a peak must have (as fraction of mean signal)
    min_rel_height : float
        Minimum height relative to data range
    """
    import os
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    from openpyxl import load_workbook
    from scipy.signal import savgol_filter
    import datetime
    import sys

    # Create a timestamp for the output directory if not provided
    if output_dir is None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        output_dir = f"fluorescence_results_{timestamp}"

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Set up logging to file with the same name as the directory
    log_file = f"{output_dir}.txt"
    # Save the original stdout to restore later
    original_stdout = sys.stdout
    # Open log file in write mode
    log_handle = open(log_file, 'w')
    # Redirect stdout to the log file
    sys.stdout = log_handle

    # Load the workbook
    print(f"Loading workbook: {file_path}")
    wb = load_workbook(file_path, data_only=True)

    results_summary = []

    # Process each sheet
    print("FIXED VERSION: Strict threshold enforcement")
    print(f"Processing with threshold: {threshold}, min_peak_height: {min_peak_height}, min_rel_height: {min_rel_height}")
    print(f"Available sheets: {wb.sheetnames}")

    for sheet_name in wb.sheetnames:
        print(f"\nProcessing sheet: {sheet_name}")
        sheet = wb[sheet_name]

        # Get basic sheet dimensions
        max_row = sheet.max_row
        max_col = sheet.max_column
        print(f"Sheet has {max_row} rows and {max_col} columns")

        # Process the sheet row by row
        valid_groups = 0
        processed_groups = 0

        # Start from row 2 and process in groups of 2
        row = 2
        while row < max_row:
            # Extract data from the first row
            row1_data = []
            for col in range(5, max_col + 1):
                value = sheet.cell(row=row, column=col).value
                if isinstance(value, (int, float)) and value is not None:
                    row1_data.append(value)

            # Only continue if we found data in the first row
            if len(row1_data) >= 5:
                # Check if there's at least one more row to form a pair
                if row + 1 <= max_row:
                    # Extract data from the next row
                    row2_data = []
                    for col in range(5, max_col + 1):
                        value = sheet.cell(row=row+1, column=col).value
                        if isinstance(value, (int, float)) and value is not None:
                            row2_data.append(value)

                    # Only process if both rows have data and lengths match
                    if len(row2_data) >= 5 and len(row1_data) == len(row2_data):
                        processed_groups += 1

                        # Assume first row is Cy5 and second row is Cy3
                        cy5_row = row
                        cy3_row = row + 1
                        cy5_data = row1_data
                        cy3_data = row2_data

                        # Run peak detection with strict threshold enforcement
                        cy5_window = min(11, len(cy5_data)-1) if len(cy5_data) > 2 else 3
                        cy3_window = min(11, len(cy3_data)-1) if len(cy3_data) > 2 else 3

                        # Make sure window length is odd
                        cy5_window = cy5_window if cy5_window % 2 == 1 else cy5_window - 1
                        cy3_window = cy3_window if cy3_window % 2 == 1 else cy3_window - 1

                        # Ensure polynomial order is less than window size
                        cy5_poly = min(2, cy5_window-1)
                        cy3_poly = min(2, cy3_window-1)

                        # Debug print to verify parameters - only if in debug mode
                        if debug_mode:
                            print(f"Peak detection parameters - Cy5: window={cy5_window}, poly={cy5_poly}, threshold={threshold}")
                            print(f"Peak detection parameters - Cy3: window={cy3_window}, poly={cy3_poly}, threshold={threshold}")

                        # Detect peaks with the updated function that strictly enforces thresholds
                        cy5_peaks, cy5_smoothed, cy5_residuals = detect_peaks_savgol(
                            cy5_data, window=cy5_window, poly=cy5_poly,
                            threshold=threshold, min_peak_height=min_peak_height,
                            min_rel_height=min_rel_height
                        )
                        cy3_peaks, cy3_smoothed, cy3_residuals = detect_peaks_savgol(
                            cy3_data, window=cy3_window, poly=cy3_poly,
                            threshold=threshold, min_peak_height=min_peak_height,
                            min_rel_height=min_rel_height
                        )

                        # Calculate exact threshold values for verification
                        cy5_std = np.std(cy5_residuals)
                        cy5_mean = np.mean(cy5_residuals)
                        cy5_threshold_line = cy5_mean + threshold * cy5_std

                        cy3_std = np.std(cy3_residuals)
                        cy3_mean = np.mean(cy3_residuals)
                        cy3_threshold_line = cy3_mean + threshold * cy3_std

                        # Verify peaks actually exceed thresholds
                        max_cy5_residual = max(cy5_residuals) if len(cy5_residuals) > 0 else 0
                        max_cy3_residual = max(cy3_residuals) if len(cy3_residuals) > 0 else 0

                        cy5_exceeds = max_cy5_residual > cy5_threshold_line
                        cy3_exceeds = max_cy3_residual > cy3_threshold_line

                        # Check if this is a positive match (Cy5 has peaks, Cy3 doesn't)
                        match_found = (len(cy5_peaks) > 0 and len(cy3_peaks) == 0)

                        # Always print for positive matches, only print others in debug mode
                        if match_found or debug_mode:
                            print(f"Row {cy5_row}: Max residual = {max_cy5_residual:.2f}, Threshold = {cy5_threshold_line:.2f}, Exceeds = {cy5_exceeds}")
                            print(f"Row {cy3_row}: Max residual = {max_cy3_residual:.2f}, Threshold = {cy3_threshold_line:.2f}, Exceeds = {cy3_exceeds}")

                        # Check if this is a positive match (Cy5 has peaks, Cy3 doesn't)
                        match_found = (len(cy5_peaks) > 0 and len(cy3_peaks) == 0)

                        if match_found:
                            valid_groups += 1
                            print(f"THIS IS A POSITIVE MATCH! Cy5 has {len(cy5_peaks)} peaks, Cy3 has none.")
                        elif debug_mode:
                            print(f"Not a match - {'Cy5 has no peaks' if len(cy5_peaks) == 0 else 'Cy3 has peaks too'}")

                        # Check whether to visualize this data pair
                        should_visualize = match_found  # Always visualize positive matches
                        if debug_mode:                  # In debug mode, visualize all pairs
                            should_visualize = True

                        if should_visualize:
                            # Create visualization
                            fig, axs = plt.subplots(2, 1, figsize=(12, 10))

                            # Plot Cy5 data
                            axs[0].plot(cy5_data, 'g-', label=f'Cy5 Data (Row {cy5_row})')
                            axs[0].plot(cy5_smoothed, 'r--', alpha=0.7, label='Smoothed Trend')
                            if cy5_peaks:
                                axs[0].scatter([p for p in cy5_peaks], [cy5_data[i] for i in cy5_peaks],
                                            color='green', marker='o', s=100, label='Detected Peaks')
                            axs[0].set_title(f'Sheet: {sheet_name}, Cy5 Fluorescence (Row {cy5_row})')
                            axs[0].legend()
                            axs[0].grid(True)

                            # Plot Cy3 data
                            axs[1].plot(cy3_data, 'b-', label=f'Cy3 Data (Row {cy3_row})')
                            axs[1].plot(cy3_smoothed, 'r--', alpha=0.7, label='Smoothed Trend')
                            if cy3_peaks:
                                axs[1].scatter([p for p in cy3_peaks], [cy3_data[i] for i in cy3_peaks],
                                            color='red', marker='o', s=100, label='Detected Peaks')
                            axs[1].set_title(f'Cy3 Fluorescence (Row {cy3_row})')
                            axs[1].legend()
                            axs[1].grid(True)

                            # Add residual plots to help diagnose peak detection performance
                            if debug_mode:
                                fig.set_size_inches(12, 15)
                                plt.subplots_adjust(hspace=0.4)

                                # Add two more subplots for residuals
                                axs = [axs[0], axs[1],
                                      fig.add_subplot(4, 1, 3),
                                      fig.add_subplot(4, 1, 4)]

                                # Calculate threshold lines for visualization
                                std_cy5 = np.std(cy5_residuals)
                                mean_cy5 = np.mean(cy5_residuals)
                                cy5_threshold_line = mean_cy5 + threshold * std_cy5

                                std_cy3 = np.std(cy3_residuals)
                                mean_cy3 = np.mean(cy3_residuals)
                                cy3_threshold_line = mean_cy3 + threshold * std_cy3

                                # Show max residual values to debug peak detection
                                cy5_max_residual = np.max(cy5_residuals)
                                cy3_max_residual = np.max(cy3_residuals)

                                # Add detailed threshold info to the titles
                                cy5_title = f'Cy5 Residuals (Max={cy5_max_residual:.2f}, Threshold={cy5_threshold_line:.2f})'
                                cy3_title = f'Cy3 Residuals (Max={cy3_max_residual:.2f}, Threshold={cy3_threshold_line:.2f})'

                                # Plot Cy5 residuals with threshold information
                                axs[2].plot(cy5_residuals, 'g-', label='Cy5 Residuals')
                                axs[2].axhline(y=0, color='k', linestyle='-', alpha=0.3)
                                axs[2].axhline(y=cy5_threshold_line, color='r',
                                              linestyle='--', label=f'Threshold ({threshold} σ)')

                                # Annotate the plot with the actual values
                                for i in range(len(cy5_residuals)):
                                    if cy5_residuals[i] > 0.7 * cy5_threshold_line:  # Show points close to threshold
                                        axs[2].annotate(f"{cy5_residuals[i]:.1f}",
                                                      xy=(i, cy5_residuals[i]),
                                                      xytext=(0, 10),
                                                      textcoords='offset points',
                                                      fontsize=8)

                                if cy5_peaks:
                                    axs[2].scatter([p for p in cy5_peaks], [cy5_residuals[i] for i in cy5_peaks],
                                                color='green', marker='o', s=100)
                                axs[2].set_title(cy5_title)
                                axs[2].legend()
                                axs[2].grid(True)

                                # Plot Cy3 residuals with threshold information
                                axs[3].plot(cy3_residuals, 'b-', label='Cy3 Residuals')
                                axs[3].axhline(y=0, color='k', linestyle='-', alpha=0.3)
                                axs[3].axhline(y=cy3_threshold_line, color='r',
                                              linestyle='--', label=f'Threshold ({threshold} σ)')

                                # Annotate the plot with the actual values
                                for i in range(len(cy3_residuals)):
                                    if cy3_residuals[i] > 0.7 * cy3_threshold_line:  # Show points close to threshold
                                        axs[3].annotate(f"{cy3_residuals[i]:.1f}",
                                                      xy=(i, cy3_residuals[i]),
                                                      xytext=(0, 10),
                                                      textcoords='offset points',
                                                      fontsize=8)

                                if cy3_peaks:
                                    axs[3].scatter([p for p in cy3_peaks], [cy3_residuals[i] for i in cy3_peaks],
                                                color='red', marker='o', s=100)
                                axs[3].set_title(cy3_title)
                                axs[3].legend()
                                axs[3].grid(True)

                            plt.tight_layout()

                            # Add debug indicator to filename if not a positive match
                            if match_found:
                                plot_filename = f"{output_dir}/{sheet_name}_Rows{cy5_row}-{cy3_row}.png"
                            else:
                                plot_filename = f"{output_dir}/DEBUG_{sheet_name}_Rows{cy5_row}-{cy3_row}.png"

                            plt.savefig(plot_filename)
                            plt.close(fig)

                            # Add to results summary if it's a positive match
                            if match_found:
                                # Check the peak heights (absolute value)
                                peak_heights = [cy5_data[p] - cy5_smoothed[p] for p in cy5_peaks]
                                relative_heights = [h / np.mean(cy5_data) * 100 for h in peak_heights]

                                results_summary.append({
                                    'Sheet': sheet_name,
                                    'Rows': f"{cy5_row}-{cy3_row}",
                                    'Cy5 Peaks': len(cy5_peaks),
                                    'Cy5 Peak Positions': [p+1 for p in cy5_peaks],
                                    'Cy5 Peak Heights': [f"{h:.2f}" for h in peak_heights],
                                    'Cy5 Relative Heights (%)': [f"{h:.2f}%" for h in relative_heights],
                                    'Max Residual': f"{max_cy5_residual:.2f}",
                                    'Threshold': f"{cy5_threshold_line:.2f}",
                                    'Plot': plot_filename
                                })

            # Move to the next pair of rows
            row += 2

        print(f"Processed {processed_groups} groups, {valid_groups} had Cy5 peaks and no Cy3 peaks in this sheet")

    # Create summary report
    if results_summary:
        summary_df = pd.DataFrame(results_summary)
        summary_df.to_csv(f"{output_dir}/peak_detection_summary.csv", index=False)
        print(f"\nSummary report saved to {output_dir}/peak_detection_summary.csv")
        print(f"Found {len(results_summary)} instances where Cy5 has peaks and Cy3 has none")

        print("\nPositive matches found:")
        for i, result in enumerate(results_summary, 1):
            print(f"{i}. Sheet: {result['Sheet']}, Rows: {result['Rows']}")
            print(f"   Cy5 has {result['Cy5 Peaks']} peaks at positions: {result['Cy5 Peak Positions']}")
            print(f"   Max residual: {result['Max Residual']} (threshold: {result['Threshold']})")
            print(f"   Peak heights: {result['Cy5 Peak Heights']}")
            print(f"   Relative to mean signal: {result['Cy5 Relative Heights (%)']}")
    else:
        print("\nNo matching patterns found in the data (Cy5 with peaks and Cy3 without)")

    # Close the log file and restore stdout
    sys.stdout = original_stdout
    log_handle.close()

    print(f"Results saved to directory: {output_dir}")
    print(f"Log file saved to: {log_file}")

    return results_summary

# Example usage
if __name__ == "__main__":
    import datetime

    file_path = "low_TXA_conc.xlsx"

    # Generate timestamp for output directory
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_dir = f"fluorescence_results_{timestamp}"

    # Process with strict threshold enforcement
    results = process_excel_file(
        file_path,
        output_dir=output_dir,
        debug_mode=False,  # Set to False to only show matching pairs
        threshold=3.0,    # Standard deviations threshold
        min_peak_height=0.05,
        min_rel_height=0.10
    )

    # If you want to debug, uncomment the following:
    # debug_timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    # debug_results = process_excel_file(
    #     file_path,
    #     output_dir=f"fluorescence_results_debug_{debug_timestamp}",
    #     debug_mode=True,  # Generate plots for all pairs
    #     threshold=3.0,
    #     min_peak_height=0.05,
    #     min_rel_height=0.10
    # )

Results saved to directory: fluorescence_results_20250313_232542
Log file saved to: fluorescence_results_20250313_232542.txt
