# 数据预处理

由于有些文本只包含数字，而对不同的数字进行情感分类没有意义，所以我提前对数据进行了清洗，生成了3个新的文件：senti.cleaned_train.tsv、senti.cleaned_dev.tsv 和 senti.cleaned_test.tsv。

In [1]:
# 输出标签1的概率，自定义loss计算损失

import re

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# Fix the seed of PyTorch's default random number generator.
torch.random.manual_seed(2333)

# Use the GPU when PyTorch reports one as available, otherwise the CPU.
device =  'cuda' if torch.cuda.is_available() else 'cpu'

BATCH_SIZE = 32  # batch size handed to every DataLoader
EMBEDDING_DIM = 200  # width of each word-embedding vector
GRAD_CLIP = 2  # max gradient norm passed to clip_grad_norm_ during training
EPOCHS = 2  # number of passes over the training loader
LEARNING_RATE = 0.5  # learning rate given to the SGD optimizer
LOG_FILE = 'classification_13.log'  # per-epoch metrics are appended to this file
model_path = './best_13'  # the best state_dict is saved to / loaded from here

def normalize_string(s):
    """Return a lower-cased, letters-only version of *s*.

    The input is lower-cased and stripped of surrounding whitespace, then
    every run of characters outside ``a-z`` is replaced by a single space.
    Because the replacement happens after stripping, the result may still
    begin or end with one space (e.g. ``"123"`` becomes ``" "``).
    """
    lowered = s.lower().strip()
    return re.sub(r'[^a-z]+', r' ', lowered)

def data_preprocess(dir_path, file_name):
    """Scan a TSV file and build the vocabulary from its first column.

    Args:
        dir_path: directory that contains the file.
        file_name: name of the tab-separated file; only the text before the
            first tab of each line is used.

    Returns:
        A tuple ``(word2index, max_sequence_len, kind_of_words)`` where
        ``word2index`` maps each normalised word to an index starting at 2,
        ``max_sequence_len`` is the largest number of words found on one
        line, and ``kind_of_words`` is the next unused index (number of
        distinct words + 2).
    """
    vocabulary = {}
    next_index = 2  # numbering starts at 2; indices 0 and 1 are left unassigned
    longest = 0

    with open(dir_path + '/' + file_name, 'r', encoding='utf-8') as f:
        for raw_line in f:
            tokens = normalize_string(raw_line.split('\t')[0]).split()
            longest = max(longest, len(tokens))
            for token in tokens:
                if token in vocabulary:
                    continue
                vocabulary[token] = next_index
                next_index += 1

    return vocabulary, longest, next_index

# Build the vocabulary, the longest sentence length and the vocabulary size
# (next free index) once, when this cell runs.
# NOTE(review): this reads 'senti.train.tsv' while the datasets are created
# from the 'senti.cleaned_*' files — presumably fine because the raw file is a
# superset of the cleaned one; confirm.
word2index, max_sequence_len, kind_of_words = data_preprocess('text_classification_data', 'senti.train.tsv')


class CustomDataset(Dataset):
    """Map-style dataset over a TSV file of ``text<TAB>label`` lines.

    Each item is a fixed-length tensor of word indices, looked up in the
    module-level ``word2index``, together with the integer label.

    Lines whose text has no word left after ``normalize_string`` are dropped
    once, when the file is loaded. This keeps ``len(dataset)`` and the range
    of valid indices for ``dataset[i]`` in agreement, and makes ``__len__``
    O(1) instead of re-scanning the file on every call.

    Args:
        dir_path: directory that contains the file.
        file_name: name of the TSV file.
        max_sequence_len: every token sequence is padded or truncated to
            this length.
    """

    def __init__(self, dir_path, file_name, max_sequence_len=50):
        with open(dir_path + '/' + file_name, 'r', encoding='utf-8') as f:
            raw_lines = f.readlines()

        # Keep only lines that yield at least one token.
        self.lines = [line for line in raw_lines if self._tokenize(line)]
        self.length = len(self.lines)
        self.pad = 0       # index used to fill short sequences
        self.unknown = 1   # index used for words missing from word2index
        self.max_sequence_len = max_sequence_len

    @staticmethod
    def _tokenize(line):
        """Return the normalised words of the text column of *line*."""
        return normalize_string(line.strip().split('\t')[0]).split()

    def __len__(self):
        """Return the number of usable (non-empty) samples."""
        return len(self.lines)

    def __getitem__(self, item):
        """Return ``(token_tensor, label)`` for the sample at index *item*."""
        line = self.lines[item]
        label = line.strip().split('\t')[1]
        text_token = [word2index.get(word, self.unknown) for word in self._tokenize(line)]

        # Truncate to the maximum length, then right-pad; a negative repeat
        # count yields an empty list, so long sequences get no padding.
        missing = self.max_sequence_len - len(text_token)
        padded_text_token = text_token[:self.max_sequence_len] + [self.pad] * missing
        return torch.tensor(padded_text_token), int(label)

# Train / dev / test datasets; all three pad or truncate sentences to the
# max_sequence_len value returned by data_preprocess.
# NOTE(review): none of the loaders passes shuffle=True, so the training
# loader also iterates in file order — confirm this is intended.
train_dataset = CustomDataset('text_classification_data', 'senti.cleaned_train.tsv', max_sequence_len)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE)
valid_dataset = CustomDataset('text_classification_data', 'senti.cleaned_dev.tsv', max_sequence_len)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE)
test_dataset = CustomDataset('text_classification_data', 'senti.cleaned_test.tsv', max_sequence_len)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# 创建模型

In [2]:
class CustomTextClassification(nn.Module):
    """Averaged-word-embedding binary classifier.

    Word embeddings (with dropout) are averaged over the non-padding
    positions of each sentence, passed through one linear layer and squashed
    with a sigmoid, giving the probability of label 1.

    The module does not rely on any module-level ``device`` variable: the
    padding mask and the epsilon are device-agnostic, so the model works on
    whatever device its parameters and inputs live on.

    Args:
        embedding_size: number of rows in the embedding table (vocabulary
            size including the special indices).
        embedding_dim: width of each embedding vector.
    """

    def __init__(self, embedding_size, embedding_dim):
        super().__init__()
        self.embed = nn.Embedding(embedding_size, embedding_dim, padding_idx=0)
        self.fc = nn.Linear(embedding_dim, 1)
        self.dropout = nn.Dropout(0.5)
        # Plain float so it combines with tensors on any device / dtype.
        self.epsilon = 1e-8
        self.init_weights()

    def init_weights(self):
        """Initialise the linear layer: uniform weights in [-0.1, 0.1], zero bias."""
        initrange = 0.1
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, input_tensor):
        """Return the label-1 probability for each sentence.

        Args:
            input_tensor: integer tensor ``[batch_size, padded_sequence_len]``
                of word indices, padded with ``padding_idx`` (0).

        Returns:
            Tensor ``[batch_size]`` of probabilities in (0, 1).
        """
        # Number of real (non-padding) tokens per sentence: [batch_size, 1]
        sequence_len = (input_tensor != self.embed.padding_idx).sum(-1, keepdim=True)
        # [batch_size, padded_sequence_len, embedding_dim]
        embed_out = self.dropout(self.embed(input_tensor))
        # Padding rows of the embedding are zero, so summing over the padded
        # length and dividing by the real length gives the mean over real
        # tokens; epsilon avoids 0/0 for an all-padding sentence.
        hidden_average = torch.div(
            embed_out.sum(1),
            sequence_len.to(dtype=embed_out.dtype) + self.epsilon,
        )  # [batch_size, embedding_dim]
        decode = self.fc(hidden_average)
        output = torch.sigmoid(decode)  # [batch_size, 1]

        return output.squeeze(1)


class CustomLoss(nn.Module):
    """Hand-written binary cross-entropy on probabilities.

    A small epsilon is added inside each logarithm so that probabilities of
    exactly 0 or 1 do not produce ``log(0)``.
    """

    def __init__(self):
        super().__init__()
        self.epsilon = torch.tensor(1e-8)

    def forward(self, output, labels):
        """Return the mean cross-entropy between *output* and *labels*.

        Args:
            output: predicted probability of label 1 for each sample.
            labels: 0/1 targets; converted to float32 before use.
        """
        targets = labels.to(dtype=torch.float32)
        # Contribution of samples whose target is 1.
        positive_term = -targets * torch.log(output + self.epsilon)
        # Contribution of samples whose target is 0.
        negative_term = -(1. - targets) * torch.log(1. - output + self.epsilon)
        return torch.mean(negative_term + positive_term)

# Instantiate the model (vocabulary size from data_preprocess), the loss and
# the SGD optimizer.
model = CustomTextClassification(embedding_size=kind_of_words, embedding_dim=EMBEDDING_DIM).to(device)
criterion = CustomLoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=LEARNING_RATE)
# Exponential learning-rate decay with gamma 0.5.
# NOTE(review): no scheduler.step() call is visible anywhere in this file, so
# the decay never takes effect — confirm whether that is intended.
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, 0.5)

# 训练和评估

In [3]:
def evaluate(model, data_loader):
    """Compute the average loss and accuracy of *model* over *data_loader*.

    The model is switched to eval mode and gradients are disabled. The loss
    comes from the module-level ``criterion``; a sample counts as label 1
    when the model output is greater than 0.5.

    Returns:
        ``(mean_loss, accuracy)``, both weighted by the number of samples in
        each batch.
    """
    model.eval()
    loss_sum = 0.
    correct = 0.
    seen = 0.
    with torch.no_grad():
        for text, label in data_loader:
            text = text.to(device)
            label = label.to(device)
            batch_size = torch.numel(label)

            output = model(text)
            batch_loss = criterion(output, label)
            predicted = (output > 0.5).to(dtype=label.dtype)

            correct += (predicted == label).sum().item()
            # The criterion returns a batch mean; re-weight by batch size so
            # a smaller final batch does not skew the average.
            loss_sum += batch_loss.item() * batch_size
            seen += batch_size

    return loss_sum / seen, correct / seen

In [None]:
# Model training: run EPOCHS passes over train_loader, validate after each
# epoch, append the metrics to LOG_FILE and keep the checkpoint with the best
# validation accuracy.
# NOTE(review): `scheduler` is never stepped in this loop, so the learning
# rate stays constant — confirm whether a per-epoch scheduler.step() is wanted.
best_valid_accuracy = 0.

for epoch in range(EPOCHS):
    total_train_loss = 0.
    total_train_count = 0.
    # evaluate() leaves the model in eval mode, so re-enable training mode
    # (affects dropout) at the start of every epoch.
    model.train()
    # `batch_index` rather than `iter`, to avoid shadowing the builtin.
    for batch_index, (text, label) in enumerate(train_loader):
        # Move the batch to the training device.
        text, label = text.to(device), label.to(device)
        model.zero_grad()  # clear gradients from the previous step
        # Forward pass.
        output = model(text)
        loss = criterion(output, label)
        # Backward pass with gradient-norm clipping, then parameter update.
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), GRAD_CLIP)
        optimizer.step()
        # Accumulate the sample-weighted training loss.
        text_count = torch.numel(label)
        total_train_loss += loss.item() * text_count
        total_train_count += text_count

    # Validate once per epoch.
    val_loss, val_accuracy = evaluate(model, valid_loader)
    print('epoch:{}'.format(epoch))
    print('\ttrain_batch_loss  :{:.10f}'.format(total_train_loss / total_train_count))
    print('\tval_loss          :{:.10f} | val_accuracy:{}'.format(val_loss, val_accuracy))
    # The log line records the index and loss of the last batch of the epoch.
    with open(LOG_FILE, 'a') as f:
        f.write('epoch:{},iter:{},train_batch_loss:{},val_loss:{},val_accuracy:{}\n'.format(epoch, batch_index, loss.item(),
                                                                                            val_loss, val_accuracy))
    # Checkpointing does not need the log file, so it runs after it is closed.
    if val_accuracy > best_valid_accuracy:
        best_valid_accuracy = val_accuracy
        torch.save(model.state_dict(), model_path)

# 模型加载

In [4]:
# Restore the checkpoint saved at model_path (tensors are deserialised onto
# the CPU via map_location) and report loss / accuracy on the dev and test sets.
model.load_state_dict(torch.load(model_path, map_location='cpu'))
valid_loss, valid_acc = evaluate(model, valid_loader)
test_loss, test_acc = evaluate(model, test_loader)
print('Valid_loss: {:.10f} | Valid_acc: {}'.format(valid_loss,valid_acc))
print('Test_loss:  {:.10f} | Test_acc:  {}'.format(test_loss,test_acc))

Valid_loss: 0.4123196883 | Valid_acc: 0.8302752293577982
Test_loss:  0.4097559166 | Test_acc:  0.8110928061504667


# 计算L2-Norm

In [5]:
# Rank the vocabulary by the L2 norm of each word's embedding vector and
# print the 15 words with the largest and the 15 with the smallest norm.
index2word = {v: k for k, v in word2index.items()}

embedding_norm = model.embed.weight.data.norm(dim=-1)

_, sorted_index = embedding_norm.sort(descending=True)
# word2index starts at 2, so embedding rows 0 (padding) and 1 (unknown) have
# no entry in index2word. Drop them explicitly before slicing instead of
# assuming the padding row sorts last and the unknown row never lands in the
# top or bottom 15, which would raise KeyError.
ranked_words = [index2word[index] for index in sorted_index.tolist() if index in index2word]
max_norm = ranked_words[:15]
min_norm = ranked_words[-15:]
print('norm最大的15个单词：\n',max_norm)
print('norm最小的15个单词：\n',min_norm)

norm最大的15个单词：
 ['worst', 'lacks', 'lacking', 'hilarious', 'waste', 'mess', 'poor', 'stupid', 'powerful', 'terrific', 'pointless', 'neither', 'lack', 'beautifully', 'nice']
norm最小的15个单词：
 ['film', 'an', 'as', 'that', 'its', 'it', 'in', 'movie', 'is', 'to', 's', 'of', 'a', 'and', 'the']
