#### This code is intended for utilization by OP-airPLS in the context of executing batch processing operations with actual datasets.

It is permissible to manually adjust several parameters, contingent upon the user's familiarity with the optimized parameters. These adjustments include lambda, tau, p-order (set to 2 by default), max iteration (default: 1000), and an option to determine whether airPLS operates on a normalized spectrum (in which case it is subsequently unnormalized) or not.

Inquiries regarding the utilization of this code are welcomed. Please contact: jiaheng.cui@uga.edu.

In [1]:
import numpy as np
from tqdm import trange
import os
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import savgol_filter
from scipy.stats import norm
from scipy.sparse import csc_matrix, eye, diags
from scipy.sparse.linalg import spsolve
from scipy.optimize import minimize
import warnings
from scipy import integrate

This code is modified from Yizeng Liang and Zhang Zhimin (https://github.com/zmzhang/airPLS). Their liscence is preserved according to their requirement.

In [2]:
'''
airPLS.py Copyright 2014 Renato Lombardo - renato.lombardo@unipa.it
Baseline correction using adaptive iteratively reweighted penalized least squares

This program is a translation in python of the R source code of airPLS version 2.0
by Yizeng Liang and Zhang Zhimin - https://code.google.com/p/airpls
Reference:
Z.-M. Zhang, S. Chen, and Y.-Z. Liang, Baseline correction using adaptive iteratively reweighted penalized least squares. Analyst 135 (5), 1138-1146 (2010).

Description from the original documentation:

Baseline drift always blurs or even swamps signals and deteriorates analytical results, particularly in multivariate analysis.  It is necessary to correct baseline drift to perform further data analysis. Simple or modified polynomial fitting has been found to be effective in some extent. However, this method requires user intervention and prone to variability especially in low signal-to-noise ratio environments. The proposed adaptive iteratively reweighted Penalized Least Squares (airPLS) algorithm doesn't require any user intervention and prior information, such as detected peaks. It iteratively changes weights of sum squares errors (SSE) between the fitted baseline and original signals, and the weights of SSE are obtained adaptively using between previously fitted baseline and original signals. This baseline estimator is general, fast and flexible in fitting baseline.


LICENCE
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>
'''

def WhittakerSmooth(x, w, lambda_, differences=1):
    # print("\n--- Starting WhittakerSmooth ---")
    X = np.matrix(x)
    m = X.size
    E = eye(m, format='csc')
    for i in range(differences):
        E = E[1:] - E[:-1]
    W = diags(w, 0, shape=(m,m))
    
    A = csc_matrix(W + (lambda_ * E.T * E))
    B = csc_matrix(W * X.T)
    
    try:
        background = spsolve(A, B)
    except Exception as e:
        print(f"Error in spsolve: {str(e)}")
        return None
        
    # print("--- Finished WhittakerSmooth ---")
    return np.array(background), W, A, B

def airPLS(x, lambda_=100, tau=0.001, porder=2, itermax=1000, normalization=False):
    # print(f"\n--- Starting airPLS with lambda={lambda_}, tau={tau}, itermax={itermax} ---")
    try:
        m = x.shape[0]
        if normalization:
            # print("Using normalization")
            original_spectrum = x.copy()
            # Try auto area normalization
            wavenumber = np.arange(m) # Fake wavenumber since we don't know the true wavenumber, but we just want a normalizing constant
            # Check for NaN or inf values
            if np.any(np.isnan(x)) or np.any(np.isnan(wavenumber)) or np.any(np.isinf(x)) or np.any(np.isinf(wavenumber)):
                print("Warning: NaN or inf values detected in the data")
            else:
                # Calculate the area using trapezoidal rule
                scaling_factor = integrate.trapz(x, wavenumber)
                if scaling_factor == 0:
                    scaling_factor == 1
                    print("Warning: Area is zero, using original spectrum")
                # print(scaling_factor)
                
                # Normalize the spectrum
                normalized_x = x / scaling_factor
        
        weights = np.ones(m)
        # Store iteration data
        z_iterations = []
        d_iterations = []
        dssn_iterations = []
        weights_iterations = []

        for i in range(1, itermax+1):
            # print(f"Working on iteration={i}")
            if normalization:
                z, W, A, B = WhittakerSmooth(normalized_x, weights, lambda_, porder)
            else:
                z, W, A, B = WhittakerSmooth(x, weights, lambda_, porder)
            if z is None:
                print(f"WhittakerSmooth failed at iteration {i}")
                return None
            
            if normalization:
                d = normalized_x - z
            else:
                d = x - z
            dssn = np.abs(d[d<0].sum())

            # Store iteration data
            z_iterations.append(z)
            d_iterations.append(d)
            dssn_iterations.append(dssn)
            weights_iterations.append(weights.copy())
            if normalization:
                if(dssn < tau * (abs(normalized_x)).sum() or i == itermax):
                    # print(f"Stopping criterion met at iteration {i}, dssn={dssn}, tau * (abs(normalized_x)).sum()={tau * (abs(normalized_x)).sum()}")
                    break
            else:
                if(dssn < tau * (abs(x)).sum() or i == itermax):
                    # print(f"Stopping criterion met at iteration {i}, dssn={dssn}, tau * (abs(x)).sum()={tau * (abs(x)).sum()}")
                    break
            weights[d>=0] = 0

            try:
                with warnings.catch_warnings(record=True) as caught_warnings:
                    warnings.simplefilter("always")
                    if np.isinf(dssn):
                        dssn = np.finfo(dssn.dtype).max
                    exp_term = i * np.abs(d[d<0]) / dssn
                    max_exp = 709  # round(np.log(np.finfo(float).max))
                    exp_term = np.minimum(exp_term, max_exp)
                    weights[d<0] = np.exp(exp_term)
                    if caught_warnings:
                        warning = caught_warnings[0]
                        print(f"Warning at iteration {i} in exp calculation: {warning.message}")

            except Exception as e:
                print(f"Exception at iteration {i} in exp calculation: {e}")
                if normalization:
                    normalized_original_spectrum = original_spectrum / scaling_factor
                    normalized_predicted_x = normalized_original_spectrum - z
                    unnormalized_predicted_x = normalized_predicted_x * scaling_factor
                    z = original_spectrum - unnormalized_predicted_x
                return z

            try:
                if np.isinf(dssn):
                    dssn = np.finfo(dssn.dtype).max
                weights[0] = np.exp(i * (d[d<0]).max() / dssn)
            except Warning as warn:
                print(f"Warning at iteration {i} in w[0] calculation: {warn}")
                if normalization:
                    normalized_original_spectrum = original_spectrum / scaling_factor
                    normalized_predicted_x = normalized_original_spectrum - z
                    unnormalized_predicted_x = normalized_predicted_x * scaling_factor
                    z = original_spectrum - unnormalized_predicted_x
                return z

            weights[-1] = weights[0]

        if normalization:
            normalized_original_spectrum = original_spectrum / scaling_factor
            normalized_predicted_x = normalized_original_spectrum - z
            unnormalized_predicted_x = normalized_predicted_x * scaling_factor
            z = original_spectrum - unnormalized_predicted_x
        return z
    except Exception as e:
        print(f"Error in airPLS: {str(e)}")
        print(f"Traceback: {traceback.format_exc()}")
        return None

#### Example usage: Process all csvs under the root folder.

In [None]:
# Path to the folder containing the CSV files (replace with your actual path)
folder_path = r'E:\Jiaheng Cui\Baseline removal\LPS SERS spectra\Interpolated CSV-2'

# Process each CSV file in the folder
for file_name in os.listdir(folder_path):
    if file_name.endswith('.csv'):  # Check if the file is a CSV
        print("Processing " + file_name)
        file_path = os.path.join(folder_path, file_name)
        df = pd.read_csv(file_path, sep='\t')
        
        # Apply airPLS to every column except the first one
        for col in df.columns[1:]:
            baseline = airPLS(df[col].values, lambda_=1e4, tau=1e-5, porder=2, itermax=1000, normalization=True)
            df[col] = df[col].values - baseline
        
        # Define the new file name
        new_file_name = file_name.replace('.csv', '_airPLS_corrected.csv')
        new_file_path = os.path.join(folder_path, new_file_name)
        
        # Save the processed dataframe to a new CSV file
        df.to_csv(new_file_path, sep='\t', index=False)