download (8).svg

download (9).svg

download (11).svg

<style>
table{border-collapse:collapse;width:100%;font-family:Arial}
th,td{border:1px solid #333;padding:8px}
th{background:#0b3c8a;color:#fff}
.v1{background:#e3f2fd}
.v2{background:#e8f5e9}
.v3{background:#fff3e0}
.v4{background:#fce4ec}
</style>

<table>
<tr>
<th>Aspect</th><th>v1</th><th>v2</th><th>v3</th><th>v4</th>
</tr>
<tr>
<td>Latent Design</td>
<td class="v1">Single z</td>
<td class="v2">Œ≤-VAE</td>
<td class="v3">Contrastive</td>
<td class="v4">Hierarchical z‚ÇÅ+z‚ÇÇ</td>
</tr>
<tr>
<td>Temporal Awareness</td>
<td class="v1">No</td>
<td class="v2">No</td>
<td class="v3">Implicit</td>
<td class="v4">Explicit Regularization</td>
</tr>
<tr>
<td>Separability</td>
<td class="v1">Moderate</td>
<td class="v2">Improved</td>
<td class="v3">Strong</td>
<td class="v4">Very Strong</td>
</tr>
<tr>
<td>Edge Readiness</td>
<td class="v1">Prototype</td>
<td class="v2">Near-Edge</td>
<td class="v3">Edge</td>
<td class="v4">Edge-Optimized</td>
</tr>
</table>


In [None]:
# ============================================================
# üöó SIGNet-V3 + IDInferNet + Synthetic Classifier
# WITH EARLY STOPPING + FULL ABLATION LOGGING
# ============================================================

!pip -q install torch torchvision torchaudio seaborn scikit-learn thop openpyxl

import os, time, copy
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from sklearn.decomposition import PCA
from scipy.spatial.distance import euclidean
from thop import profile

torch.manual_seed(42)
np.random.seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# ============================================================
# 1. DATA & DIRECTORIES
# ============================================================
from google.colab import drive
drive.mount('/content/drive')

DATA_DIR = "/content/drive/MyDrive/DT_Driver_Wise_Data"
RES_DIR = f"{DATA_DIR}/SIGNetv3_Results"
FIG_DIR = f"{RES_DIR}/Figures"
EXCEL_DIR = f"{RES_DIR}/Excel"
MODEL_DIR = f"{RES_DIR}/Models"

for d in [RES_DIR, FIG_DIR, EXCEL_DIR, MODEL_DIR]:
    os.makedirs(d, exist_ok=True)

DRIVERS = ["B", "D", "F"]
FEATURES = [
    "Long_Term_Fuel_Trim_Bank1",
    "Engine_coolant_temperature.1",
    "Activation_of_Air_compressor",
    "Torque_of_friction",
    "Engine_soacking_time",
    "Intake_air_pressure",
]
TIME_COL = "Time(s)"

dfs = []
for d in DRIVERS:
    for s in ["Train", "Valid"]:
        p = f"{DATA_DIR}/Driver_{d}_{s}.csv"
        if os.path.exists(p):
            df = pd.read_csv(p)
            df["Driver"] = d
            df["Split"] = s
            dfs.append(df)

df = pd.concat(dfs, ignore_index=True).dropna(subset=FEATURES + [TIME_COL])

# ============================================================
# 2. PREPROCESSING
# ============================================================
sc_x, sc_t = StandardScaler(), StandardScaler()
enc = OneHotEncoder(sparse_output=False)

X = sc_x.fit_transform(df[FEATURES])
T = sc_t.fit_transform(df[[TIME_COL]])
D = enc.fit_transform(df[["Driver"]])
C = np.concatenate([T, D], axis=1)

X = torch.tensor(X, dtype=torch.float32).to(device)
C = torch.tensor(C, dtype=torch.float32).to(device)

mask_tr = df["Split"] == "Train"
mask_va = df["Split"] == "Valid"

X_tr, C_tr = X[mask_tr], C[mask_tr]
X_va, C_va = X[mask_va], C[mask_va]
drv_va = df.loc[mask_va, "Driver"].values

le = LabelEncoder()
driver_ids = torch.tensor(le.fit_transform(df["Driver"]), dtype=torch.long).to(device)

# ============================================================
# 3. SIGNet-V3 MODEL
# ============================================================
class SIGNet(nn.Module):
    def __init__(self, xdim, cdim, zdim=256, h=256):
        super().__init__()
        self.enc = nn.Sequential(
            nn.Linear(xdim + cdim, h), nn.ReLU(),
            nn.Linear(h, h), nn.ReLU()
        )
        self.mu = nn.Linear(h, zdim)
        self.logvar = nn.Linear(h, zdim)
        self.dec_fc = nn.Linear(zdim + cdim, h)
        self.dec = nn.Sequential(nn.ReLU(), nn.Linear(h, xdim))

    def encode(self, x, c):
        h = self.enc(torch.cat([x, c], 1))
        return self.mu(h), self.logvar(h)

    def reparam(self, mu, lv):
        return mu + torch.randn_like(mu) * torch.exp(0.5 * lv)

    def decode(self, z, c):
        return self.dec(self.dec_fc(torch.cat([z, c], 1)))

    def forward(self, x, c):
        mu, lv = self.encode(x, c)
        z = self.reparam(mu, lv)
        return self.decode(z, c), mu, lv

def vae_loss(r, x, mu, lv):
    rec = F.mse_loss(r, x)
    kl = -0.5 * torch.mean(1 + lv - mu**2 - lv.exp())
    return rec + kl, rec, kl

def contrastive(mu, labels, margin=1.0):
    d = torch.cdist(mu, mu)
    same = (labels[:, None] == labels[None, :]).float()
    diff = 1 - same
    return (same * d**2).mean() + (diff * F.relu(margin - d)**2).mean()

# ============================================================
# 4. TRAIN BASELINE & OPTIMIZED (WITH EARLY STOPPING)
# ============================================================
history = {}
early_meta = {}

for mode in ["baseline", "optimized"]:

    model = SIGNet(X_tr.shape[1], C_tr.shape[1]).to(device)
    opt = optim.Adam(model.parameters(), lr=1e-3 if mode == "baseline" else 1e-4)

    best_val = np.inf
    best_epoch = 0
    patience = 20
    delta = 1e-4
    no_improve = 0

    tr_hist, va_hist = [], []

    for ep in range(1, 401):
        model.train()
        opt.zero_grad()

        r, mu, lv = model(X_tr, C_tr)
        base, _, _ = vae_loss(r, X_tr, mu, lv)
        loss = base if mode == "baseline" else base + 0.1 * contrastive(mu, driver_ids[mask_tr])

        loss.backward()
        opt.step()

        model.eval()
        with torch.no_grad():
            rv, muv, lvv = model(X_va, C_va)
            vloss, _, _ = vae_loss(rv, X_va, muv, lvv)

        tr_hist.append(loss.item())
        va_hist.append(vloss.item())

        if vloss < best_val - delta:
            best_val = vloss
            best_epoch = ep
            best_wts = copy.deepcopy(model.state_dict())
            no_improve = 0
        else:
            no_improve += 1

        if ep % 25 == 0:
            print(f"{mode.upper()} | Ep {ep:03d} | Train={loss:.4f} | Val={vloss:.4f}")

        if no_improve >= patience:
            print(f"‚èπÔ∏è {mode.upper()} early stopped at epoch {ep}")
            break

    model.load_state_dict(best_wts)
    torch.save(model.state_dict(), f"{MODEL_DIR}/signet_v3_{mode}.pt")

    history[mode] = {"train": tr_hist, "val": va_hist}
    early_meta[mode] = {
        "Best_Val_Loss": best_val,
        "Best_Epoch": best_epoch,
        "Stopped_Epoch": ep
    }

# ============================================================
# 5. DRIVER SIGNATURES
# ============================================================
model.load_state_dict(torch.load(f"{MODEL_DIR}/signet_v3_optimized.pt"))
model.eval()

signatures = {}
with torch.no_grad():
    for d in DRIVERS:
        m = df["Driver"] == d
        mu, _ = model.encode(X[m], C[m])
        signatures[d] = mu.mean(0).cpu().numpy()

# ============================================================
# 6. IDINFERNET (LATENT CLASSIFIER + EARLY STOPPING)
# ============================================================
class IDInferNet(nn.Module):
    def __init__(self, z, n):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(z, 128), nn.ReLU(),
            nn.Linear(128, n)
        )
    def forward(self, z): return self.net(z)

with torch.no_grad():
    Z_va = model.encode(X_va, C_va)[0]

y_va = torch.tensor(le.transform(drv_va), dtype=torch.long).to(device)

idnet = IDInferNet(Z_va.shape[1], len(DRIVERS)).to(device)
opt_id = optim.Adam(idnet.parameters(), lr=1e-3)

best_acc = 0
patience = 10
no_improve = 0

for ep in range(1, 101):
    opt_id.zero_grad()
    logits = idnet(Z_va)
    loss = F.cross_entropy(logits, y_va)
    loss.backward()
    opt_id.step()

    acc = accuracy_score(y_va.cpu(), logits.argmax(1).cpu())
    if acc > best_acc:
        best_acc = acc
        best_state = copy.deepcopy(idnet.state_dict())
        no_improve = 0
    else:
        no_improve += 1

    if no_improve >= patience:
        print(f"‚èπÔ∏è IDInferNet early stopped at epoch {ep}")
        break

idnet.load_state_dict(best_state)

# ============================================================
# 7. SYNTHETIC DATA CLASSIFIER (EARLY STOPPING)
# ============================================================
Xs, ys = [], []
for d in DRIVERS:
    mu = torch.tensor(signatures[d], device=device)
    Cd = C[df["Driver"] == d]
    for _ in range(2000):
        z = mu + torch.randn_like(mu) * 0.8
        c = Cd[np.random.randint(0, len(Cd))]
        x = model.decode(z.unsqueeze(0), c.unsqueeze(0))
        Xs.append(x.cpu().numpy()[0])
        ys.append(le.transform([d])[0])

Xs, ys = np.array(Xs), np.array(ys)
Xs_tr, Xs_va, ys_tr, ys_va = train_test_split(Xs, ys, stratify=ys, test_size=0.3)

class DriverClassifier(nn.Module):
    def __init__(self, d, n):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d, 256), nn.ReLU(),
            nn.Linear(256, n)
        )
    def forward(self, x): return self.net(x)

clf = DriverClassifier(Xs_tr.shape[1], len(DRIVERS)).to(device)
opt_c = optim.Adam(clf.parameters(), lr=1e-3)

Xt = torch.tensor(Xs_tr, dtype=torch.float32).to(device)
yt = torch.tensor(ys_tr, dtype=torch.long).to(device)
Xv = torch.tensor(Xs_va, dtype=torch.float32).to(device)
yv = torch.tensor(ys_va, dtype=torch.long).to(device)

best_val = np.inf
no_improve = 0
for ep in range(1, 201):
    opt_c.zero_grad()
    loss = F.cross_entropy(clf(Xt), yt)
    loss.backward()
    opt_c.step()

    with torch.no_grad():
        vloss = F.cross_entropy(clf(Xv), yv)

    if vloss < best_val - 1e-4:
        best_val = vloss
        best_state = copy.deepcopy(clf.state_dict())
        no_improve = 0
    else:
        no_improve += 1

    if no_improve >= 15:
        print(f"‚èπÔ∏è Synthetic classifier early stopped at epoch {ep}")
        break

clf.load_state_dict(best_state)

# ============================================================
# 8. SAVE ABLATION FIGURE
# ============================================================
plt.figure()
plt.plot(history["baseline"]["val"], label="Baseline")
plt.plot(history["optimized"]["val"], label="Optimized")
plt.legend()
plt.title("SIGNet-V3 Ablation (Validation Loss)")
plt.savefig(f"{FIG_DIR}/ablation_val_loss.png", dpi=300)
plt.close()

# ============================================================
# 9. SAVE EXCEL (FULL METADATA)
# ============================================================
excel_path = f"{EXCEL_DIR}/SIGNetV3_Full_Results.xlsx"
with pd.ExcelWriter(excel_path, engine="openpyxl") as w:
    pd.DataFrame(history["baseline"]).to_excel(w, "Baseline_Loss")
    pd.DataFrame(history["optimized"]).to_excel(w, "Optimized_Loss")
    pd.DataFrame(early_meta).T.to_excel(w, "Early_Stopping_Summary")
    pd.DataFrame(signatures).T.to_excel(w, "Driver_Signatures")
    pd.DataFrame(classification_report(
        y_va.cpu(), idnet(Z_va).argmax(1).cpu(), output_dict=True
    )).T.to_excel(w, "IDInferNet_Report")

print(f"\n‚úÖ ALL RESULTS SAVED TO:\n{excel_path}")


<style>
.comp-table {
  border-collapse: collapse;
  width: 100%;
  font-family: Arial, sans-serif;
  font-size: 14px;
}
.comp-table th {
  background-color: #0b3c8a;
  color: white;
  padding: 10px;
  border: 1px solid #333;
}
.comp-table td {
  padding: 10px;
  border: 1px solid #333;
}
.comp-table tr:nth-child(even) {
  background-color: #f2f4f8;
}
.v1 { background:#e3f2fd; }
.v2 { background:#e8f5e9; }
.v3 { background:#fff3e0; }
</style>

<table class="comp-table">
<tr>
<th>Aspect</th>
<th>SIGNet-v1</th>
<th>SIGNet-v2</th>
<th>SIGNet-v3 (Proposed)</th>
</tr>

<tr>
<td>Encoder Depth</td>
<td class="v1">Shallow</td>
<td class="v2">Deeper + BatchNorm</td>
<td class="v3">Deep + Contrastive-aware</td>
</tr>

<tr>
<td>Latent Dimension</td>
<td class="v1">Low (8)</td>
<td class="v2">Medium (16)</td>
<td class="v3">High (32)</td>
</tr>

<tr>
<td>Loss Function</td>
<td class="v1">VAE</td>
<td class="v2">Œ≤-VAE</td>
<td class="v3">Œ≤-VAE + Contrastive</td>
</tr>

<tr>
<td>Latent Separability</td>
<td class="v1">Moderate</td>
<td class="v2">Improved</td>
<td class="v3">Strong (cluster-aware)</td>
</tr>

<tr>
<td>Digital Twin Quality</td>
<td class="v1">Basic</td>
<td class="v2">Stable</td>
<td class="v3">Discriminative</td>
</tr>

<tr>
<td>DT-GDIN Robustness</td>
<td class="v1">Limited</td>
<td class="v2">Improved</td>
<td class="v3">High</td>
</tr>

<tr>
<td>Edge Readiness</td>
<td class="v1">Prototype</td>
<td class="v2">Near-edge</td>
<td class="v3">Edge-deployable</td>
</tr>

</table>


download (1).svg

download (5).svg

IDInferNet-v3 Architecture

(Discriminative Latent Classifier ‚Äì Real Data)

download (6).svg

DT-GDIN-v3 Architecture
(Digital-Twin-Generated Classifier ‚Äì Synthetic Behavior)
download (7).svg

In [None]:
# ============================================================
# CVAE_Digital_Twin-v3.ipynb
# SIGNet-v3 + IDInferNet-v3 + DT-GDIN-v3
# ============================================================

!pip -q install torch torchvision torchaudio thop seaborn scikit-learn openpyxl

# ============================================================
# 1. IMPORTS & CONFIG
# ============================================================

import os, time, copy
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score, classification_report,
    confusion_matrix, roc_curve, auc
)
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics import silhouette_score, davies_bouldin_score
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from scipy.spatial.distance import euclidean
from thop import profile

SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"‚öôÔ∏è Using device: {device}")

# ============================================================
# 2. DATA & DIRECTORIES
# ============================================================

from google.colab import drive
drive.mount("/content/drive")

DATA_ROOT = "/content/drive/MyDrive/DT_Driver_Wise_Data"
RES_ROOT  = f"{DATA_ROOT}/SIGNetV3_Results"

DIRS = {
    "fig": f"{RES_ROOT}/Figures",
    "mdl": f"{RES_ROOT}/Models",
    "xls": f"{RES_ROOT}/Excel"
}
for d in DIRS.values():
    os.makedirs(d, exist_ok=True)

DRIVERS = ["B", "D", "F"]
FEATURES = [
    "Long_Term_Fuel_Trim_Bank1",
    "Engine_coolant_temperature.1",
    "Activation_of_Air_compressor",
    "Torque_of_friction",
    "Engine_soacking_time",
    "Intake_air_pressure",
]
TIME_COL = "Time(s)"

# ============================================================
# 3. LOAD DATA
# ============================================================

dfs = []
for drv in DRIVERS:
    for sp in ["Train", "Valid"]:
        p = f"{DATA_ROOT}/Driver_{drv}_{sp}.csv"
        if os.path.exists(p):
            df = pd.read_csv(p)
            df["Driver"] = drv
            df["Split"] = sp
            dfs.append(df)

df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all.dropna(subset=FEATURES + [TIME_COL])

# ============================================================
# 4. PREPROCESSING
# ============================================================

sc_x, sc_t = StandardScaler(), StandardScaler()
enc = OneHotEncoder(sparse_output=False)

X = sc_x.fit_transform(df_all[FEATURES])
T = sc_t.fit_transform(df_all[[TIME_COL]])
D = enc.fit_transform(df_all[["Driver"]])
C = np.concatenate([T, D], axis=1)

X = torch.tensor(X, dtype=torch.float32).to(device)
C = torch.tensor(C, dtype=torch.float32).to(device)

mask_tr = df_all["Split"] == "Train"
mask_va = df_all["Split"] == "Valid"

X_tr, C_tr = X[mask_tr], C[mask_tr]
X_va, C_va = X[mask_va], C[mask_va]
drv_va     = df_all.loc[mask_va, "Driver"].values

le_drv = LabelEncoder()
drv_id_all = torch.tensor(le_drv.fit_transform(df_all["Driver"]),
                          dtype=torch.long).to(device)

# ============================================================
# 5. SIGNET-v3 (CONTRASTIVE-CVAE)
# ============================================================

class SIGNetV3(nn.Module):
    def __init__(self, x_dim, c_dim, z_dim=32):
        super().__init__()
        self.enc = nn.Sequential(
            nn.Linear(x_dim + c_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU()
        )
        self.mu = nn.Linear(256, z_dim)
        self.lv = nn.Linear(256, z_dim)

        self.dec = nn.Sequential(
            nn.Linear(z_dim + c_dim, 256),
            nn.ReLU(),
            nn.Linear(256, x_dim)
        )

    def encode(self, x, c):
        h = self.enc(torch.cat([x, c], 1))
        return self.mu(h), self.lv(h)

    def reparam(self, mu, lv):
        return mu + torch.randn_like(mu) * torch.exp(0.5 * lv)

    def decode(self, z, c):
        return self.dec(torch.cat([z, c], 1))

    def forward(self, x, c):
        mu, lv = self.encode(x, c)
        z = self.reparam(mu, lv)
        return self.decode(z, c), mu, lv


def vae_loss(xr, x, mu, lv, beta=0.5):
    rec = F.mse_loss(xr, x)
    kl  = -0.5 * torch.mean(1 + lv - mu**2 - lv.exp())
    return rec + beta * kl, rec, kl


def contrastive_loss(mu, labels, margin=1.0):
    d = torch.cdist(mu, mu)
    same = (labels[:,None] == labels[None,:]).float()
    diff = 1 - same
    return (same * d**2).mean() + (diff * F.relu(margin - d)**2).mean()

# ============================================================
# 6. TRAIN SIGNET-v3 (BASELINE vs OPTIMIZED)
# ============================================================

def train_signet_v3(use_contrastive, batch_size=256):
    model = SIGNetV3(X.shape[1], C.shape[1]).to(device)
    opt = optim.Adam(model.parameters(), lr=1e-3)

    dataset = torch.utils.data.TensorDataset(
        X_tr, C_tr, drv_id_all[mask_tr]
    )
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=True, drop_last=True
    )

    best, patience = np.inf, 0
    hist = {"train": [], "val": []}
    t0 = time.time()

    for ep in range(1, 601):
        model.train()
        ep_loss = 0.0

        for xb, cb, yb in loader:
            opt.zero_grad()

            xr, mu, lv = model(xb, cb)
            base_loss, _, _ = vae_loss(xr, xb, mu, lv)

            if use_contrastive:
                cl = contrastive_loss(mu, yb)
                loss = base_loss + 0.1 * cl
            else:
                loss = base_loss

            loss.backward()
            opt.step()
            ep_loss += loss.item()

        # Validation (unchanged)
        model.eval()
        with torch.no_grad():
            xv, mu_v, lv_v = model(X_va, C_va)
            vloss, _, _ = vae_loss(xv, X_va, mu_v, lv_v)

        hist["train"].append(ep_loss / len(loader))
        hist["val"].append(vloss.item())

        if vloss < best - 1e-4:
            best = vloss
            best_wts = copy.deepcopy(model.state_dict())
            patience = 0
        else:
            patience += 1

        if patience >= 40:
            break

    model.load_state_dict(best_wts)
    return model, hist, time.time() - t0



model_base, hist_base, time_base = train_signet_v3(False)
model_opt,  hist_opt,  time_opt  = train_signet_v3(True)

torch.save(model_opt.state_dict(), f"{DIRS['mdl']}/signet_v3_optimized.pt")

# ============================================================
# 7. LATENT EXTRACTION
# ============================================================

model_opt.eval()
with torch.no_grad():
    Z_va = model_opt.encode(X_va, C_va)[0].cpu().numpy()

# ============================================================
# 8. LATENT METRICS (INTRA + INTER)
# ============================================================

rows = []
for d in DRIVERS:
    Zd = Z_va[drv_va == d]
    rows.append({
        "Driver": d,
        "Cosine": cosine_similarity(Zd).mean(),
        "Euclidean": np.mean([euclidean(Zd[i], Zd[j])
                              for i in range(len(Zd))
                              for j in range(i+1, len(Zd))]),
        "VarTrace": np.trace(np.cov(Zd.T))
    })

latent_intra_df = pd.DataFrame(rows)

centroids = {d: Z_va[drv_va==d].mean(0) for d in DRIVERS}
latent_inter_df = pd.DataFrame([
    {
        "Pair": f"{d1}-{d2}",
        "Centroid_Euclid": euclidean(centroids[d1], centroids[d2])
    }
    for i,d1 in enumerate(DRIVERS) for d2 in DRIVERS[i+1:]
])

sil = silhouette_score(Z_va, le_drv.transform(drv_va))
db  = davies_bouldin_score(Z_va, le_drv.transform(drv_va))

# ============================================================
# 9. PCA + t-SNE
# ============================================================

Z_pca = PCA(2).fit_transform(Z_va)
Z_tsne = TSNE(2, perplexity=30, random_state=SEED).fit_transform(Z_va)

plt.figure(figsize=(6,5))
sns.scatterplot(x=Z_pca[:,0], y=Z_pca[:,1], hue=drv_va)
plt.title("SIGNet-v3 PCA")
plt.savefig(f"{DIRS['fig']}/latent_pca.png", dpi=300)
plt.close()

plt.figure(figsize=(6,5))
sns.scatterplot(x=Z_tsne[:,0], y=Z_tsne[:,1], hue=drv_va)
plt.title("SIGNet-v3 t-SNE")
plt.savefig(f"{DIRS['fig']}/latent_tsne.png", dpi=300)
plt.close()

# ============================================================
# 10. IDINFERNET-v3
# ============================================================

class IDInferNetV3(nn.Module):
    def __init__(self, z_dim, n):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(z_dim, 128),
            nn.ReLU(),
            nn.Linear(128, n)
        )
    def forward(self, z): return self.net(z)

y = le_drv.transform(drv_va)
Ztr, Zte, ytr, yte = train_test_split(
    Z_va, y, stratify=y, test_size=0.3, random_state=SEED
)

Ztr = torch.tensor(Ztr, dtype=torch.float32).to(device)
Zte = torch.tensor(Zte, dtype=torch.float32).to(device)
ytr = torch.tensor(ytr, dtype=torch.long).to(device)
yte = torch.tensor(yte, dtype=torch.long).to(device)

idnet = IDInferNetV3(Ztr.shape[1], len(DRIVERS)).to(device)
opt = optim.Adam(idnet.parameters(), lr=1e-3)

acc_hist = []
for _ in range(200):
    opt.zero_grad()
    loss = F.cross_entropy(idnet(Ztr), ytr)
    loss.backward()
    opt.step()
    acc_hist.append(
        accuracy_score(yte.cpu(), idnet(Zte).argmax(1).cpu())
    )

# ============================================================
# 11. DT-GDIN-v3
# ============================================================

signatures = {d: centroids[d] for d in DRIVERS}
Xs, ys = [], []

for d in DRIVERS:
    mu = torch.tensor(signatures[d]).to(device)
    Cd = C[df_all["Driver"] == d]
    for _ in range(3000):
        z = mu + 0.6 * torch.randn_like(mu)
        c = Cd[np.random.randint(len(Cd))]
        with torch.no_grad():
            xg = model_opt.decode(z.unsqueeze(0), c.unsqueeze(0))
        Xs.append(xg.cpu().numpy()[0])
        ys.append(d)

Xs = np.array(Xs)
ys = le_drv.transform(ys)

Xtr, Xva, ytr, yva = train_test_split(
    Xs, ys, stratify=ys, test_size=0.3, random_state=SEED
)

dtgdin = IDInferNetV3(Xtr.shape[1], len(DRIVERS)).to(device)
opt = optim.Adam(dtgdin.parameters(), lr=1e-3)

dt_acc = []
for _ in range(200):
    opt.zero_grad()
    loss = F.cross_entropy(dtgdin(
        torch.tensor(Xtr, dtype=torch.float32).to(device)),
        torch.tensor(ytr, dtype=torch.long).to(device))
    loss.backward()
    opt.step()
    dt_acc.append(
        accuracy_score(yva,
            dtgdin(torch.tensor(Xva, dtype=torch.float32).to(device))
            .argmax(1).cpu())
    )


# ============================================================
# 13. IDINFERNET-v3 FULL EVALUATION
# ============================================================

start = time.time()
with torch.no_grad():
    y_pred = idnet(Zte).argmax(1).cpu().numpy()
    y_prob = F.softmax(idnet(Zte), dim=1).cpu().numpy()
idinfer_infer_time = (time.time() - start) / len(Zte) * 1000

cm_id = confusion_matrix(yte.cpu(), y_pred)

plt.figure(figsize=(5,4))
sns.heatmap(cm_id, annot=True, fmt="d",
            xticklabels=le_drv.classes_,
            yticklabels=le_drv.classes_)
plt.title("IDInferNet-v3 Confusion Matrix")
plt.tight_layout()
plt.savefig(f"{DIRS['fig']}/idinfernet_v3_cm.png", dpi=300)
plt.close()

id_report_df = pd.DataFrame(
    classification_report(
        yte.cpu(), y_pred,
        target_names=le_drv.classes_,
        output_dict=True
    )
).T

from sklearn.preprocessing import label_binarize

y_bin = label_binarize(yte.cpu(), classes=range(len(DRIVERS)))
roc_rows = []

plt.figure(figsize=(6,5))
for i, drv in enumerate(le_drv.classes_):
    fpr, tpr, _ = roc_curve(y_bin[:, i], y_prob[:, i])
    auc_i = auc(fpr, tpr)
    roc_rows.append({"Driver": drv, "AUC": auc_i})
    plt.plot(fpr, tpr, label=f"{drv} (AUC={auc_i:.3f})")

plt.plot([0,1],[0,1],'k--')
plt.legend()
plt.title("IDInferNet-v3 ROC")
plt.tight_layout()
plt.savefig(f"{DIRS['fig']}/idinfernet_v3_roc.png", dpi=300)
plt.close()

id_roc_df = pd.DataFrame(roc_rows)
start = time.time()
with torch.no_grad():
    dt_pred = dtgdin(torch.tensor(Xva, dtype=torch.float32).to(device)) \
        .argmax(1).cpu().numpy()
dtgdin_infer_time = (time.time() - start) / len(Xva) * 1000

cm_dt = confusion_matrix(yva, dt_pred)

plt.figure(figsize=(5,4))
sns.heatmap(cm_dt, annot=True, fmt="d",
            xticklabels=le_drv.classes_,
            yticklabels=le_drv.classes_)
plt.title("DT-GDIN-v3 Confusion Matrix")
plt.tight_layout()
plt.savefig(f"{DIRS['fig']}/dtgdin_v3_cm.png", dpi=300)
plt.close()

dt_report_df = pd.DataFrame(
    classification_report(
        yva, dt_pred,
        target_names=le_drv.classes_,
        output_dict=True
    )
).T
# ============================================================
# 16. EDGE METRICS
# ============================================================

with torch.no_grad():
    flops, params = profile(
        model_opt,
        inputs=(X_va[:1], C_va[:1]),
        verbose=False
    )

edge_df = pd.DataFrame([{
    "SIGNet_FLOPs_M": flops / 1e6,
    "SIGNet_Params_M": params / 1e6,
    "IDInferNet_Infer_ms": idinfer_infer_time,
    "DTGDIN_Infer_ms": dtgdin_infer_time
}])
plt.figure()
plt.plot(acc_hist)
plt.title("IDInferNet-v3 Accuracy")
plt.xlabel("Epoch"); plt.ylabel("Accuracy")
plt.savefig(f"{DIRS['fig']}/idinfernet_v3_acc.png", dpi=300)
plt.close()

plt.figure()
plt.plot(dt_acc)
plt.title("DT-GDIN-v3 Accuracy")
plt.xlabel("Epoch"); plt.ylabel("Accuracy")
plt.savefig(f"{DIRS['fig']}/dtgdin_v3_acc.png", dpi=300)
plt.close()

# ============================================================
# 12. SAVE MASTER EXCEL
# ============================================================

excel_path = f"{DIRS['xls']}/SIGNetV3_MasterResults.xlsx"
with pd.ExcelWriter(excel_path, engine="openpyxl") as w:

    pd.DataFrame(hist_base).to_excel(w,"SIGNet_Base",index=False)
    pd.DataFrame(hist_opt).to_excel(w,"SIGNet_Optimized",index=False)

    latent_intra_df.to_excel(w,"Latent_Intra",index=False)
    latent_inter_df.to_excel(w,"Latent_Inter",index=False)

    pd.DataFrame({
        "Silhouette":[sil],
        "Davies_Bouldin":[db]
    }).to_excel(w,"Cluster_Quality",index=False)

    pd.DataFrame(acc_hist, columns=["Accuracy"]).to_excel(w,"IDInferNet_Ablation",index=False)
    id_report_df.to_excel(w,"IDInferNet_Report")
    pd.DataFrame(cm_id).to_excel(w,"IDInferNet_Confusion")
    id_roc_df.to_excel(w,"IDInferNet_ROC")

    pd.DataFrame(dt_acc, columns=["Accuracy"]).to_excel(w,"DT_GDIN_Ablation",index=False)
    dt_report_df.to_excel(w,"DT_GDIN_Report")
    pd.DataFrame(cm_dt).to_excel(w,"DT_GDIN_Confusion")

    edge_df.to_excel(w,"Edge_Metrics",index=False)


print("‚úÖ SIGNet-v3 COMPLETE")
print(f"üìÅ Saved to: {RES_ROOT}")


‚öôÔ∏è Using device: cuda
Mounted at /content/drive
‚úÖ SIGNet-v3 COMPLETE
üìÅ Saved to: /content/drive/MyDrive/DT_Driver_Wise_Data/SIGNetV3_Results


  pd.DataFrame(hist_base).to_excel(w,"SIGNet_Base",index=False)
  pd.DataFrame(hist_opt).to_excel(w,"SIGNet_Optimized",index=False)
  latent_intra_df.to_excel(w,"Latent_Intra",index=False)
  latent_inter_df.to_excel(w,"Latent_Inter",index=False)
  }).to_excel(w,"Cluster_Quality",index=False)
  pd.DataFrame(acc_hist, columns=["Accuracy"]).to_excel(w,"IDInferNet_Ablation",index=False)
  id_report_df.to_excel(w,"IDInferNet_Report")
  pd.DataFrame(cm_id).to_excel(w,"IDInferNet_Confusion")
  id_roc_df.to_excel(w,"IDInferNet_ROC")
  pd.DataFrame(dt_acc, columns=["Accuracy"]).to_excel(w,"DT_GDIN_Ablation",index=False)
  dt_report_df.to_excel(w,"DT_GDIN_Report")
  pd.DataFrame(cm_dt).to_excel(w,"DT_GDIN_Confusion")
  edge_df.to_excel(w,"Edge_Metrics",index=False)


In [1]:
# ============================================================
# CVAE_Digital_Twin-v3 | MASTER TRAIN + OPTIMIZATION
# SIGNet-v3 + IDInferNet-v3 + DT-GDIN-v3
# FP32 + Pruned + Quantized
# ============================================================

!pip -q install thop openpyxl

# ============================================================
# 0. IMPORTS & GLOBALS
# ============================================================

import os, time, copy
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.nn.utils.prune as prune

from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from thop import profile
from google.colab import drive

SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# ============================================================
# 1. PATHS
# ============================================================

drive.mount("/content/drive")

DATA_ROOT = "/content/drive/MyDrive/DT_Driver_Wise_Data"
RES_ROOT  = f"{DATA_ROOT}/SIGNetV3_Results"

DIRS = {
    "mdl": f"{RES_ROOT}/Models",
    "xls": f"{RES_ROOT}/Excel",
}
for d in DIRS.values():
    os.makedirs(d, exist_ok=True)

# ============================================================
# 2. DATA LOADING (REAL CSVs)
# ============================================================

DRIVERS = ["B", "D", "F"]
FEATURES = [
    "Long_Term_Fuel_Trim_Bank1",
    "Engine_coolant_temperature.1",
    "Activation_of_Air_compressor",
    "Torque_of_friction",
    "Engine_soacking_time",
    "Intake_air_pressure",
]
TIME_COL = "Time(s)"

dfs = []
for d in DRIVERS:
    for s in ["Train", "Valid"]:
        p = f"{DATA_ROOT}/Driver_{d}_{s}.csv"
        if not os.path.exists(p):
            raise FileNotFoundError(p)
        df = pd.read_csv(p)
        df["Driver"] = d
        df["Split"] = s
        dfs.append(df)

df_all = pd.concat(dfs, ignore_index=True)
df_all = df_all.dropna(subset=FEATURES + [TIME_COL])

# ============================================================
# 3. PREPROCESSING (UNCHANGED)
# ============================================================

sc_x, sc_t = StandardScaler(), StandardScaler()
enc = OneHotEncoder(sparse_output=False)
le  = LabelEncoder()

X = sc_x.fit_transform(df_all[FEATURES])
T = sc_t.fit_transform(df_all[[TIME_COL]])
D = enc.fit_transform(df_all[["Driver"]])
C = np.concatenate([T, D], axis=1)

X = torch.tensor(X, dtype=torch.float32)
C = torch.tensor(C, dtype=torch.float32)
y = torch.tensor(le.fit_transform(df_all["Driver"]), dtype=torch.long)

mask_tr = df_all["Split"] == "Train"
mask_va = df_all["Split"] == "Valid"

X_tr, C_tr, y_tr = X[mask_tr].to(device), C[mask_tr].to(device), y[mask_tr].to(device)
X_va, C_va, y_va = X[mask_va].to(device), C[mask_va].to(device), y[mask_va].to(device)

# ============================================================
# 4. ARCHITECTURES (EXACT v3)
# ============================================================

class SIGNetV3(nn.Module):
    def __init__(self, x_dim, c_dim, z_dim=32):
        super().__init__()
        self.enc = nn.Sequential(
            nn.Linear(x_dim + c_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU()
        )
        self.mu = nn.Linear(256, z_dim)
        self.lv = nn.Linear(256, z_dim)
        self.dec = nn.Sequential(
            nn.Linear(z_dim + c_dim, 256),
            nn.ReLU(),
            nn.Linear(256, x_dim)
        )

    def encode(self, x, c):
        h = self.enc(torch.cat([x, c], 1))
        return self.mu(h), self.lv(h)

    def reparam(self, mu, lv):
        return mu + torch.randn_like(mu) * torch.exp(0.5 * lv)

    def forward(self, x, c):
        mu, lv = self.encode(x, c)
        z = self.reparam(mu, lv)
        return self.dec(torch.cat([z, c], 1)), mu, lv


class IDInferNetV3(nn.Module):
    def __init__(self, z, n):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(z, 128),
            nn.ReLU(),
            nn.Linear(128, n)
        )
    def forward(self, z): return self.net(z)


class DTGDINV3(nn.Module):
    def __init__(self, d, n):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d, 256),
            nn.ReLU(),
            nn.Linear(256, n)
        )
    def forward(self, x): return self.net(x)

# ============================================================
# 5. TRAIN (AUTO IF WEIGHTS MISSING)
# ============================================================

signet_p = f"{DIRS['mdl']}/signet_v3.pt"
idnet_p  = f"{DIRS['mdl']}/idinfernet_v3.pt"
dtgdin_p = f"{DIRS['mdl']}/dtgdin_v3.pt"

if not (os.path.exists(signet_p) and os.path.exists(idnet_p) and os.path.exists(dtgdin_p)):
    print("Training v3 models...")

    signet = SIGNetV3(X.shape[1], C.shape[1]).to(device)
    opt = torch.optim.Adam(signet.parameters(), 1e-3)

    for _ in range(250):
        opt.zero_grad()
        xr, mu, lv = signet(X_tr, C_tr)
        loss = F.mse_loss(xr, X_tr) - 0.5 * torch.mean(1 + lv - mu**2 - lv.exp())
        loss.backward()
        opt.step()

    torch.save(signet.state_dict(), signet_p)

    with torch.no_grad():
        Z_tr = signet.encode(X_tr, C_tr)[0]
        Z_va = signet.encode(X_va, C_va)[0]

    idnet = IDInferNetV3(Z_tr.shape[1], len(DRIVERS)).to(device)
    opt = torch.optim.Adam(idnet.parameters(), 1e-3)
    for _ in range(200):
        opt.zero_grad()
        F.cross_entropy(idnet(Z_tr), y_tr).backward()
        opt.step()
    torch.save(idnet.state_dict(), idnet_p)

    dtgdin = DTGDINV3(X_tr.shape[1], len(DRIVERS)).to(device)
    opt = torch.optim.Adam(dtgdin.parameters(), 1e-3)
    for _ in range(200):
        opt.zero_grad()
        F.cross_entropy(dtgdin(X_tr), y_tr).backward()
        opt.step()
    torch.save(dtgdin.state_dict(), dtgdin_p)

# ============================================================
# 6. LOAD MODELS
# ============================================================

signet = SIGNetV3(X.shape[1], C.shape[1]).to(device)
signet.load_state_dict(torch.load(signet_p))
signet.eval()

with torch.no_grad():
    Z_va = signet.encode(X_va, C_va)[0]

idnet = IDInferNetV3(Z_va.shape[1], len(DRIVERS)).to(device)
idnet.load_state_dict(torch.load(idnet_p))
idnet.eval()

dtgdin = DTGDINV3(X_va.shape[1], len(DRIVERS)).to(device)
dtgdin.load_state_dict(torch.load(dtgdin_p))
dtgdin.eval()

# ============================================================
# 7. OPTIMIZATION (FP32 / PRUNED / INT8)
# ============================================================

def edge_profile(model, inputs, dtype_bytes):
    try:
        dev = next(model.parameters()).device
    except StopIteration:
        dev = torch.device("cpu")

    inputs = tuple(i.to(dev) for i in inputs)
    with torch.no_grad():
        flops, params = profile(model, inputs=inputs, verbose=False)
        t0 = time.time()
        for _ in range(50):
            _ = model(*inputs)
        latency = (time.time() - t0)/50*1000

    return {
        "Params": params,
        "FLOPs": flops,
        "Latency_ms": latency,
        "Memory_KB": params * dtype_bytes / 1024
    }

def prune_model(m):
    m = copy.deepcopy(m)
    for l in m.modules():
        if isinstance(l, nn.Linear):
            prune.ln_structured(l, "weight", 0.3, n=1, dim=0)
            prune.remove(l, "weight")
    return m

def quant_model(m):
    return torch.quantization.quantize_dynamic(m.cpu(), {nn.Linear}, torch.qint8)

models = {
    "SIGNetV3_FP32": signet,
    "SIGNetV3_Pruned": prune_model(signet),
    "SIGNetV3_Quant": quant_model(signet),
}

rows = []
for n, m in models.items():
    rows.append({
        "Model": n,
        **edge_profile(m, (X_va[:1], C_va[:1]), 1 if "Quant" in n else 4)
    })
    torch.save(m.state_dict(), f"{DIRS['mdl']}/{n}.pt")

pd.DataFrame(rows).to_excel(f"{DIRS['xls']}/SIGNetV3_Edge.xlsx", index=False)

print("‚úÖ SIGNet-v3 TRAIN + OPTIMIZATION COMPLETE")


Device: cuda
Mounted at /content/drive
Training v3 models...


For migrations of users: 
1. Eager mode quantization (torch.ao.quantization.quantize, torch.ao.quantization.quantize_dynamic), please migrate to use torchao eager mode quantize_ API instead 
2. FX graph mode quantization (torch.ao.quantization.quantize_fx.prepare_fx,torch.ao.quantization.quantize_fx.convert_fx, please migrate to use torchao pt2e quantization API instead (prepare_pt2e, convert_pt2e) 
3. pt2e quantization has been migrated to torchao (https://github.com/pytorch/ao/tree/main/torchao/quantization/pt2e) 
see https://github.com/pytorch/ao/issues/2259 for more details
  return torch.quantization.quantize_dynamic(m.cpu(), {nn.Linear}, torch.qint8)


‚úÖ SIGNet-v3 TRAIN + OPTIMIZATION COMPLETE
