# 1准备数据

In [None]:
import numpy as np 
import pandas as pd 
from sklearn.model_selection import train_test_split
from pprint import pprint
from matplotlib import pyplot as plt
import torch
from torch import nn
import torch.nn.functional as F
from scipy.signal import savgol_filter #滤波
from sklearn.preprocessing import MinMaxScaler  
from torch.utils.data import Dataset,DataLoader
import torchkeras 

In [None]:
from argparse import Namespace
# Run-wide hyper-parameters: mini-batch size, DataLoader worker count and optimizer learning rate.
config = Namespace(batch_size=64, num_workers=0, lr=0.001)

In [None]:

# Seed NumPy and PyTorch (CPU and CUDA) with the same value so runs are repeatable.
np.random.seed(17)  # numpy
torch.manual_seed(17)  # torch, CPU generator
torch.cuda.manual_seed(17)  # torch, CUDA generator

In [None]:
class MyDataset(Dataset):
    """Row-wise dataset backed by a header-less CSV file.

    Each CSV row is one sample: every column except the last two is a feature
    value and the second-to-last column is the (binary) label. The last
    column is kept in ``alldata`` but is not used by this class.
    """

    def __init__(self, filepath):
        """Read ``filepath`` and expose float32 tensors ``xdata`` and ``ydata``."""
        frame = pd.read_csv(filepath, header=None)
        self.len = frame.shape[0]
        self.alldata = np.array(frame, dtype='float32')
        # Features: all columns but the trailing two.
        self.xdata = torch.from_numpy(self.alldata[:, 0:-2])
        # Label: second-to-last column, kept 2-D with shape (N, 1).
        self.ydata = torch.from_numpy(self.alldata[:, [-2]])

    def __len__(self):
        """Return the number of rows read from the CSV file."""
        return self.len

    def __getitem__(self, index):
        """Return ``(signal, label)`` for the sample at ``index``.

        The feature vector is smoothed with a Savitzky-Golay filter
        (window 7, polynomial order 2), min-max scaled using only that
        sample's own values, and returned as an array of shape
        ``(1, n_features)``. The label is returned as stored in ``ydata``.
        """
        smoothed = savgol_filter(self.xdata[index], window_length=7, polyorder=2)
        # MinMaxScaler works column-wise, so present the sample as a single column.
        scaled = MinMaxScaler().fit_transform(smoothed.reshape(-1, 1))
        return scaled.reshape(1, -1), self.ydata[index]
# Build the dataset from the CSV file at the relative path below.
file_path = "../alldata.csv"
dfdata = MyDataset(file_path)

In [None]:

# dfdata.ydata=dfdata.ydata.to(dtype=torch.int64) # needed when cross-entropy is used as the loss
# Standardise every feature column with the mean / standard deviation of the whole dataset.
temp_b = torch.mean(dfdata.xdata, dim=0)
temp_c = torch.std(dfdata.xdata, dim=0)
# A constant column has zero standard deviation; dividing by it would produce NaN,
# so such columns are divided by 1 instead and end up as all zeros.
temp_c = torch.where(temp_c == 0, torch.ones_like(temp_c), temp_c)
dfdata.xdata = (dfdata.xdata - temp_b) / temp_c
# Hold out 10% of the samples for testing, then 20% of the remainder for validation.
# NOTE(review): train_test_split presumably indexes the dataset item by item, so the
# splits are expected to be lists of (signal, label) pairs — confirm.
dftmp, dftest_raw = train_test_split(dfdata, random_state=40, test_size=0.1)
dftrain_raw, dfval_raw = train_test_split(dftmp, random_state=40, test_size=0.2)



In [None]:
# Wrap each split in a DataLoader; only the training loader shuffles its samples.
_loader_options = dict(batch_size=config.batch_size, num_workers=config.num_workers)
dl_train = DataLoader(dftrain_raw, shuffle=True, **_loader_options)
dl_val = DataLoader(dfval_raw, shuffle=False, **_loader_options)
dl_test = DataLoader(dftest_raw, shuffle=False, **_loader_options)

In [None]:
# Pull a single validation batch to sanity-check the tensor shapes.
features, labels = next(iter(dl_val))
print(features.shape)
print(labels.shape)
# Number of mini-batches in the training loader.
print(len(dl_train))

In [None]:
# Font setup (Windows): lets matplotlib render Chinese labels and the minus sign.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],  # font used so Chinese labels display correctly
    'axes.unicode_minus': False,  # so the minus sign displays correctly
})

# 创建模型

In [None]:
from collections import OrderedDict
# import warnings
# warnings.filterwarnings('ignore', category=UserWarning, module='torch.nn')

class Inception(torch.nn.Module):
    """Inception block for 1-D signals.

    The input passes through a 1x1 bottleneck convolution and then through
    three parallel dilated convolutions (kernel sizes 20, 40 and 60,
    dilation 4). A fourth branch max-pools the raw input and applies its own
    1x1 convolution. The four branch outputs are concatenated on the channel
    axis, batch-normalised and passed through ReLU, so the block outputs
    ``4 * filters`` channels. Every layer uses stride 1 with padding chosen
    to keep the sequence length unchanged.

    Args:
        input_size: number of input channels.
        filters: number of output channels of each of the four branches.
    """

    def __init__(self, input_size, filters):
        super().__init__()

        def same_conv(in_channels, kernel_size, dilation=1):
            # Bias-free, stride-1 convolution with 'same' padding.
            return torch.nn.Conv1d(
                in_channels=in_channels,
                out_channels=filters,
                kernel_size=kernel_size,
                stride=1,
                padding='same',
                dilation=dilation,
                bias=False,
            )

        # Attribute names and creation order matter: they fix the state_dict
        # keys and the order in which weights are randomly initialised.
        self.bottleneck1 = same_conv(input_size, kernel_size=1)
        self.conv20 = same_conv(filters, kernel_size=20, dilation=4)
        self.conv40 = same_conv(filters, kernel_size=40, dilation=4)
        self.conv60 = same_conv(filters, kernel_size=60, dilation=4)
        self.max_pool = torch.nn.MaxPool1d(kernel_size=3, stride=1, padding=1)
        self.bottleneck2 = same_conv(input_size, kernel_size=1)
        # For (N, C, L) input BatchNorm1d normalises over the channel axis C.
        self.batch_norm = torch.nn.BatchNorm1d(num_features=4 * filters)

    def forward(self, x):
        """Return the activated, batch-normalised concatenation of the four branches."""
        reduced = self.bottleneck1(x)
        branches = [conv(reduced) for conv in (self.conv20, self.conv40, self.conv60)]
        branches.append(self.bottleneck2(self.max_pool(x)))
        merged = torch.concat(branches, dim=1)
        return torch.nn.functional.relu(self.batch_norm(merged))


class Residual(torch.nn.Module):
    """Shortcut connection that adds a projected input to a block output.

    Args:
        input_size: number of channels of the shortcut input ``x``.
        filters: branch width; the projection outputs ``4 * filters`` channels.
    """

    def __init__(self, input_size, filters):
        super().__init__()
        out_channels = 4 * filters
        # 1x1 convolution that brings the shortcut to the same channel count as ``y``.
        self.bottleneck = torch.nn.Conv1d(
            input_size,
            out_channels,
            kernel_size=1,
            stride=1,
            padding='same',
            bias=False,
        )
        self.batch_norm = torch.nn.BatchNorm1d(num_features=out_channels)

    def forward(self, x, y):
        """Return ``relu(y + batch_norm(bottleneck(x)))``."""
        shortcut = self.batch_norm(self.bottleneck(x))
        return torch.nn.functional.relu(y + shortcut)


class Lambda(torch.nn.Module):
    """Module wrapper that applies an arbitrary callable in ``forward``.

    Args:
        f: callable invoked with the module input; its result is returned unchanged.
    """

    def __init__(self, f):
        super().__init__()
        self.f = f

    def forward(self, x):
        result = self.f(x)
        return result


class InceptionModel(torch.nn.Module):
    """Stack of Inception blocks with a residual shortcut after every third block.

    After the last block the features are averaged over the final axis and
    passed through two linear layers (``4 * filters -> filters -> num_classes``)
    with no activation in between; the raw outputs are returned.

    Args:
        input_size: number of channels of the input signal.
        num_classes: size of the model output.
        filters: width of each Inception branch.
        depth: number of Inception blocks.
    """

    def __init__(self, input_size, num_classes, filters, depth):
        super().__init__()

        self.input_size = input_size
        self.num_classes = num_classes
        self.filters = filters
        self.depth = depth

        width = 4 * filters  # channels produced by every Inception block
        layers = OrderedDict()
        for d in range(depth):
            layers[f'inception_{d}'] = Inception(
                input_size=width if d else input_size,
                filters=filters,
            )
            if d % 3 != 2:
                continue
            # The first shortcut (d == 2) starts from the raw input; later
            # shortcuts start from the output of the previous shortcut.
            layers[f'residual_{d}'] = Residual(
                input_size=width if d != 2 else input_size,
                filters=filters,
            )

        layers['avg_pool'] = Lambda(f=lambda x: torch.mean(x, dim=-1))
        layers['linear1'] = torch.nn.Linear(in_features=width, out_features=filters)
        layers['linear2'] = torch.nn.Linear(in_features=filters, out_features=num_classes)
        # Sequential serves only as a named container; forward() below decides the wiring.
        self.model = torch.nn.Sequential(layers)

    def forward(self, x):
        """Run the blocks in order, refreshing the shortcut source after each residual."""
        block = self.model.get_submodule
        for d in range(self.depth):
            y = block(f'inception_{d}')(y if d else x)
            if d % 3 == 2:
                y = block(f'residual_{d}')(x, y)
                x = y
        for name in ('avg_pool', 'linear1', 'linear2'):
            y = block(name)(y)
        return y

In [None]:
class AUC(nn.Module):
    """Approximate streaming AUC for a binary-classification task.

    Sigmoid probabilities are quantised into equally spaced bins and two
    histograms are accumulated across calls: ``tp`` counts positive samples
    (target >= 0.5) per bin and ``fp`` counts the remaining samples per bin.
    The AUC is the fraction of (positive, negative) pairs in which the
    positive falls into a higher bin, with same-bin pairs counted as one half.
    """

    def __init__(self):
        super().__init__()
        # Non-trainable parameters, so they are part of the module state and
        # move with it on ``.to(device)``.
        self.tp = nn.Parameter(torch.zeros(10001), requires_grad=False)
        self.fp = nn.Parameter(torch.zeros(10001), requires_grad=False)

    def eval_auc(self, tp, fp):
        """Return the AUC implied by the per-bin histograms ``tp`` and ``fp``.

        The result is NaN when either histogram is empty (only one class seen),
        because the corresponding total is zero.
        """
        tp_total = torch.sum(tp)
        fp_total = torch.sum(fp)
        # Walk the bins from the highest score down to the lowest.
        tp_reverse = torch.flip(tp, dims=[0])
        fp_reverse = torch.flip(fp, dims=[0])
        # Positives in strictly higher bins plus half of those sharing the bin.
        tp_reverse_cum = torch.cumsum(tp_reverse, dim=0) - tp_reverse / 2.0

        auc = torch.sum(torch.true_divide(tp_reverse_cum, tp_total)
                        * torch.true_divide(fp_reverse, fp_total))
        return auc

    def forward(self, preds: torch.Tensor, targets: torch.Tensor):
        """Accumulate one batch and return the AUC of that batch alone.

        Args:
            preds: raw logits; the sigmoid is applied here.
            targets: labels; values >= 0.5 count as positive.
        """
        n_bins = self.tp.numel()
        y_pred = ((n_bins - 1) * torch.sigmoid(preds)).reshape(-1).type(torch.int)
        y_true = targets.reshape(-1)
        assert y_pred.shape == y_true.shape

        # Histogram the whole batch at once instead of looping over samples.
        positive = y_true >= 0.5
        bins = y_pred.long()
        tpi = torch.bincount(bins[positive], minlength=n_bins).to(
            device=self.tp.device, dtype=self.tp.dtype)
        fpi = torch.bincount(bins[~positive], minlength=n_bins).to(
            device=self.fp.device, dtype=self.fp.dtype)
        self.tp += tpi
        self.fp += fpi
        return self.eval_auc(tpi, fpi)

    def compute(self):
        """Return the AUC over everything accumulated since the last reset."""
        return self.eval_auc(self.tp, self.fp)

    def reset(self):
        """Clear both histograms."""
        self.tp.zero_()
        self.fp.zero_()

In [None]:
from torchkeras import summary

# Print a torchkeras summary of the model, using the sample batch `features` as input.
# NOTE(review): `net` is not defined anywhere in the visible notebook (the model that is
# built below is named `net2`) — confirm which model this cell is meant to summarise.
summary(net,input_data=features);

# 训练集、测试集评估

# 验证集参数计算

In [None]:
# Load the network: rebuild the architecture, then restore the saved weights.
# NOTE(review): these hyper-parameters presumably match the ones used when the
# checkpoint was written — confirm.
net2 = InceptionModel(
    input_size=1,
    num_classes=1,
    filters=32,
    depth=6,
)
# map_location='cpu' lets a checkpoint that was saved from a GPU be read on a
# CPU-only machine; load_state_dict copies the values into net2's own parameters.
net2.load_state_dict(torch.load('checkpoint_two_inception', map_location='cpu'))

support：当前行的类别在测试数据中的样本总量，如上表就是，在class 0 类别在测试集中总数量为1；<br>
precision：精度=正确预测为该类别的个数(TP)/被预测为该类别的个数(TP+FP)；人话也就是模型预测为该类别的结果中有多少是预测正确的<br>
recall：召回率=正确预测为该类别的个数(TP)/实际属于该类别的个数(TP+FN)；人话也就是某个类别在测试集中的全部样本里，有多少样本被预测正确了；<br>
f1-score:F1 = 2*精度*召回率/(精度+召回率)<br>
micro avg：计算所有数据下的指标值，假设全部数据 5 个样本中有 3 个预测正确，所以 micro avg 为 3/5=0.6<br>
macro avg：每个类别评估指标未加权的平均值，比如准确率的 macro avg，(0.50+0.00+1.00)/3=0.5<br>
weighted avg：加权平均，就是测试集中样本量大的，给他设置的权重大点；比如第一个值的计算方法，(0.50*1 + 0.0*1 + 1.0*3)/5 = 0.70。更好点

In [None]:
from torchkeras import KerasModel 
from torchkeras.metrics import Accuracy
from torchkeras.metrics import Precision
from torchkeras.metrics import Recall

# from torchkeras.kerascallbacks import WandbCallback

# Wrap net2 in a torchkeras KerasModel together with the loss, optimizer and metrics.
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(net2.parameters(), lr=config.lr)
metric_dict = {
    "acc": Accuracy(),
    "pre": Precision(),
    "recall": Recall(),
    "auc": AUC(),
}
model = KerasModel(
    net2,
    loss_fn=loss_fn,
    optimizer=optimizer,
    metrics_dict=metric_dict,
)



In [None]:
# Evaluate the wrapped model on the test loader (loss plus the metrics configured above).
model.evaluate(dl_test)

# 评估矩阵

In [None]:
from tqdm import tqdm 
from sklearn.metrics import classification_report
# Run the network over the test set on the CPU and collect probabilities,
# thresholded predictions and ground-truth labels as plain Python lists.
net2 = net2.cpu()
net2.eval()
preds = []      # thresholded predictions (0.0 or 1.0)
believer = []   # receives the same values as preds
believer2 = []  # sigmoid probabilities, one single-element list per sample
ytrue = []      # ground-truth labels
with torch.no_grad():
    for batch in tqdm(dl_test):
        inputs, labels = batch
        probs = F.sigmoid(net2(inputs))
        believer2.extend(probs.tolist())
        # Threshold at 0.5: 1.0 where the probability exceeds it, else 0.0.
        hard = (probs > 0.5).to(probs.dtype).squeeze(1).tolist()
        believer.extend(hard)
        preds.extend(hard)
        ytrue.extend(labels.squeeze(1).tolist())
    pprint(len(preds))
    pprint(len(ytrue))
print(classification_report(ytrue, preds))

In [None]:
import seaborn as sns
from sklearn.metrics import confusion_matrix

# Compute the confusion matrix of the thresholded predictions.
cm = confusion_matrix(ytrue, preds)
# Labelled DataFrame copy of the matrix for readable printing.
n_true, n_pred = cm.shape
df_cm = pd.DataFrame(
    cm,
    index=[f'True {i}' for i in range(n_true)],
    columns=[f'Predicted {i}' for i in range(n_pred)],
)
pprint(df_cm)
pprint(cm)


In [None]:
# Draw the confusion matrix with matplotlib and write the count into every cell.
plt.figure(figsize=(5, 5))
ax = plt.matshow(cm, cmap=plt.cm.Blues, fignum=1, aspect='auto')
cbar = plt.colorbar(ax, fraction=0.05, pad=0.04)
cbar.ax.tick_params(labelsize=13)
n_rows, n_cols = cm.shape
for i in range(n_rows):
    for j in range(n_cols):
        # matshow draws row i along the y axis and column j along the x axis,
        # so the text for cm[i, j] belongs at (x=j, y=i).
        plt.annotate(cm[i, j], xy=(j, i), horizontalalignment='center', verticalalignment='center', fontsize=20)
plt.ylabel('真实值标签', fontsize=15)
plt.xlabel('预测值标签', fontsize=15)
plt.show()

PR 曲线（Precision-Recall，精确率-召回率曲线）

In [None]:
from sklearn.metrics import precision_recall_curve

# Precision-recall curve of the positive class. The scores are the sigmoid
# probabilities stored in `believer2` (one single-element list per sample),
# flattened to 1-D.
precision, recall, threshold = precision_recall_curve(ytrue, np.ravel(believer2))
fig = plt.figure(figsize=(8, 5))
# Recall goes on the x axis and precision on the y axis, matching the labels below.
plt.plot(recall, precision, label='label:1')

plt.xlabel('召回率', fontsize=17)
plt.ylabel('精确率', fontsize=17)
plt.tick_params(axis='both', labelsize=15)
plt.legend(fontsize=15)

In [None]:
# Average precision of the predicted probabilities against the true labels.
# Imported in this cell so that it does not depend on any other cell for the name.
from sklearn.metrics import average_precision_score

AP = average_precision_score(ytrue, np.ravel(believer2), average='weighted')
AP

In [None]:
#PR
import math
# Exponentiate every stored probability, keeping the list-of-lists layout of believer2.
result_matrix = [list(map(math.exp, row)) for row in believer2]


In [None]:
# 1) Binary ground truth: True where the sample belongs to the positive class (label 1),
#    which is the class whose probability the network outputs.
tempp = np.array(ytrue)
y_test = (tempp == 1)
# 2) Predicted confidence of the positive class (the single sigmoid output per sample).
tempr = np.array(believer2)
y_score = tempr[:, 0]
print(y_score)
# 3) Precision-recall curve and average precision, both against the same labels.
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import average_precision_score
precision, recall, thresholds = precision_recall_curve(y_test, y_score)
AP = average_precision_score(y_test, y_score, average='weighted')
AP

In [None]:
from matplotlib import colors as mcolors
import random
# Fixed seed so the randomly picked line styles are the same on every run.
random.seed(124)

# Candidate matplotlib colours.
colors = [
    'b', 'g', 'r', 'c', 'm', 'y', 'k',
    'tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple',
    'tab:brown', 'tab:pink', 'tab:gray', 'tab:olive', 'tab:cyan',
    'black', 'indianred', 'brown', 'firebrick', 'maroon', 'darkred', 'red',
    'sienna', 'chocolate', 'yellow', 'olivedrab', 'yellowgreen',
    'darkolivegreen', 'forestgreen', 'limegreen', 'darkgreen', 'green',
    'lime', 'seagreen', 'mediumseagreen', 'darkslategray', 'darkslategrey',
    'teal', 'darkcyan', 'dodgerblue', 'navy', 'darkblue', 'mediumblue',
    'blue', 'slateblue', 'darkslateblue', 'mediumslateblue', 'mediumpurple',
    'rebeccapurple', 'blueviolet', 'indigo', 'darkorchid', 'darkviolet',
    'mediumorchid', 'purple', 'darkmagenta', 'fuchsia', 'magenta', 'orchid',
    'mediumvioletred', 'deeppink', 'hotpink',
]
# Candidate matplotlib markers (not used by get_line_arg at the moment).
markers = [
    ".", ",", "o", "v", "^", "<", ">", "1", "2", "3", "4", "8", "s", "p",
    "P", "*", "h", "H", "+", "x", "X", "D", "d", "|", "_",
    0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
]
# Candidate line styles.
linestyle = ['--', '-.', '-']


def get_line_arg():
    '''
    Return a randomly chosen set of line keyword arguments.

    The dict holds a colour from ``colors``, a style from ``linestyle`` and
    an integer line width between 1 and 4 (inclusive), drawn in that order
    from the module-level ``random`` generator.
    '''
    return {
        'color': random.choice(colors),
        'linestyle': random.choice(linestyle),
        'linewidth': random.randint(1, 4),
    }

In [None]:
# Quick check: draw one random line style (this advances the `random` generator state).
get_line_arg()

In [None]:
# Class labels of the binary task; passed as target_names to classification_report below.
classes=[0,1]


In [None]:
# ROC curve of the positive class, built from the sigmoid scores in believer2.
# roc_curve and auc are imported in this cell so that it does not depend on a
# later cell having been run first.
from sklearn.metrics import roc_curve, auc

plt.figure(figsize=(8, 5))
plt.xlim([-0.01, 1.0])
plt.ylim([0.0, 1.01])
plt.xlabel('假正率', fontsize=17)
plt.ylabel('真正率', fontsize=17)
plt.tick_params(axis='both', labelsize=15)
plt.rcParams['font.size'] = 22

# Only one curve is drawn, so auc_list ends up holding a single value.
auc_list = []
fpr, tpr, threshold = roc_curve(ytrue, believer2)
plt.plot(fpr, tpr, label='label:1')
auc_list.append(auc(fpr, tpr))
plt.legend(loc='best', fontsize=15)
plt.show()

In [None]:
from sklearn.metrics import roc_curve, auc
# Area under the ROC curve given by the fpr/tpr arrays computed in the ROC cell.
auc(fpr, tpr)

In [None]:
# Save, step 1: turn the classification report into a DataFrame with one row per entry.
report = classification_report(ytrue, preds, target_names=classes, output_dict=True)
# Remove the 'accuracy' entry before building the table
# (presumably because it is a single number rather than a set of metrics).
report.pop('accuracy')
df_report = pd.DataFrame(report).T

df_report

In [None]:
# Inspect the AUC values collected so far.
auc_list

In [None]:
# Compute the macro average and the support-weighted average of the AUC values,
# append both to auc_list and store the list as an 'AUC' column of the report.
# NOTE(review): this expects one AUC per class in auc_list, but the ROC cell appends a
# single value for two classes — the element-wise product with the two 'support' rows
# and the column assignment below look like they will fail on a length mismatch;
# confirm before relying on this cell.
macro_avg_auc = np.mean(auc_list)
weighted_avg_auc = sum(auc_list * df_report.iloc[:-2]['support'] / len(ytrue))
auc_list.append(macro_avg_auc)
auc_list.append(weighted_avg_auc)
df_report['AUC'] = auc_list
df_report

In [None]:
# Write the report to CSV; the index column is written under the header '类别'.
df_report.to_csv('各类别准确率评估指标.csv', index_label='类别')
# df_report.to_csv('各类别准确率评估指标.csv', index=False)

# t-sne

In [None]:
from tqdm import tqdm 
# Print the input tensor shape of every test batch (sanity check before feature extraction).
for batch in tqdm(dl_test):
    inputs,labels=batch
    print(inputs.shape)

In [None]:
# List the qualified name of every sub-module of net2.

for name, module in net2.named_modules():
    print('modules:', name)

In [None]:

# Extract an intermediate layer: build a model that returns the output of
# 'model.linear1' under the key 'semantic_feature'.
from torchvision.models.feature_extraction import create_feature_extractor
model_trunc = create_feature_extractor(net2, return_nodes={'model.linear1': 'semantic_feature'})

In [None]:
# Probe the extractor with one random (1, 1, 5000) input and show the feature shape.
# NOTE(review): 5000 is presumably the signal length of the real data — confirm.
a=torch.randn([1,1,5000])
pred_logits = model_trunc(a) 
pred_logits['semantic_feature'].squeeze().detach().numpy().shape

In [None]:
from tqdm import tqdm 
# Collect the 'semantic_feature' output and the label of every test sample.
# NOTE(review): assumes net2 is in eval mode so that a sample's features do not
# depend on the other samples in its batch — confirm.
feature_chunks = []
label_chunks = []
with torch.no_grad():  # inference only, no autograd graph is needed
    for inputs, labels in tqdm(dl_test):
        # Run the whole mini-batch at once instead of one sample at a time.
        feature_chunks.append(model_trunc(inputs)['semantic_feature'])
        label_chunks.append(labels)
# Concatenate once at the end rather than growing a tensor inside the loop;
# the feature width is taken from the extractor output instead of being hard-coded.
preds = torch.cat(feature_chunks, dim=0)
ll = torch.cat(label_chunks, dim=0)
print(preds.shape)
print(ll.shape)
tsne_in = preds.numpy()
# Labels are batched as (N, 1); flatten to (N,) so that comparisons against a
# class value give one boolean per sample.
ll_in = ll.numpy().reshape(-1)

In [None]:
# Reduce the extracted features to two dimensions with t-SNE (PCA initialisation, fixed seed).
from sklearn.manifold import TSNE
tsne = TSNE(n_components=2, init='pca', random_state=1)
X_tsne_2d = tsne.fit_transform(tsne_in)
print(X_tsne_2d.shape)

In [None]:
# 2-D scatter visualisation: marker candidates and the class values to plot.
import seaborn as sns
marker_list = ['.', ',', 'o', 'v', '^', '<', '>', '1', '2', '3', '4', '8', 's', 'p', 'P', '*', 'h', 'H', '+', 'x', 'X', 'D', 'd', '|', '_', 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
# Stored as float32 so the values compare equal to the float labels in ll_in.
class_list = np.array([0,1] ,dtype='float32')
class_list

In [None]:
# One colour per class, previewed with seaborn.
n_class = len(class_list) # number of label classes
palette = sns.hls_palette(n_class) # colour scheme
sns.palplot(palette)

In [None]:
# Scatter plot of the 2-D t-SNE embedding, one colour/marker per class.
plt.figure(figsize=(8, 5))
# Flatten the labels to one value per sample. With (N, 1) labels np.where returns
# a (rows, columns) pair, and indexing the embedding with that pair would also
# plot sample 0 once for every selected point.
labels_flat = np.asarray(ll_in).reshape(-1)
for idx, typee in enumerate(class_list):  # iterate over the classes
    # Colour and marker for this class.
    color = palette[idx]
    marker = marker_list[idx % len(marker_list)]

    # Boolean mask of the samples whose label equals the current class.
    selected = labels_flat == typee
    plt.scatter(X_tsne_2d[selected, 0], X_tsne_2d[selected, 1], color=color, marker=marker, label='label:'+str(typee), s=10)

plt.legend(fontsize=13, markerscale=2, bbox_to_anchor=(1, 1))  # legend markers at 2x size, anchored top-right
plt.xticks([])
plt.yticks([])
plt.savefig('多维语义特征t-SNE二维降维可视化.pdf', dpi=300)  # save the figure
plt.show()

In [None]:
# Interactive 2-D visualisation: collect the embedding and labels in a DataFrame and save it.
import plotly.express as px
df_2d = pd.DataFrame()
df_2d['X'] = list(X_tsne_2d[:, 0].squeeze())
df_2d['Y'] = list(X_tsne_2d[:, 1].squeeze())
df_2d['标注类别名称'] = ll_in
df_2d.to_csv('t-SNE-2D.csv', index=False)

In [None]:
# Interactive scatter of the 2-D embedding, coloured and symbol-coded by label.
# No `labels` argument is passed: it takes a dict that renames columns for
# display, and the column names are already the wanted display names.
fig = px.scatter(
    df_2d,
    x='X',
    y='Y',
    color='标注类别名称',
    symbol='标注类别名称',
    opacity=0.8,
    width=1000,
    height=600,
)
# Layout: remove the outer margins.
fig.update_layout(margin=dict(l=0, r=0, b=0, t=0))
fig.show()
fig.write_html('语义特征t-SNE二维降维plotly可视化.html')

In [None]:
# NOTE(review): `df_3d` is not defined anywhere in the visible notebook (only a 2-D
# embedding and `df_2d` are built) — confirm where the 3-D data comes from.
df_3d

In [None]:
# Column name holding the class label; only referenced by commented-out arguments below.
show_feature = '标注类别名称'

In [None]:

import plotly.express as px
# Interactive 3-D scatter of the columns X, Y and Z, coloured by the X value.
# NOTE(review): `df_3d` (and a 'Z' column) are not created anywhere in the visible
# notebook — confirm the 3-D t-SNE step exists before running this cell.
fig = px.scatter_3d(df_3d, 
                    x='X', 
                    y='Y', 
                    z='Z',
                    color='X', 
#                     labels=show_feature,
#                     symbol=show_feature, 
                    opacity=0.6,
                    width=800, 
                    height=580)

# Layout: remove the outer margins.
fig.update_layout(margin=dict(l=0, r=0, b=0, t=0))
fig.show()
fig.write_html('语义特征t-SNE三维降维plotly可视化.html')