# Sample Code: CNNs for sentiment analysis

Example of sequence classification using a *Convolutional Neural Network* (CNN) with 1D convolutions.

Written by Shengpu Tang (tangsp@umich.edu). The code for dataset and model is "completely" original; please feel free to plagiarize. 

In [None]:
#@title Run this cell to download labeled sentence dataset. { display-mode: "form" }
# Install (or upgrade) the `wget` Python package used for the download below.
!pip install -U wget
# Start from a clean, empty `sentiment` directory so stale files never linger.
!rm -rf sentiment
!mkdir -p sentiment

import wget
# Fetch the zipped dataset into the current working directory.
wget.download('https://github.com/shengpu1126/BDSI2019-ML/raw/master/sentiment.zip', 'sentiment.zip')

import zipfile
# Unpack the archive into the current directory. Later cells read
# `sentiment/train.txt`, `sentiment/dev.txt` and `sentiment/test.txt`, so the
# archive presumably carries its own `sentiment/` folder -- verify if the
# download source changes.
with zipfile.ZipFile("sentiment.zip","r") as zip_ref:
    zip_ref.extractall(".")

In [None]:
# Ask the inline matplotlib backend to render figures as SVG.
%config InlineBackend.figure_format = 'svg'

In [None]:
# Select the compute device: CUDA when PyTorch can see a GPU, otherwise the CPU.
import torch

if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print('using device:', device)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset

import numpy as np
from tqdm import tqdm
import yaml
import random

import matplotlib.pyplot as plt

## Sentence dataset

In [None]:
class SentenceDataset(Dataset):
    """In-memory dataset of whitespace-tokenized sentences read from a text file.

    Each line of the file holds one sentence. When ``labeled`` is true, the
    first character of the line is parsed as the integer label and the rest of
    the line is the sentence; blank lines are skipped. When ``labeled`` is
    false, the whole line is the sentence and the label is set to ``-1``.

    Args:
        path: path of the text file to read.
        labeled: whether each line starts with a single-digit label.

    Attributes are two parallel lists: ``labels`` (ints) and ``sentences``
    (lists of string tokens).
    """

    def __init__(self, path='sentiment/train.txt', labeled=True):
        self.labels, self.sentences = [], []
        with open(path) as f:
            # Iterate the file lazily instead of materializing every line.
            for line in f:
                if labeled:
                    if not line.strip():
                        # A blank line carries no label digit; skip it
                        # rather than crash inside int().
                        continue
                    self.labels.append(int(line[0]))
                    self.sentences.append(line[1:].split())
                else:
                    self.labels.append(-1)
                    self.sentences.append(line.split())

    def __len__(self):
        """Return the number of sentences."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return the ``(tokens, label)`` pair stored at position ``idx``."""
        return self.sentences[idx], self.labels[idx]

    def __add__(self, other):
        """Return a new dataset holding this dataset's items followed by ``other``'s.

        Neither operand is modified, so ``a + b`` leaves ``a`` and ``b`` intact.
        """
        # Bypass __init__ so that no file is read for the merged dataset.
        merged = type(self).__new__(type(self))
        merged.labels = self.labels + other.labels
        merged.sentences = self.sentences + other.sentences
        return merged

In [None]:
# Training set = train split followed by dev split. The test split is loaded
# separately and is what the training cell below evaluates on.
dataset_tr = SentenceDataset('sentiment/train.txt') + SentenceDataset('sentiment/dev.txt')
dataset_te = SentenceDataset('sentiment/test.txt')

In [None]:
# Peek at a few (tokens, label) samples from the combined training set.
print(dataset_tr[0])
print(dataset_tr[7])
print(dataset_tr[-6])

In [None]:
# Build the vocabulary from the training sentences. `word_to_ix` gives every
# distinct token a unique integer id (its rank in sorted order); these ids are
# what the embedding layer is indexed with.
# UNKNOWN is an extra placeholder entry, presumably meant for words that never
# occur in the training data.
UNKNOWN = '!!UNKNOWN!!'
words = {UNKNOWN}.union(*dataset_tr.sentences)

word_to_ix = dict(zip(sorted(words), range(len(words))))
NUM_LABELS = 2
VOCAB_SIZE = len(word_to_ix)
print(VOCAB_SIZE)

In [None]:
def make_idx_vector(sentence, word_to_ix, unknown='!!UNKNOWN!!'):
    """Convert a tokenized sentence into a 1-D LongTensor of vocabulary ids.

    Args:
        sentence: iterable of string tokens.
        word_to_ix: dict mapping each known token to its integer id.
        unknown: key of the reserved out-of-vocabulary entry in ``word_to_ix``.

    Returns:
        torch.LongTensor with one id per token. Tokens missing from
        ``word_to_ix`` get the id of ``unknown``. If ``unknown`` itself is not
        in the vocabulary, they fall back to ``len(word_to_ix) - 1``.
    """
    # Resolve the out-of-vocabulary id once. Using the reserved entry keeps
    # unseen words from aliasing whichever real word happens to hold the
    # last id.
    fallback = word_to_ix.get(unknown, len(word_to_ix) - 1)
    return torch.LongTensor([word_to_ix.get(word, fallback) for word in sentence])

class EmbeddingDataset(Dataset):
    """Wrap a sentence dataset so that items come out as tensors.

    ``dataset`` must expose parallel ``sentences`` and ``labels`` sequences;
    ``word_to_idx`` is the token -> integer id vocabulary handed to
    ``make_idx_vector``.
    """

    def __init__(self, dataset, word_to_idx):
        self.dataset, self.word_to_idx = dataset, word_to_idx

    def __len__(self):
        """Return the size of the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` for item ``idx``.

        ``token_ids`` is the tensor produced by ``make_idx_vector``; ``label``
        is a float tensor of shape ``(1,)``.
        """
        token_ids = make_idx_vector(self.dataset.sentences[idx], self.word_to_idx)
        raw_label = self.dataset.labels[idx]
        label = torch.LongTensor([raw_label]).float()
        return token_ids, label

def collate_fn(data):
    """Create mini-batch tensors from a list of (sentence, label) tuples.

    A custom collate function is needed because the sentences have different
    lengths and must be zero-padded to a common length before they can be
    merged into one tensor.

    The samples are ordered by sentence length, longest first, so the rows of
    the returned tensors do NOT follow the order of ``data``. The returned
    labels are reordered the same way and stay aligned with the inputs.

    Args:
        data: list of tuples (sentence, label).
            - sentence: 1-D integer tensor of token ids; variable length.
            - label: tensor; all labels in a batch must share one shape.
    Returns:
        inputs: long tensor of shape (batch_size, padded_length), padded
            with zeros on the right.
        lengths: list of the valid (unpadded) length of each row of inputs.
        labels: the label tensors stacked along a new first dimension.
    """
    # Sort into a new list (longest first) so the caller's list is untouched.
    ordered = sorted(data, key=lambda pair: len(pair[0]), reverse=True)
    sentences, labels = zip(*ordered)

    # Merge the sentences (tuple of 1-D tensors) into one zero-padded 2-D tensor.
    lengths = [len(sent) for sent in sentences]
    inputs = torch.zeros(len(sentences), max(lengths)).long()
    for row, (sent, length) in enumerate(zip(sentences, lengths)):
        inputs[row, :length] = sent
    return inputs, lengths, torch.stack(labels, 0)

## Word embedding + CNN

Embedding layer → 1D-Convolutional Layer → Global Max/Average Pooling → ReLU → Linear → Sigmoid → Binary Cross Entropy Loss

In [None]:
class EmbeddingCNNClassifier(nn.Module):
    """Binary sentence classifier: embedding -> 1D conv -> global pool -> ReLU -> linear.

    Args:
        vocab_size: number of rows in the embedding table.
        emb_dim: size of each word embedding (input channels of the conv).
        kernel_size: width of the 1D convolution. Padding is
            ``(kernel_size - 1) // 2``, so an odd kernel keeps the sequence
            length and an even kernel shortens it by one.
        hidden_dim: number of convolution filters (output channels).
        pool: ``'average'`` or ``'max'``; how the filter responses are
            reduced over the sequence dimension.

    Raises:
        ValueError: if ``pool`` is neither ``'average'`` nor ``'max'``.
    """

    def __init__(self, vocab_size, emb_dim, kernel_size=5, hidden_dim=8, pool='average'):
        super().__init__()
        if pool not in ('average', 'max'):
            raise ValueError(
                "pool must be 'average' or 'max', got {!r}".format(pool))
        self.emb_dim = emb_dim
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.conv = nn.Conv1d(emb_dim, hidden_dim, kernel_size, padding=(kernel_size-1)//2)
        self.fc = nn.Linear(hidden_dim, 1)
        self.pool = pool

    def forward(self, x, lens):
        """Return one raw logit per sentence, shape ``(batch, 1)``.

        Args:
            x: long tensor of token ids, shape ``(batch, padded_length)``.
            lens: valid (unpadded) length of each row of ``x``.
        """
        # Conv1d expects channels first: (batch, emb_dim, padded_length).
        embed = self.embedding(x).transpose(1, 2)
        h0 = self.conv(embed)
        # Allocate the pooled features on the same device/dtype as the
        # activations so the model also works after `.to('cuda')`.
        out = h0.new_zeros(len(x), self.hidden_dim)
        for i, length in enumerate(lens):
            # Pool over the valid positions only, ignoring the padded tail.
            valid = h0[i, :, :length]
            if self.pool == 'average':
                out[i] = torch.mean(valid, dim=1)
            elif self.pool == 'max':
                out[i] = torch.max(valid, dim=1)[0]
            else:
                # Only reachable if `self.pool` was changed after construction.
                raise ValueError(
                    "pool must be 'average' or 'max', got {!r}".format(self.pool))
        h1 = torch.relu(out)
        return self.fc(h1)

## Train

In [None]:
# Seed every random number generator in play so the run is reproducible.
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

BATCH_SIZE = 64

# Alternative configuration (default kernel size, average pooling):
# model = EmbeddingCNNClassifier(VOCAB_SIZE, 50)
model = EmbeddingCNNClassifier(VOCAB_SIZE, 50, kernel_size=7, pool='max').to(device)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-2)

n_epochs = 3
tr_losses = []      # one entry per training mini-batch
va_losses = []      # one entry per evaluation (before training + after each epoch)
va_scores = []      # accuracy, same schedule as va_losses
train_counter = []  # x-coordinates for tr_losses: training examples seen so far
test_counter = [i*len(dataset_tr) for i in range(n_epochs + 1)]

tr_loader = DataLoader(EmbeddingDataset(dataset_tr, word_to_ix), batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
va_loader = DataLoader(EmbeddingDataset(dataset_te, word_to_ix), batch_size=BATCH_SIZE, shuffle=False, collate_fn=collate_fn)


def evaluate(model, loader, criterion, device):
    """Run `model` over `loader` without gradients.

    Returns:
        (mean_batch_loss, accuracy, predictions), where `predictions` is a
        flat int array of 0/1 decisions in the order the loader yielded them.
    """
    batch_losses, preds = [], []
    n_correct, n_total = 0, 0
    with torch.no_grad():
        for X, lens, y in loader:
            X, y = X.to(device), y.to(device)
            out = model(X, lens)
            batch_losses.append(criterion(out, y).item())
            # A logit > 0 corresponds to a sigmoid probability > 0.5.
            hard = (out > 0).float()
            # Score against the labels that came with the batch: collate_fn
            # reorders every batch by sentence length, so predictions do not
            # line up with the dataset's original label order.
            n_correct += (hard == y).sum().item()
            n_total += y.numel()
            # Move to host memory first; .numpy() is not available on CUDA tensors.
            preds += hard.cpu().numpy().astype(int).ravel().tolist()
    accuracy = n_correct / n_total if n_total else float('nan')
    return np.mean(batch_losses), accuracy, np.array(preds)


# Evaluate epoch (before training)
val_loss, val_score, y_pred = evaluate(model, va_loader, criterion, device)
va_scores.append(val_score)
va_losses.append(val_loss)

for epoch in range(n_epochs):
    # Train epoch
    for batch_idx, (X, lens, y) in enumerate(tqdm(tr_loader)):
        X, y = X.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(X, lens)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        tr_losses.append(loss.item())
        train_counter.append((batch_idx*BATCH_SIZE) + (epoch*len(dataset_tr)))

    # Evaluate epoch
    val_loss, val_score, y_pred = evaluate(model, va_loader, criterion, device)
    va_scores.append(val_score)
    va_losses.append(val_loss)

In [None]:
# Loss curves: per-mini-batch training loss (dotted blue line) and the
# validation loss recorded at each evaluation (red dots).
fig = plt.figure(figsize=(6, 6))
ax = fig.gca()
ax.plot(train_counter, tr_losses, linestyle=':', color='blue', linewidth=0.8)
ax.scatter(test_counter, va_losses, color='red')
ax.legend(['Train Loss', 'Validation Loss'], loc='upper right')
ax.set_xlabel('number of training examples seen')
ax.set_ylabel('binary cross entropy loss')
plt.show()

In [None]:
# Validation accuracy at each evaluation point, with the y-axis clipped to [0.5, 1].
fig = plt.figure(figsize=(6, 6))
ax = fig.gca()
ax.plot(test_counter, va_scores, color='red')
ax.legend(['Validation Accuracy'], loc='upper right')
ax.set_xlabel('number of training examples seen')
ax.set_ylabel('Accuracy')
ax.set_ylim(0.5, 1)
ax.grid()
plt.show()