In [None]:
import csv
import os, random, sys, copy
import torch, torch.nn as nn, numpy as np
from tqdm.notebook import tqdm
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from nltk.tokenize import word_tokenize
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score, accuracy_score

In [None]:
#ADAPTED FROM THE MOVIE DATALOADER IN HW3
class sarcasmDataset(torch.utils.data.Dataset):
    """Sarcasm-detection dataset of fixed-length vocabulary-id sequences.

    Every item is ``((ids, length), label)``:

    * ``ids``    -- 1-D tensor of ``max_length`` vocabulary indices, right-padded
      with ``word2id['<pad>']``
    * ``length`` -- number of real (non-padding) tokens, always > 0
    * ``label``  -- integer sarcasm label read from the CSV
    """

    def __init__(self, input_file="..\\train.En.csv", word2id=None, finalized_data=None, data_limit=4000, max_length=280):
        """
        :param input_file: CSV to read (header row skipped); column 1 is the tweet text, column 2 the integer label
        :param word2id: Mapping token -> vocabulary index; must contain 'unk' and '<pad>'
        :param finalized_data: Already-tokenized examples; when given, no file is read (used to build the validation set)
        :param data_limit: Stored on the instance. NOTE(review): nothing in this class enforces it
        :param max_length: Every id sequence is truncated / padded to exactly this many tokens
        :raises ValueError: if data must be read from ``input_file`` but no ``word2id`` was supplied
        """
        # Keep the configuration on every instance, including ones built from
        # finalized data, so attribute access never depends on the construction path.
        self.data_limit = data_limit
        self.max_length = max_length
        self.word2id = word2id

        # `is not None` (not truthiness): an empty validation split is still
        # "finalized data" and must not trigger a re-read of the CSV.
        if finalized_data is not None:
            self.data = finalized_data
        else:
            if word2id is None:
                raise ValueError("word2id is required when loading examples from input_file")

            #read the input file and get examples: example = [tweet_text, sarcasm_label]
            examples = self.read_file(input_file)

            #get tokenized examples: example_tokenized = ((tweet_ids, length), sarcasm_label)
            self.data = self.tokenize(examples)

        # Shuffled in place (this also reorders a list passed as finalized_data).
        random.shuffle(self.data)

    def read_file(self, input_file):
        """Read ``[tweet_text, label]`` pairs from a CSV file, skipping the header row.

        :param input_file: path of the CSV file
        :return: list of ``[text, int_label]`` in file order
        :raises IndexError, ValueError: if a non-empty row lacks column 2 or its label is not an integer
        """
        examples = []
        # newline='' is what the csv module expects (quoted fields may contain
        # line breaks); undecodable bytes are dropped as before.
        with open(input_file, 'r', encoding='utf-8', errors='ignore', newline='') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None) #skip the header (tolerates an empty file)
            for line in reader: #go through every tweet example
                if not line: #csv.reader yields [] for blank lines
                    continue
                examples.append([line[1], int(line[2])]) #get the tweet and the true sarcasm label
        return examples

    def tokenize(self, examples):
        """Convert raw examples into padded id tensors using ``self.word2id``.

        Tokens missing from the vocabulary map to ``word2id['unk']``. Examples
        that tokenize to zero tokens are dropped. Prints the share of tokens
        that fell back to 'unk'.

        :param examples: iterable of ``[text, label]``
        :return: list of ``((ids_tensor, length), label)``
        """
        vocab = self.word2id  # instance vocabulary, not a module-level global
        unk_id = vocab['unk']
        pad_id = vocab['<pad>']

        example_ids = []
        misses = 0              # tokens not covered by the vocabulary (mapped to 'unk')
        total = 0
        for text, label in examples:
            tokens = word_tokenize(text)
            ids = []
            for tok in tokens:
                if tok in vocab:
                    ids.append(vocab[tok])
                else:
                    misses += 1
                    ids.append(unk_id)
            total += len(tokens)

            # Force every sequence to exactly max_length ids, remembering the real length.
            length = min(len(ids), self.max_length)
            ids = ids[:self.max_length] + [pad_id] * (self.max_length - length)

            if length > 0:
                example_ids.append(((torch.tensor(ids), length), label))

        # Report a true percentage and avoid dividing by zero on empty input.
        miss_pct = 100.0 * misses / total if total else 0.0
        print('Missed {} out of {} words -- {:.2f}%'.format(misses, total, miss_pct))
        return example_ids

    def generate_validation_split(self, ratio=0.8):
        """Carve the tail of the data off as a validation split.

        The first ``int(ratio * len(self))`` examples stay in this dataset; the
        remainder is removed from it and returned, so the two sets are disjoint.

        :param ratio: fraction of the examples to keep for training
        :return: list of held-out examples
        """
        split_idx = int(ratio * len(self.data))

        # Take a chunk of the processed data, and return it in order to initialize a validation dataset
        validation_split = self.data[split_idx:]

        #We'll remove this data from the training data to prevent leakage
        self.data = self.data[:split_idx]

        return validation_split

    def __getitem__(self, item):
        return self.data[item]

    def __len__(self):
        return len(self.data)


glove_file = '../glove.6B.50d.txt'

# word -> GloVe vector, in the order the words appear in the file
embeddings_dict = {}

with open(glove_file, 'r', encoding='utf8') as f:
    for line in f:
        # Each line is: <word> <v1> <v2> ... separated by single spaces
        word, *values = line.strip().split(' ')
        embeddings_dict[word] = np.asarray(values, "float")

print('Loaded {} words from glove'.format(len(embeddings_dict)))

# Take the vector width from the data instead of hard-coding 50.
embedding_dim = len(next(iter(embeddings_dict.values())))

# One extra, all-zero row at the end of the matrix is reserved for padding.
pad_id = len(embeddings_dict)
embedding_matrix = np.zeros((len(embeddings_dict) + 1, embedding_dim)) #add 1 for padding

word2id = {}
for i, word in enumerate(embeddings_dict.keys()):

    word2id[word] = i                                #Map each word to an index
    embedding_matrix[i] = embeddings_dict[word]      #That index holds the Glove embedding in the embedding matrix

# Point '<pad>' at the reserved zero row rather than at index 0, which already
# belongs to the first GloVe word. Word indices and the matrix contents are
# unchanged, and the model packs padded positions away, so its outputs are not
# affected by which row padding uses.
word2id['<pad>'] = pad_id

# Build the training set, then move its tail into a separate validation dataset.
train_dataset = sarcasmDataset(word2id=word2id)
valid_data = train_dataset.generate_validation_split()
valid_dataset = sarcasmDataset(finalized_data=valid_data, word2id=word2id)
print(len(train_dataset), train_dataset[1])
print(len(valid_dataset), valid_dataset[1])

In [None]:
def getMetrics(model, valid_dataloader, mode='accuracy'):
    """Score a model on a dataloader and print/return the requested metric.

    The model's raw outputs are passed through a sigmoid and thresholded at 0.5
    (probability < 0.5 -> label 0, otherwise label 1).

    :param model: callable taking ``(x, x_lengths)`` and returning one logit per example
    :param valid_dataloader: iterable yielding ``((x, x_lengths), y)`` batches
    :param mode: 'accuracy' or 'f1' (binary F1 with positive label 1)
    :return: the metric value
    :raises ValueError: if ``mode`` is not one of the supported names
    """
    if mode not in ('accuracy', 'f1'):
        raise ValueError("mode must be 'accuracy' or 'f1', got {!r}".format(mode))

    y_true = []
    y_pred = []
    # Evaluation needs no gradients, whether or not the caller disabled them.
    with torch.no_grad():
        for (x, x_lengths), y in valid_dataloader:
            probs = torch.sigmoid(model(x, x_lengths))
            # reshape(-1) always yields a 1-D tensor, so a batch holding a single
            # example still produces a list (squeeze() would give a bare float).
            y_true.extend(y.reshape(-1).tolist())
            y_pred.extend(probs.reshape(-1).tolist())

    y_pred = [0 if p < 0.5 else 1 for p in y_pred]

    if mode == 'accuracy': #accuracy for model training
        accuracy = accuracy_score(y_true, y_pred)
        print('accuracy: {}'.format(accuracy))
        return accuracy

    f1 = f1_score(y_true, y_pred, average="binary", pos_label=1)
    print('f1: {}'.format(f1))
    return f1
    

In [None]:
# Adapted From HW3
def train_sarcasm_classification(model, train_dataset, valid_dataset, epochs=10, batch_size=32, learning_rate=.001, print_frequency=25):
    """Train a binary classifier with BCE-with-logits loss and AdamW.

    After every epoch the model is scored on ``valid_dataset`` (accuracy, via
    ``getMetrics``) and a copy of the weights is kept whenever the score improves.

    :param model: module called as ``model(x, x_lengths)`` returning logits of shape (batch, 1)
    :param train_dataset: dataset yielding ``((x, length), label)`` items
    :param valid_dataset: dataset with the same item layout, used for per-epoch evaluation
    :param epochs: number of passes over the training data
    :param batch_size: training batch size (validation uses 128)
    :param learning_rate: AdamW learning rate
    :param print_frequency: report the running mean loss every this many batches
    :return: ``(final_state_dict, best_state_dict)``; the best one is a detached copy and is
             never ``None`` (it falls back to the final weights if no epoch was evaluated)
    """
    criteria = nn.BCEWithLogitsLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

    # The DataLoader handles batching; training batches are reshuffled every epoch.
    train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_dataloader = torch.utils.data.DataLoader(valid_dataset, batch_size=128, shuffle=False)

    print('Total train batches: {}'.format(len(train_dataloader)))

    best_model_sd = None
    # Start below any reachable accuracy so the first evaluated epoch is always recorded.
    best_accuracy = float('-inf')

    for i in range(epochs):
        print('### Epoch: ' + str(i+1) + ' ###')

        model.train()

        running_loss = 0.0
        batches_since_report = 0
        for step, data in enumerate(train_dataloader):

            (x, x_lengths), y = data	# the dataset returns the input ids together with their true lengths
            optimizer.zero_grad()

            model_output = model(x, x_lengths)

            # Logits arrive as (batch, 1); the loss expects them to match y's shape (batch,).
            loss = criteria(model_output.squeeze(1), y.float())

            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            batches_since_report += 1

            if step % print_frequency == 0:
                # Divide by the number of batches actually accumulated, so the
                # very first report (one batch) is not understated.
                print('epoch: {} batch: {} loss: {}'.format(
                    i + 1,
                    step,
                    running_loss / batches_since_report
                ))
                running_loss = 0.0
                batches_since_report = 0

        print('Evaluating...')
        model.eval()
        with torch.no_grad():
            acc = getMetrics(model, valid_dataloader, 'accuracy')
            if acc > best_accuracy:
                # deepcopy: state_dict() returns references that later steps would overwrite
                best_model_sd = copy.deepcopy(model.state_dict())
                best_accuracy = acc

    if best_model_sd is None:
        # No epoch ran (epochs == 0): hand back a copy of the current weights instead of None.
        best_model_sd = copy.deepcopy(model.state_dict())

    return model.state_dict(), best_model_sd

In [None]:
# Adapted From HW3
class sarcasmModel(nn.Module):
    """(Bi)LSTM sentence classifier over frozen pre-trained embeddings.

    The final hidden state of the last LSTM layer (both directions concatenated
    when bidirectional) is fed through Linear -> ReLU -> Linear to produce one
    raw logit per example; no sigmoid is applied here.
    """

    def __init__(self, embedding_matrix, lstm_hidden_size=50, num_lstm_layers=1, bidirectional=True):
        """
        :param embedding_matrix: 2-D array (vocab_size, embedding_dim) of pre-trained vectors
        :param lstm_hidden_size: hidden size of each LSTM direction
        :param num_lstm_layers: number of stacked LSTM layers
        :param bidirectional: run the LSTM in both directions when True
        """
        super().__init__()
        self.num_directions = 2 if bidirectional else 1

        # from_pretrained freezes the embedding weights by default.
        self.embedding = nn.Embedding.from_pretrained(torch.FloatTensor(embedding_matrix))
        self.lstm = nn.LSTM(input_size = embedding_matrix.shape[1],
                            hidden_size = lstm_hidden_size,
                            num_layers = num_lstm_layers,
                            bidirectional = bidirectional,
                            batch_first = True) #define lstm

        # The classifier input is one hidden vector per direction, so size it by
        # num_directions instead of assuming the bidirectional case.
        self.hidden_1 = nn.Linear(lstm_hidden_size * self.num_directions, lstm_hidden_size) #define layers
        self.hidden_2 = nn.Linear(lstm_hidden_size, 1)
        self.relu = nn.ReLU()

    def forward(self, input_batch, input_lengths):
        """
        :param input_batch: integer tensor (batch, seq_len) of vocabulary ids, right-padded
        :param input_lengths: true (unpadded, > 0) length of each sequence in the batch
        :return: tensor (batch, 1) of raw logits
        """
        embedded_input = self.embedding(input_batch) #take input

        # pack_padded_sequence requires the lengths on the CPU when given as a tensor.
        lengths = torch.as_tensor(input_lengths).cpu()
        # Packing makes the LSTM stop at each sequence's real end, so padding never reaches it.
        packed_input = pack_padded_sequence(embedded_input, lengths, batch_first=True, enforce_sorted=False)

        _, (hn, _) = self.lstm(packed_input)

        # hn is (num_layers * num_directions, batch, hidden); split the first axis
        # so the leading dimension indexes the layer and the second the direction.
        hn_view = hn.view(self.lstm.num_layers, self.num_directions, input_batch.shape[0], self.lstm.hidden_size)

        last_layer = hn_view[-1]                                            # (num_directions, batch, hidden)

        # Concatenate the per-direction vectors: forward first, then backward if present.
        hn_cat = torch.cat([last_layer[d] for d in range(self.num_directions)], dim=1)

        hid = self.relu(self.hidden_1(hn_cat))

        return self.hidden_2(hid)

In [None]:
# Build the 5-layer bidirectional LSTM classifier on top of the GloVe matrix.
model = sarcasmModel(
    embedding_matrix,
    lstm_hidden_size=50,
    num_lstm_layers=5,
    bidirectional=True,
)

# One training epoch; returns the final weights and the best-on-validation weights.
model_weights, best_model_weights = train_sarcasm_classification(
    model,
    train_dataset,
    valid_dataset,
    batch_size=64,
    epochs=1,
)

# Checkpoint both sets of weights.
torch.save(model_weights, 'sarcasmModel.pt')
torch.save(best_model_weights, 'sarcasmModelBest.pt')

In [None]:
# Rebuild the same architecture, then restore the saved weights into it.
# NOTE(review): this reads 'sarcasmV3.pt', which is not one of the files written
# by the training cell -- presumably a checkpoint saved earlier; confirm it exists.
model_loaded = sarcasmModel(
    embedding_matrix,
    lstm_hidden_size=50,
    num_lstm_layers=5,
    bidirectional=True,
)
model_loaded.load_state_dict(
    torch.load('sarcasmV3.pt')
)
model_loaded.eval() # switch to evaluation mode

In [None]:
#Getting F1 score

def tokenize(text, max_length=280): #tokenize function for feeding single examples to the model
    """Turn one piece of text into a padded id tensor plus its true length.

    Uses the module-level ``word2id`` vocabulary; tokens it does not contain map
    to ``word2id['unk']`` and padding uses ``word2id['<pad>']``.

    :param text: raw text to tokenize
    :param max_length: sequences are truncated / right-padded to exactly this many ids
    :return: ``(ids, length)`` as tensors, or ``(-1, -1)`` when the text yields no tokens
    """
    unk_id = word2id['unk']
    ids = [word2id.get(tok, unk_id) for tok in word_tokenize(text)]

    # Clip to max_length, remember how many real tokens remain, then pad the rest.
    ids = ids[:max_length]
    length = len(ids)
    if length == 0:
        # Sentinel kept for existing callers: nothing to feed to the model.
        return -1, -1

    ids = ids + [word2id['<pad>']] * (max_length - length)
    return torch.tensor(ids), torch.tensor(length)

def getEval(model, test_file, output_path='output_file.csv'): #get output file and f1 score
    """Predict a label for every row of a test CSV, save the predictions, report F1.

    The input CSV has a header row; column 0 is the text and column 1 the true
    integer label. Predictions are written to ``output_path`` as a CSV with the
    header ``text,sarcasm``. A sigmoid output below 0.5 is label 0, otherwise 1.
    Text that yields no tokens cannot be fed to the model and is given label 0.

    :param model: callable taking ``(ids, lengths)`` batches and returning logits
    :param test_file: path of the labelled test CSV
    :param output_path: where to write the predictions
    :return: binary F1 score (positive label 1)
    """
    y_true = [] #true labels
    y_pred = [] #predicted labels

    # Context managers close both files even if a row fails to parse.
    with open(test_file, 'r', encoding='utf-8', errors='ignore', newline='') as csvfile, \
            open(output_path, 'w', encoding='utf-8', newline='') as output_file:
        reader = csv.reader(csvfile)
        # csv.writer quotes fields containing commas, quotes or line breaks,
        # which hand-built '%s,%s' lines would corrupt.
        writer = csv.writer(output_file, lineterminator='\n')

        next(reader, None) #skip the header
        writer.writerow(['text', 'sarcasm']) #write header

        with torch.no_grad():
            for line in reader: #go through every tweet example
                if not line: #blank line in the file
                    continue
                text = line[0]
                actual = int(line[1]) #true label

                embedded, length = tokenize(text) #get tokenized example and length
                if isinstance(embedded, torch.Tensor):
                    # Add a batch dimension of 1 to both the ids and the length.
                    prob = torch.sigmoid(model(embedded.unsqueeze(0), length.unsqueeze(0)))
                    pred = 0 if prob.item() < 0.5 else 1
                else:
                    # tokenize signals "no tokens" with (-1, -1)
                    pred = 0

                y_true.append(actual)
                y_pred.append(pred)
                writer.writerow([text, pred])

    f1 = f1_score(y_true, y_pred, average="binary", pos_label=1) #calc f1
    print('f1: {}'.format(f1))
    return f1

In [None]:
# Score the restored model on the test CSV with gradient tracking switched off.
with torch.no_grad():
    getEval(model=model_loaded, test_file='test.csv')