In [1]:
from src.StreamPort.core import Engine
from src.StreamPort.device.analyses import PressureCurvesAnalyses

#files -> routine_lc, error_lc_files -> error_lc
%store -r files error_lc_files

#eng = Engine(analyses = PressureCurvesAnalyses(files=error_lc_files))
eng = Engine(analyses = PressureCurvesAnalyses(files=files))

print("Number of analyses: ", len(eng.analyses.data))


Number of analyses:  374


In [2]:
pressure_vector = eng.analyses.data[150]['pressure_var']#[2:-2]# cropping the first and last element ###error_lc curve 184 -  vialEmpty
time = eng.analyses.data[150]['time_var']#[2:-2]
name = eng.analyses.data[150]['name']
sample = eng.analyses.data[150]['sample']
import numpy as np
import plotly.graph_objects as go
# Apply baseline correction. Comparing multiple algorithms to choose the best one. 
# Selected algorithm 3.SGF is implemented in the device/methods.py file.

In [3]:

"""
1. Simple Moving Average
- smoothed version of the original signal, with each point replaced by the average of itself and its <window_size - 1> nearest neighbors
- this smoothed vector is then subtracted from the original pressure vector to obtain the baseline corrected vector while retaining noise
"""
window_size = 10
smoothed_vector = np.convolve(pressure_vector, np.ones(window_size) / window_size, mode='same')
# baseline correction by subtracting the smoothed vector from the original pressure vector
baseline_corrected_vector = pressure_vector - smoothed_vector
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=smoothed_vector, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - Simple Moving Average', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_simple_moving_average.png", width=1100, height= 350, scale = 3)

In [4]:

"""
2. Polynomial Least Squares Fitting
- fit a polynomial of degree <degree> to the original signal, to simulate a blank chromatogram and subtract it from the original signal 
"""
coefficients = np.polyfit(time, pressure_vector, 8)  # degree 3 polynomial
smoothed_vector = np.polyval(coefficients, time)
# baseline correction by subtracting the smoothed vector from the original pressure vector
baseline_corrected_vector = pressure_vector - smoothed_vector
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=smoothed_vector, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - Polynomial Least Squares Fitting', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_polynomial_least_squares_fitting.png", width=1100, height= 350, scale = 3)


In [5]:
"""
3. Savitzky-Golay Filter
- applies a polynomial smoothing filter to the data, which is particularly effective for preserving features of the data while reducing noise
- uses a sliding window to fit a polynomial to the data points within the window, and then replaces the central point with the value of the polynomial at that point
"""
from scipy.signal import savgol_filter
window_size = 13  # Must be odd
poly_order = 2  # Polynomial order
smoothed_vector = savgol_filter(pressure_vector, window_size, poly_order)
baseline_corrected_vector = pressure_vector - smoothed_vector
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=smoothed_vector, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - Savitzky-Golay Filter', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_savitzky_golay_filter.png", width=1100, height= 350, scale = 3)


In [6]:
bins = 7
num_bins = np.array_split(baseline_corrected_vector, bins)
bin_sizes = [len(i) for i in num_bins]
edges = []
temp = 0
for i, j in enumerate(bin_sizes):
    first_edge = (i*j) if temp == 0 else temp 
    last_edge =  (i*j) + j -1 if temp == 0 else temp + j - 1
    temp = last_edge + 1
    edges.append([first_edge, last_edge])
print('Size of baseline corrected vector: ', len(baseline_corrected_vector))
print('Bin sizes: ', bin_sizes)
print('Bin edges: ', edges)
print('Timestamps: ', [[str(time[i[0]].round(3)), str(time[i[1]].round(3))]for i in edges])
fig = go.Figure()
feats = [f"amplitude_range_{time[i[0]].round(3)}_{time[i[1]].round(3)}" for i in edges]
y = [np.max(baseline_corrected_vector[i[0]:i[1]]) - np.min(baseline_corrected_vector[i[0]:i[1]]) for i in edges]
for i in range(len(edges)):
    fig.add_trace(go.Scatter(x=feats, y=y, mode='lines'))
fig.update_layout(title=f'Binned Amplitudes of Baseline Corrected Curves {name} - {sample}', xaxis_title='Features', yaxis_title='Values', showlegend=False)
fig.show()            

Size of baseline corrected vector:  799
Bin sizes:  [115, 114, 114, 114, 114, 114, 114]
Bin edges:  [[0, 114], [115, 228], [229, 342], [343, 456], [457, 570], [571, 684], [685, 798]]
Timestamps:  [['0.0', '0.57'], ['0.575', '1.14'], ['1.145', '1.71'], ['1.715', '2.28'], ['2.285', '2.85'], ['2.855', '3.42'], ['3.425', '3.99']]


In [7]:

"""
4. Asymmetric Least Squares Smoothing
- estimate a baseline in data by minimizing the sum of squared differences between the data and a smooth curve
- does this by applying different penalties to deviations above and below the curve.
- This asymmetry allows the smoother to better fit the baseline while accommodating peaks or other features in the data
"""
from scipy import sparse
from scipy.sparse.linalg import spsolve
smoothness = 1e4 
asymmetry = 0.1
n_iterations=10
L = len(pressure_vector)
D = sparse.csc_matrix(np.diff(np.eye(L), 2))
w = np.ones(L)
for i in range(n_iterations):
    W = sparse.spdiags(w, 0, L, L)
    Z = W + smoothness * D.dot(D.transpose())
    baseline = spsolve(Z, w*pressure_vector)
    w = asymmetry * (pressure_vector > baseline) + (1-asymmetry) * (pressure_vector < baseline)

baseline_corrected_vector = pressure_vector - baseline
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - Asymmetric Least Squares Smoothing', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_AsLS_smoothing.png", width=1100, height= 350, scale = 3)


In [8]:
"""
5. airPLS (adaptive iteratively reweighted Penalized Least Squares)
- fit a polynomial of degree <degree> to the original signal, to simulate a blank chromatogram and subtract it from the original signal 
"""
def logistic(x, m, s):
    return 1.0 / (1.0 + np.exp(-(x - m) / s))

smoothness = 1e5
n_iterations = 50
L = len(pressure_vector)
D = sparse.csc_matrix(np.diff(np.eye(L), 2))
w = np.ones(L)
for i in range(n_iterations):
    W = sparse.spdiags(w, 0, L, L)
    Z = W + smoothness * D.dot(D.transpose())
    smoothed_vector = spsolve(Z, w * pressure_vector)
    d = pressure_vector - smoothed_vector
    d_neg = d[d < 0]
    abs_d_neg = np.abs(d_neg).sum()
    abs_y = np.abs(pressure_vector).sum()
    if abs_d_neg < 0.001 * abs_y:
        break
    if len(d_neg) == 0:
        m_d, s_d = 0, 1
    else:
        m_d = np.mean(d_neg)
        s_d = np.std(d_neg) if np.std(d_neg) > 0 else 1
    w = np.ones(L)
    mask = pressure_vector > smoothed_vector
    w[mask] = logistic(pressure_vector[mask] - smoothed_vector[mask], m_d, s_d)
# baseline correction by subtracting the smoothed vector from the original pressure vector
baseline_corrected_vector = pressure_vector - smoothed_vector
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=smoothed_vector, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - adaptive iteratively reweighted Penalized Least Squares', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_airPLS.png", width=1100, height= 350, scale = 3)


In [9]:
"""
6. arPLS (asymmetrically reweighted Penalized Least Squares)
- fit a polynomial of degree <degree> to the original signal, to simulate a blank chromatogram and subtract it from the original signal 
"""
import numpy as np
from scipy.sparse import diags, identity, csc_matrix
from scipy.sparse.linalg import splu

def arPLS(y, smoothness=1e6, ratio=1e-5, max_iter=30):
    N = len(y)
    e = np.ones(N)

    # Sparse second derivative matrix D
    D = diags([e, -2*e, e], [0, 1, 2], shape=(N-2, N))
    H = smoothness * (D.T @ D)

    w = np.ones(N)
    for _ in range(max_iter):
        W = diags(w, 0)
        A = W + H
        lu = splu(A.tocsc())
        z = lu.solve(w * y)

        d = y - z
        dn = d[d < 0]
        m = dn.mean() if len(dn) > 0 else 0
        s = dn.std() if len(dn) > 0 else 1
        s = max(s, 1e-6)

        x = 2 * (d - (2 * s - m)) / s
        x = np.clip(x, -700, 700)
        wt = 1.0 / (1.0 + np.exp(x))

        if np.linalg.norm(w - wt) / (np.linalg.norm(w) + 1e-8) < ratio:
            break
        w = wt

    return z  # the estimated baseline


baseline = arPLS(pressure_vector)
# baseline correction by subtracting the smoothed vector from the original pressure vector
baseline_corrected_vector = pressure_vector - baseline
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - asymmetrically reweighted Penalized Least Squares', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_arPLS.png", width=1100, height= 350, scale = 3)

In [10]:
"""
Savitzky golay after arPLS. arPLS to remove baseline and savgol to smooth the corrected signal without peak distortion
"""
window_size = 7  # Must be odd
poly_order = 5  # Polynomial order
savgol_smoothed_vector = savgol_filter(baseline_corrected_vector, window_size, poly_order)
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='arPLS Baseline Corrected Curve'))
fig.add_trace(go.Scatter(x=time, y=savgol_smoothed_vector, mode='lines', name='Savgol Smoothed Curve'))
fig.update_layout(title='Baseline Correction - asymmetrically reweighted Penalized Least Squares', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_arPLS.png", width=1100, height= 350, scale = 3)


In [11]:
"""
7. SNIP(Statistical Non-linear Iterative Peak)
-  
"""
# apply a double logarithm transformation to the pressure vector
lls_vector = np.log(np.log(np.sqrt(pressure_vector + 1) + 1) + 1)
# Define a function to compute the minimum filter
def min_filter(lls_vector, m):
    """Applies the SNIP minimum filter"""
    lls_filtered = np.copy(lls_vector)
    for i in range(m, len(lls_vector) - m):
        lls_filtered[i] = min(lls_vector[i], (lls_vector[i-m] + lls_vector[i + m])/2)
    return lls_filtered

# Apply the filter for the first 100 iterations
lls_filtered = np.copy(lls_vector)
for m in range(5):
    lls_filtered = min_filter(lls_vector, m)

smoothed_vector = (np.exp(np.exp(lls_filtered) - 1) - 1) ** 2 - 1
# baseline correction by subtracting the smoothed vector from the original pressure vector
baseline_corrected_vector = pressure_vector - smoothed_vector
fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=smoothed_vector, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - Statistical Non-linear Iterative Peak', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_SNIP.png", width=1100, height= 350, scale = 3)

In [12]:
"""
8. Smoothing Spline Baseline Correction
- fit a smoothing spline to the original signal, allowing some deviation from the data to obtain a smooth baseline
"""
from scipy.interpolate import UnivariateSpline

# Choose a smoothing factor s (higher s = smoother baseline)
smoothing_factor = 1e4  # may need to tune this value
spline = UnivariateSpline(time, pressure_vector, s=smoothing_factor)
baseline = spline(time)

baseline_corrected_vector = pressure_vector - baseline

fig = go.Figure()
fig.add_trace(go.Scatter(x=time, y=pressure_vector, mode='lines', name='Raw Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline, mode='lines', name='Smoothed Curve'))
fig.add_trace(go.Scatter(x=time, y=baseline_corrected_vector, mode='lines', name='Baseline Corrected Vector'))
fig.update_layout(title='Baseline Correction - Smoothing Spline', xaxis_title='Time', yaxis_title='Pressure')
fig.show()
fig.write_image("dev/figures/fig_baseline_correction_smoothing_spline.png", width=1100, height=350, scale=3)