In [1]:
import os
import pandas
from openpyxl import Workbook
from datetime import datetime
from tensorboardX import SummaryWriter

import numpy as np
import sklearn as sklearn
from sklearn.model_selection import train_test_split
from sklearn.model_selection import cross_val_score
from sklearn import svm
from sklearn.naive_bayes import *
from sklearn.model_selection import ShuffleSplit
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torchvision
from torch.utils.data import DataLoader, Dataset
import torch.autograd as autograd
import torch.optim as optim
import torch.nn.functional as F
from torch.utils import data


# Class Defined by the Project
from config import Config

In [12]:
def read_data_file():
    data_filename = 'ES1.csv'
    if not os.path.exists(data_filename):
        print("Error: Missing file \'%s\'" % data_filename)
        return
    #pandas 读取csv文件：向量维度：1*1*8 ,列表用法
    test_data = pandas.read_csv(data_filename, names=["Flag", "L1", "L2", "L3", "L4", "R1", "R2", "R3", "R4"])
    return test_data


def data_prepare():
    value_data = read_data_file()
    value_data = value_data.values
    print(value_data.shape)

    label = value_data[:, 0] #取第一列元素
    datav = value_data[:, 1:] #取第一列剩余的元素
    scaler = StandardScaler()
    scaler.fit(datav)
    datas = scaler.transform(datav) #为什么要用transform

    x_train, x_test, y_train, y_test = train_test_split(datas, label, test_size=0.2, random_state=2021, stratify=label)
    x_verify = x_test.copy()
    y_verify = y_test.copy()

    print("Train dim =")
    print(x_train.shape)
    print("Train Label dim =")
    print(y_train.shape)
    print("Verify dim =")
    print(x_verify.shape)
    print("Verify Label dim =")
    print(y_verify.shape)
    print("Test dim =")
    print(x_test.shape)
    print("Test Label dim =")
    print(y_test.shape)

    return x_train, x_test, y_train, y_test, x_verify, y_verify


class OneDimData(Dataset):  # 继承Dataset
    def __init__(self, datav, label, transform=None):  # __init__是初始化该类的一些基础参数

        self.data = datav
        self.label = label
        self.transform = transform

    def __len__(self):  # 返回整个数据集的大小
        return len(self.data)

    def __getitem__(self, index):  # 根据索引index返回dataset[index]
        sample = self.data[index, :]  # 根据索引index获取
        sampleflag = self.label[index]
        if self.transform:
            sample = self.transform(sample)  # 对样本进行变换

        return sample.astype(np.float32), sampleflag.astype(np.int64)  # 返回该样本



In [13]:
#实现的是C*W*1的从池化到软阈值的实现过程，并不是整个模块。
#but the question is : I just have 1_D vector,if I need to rectify the net and do other things? 
#I think I should just try this method first,then consider other things.
#输入参数只是x,需要的gap_size=1 和channel=1 已经设定好
class Shrinkage(nn.Module):
    def __init__(self, gap_size=1,channel=1):
        super(Shrinkage, self).__init__()
        self.gap = nn.AdaptiveAvgPool2d(gap_size) #output size=gap_size*gap_size
        self.fc = nn.Sequential(
            nn.Linear(channel, channel),
            nn.BatchNorm1d(channel),
            nn.ReLU(inplace=True),
            nn.Linear(channel, 1),# 可能应该是nn.Linear(channel, channel)
            nn.Sigmoid(),
        )
    def forward(self, x):
        x_raw = x #x_raw=1*w*1
        x = torch.abs(x) #取绝对值
        x_abs = x #x_abs 副本=1*w*1
        x = self.gap(x) #池化 output_size=1*1*1
        x = torch.flatten(x, 1) # 
        average = x
        x = self.fc(x)
        x = torch.mul(average, x) #逐元素相乘
        x = x.unsqueeze(2).unsqueeze(2)
        # 软阈值化
        sub = x_abs - x #x就是keras代码中的阈值thres
        zeros = sub - sub
        n_sub = torch.max(sub, zeros)
        x = torch.mul(torch.sign(x_raw), n_sub) 
        return x


In [4]:
# net=DRSN()
# img = torch.randn(32,1, 1, 8)
# preds = net(img) 
# print(preds.shape)

In [14]:
#需要的参数 cin=1,cout=1，已经设定好，不再需要其他参数
class DRSNBlock(nn.Module):
    def __init__(self,cin=1,cout=1):
        super(DRSNBlock, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(cin, cout, 1), # in_channels, out_channels, kernel_size
            nn.BatchNorm2d(cout,eps=1e-5, momentum=0.01, affine=True),
            nn.ReLU(inplace=True))

        self.conv2 = nn.Sequential(
            nn.Conv2d(cin, cout, 1), # in_channels, out_channels, kernel_size
            nn.BatchNorm2d(cout,eps=1e-5, momentum=0.01, affine=True),
            nn.ReLU(inplace=True))
        
        self.shrink=Shrinkage()
        self.relu=nn.ReLU(inplace=True)
        
    def forward(self, x):

        identity = x
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.shrink(x)
        x = identity + x
        x = self.relu(x)
        return x
    


In [6]:
# net=DRSNBlock()
# img = torch.randn(32,1, 1, 8)
# preds = net(img) 
# print(preds.shape)

In [15]:
#需要的参数 cin=1,cout=1，已经设定好，不再需要其他参数
class DRSN(nn.Module):
    def __init__(self,cin=1,cout=1):
        super(DRSN, self).__init__()
        self.block=DRSNBlock()
        self.dropout=nn.Dropout(0.5)
        #数字由计算得出
        self.linear1=nn.Linear(8, 16)
        self.linear2=nn.Linear(16,2)
        
    def forward(self, x):
        if not torch.is_tensor(x):
            x=torch.from_numpy(x)
        batch = x.shape[0]
        x=torch.reshape(x,(batch,1,1,-1))        
        x=self.block(x)
        x=self.block(x)
        x=self.block(x)
        x=self.linear1(x)
        x=self.linear2(x)
        x=torch.reshape(x,(batch,-1))
        return x
    
    def initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                torch.nn.init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                torch.nn.init.normal_(m.weight.data, 0, 0.01)
                m.bias.data.zero_()
    

    


In [6]:
# net=DRSN()
# img = torch.randn(32,1, 1, 8)
# preds = net(img) 
# print(preds.shape)

In [16]:
def es_pred_disease():
    x_train, x_test, y_train, y_test, x_verify, y_verify = data_prepare()

    # Hyper Parameters
    max_epoch = 100  # 训练整批数据多少次
    batch_size = 32
    lr_init = 0.001  # 学习率

    xtrain = OneDimData(x_train, y_train)
    xverify = OneDimData(x_verify, y_verify)
    xtest = OneDimData(x_test, y_test)
    train_loader = torch.utils.data.DataLoader(dataset=xtrain, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(dataset=xtest, batch_size=batch_size, shuffle=True)
    cnn = DRSN()

    loss_function = nn.CrossEntropyLoss()  # 选择损失函数
    optimizer = optim.SGD(cnn.parameters(), lr=lr_init, momentum=0.9, dampening=0.1)  # 选择优化器
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)  # 设置学习率下降策略

    now_time = datetime.now()
    time_str = datetime.strftime(now_time, '%m-%d_%H-%M-%S')
    writer = SummaryWriter(log_dir="./")

    max_acc=0
    
    for epoch in range(max_epoch):

        loss_sigma = 0.0  # 记录一个epoch的loss之和
        correct = 0.0
        total = 0.0
        
        #训练模式
        for step, (b_x, b_y) in enumerate(train_loader):  #for j ,(in,label)
            optimizer.zero_grad()
            outputs = cnn(b_x)
            loss = loss_function(outputs, b_y)
            loss.backward()
            optimizer.step()

            _, predicted = torch.max(outputs.data, 1)
            total += b_y.size(0)
            correct += (predicted == b_y).squeeze().sum().numpy()
            loss_sigma += loss.item()

            if step % 10 == 5:
                loss_avg = loss_sigma / 10
                loss_sigma = 0.0
                print("Training: Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.4%}".format(
                    epoch + 1, max_epoch, step + 1, len(train_loader), loss_avg, correct / total))

        scheduler.step()  # 更新学习率
        loss_sigma = 0.0
        cls_num = 2
        conf_mat = np.zeros([cls_num, cls_num])  # 混淆矩阵
        
        #评估模式
        cnn.eval()
        outputs = cnn(x_verify.astype(np.float32))
        outputs.detach()

        y2 = torch.from_numpy(y_verify.astype(np.int64))
        y2 = torch.reshape(y2,(-1,1))
        y2 = y2.squeeze_()
        loss = loss_function(outputs,y2)
        loss_sigma += loss.item()

        cnn.train()
        
        test_out = outputs
        predict_y = torch.argmax(test_out, 1).data.numpy()
        current_acc = sum(predict_y == y_test) * 100 / len(predict_y)
        if current_acc > max_acc:
            max_acc = current_acc
            max_test = predict_y
            net_save_path = os.path.join(r'E:\Data\code\Mr.ma\model_save', 'model_best.pt')
            torch.save(cnn, net_save_path)

        print('Epoch={} -  Test set Accuracy:{:.4%}'.format( epoch,  current_acc/100.0 ))

    print('finished training')
    print('ACC= %.4f%%' % max_acc)
    print(type(max_test))
#     print(max_test)
#     print(y_test)

    a=max_test-y_test
    TP=0
    TN=0
    FP=0
    FN=0
    for i in range (41):
        if a[i]==1:
            FN+=1
        if a[i]==-1:
            FP+=1
        TP=22-FN
        TN=19-FP
    print("TP= ",TP)
    print("FN= ",FN)
    print("FP= ",FP)
    print("TN= ",TN)
            
        
    

es_pred_disease()


(203, 9)
Train dim =
(162, 8)
Train Label dim =
(162,)
Verify dim =
(41, 8)
Verify Label dim =
(41,)
Test dim =
(41, 8)
Test Label dim =
(41,)
Training: Epoch[001/100] Iteration[006/006] Loss: 0.4308 Acc:48.7654%
Epoch=0 -  Test set Accuracy:46.3415%
Training: Epoch[002/100] Iteration[006/006] Loss: 0.4227 Acc:49.3827%
Epoch=1 -  Test set Accuracy:48.7805%
Training: Epoch[003/100] Iteration[006/006] Loss: 0.4142 Acc:52.4691%
Epoch=2 -  Test set Accuracy:53.6585%
Training: Epoch[004/100] Iteration[006/006] Loss: 0.4124 Acc:55.5556%
Epoch=3 -  Test set Accuracy:63.4146%
Training: Epoch[005/100] Iteration[006/006] Loss: 0.4018 Acc:53.0864%
Epoch=4 -  Test set Accuracy:65.8537%
Training: Epoch[006/100] Iteration[006/006] Loss: 0.4058 Acc:58.6420%
Epoch=5 -  Test set Accuracy:70.7317%
Training: Epoch[007/100] Iteration[006/006] Loss: 0.4031 Acc:58.6420%
Epoch=6 -  Test set Accuracy:56.0976%
Training: Epoch[008/100] Iteration[006/006] Loss: 0.4080 Acc:54.9383%
Epoch=7 -  Test set Accuracy:56

Training: Epoch[076/100] Iteration[006/006] Loss: 0.3335 Acc:75.3086%
Epoch=75 -  Test set Accuracy:82.9268%
Training: Epoch[077/100] Iteration[006/006] Loss: 0.3548 Acc:72.8395%
Epoch=76 -  Test set Accuracy:82.9268%
Training: Epoch[078/100] Iteration[006/006] Loss: 0.3674 Acc:72.2222%
Epoch=77 -  Test set Accuracy:80.4878%
Training: Epoch[079/100] Iteration[006/006] Loss: 0.3331 Acc:74.0741%
Epoch=78 -  Test set Accuracy:82.9268%
Training: Epoch[080/100] Iteration[006/006] Loss: 0.3552 Acc:73.4568%
Epoch=79 -  Test set Accuracy:82.9268%
Training: Epoch[081/100] Iteration[006/006] Loss: 0.3645 Acc:73.4568%
Epoch=80 -  Test set Accuracy:82.9268%
Training: Epoch[082/100] Iteration[006/006] Loss: 0.3545 Acc:73.4568%
Epoch=81 -  Test set Accuracy:75.6098%
Training: Epoch[083/100] Iteration[006/006] Loss: 0.3572 Acc:76.5432%
Epoch=82 -  Test set Accuracy:75.6098%
Training: Epoch[084/100] Iteration[006/006] Loss: 0.3444 Acc:71.6049%
Epoch=83 -  Test set Accuracy:75.6098%
Training: Epoch[085

In [8]:
print(type(max_test))

NameError: name 'max_test' is not defined

In [9]:
# help(nn.AdaptiveAvgPool2d)

In [10]:
    for epoch in range(max_epoch):

        loss_sigma = 0.0  # 记录一个epoch的loss之和
        correct = 0.0
        total = 0.0

        for step, (b_x, b_y) in enumerate(train_loader):
            optimizer.zero_grad()
            outputs = cnn(b_x)
            loss = loss_function(outputs, b_y)
            loss.backward()
            optimizer.step()

            _, predicted = torch.max(outputs.data, 1)
            total += b_y.size(0)
            correct += (predicted == b_y).squeeze().sum().numpy()
            loss_sigma += loss.item()

            if step % 10 == 5:
                loss_avg = loss_sigma / 10
                loss_sigma = 0.0
                print("Training: Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.4%}".format(
                    epoch + 1, max_epoch, step + 1, len(train_loader), loss_avg, correct / total))

        scheduler.step()  # 更新学习率
        loss_sigma = 0.0
        cls_num = 2
        conf_mat = np.zeros([cls_num, cls_num])  # 混淆矩阵
        
        cnn.eval()
        outputs = cnn(x_verify.astype(np.float32))
        outputs.detach()

        y2 = torch.from_numpy(y_verify.astype(np.int64))
        y2 = torch.reshape(y2,(-1,1))
        y2 = y2.squeeze_()
        loss = loss_function(outputs,y2)
        loss_sigma += loss.item()

        cnn.train()
        
        test_out = outputs
        predict_y = torch.argmax(test_out, 1).data.numpy()
        current_acc = sum(predict_y == y_test) * 100 / len(predict_y)
        if current_acc > max_acc:
            max_acc = current_acc
            max_test = predict_y

        print('Epoch={} -  Test set Accuracy:{:.4%}'.format( epoch,  current_acc/100.0 ))

    print('finished training')
    print('ACC= %.4f%%' % max_acc)
    print(max_test)
    print(y_test)
    print(max_test - y_test)



NameError: name 'max_epoch' is not defined