线性模型

In [1]:
import hashlib
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lmfit import Minimizer, Parameters
from sklearn.linear_model import LinearRegression

# Load data
data_path = "../data/LogisticGrowthData_Cleaned.csv"
results_path = "../results"
plots_path = os.path.join(results_path, "quadratic_plots")

# Ensure result storage directories exist
os.makedirs(results_path, exist_ok=True)
os.makedirs(plots_path, exist_ok=True)

# Read data
data_filtered = pd.read_csv(data_path)

# Get all subset IDs
subset_list = data_filtered["ID"].unique()
fit_results = []  # Store fit parameters and R²
successful_fits = []

# Process ID to prevent excessively long filenames
def sanitize_filename(text, max_length=50):
    """Clean filename by removing special characters and limiting length."""
    text = "".join(c if c.isalnum() or c in "_-" else "_" for c in text)
    if len(text) > max_length:
        text = text[:max_length]
        hash_suffix = hashlib.md5(text.encode()).hexdigest()[:8]
        text += f"_{hash_suffix}"
    return text

# Define NLLS objective function
def residuals_quadratic(params, t, data):
    """Compute residuals for quadratic model."""
    v = params.valuesdict()
    model = v['a'] * t**2 + v['b'] * t + v['c']
    return model - data

# Iterate over all subsets for NLLS fitting
for subset_id in subset_list:
    subset_data = data_filtered[data_filtered["ID"] == subset_id]

    # Skip if too few data points
    if len(subset_data) < 5:
        continue

    t = subset_data["Time"].values
    N_obs = subset_data["PopBio"].values

    # Compute OLS as initial values
    X_ols = np.vstack([t**2, t, np.ones_like(t)]).T
    ols_model = LinearRegression(fit_intercept=False)
    ols_model.fit(X_ols, N_obs)
    a_start, b_start, c_start = ols_model.coef_

    # Create parameter object
    params_quadratic = Parameters()
    params_quadratic.add('a', value=a_start)
    params_quadratic.add('b', value=b_start)
    params_quadratic.add('c', value=c_start)

    # Perform NLLS fitting
    minner = Minimizer(residuals_quadratic, params_quadratic, fcn_args=(t, N_obs))
    fit_quadratic_NLLS = minner.minimize()

    # Compute R²
    v_fit = fit_quadratic_NLLS.params.valuesdict()
    N_fit = v_fit['a'] * t**2 + v_fit['b'] * t + v_fit['c']

    ss_res = np.sum((N_obs - N_fit) ** 2)
    ss_tot = np.sum((N_obs - np.mean(N_obs)) ** 2)
    r2 = 1 - (ss_res / ss_tot)

    # Store fit parameters and R²
    fit_results.append({
        "ID": subset_id,
        "a": v_fit['a'],
        "b": v_fit['b'],
        "c": v_fit['c'],
        "R²": r2
    })

    if r2 > 0.6:
        successful_fits.append(subset_id)

    # Generate and save fit plots (only for the first 5 successful fits)
    if len(successful_fits) <= 5:
        plt.figure(figsize=(8, 6))
        plt.scatter(t, N_obs, label="Observed Data", color='blue', alpha=0.6)

        t_fine = np.linspace(min(t), max(t), 100)
        N_fine = v_fit['a'] * t_fine**2 + v_fit['b'] * t_fine + v_fit['c']

        plt.plot(t_fine, N_fine, label=f"NLLS Fit (R²={r2:.3f})", color='red', linewidth=2)
        plt.xlabel("Time")
        plt.ylabel("PopBio")
        plt.title(f"NLLS Quadratic Fit for ID: {subset_id[:30]}...")
        plt.legend()

        # Modify filename to prevent excessive length
        safe_id = sanitize_filename(subset_id)
        plot_filename = os.path.join(plots_path, f"fit_ID_{safe_id}.png")
        plt.savefig(plot_filename, dpi=300)
        plt.close()

# Save fit parameters to CSV
fit_results_df = pd.DataFrame(fit_results)
fit_results_df.to_csv(os.path.join(results_path, "nlls_quadratic_fits.csv"), index=False)

# Output successful fit count
print(f"\n Number of successful NLLS quadratic fits: {len(successful_fits)} / {len(subset_list)}")
print(f" Fit parameters saved to: {results_path}/nlls_quadratic_fits.csv")
print(f" Fit plots saved to: {plots_path} (only for the first 5 successful fits)")



 Number of successful NLLS quadratic fits: 247 / 285
 Fit parameters saved to: ../results/nlls_quadratic_fits.csv
 Fit plots saved to: ../results/quadratic_plots (only for the first 5 successful fits)


In [2]:
import os
import hashlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lmfit import Minimizer, Parameters
from sklearn.linear_model import LinearRegression

# Load data
data_path = "../data/LogisticGrowthData_Cleaned.csv"
results_path = "../results"
plots_path = os.path.join(results_path, "two_stage_quadratic_plots")

# Ensure storage directories exist
os.makedirs(results_path, exist_ok=True)
os.makedirs(plots_path, exist_ok=True)

# Read data
data_filtered = pd.read_csv(data_path)

# Get all subset IDs
subset_list = data_filtered["ID"].unique()
successful_fits = []
fit_results = []
max_plots = 5  # Limit number of plots
plot_count = 0  

# Process filename
def sanitize_filename(text, max_length=50):
    """Clean filename by removing special characters and limiting length."""
    text = "".join(c if c.isalnum() or c in "_-" else "_" for c in text)
    if len(text) > max_length:
        text = text[:max_length]
        hash_suffix = hashlib.md5(text.encode()).hexdigest()[:8]
        text += f"_{hash_suffix}"
    return text

# Define NLLS objective function
def residuals_quadratic(params, t, data):
    """Compute residuals for quadratic model."""
    v = params.valuesdict()
    model = v['a'] * t**2 + v['b'] * t + v['c']
    return model - data

# Split data into two phases
def split_into_two_phases(subset_data):
    """Identify the point of highest rate change and split into two phases."""
    if len(subset_data) < 5:
        return [subset_data]
    
    t = subset_data["Time"].values
    y = subset_data["PopBio"].values
    dydt = np.gradient(y, t)
    
    split_index = np.argmax(np.abs(np.gradient(dydt)))
    
    if split_index < 2 or split_index > len(t) - 3:
        return [subset_data]
    
    phase_1 = subset_data.iloc[:split_index+1]
    phase_2 = subset_data.iloc[split_index:]
    
    return [phase_1, phase_2]

# Iterate over all subsets for NLLS fitting
for subset_id in subset_list:
    subset_data = data_filtered[data_filtered["ID"] == subset_id]
    if len(subset_data) < 5:
        continue

    phases = split_into_two_phases(subset_data)
    
    overall_r2 = 0
    phase_models = []

    for i, phase in enumerate(phases):
        if len(phase) < 4:
            continue

        t = phase["Time"].values
        N_obs = phase["PopBio"].values

        # Compute OLS initial values
        X_ols = np.vstack([t**2, t, np.ones_like(t)]).T
        ols_model = LinearRegression(fit_intercept=False)
        ols_model.fit(X_ols, N_obs)
        a_start, b_start, c_start = ols_model.coef_

        # Create parameter object
        params_quadratic = Parameters()
        params_quadratic.add('a', value=a_start)
        params_quadratic.add('b', value=b_start)
        params_quadratic.add('c', value=c_start)

        # Perform NLLS fitting
        minner = Minimizer(residuals_quadratic, params_quadratic, fcn_args=(t, N_obs))
        fit_quadratic_NLLS = minner.minimize()

        # Compute R²
        v_fit = fit_quadratic_NLLS.params.valuesdict()
        N_fit = v_fit['a'] * t**2 + v_fit['b'] * t + v_fit['c']
        
        ss_res = np.sum((N_obs - N_fit) ** 2)
        ss_tot = np.sum((N_obs - np.mean(N_obs)) ** 2)
        r2 = 1 - (ss_res / ss_tot)

        if r2 > 0.6:
            overall_r2 += r2
            phase_models.append((t, N_obs, v_fit, r2))
            
            fit_results.append({
                "ID": subset_id,
                "Phase": i+1,
                "a": v_fit['a'],
                "b": v_fit['b'],
                "c": v_fit['c'],
                "R²": r2
            })

    # Evaluate overall fit result
    if len(phase_models) > 0:
        avg_r2 = overall_r2 / len(phase_models)
        if avg_r2 > 0.6:
            successful_fits.append(subset_id)

            # Generate plots for first 5 successful subsets
            if plot_count < max_plots:
                plt.figure(figsize=(8, 6))
                colors = ['blue', 'green']
                for j, (t, N_obs, v_fit, r2) in enumerate(phase_models):
                    plt.scatter(t, N_obs, label=f"Phase {j+1} Data", color=colors[j % len(colors)], alpha=0.6)
                    t_fine = np.linspace(min(t), max(t), 100)
                    N_fine = v_fit['a'] * t_fine**2 + v_fit['b'] * t_fine + v_fit['c']
                    plt.plot(t_fine, N_fine, label=f"Phase {j+1} (R²={r2:.3f})", color=colors[j % len(colors)], linewidth=2)
                    
                plt.xlabel("Time")
                plt.ylabel("PopBio")
                plt.title(f"NLLS Quadratic Fit for ID: {subset_id[:30]}... (Avg R²={avg_r2:.3f})")
                plt.legend()

                safe_id = sanitize_filename(subset_id)
                plot_filename = os.path.join(plots_path, f"fit_ID_{safe_id}.png")
                plt.savefig(plot_filename, dpi=300)
                plt.close()

                plot_count += 1

# Save fit parameters to CSV
fit_results_df = pd.DataFrame(fit_results)
fit_results_df.to_csv(os.path.join(results_path, "two_stage_quadratic_fits.csv"), index=False)

# Output successful fit count
print(f"\n Number of successful two-stage NLLS quadratic fits: {len(successful_fits)} / {len(subset_list)}")
print(f" Fit parameters saved to: {results_path}/two_stage_quadratic_fits.csv")
print(f" Fit plots saved to: {plots_path} (only for the first 5 successful fits)")

  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]
  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]
  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]
  out[tuple(slice1)] = (f[tuple(slice2)] - f[tuple(slice3)]) / dx_n
  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]



 Number of successful two-stage NLLS quadratic fits: 261 / 285
 Fit parameters saved to: ../results/two_stage_quadratic_fits.csv
 Fit plots saved to: ../results/two_stage_quadratic_plots (only for the first 5 successful fits)


In [3]:
import os
import hashlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lmfit import Minimizer, Parameters
from sklearn.linear_model import LinearRegression

# Load data
data_path = "../data/LogisticGrowthData_Cleaned.csv"
results_path = "../results"
plots_path = os.path.join(results_path, "cubic_plots")

# Ensure storage directories exist
os.makedirs(results_path, exist_ok=True)
os.makedirs(plots_path, exist_ok=True)

# Read data
data_filtered = pd.read_csv(data_path)

# Get all subset IDs
subset_list = data_filtered["ID"].unique()
successful_fits = []
fit_results = []
max_plots = 5  # Limit number of plots
plot_count = 0  

# Process filename
def sanitize_filename(text, max_length=50):
    """Clean filename by removing special characters and limiting length."""
    text = "".join(c if c.isalnum() or c in "_-" else "_" for c in text)
    if len(text) > max_length:
        text = text[:max_length]
        hash_suffix = hashlib.md5(text.encode()).hexdigest()[:8]
        text += f"_{hash_suffix}"
    return text

# Define NLLS objective function (Cubic)
def residuals_cubic(params, t, data):
    """Compute residuals for cubic model."""
    v = params.valuesdict()
    model = v['a'] * t**3 + v['b'] * t**2 + v['c'] * t + v['d']
    return model - data

# Split data into two phases
def split_into_two_phases(subset_data):
    """Identify the point of highest rate change and split into two phases."""
    if len(subset_data) < 5:
        return [subset_data]
    
    t = subset_data["Time"].values
    y = subset_data["PopBio"].values
    dydt = np.gradient(y, t)
    
    split_index = np.argmax(np.abs(np.gradient(dydt)))
    
    if split_index < 2 or split_index > len(t) - 3:
        return [subset_data]
    
    phase_1 = subset_data.iloc[:split_index+1]
    phase_2 = subset_data.iloc[split_index:]
    
    return [phase_1, phase_2]

# Iterate over all subsets for NLLS fitting
for subset_id in subset_list:
    subset_data = data_filtered[data_filtered["ID"] == subset_id]
    if len(subset_data) < 5:
        continue

    phases = split_into_two_phases(subset_data)
    
    overall_r2 = 0
    phase_models = []

    for i, phase in enumerate(phases):
        if len(phase) < 4:
            continue

        t = phase["Time"].values
        N_obs = phase["PopBio"].values

        # Compute OLS initial values
        X_ols = np.vstack([t**3, t**2, t, np.ones_like(t)]).T
        ols_model = LinearRegression(fit_intercept=False)
        ols_model.fit(X_ols, N_obs)
        a_start, b_start, c_start, d_start = ols_model.coef_

        # Create parameter object
        params_cubic = Parameters()
        params_cubic.add('a', value=a_start)
        params_cubic.add('b', value=b_start)
        params_cubic.add('c', value=c_start)
        params_cubic.add('d', value=d_start)

        # Perform NLLS fitting
        minner = Minimizer(residuals_cubic, params_cubic, fcn_args=(t, N_obs))
        fit_cubic_NLLS = minner.minimize()

        # Compute R²
        v_fit = fit_cubic_NLLS.params.valuesdict()
        N_fit = v_fit['a'] * t**3 + v_fit['b'] * t**2 + v_fit['c'] * t + v_fit['d']
        
        ss_res = np.sum((N_obs - N_fit) ** 2)
        ss_tot = np.sum((N_obs - np.mean(N_obs)) ** 2)
        r2 = 1 - (ss_res / ss_tot)

        if r2 > 0.6:
            overall_r2 += r2
            phase_models.append((t, N_obs, v_fit, r2))
            
            fit_results.append({
                "ID": subset_id,
                "Phase": i+1,
                "a": v_fit['a'],
                "b": v_fit['b'],
                "c": v_fit['c'],
                "d": v_fit['d'],
                "R²": r2
            })

    # Evaluate overall fit result
    if len(phase_models) > 0:
        avg_r2 = overall_r2 / len(phase_models)
        if avg_r2 > 0.6:
            successful_fits.append(subset_id)

            # Generate plots for first 5 successful subsets
            if plot_count < max_plots:
                plt.figure(figsize=(8, 6))
                colors = ['blue', 'green']
                for j, (t, N_obs, v_fit, r2) in enumerate(phase_models):
                    plt.scatter(t, N_obs, label=f"Phase {j+1} Data", color=colors[j % len(colors)], alpha=0.6)
                    t_fine = np.linspace(min(t), max(t), 100)
                    N_fine = v_fit['a'] * t_fine**3 + v_fit['b'] * t_fine**2 + v_fit['c'] * t_fine + v_fit['d']
                    plt.plot(t_fine, N_fine, label=f"Phase {j+1} (R²={r2:.3f})", color=colors[j % len(colors)], linewidth=2)
                    
                plt.xlabel("Time")
                plt.ylabel("PopBio")
                plt.title(f"NLLS Cubic Fit for ID: {subset_id[:30]}... (Avg R²={avg_r2:.3f})")
                plt.legend()

                safe_id = sanitize_filename(subset_id)
                plot_filename = os.path.join(plots_path, f"fit_ID_{safe_id}.png")
                plt.savefig(plot_filename, dpi=300)
                plt.close()

                plot_count += 1

# Save fit parameters to CSV
fit_results_df = pd.DataFrame(fit_results)
fit_results_df.to_csv(os.path.join(results_path, "nlls_cubic_fits.csv"), index=False)

# Output successful fit count
print(f"\n Number of successful two-stage NLLS cubic fits: {len(successful_fits)} / {len(subset_list)}")
print(f" Fit parameters saved to: {results_path}/nlls_cubic_fits.csv.csv")
print(f" Fit plots saved to: {plots_path} (only for the first 5 successful fits)")


  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]
  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]
  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]
  out[tuple(slice1)] = (f[tuple(slice2)] - f[tuple(slice3)]) / dx_n
  a = -(dx2)/(dx1 * (dx1 + dx2))
  b = (dx2 - dx1) / (dx1 * dx2)
  c = dx1 / (dx2 * (dx1 + dx2))
  out[tuple(slice1)] = a * f[tuple(slice2)] + b * f[tuple(slice3)] + c * f[tuple(slice4)]



 Number of successful two-stage NLLS cubic fits: 268 / 285
 Fit parameters saved to: ../results/nlls_cubic_fits.csv.csv
 Fit plots saved to: ../results/cubic_plots (only for the first 5 successful fits)


In [4]:
import os
import hashlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lmfit import Minimizer, Parameters
import warnings

warnings.filterwarnings('ignore')

# Load data
data_path = "../data/LogisticGrowthData_Cleaned.csv"
results_path = "../results"
plots_path = os.path.join(results_path, "logistic_plots")

# Ensure storage directories exist
os.makedirs(results_path, exist_ok=True)
os.makedirs(plots_path, exist_ok=True)

# Read data
data_filtered = pd.read_csv(data_path)

# Get all subset IDs
subset_list = data_filtered["ID"].unique()
successful_fits = []
fit_results = []
max_plots = 5  # Limit number of plots
plot_count = 0  

# Process filename
def sanitize_filename(text, max_length=50):
    """Clean filename by removing special characters and limiting length."""
    text = "".join(c if c.isalnum() or c in "_-" else "_" for c in text)
    if len(text) > max_length:
        text = text[:max_length]
        hash_suffix = hashlib.md5(text.encode()).hexdigest()[:8]
        text += f"_{hash_suffix}"
    return text

# Define Logistic Growth objective function
def logistic_growth(params, t, data):
    """Logistic Growth model residuals."""
    v = params.valuesdict()
    r_t = np.clip(v['r'] * t, -100, 100)  # Prevent overflow in r * t calculation
    
    model = (v['N_0'] * v['N_max'] * np.exp(r_t)) / \
            (v['N_max'] + v['N_0'] * (np.exp(r_t) - 1))
    
    if np.any(np.isnan(model)):
        print(f"⚠️ NaN detected in model calculation! Params: {v}")
    
    return model - data  # Return residuals

# Iterate over all subsets for NLLS fitting
for subset_id in subset_list:
    subset_data = data_filtered[data_filtered["ID"] == subset_id]
    
    if len(subset_data) < 5:
        print(f"⚠️ {subset_id} has too few data points, skipping")
        continue

    subset_data = subset_data.dropna(subset=["Time", "PopBio"])
    t = subset_data["Time"].values
    N_obs = subset_data["PopBio"].values

    if np.any(np.isnan(t)) or np.any(np.isnan(N_obs)):
        print(f"⚠️ {subset_id} contains NaN values, skipping")
        continue

    # Set initial parameters
    N_0_init = max(N_obs[0], 1e-3)  
    N_max_init = max(N_obs[-1], N_obs[0] * 1.1)  
    r_init = max((np.log(N_obs[-1]) - np.log(N_obs[0])) / (t[-1] - t[0]), 1e-3)  

    # Create parameter object
    params_logistic = Parameters()
    params_logistic.add('N_0', value=N_0_init, min=0)
    params_logistic.add('N_max', value=N_max_init, min=N_0_init)
    params_logistic.add('r', value=r_init, min=0, max=1)  

    # Perform NLLS fitting
    minner = Minimizer(logistic_growth, params_logistic, fcn_args=(t, N_obs))
    fit_logistic = minner.minimize(method='leastsq')

    # Compute R²
    v_fit = fit_logistic.params.valuesdict()
    N_fit = (v_fit['N_0'] * v_fit['N_max'] * np.exp(v_fit['r'] * t)) / \
            (v_fit['N_max'] + v_fit['N_0'] * (np.exp(v_fit['r'] * t) - 1))

    ss_res = np.sum((N_obs - N_fit) ** 2)
    ss_tot = np.sum((N_obs - np.mean(N_obs)) ** 2)
    r2 = 1 - (ss_res / ss_tot)

    # Store fit parameters
    fit_results.append({
        "ID": subset_id,
        "N_0": v_fit['N_0'],
        "N_max": v_fit['N_max'],
        "r": v_fit['r'],
        "R²": r2
    })

    if r2 > 0.6:
        successful_fits.append(subset_id)

        # Generate plots for first 5 successful subsets
        if plot_count < max_plots:
            plt.figure(figsize=(8, 6))
            plt.scatter(t, N_obs, label="Observed Data", color='blue', alpha=0.6)

            # Generate smooth fit curve
            t_fine = np.linspace(min(t), max(t), 100)
            N_fine = (v_fit['N_0'] * v_fit['N_max'] * np.exp(v_fit['r'] * t_fine)) / \
                     (v_fit['N_max'] + v_fit['N_0'] * (np.exp(v_fit['r'] * t_fine) - 1))

            plt.plot(t_fine, N_fine, label=f"Logistic NLLS Fit (R²={r2:.3f})", color='red', linewidth=2)
            plt.xlabel("Time")
            plt.ylabel("PopBio")
            plt.title(f"Logistic NLLS Fit for ID: {subset_id[:30]}...")
            plt.legend()

            # Save plot
            safe_id = sanitize_filename(subset_id)
            plot_filename = os.path.join(plots_path, f"fit_ID_{safe_id}.png")
            plt.savefig(plot_filename, dpi=300)
            plt.close()

            plot_count += 1

# Save fit parameters to CSV
fit_results_df = pd.DataFrame(fit_results)
fit_results_df.to_csv(os.path.join(results_path, "nlls_logistic_fits.csv"), index=False)

# Output successful fit count
print(f"\n Number of successful Logistic NLLS fits: {len(successful_fits)} / {len(subset_list)}")
print(f" Fit parameters saved to: {results_path}/nlls_logistic_fits.csv")
print(f" Fit plots saved to: {plots_path} (only for the first 5 successful fits)")


⚠️ Serratia marcescens_10_Pasteurised Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_10_UHT Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_15_Pasteurised Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_15_UHT Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_10_UHT Full-fat Milk_Phillips, J.D. and Griffiths

In [5]:
import os
import hashlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lmfit import Parameters, Minimizer

# Load data
data_path = "../data/LogisticGrowthData_Cleaned.csv"
results_path = "../results"
plots_path = os.path.join(results_path, "Gompertz_plots")

# Ensure storage directories exist
os.makedirs(results_path, exist_ok=True)
os.makedirs(plots_path, exist_ok=True)

# Read data
data_filtered = pd.read_csv(data_path)

# Get all subset IDs
subset_list = data_filtered["ID"].unique()
successful_fits = []
fit_results = []
max_plots = 5  # Limit number of plots
plot_count = 0  

# Process filename
def sanitize_filename(text, max_length=50):
    """Clean filename by removing special characters and limiting length."""
    text = "".join(c if c.isalnum() or c in "_-" else "_" for c in text)
    if len(text) > max_length:
        text = text[:max_length]
        hash_suffix = hashlib.md5(text.encode()).hexdigest()[:8]
        text += f"_{hash_suffix}"
    return text

# Define Gompertz Growth objective function
def gompertz_residuals(params, t, data):
    """Gompertz growth model residuals."""
    v = params.valuesdict()
    
    exp_term = np.exp(np.clip(v['r_max'] * np.exp(1) * (v['t_lag'] - t) / 
               ((v['N_max'] - v['N_0']) * np.log(10)) + 1, -100, 100))
    
    model = v['N_0'] + (v['N_max'] - v['N_0']) * np.exp(-exp_term)

    return model - data  # Compute residuals

# Estimate initial values for Gompertz model
def estimate_initial_values(t, popbio):
    """Estimate initial parameters for Gompertz model."""
    N_0_init = max(np.percentile(popbio, 5), 1e-3)  
    N_max_init = max(np.percentile(popbio, 95), N_0_init * 1.1)  
    
    dydt = np.gradient(popbio, t)
    r_max_init = np.clip(max(dydt) / (N_max_init - N_0_init), 0.001, 3)  
    
    t_lag_init = t[np.argmin(dydt)]  
    
    return N_0_init, N_max_init, r_max_init, t_lag_init

# Batch fitting
for subset in subset_list:
    subset_data = data_filtered[data_filtered["ID"] == subset]

    if len(subset_data) < 5:  
        print(f"⚠️ {subset} has too few data points, skipping")
        continue

    t = subset_data["Time"].values
    popbio = subset_data["PopBio"].values

    popbio = np.clip(popbio, 1e-3, None)

    # Compute initial values
    N_0_init, N_max_init, r_max_init, t_lag_init = estimate_initial_values(t, popbio)

    # Create parameter object
    params = Parameters()
    params.add_many(
        ('N_0', N_0_init, True, 1e-3, 10),
        ('N_max', N_max_init, True, N_0_init * 1.1, 100),
        ('r_max', r_max_init, True, 0.001, 3),
        ('t_lag', t_lag_init, True, min(t), max(t))
    )

    try:
        # Perform NLLS fitting
        minner = Minimizer(gompertz_residuals, params, fcn_args=(t, popbio))
        fit_result = minner.minimize()

        # Compute R²
        fitted_values = gompertz_residuals(fit_result.params, t, np.zeros_like(popbio)) + popbio
        ss_res = np.sum((popbio - fitted_values) ** 2)
        ss_tot = np.sum((popbio - np.mean(popbio)) ** 2)
        r2 = 1 - (ss_res / ss_tot)

        # Store fit parameters
        v_fit = fit_result.params.valuesdict()
        fit_results.append({
            "ID": subset,
            "N_0": v_fit['N_0'],
            "N_max": v_fit['N_max'],
            "r_max": v_fit['r_max'],
            "t_lag": v_fit['t_lag'],
            "R²": r2
        })

        if r2 > 0.6:  
            successful_fits.append(subset)

            # Generate plots for first 5 successful subsets
            if plot_count < max_plots:
                plt.figure(figsize=(8, 6))
                plt.scatter(t, popbio, label="Observed Data", color="blue", alpha=0.6)
                plt.plot(t, fitted_values, label=f"Gompertz Fit (R²={r2:.3f})", color="red")
                plt.xlabel("Time")
                plt.ylabel("PopBio")
                plt.title(f"Gompertz Model Fit for {subset[:30]}...")
                plt.legend()

                # Save plot
                safe_id = sanitize_filename(subset)
                plot_filename = os.path.join(plots_path, f"fit_ID_{safe_id}.png")
                plt.savefig(plot_filename, dpi=300)
                plt.close()

                plot_count += 1

    except Exception as e:
        print(f"⚠️ {subset} fitting failed: {e}")
        continue  

# Save fit parameters to CSV
fit_results_df = pd.DataFrame(fit_results)
fit_results_df.to_csv(os.path.join(results_path, "nlls_gompertz_fits.csv"), index=False)

# Output successful fit count
print(f"\n Number of successful Gompertz model fits: {len(successful_fits)} / {len(subset_list)}")
print(f" Fit parameters saved to: {results_path}/nlls_gompertz_fits.csv")
print(f" Fit plots saved to: {plots_path} (only for the first 5 successful fits)")

⚠️ Serratia marcescens_10_Pasteurised Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_10_UHT Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_15_Pasteurised Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_15_UHT Skim Milk_Phillips, J.D. and Griffiths, M.W., 1987. The relation between temperature and growth of bacteria in dairy products. Food Microbiology, 4(2), pp.173-185. has too few data points, skipping
⚠️ Serratia marcescens_10_UHT Full-fat Milk_Phillips, J.D. and Griffiths

In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
from matplotlib.backends.backend_pdf import PdfPages

# Define file paths
results_path = "../results"
pdf_filename = "random_combined_model_plot.pdf"
pdf_path = os.path.join(results_path, pdf_filename)  # Final PDF file path

# Read model fitting results
files = {
    "Quadratic": "nlls_quadratic_fits.csv",
    "Cubic": "nlls_cubic_fits.csv",
    "Logistic": "nlls_logistic_fits.csv",
    "Gompertz": "nlls_gompertz_fits.csv"
}

# Load all model fitting results
model_fits = {model: pd.read_csv(os.path.join(results_path, file)) for model, file in files.items()}

# Count the number of successful fits for each model
success_counts = {model: len(df.dropna()) for model, df in model_fits.items()}

# Identify subsets successfully fitted by all models
all_models_fitted = set(model_fits["Quadratic"]["ID"]).intersection(
    set(model_fits["Cubic"]["ID"]),
    set(model_fits["Logistic"]["ID"]),
    set(model_fits["Gompertz"]["ID"])
)

# Convert to list for random selection
all_models_fitted = list(all_models_fitted)

# Randomly select 5 subsets
np.random.seed(42)  # Fix random seed for reproducibility
selected_subsets = np.random.choice(all_models_fitted, 1, replace=False)

# Load raw data
data_path = "../data/LogisticGrowthData_Cleaned.csv"
raw_data = pd.read_csv(data_path)

# Create a PDF to save all plots
with PdfPages(pdf_path) as pdf:
    for subset_id in selected_subsets:
        plt.figure(figsize=(8, 6))
        
        # Retrieve original data
        subset_data = raw_data[raw_data["ID"] == subset_id]
        t = subset_data["Time"].values
        y_true = subset_data["PopBio"].values
        plt.scatter(t, y_true, label="Observed Data", color='black', alpha=0.6)
        
        # Plot fitted curves for four models
        for model_name, df in model_fits.items():
            params_row = df[df["ID"] == subset_id]
            if params_row.empty:
                continue  # Skip if fitting was unsuccessful
            
            # Generate prediction curve
            t_fine = np.linspace(min(t), max(t), 100)
            if model_name == "Quadratic":
                y_pred = params_row["a"].values[0] * t_fine**2 + params_row["b"].values[0] * t_fine + params_row["c"].values[0]
            elif model_name == "Cubic":
                y_pred = (params_row["a"].values[0] * t_fine**3 + params_row["b"].values[0] * t_fine**2 +
                          params_row["c"].values[0] * t_fine + params_row["d"].values[0])
            elif model_name == "Logistic":
                N_0, N_max, r = params_row["N_0"].values[0], params_row["N_max"].values[0], params_row["r"].values[0]
                y_pred = (N_0 * N_max * np.exp(r * t_fine)) / (N_max + N_0 * (np.exp(r * t_fine) - 1))
            elif model_name == "Gompertz":
                N_0, N_max, r_max, t_lag = params_row["N_0"].values[0], params_row["N_max"].values[0], params_row["r_max"].values[0], params_row["t_lag"].values[0]
                exp_term = np.exp(r_max * np.exp(1) * (t_lag - t_fine) / ((N_max - N_0) * np.log(10)) + 1)
                y_pred = N_0 + (N_max - N_0) * np.exp(-exp_term)
            else:
                continue

            plt.plot(t_fine, y_pred, label=f"{model_name} Fit")

        plt.xlabel("Time")
        plt.ylabel("PopBio")
        plt.title(f"Model Fit Comparison for {subset_id[:30]}...")
        plt.legend()
        
        # Save to PDF
        pdf.savefig()
        plt.close()

print(f"\nGenerated fitted plots and saved to PDF: {pdf_path}")



Generated fitted plots and saved to PDF: ../results/random_combined_model_plot.pdf
