In [1]:
import torch 
import torch.nn as nn
import torch.nn.functional as F

In [13]:
class GATE(nn.Module):
    def __init__(self,hidden_dims,A,X,S,R,lambda_=0.05):
        super().__init__()
        self.lambda_ = lambda_
        self.n_layers = len(hidden_dims)-1
        self.W,self.v = self.define_weights(hidden_dims)
        self.C={}
        self.threshold = 0.01
        self.A = A
        self.X = X
        self.S = S
        self.R = R
        self.apply(self._init_weights)
    #前向传播
    def forward(self,A,X,R,S):

        #编码
        H = X
        for layer in range(self.n_layers):
            H = self.encoder(A, H, layer)
        #最终节点表征
        self.H = H

        #解码
        for layer in range(self.n_layers-1,-1,-1):
            H = self.decoder(H,layer)
        X_ = H

        #节点得重构损失
        features_loss = torch.sqrt(torch.sum((X-X_)**2))

        #图结构得重构损失
        S_emb = self.H[S]
        R_emb = self.H[R]
        structure_loss = -torch.log(torch.sigmoid(torch.sum(S_emb*R_emb,dim=-1))).sum()

        #总损失
        self.loss = features_loss + self.lambda_*structure_loss

        return self.loss,self.H,self.C
    def encoder(self,A,H,layer):
        H = nn.LeakyReLU(0.2)(H)
        H = torch.matmul(H,self.W[layer].weight.t())
        self.C[layer]= self.graph_attention_layer(A,H,self.v[layer],layer,self.threshold)
        return torch.matmul(self.C[layer],H)
    #解码器
    def decoder(self,H,layer):
        H = torch.matmul(H,self.W[layer].weight)
        H = nn.BatchNorm1d(H.size(1))(H)   #添加 BatchNorm
        H = nn.ReLU()(H)
        return torch.matmul(self.C[layer],H)
    
    def define_weights(self,hidden_dims):
        W = nn.ModuleList(nn.Linear(hidden_dims[i],hidden_dims[i+1])for i in range(self.n_layers))
        Ws_att = nn.ModuleList([nn.ModuleList([nn.Linear(hidden_dims[i+1],1,bias=False),nn.Linear(hidden_dims[i+1],1,bias=False)])for i in range(self.n_layers)])
        return W,Ws_att
    def graph_attention_layer(self,A,M,v,layer,threshold):
        '''
        全连接图注意力层（带阈值剪枝）
        参数：
        M:节点特征矩阵N,d
        v:注意力参数列表，包含两个可学习向量v1,v2每个形状d,1
        layer:层编号
        threshold:注意力权重剪枝阈值(默认0.1)
        '''
        #启用异常检测，监控反向传播
        with torch.autograd.set_detect_anomaly(True):
            #计算全连接注意力分数
            f1 = torch.matmul(M,v[0].weight.t())
            f2 = torch.matmul(M,v[1].weight.t())

            #生成全连接分数矩阵N,N
            logits = f1 + f2.T
            logits = logits / torch.max(torch.abs(logits))   #归一化
            #激活函数
            unnormalized_attentions = torch.exp(logits)
            
            #行方向归一化
            attentions = F.softmax(unnormalized_attentions,dim=1)
         
            #阈值剪枝
            attentions = F.dropout(attentions,p=0.2,training=self.training)
            return attentions
    
    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            nn.init.xavier_normal_(m.weight)
            if m.bias is not None:
                nn.init.zeros_(m.bias)
        elif isinstance(m, nn.ModuleList):   #针对注意力参数的特殊初始化
            for sub_module in m:
                if isinstance(sub_module, nn.Linear):
                    nn.init.normal_(sub_module.weight, mean=0, std=1)

In [5]:
import pandas as pd
import numpy as np
#数据归一化
df = pd.read_excel('standardized_file.xlsx')


In [6]:
for column in df.columns:
    df[column] = (df[column]-df[column].mean())/df[column].std()
df = df.drop(0,axis=1)
data_3d = df.values.reshape(296,22,5)

In [7]:
num_cities = 292
num_nodes = 22
num_features = 5
hidden_dims = [5,16,32,16,5]
lambda_ = 0.3
A = torch.ones(num_nodes,num_nodes,device='cpu')
S,R = np.where(A==1)
embeddings = []


In [None]:
for i in range(num_cities):
    x = torch.from_numpy(data_3d[i]).float()
    model = GATE(hidden_dims,A,x,S,R)
    optimizer = torch.optim.Adam(model.parameters(),lr=0.0002)
    for epoch in range(100):
        
        optimizer.zero_grad()
        loss ,embedding,attention = model(A,x,R,S)
        loss.backward()
        # 在训练循环中添加梯度检查
        # for name, param in model.named_parameters():
        #     if param.grad is None:
        #         print(f"参数 {name} 无梯度")
        #     else:
        #         print(f"参数 {param.shape} 梯度范数: {param.grad.norm().item():.4f},是否可训练{param.requires_grad}")

        optimizer.step()

        if epoch % 20 == 0:
            print(f"Epoch{epoch}:Loss={loss.item():.4f}")
    embeddings.append(embedding)

In [None]:
data = torch.stack(embeddings)
print(data.shape)
numpy_data = data.detach().numpy()
numpy_data = numpy_data.reshape(292,110)
print(numpy_data.shape)
df = pd.DataFrame(numpy_data)
df.to_excel('embedding.xlsx')

In [67]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.manifold import TSNE
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler

In [None]:
embedding_s = torch.tensor(numpy_data,dtype=torch.float32)
model = GATEWithClustering(embedding_s)
cluster_labels = model.cluster_cities()
print(cluster_labels)

In [None]:
class GATEWithClustering():
    def __init__(self,embeddings):
        super().__init__()
        self.cluster_labels = None
        self.cluster_centers = None
        self.lambda_ = 0.01
        self.embeddings = embeddings
    def cluster_cities(self,n_clusters = 3,visualize = True):
        '''
        城市韧性等级聚类(5类)
        参数：
          n_cluters:聚类数量（韧性等级数）
          visualize:是否进行降维可视化
        '''
        #获取节点嵌入
        X_emb =self.embeddings
        
        #使用KMeans聚类
        kmeans = KMeans(n_clusters=n_clusters,random_state = 42,n_init=10)
        self.cluster_labels = kmeans.fit_predict(X_emb)
        self.cluster_centers = kmeans.cluster_centers_
    
        #聚类结果分析
        self._analyze_clusters()

        #可视化嵌入空间
        if visualize and X_emb.shape[1]>2:
            self._visualize_embeddings(X_emb)

        return self.cluster_labels
    
    def _analyze_clusters(self):
        """分析聚类结果"""
        unique,counts = np.unique(self.cluster_labels,return_counts=True)
        print("\n城市韧性等级分布")
        for cls,cnt in zip(unique,counts):
            print(f"等级{cls+1}:{cnt}个城市(占比{cnt/len(self.cluster_labels):.1%})")
        #计算轮廓系数
        from sklearn.metrics import silhouette_score
        sil_score = silhouette_score(self.embeddings.detach().cpu().numpy(),self.cluster_labels)
        print(f"\n轮廓系数:{sil_score:.3f}(越接近1表示聚类效果越好)")

    def _visualize_embeddings(self,X_emb,perplexity=2):
        """使用t-SNE降维可视化"""
        tsne = TSNE(n_components=2,perplexity=perplexity,random_state=42)
        X_tsne = tsne.fit_transform(X_emb)

        plt.figure(figsize=(10,8))
        scatter = plt.scatter(X_tsne[:,0],X_tsne[:,1],
                              c = self.cluster_labels,
                              cmap='viridis',
                              alpha=0.7,
                              edgecolors='w')
        #添加图例和标签
        plt.colorbar(scatter,ticks=range(5),label='韧性等级')
        plt.title("城市韧性嵌入空间可视化(t-SNE)")
        plt.xlabel("t-SNE 1")
        plt.ylabel("t-SNE 2")

        #标记聚类中心
        centers_tsne = tsne.fit_transform(self.cluster_centers)
        plt.scatter(centers_tsne[:,0],centers_tsne[:,1],
                    c='red',marker='X',s=200,
                    edgecolors='k',linewidths=1.5,
                    label='等级中心')
        plt.legend()
        plt.show()