In [1]:
#读取数据
import pandas as pd
data_path = 'data.csv'
data= pd.read_csv(data_path)[1:]

In [2]:
data

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44
4,male,group C,some college,standard,none,76,78,75
5,female,group B,associate's degree,standard,none,71,83,78
...,...,...,...,...,...,...,...,...
995,female,group E,master's degree,standard,completed,88,99,95
996,male,group C,high school,free/reduced,none,62,55,55
997,female,group C,high school,free/reduced,completed,59,71,65
998,female,group D,some college,standard,completed,68,78,77


In [3]:
from sklearn.preprocessing import LabelEncoder
labelencoder = LabelEncoder()
for col in data.columns:
    data[col] = labelencoder.fit_transform(data[col])
data

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
1,0,2,4,1,0,49,62,64
2,0,1,3,1,1,70,67,69
3,1,0,0,0,1,27,29,20
4,1,2,4,1,1,56,50,51
5,0,1,0,1,1,51,55,54
...,...,...,...,...,...,...,...,...
995,0,4,3,1,0,68,70,71
996,1,2,2,0,1,42,27,31
997,0,2,2,0,0,39,43,41
998,0,3,4,1,0,48,50,53


In [4]:
import torch
import torch.nn as nn
import torchvision
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader,TensorDataset
from tqdm.autonotebook import tqdm
from torch.utils.tensorboard import SummaryWriter
import random
import pdb

In [5]:
class LinearLayer(nn.Module):
    def __init__(self, in_f, out_f, activate=2):
        super().__init__()
        a = [torch.nn.Sigmoid(),torch.nn.Tanh(),torch.nn.LeakyReLU()]
        net = [
            nn.Linear(in_f, out_f),
            a[activate],
            nn.BatchNorm1d(out_f)
        ]
        self.net = nn.Sequential(*net)
    def forward(self, x):
        x = self.net(x)
        return x

# def MLP_model(num_layers, hidden_dim,activate):
#     a = [torch.nn.Sigmoid(),torch.nn.Tanh(),torch.nn.LeakyReLU()]
#     layers = []
#     for i in range(num_layers):
#         layers.append(LinearLayer(hidden_dim[i], hidden_dim[i+1]))
#         layers.append(a[activate])
#         layers.append(nn.BatchNorm1d(hidden_dim[i+1]))
#         if i < num_layers - 1:
#             layers.append(nn.Dropout(0.2))
#     #layers.append(nn.Softmax(dim=1))
#     return nn.Sequential(*layers)

class MLP_model(nn.Module):
    def __init__(self, nomask_dims, mask_dims, share_dims, activate=2):
        super().__init__()
        a = [torch.nn.Sigmoid(),torch.nn.Tanh(),torch.nn.LeakyReLU()]
        
        nomask_layers = []
        for i in range(len(nomask_dims) - 1):
            nomask_layers.append(LinearLayer(nomask_dims[i], nomask_dims[i + 1], activate))
        self.nomask_layers = nn.Sequential(*nomask_layers)
        
        mask_layers = []
        for i in range(len(mask_dims) - 1):
            mask_layers.append(LinearLayer(mask_dims[i], mask_dims[i + 1], activate))
        self.mask_layers = nn.Sequential(*mask_layers)
        
        share_layers = []
        share_layers.append(LinearLayer(mask_dims[-1] + nomask_dims[-1], share_dims[0], activate))
        for i in range(len(share_dims) - 1):
            share_layers.append(LinearLayer(share_dims[i], share_dims[i + 1], activate))
        self.share_layers = nn.Sequential(*share_layers)
    
    def forward(self, nomask_input, mask_input):
        x = self.nomask_layers(nomask_input)
        mask = torch.ones_like(mask_input).to(mask_input.device)
        mask_type = [[0],[1],[2],[0,1],[0,2],[1,2],[0,1,2]]
        for i in range(mask.shape[0]):
            mask[i, mask_type[random.randint(0, 6)]] = 1
        mask = torch.randint(2, mask_input.shape).to(mask_input.device)
        y = self.mask_layers(mask_input * mask)
        mid = torch.cat((x, y), dim=-1)
        out = self.share_layers(mid)
        return out
        

def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight, std=0.01)

In [6]:
def read_data():
    data=pd.read_csv(data_path)
    labelencoder = LabelEncoder()
    for col in data.columns:
        data[col] = labelencoder.fit_transform(data[col])
    data.head()
    np.save("data", data[1:])


def get_dataset(data):   
    train_dataset = []
    test_dataset = []
    for i in range(data.shape[0]):
        if i < data.shape[0] * 0.8:
            train_dataset.append(data[i])
        else:
            test_dataset.append(data[i])

    train_dataset = TensorDataset(torch.cat(train_dataset).reshape(-1, 8))
    test_dataset = TensorDataset(torch.cat(test_dataset).reshape(-1, 8))
    return train_dataset, test_dataset


def train():
    writer = SummaryWriter('log')
    model.train()
    optim = torch.optim.Adam(model.parameters(), lr=learning_rate)
    loss_fn = torch.nn.CrossEntropyLoss()    
    
    train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
    
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optim, mode='min', factor=0.5, patience=2000,
                                                               verbose=True, threshold=0.002, threshold_mode='rel',
                                                               cooldown=2000)
    
    step = 0
    pbar = tqdm(range(epoch), initial=0, dynamic_ncols=True, smoothing=0.01)
    for i in pbar:
        # break
        for j, batch in enumerate(train_loader):            
            optim.zero_grad()
            # input = batch[:, 2:15].float()
            # pdb.set_trace()
            nomask_input = batch[0][:, 0:5].float().to(device)
            mask_input = batch[0][:, 5:8].float().to(device)
            ground_truth = ((batch[0][:,5] + batch[0][:, 6] + batch[0][:, 7]) // 100).long().to(device)
            out_put = model(nomask_input, mask_input)
            loss = loss_fn(out_put, ground_truth)
            loss.backward()
            optim.step()
            scheduler.step(loss)
            
            writer.add_scalar('loss', loss, step)
            
            pbar.set_description(
                        (
                            f'iter: {step} loss: {loss:.4f}'
                        )
                    )
            step += 1
    torch.save(model, f"model/model.pth")

def test():
    model=torch.load("model/model.pth").to(torch.device('cuda:0'))
    model.eval()
    
    print("训练集输出结果:")
    a = np.zeros(shape=(3, 3))
    dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
    for i, batch in enumerate(dataloader):
        nomask_input = batch[0][:, 0:5].float().to(device)
        mask_input = batch[0][:, 5:8].float().to(device)
        out_put = model(nomask_input, mask_input)
        a[int((batch[0][0][5] + batch[0][0][6] + batch[0][0][7]) // 100), torch.argmax(out_put, dim=1)] += 1
    print(a)
    print(f'准确率{(a[0][0] + a[1][1] + a[2][2]) / len(train_dataset)}')
    
    print("测试集输出结果:")
    a = np.zeros(shape=(3, 3))
    dataloader = DataLoader(test_dataset, batch_size=1, shuffle=True)
    for i, batch in enumerate(dataloader):
        nomask_input = batch[0][:, 0:5].float().to(device)
        mask_input = batch[0][:, 5:8].float().to(device)
        out_put = model(nomask_input, mask_input)
        a[int((batch[0][0][5] + batch[0][0][6] + batch[0][0][7]) // 100), torch.argmax(out_put, dim=1)] += 1
    print(a)
    print(f'准确率{(a[0][0] + a[1][1] + a[2][2]) / len(test_dataset)}')
    
if __name__ == "__main__":
    read_data()
    epoch = 200 # setting epoch of train
    nomask_dims = [5, 64, 128, 256]
    mask_dims = [3, 64, 128]
    share_dims = [512, 256, 128, 64, 3]
    activate = 2  # setting the activate function of net
    learning_rate = 0.001  # setting learning_rate
    
    data = torch.tensor(np.load('data.npy', allow_pickle=True).astype(np.float)).to(torch.device('cuda:0'))
    train_dataset, test_dataset = get_dataset(data) 
    
    device = torch.device('cuda:0')
    model = MLP_model(nomask_dims, mask_dims, share_dims, activate).to(device)
    model.apply(init_weights)
    # model=torch.load("model/model.pth").to(torch.device('cuda:0'))
    train()
    test()


Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  data = torch.tensor(np.load('data.npy', allow_pickle=True).astype(np.float)).to(torch.device('cuda:0'))


  0%|                                                   | 0/200 [00:00<?, ?it/s]

  ground_truth = ((batch[0][:,5] + batch[0][:, 6] + batch[0][:, 7]) // 100).long().to(device)


训练集输出结果:


  a[int((batch[0][0][5] + batch[0][0][6] + batch[0][0][7]) // 100), torch.argmax(out_put, dim=1)] += 1


[[168.  16.   0.]
 [ 12. 570.   0.]
 [  0.  12.  22.]]
准确率0.95
测试集输出结果:


  a[int((batch[0][0][5] + batch[0][0][6] + batch[0][0][7]) // 100), torch.argmax(out_put, dim=1)] += 1


[[ 36.   9.   0.]
 [  6. 134.   1.]
 [  0.   5.   8.]]
准确率0.8944723618090452
