In [1]:
import numpy as np
import pandas as pd 
import re
import math
from collections import Counter, defaultdict, OrderedDict
from pprint import pprint

from nltk.stem import PorterStemmer
import random
from sklearn.metrics import classification_report, confusion_matrix

from gensim.corpora.dictionary import Dictionary
from gensim.models import LdaModel
from scipy.sparse import dok_matrix

from operator import neg

import matplotlib.pyplot as plt

from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import SGDClassifier



import scipy

import string
import sklearn
from sklearn.ensemble import RandomForestClassifier

In [17]:
# Input files, given relative to the notebook's working directory.
# The stopword path gets its own name so that `stopwords` only ever refers
# to the set of words (it was previously a path string first, then a set).
stopwords_path = "ttds_stopwords.txt"
sentiment_train_path = "train-sentiment.txt"
sentiment_test_path = "ttds_2023_cw2_test_final.txt"

# One stopword per line; surrounding whitespace (including the newline) is stripped.
with open(stopwords_path, 'r') as f:
    stopwords = {line.strip() for line in f}
    
    
def join_to_string(data_list):
    """Serialise rows of values as text.

    Each inner sequence becomes one line whose fields are converted with
    ``str`` and separated by tabs; lines are separated by newlines. An empty
    ``data_list`` yields the empty string.
    """
    rows = []
    for item in data_list:
        rows.append("\t".join(str(field) for field in item))
    return "\n".join(rows)

    
# Load the tab-separated corpora as plain lists of rows.
sentiment_train = pd.read_csv(sentiment_train_path, sep='\t').values.tolist()
sentiment_test = pd.read_csv(sentiment_test_path, sep='\t').values.tolist()
# NOTE(review): this reads the same file as `sentiment_test_path`; unlike
# `sentiment_test` it is kept in file order (never shuffled).
sentiment_test_final = pd.read_csv("ttds_2023_cw2_test_final.txt", sep='\t').values.tolist()

# Seed the RNG so the shuffles -- and therefore the train/test/dev split
# below -- are identical after every Restart Kernel -> Run All.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
random.shuffle(sentiment_train)
random.shuffle(sentiment_test)

# Split proportions of the training file. `dev_ratio` is informational: the
# dev set is simply whatever remains after the train and test slices.
train_ratio = 0.8
test_ratio = 0.1
dev_ratio = 0.1

train_section_end =  int(len(sentiment_train) * train_ratio)
test_section_end = int(train_section_end + (len(sentiment_train) * test_ratio))
dev_section_start = test_section_end

# Three contiguous, non-overlapping slices of the shuffled training rows.
train_set = sentiment_train[:train_section_end]
test_set = sentiment_train[train_section_end:test_section_end]
dev_set = sentiment_train[test_section_end:]


In [380]:
class SkipGram:
    """Minimal skip-gram word-embedding model trained with plain NumPy SGD.

    Every (centre, context) pair taken from a sliding window over each tweet
    is used for one full-softmax gradient step. ``W1`` (vocab x dim) holds the
    embeddings returned by the lookup methods; ``W2`` (dim x vocab) holds the
    output-layer weights.
    """

    def __init__(self, embedding_dim=20, window_size=2, learning_rate=0.01, epochs=10):
        """Store hyper-parameters; weights are created by ``train``."""
        self.embedding_dim = embedding_dim
        self.window_size = window_size
        self.learning_rate = learning_rate
        self.epochs = epochs
        self.word_to_index = {}
        self.index_to_word = {}
        self.W1 = None
        self.W2 = None
        # Shared random vector for out-of-vocabulary words. Stored as
        # (1, embedding_dim); the lookup methods hand out a flat
        # (embedding_dim,) view so every embedding has the same shape.
        self.oov_embedding = np.random.rand(1, embedding_dim)

    def _build_vocab(self, tokenized_tweets):
        """Assign each distinct token an index in order of first appearance."""
        self.word_to_index = {}
        for tweet in tokenized_tweets:
            for word in tweet:
                self.word_to_index.setdefault(word, len(self.word_to_index))
        self.index_to_word = {i: word for word, i in self.word_to_index.items()}

    def _generate_training_data(self, tokenized_tweets):
        """Return parallel integer arrays of (centre, context) word indices.

        Tokens missing from the vocabulary are dropped before windowing.
        """
        X = []
        Y = []

        for tweet in tokenized_tweets:
            tweet_indices = [self.word_to_index[word] for word in tweet if word in self.word_to_index]
            for i, word_index in enumerate(tweet_indices):
                window_start = max(i - self.window_size, 0)
                window_end = min(i + self.window_size + 1, len(tweet_indices))
                for j in range(window_start, window_end):
                    if i != j:
                        X.append(word_index)
                        Y.append(tweet_indices[j])
        # An explicit integer dtype keeps the arrays usable as indices even
        # when no pairs were generated (an empty list would default to float).
        return np.array(X, dtype=int), np.array(Y, dtype=int)

    def train(self, tokenized_tweets):
        """Fit ``W1``/``W2`` on the given tokenised tweets.

        The vocabulary is built from ``tokenized_tweets`` unless one is
        already present. Weights are re-initialised on every call.
        """
        if not self.word_to_index:
            self._build_vocab(tokenized_tweets)
        X, Y = self._generate_training_data(tokenized_tweets)

        vocab_size = len(self.word_to_index)
        self.W1 = np.random.rand(vocab_size, self.embedding_dim)
        self.W2 = np.random.rand(self.embedding_dim, vocab_size)

        for epoch in range(self.epochs):
            for center, context in zip(X, Y):
                # A one-hot input just selects one row of W1, so index it
                # directly instead of building the one-hot vector.
                h = self.W1[center]
                u = h @ self.W2
                # Subtracting the max leaves the softmax unchanged but keeps
                # exp() from overflowing.
                u = u - u.max()
                exp_u = np.exp(u)
                y_pred = exp_u / exp_u.sum()

                # Error signal: one-hot(context) - prediction.
                e = -y_pred
                e[context] += 1.0

                # Compute both gradients from the current weights before
                # touching either matrix.
                grad_h = self.W2 @ e
                grad_W2 = np.outer(h, e)

                # Only the centre word's row of W1 has a non-zero gradient.
                self.W1[center] += self.learning_rate * grad_h
                self.W2 += self.learning_rate * grad_W2

            if epoch % 10 == 0:
                print(f"Epoch {epoch} completed")

    def get_word_embedding(self, word):
        """Return the (embedding_dim,) vector for ``word``.

        Unknown words map to the shared out-of-vocabulary vector, flattened
        so it has the same shape as in-vocabulary embeddings.
        """
        index = self.word_to_index.get(word)
        if index is None:
            return self.oov_embedding.reshape(-1)
        return self.W1[index]

    def get_tweet_embedding(self, tweet_tokens):
        """Return the mean word embedding of a tweet, shape (embedding_dim,).

        A tweet with no tokens maps to the out-of-vocabulary vector.
        """
        embeddings = [self.get_word_embedding(word) for word in tweet_tokens]
        if not embeddings:
            return self.oov_embedding.reshape(-1)
        return np.mean(np.array(embeddings), axis=0)
        
        

In [None]:
# Train a skip-gram model with default hyper-parameters on the sample rows.
skipgram_model = SkipGram()
# Tokenise the element at index 2 of every row.
tokenised_sample = [list(tokeniser.tokenise(tweet[2])) for tweet in sample]
# Remove stopwords
tokenised_sample = [
    [token for token in tweet if token not in stopwords]
    for tweet in tokenised_sample
]
skipgram_model.train(tokenised_sample)
# Get embedding for a specific word


In [None]:
# Look up the vector for "and"; the bare expression on the last line shows
# it as the cell output.
# NOTE(review): stopwords are filtered out before training, so if "and" is
# in the stopword list this returns the out-of-vocabulary vector -- confirm.
embedding = skipgram_model.get_word_embedding("and")
embedding

In [None]:
# Display the vector for "buyers" (the out-of-vocabulary vector if the word
# was not seen during training).
skipgram_model.get_word_embedding("buyers")

In [None]:
from sklearn.metrics import accuracy_score

class SimpleRNN:
    """Single-layer tanh RNN with a linear read-out of the final hidden state.

    ``forward`` caches the inputs and hidden states; ``backward`` uses that
    cache to run back-propagation through time and applies one SGD step.
    """

    def __init__(self, input_size, hidden_size=64, output_size=3):
        """Create small random weights and zero biases."""
        self.Wxh = np.random.randn(hidden_size, input_size) * 0.01
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.Why = np.random.randn(output_size, hidden_size) * 0.01
        self.bh = np.zeros((hidden_size, 1))
        self.by = np.zeros((output_size, 1))

    def forward(self, inputs):
        """Run the sequence and return the output scores as a flat array.

        ``inputs`` is iterated step by step; each step is reshaped to a
        column vector. With no steps, the output is just the output bias.
        """
        self.last_inputs = inputs
        self.last_hs = {0: np.zeros((self.Whh.shape[0], 1))}
        h = self.last_hs[0]

        for i, x in enumerate(inputs):
            x = x.reshape(-1, 1)
            h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)
            self.last_hs[i + 1] = h

        y = np.dot(self.Why, h) + self.by
        return y.flatten()

    def backward(self, d_y, learn_rate=0.01):
        """Back-propagate ``d_y`` through the cached sequence and update weights.

        ``d_y`` is the gradient of the loss w.r.t. the output scores; a flat
        array or a column vector is accepted. The caller's array is never
        modified. Must be called after ``forward``.
        """
        d_y = d_y.reshape(-1, 1)
        n = len(self.last_inputs)

        # Output-layer gradients.
        dWhy = np.dot(d_y, self.last_hs[n].T)
        dby = d_y

        dWhh = np.zeros(self.Whh.shape)
        dWxh = np.zeros(self.Wxh.shape)
        dbh = np.zeros(self.bh.shape)

        # Gradient flowing into the last hidden state.
        d_h = np.dot(self.Why.T, d_y)

        # Back-propagate through time.
        for t in reversed(range(n)):
            # Derivative of tanh at step t, applied to the incoming gradient.
            temp = ((1 - self.last_hs[t + 1] ** 2) * d_h)
            dbh += temp
            dWhh += np.dot(temp, self.last_hs[t].T)
            input_t = self.last_inputs[t].reshape(self.Wxh.shape[1], 1)
            dWxh += np.dot(temp, input_t.T)

            d_h = np.dot(self.Whh.T, temp)

        # Clip to prevent exploding gradients. Clipping into fresh arrays
        # (rather than in place) matters for `dby`, which shares memory with
        # the caller's `d_y`.
        dWxh, dWhh, dbh, dWhy, dby = (
            np.clip(grad, -1, 1) for grad in (dWxh, dWhh, dbh, dWhy, dby)
        )

        # Gradient-descent update.
        self.Wxh -= learn_rate * dWxh
        self.Whh -= learn_rate * dWhh
        self.bh -= learn_rate * dbh
        self.Why -= learn_rate * dWhy
        self.by -= learn_rate * dby


# Custom loss function: Categorical Cross-Entropy
def categorical_cross_entropy(predictions, targets):
    """Mean categorical cross-entropy between probabilities and one-hot targets.

    ``predictions`` and ``targets`` are NumPy arrays of the same shape. A 2-D
    pair is treated as (samples, classes) and averaged over samples; a 1-D
    pair is treated as a single sample, so the result is not divided by the
    number of classes.
    """
    # Clipping alone keeps log() finite. Adding a further epsilon would push
    # values above 1 and make the loss negative for confident predictions.
    predictions = np.clip(predictions, 1e-12, 1. - 1e-12)
    n_samples = targets.shape[0] if targets.ndim > 1 else 1
    return -np.sum(targets * np.log(predictions)) / n_samples

# Convert categorical labels to one-hot
def to_one_hot(labels, dimension=3):
    """Encode integer class labels as rows of a (len(labels), dimension) matrix.

    Row ``k`` is all zeros except for a 1 at column ``labels[k]``.
    """
    encoded = np.zeros((len(labels), dimension))
    # Iterating a 2-D array yields row views, so writing to `row` fills `encoded`.
    for row, label in zip(encoded, labels):
        row[label] = 1
    return encoded

# Function to convert words to vectors using Word2Vec
def word_to_vec(word, model):
    """Return ``model.wv[word]``, or a zero vector for unknown words.

    The fallback has length ``model.vector_size``.
    """
    if word in model.wv:
        return model.wv[word]
    return np.zeros(model.vector_size)

# Initialize RNN
# NOTE(review): `dims`, `model`, `tokenized_corpus_train` and `ytrain` are not
# defined in or above this cell. Later cells assign `dims = 128` and rebind
# `model` to a torch network, while `word_to_vec` needs an object exposing
# `.wv` / `.vector_size` -- presumably an embedding model from an unseen
# cell; confirm the intended execution order.
rnn = SimpleRNN(input_size=dims, hidden_size=8, output_size=3)

# Training loop: one SGD step per tweet, 25 passes over the training corpus.
for epoch in range(25):  # Number of epochs
    total_loss = 0
    for i, tweet in enumerate(tokenized_corpus_train):
        # One embedding vector per token; the label becomes a 1 x 3 one-hot row.
        inputs = np.array([word_to_vec(word, model) for word in tweet])
        target = to_one_hot([ytrain[i]], dimension=3)

        # Forward pass: raw scores, then softmax to get class probabilities.
        out = rnn.forward(inputs)
        probs = np.exp(out) / np.sum(np.exp(out), axis=0, keepdims=True)

        # Calculate loss
        loss = categorical_cross_entropy(probs, target[0])
        total_loss += loss

        # Backward pass: for softmax + cross-entropy the gradient w.r.t. the
        # scores is (probabilities - one-hot target).
        d_y = probs - target[0]
        rnn.backward(d_y.reshape(-1, 1))

    # Average per-tweet loss for this epoch.
    print(f"Epoch {epoch + 1}, Loss: {total_loss / len(tokenized_corpus_train)}")


# Preparing the data for logistic regression
def prepare_data_rnn(corpus, rnn_model):
    """Encode every tweet in ``corpus`` with ``rnn_model.forward``.

    Each tweet's tokens are mapped to vectors with ``word_to_vec`` using the
    module-level ``model`` (looked up at call time). Returns a list with one
    forward-pass output per tweet, in corpus order.
    """
    features = []
    for tweet in corpus:
        token_vectors = np.array([word_to_vec(word, model) for word in tweet])
        features.append(rnn_model.forward(token_vectors))
    return features

# Encode each split with the trained RNN: one output vector per tweet.
# NOTE(review): `tokenized_corpus_dev/test`, `ydev` and `ytest` come from
# cells not shown here.
xtrain_rnn = prepare_data_rnn(tokenized_corpus_train, rnn)
xdev_rnn = prepare_data_rnn(tokenized_corpus_dev, rnn)
xtest_rnn = prepare_data_rnn(tokenized_corpus_test, rnn)

# Logistic Regression (L1-penalised, class-balanced) fitted on the RNN outputs
logistic_model = LogisticRegression(solver='liblinear', class_weight='balanced', penalty='l1', max_iter=1000)
logistic_model.fit(xtrain_rnn, ytrain)

# Making predictions
ytrain_pred = logistic_model.predict(xtrain_rnn)
ydev_pred = logistic_model.predict(xdev_rnn)
ytest_pred = logistic_model.predict(xtest_rnn)

# Printing accuracies
print("Training accuracy:", accuracy_score(ytrain, ytrain_pred))
print("Development accuracy:", accuracy_score(ydev, ydev_pred))
print("Testing accuracy:", accuracy_score(ytest, ytest_pred))

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

# Assuming 'dims' is the size of your Word2Vec vectors
# max_len = ... # Define max_len based on your dataset

# CNN Model
class TextCNN(nn.Module):
    """Two-layer 1-D CNN text classifier with global max pooling.

    ``forward`` takes a tensor laid out as (batch, sequence_length, input_dim)
    and returns unnormalised class scores of shape (batch, num_classes); no
    softmax is applied.
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.conv1 = nn.Conv1d(input_dim, 128, kernel_size=5, padding=1)
        self.conv2 = nn.Conv1d(128, 128, kernel_size=5, padding=1)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, num_classes)
        self.relu = nn.ReLU()
        self.bn1 = nn.BatchNorm1d(128)

    def forward(self, x):
        # Conv1d wants channels first: (batch, channels, sequence_length).
        features = x.permute(0, 2, 1)
        features = self.conv1(features)
        features = self.relu(features)
        features = self.bn1(features)
        features = self.relu(self.conv2(features))
        # Global max pooling over the sequence axis.
        pooled = features.max(dim=2).values
        hidden = self.dropout(self.relu(self.fc1(pooled)))
        return self.fc2(hidden)

# NOTE(review): `dims` and `model` are rebound here; cells above use `model`
# through `word_to_vec`, which expects a different kind of object -- confirm
# that re-running earlier cells after this one is not intended.
dims = 128
# Initialize the model, loss function, and optimizer
model = TextCNN(input_dim=dims, num_classes=3)  # Assuming 3 classes for your problem
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-4)

# Training loop
for epoch in range(500):
    # The periodic evaluation below calls model.eval(). Switch back to
    # training mode every epoch so dropout stays active and batch-norm keeps
    # updating its statistics while the weights are being optimised.
    model.train()
    total_loss = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Compute loss
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    # Report the mean batch loss and per-split accuracy every fifth epoch.
    if epoch % 5 == 0:
        print(f'Epoch {epoch + 1}, Loss: {total_loss / len(train_loader)}')
        
        train_accuracy = evaluate_model(model, train_loader)
        print(f'Training Set Accuracy: {train_accuracy}')
        
        dev_accuracy = evaluate_model(model, dev_loader)
        print(f'Development Set Accuracy: {dev_accuracy}')
        
        test_accuracy = evaluate_model(model, test_loader)
        print(f'Test Set Accuracy: {test_accuracy}')
        

In [None]:
def evaluate_model(model, data_loader):
    """Return the fraction of samples in ``data_loader`` that ``model`` classifies correctly.

    ``data_loader`` yields ``(inputs, labels)`` batches; the predicted class
    is the index of the largest score along dimension 1. The model is put in
    eval mode for the duration of the call and then returned to whatever mode
    it was in, so calling this in the middle of training does not leave
    dropout or batch-norm switched off. Returns ``nan`` if the loader yields
    no samples.
    """
    was_training = model.training
    model.eval()  # Set the model to evaluation mode
    correct = 0
    total = 0

    try:
        with torch.no_grad():  # Disable gradient computations
            for inputs, labels in data_loader:
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)

                # Compare on the CPU so predictions and labels may live on
                # different devices.
                correct += (predicted.cpu() == labels.cpu()).sum().item()
                total += labels.size(0)
    finally:
        # Restore the caller's mode even if a batch raised.
        model.train(was_training)

    return correct / total if total else float("nan")

# Prepare your dev and test sets in a similar way as the training set
# Assuming xdev, ydev, xtest, ytest are already prepared

# Create TensorDatasets and DataLoaders for dev and test sets and train 
# Wrap each split as (features, labels) pairs served in batches of 128.
# Only the training loader is shuffled.
# NOTE(review): `xtrain`/`xdev`/`xtest` are assumed to already be tensors
# prepared in a cell not shown here -- confirm. The training cells above use
# these loaders, so this cell has to run before them.
train_data = TensorDataset(xtrain, torch.tensor(ytrain))
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)

dev_data = TensorDataset(xdev, torch.tensor(ydev))
dev_loader = DataLoader(dev_data, batch_size=128, shuffle=False)

test_data = TensorDataset(xtest, torch.tensor(ytest))
test_loader = DataLoader(test_data, batch_size=128, shuffle=False)

# Evaluate the model currently bound to `model` on every split
dev_accuracy = evaluate_model(model, dev_loader)
test_accuracy = evaluate_model(model, test_loader)
train_accuracy = evaluate_model(model, train_loader)

print(f'Training Set Accuracy: {train_accuracy}')
print(f'Development Set Accuracy: {dev_accuracy}')
print(f'Test Set Accuracy: {test_accuracy}')

In [None]:
class Attention(nn.Module):
    """Additive attention pooling over the step axis of a 3-D tensor.

    A single linear layer scores every step; scores pass through ``tanh`` and
    a softmax across steps, and the input is summed with those weights.
    ``step_dim`` is stored but not used in the computation.
    """

    def __init__(self, feature_dim, step_dim, bias=True, **kwargs):
        super().__init__(**kwargs)

        self.feature_dim = feature_dim
        self.step_dim = step_dim
        self.bias = bias

        self.attention = nn.Linear(feature_dim, 1, bias=bias)

    def forward(self, x):
        """Return ``(pooled, weights)`` for ``x`` of shape (batch, steps, features)."""
        # One scalar score per step, squashed to (-1, 1).
        scores = torch.tanh(self.attention(x)).squeeze(2)
        alpha = torch.softmax(scores, dim=1)

        # Attention-weighted sum over the step axis.
        pooled = (x * alpha.unsqueeze(2)).sum(dim=1)
        return pooled, alpha

class TextCNNWithAttention(nn.Module):
    """Two-layer 1-D CNN whose features are pooled by attention before classification.

    ``forward`` takes a (batch, sequence_length, input_dim) tensor and returns
    unnormalised class scores of shape (batch, num_classes).
    """

    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.conv1 = nn.Conv1d(input_dim, 128, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(128, 128, kernel_size=3, padding=1)
        self.dropout = nn.Dropout(0.5)
        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, num_classes)
        self.relu = nn.ReLU()
        self.attention = Attention(128, 35)  # Assuming 35 is the max sequence length

    def forward(self, x):
        # Conv1d wants channels first: (batch, channels, sequence_length).
        channels_first = x.permute(0, 2, 1)
        conv_out = self.relu(self.conv2(self.relu(self.conv1(channels_first))))

        # Attention pools over steps, so move the sequence axis back to dim 1.
        pooled, _ = self.attention(conv_out.permute(0, 2, 1))

        hidden = self.dropout(self.relu(self.fc1(pooled)))
        return self.fc2(hidden)


dims = 128
# Initialize the model, loss function, and optimizer
model = TextCNNWithAttention(input_dim=dims, num_classes=3)  # Assuming 3 classes for your problem
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-4)

# Training loop
for epoch in range(200):
    # The periodic evaluation below calls model.eval(). Switch back to
    # training mode every epoch so dropout stays active while the weights
    # are being optimised.
    model.train()
    total_loss = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)

        # Compute loss
        loss = criterion(outputs, labels)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    # Report the mean batch loss and per-split accuracy every fifth epoch.
    if epoch % 5 == 0:
        print(f'Epoch {epoch + 1}, Loss: {total_loss / len(train_loader)}')
        
        train_accuracy = evaluate_model(model, train_loader)
        print(f'Training Set Accuracy: {train_accuracy}')
        
        dev_accuracy = evaluate_model(model, dev_loader)
        print(f'Development Set Accuracy: {dev_accuracy}')
        
        test_accuracy = evaluate_model(model, test_loader)
        print(f'Test Set Accuracy: {test_accuracy}')
        print()