In [2]:
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
from torchtext import data
from torchtext import datasets
from torchtext.vocab import GloVe

import time

# Field definitions. Every sentence is lower-cased and padded/truncated to 60
# tokens; this length must agree with `sequence_length` in the hyper-parameter
# cell, because the model's pooling windows are derived from it.
TEXT = data.Field(lower=True, fix_length=60, batch_first=True)
LABEL = data.Field(sequential=False,)

# Load the SST-2 splits from TSV files (header row skipped; columns are text, label).
# NOTE: the training split is bound to `train_data` rather than `train`, because
# a later cell defines a function called `train`; sharing the name made the
# notebook depend on cell execution order. `dev` and `test` keep their names.
train_data, dev, test = data.TabularDataset.splits(
    path='SST-2', train='train.tsv', validation='dev.tsv',
    test='test.tsv', format='tsv', skip_header=True,
    fields=[('text', TEXT), ('label', LABEL)])
print("the size of train: {}, dev:{}, test:{}".format(
    len(train_data.examples), len(dev.examples), len(test.examples)))

# Vocabularies are built from the training split only; TEXT also attaches
# 100-dimensional GloVe (6B) vectors that later initialise the embedding layer.
TEXT.build_vocab(train_data, vectors=GloVe(name='6B', dim=100), max_size=25000)
LABEL.build_vocab(train_data,)

print("train.fields:", train_data.fields, TEXT.vocab.vectors.shape)

# Batch iterators: 512 examples per training batch, 64 for dev and test.
# Examples are sorted by text length inside each batch.
train_iter, dev_iter, test_iter = data.BucketIterator.splits(
        (train_data, dev, test), batch_sizes=(512, 64, 64),
        sort_key=lambda x: len(x.text), sort_within_batch=True, repeat=False
    )
# repeat=False is already passed above; these explicit assignments are kept
# from the original cell and do not change that setting.
train_iter.repeat = False
test_iter.repeat = False

the size of train: 65328, dev:872, test:2021
train.fields: {'text': <torchtext.data.field.Field object at 0x000001E605905250>, 'label': <torchtext.data.field.Field object at 0x000001E6054D2310>} torch.Size([14795, 100])


In [3]:
# Text-CNN hyper-parameters
sequence_length = 60  # must equal fix_length of the TEXT field
# Vocabulary size and embedding width are read from the pre-trained vector table
# attached to the TEXT vocabulary (rows = tokens, columns = embedding dims).
vocab_size = TEXT.vocab.vectors.shape[0]
embedding_size = TEXT.vocab.vectors.shape[1]
# SST-2 is a binary task (labels 0 or 1). The previous value of 5 was a leftover
# from the fine-grained SST setup and left three output units that no label uses.
num_classes = 2
filter_sizes = [2, 3, 5, 2, 3, 5]  # n-gram window heights, one conv branch each
num_filters = 6  # output channels per conv branch
lr = 1e-2  # learning rate passed to the optimizer

In [4]:
class Square(nn.Module):
    """Element-wise squaring activation: maps every entry x to x**2.

    The module is stateless (no parameters or buffers), so the default
    ``nn.Module`` constructor is sufficient.
    """

    def forward(self, x):
        """Return a tensor of the same shape as ``x`` holding ``x`` squared."""
        return x.pow(2)

class Sigmoid(nn.Module):
    """Cubic polynomial stand-in for the sigmoid function.

    Computes ``0.5 + 0.197*x - 0.004*x**3`` element-wise. The module is
    stateless, so no custom constructor is needed.
    """

    def forward(self, x):
        """Evaluate the polynomial on every element of ``x``."""
        linear_term = 0.197 * x
        cubic_term = 0.004 * x.pow(3)
        return 0.5 + linear_term - cubic_term
    
class Softmax(nn.Module):
    """Quadratic polynomial used in place of a softmax on the model output.

    Computes ``0.25 + 0.5*x - 0.125*x**2`` independently for every element;
    unlike a true softmax it does not normalise across classes. The module is
    stateless, so no custom constructor is needed.
    """

    def forward(self, x):
        """Evaluate the polynomial on every element of ``x``."""
        linear_term = 0.5 * x
        quadratic_term = 0.125 * x.pow(2)
        return 0.25 + linear_term - quadratic_term

class Swish(nn.Module):
    """Degree-4 polynomial approximation of the Swish activation x*sigmoid(x).

    Computes ``0.1198 + 0.5*x + 0.1473*x**2 - 0.002012*x**4`` element-wise.
    The quartic coefficient is negative: the earlier ``- -0.002012`` was a
    double-negative typo that added the term and made the curve overshoot
    Swish for larger |x|.
    """

    def __init__(self):
        super(Swish, self).__init__()

    def forward(self, x):
        """Evaluate the polynomial on every element of ``x``."""
        x = 0.1198 + 0.5*x + 0.1473*torch.pow(x, 2) - 0.002012*torch.pow(x, 4)
        return x
    
class HETextCNN(nn.Module):
    """Text-CNN built only from convolutions, polynomial activations and average pooling.

    The architecture is configured by the module-level hyper-parameters
    ``vocab_size``, ``embedding_size``, ``sequence_length``, ``filter_sizes``,
    ``num_filters`` and ``num_classes``. Each entry of ``filter_sizes`` produces
    one branch: Conv2d -> Swish (polynomial) -> AvgPool2d over all positions.
    """

    def __init__(self):
        super().__init__()

        # Width of the concatenated feature vector fed to the classifier.
        self.num_filters_total = len(filter_sizes) * num_filters
        self.embedding = nn.Embedding(vocab_size, embedding_size)

        branches = []
        for kernel in filter_sizes:
            # A (kernel x embedding_size) filter leaves this many valid
            # positions along the sequence; pooling over all of them reduces
            # each channel to a single value.
            positions = sequence_length - kernel + 1
            branches.append(
                nn.Sequential(
                    nn.Conv2d(1, num_filters, (kernel, embedding_size), bias=False),
                    Swish(),
                    nn.AvgPool2d((positions, 1)),
                )
            )
        self.convs = nn.ModuleList(branches)

        self.fc = nn.Linear(self.num_filters_total, num_classes)
        self.sm = Softmax()

    def forward(self, X):
        """Map a batch of token-id sequences to per-class scores.

        X is expected to hold token indices of shape
        [batch_size, sequence_length] (assumed from the batch_first TEXT field).
        """
        # [batch_size, sequence_length, embedding_size] -> add a channel axis
        # so Conv2d sees [batch_size, 1, sequence_length, embedding_size].
        embedded = self.embedding(X).unsqueeze(1)

        # One pooled feature map per branch, joined along the channel axis.
        pooled = torch.cat([branch(embedded) for branch in self.convs], dim=1)
        features = pooled.view(embedded.size(0), -1)

        return self.sm(self.fc(features))

In [5]:
def binary_acc(preds, y):
    """Return the fraction of positions where ``preds`` equals ``y``.

    Both arguments are compared element-wise; the number of matches is divided
    by the length of the first dimension. The result is a 0-dim float tensor.
    """
    hits = (preds == y).float()
    return hits.sum() / len(hits)

# Upper bound on the number of mini-batches consumed from train_iter per epoch.
train_set_size = 128

def train(model, optimizer, criterion):
    """Run one training epoch over ``train_iter`` and return the mean batch accuracy.

    Args:
        model: network called as ``model(text)``; must return per-class scores.
        optimizer: optimizer whose ``zero_grad``/``step`` drive the update.
        criterion: loss called as ``criterion(predicted, labels)``.

    Returns:
        The unweighted mean of the per-batch accuracies (NumPy float scalar).
    """
    batch_accs = []
    model.train()

    for batch_idx, batch in enumerate(train_iter):
        # Stop once the per-epoch batch budget is used up; there is no point
        # in pulling further batches from the iterator just to skip them.
        if batch_idx >= train_set_size:
            break
        # Label indices are shifted down by one so that they start at 0.
        text, labels = batch.text, batch.label - 1

        predicted = model(text)
        loss = criterion(predicted, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Store a plain Python float: keeping the tensors themselves (as the
        # old, never-read loss list did) would retain each batch's autograd
        # graph for the whole epoch.
        acc = binary_acc(torch.max(predicted, dim=1)[1], labels)
        batch_accs.append(acc.item())

    return np.array(batch_accs).mean()

def evaluate(model, criterion):
    """Compute the mean batch accuracy of ``model`` on ``dev_iter``.

    Args:
        model: network called as ``model(text)``; must return per-class scores.
        criterion: unused; accepted so the signature mirrors ``train``.

    Returns:
        The unweighted mean of the per-batch accuracies (NumPy float scalar).
    """
    batch_accs = []
    model.eval()
    # No gradients are needed for evaluation; disabling autograd avoids
    # building a computation graph for every forward pass.
    with torch.no_grad():
        for batch in dev_iter:
            # Label indices are shifted down by one so that they start at 0.
            text, labels = batch.text, batch.label - 1
            predicted = model(text)

            acc = binary_acc(torch.max(predicted, dim=1)[1], labels)
            batch_accs.append(acc.item())

    return np.array(batch_accs).mean()

In [6]:
model = HETextCNN()
print(model)

# Initialise the embedding layer with the pre-trained vectors attached to the
# TEXT vocabulary. The copy is done in place under no_grad instead of through
# the discouraged `.data` attribute.
pretrained_embedding = TEXT.vocab.vectors
with torch.no_grad():
    model.embedding.weight.copy_(pretrained_embedding)

optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

NUM_EPOCHS = 150

# Per-epoch accuracy history. NOTE: evaluate() iterates over dev_iter, so the
# values stored in test_accs (and printed below) are dev-set accuracies.
train_accs, test_accs = [], []

for epoch in range(NUM_EPOCHS):

    train_acc = train(model, optimizer, criterion)
    print('epoch={},训练准确率={}'.format(epoch, train_acc))
    test_acc = evaluate(model, criterion)
    print("epoch={},测试准确率={}".format(epoch, test_acc))
    train_accs.append(train_acc)
    test_accs.append(test_acc)

HETextCNN(
  (embedding): Embedding(14795, 100)
  (convs): ModuleList(
    (0): Sequential(
      (0): Conv2d(1, 6, kernel_size=(2, 100), stride=(1, 1), bias=False)
      (1): Swish()
      (2): AvgPool2d(kernel_size=(59, 1), stride=(59, 1), padding=0)
    )
    (1): Sequential(
      (0): Conv2d(1, 6, kernel_size=(3, 100), stride=(1, 1), bias=False)
      (1): Swish()
      (2): AvgPool2d(kernel_size=(58, 1), stride=(58, 1), padding=0)
    )
    (2): Sequential(
      (0): Conv2d(1, 6, kernel_size=(5, 100), stride=(1, 1), bias=False)
      (1): Swish()
      (2): AvgPool2d(kernel_size=(56, 1), stride=(56, 1), padding=0)
    )
    (3): Sequential(
      (0): Conv2d(1, 6, kernel_size=(2, 100), stride=(1, 1), bias=False)
      (1): Swish()
      (2): AvgPool2d(kernel_size=(59, 1), stride=(59, 1), padding=0)
    )
    (4): Sequential(
      (0): Conv2d(1, 6, kernel_size=(3, 100), stride=(1, 1), bias=False)
      (1): Swish()
      (2): AvgPool2d(kernel_size=(58, 1), stride=(58, 1), paddin

KeyboardInterrupt: 