In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# Optuna is an optional dependency: record whether it imported so that later
# cells can skip hyperparameter tuning instead of failing.
try:
    import optuna
except ImportError:
    OPTUNA_AVAILABLE = False
    print("Optuna not installed. Install with `pip install optuna` to use tuning.")
else:
    OPTUNA_AVAILABLE = True
    print("Optuna imported successfully.")

# Use the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# LSTM Regression for Y_rice

This notebook trains an LSTM model (PyTorch) on tabular data with target **Y_rice**, 
adds prediction vs. true plots, and (optionally) performs Optuna hyperparameter tuning.

> 🔧 **Note:** Replace the data-loading part below with your actual dataset if needed, but keep the variables `X_train_scaled`, `X_val_scaled`, `X_test_scaled`, `y_train`, `y_val`, `y_test` in the same format (2-D feature arrays and 1-D target arrays).


In [None]:
# === CONFIG ===
RANDOM_STATE = 42    # seed for the data split and for the RNGs seeded below
VAL_SIZE     = 0.15  # fraction of ALL rows held out for validation
TEST_SIZE    = 0.15  # fraction of ALL rows held out for testing
BATCH_SIZE   = 64    # mini-batch size used by every DataLoader

# Seed NumPy and PyTorch as well as the split. Without this, weight
# initialisation, dropout masks and DataLoader shuffling differ on every run,
# so a "Restart & Run All" would not reproduce the reported metrics.
# (Some GPU kernels are still non-deterministic; this removes the RNG-driven part.)
np.random.seed(RANDOM_STATE)
torch.manual_seed(RANDOM_STATE)

# === DATA LOADING (EXAMPLE) ===
# TODO: Replace this section with your own data-loading code if you already
# prepared X_train_scaled, X_val_scaled, X_test_scaled, y_train, y_val, y_test
# elsewhere (or comment the block out when those splits are already in memory).
#
# The example expects a CSV file whose 'Y_rice' column is the target and whose
# remaining columns are the features.

csv_path = "your_data.csv"  # <- change this
try:
    df = pd.read_csv(csv_path)
except FileNotFoundError:
    # Deliberately non-fatal: the splits may have been defined by other means.
    print("\n[!] CSV file not found at", csv_path)
    print("If you already have X_train_scaled, X_val_scaled, X_test_scaled, y_train, y_val, y_test")
    print("defined in your environment, you can ignore this block.")
else:
    assert "Y_rice" in df.columns, "Target column 'Y_rice' not found. Adjust the name."
    y = df["Y_rice"].values.astype(np.float32)
    X = df.drop(columns=["Y_rice"]).values.astype(np.float32)

    # Hold out the test rows first ...
    X_temp, X_test, y_temp, y_test = train_test_split(
        X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE
    )
    # ... then carve the validation rows out of what is left. VAL_SIZE is a
    # fraction of the full dataset, so it is rescaled to the remainder.
    val_ratio = VAL_SIZE / (1.0 - TEST_SIZE)
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=val_ratio, random_state=RANDOM_STATE
    )

    # The scaler statistics come from the training rows only; validation and
    # test rows are transformed with those same statistics.
    scaler = StandardScaler().fit(X_train)
    X_train_scaled = scaler.transform(X_train)
    X_val_scaled   = scaler.transform(X_val)
    X_test_scaled  = scaler.transform(X_test)

    print("Data loaded from:", csv_path)
    print("Shapes:")
    for _label, _split in (
        ("X_train_scaled:", X_train_scaled),
        ("X_val_scaled:  ", X_val_scaled),
        ("X_test_scaled: ", X_test_scaled),
    ):
        print(_label, _split.shape)
    print("y_train:", y_train.shape, "y_val:", y_val.shape, "y_test:", y_test.shape)

In [None]:
# === SANITY CHECKS ===
def check_array(name, arr):
    """Print a short health report for an array-like.

    The first line reports whether any NaN or infinite value is present and
    the array's shape. A second line with the NaN-ignoring minimum and maximum
    is printed only when the array is non-empty and holds at least one finite
    value.

    Args:
        name: Label printed in front of the report.
        arr: Array-like of numbers; it is converted with ``np.array``.
    """
    values = np.array(arr)
    has_nan = np.isnan(values).any()
    has_inf = np.isinf(values).any()
    print(f"{name}: NaN={has_nan}, inf={has_inf}, shape={values.shape}")

    # Nothing sensible to summarise for empty or entirely non-finite input.
    if values.size == 0 or not np.isfinite(values).any():
        return

    lowest = np.nanmin(values)
    highest = np.nanmax(values)
    print(f"    min={lowest:.4f}, max={highest:.4f}")

# Report on every split (features and targets) before building tensors.
for _label, _values in (
    ("X_train_scaled", X_train_scaled),
    ("X_val_scaled",   X_val_scaled),
    ("X_test_scaled",  X_test_scaled),
    ("y_train",        y_train),
    ("y_val",          y_val),
    ("y_test",         y_test),
):
    check_array(_label, _values)

In [None]:
# === TORCH DATASETS & DATALOADERS ===
# Feature splits become float32 tensors as they are.
X_train_t, X_val_t, X_test_t = (
    torch.tensor(split, dtype=torch.float32)
    for split in (X_train_scaled, X_val_scaled, X_test_scaled)
)

# Target splits become float32 column vectors, i.e. they are viewed as (N, 1).
y_train_t, y_val_t, y_test_t = (
    torch.tensor(split, dtype=torch.float32).view(-1, 1)
    for split in (y_train, y_val, y_test)
)

# The LSTM consumes (batch, seq_len, input_size); each tabular row is fed as a
# sequence of length 1.
def make_seq(x):
    """Return ``x`` with a singleton dimension inserted at position 1.

    A ``(batch, features)`` tensor becomes ``(batch, 1, features)``.
    """
    return torch.unsqueeze(x, dim=1)

# Pair every feature split (reshaped to length-1 sequences) with its targets.
train_ds, val_ds, test_ds = (
    TensorDataset(make_seq(features), targets)
    for features, targets in (
        (X_train_t, y_train_t),
        (X_val_t,   y_val_t),
        (X_test_t,  y_test_t),
    )
)

# Only the training loader shuffles; validation and test keep row order.
train_loader = DataLoader(train_ds, shuffle=True, batch_size=BATCH_SIZE)
val_loader, test_loader = (
    DataLoader(dataset, shuffle=False, batch_size=BATCH_SIZE)
    for dataset in (val_ds, test_ds)
)

# Number of feature columns; read by create_model when building the network.
input_size = X_train_t.size(1)
print("Input feature size:", input_size)

In [None]:
# === MODEL DEFINITION ===
class LSTMRegressor(nn.Module):
    """Stacked LSTM followed by a linear head that emits one value per sample.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout between LSTM layers; it is forced to 0.0 when there
            is only a single layer.
    """

    def __init__(self, input_size, hidden_size=64, num_layers=2, dropout=0.1):
        super().__init__()
        # With a single layer there is no "between layers" to apply dropout to.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=inter_layer_dropout,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Map ``x`` of shape (batch, seq_len, input_size) to (batch, 1)."""
        sequence_out, _state = self.lstm(x)
        # Regress from the top layer's output at the final time step.
        final_step = sequence_out[:, -1]
        return self.fc(final_step)

def create_model(hidden_size=64, num_layers=2, dropout=0.1, lr=1e-3):
    """Build an ``LSTMRegressor`` with its Adam optimizer and MSE loss.

    The network's input width comes from the notebook-level ``input_size`` and
    it is moved to the notebook-level ``device``.

    Returns:
        Tuple ``(model, optimizer, criterion)``.
    """
    network = LSTMRegressor(
        input_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
        dropout=dropout,
    )
    network = network.to(device)
    adam = torch.optim.Adam(network.parameters(), lr=lr, betas=(0.9, 0.999))
    return network, adam, nn.MSELoss()

In [None]:
# === TRAINING & EVALUATION UTILITIES ===
def rmse(y_true, y_pred):
    """Root mean squared error between ``y_true`` and ``y_pred``."""
    mse = mean_squared_error(y_true, y_pred)
    return np.sqrt(mse)

def evaluate_model(model, data_loader):
    """Run ``model`` over ``data_loader`` without tracking gradients.

    The model is switched to eval mode and left in that mode.

    Returns:
        Tuple ``(preds, trues)`` of flattened NumPy arrays, in the order the
        loader yielded its batches.
    """
    model.eval()
    pred_chunks, true_chunks = [], []
    with torch.no_grad():
        for features, targets in data_loader:
            outputs = model(features.to(device))
            pred_chunks.append(outputs.cpu().numpy())
            true_chunks.append(targets.cpu().numpy())

    # Stack the per-batch blocks row-wise, then flatten to 1-D.
    flat_preds = np.vstack(pred_chunks).ravel()
    flat_trues = np.vstack(true_chunks).ravel()
    return flat_preds, flat_trues

def train_model(
    model,
    optimizer,
    criterion,
    train_loader,
    val_loader,
    n_epochs=100,
    clip_norm=1.0,
    verbose=True,
):
    """Train ``model`` and record RMSE / R2 on both loaders after every epoch.

    Args:
        model: Network to optimise; expected to live on the notebook ``device``.
        optimizer: Optimizer bound to ``model``'s parameters.
        criterion: Loss called as ``criterion(predictions, targets)``.
        train_loader: Loader used for the gradient steps and the train metrics.
        val_loader: Loader used for the validation metrics.
        n_epochs: Number of passes over ``train_loader``.
        clip_norm: Maximum gradient norm; ``None`` disables clipping.
        verbose: When true (the default), print one metrics line per epoch.
            Pass ``False`` to keep long tuning runs from flooding the output.

    Returns:
        Dict with keys ``"train_rmse"``, ``"val_rmse"``, ``"train_r2"`` and
        ``"val_r2"``, each a list with one entry per *completed* epoch. If the
        loss becomes NaN or infinite, training stops and the history gathered
        so far is returned, so the lists may be shorter than ``n_epochs``.
    """
    history = {
        "train_rmse": [],
        "val_rmse":   [],
        "train_r2":   [],
        "val_r2":     [],
    }

    for epoch in range(1, n_epochs + 1):
        model.train()  # evaluate_model() below leaves the model in eval mode
        for xb, yb in train_loader:
            xb = xb.to(device)
            yb = yb.to(device)

            optimizer.zero_grad()
            preds = model(xb)
            loss = criterion(preds, yb)
            # isfinite rejects +/-inf as well as NaN: back-propagating an
            # infinite loss would corrupt the weights just like a NaN one.
            if not torch.isfinite(loss):
                print(
                    f"[Epoch {epoch}] Non-finite loss detected "
                    f"({loss.item()}), stopping training."
                )
                return history
            loss.backward()
            if clip_norm is not None:
                torch.nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
            optimizer.step()

        # Metrics are computed in eval mode over the full loaders.
        train_pred, train_true = evaluate_model(model, train_loader)
        val_pred,   val_true   = evaluate_model(model, val_loader)

        train_rmse_val = rmse(train_true, train_pred)
        val_rmse_val   = rmse(val_true, val_pred)
        train_r2_val   = r2_score(train_true, train_pred)
        val_r2_val     = r2_score(val_true, val_pred)

        history["train_rmse"].append(train_rmse_val)
        history["val_rmse"].append(val_rmse_val)
        history["train_r2"].append(train_r2_val)
        history["val_r2"].append(val_r2_val)

        if verbose:
            print(
                f"Epoch {epoch:3d}/{n_epochs} - "
                f"Train RMSE: {train_rmse_val:.4f}, Val RMSE: {val_rmse_val:.4f}, "
                f"Train R2: {train_r2_val:.4f}, Val R2: {val_r2_val:.4f}"
            )

    return history

In [None]:
# === BASELINE LSTM TRAINING ===
# Hyperparameters of the untuned reference model.
BASE_HIDDEN_SIZE, BASE_NUM_LAYERS = 64, 2
BASE_DROPOUT, BASE_LR = 0.1, 1e-3
BASE_EPOCHS = 100

baseline_model, baseline_opt, baseline_crit = create_model(
    BASE_HIDDEN_SIZE, BASE_NUM_LAYERS, BASE_DROPOUT, BASE_LR
)

baseline_history = train_model(
    baseline_model, baseline_opt, baseline_crit,
    train_loader, val_loader,
    n_epochs=BASE_EPOCHS, clip_norm=1.0,
)

# Final predictions of the trained baseline on the validation and test loaders.
val_pred,  val_true  = evaluate_model(baseline_model, val_loader)
test_pred, test_true = evaluate_model(baseline_model, test_loader)

baseline_val_rmse,  baseline_val_r2  = rmse(val_true, val_pred),   r2_score(val_true, val_pred)
baseline_test_rmse, baseline_test_r2 = rmse(test_true, test_pred), r2_score(test_true, test_pred)

print("\nBaseline LSTM performance:")
print(f"Val  RMSE: {baseline_val_rmse:.4f}, R2: {baseline_val_r2:.4f}")
print(f"Test RMSE: {baseline_test_rmse:.4f}, R2: {baseline_test_r2:.4f}")

In [None]:
# === PLOTS: LOSS CURVES (RMSE) ===
# One x value per recorded epoch; the history can be shorter than the
# requested epoch count if training stopped early.
epochs = range(1, len(baseline_history["train_rmse"]) + 1)

# One figure per metric: (history key suffix, legend stem, y label, title).
# The superscript two is written as the escape "\u00b2" so the label survives
# a file re-encoding; the previous literal had been corrupted to "RÂ²".
for _metric, _legend, _ylabel, _title in (
    ("rmse", "RMSE", "RMSE",     "LSTM Training vs Validation RMSE"),
    ("r2",   "R2",   "R\u00b2",  "LSTM Training vs Validation R\u00b2"),
):
    plt.figure()
    plt.plot(epochs, baseline_history[f"train_{_metric}"], label=f"Train {_legend}")
    plt.plot(epochs, baseline_history[f"val_{_metric}"],   label=f"Val {_legend}")
    plt.xlabel("Epoch")
    plt.ylabel(_ylabel)
    plt.title(_title)
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

In [None]:
# === PLOTS: TRUE vs PREDICTED ===
def plot_scatter(y_true, y_pred, title):
    """Scatter predicted against true values with a dashed ``y = x`` reference.

    Args:
        y_true: Array-like of true targets (flattened before plotting).
        y_pred: Array-like of predictions (flattened before plotting).
        title: Figure title.
    """
    actual = np.array(y_true).ravel()
    predicted = np.array(y_pred).ravel()

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.scatter(actual, predicted, alpha=0.5)

    # The diagonal spans the combined range of both series.
    lower = min(actual.min(), predicted.min())
    upper = max(actual.max(), predicted.max())
    ax.plot([lower, upper], [lower, upper], 'r--', label='y = x')

    ax.set_xlabel("True Y_rice")
    ax.set_ylabel("Predicted Y_rice")
    ax.set_title(title)
    ax.legend()
    ax.grid(True)
    fig.tight_layout()
    plt.show()

# Baseline predictions against the truth, validation split first, then test.
for _true, _pred, _title in (
    (val_true,  val_pred,  "Validation: True vs Predicted Y_rice (LSTM)"),
    (test_true, test_pred, "Test: True vs Predicted Y_rice (LSTM)"),
):
    plot_scatter(_true, _pred, _title)

In [None]:
# === OPTUNA HYPERPARAMETER TUNING (OPTIONAL) ===
if not OPTUNA_AVAILABLE:
    print("Optuna not available, skipping tuning. Install with `pip install optuna`.")
else:
    def objective(trial):
        """Train one sampled configuration and return its final validation RMSE.

        Returns ``inf`` when training did not complete every requested epoch,
        so that a diverged configuration can never be picked as the best one.
        """
        hidden_size = trial.suggest_int("hidden_size", 32, 256, log=True)
        num_layers  = trial.suggest_int("num_layers", 1, 3)
        dropout     = trial.suggest_float("dropout", 0.0, 0.4)
        lr          = trial.suggest_float("lr", 1e-4, 5e-3, log=True)
        epochs      = trial.suggest_int("epochs", 40, 100)

        model, optimizer, criterion = create_model(
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            lr=lr,
        )

        history = train_model(
            model,
            optimizer,
            criterion,
            train_loader,
            val_loader,
            n_epochs=epochs,
            clip_norm=1.0,
        )

        val_rmse_hist = history["val_rmse"]
        # train_model returns early when the loss blows up, leaving fewer
        # entries than epochs requested. Scoring such a run by its last
        # pre-divergence epoch would reward an unstable configuration, so
        # every incomplete run (including an empty history) is rejected.
        if len(val_rmse_hist) < epochs:
            return float("inf")
        # Completed run: the objective is the last epoch's validation RMSE.
        return val_rmse_hist[-1]

    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=20, show_progress_bar=True)

    print("\nBest trial:")
    print(study.best_trial)

    # Retrain a fresh model with the best hyperparameters; the per-trial
    # models created inside objective() are not kept.
    best_params = study.best_trial.params
    best_model, best_opt, best_crit = create_model(
        hidden_size=best_params["hidden_size"],
        num_layers=best_params["num_layers"],
        dropout=best_params["dropout"],
        lr=best_params["lr"],
    )
    best_history = train_model(
        best_model,
        best_opt,
        best_crit,
        train_loader,
        val_loader,
        n_epochs=best_params["epochs"],
        clip_norm=1.0,
    )

    best_val_pred,  best_val_true  = evaluate_model(best_model, val_loader)
    best_test_pred, best_test_true = evaluate_model(best_model, test_loader)

    best_val_rmse  = rmse(best_val_true,  best_val_pred)
    best_val_r2    = r2_score(best_val_true,  best_val_pred)
    best_test_rmse = rmse(best_test_true, best_test_pred)
    best_test_r2   = r2_score(best_test_true, best_test_pred)

    print("\nBest LSTM (Optuna) performance:")
    print(f"Val  RMSE: {best_val_rmse:.4f}, R2: {best_val_r2:.4f}")
    print(f"Test RMSE: {best_test_rmse:.4f}, R2: {best_test_r2:.4f}")

In [None]:
# === PERFORMANCE COMPARISON & OPTUNA VISUALIZATION ===
# One row of validation/test metrics per model.
results = []

results.append({
    "Model": "Baseline_LSTM",
    "Val_RMSE": baseline_val_rmse,
    "Val_R2":   baseline_val_r2,
    "Test_RMSE": baseline_test_rmse,
    "Test_R2":   baseline_test_r2,
})

# If you have metrics from your initial feed-forward NN, put them here:
# initial_nn_val_rmse = ...
# initial_nn_val_r2   = ...
# initial_nn_test_rmse = ...
# initial_nn_test_r2   = ...
# results.append({
#     "Model": "Initial_NN",
#     "Val_RMSE": initial_nn_val_rmse,
#     "Val_R2":   initial_nn_val_r2,
#     "Test_RMSE": initial_nn_test_rmse,
#     "Test_R2":   initial_nn_test_r2,
# })

# The tuned model is listed only if the tuning cell actually ran and defined
# its metrics in this kernel.
if OPTUNA_AVAILABLE and 'best_val_rmse' in globals():
    results.append({
        "Model": "Best_LSTM_Optuna",
        "Val_RMSE": best_val_rmse,
        "Val_R2":   best_val_r2,
        "Test_RMSE": best_test_rmse,
        "Test_R2":   best_test_r2,
    })

results_df = pd.DataFrame(results)
print(results_df)

# One bar chart per validation metric: (results column, label shown on the plot).
# The superscript two is written as the escape "\u00b2" so the label survives
# a file re-encoding; the previous literal had been corrupted to "RÂ²".
for _column, _label in (("Val_RMSE", "RMSE"), ("Val_R2", "R\u00b2")):
    plt.figure()
    plt.bar(results_df["Model"], results_df[_column])
    plt.ylabel(f"Val {_label}")
    plt.title(f"Model Comparison - Validation {_label}")
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.show()

# Optuna visualizations (if available)
if OPTUNA_AVAILABLE and 'study' in globals():
    # Best effort on purpose: these plots are optional extras, so any failure
    # (for instance a missing plotting backend) is reported, not raised.
    try:
        from optuna.visualization import plot_optimization_history, plot_param_importances

        fig1 = plot_optimization_history(study)
        fig1.show()

        fig2 = plot_param_importances(study)
        fig2.show()
    except Exception as e:
        print("Could not generate Optuna visualizations:", e)