<a href="https://colab.research.google.com/github/yongchanzzz/FSEC2csv/blob/main/FSEC_FindPeaks.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# FSEC Multi-Peak Analysis
Upload a .csv file generated by [FSEC2csv](https://colab.research.google.com/github/yongchanzzz/FSEC2csv/blob/main/FSEC2csv.ipynb), and this notebook will calculate the area under the curve for each peak using Gaussian deconvolution.

This notebook supports fitting **1-5 peaks** with user-defined search regions.

In [None]:
#@title Upload CSV file
from google.colab import files

# Upload CSV file (opens the Colab file picker and waits for the user)
uploaded = files.upload()

# The analysis cell reads the first key of `uploaded` as the CSV file name, so
# an empty result (e.g. the picker was cancelled) must not be reported as success.
if uploaded:
    print("\nFile uploaded successfully!")
else:
    print("\n⚠️ No file was uploaded. Re-run this cell and choose a .csv file.")

In [None]:
#@title Configure Analysis Parameters
#@markdown **Basic Parameters**
flow_rate = 0.4  # @param {type:"number"}
num_peaks = 2    # @param {type:"number", min:1, max:5, step:1}

# Fail fast on unusable settings: later cells divide by flow_rate and call
# range(num_peaks), so a zero/negative flow rate or a fractional peak count
# would otherwise surface as a confusing error deep inside the analysis.
if not flow_rate > 0:
    raise ValueError(f"flow_rate must be a positive number (mL/min), got {flow_rate!r}")
if num_peaks != int(num_peaks) or not 1 <= num_peaks <= 5:
    raise ValueError(f"num_peaks must be a whole number from 1 to 5, got {num_peaks!r}")
num_peaks = int(num_peaks)  # a whole-number float such as 2.0 becomes 2

print(f"Flow rate: {flow_rate} mL/min")
print(f"Number of peaks to fit: {num_peaks}")
print("\nProceed to the next cell to configure peak search parameters.")

In [None]:
#@title Peak Search Parameters
from IPython.display import display, HTML, clear_output
import ipywidgets as widgets

# Confirmed peak settings, keyed by 1-based peak number. Each value is a dict
# with 'start_vol', 'end_vol' and 'width'. It starts empty, is filled by
# collect_parameters() when the confirm button is clicked, and main() refuses
# to run while it is still empty.
peak_params = {}

def create_peak_inputs():
    """Build one group of input widgets for every requested peak.

    The number of groups is taken from the module-level ``num_peaks``.

    Returns:
        A list with one dict per peak, in peak order. Each dict holds the
        rendered group under 'box' and the three FloatText inputs under
        'start', 'end' and 'width'.
    """

    def float_field(label, initial):
        # All numeric inputs share the same styling; only the label and the
        # starting value differ.
        return widgets.FloatText(
            value=initial,
            description=label,
            style={'description_width': 'initial'},
            layout=widgets.Layout(width='300px')
        )

    groups = []

    for peak_num in range(1, num_peaks + 1):
        # The first peak has its own default search window; every later peak
        # starts from the same 2.15-2.35 window.
        if peak_num == 1:
            default_start, default_end = 2.45, 2.65
        else:
            default_start, default_end = 2.15, 2.35

        start_widget = float_field(f'Peak {peak_num} Start Vol:', default_start)
        end_widget = float_field(f'Peak {peak_num} End Vol:', default_end)
        width_widget = float_field(f'Peak {peak_num} Width:', 0.0)

        # Heading, the three inputs, then a hint explaining the width field.
        heading = widgets.HTML(f'<b>Peak {peak_num} Parameters:</b>')
        hint = widgets.HTML('<i>Note: Width = 0 means auto-fit; >0 means fixed width</i><br>')
        peak_box = widgets.VBox([heading, start_widget, end_widget, width_widget, hint])

        groups.append({
            'box': peak_box,
            'start': start_widget,
            'end': end_widget,
            'width': width_widget
        })

    return groups

def collect_parameters(button):
    """Copy the current widget values into ``peak_params`` and echo them.

    Used as the click handler of the confirm button. ``button`` is the
    argument supplied by the click callback and is not used here.
    """
    global peak_params

    # Rebuild the mapping from scratch so values from an earlier click never linger.
    peak_params = {
        number: {
            'start_vol': entry['start'].value,
            'end_vol': entry['end'].value,
            'width': entry['width'].value
        }
        for number, entry in enumerate(peak_widgets, start=1)
    }

    # Replace whatever the previous click printed with a fresh summary.
    with output_area:
        clear_output()
        print("✅ Parameters collected successfully!")
        print("Peak Configuration:")
        for number, cfg in peak_params.items():
            print(f"Peak {number}: Vol {cfg['start_vol']:.3f} - {cfg['end_vol']:.3f}, Width = {cfg['width']}")
        print("Ready to run analysis! Proceed to the next cell.")

# Area that collect_parameters() prints its confirmation into
output_area = widgets.Output()

# One input group per peak
peak_widgets = create_peak_inputs()

# Confirm button: clicking it snapshots the inputs into peak_params
confirm_button = widgets.Button(description='Confirm Parameters', button_style='success')
confirm_button.on_click(collect_parameters)

# Stack the peak groups, then the button, then the confirmation area
all_widgets = [entry['box'] for entry in peak_widgets] + [confirm_button, output_area]

display(widgets.VBox(all_widgets))

In [None]:
#@title Run Multi-Peak Analysis
import pandas as pd
import numpy as np
from scipy.optimize import curve_fit
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import zipfile, os
from datetime import datetime
from google.colab import files

def gaussian(x, a, b, c):
    """Single Gaussian: ``a * exp(-(x - b)**2 / (2 * c**2))``.

    Args:
        x: Scalar or array of sample positions.
        a: Peak height.
        b: Peak centre (same units as ``x``).
        c: Standard deviation (same units as ``x``).

    Returns:
        The Gaussian evaluated at ``x``.
    """
    return a * np.exp(-((x - b)**2) / (2 * c**2))

def multi_gaussian(x, *params):
    """Sum of n Gaussians described by a flat ``(a1, b1, c1, a2, b2, c2, ...)`` list.

    Args:
        x: Scalar, sequence, ndarray or pandas Series of sample positions.
        *params: Height, centre and standard deviation for each peak, three
            values per peak. Trailing values that do not complete a triple
            are ignored.

    Returns:
        A float ndarray with the same shape as ``x`` (all zeros when no
        complete triple is given).
    """
    # Work on a float copy of x: an integer time axis would otherwise produce an
    # integer accumulator, and adding the float Gaussian into it fails.
    x_values = np.asarray(x, dtype=float)
    total = np.zeros(x_values.shape, dtype=float)

    for k in range(len(params) // 3):
        a, b, c = params[3*k:3*k+3]
        total += gaussian(x_values, a, b, c)

    return total

def find_peak_in_region(time, intensity, start_vol, end_vol, flow_rate):
    """Locate the tallest peak inside a volume window.

    Args:
        time: Time axis (pandas Series, ndarray or sequence).
        intensity: Signal values, aligned position-by-position with ``time``.
        start_vol: Lower edge of the search window, in volume units.
        end_vol: Upper edge of the search window, in volume units.
        flow_rate: Volume per unit time; the window edges are divided by it
            to convert them onto the time axis.

    Returns:
        ``(time_at_peak, intensity_at_peak)``, or ``None`` when no sample
        falls inside the window or the window holds no finite intensity.
    """
    # Plain float arrays make the positional indexing below work for Series
    # and ndarray inputs alike.
    t_all = np.asarray(time, dtype=float)
    y_all = np.asarray(intensity, dtype=float)

    start_time = start_vol / flow_rate
    end_time = end_vol / flow_rate

    mask = (t_all >= start_time) & (t_all <= end_time)
    if not mask.any():
        return None

    t, y = t_all[mask], y_all[mask]
    if not np.isfinite(y).any():
        return None

    # Prefer a genuine local maximum that reaches at least 10% of the window
    # maximum; it is a better starting point than a value sitting on the edge.
    try:
        peaks, _ = find_peaks(y, height=np.nanmax(y) * 0.1)
    except ValueError:
        # Peak detection is best-effort; fall through to the plain maximum.
        peaks = []

    if len(peaks) > 0:
        best_peak = peaks[np.argmax(y[peaks])]
        return t[best_peak], y[best_peak]

    # Fallback: highest sample in the window (e.g. a monotonic shoulder).
    idx = np.nanargmax(y)
    return t[idx], y[idx]

def fit_multi_peak(time, intensity, peak_params, flow_rate):
    """Fit a sum of Gaussians to the signal, one Gaussian per configured peak.

    The fit runs on the time axis over the span from the earliest window start
    to the latest window end, with intensities normalised by their maximum in
    that span. Each peak's centre is bounded to its own search window.

    Args:
        time: Time axis (pandas Series as produced by the analysis cell).
        intensity: Signal values aligned with ``time``.
        peak_params: Mapping ``peak number -> {'start_vol', 'end_vol', 'width'}``.
            A ``width`` greater than 0 fixes that peak's standard deviation
            (given in volume units and divided by ``flow_rate``); otherwise
            the width is fitted.
        flow_rate: Volume per unit time, used to convert volumes to times.

    Returns:
        ``(full_params, x_fit, y_fit, fitted_params, fixed_params)`` where
        ``full_params`` is the flat ``[a1, b1, c1, a2, ...]`` list with
        amplitudes in original intensity units and centres/widths in time
        units, ``x_fit``/``y_fit`` are the samples used for the fit, and the
        last two entries are lists of labels for fitted and fixed parameters.
        ``None`` when there is nothing to fit or no strategy converges.
    """
    # Normalised-amplitude bounds shared by every peak.
    amp_min, amp_max = 0.001, 5.0

    x = np.asarray(time, dtype=float)
    y = np.asarray(intensity, dtype=float)

    if not peak_params:
        return None

    # Overall fitting range: from the earliest window start to the latest end.
    fit_start = min(p['start_vol'] / flow_rate for p in peak_params.values())
    fit_end = max(p['end_vol'] / flow_rate for p in peak_params.values())

    mask = (x >= fit_start) & (x <= fit_end)
    x_fit, y_fit = x[mask], y[mask]

    if len(x_fit) == 0:
        return None

    # Normalise y-values to improve numerical stability. A non-positive or NaN
    # maximum cannot be used as a scale, so give up before dividing by it.
    y_max = np.max(y_fit)
    if not np.isfinite(y_max) or y_max <= 0:
        print("    ⚠️  No positive, finite signal in the fitting range.")
        return None
    y_normalized = y_fit / y_max

    p0 = []
    lower_bounds = []
    upper_bounds = []
    fitted_params = []
    fixed_params = []

    for peak_num, params in peak_params.items():
        peak_info = find_peak_in_region(time, intensity,
                                        params['start_vol'], params['end_vol'], flow_rate)
        if peak_info is None:
            return None

        b_guess, a_guess = peak_info
        window_start = params['start_vol'] / flow_rate
        window_end = params['end_vol'] / flow_rate

        if params['width'] > 0:
            # Fixed width - only amplitude and position are fitted.
            fixed_params.append(f"c{peak_num}={params['width']}")
        else:
            # Fitted width: start at 1/8 of the window and allow 0.1x to 20x of that.
            c_guess = (params['end_vol'] - params['start_vol']) / (8 * flow_rate)
            p0.append(c_guess)
            lower_bounds.append(c_guess * 0.1)
            upper_bounds.append(c_guess * 20)
            fitted_params.append(f"c{peak_num}")

        # Clip the guesses into their bounds: a tiny or negative peak would
        # otherwise give an initial point outside the bounds, which the
        # optimiser rejects outright.
        a_start = np.clip(a_guess / y_max, amp_min, amp_max)
        b_start = np.clip(b_guess, window_start, window_end)

        p0.extend([a_start, b_start])
        lower_bounds.extend([amp_min, window_start])
        upper_bounds.extend([amp_max, window_end])
        fitted_params.extend([f"a{peak_num}", f"b{peak_num}"])

    def expand(fit_params, amplitude_scale=1.0):
        """Turn the optimiser's vector into a flat [a, b, c, ...] list.

        The optimiser sees (a, b) for fixed-width peaks and (c, a, b) for
        fitted-width peaks, in peak order; amplitudes are multiplied by
        ``amplitude_scale``.
        """
        full = []
        cursor = 0
        for params in peak_params.values():
            if params['width'] > 0:
                a = fit_params[cursor]
                b = fit_params[cursor + 1]
                c = params['width'] / flow_rate  # Convert to time units
                cursor += 2
            else:
                c = fit_params[cursor]
                a = fit_params[cursor + 1]
                b = fit_params[cursor + 2]
                cursor += 3
            full.extend([a * amplitude_scale, b, c])
        return full

    def fitting_function(x_values, *fit_params):
        return multi_gaussian(x_values, *expand(fit_params))

    # Only bounded solvers are listed: curve_fit rejects method='lm' whenever
    # bounds are supplied, so an 'lm' attempt could never succeed here.
    strategies = [
        {'maxfev': 30000, 'method': 'trf'},
        {'maxfev': 50000, 'method': 'dogbox'},
        {'maxfev': 100000, 'method': 'trf', 'ftol': 1e-12, 'xtol': 1e-12}
    ]

    popt = None
    last_error = None

    for attempt, strategy in enumerate(strategies, start=1):
        try:
            popt, _ = curve_fit(
                fitting_function, x_fit, y_normalized,
                p0=p0,
                bounds=(lower_bounds, upper_bounds),
                **strategy
            )
            print(f"    Convergence achieved with strategy {attempt}")
            break
        except Exception as e:  # best-effort: any failure moves on to the next strategy
            last_error = e
            if attempt < len(strategies):
                print(f"    Strategy {attempt} failed, trying next...")

    if popt is None:
        print(f"    ⚠️  All optimization strategies failed. Last error: {last_error}")
        return None

    # Rebuild the full parameter list with amplitudes back in original units.
    full_params = expand(popt, amplitude_scale=y_max)

    return full_params, x_fit, y_fit, fitted_params, fixed_params

def _plot_path(series_name):
    """Return the PNG path under plots/ for a series, with unsafe characters replaced."""
    safe_name = ''.join(
        '_' if (ch in '/\\:*?"<>|' or ord(ch) < 32) else ch
        for ch in str(series_name)
    )
    return os.path.join('plots', f'fit_{safe_name}.png')

def _save_fit_plot(series_name, time, intensity, full_params, flow_rate, rmse, r_squared):
    """Draw data, total fit and individual peaks against volume; return the saved path."""
    t = np.asarray(time, dtype=float)
    vol = t * flow_rate
    colors = ['orange', 'green', 'blue', 'purple', 'brown']

    fig, ax = plt.subplots(figsize=(8, 5))
    try:
        ax.plot(vol, intensity, '-', label='Data', alpha=0.7)
        ax.plot(vol, multi_gaussian(t, *full_params),
                '--', linewidth=2, label='Total Fit', color='red')

        for i in range(len(full_params) // 3):
            a, b, c = full_params[3*i:3*i+3]
            ax.plot(vol, gaussian(t, a, b, c),
                    ':', color=colors[i % len(colors)], linewidth=2,
                    label=f'Peak {i+1} (Area: {a * c * np.sqrt(2 * np.pi):.2f})')

        ax.set_xlabel('Volume (mL)')
        ax.set_ylabel('Intensity')
        ax.set_title(f'Multi-Peak Deconvolution — {series_name}\nRMSE: {rmse:.4f}, R²: {r_squared:.4f}')
        ax.legend()
        ax.grid(True, alpha=0.3)
        fig.tight_layout()

        path = _plot_path(series_name)
        fig.savefig(path, dpi=150, bbox_inches='tight')
    finally:
        # Always release the figure, even if drawing or saving failed.
        plt.close(fig)
    return path

def _write_report(path, fname, flow_rate, peak_params, n_series, results, failed_series):
    """Write the plain-text analysis report for one run."""
    n_peaks = len(peak_params)

    # The report contains characters such as Σ, ² and √, so the encoding is
    # fixed rather than left to the platform default.
    with open(path, 'w', encoding='utf-8') as f:
        f.write("=== Multi-Peak Chromatogram Analysis Report ===\n")
        f.write(f"Analysis Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Input file: {fname}\n")
        f.write(f"Flow rate: {flow_rate} mL/min\n")
        f.write(f"Number of peaks fitted: {n_peaks}\n")
        f.write(f"Total series processed: {n_series}\n")
        f.write(f"Successful fits: {len(results)}\n")
        f.write(f"Failed fits: {len(failed_series)}\n\n")

        f.write("=== Peak Search Parameters ===\n")
        for peak_num, params in peak_params.items():
            f.write(f"Peak {peak_num}:\n")
            f.write(f"- Search volume range: {params['start_vol']:.3f} - {params['end_vol']:.3f} mL\n")
            f.write(f"- Width constraint: {'Fixed at ' + str(params['width']) if params['width'] > 0 else 'Auto-fitted'}\n")

        f.write("Software packages used:\n")
        f.write("- numpy (numerical operations)\n")
        f.write("- scipy.optimize.curve_fit (non-linear fitting)\n")
        f.write("- scipy.signal.find_peaks (peak detection)\n")
        f.write("- matplotlib.pyplot (plotting)\n")
        f.write("- pandas (data handling)\n")

        f.write("Mathematical model:\n")
        f.write("- Single Gaussian: G(x; a,b,c) = a * exp(-((x-b)²)/(2c²))\n")
        # This line needs the f prefix so the peak count is substituted.
        f.write(f"- Multi-peak model: f(x) = Σ G(x; a_i,b_i,c_i) for i=1 to {n_peaks}\n")
        f.write("- Peak area = a * c * √(2π)\n\n")

        f.write("=== Fit Quality Summary ===\n")
        rmse_values = [r['RMSE'] for r in results]
        r2_values = [r['R_squared'] for r in results]
        f.write(f"Average RMSE: {np.mean(rmse_values):.4f}\n")
        f.write(f"RMSE range: {np.min(rmse_values):.4f} - {np.max(rmse_values):.4f}\n")
        f.write(f"Average R²: {np.mean(r2_values):.4f}\n")
        f.write(f"R² range: {np.min(r2_values):.4f} - {np.max(r2_values):.4f}\n\n")

        f.write("Individual series fit quality:\n")
        for result in results:
            f.write(f"  {result['Series']}: RMSE={result['RMSE']:.4f}, R²={result['R_squared']:.4f}\n")

        if failed_series:
            f.write("\n=== Failed Series ===\n")
            for series in failed_series:
                f.write(f"  {series}\n")

        f.write("\n=== End of Report ===\n")

def main():
    """Fit every series in the uploaded CSV and download the results as a zip.

    Reads the module-level ``uploaded``, ``peak_params``, ``flow_rate`` and
    ``num_peaks``. For each non-'time' column it fits the configured peaks and
    saves a plot under plots/. It then writes peak_areas.csv and
    analysis_report.txt, bundles them with this run's plots into a
    timestamped zip, and triggers a Colab download. Series whose fit fails
    are skipped and listed at the end.
    """
    # Check if parameters are set
    if not peak_params:
        print("❌ Error: Please configure peak parameters first!")
        return

    if not uploaded:
        print("❌ Error: No uploaded file found. Please run the upload cell first!")
        return

    # Load data
    fname = next(iter(uploaded.keys()))
    df = pd.read_csv(fname)
    if 'time' not in df.columns:
        print(f"❌ Error: '{fname}' has no 'time' column (found: {', '.join(map(str, df.columns))})")
        return
    time = df['time']
    series_columns = df.columns.drop('time')

    # The confirmed peaks drive the analysis; num_peaks may have been edited
    # after the parameters were confirmed.
    peak_numbers = list(peak_params.keys())
    n_peaks = len(peak_numbers)
    if n_peaks != num_peaks:
        print(f"ℹ️  num_peaks is {num_peaks} but {n_peaks} peak(s) were confirmed; using the confirmed peaks.")

    # Create output directories
    os.makedirs('plots', exist_ok=True)
    results = []
    failed_series = []
    plot_paths = []

    print(f"Processing {len(series_columns)} series with {n_peaks} peaks...")
    print("Using enhanced fitting algorithm with multiple convergence strategies...")

    for col in series_columns:
        print(f"\n📊 Processing '{col}'...")
        intensity = df[col]

        # Fit peaks
        fit_result = fit_multi_peak(time, intensity, peak_params, flow_rate)

        if fit_result is None:
            print(f"    ❌ Skipped '{col}': fitting failed")
            failed_series.append(col)
            continue

        full_params, x_fit, y_fit, fitted_params, fixed_params = fit_result

        # Calculate areas and positions. Centre and width are converted to
        # volume by multiplying with flow_rate; the area uses the unconverted width.
        series_result = {'Series': col}

        for i, peak_num in enumerate(peak_numbers):
            a, b, c = full_params[3*i:3*i+3]

            series_result.update({
                f'Peak{peak_num}_Position_Vol': b * flow_rate,
                f'Peak{peak_num}_Area': a * c * np.sqrt(2 * np.pi),
                f'Peak{peak_num}_Height': a,
                f'Peak{peak_num}_Width': c * flow_rate  # Convert back to volume units
            })

        # Calculate RMSE and R-squared over the fitted range
        residuals = y_fit - multi_gaussian(x_fit, *full_params)
        ss_res = np.sum(residuals**2)
        ss_tot = np.sum((y_fit - np.mean(y_fit))**2)
        rmse = np.sqrt(np.mean(residuals**2))
        r_squared = 1 - (ss_res / ss_tot) if ss_tot > 0 else 0

        series_result.update({
            'RMSE': rmse,
            'R_squared': r_squared
        })

        results.append(series_result)
        plot_paths.append(
            _save_fit_plot(col, time, intensity, full_params, flow_rate, rmse, r_squared)
        )

        print(f"    ✅ Success! RMSE: {rmse:.4f}, R²: {r_squared:.4f}")

    # Save results
    if not results:
        print("\n❌ No results to save")
        return

    res_df = pd.DataFrame(results)
    res_df.to_csv('peak_areas.csv', index=False)
    print(f"📊 Results saved for {len(results)} series")

    if failed_series:
        print(f"⚠️  Failed series ({len(failed_series)}): {', '.join(map(str, failed_series))}")

    # Create enhanced detailed report
    _write_report('analysis_report.txt', fname, flow_rate, peak_params,
                  len(series_columns), results, failed_series)

    # Create downloadable zip file
    zip_name = f"peak_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"

    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zipf:
        # Add results and report
        zipf.write('peak_areas.csv')
        zipf.write('analysis_report.txt')

        # Add only the plots made in this run; plots/ may still hold images
        # from earlier runs. A dict drops duplicate paths while keeping order.
        for plot_path in dict.fromkeys(plot_paths):
            zipf.write(plot_path)

    print(f"📦 Downloading results package: {zip_name}")
    files.download(zip_name)

    print("🎉 Enhanced analysis complete!")
    print("Results include:")
    print("  - peak_areas.csv: Quantitative results with R² values")
    print("  - analysis_report.txt: Detailed analysis report")
    print(f"  - {len(results)} enhanced fitting plots")

    if failed_series:
        print("Troubleshooting tips for failed series:")
        print("  - Check if peak search ranges are appropriate")
        print("  - Consider using fixed widths for difficult peaks")
        print("  - Verify data quality and signal-to-noise ratio")

# Start the analysis when this code is executed directly (as it is when the
# cell is run); the guard keeps main() from running if the code is imported.
if __name__ == "__main__":
    main()