# KAN + SupCon + CenterLoss + Differentiable KShape on ItalyPowerDemand Dataset

In [None]:

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, silhouette_score
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt
from tqdm import tqdm
import seaborn as sns
from scipy.optimize import linear_sum_assignment


In [None]:

def load_trace_dataset(train_path='ItalyPowerDemand_TRAIN.tsv', test_path='ItalyPowerDemand_TEST.tsv', normalize=True):
    train_data = np.loadtxt(train_path)
    test_data = np.loadtxt(test_path)
    X = np.concatenate([train_data[:, 1:], test_data[:, 1:]], axis=0)
    y = np.concatenate([train_data[:, 0], test_data[:, 0]], axis=0).astype(int)

    unique_labels = np.unique(y)
    label_map = {old: new for new, old in enumerate(unique_labels)}
    y = np.array([label_map[label] for label in y])

    if normalize:
        X = StandardScaler().fit_transform(X)
    return X, y

X, y = load_trace_dataset()
num_classes = len(np.unique(y))


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import sys

sys.path.insert(0, './pykan_local/kan')  # 加入你本地pykan目录

from KANLayer_local import KANLayer # 来自你 pykan 文件夹

class KANEncoder(nn.Module):
    def __init__(self, input_dim=275, hidden_dim=128, output_dim=128):
        super().__init__()
        self.layer1 = KANLayer(in_dim=input_dim, out_dim=hidden_dim, num=21)
        self.layer2 = KANLayer(in_dim=hidden_dim, out_dim=hidden_dim, num=21)
        self.layer3 = KANLayer(in_dim=hidden_dim, out_dim=output_dim, num=21)

    def forward(self, x):
        x, _, _, _ = self.layer1(x)
        x, _, _, _ = self.layer2(x)
        x, _, _, _ = self.layer3(x)
        return F.normalize(x, dim=-1)

In [None]:

class CenterLoss(nn.Module):
    def __init__(self, num_classes, feat_dim):
        super().__init__()
        self.centers = nn.Parameter(torch.randn(num_classes, feat_dim))

    def forward(self, x, labels):
        return ((x - self.centers[labels])**2).sum() / 2.0

class SupConLoss(nn.Module):
    def __init__(self, temperature=0.07):
        super().__init__()
        self.temperature = temperature

    def forward(self, features, labels):
        device = features.device
        features = F.normalize(features, dim=1)
        sim_matrix = torch.matmul(features, features.T) / self.temperature
        mask = torch.eq(labels.unsqueeze(1), labels.unsqueeze(0)).float().to(device)
        logits_mask = torch.ones_like(mask) - torch.eye(features.shape[0], device=device)
        exp_sim = torch.exp(sim_matrix) * logits_mask
        log_prob = sim_matrix - torch.log(exp_sim.sum(dim=1, keepdim=True) + 1e-9)
        mean_log_prob_pos = (mask * log_prob).sum(1) / mask.sum(1).clamp(min=1)
        return -mean_log_prob_pos.mean()

def differentiable_kshape_loss(features, labels, num_classes):
    loss = 0
    for c in range(num_classes):
        mask = labels == c
        if mask.sum() < 2:
            continue
        cluster_feat = features[mask]
        prototype = cluster_feat.mean(dim=0)
        aligned = F.cosine_similarity(cluster_feat, prototype.unsqueeze(0))
        loss += (1 - aligned).mean()
    return loss


In [None]:
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score, silhouette_score
import matplotlib.pyplot as plt
import torch
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

X_tensor = torch.tensor(X, dtype=torch.float32)
y_tensor = torch.tensor(y, dtype=torch.long)
dataset = TensorDataset(X_tensor, y_tensor)
loader = DataLoader(dataset, batch_size=32, shuffle=True)

encoder = KANEncoder(input_dim=X.shape[1], output_dim=128)
center_loss_fn = CenterLoss(num_classes=num_classes, feat_dim=128)
supcon_loss_fn = SupConLoss()
optimizer = torch.optim.Adam(encoder.parameters(), lr=1e-3)


epochs = 50
loss_history = []
ari_history = []
nmi_history = []

for epoch in tqdm(range(epochs)):
    encoder.train()
    total_loss = 0

    for xb, yb in loader:
        optimizer.zero_grad()
        feats = encoder(xb)
        loss = (center_loss_fn(feats, yb) +
                supcon_loss_fn(feats, yb) +
                differentiable_kshape_loss(feats, yb, num_classes))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    loss_history.append(total_loss)

   # ===== ✅ 聚类评估指标 =====
    encoder.eval()
    with torch.no_grad():
        feats_all = encoder(X_tensor).cpu().numpy()
        true_labels = y_tensor.cpu().numpy()

    kmeans = KMeans(n_clusters=num_classes, n_init=20, random_state=42)
    pred_labels = kmeans.fit_predict(feats_all)

    ari = adjusted_rand_score(true_labels, pred_labels)
    nmi = normalized_mutual_info_score(true_labels, pred_labels)
    sil = silhouette_score(feats_all, pred_labels)

    ari_history.append(ari)
    nmi_history.append(nmi)

    print(f"Epoch {epoch} | Loss: {total_loss:.4f} | ARI: {ari:.4f} | NMI: {nmi:.4f} | Silhouette: {sil:.4f}")

In [None]:
# ===== 📈 可视化 Loss / ARI / NMI 曲线 =====
epochs_range = range(len(loss_history))

# 创建横向排列的三个子图
fig, axs = plt.subplots(1, 3, figsize=(18, 5), sharey=False)

axs[0].plot(epochs_range, loss_history, label='Loss', color='blue')
axs[0].set_title('Loss Curve')
axs[0].set_xlabel('Epoch')
axs[0].set_ylabel('Loss')
axs[0].grid(True)

axs[1].plot(epochs_range, ari_history, label='ARI', color='green')
axs[1].set_title('ARI Curve')
axs[1].set_xlabel('Epoch')
axs[1].set_ylabel('ARI')
axs[1].grid(True)

axs[2].plot(epochs_range, nmi_history, label='NMI', color='red')
axs[2].set_title('NMI Curve')
axs[2].set_xlabel('Epoch')
axs[2].set_ylabel('NMI')
axs[2].grid(True)
plt.tight_layout()

svg_path = "./metrics_over_epochs3.svg"
plt.savefig(svg_path, format='svg')
plt.show()

In [None]:

@torch.no_grad()
def evaluate_clustering(encoder, X_tensor, y_true, num_classes):
    encoder.eval()
    feats = encoder(X_tensor).cpu().numpy()
    from sklearn.cluster import KMeans
    km = KMeans(n_clusters=num_classes, random_state=42).fit(feats)
    pred = km.labels_

    # Hungarian matching
    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_true, pred)
    row_ind, col_ind = linear_sum_assignment(-cm)
    mapping = {col: row for row, col in zip(row_ind, col_ind)}
    pred_aligned = np.array([mapping[p] for p in pred])

    ari = adjusted_rand_score(y_true, pred_aligned)
    nmi = normalized_mutual_info_score(y_true, pred_aligned)
    sil = silhouette_score(feats, y_true)
    print(f"ARI: {ari:.4f}, NMI: {nmi:.4f}, Silhouette: {sil:.4f}")
    return feats, pred_aligned

feats, pred = evaluate_clustering(encoder, X_tensor, y, num_classes)


In [None]:

tsne = TSNE(n_components=2)
vis = tsne.fit_transform(feats)
plt.figure(figsize=(6,5))
sns.scatterplot(x=vis[:,0], y=vis[:,1], hue=pred, palette='tab10')
plt.title("Clustering Results on the ItalyPowerDemand Data")
svg_path = "./3.svg"
plt.savefig(svg_path, format='svg')
plt.show()


In [None]:
from scipy.optimize import linear_sum_assignment
import numpy as np

def clustering_accuracy(true_labels, pred_labels):
    """
    使用匈牙利算法计算聚类准确率 ACC。
    """
    true_labels = np.asarray(true_labels)
    pred_labels = np.asarray(pred_labels)
    assert true_labels.shape == pred_labels.shape

    D = max(pred_labels.max(), true_labels.max()) + 1
    cost_matrix = np.zeros((D, D), dtype=np.int64)

    for i in range(pred_labels.size):
        cost_matrix[pred_labels[i], true_labels[i]] += 1

    row_ind, col_ind = linear_sum_assignment(cost_matrix.max() - cost_matrix)
    acc = cost_matrix[row_ind, col_ind].sum() / pred_labels.size
    return acc

acc = clustering_accuracy(true_labels, pred_labels)
print(f"Clustering Accuracy (ACC): {acc:.4f}")