In [None]:
import torch
from torch import nn, optim, tensor
import pickle
import numpy as np
import random

In [None]:
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision import datasets
import torch.nn.functional as F

In [None]:
def pad_to_batch(batch,max_len=80):
    x,y = zip(*batch)
    x_p = []
    for i in range(len(batch)):
        if x[i].size(0) < max_len:
            x_p.append(
            torch.cat([x[i],Variable(torch.tensor([0]*(max_len - x[i].size(0))))]))
        else:
            x_p.append(x[i][:max_len])
    return torch.cat(x_p), y

In [None]:
def getBatch(batch_size, train_data):
    random.shuffle(train_data)
    sindex = 0
    eindex = batch_size
    while eindex < len(train_data):
        batch = train_data[sindex: eindex]
        temp = eindex
        eindex = eindex + batch_size
        sindex = temp
        yield batch
    
    if eindex >= len(train_data):
        batch = train_data[sindex:]
        yield batch

In [None]:
batch_size = 32
learning_rate = 1e-2
num_epoches = 50
EPOCH = 10

In [None]:
def load_data(path="./yahoo/yahoo.p"):
    with (open(path, "rb")) as openfile:
        while True:
            try:
                x = cPickle.load(openfile)
            except EOFError:
                break
        
        X, y, test_X, test_lab = x[0], x[3], x[2], x[5]
        wordtoix, ixtoword = x[6], x[7]
        class_name = ['Good','Bad']
        
    return X, y, test_X, test_lab, wordtoix, ixtoword, class_name

In [None]:
X, y, test_X, test_lab, wordtoix, ixtoword, class_name=load_data()

In [None]:
train_data=[(Variable(torch.tensor((X[i]))),(np.argmax(y[i]))) for i in range(len(y))]

In [None]:
class Classifier(nn.Module):
    
    def __init__(self, vocab_size, class_num, embedding_dim, hidden_dim=100, ngram=55, dropout=0.5):
        super(Classifier,self).__init__()
        
        self.class_num = class_num
        
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.embedding_class = nn.Embedding(class_num, embedding_dim)
        
        self.conv = torch.nn.Conv1d(class_num, class_num, 2*ngram+1,padding=55)
        
        self.layer = nn.Linear(embedding_dim, class_num)
        
    def forward(self,inputs):
        emb = self.embedding(inputs) # (B, L, e)
        #print("emb1:",emb)
        
        embn = torch.norm(emb, p=2, dim=2).detach()        
        emb_norm = emb.div(embn.unsqueeze(2))
        #print("emb2:",emb)
        #print("embsize:",emb.size()) 
        
        emb_c = self.embedding_class(torch.tensor([[i for i in range(self.class_num)] for j in range(inputs.size(0))]))
        #print("emb_csize:",emb_c.size()) # (B, C, e)
        #print(emb_c)
        emb_cn = torch.norm(emb_c, p=2, dim=2).detach()
        emb_c_norm = emb_c.div(emb_cn.unsqueeze(2))
        
        emb_norm_t = emb_norm.permute(0, 2, 1) # (B, e, L)
        #print("embtsize:",embt.size())
        
        g = torch.bmm(emb_c_norm,emb_norm_t) #(B, C, L)
        #print("gsize:",g.size())
        
        g = F.relu(self.conv(g))
        
        beta = torch.max(g,1)[0].unsqueeze(2) #(B, L)
        
        #print("betasize:",beta.size())
        beta = F.softmax(beta,1) #(B, L)
        
        z = torch.mul(beta,emb) #(B, L, e)
        #print("z1size:",z.size())
        
        z = z.sum(1) #(B, e)
        #print("z2size:",z.size())
        
        out = self.layer(z) #(B, C)
        #print("outsize:",out.size())
        
        logits = F.log_softmax(out,1) #(B, C)
        
        return logits

In [None]:
model = Classifier(vocab_size=len(wordtoix), class_num=5, embedding_dim=300, hidden_dim=100, ngram=55, dropout=0.5)

In [None]:
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())

In [None]:
for epoch in range(EPOCH):
    print("--- epoch:",epoch,"---")
    losses = []
    accuracy = []
    for i, data in enumerate(getBatch(batch_size, train_data), 1):

        inputs,targets = pad_to_batch(data)
        model.zero_grad()

        preds = model(inputs.view(batch_size,-1)) #(B, C)
        #targets: (B,)
        
        if len(targets)!= batch_size:
            break

        loss = loss_function(preds, torch.LongTensor(targets))

        losses.append(loss.data[0])
        
        max_index = preds.max(dim = 1)[1]
        correct = (max_index == torch.LongTensor(targets)).sum()
        acc = float(correct)/batch_size
        accuracy.append(acc)

        loss.backward()
        optimizer.step()
        
        if i % 100 == 0:
            print("[%d/%d] mean_loss : %0.2f" %(epoch, EPOCH, np.mean(losses)))
            losses = []
    
    loss_epoch = np.mean(losses)
    print("loss_epoch:",loss_epoch)
    acc_epoch = np.mean(accuracy)
    print("acc_epoch:",acc_epoch)
    
torch.save(model.state_dict(),"checkpoints/trained_model.pth")

In [None]:
test_data=[(Variable(torch.tensor((test_X[i]))),(np.argmax(test_lab[i]))) for i in range(len(test_lab))]

In [None]:
def test_accuracy(batch_size, test_data, model):
    acc = []
    
    for i, data in enumerate(getBatch(batch_size, test_data), 1):
        
        inputs,targets = pad_to_batch(data)
        model.zero_grad()
        
        #print("inputs:",inputs)
        preds = model(inputs.view(batch_size,-1))
        
        max_index = preds.max(dim = 1)[1]
        
        if len(targets)== batch_size:
            correct = (max_index == torch.LongTensor(targets)).sum()
            acc.append(float(correct)/batch_size)
        
    return np.mean(acc)

In [None]:
trained_model = Classifier(vocab_size=len(wordtoix), class_num=5, embedding_dim=300, hidden_dim=100, ngram=55, dropout=0.5)
trained_model.load_state_dict(torch.load("checkpoints/trained_model.pth"))

In [None]:
test_accuracy(batch_size, test_data, trained_model)