In [1]:
import phate
import scprep
import seaborn as sns
import os
import json
from PIL import Image, ImageDraw
import re
import numpy as np
import cv2
import imageio
import matplotlib.pyplot as plt
import tifffile as tiff
from tqdm import tqdm
from skimage.exposure import equalize_adapthist
from scipy.stats import stats
import matplotlib.animation as animation
import pandas as pd
import csv
import shutil
from skimage.morphology import dilation, erosion
import numpy as np
import matplotlib.pyplot as plt
import imageio
from skimage import measure
from skimage.measure import regionprops, label
from scipy.spatial import distance
import time
import datetime
from mpl_toolkits.mplot3d import Axes3D  # 3D Plotting
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
import seaborn as sns
from sklearn.manifold import TSNE
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable
import statsmodels.api as sm
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.stats.diagnostic import het_breuschpagan
from statsmodels.stats.stattools import durbin_watson
from sklearn.linear_model import Ridge, Lasso
from sklearn.preprocessing import StandardScaler, PowerTransformer
from sklearn.metrics import mean_squared_error, r2_score

from skimage.morphology import dilation, erosion
from skimage import measure
from scipy.ndimage import center_of_mass
from glob import glob
import random
from skimage.measure import regionprops, label
from scipy.spatial import distance

import imageio.v2 as imageio
from tifffile import imread

from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score


from scipy import stats
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import RidgeCV, LassoCV, ElasticNetCV, LinearRegression
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.model_selection import train_test_split

# Final Regression model

In [None]:


# === CONFIGURATION ===
input_path = "/home/MinaHossain/EmbedTrack/Track_EMBD_Result_Shape/Median/Cells_Centroid_Velocity_TrueLabel_MA_Median_5.csv"
output_root = "/home/MinaHossain/EmbedTrack/Track_EMBD_Result_Shape/Median"
plot_dir = f"{output_root}/Ridge_Plots"
os.makedirs(plot_dir, exist_ok=True)

# === LOAD AND CLEAN DATA ===
df = pd.read_csv(input_path)
collinear_features = ['Area_MA', 'Perimeter_MA', 'Solidity_MA', 'Extent_MA',
                      'Circularity_MA', 'Convexity_MA', 'Elongation_MA', 'Compactness_MA']
spatial_features = ['Centroid_X_MA', 'Centroid_Y_MA', 'X_Centroid_Distance_MA', 'Y_Centroid_Distance_MA']
target_col = 'X_Centroid_Velocity_MA'

df_clean = df[collinear_features + spatial_features + [target_col]].dropna()
X_collinear = df_clean[collinear_features].values
X_spatial = df_clean[spatial_features].values
y = df_clean[target_col].values

# === SPLIT DATA ===
X_col_train, X_col_temp, X_spatial_train, X_spatial_temp, y_train, y_temp = train_test_split(
    X_collinear, X_spatial, y, test_size=0.4, random_state=42)
X_col_val, X_col_test, X_spatial_val, X_spatial_test, y_val, y_test = train_test_split(
    X_col_temp, X_spatial_temp, y_temp, test_size=0.5, random_state=42)

# === PCA ON TRAIN ===
scaler = StandardScaler()
X_col_train_scaled = scaler.fit_transform(X_col_train)
X_col_val_scaled = scaler.transform(X_col_val)
X_col_test_scaled = scaler.transform(X_col_test)

pca = PCA(n_components=3)
X_pca_train = pca.fit_transform(X_col_train_scaled)
X_pca_val = pca.transform(X_col_val_scaled)
X_pca_test = pca.transform(X_col_test_scaled)

X_train_final = np.hstack([X_pca_train, X_spatial_train])
X_val_final = np.hstack([X_pca_val, X_spatial_val])
X_test_final = np.hstack([X_pca_test, X_spatial_test])

# === DEFINE MODELS ===
models = {
    "LinearRegression": LinearRegression(),
    "Ridge": RidgeCV(alphas=np.logspace(-3, 3, 50), cv=5),
    "Lasso": LassoCV(cv=5, random_state=42, max_iter=10000),
    "ElasticNet": ElasticNetCV(cv=5, random_state=42, max_iter=10000),
    "RandomForest": RandomForestRegressor(n_estimators=100, max_depth=6, random_state=42, n_jobs=-1),
    "XGBoost": XGBRegressor(n_estimators=90, max_depth=4, learning_rate=0.01, random_state=42, n_jobs=-1, verbosity=0)
}

# === EVALUATE MODELS ON VALIDATION SET ===
results = {}
for name, model in models.items():
    try:
        model.fit(X_train_final, y_train)
        y_val_pred = model.predict(X_val_final)
        r2 = r2_score(y_val, y_val_pred)
        results[name] = r2
    except Exception as e:
        results[name] = f"Failed: {str(e)}"

best_model_name = max(results, key=lambda k: results[k] if isinstance(results[k], float) else -np.inf)
best_model = models[best_model_name]
best_model.fit(X_train_final, y_train)
y_test_pred = best_model.predict(X_test_final)

# === SAVE COEFFICIENTS AND PERFORMANCE ===
coeff_labels = [f'PC{i+1}' for i in range(3)] + spatial_features
coef_df = pd.DataFrame({'Feature': coeff_labels, 'Coefficient': best_model.coef_}) if hasattr(best_model, 'coef_') else pd.DataFrame()
metrics_df = pd.DataFrame({
    'Metric': ['R-squared', 'MSE'],
    'Value': [r2_score(y_test, y_test_pred), mean_squared_error(y_test, y_test_pred)]
})
coef_df.to_csv(f"{plot_dir}/best_model_coefficients.csv", index=False)
metrics_df.to_csv(f"{plot_dir}/best_model_metrics.csv", index=False)

# === PLOTTING ===
def plot_results(y_true, y_pred):
    residuals = y_true - y_pred
    absolute_errors = np.abs(residuals)

    def save_plot(fig, name):
        fig.tight_layout()
        fig.savefig(f"{plot_dir}/{name}.png")
        plt.close(fig)

    fig, ax = plt.subplots(figsize=(8, 5))
    sns.scatterplot(x=y_pred, y=residuals, ax=ax)
    ax.axhline(0, linestyle='--', color='red')
    ax.set_title("Residuals vs Predicted")
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Residuals")
    save_plot(fig, "residuals_vs_predicted")

    fig, ax = plt.subplots(figsize=(8, 5))
    sns.scatterplot(x=y_true, y=y_pred, ax=ax)
    ax.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], linestyle='--', color='red')
    ax.set_title("Predicted vs Actual")
    ax.set_xlabel("Actual")
    ax.set_ylabel("Predicted")
    save_plot(fig, "predicted_vs_actual")

    fig, ax = plt.subplots(figsize=(8, 5))
    sns.histplot(residuals, bins=30, kde=True, ax=ax)
    ax.set_title("Distribution of Residuals")
    ax.set_xlabel("Residuals")
    save_plot(fig, "residuals_distribution")

    fig, ax = plt.subplots(figsize=(8, 5))
    sorted_errors = np.sort(absolute_errors)
    cum_dist = np.arange(1, len(sorted_errors) + 1) / len(sorted_errors)
    ax.plot(sorted_errors, cum_dist)
    ax.set_title("Cumulative Distribution of Absolute Errors")
    ax.set_xlabel("Absolute Error")
    ax.set_ylabel("Cumulative Proportion")
    save_plot(fig, "cumulative_absolute_errors")

    fig = plt.figure(figsize=(6, 6))
    stats.probplot(residuals, dist="norm", plot=plt)
    plt.title("Q-Q Plot of Residuals")
    save_plot(fig, "qq_plot_residuals")

    fig, ax = plt.subplots(figsize=(8, 5))
    sns.scatterplot(x=y_true, y=absolute_errors, ax=ax)
    ax.set_title("Absolute Error vs Actual Value")
    ax.set_xlabel("Actual Values")
    ax.set_ylabel("Absolute Error")
    save_plot(fig, "absolute_error_vs_actual")

plot_results(y_test, y_test_pred)

# === REPORT ON SPLITS ===
y_train_pred = best_model.predict(X_train_final)
y_val_pred = best_model.predict(X_val_final)
y_test_pred = best_model.predict(X_test_final)

performance = pd.DataFrame({
    "Split": ["Train", "Validation", "Test"],
    "R-squared": [
        r2_score(y_train, y_train_pred),
        r2_score(y_val, y_val_pred),
        r2_score(y_test, y_test_pred)
    ],
    "MSE": [
        mean_squared_error(y_train, y_train_pred),
        mean_squared_error(y_val, y_val_pred),
        mean_squared_error(y_test, y_test_pred)
    ]
})

performance.to_csv(f"{plot_dir}/model_performance_by_split.csv", index=False)
print(f"Best model: {best_model_name}")
print(performance)


# Generate the code block for PCA + Clustering using validation data and applying to test data


# === PCA + KMeans CLUSTERING (Fit on Validation, Apply to Test) ===
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

output_directory = plot_dir
window_size = 5

# --- Fit PCA on Validation Set ---
X_scaled_val = X_col_val_scaled
pca_full = PCA()
pca_full.fit(X_scaled_val)
explained_variance = pca_full.explained_variance_ratio_
cumulative_variance = np.cumsum(explained_variance)

optimal_components = np.argmax(cumulative_variance >= 0.95) + 1

# --- Plot PCA Cumulative Variance (Elbow Plot) ---
plt.figure(figsize=(8, 6))
plt.plot(range(1, len(cumulative_variance) + 1), cumulative_variance, marker='s', linestyle='-', color='red', label="Cumulative Variance")
plt.axvline(x=optimal_components, color='green', linestyle='--', label=f"Optimal Components ({optimal_components})")
plt.scatter(optimal_components, cumulative_variance[optimal_components-1], color='black', s=50, label="Chosen Point")
plt.xlabel("Number of Principal Components")
plt.ylabel("Cumulative Explained Variance")
plt.title("PCA Elbow Plot with Cumulative Variance")
plt.legend()
plt.grid()
plt.tight_layout()
plt.savefig(os.path.join(output_directory, f"PCA_Elbow_Plot_{window_size}.png"))
plt.close()

# --- Fit 2D PCA on Validation ---
pca_2d = PCA(n_components=2)
pca_val_2d = pca_2d.fit_transform(X_scaled_val)

# --- KMeans on 2D PCA space (Validation) ---
silhouette_scores = []
k_range = range(2, 15)
for k in k_range:
    kmeans = KMeans(n_clusters=k, random_state=0, n_init='auto').fit(pca_val_2d)
    score = silhouette_score(pca_val_2d, kmeans.labels_)
    silhouette_scores.append(score)

optimal_k = k_range[np.argmax(silhouette_scores)]
optimal_score = max(silhouette_scores)

# --- Plot Silhouette Scores ---
plt.figure(figsize=(8, 6))
plt.plot(k_range, silhouette_scores, marker='o', linestyle='-', color='purple', label="Silhouette Score")
plt.axvline(x=optimal_k, color='green', linestyle='--', label=f"Optimal k = {optimal_k}")
plt.scatter(optimal_k, optimal_score, color='red', s=100, zorder=5)
plt.xlabel("Number of Clusters (k)")
plt.ylabel("Silhouette Score")
plt.title("Silhouette Score vs Number of Clusters (Validation)")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(output_directory, f"Silhouette_Score_vs_K_{window_size}.png"))
plt.close()

# --- Apply to Test Data ---
X_scaled_test = X_col_test_scaled
pca_test_2d = pca_2d.transform(X_scaled_test)
kmeans_final = KMeans(n_clusters=optimal_k, random_state=0, n_init='auto').fit(pca_val_2d)
test_clusters = kmeans_final.predict(pca_test_2d)

# --- Create Test DataFrame with PCA + Clusters ---
pca_test_df = pd.DataFrame(pca_test_2d, columns=["PCA1", "PCA2"])
pca_test_df["X_Centroid_Velocity_MA"] = X_col_test[:, 0]  # Area_MA is first collinear feature
pca_test_df["Cluster"] = test_clusters

# --- Clustered 2D PCA (Colored by Area) ---
vmin = np.percentile(pca_test_df["X_Centroid_Velocity_MA"], 5)
vmax = np.percentile(pca_test_df["X_Centroid_Velocity_MA"], 95)
plt.figure(figsize=(10, 7))
sc = plt.scatter(pca_test_df["PCA1"], pca_test_df["PCA2"], c=pca_test_df["X_Centroid_Velocity_MA"], cmap="plasma", alpha=0.8, vmin=vmin, vmax=vmax)
plt.colorbar(sc, label="X_Centroid Velocity (MA)")
plt.title("2D PCA Test Data Colored by X_Centroid Velocity(MA)")
plt.xlabel("PCA1")
plt.ylabel("PCA2")
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(output_directory, f"PCA_2D_X_Centroid_Velocity_MA_{window_size}.png"))
plt.close()

# --- Cluster Plot with Centers ---
plt.figure(figsize=(10, 7))
plt.scatter(pca_test_df["PCA1"], pca_test_df["PCA2"], c=pca_test_df["Cluster"], cmap="tab10", alpha=0.8)
centers = kmeans_final.cluster_centers_
plt.scatter(centers[:, 0], centers[:, 1], c='black', s=200, marker='o', label='Centers')
for idx, (x, y) in enumerate(centers):
    plt.text(x, y, str(idx), fontsize=12, fontweight='bold', ha='center', va='center', color='white',
             bbox=dict(facecolor='black', boxstyle='circle,pad=0.2'))
plt.title(f"Test Data PCA with KMeans Clusters (k={optimal_k})")
plt.xlabel("PCA1")
plt.ylabel("PCA2")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(output_directory, f"PCA_2D_Clusters_X_Centroid_Velocity_MA_{window_size}.png"))
plt.close()

# --- Boxplot: Area by Cluster (Test Data) ---
plt.figure(figsize=(10, 6))
sns.boxplot(data=pca_test_df, x="Cluster", y="X_Centroid_Velocity_MA", palette="Set2")
sns.stripplot(data=pca_test_df, x="Cluster", y="X_Centroid_Velocity_MA", color='gray', alpha=0.5, jitter=0.2, size=3)
plt.title("X_Centroid Velocity (MA) by Cluster (Test Data)", fontsize=14)
plt.xlabel("Cluster")
plt.ylabel("X_Centroid Velocity (MA)")
plt.tight_layout()
plt.savefig(os.path.join(output_directory, f"Boxplot_X_Centroid_Velocity_MA_By_Cluster_{window_size}.png"))
plt.close()

# --- Save clustered PCA test data and summary ---
pca_test_df.to_csv(os.path.join(output_directory, f"PCA_Cluster_Results_Test_{window_size}.csv"), index=False)

summary = pca_test_df.groupby("Cluster").agg({
    "PCA1": ["mean", "std"],
    "PCA2": ["mean", "std"],
    "X_Centroid_Velocity_MA": ["mean", "std"]
}).reset_index()
summary.columns = ["_".join(col).strip("_") for col in summary.columns.values]
summary.to_csv(os.path.join(output_directory, f"PCA_Cluster_Summary_Test_{window_size}.csv"), index=False)


Best model: XGBoost
        Split  R-squared        MSE
0       Train   0.695967  10.091581
1  Validation   0.810398   2.196952
2        Test   0.843223   1.817665



Passing `palette` without assigning `hue` is deprecated and will be removed in v0.14.0. Assign the `x` variable to `hue` and set `legend=False` for the same effect.

  sns.boxplot(data=pca_test_df, x="Cluster", y="X_Centroid_Velocity_MA", palette="Set2")


# Create the updated FNN pipeline script content with PCA + spatial features + grid search

In [4]:


import os
import argparse
import datetime
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.stats as stats
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
from itertools import product

# --- CONFIG ---
output_root = "/home/MinaHossain/EmbedTrack/Track_EMBD_Result_Shape/Median/Reg_Analysis_FNN_PCA"
os.makedirs(f"{output_root}/models", exist_ok=True)
os.makedirs(f"{output_root}/results", exist_ok=True)
os.makedirs(f"{output_root}/plots", exist_ok=True)

# --- FNN Model ---
class FNN(nn.Module):
    def __init__(self, input_size, hidden_sizes):
        super(FNN, self).__init__()
        layers = [nn.Linear(input_size, hidden_sizes[0]), nn.ReLU()]
        for i in range(1, len(hidden_sizes)):
            layers.append(nn.Linear(hidden_sizes[i-1], hidden_sizes[i]))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_sizes[-1], 1))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

# --- Evaluation ---
def evaluate_model(model, X_test_tensor, y_test_tensor):
    model.eval()
    with torch.no_grad():
        y_pred_tensor = model(X_test_tensor)
    y_pred = y_pred_tensor.cpu().numpy().flatten()
    y_true = y_test_tensor.cpu().numpy().flatten()
    mse = mean_squared_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    print(f"✅ MSE: {mse:.4f} | R²: {r2:.4f}")
    return mse, r2, y_true, y_pred

# --- Plotting ---
def plot_results(y_true, y_pred, train_losses=None):
    residuals = y_true - y_pred
    if train_losses:
        plt.figure(); plt.plot(train_losses); plt.title("Training Loss"); plt.tight_layout()
        plt.savefig(f"{output_root}/plots/loss_curve.png"); plt.close()
    plt.figure(); sns.scatterplot(x=y_pred, y=residuals); plt.axhline(0, linestyle='--', color='red')
    plt.title("Residuals vs Predicted"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/residuals_vs_predicted.png"); plt.close()
    plt.figure(); sns.scatterplot(x=y_true, y=y_pred); plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--')
    plt.title("Predicted vs Actual"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/predicted_vs_actual.png"); plt.close()
    plt.figure(); sns.histplot(residuals, bins=30, kde=True)
    plt.title("Distribution of Residuals"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/residuals_distribution.png"); plt.close()
    plt.figure(); plt.plot(np.sort(np.abs(residuals)), np.linspace(0, 1, len(residuals)))
    plt.title("Cumulative Absolute Errors"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/cumulative_absolute_errors.png"); plt.close()
    plt.figure(); stats.probplot(residuals, dist="norm", plot=plt)
    plt.title("Q-Q Plot"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/qq_plot_residuals.png"); plt.close()
    plt.figure(); sns.scatterplot(x=y_true, y=np.abs(residuals))
    plt.title("Absolute Error vs Actual"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/absolute_error_vs_actual.png"); plt.close()

# --- Run FNN ---
def run_fnn(X_train, X_val, X_test, y_train, y_val, y_test, input_size, hidden_sizes, lr, batch_size, epochs):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = FNN(input_size=input_size, hidden_sizes=hidden_sizes).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    X_train_tensor = torch.tensor(X_train).float().to(device)
    y_train_tensor = torch.tensor(y_train).float().to(device)
    X_val_tensor = torch.tensor(X_val).float().to(device)
    y_val_tensor = torch.tensor(y_val).float().to(device)
    X_test_tensor = torch.tensor(X_test).float().to(device)
    y_test_tensor = torch.tensor(y_test).float().to(device)

    train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=batch_size, shuffle=True)

    train_losses = []
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        train_losses.append(total_loss / len(train_loader))
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}: Loss={train_losses[-1]:.4f}")

    return model, train_losses, X_test_tensor, y_test_tensor

# --- Grid Search ---
def grid_search_fnn(csv_path, param_grid):
    df = pd.read_csv(csv_path)
    shape_features = ['Area_MA', 'Perimeter_MA', 'Extent_MA', 'Solidity_MA', 'Compactness_MA', 'Elongation_MA', 'Circularity_MA', 'Convexity_MA']
    spatial_features = ['Centroid_X_MA', 'Centroid_Y_MA', 'X_Centroid_Distance_MA', 'Y_Centroid_Distance_MA']
    target = 'X_Centroid_Velocity_MA'
    df = df.dropna(subset=shape_features + spatial_features + [target])

    X_shape = df[shape_features].values
    X_spatial = df[spatial_features].values
    y = df[target].values.reshape(-1, 1)

    X_shape_scaled = StandardScaler().fit_transform(X_shape)
    X_train_s, X_temp_s, X_train_spatial, X_temp_spatial, y_train, y_temp = train_test_split(
        X_shape_scaled, X_spatial, y, test_size=0.4, random_state=42)
    X_val_s, X_test_s, X_val_spatial, X_test_spatial, y_val, y_test = train_test_split(
        X_temp_s, X_temp_spatial, y_temp, test_size=0.5, random_state=42)

    # PCA
    pca = PCA(n_components=0.95)
    X_train_pca = pca.fit_transform(X_train_s)
    X_val_pca = pca.transform(X_val_s)
    X_test_pca = pca.transform(X_test_s)

    # Combine PCA + spatial
    X_train_final = np.hstack([X_train_pca, X_train_spatial])
    X_val_final = np.hstack([X_val_pca, X_val_spatial])
    X_test_final = np.hstack([X_test_pca, X_test_spatial])

    results = []
    best_r2 = -np.inf
    best_model = None
    best_config = None
    best_losses = None
    best_test_tensors = None

    for lr, batch_size, hidden_sizes, epochs in product(param_grid["lr"], param_grid["batch_size"],
                                                        param_grid["hidden_sizes"], param_grid["epochs"]):
        print(f"▶ Training: lr={lr}, batch={batch_size}, hidden={hidden_sizes}, epochs={epochs}")
        model, train_losses, X_test_tensor, y_test_tensor = run_fnn(
            X_train_final, X_val_final, X_test_final,
            y_train, y_val, y_test,
            input_size=X_train_final.shape[1],
            hidden_sizes=hidden_sizes,
            lr=lr,
            batch_size=batch_size,
            epochs=epochs
        )
        mse, r2, y_true, y_pred = evaluate_model(model, X_test_tensor, y_test_tensor)

        results.append({
            "lr": lr, "batch_size": batch_size, "hidden_sizes": str(hidden_sizes),
            "epochs": epochs, "mse": mse, "r2": r2
        })

        if r2 > best_r2:
            best_r2 = r2
            best_model = model
            best_config = {
                "lr": lr, "batch_size": batch_size,
                "hidden_sizes": hidden_sizes, "epochs": epochs
            }
            best_losses = train_losses
            best_test_tensors = (X_test_tensor, y_test_tensor, y_true, y_pred)

    # Save best model
    model_path = f"{output_root}/models/fnn_best_model.pt"
    torch.save(best_model.state_dict(), model_path)

    pd.DataFrame(results).to_csv(f"{output_root}/results/grid_search_results.csv", index=False)
    print(f"🏆 Best config: {best_config}, R²={best_r2:.4f}")
    return best_model, best_config, best_losses, best_test_tensors

# --- Entry Point ---
if __name__ == "__main__":
    param_grid = {
        "lr": [0.001, 0.0005,0.00001],
        "batch_size": [16, 32],
        "hidden_sizes": [(128, 64, 32), (64, 32, 16),(64,32,16,8),(8,16,32,64,32,16,8)],
        "epochs": [100, 200,300]
    }
    csv_path = "/home/MinaHossain/EmbedTrack/Track_New_Result_Shape/Median/Cells_Centroid_Velocity_TrueLabel_MA_Median_5.csv"
    best_model, best_config, best_losses, (X_test_tensor, y_test_tensor, y_true, y_pred) = grid_search_fnn(csv_path, param_grid)
    plot_results(y_true, y_pred, train_losses=best_losses)



▶ Training: lr=0.001, batch=16, hidden=(128, 64, 32), epochs=100
Epoch 10: Loss=36.3944
Epoch 20: Loss=31.2367
Epoch 30: Loss=11.3344
Epoch 40: Loss=9.9017
Epoch 50: Loss=9.4405
Epoch 60: Loss=4.4220
Epoch 70: Loss=5.4241
Epoch 80: Loss=3.9423
Epoch 90: Loss=5.2508
Epoch 100: Loss=5.2816
✅ MSE: 13.0347 | R²: 0.6386
▶ Training: lr=0.001, batch=16, hidden=(128, 64, 32), epochs=200
Epoch 10: Loss=37.7422
Epoch 20: Loss=29.1949
Epoch 30: Loss=15.7549
Epoch 40: Loss=6.3029
Epoch 50: Loss=4.5938
Epoch 60: Loss=6.4549
Epoch 70: Loss=5.0735
Epoch 80: Loss=6.2158
Epoch 90: Loss=5.0749
Epoch 100: Loss=6.1349
Epoch 110: Loss=4.2883
Epoch 120: Loss=4.1000
Epoch 130: Loss=9.2812
Epoch 140: Loss=3.1869
Epoch 150: Loss=2.7398
Epoch 160: Loss=2.6845
Epoch 170: Loss=15.1445
Epoch 180: Loss=1.8563
Epoch 190: Loss=19.8593
Epoch 200: Loss=2.0967
✅ MSE: 17.2162 | R²: 0.5227
▶ Training: lr=0.001, batch=16, hidden=(128, 64, 32), epochs=300
Epoch 10: Loss=41.2870
Epoch 20: Loss=35.8674
Epoch 30: Loss=30.3807


# Update the previous script by removing spatial features from the model input pipeline
# Keep PCA on shape features, 60/20/20 split, grid search, plots

In [2]:


import os
import argparse
import datetime
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import scipy.stats as stats
from sklearn.decomposition import PCA
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
from itertools import product

# --- CONFIG ---
output_root = "/home/MinaHossain/EmbedTrack/Track_New_Result_Shape/Median/Reg_Analysis_FNN_PCA"
os.makedirs(f"{output_root}/models", exist_ok=True)
os.makedirs(f"{output_root}/results", exist_ok=True)
os.makedirs(f"{output_root}/plots", exist_ok=True)

# --- FNN Model ---
class FNN(nn.Module):
    def __init__(self, input_size, hidden_sizes):
        super(FNN, self).__init__()
        layers = [nn.Linear(input_size, hidden_sizes[0]), nn.ReLU()]
        for i in range(1, len(hidden_sizes)):
            layers.append(nn.Linear(hidden_sizes[i-1], hidden_sizes[i]))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_sizes[-1], 1))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        return self.model(x)

# --- Evaluation ---
def evaluate_model(model, X_test_tensor, y_test_tensor):
    model.eval()
    with torch.no_grad():
        y_pred_tensor = model(X_test_tensor)
    y_pred = y_pred_tensor.cpu().numpy().flatten()
    y_true = y_test_tensor.cpu().numpy().flatten()
    mse = mean_squared_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    print(f"✅ MSE: {mse:.4f} | R²: {r2:.4f}")
    return mse, r2, y_true, y_pred

# --- Plotting ---
def plot_results(y_true, y_pred, train_losses=None):
    residuals = y_true - y_pred
    if train_losses:
        plt.figure(); plt.plot(train_losses); plt.title("Training Loss"); plt.tight_layout()
        plt.savefig(f"{output_root}/plots/loss_curve.png"); plt.close()
    plt.figure(); sns.scatterplot(x=y_pred, y=residuals); plt.axhline(0, linestyle='--', color='red')
    plt.title("Residuals vs Predicted"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/residuals_vs_predicted.png"); plt.close()
    plt.figure(); sns.scatterplot(x=y_true, y=y_pred); plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--')
    plt.title("Predicted vs Actual"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/predicted_vs_actual.png"); plt.close()
    plt.figure(); sns.histplot(residuals, bins=30, kde=True)
    plt.title("Distribution of Residuals"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/residuals_distribution.png"); plt.close()
    plt.figure(); plt.plot(np.sort(np.abs(residuals)), np.linspace(0, 1, len(residuals)))
    plt.title("Cumulative Absolute Errors"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/cumulative_absolute_errors.png"); plt.close()
    plt.figure(); stats.probplot(residuals, dist="norm", plot=plt)
    plt.title("Q-Q Plot"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/qq_plot_residuals.png"); plt.close()
    plt.figure(); sns.scatterplot(x=y_true, y=np.abs(residuals))
    plt.title("Absolute Error vs Actual"); plt.tight_layout()
    plt.savefig(f"{output_root}/plots/absolute_error_vs_actual.png"); plt.close()

# --- Run FNN ---
def run_fnn(X_train, X_val, X_test, y_train, y_val, y_test, input_size, hidden_sizes, lr, batch_size, epochs):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = FNN(input_size=input_size, hidden_sizes=hidden_sizes).to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    X_train_tensor = torch.tensor(X_train).float().to(device)
    y_train_tensor = torch.tensor(y_train).float().to(device)
    X_val_tensor = torch.tensor(X_val).float().to(device)
    y_val_tensor = torch.tensor(y_val).float().to(device)
    X_test_tensor = torch.tensor(X_test).float().to(device)
    y_test_tensor = torch.tensor(y_test).float().to(device)

    train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=batch_size, shuffle=True)

    train_losses = []
    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for xb, yb in train_loader:
            optimizer.zero_grad()
            pred = model(xb)
            loss = criterion(pred, yb)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        train_losses.append(total_loss / len(train_loader))
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}: Loss={train_losses[-1]:.4f}")

    return model, train_losses, X_test_tensor, y_test_tensor

# --- Grid Search ---
def grid_search_fnn(csv_path, param_grid):
    df = pd.read_csv(csv_path)
    shape_features = ['Area_MA', 'Perimeter_MA', 'Extent_MA', 'Solidity_MA', 'Compactness_MA', 'Elongation_MA', 'Circularity_MA', 'Convexity_MA']
    target = 'X_Centroid_Velocity_MA'
    df = df.dropna(subset=shape_features + [target])

    X_shape = df[shape_features].values
    y = df[target].values.reshape(-1, 1)

    X_shape_scaled = StandardScaler().fit_transform(X_shape)
    X_train_s, X_temp_s, y_train, y_temp = train_test_split(X_shape_scaled, y, test_size=0.4, random_state=42)
    X_val_s, X_test_s, y_val, y_test = train_test_split(X_temp_s, y_temp, test_size=0.5, random_state=42)

    # PCA
    pca = PCA(n_components=0.95)
    X_train_pca = pca.fit_transform(X_train_s)
    X_val_pca = pca.transform(X_val_s)
    X_test_pca = pca.transform(X_test_s)

    results = []
    best_r2 = -np.inf
    best_model = None
    best_config = None
    best_losses = None
    best_test_tensors = None

    for lr, batch_size, hidden_sizes, epochs in product(param_grid["lr"], param_grid["batch_size"],
                                                        param_grid["hidden_sizes"], param_grid["epochs"]):
        print(f"▶ Training: lr={lr}, batch={batch_size}, hidden={hidden_sizes}, epochs={epochs}")
        model, train_losses, X_test_tensor, y_test_tensor = run_fnn(
            X_train_pca, X_val_pca, X_test_pca,
            y_train, y_val, y_test,
            input_size=X_train_pca.shape[1],
            hidden_sizes=hidden_sizes,
            lr=lr,
            batch_size=batch_size,
            epochs=epochs
        )
        mse, r2, y_true, y_pred = evaluate_model(model, X_test_tensor, y_test_tensor)

        results.append({
            "lr": lr, "batch_size": batch_size, "hidden_sizes": str(hidden_sizes),
            "epochs": epochs, "mse": mse, "r2": r2
        })

        if r2 > best_r2:
            best_r2 = r2
            best_model = model
            best_config = {
                "lr": lr, "batch_size": batch_size,
                "hidden_sizes": hidden_sizes, "epochs": epochs
            }
            best_losses = train_losses
            best_test_tensors = (X_test_tensor, y_test_tensor, y_true, y_pred)

    # Save best model
    model_path = f"{output_root}/models/fnn_best_model.pt"
    torch.save(best_model.state_dict(), model_path)

    pd.DataFrame(results).to_csv(f"{output_root}/results/grid_search_results.csv", index=False)
    print(f"🏆 Best config: {best_config}, R²={best_r2:.4f}")
    return best_model, best_config, best_losses, best_test_tensors

# # --- Entry Point ---
# if __name__ == "__main__":
#     param_grid = {
#         "lr": [0.001, 0.0005],
#         "batch_size": [16, 32],
#         "hidden_sizes": [(128, 64, 32), (64, 32, 16)],
#         "epochs": [100, 200]
#     }
#     csv_path = "/home/MinaHossain/EmbedTrack/Track_New_Result_Shape/Mean/Cells_Centroid_Velocity_TrueLabel_MA_Mean_5.csv"
#     best_model, best_config, best_losses, (X_test_tensor, y_test_tensor, y_true, y_pred) = grid_search_fnn(csv_path, param_grid)
#     plot_results(y_true, y_pred, train_losses=best_losses)



# --- Entry Point ---
if __name__ == "__main__":
    param_grid = {
        "lr": [0.001, 0.0005,0.00001],
        "batch_size": [16, 32],
        "hidden_sizes": [(128, 64, 32), (64, 32, 16),(64,32,16,8),(8,16,32,64,32,16,8)],
        "epochs": [100, 200,300]
    }
    csv_path = "/home/MinaHossain/EmbedTrack/Track_New_Result_Shape/Median/Cells_Centroid_Velocity_TrueLabel_MA_Median_5.csv"
    best_model, best_config, best_losses, (X_test_tensor, y_test_tensor, y_true, y_pred) = grid_search_fnn(csv_path, param_grid)
    plot_results(y_true, y_pred, train_losses=best_losses)



▶ Training: lr=0.001, batch=16, hidden=(128, 64, 32), epochs=100
Epoch 10: Loss=42.2834
Epoch 20: Loss=36.8070
Epoch 30: Loss=36.0448
Epoch 40: Loss=35.2224
Epoch 50: Loss=34.9857
Epoch 60: Loss=34.2992
Epoch 70: Loss=32.9667
Epoch 80: Loss=33.4002
Epoch 90: Loss=31.5127
Epoch 100: Loss=30.9567
✅ MSE: 39.9161 | R²: -0.1067
▶ Training: lr=0.001, batch=16, hidden=(128, 64, 32), epochs=200
Epoch 10: Loss=37.2072
Epoch 20: Loss=36.7678
Epoch 30: Loss=35.8292
Epoch 40: Loss=34.7127
Epoch 50: Loss=34.3937
Epoch 60: Loss=32.9545
Epoch 70: Loss=32.0619
Epoch 80: Loss=31.3390
Epoch 90: Loss=36.3797
Epoch 100: Loss=30.2648
Epoch 110: Loss=31.0115
Epoch 120: Loss=39.5490
Epoch 130: Loss=30.2589
Epoch 140: Loss=28.7733
Epoch 150: Loss=28.9666
Epoch 160: Loss=28.1586
Epoch 170: Loss=26.1867
Epoch 180: Loss=27.0547
Epoch 190: Loss=25.1450
Epoch 200: Loss=25.1422
✅ MSE: 46.4937 | R²: -0.2891
▶ Training: lr=0.001, batch=16, hidden=(128, 64, 32), epochs=300
Epoch 10: Loss=37.6454
Epoch 20: Loss=36.4032