In [1]:
from collections import defaultdict
import time
import random
import torch

In [2]:
# Functions to read in the corpus
w2i = defaultdict(lambda: len(w2i))
t2i = defaultdict(lambda: len(t2i))
UNK = w2i["<unk>"]


def read_dataset(filename):
    with open(filename, "r") as f:
        for line in f:
            tag, words = line.lower().strip().split(" ||| ")
            yield ([w2i[x] for x in words.split(" ")], t2i[tag])


# Read in the data
train = list(read_dataset("../data/classes/train.txt"))
w2i = defaultdict(lambda: UNK, w2i)
dev = list(read_dataset("../data/classes/test.txt"))
nwords = len(w2i)
ntags = len(t2i)

### [Dilated convolution](https://towardsdatascience.com/review-dilated-convolution-semantic-segmentation-9d5a5bd768f5)

### [`torch.nn.Conv1d`](https://pytorch.org/docs/stable/generated/torch.nn.Conv1d.html)

### [Stanford CS224N: NLP with Deep Learning | Winter 2019 | Lecture 11 – Convolutional Networks for NLP](https://www.youtube.com/watch?v=EAJoRA0KX7I)

### [Padding by deep.ai](https://www.youtube.com/watch?v=smHa2442Ah4)

In [13]:
class CNNClass(torch.nn.Module):
    def __init__(self, nwords, emb_size, num_filters, window_size, ntags):
        super(CNNClass, self).__init__()
        
        self.embedding = torch.nn.Embedding(nwords, emb_size)
        
        torch.nn.init.uniform_(self.embedding.weight, -0.25, 0.25)
        
        # Conv 1d
        self.conv_1d = torch.nn.Conv1d(
            in_channels=emb_size,
            out_channels=num_filters,
            kernel_size=window_size,
            stride=1,
            padding=0,
            dilation=1,
            groups=1,
            bias=True)
        
        self.relu = torch.nn.ReLU()  # Activation function
        
        # Project num_filters onto ntags
        self.projection_layer = torch.nn.Linear(
            in_features=num_filters,
            out_features=ntags,
            bias=True)
        
        # Initializing the projection layer
        torch.nn.init.xavier_uniform_(self.projection_layer.weight)
        
    def forward(self, words):
        emb = self.embedding(words)  # nwords x emb_size 
        emb = emb.unsqueeze(0).permute(0, 2, 1)  # 1 x emb_size x n_words 
        h = self.conv_1d(emb) # 1 x num_filters x n_words
        
        # Do max pooling
        h = h.max(dim=2)[0]  # 1 x num_filters
        h = self.relu(h)     
        out = self.projection_layer(h) # 1 x ntags
        return out

In [14]:
# Define the model
EMB_SIZE = 64
WIN_SIZE = 3
FILTER_SIZE = 64

# initialize the model
model = CNNClass(nwords, EMB_SIZE, FILTER_SIZE, WIN_SIZE, ntags)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

data_type = torch.LongTensor
use_cuda = torch.cuda.is_available()

if use_cuda:
    data_type = torch.cuda.LongTensor
    model.cuda()

In [15]:
for ITER in range(1):
    # Perform training
    random.shuffle(train)
    train_loss, train_correct = 0.0, 0
    start = time.time()
    
    model.train()
    
    for words, tag in train[:5000]:
        if len(words) < WIN_SIZE:
            # Add padding
            words += [0] * (WIN_SIZE - len(words))
            
        words_tensor = torch.tensor(words).type(data_type)
        tag_tensor = torch.tensor([tag]).type(data_type)
        scores = model(words_tensor)
        predict = scores[0].argmax().item()
        
        if predict == tag:
            train_correct += 1
        
        my_loss = criterion(scores, tag_tensor)
        train_loss += my_loss.item()
        
        # Update weights
        optimizer.zero_grad()
        my_loss.backward()
        optimizer.step()
        
    print("iter {}: train loss/sent={}, acc={}, time={}".format(
        ITER,
        train_loss/len(train),
        train_correct/len(train),
        time.time()-start))
    
    test_correct = 0
    model.eval()
    
    for words, tag in dev[:5000]:
        # Padding 
        if len(words) < WIN_SIZE:
            words += [0] * (WIN_SIZE - len(words))
        words_tensor = torch.tensor(words).type(data_type)
        scores = model(words_tensor)[0]
        predict = scores.argmax().item()
        
        if predict == tag:
            test_correct += 1
    
    print("iter {}: test acc={}".format(
        ITER,
        test_correct/len(dev)))

iter 0: train loss/sent=0.8887445103056199, acc=0.18422284644194756, time=13.640254259109497
iter 0: test acc=0.34479638009049773
