# NLP Assignment 1 (40% of grade): Sentiment Analysis from Tweets

This coursework will involve you implementing functions for a text classifier, which you will train to identify the **sentiment expressed in a text** in a dataset of approx. 27,000 entries, which will be split into an 80%/20% training/test split.

In this template you are given the basis for that implementation, though some of the functions are missing, which you have to fill in.

Follow the instructions file **NLP_Assignment_1_Instructions.pdf** for details of each question - the outline of what needs to be achieved for each question is as below.

You must submit all **ipython notebooks and extra resources you need to run the code if you've added them** in the code submission, and a **2 page report (pdf)** in the report submission on QMPlus where you report your methods and findings according to the instructions file for each question.

In [None]:
import csv                               # csv reader
from sklearn.svm import LinearSVC
from nltk.classify import SklearnClassifier
from sklearn.pipeline import Pipeline
from sklearn.metrics import precision_recall_fscore_support # to report on precision and recall
import numpy as np

In [None]:
def load_data(path):
    """Load labelled texts from a tab-separated file into ``raw_data``.

    Every row except blank rows and the header (first cell equal to ``"Id"``)
    is handed to ``parse_data_line``; the result is appended to the
    module-level ``raw_data`` list as a ``(text, label)`` tuple.

    Args:
        path: Location of the tab-separated file to read.
    """
    # newline="" lets the csv module handle line endings itself, and the
    # explicit encoding stops non-ASCII text from depending on the platform
    # default. NOTE(review): assumes the dataset is UTF-8 encoded - confirm.
    with open(path, encoding="utf-8", newline="") as f:
        reader = csv.reader(f, delimiter='\t')
        for line in reader:
            # csv yields [] for a blank line; indexing it would raise IndexError
            if not line or line[0] == "Id":  # skip blank rows and the header
                continue
            (label, text) = parse_data_line(line)
            raw_data.append((text, label))

def split_and_preprocess_data(percentage):
    """Pre-process ``raw_data`` and distribute it over ``train_data`` and ``test_data``.

    The first ``int(percentage * len(raw_data))`` samples are appended to
    ``train_data`` and the remaining ones to ``test_data``. Each sample is
    stored as ``(to_feature_vector(pre_process(text)), label)``.

    Args:
        percentage: Fraction of ``raw_data`` that goes to training (e.g. 0.8).
    """
    boundary = int(percentage * len(raw_data))
    portions = (
        (train_data, slice(None, boundary)),
        (test_data, slice(boundary, None)),
    )
    # Training samples are handled first, then the test samples, in file order.
    for target, section in portions:
        for text, label in raw_data[section]:
            features = to_feature_vector(pre_process(text))
            target.append((features, label))

# Question 1: Input and Basic preprocessing (10 marks)

In [None]:
def parse_data_line(data_line):
    """Extract the label and the statement from one parsed dataset row.

    Args:
        data_line: Sequence of column values for a single row; the label is
            read from index 1 and the statement from index 2.

    Returns:
        A ``(label, statement)`` tuple.
    """
    return (data_line[1], data_line[2])

# Simple preprocess

In [None]:
import re
def pre_process(text, verbose=False):
    """Tokenise a raw text into a list of lower-cased tokens.

    Punctuation that directly touches a word character is split off with a
    space, whitespace runs are collapsed, the text is split on whitespace and
    every token is lower-cased. Sentence segmentation is assumed to have been
    done already.

    Args:
        text: The raw text to tokenise.
        verbose: When true, print the text before and after the punctuation
            separation step (useful for debugging single samples).

    Returns:
        The list of tokens; empty when ``text`` contains no tokens.
    """
    if verbose:
        print("original:", text)

    # word tokenisation
    # separate punctuation that follows a word character ("word," -> "word ,")
    text = re.sub(r"(\w)([.,;:!?'\"”\)])", r"\1 \2", text)
    # separate punctuation that precedes a word character ("(word" -> "( word")
    text = re.sub(r"([.,;:!?'\"“\(\)])(\w)", r"\1 \2", text)

    # Convert multiple spaces to a single space
    text = re.sub(r"\s+", " ", text).strip()

    if verbose:
        print("tokenising:", text)

    # str.split() returns [] for an empty string, whereas re.split() would
    # return [''] and inject an empty-string token into the features.
    # normalisation - only by lower casing for now
    return [t.lower() for t in text.split()]

# Simple feature extraction

In [None]:
global_feature_dict = {}  # maps every token seen so far to a unique integer index


def to_feature_vector(tokens, binary=True):
    """Turn a sequence of tokens into a feature dictionary.

    As a side effect every token not seen before is registered in
    ``global_feature_dict`` with the next free integer index.

    Args:
        tokens: Iterable of tokens for one sample.
        binary: If true each present token maps to 1; otherwise each token
            maps to the number of times it occurs in ``tokens``.

    Returns:
        A dict from token to its feature value.
    """
    features = {}
    for tok in tokens:
        # presence flag in binary mode, running occurrence count otherwise
        features[tok] = 1 if binary else features.get(tok, 0) + 1
        # setdefault only stores the new index when the token is unseen
        global_feature_dict.setdefault(tok, len(global_feature_dict))
    return features


In [None]:
# TRAINING AND VALIDATING OUR CLASSIFIER

def train_classifier(data):
    """Train a linear SVM classifier on the given samples.

    A scikit-learn ``Pipeline`` holding a single ``LinearSVC`` step is wrapped
    in NLTK's ``SklearnClassifier`` and trained on ``data``.

    Args:
        data: Training samples, passed unchanged to ``SklearnClassifier.train``.
            Callers in this file pass ``(feature_dict, label)`` pairs.

    Returns:
        The trained ``SklearnClassifier``.
    """
    print("Training Classifier...")
    pipeline = Pipeline([('svc', LinearSVC())])

    sklearn_clf = SklearnClassifier(pipeline)
    sklearn_clf.train(data)  # trains in place
    # Reaching this line means train() did not raise; no type check is needed
    # because the object was constructed as a SklearnClassifier just above.
    print("Classifier successfully trained.")

    return sklearn_clf

# Question 3: Cross-validation (20 marks)

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report
import numpy as np

def cross_validate(dataset, folds):
    """Run k-fold cross-validation and report per-fold and averaged metrics.

    The dataset is cut into ``folds`` contiguous slices whose sizes differ by
    at most one sample. Each slice is held out once while a classifier is
    trained (via ``train_classifier``) on the remaining samples. Precision,
    recall and F1 for the ``'positive'`` label plus overall accuracy are
    printed and recorded for every fold, followed by their averages.

    Args:
        dataset: List of ``(features, label)`` pairs. It is sliced and
            concatenated, so it must behave like a list.
        folds: Number of folds requested. Values larger than the number of
            samples are capped so that every fold holds at least one sample.

    Returns:
        A list with one dict per fold (keys ``'Fold'``, ``'precision'``,
        ``'recall'``, ``'f1_score'``, ``'accuracy'``) followed by a final dict
        whose ``'Fold'`` is ``'Average'`` and whose metrics are the per-fold
        means.

    Raises:
        ValueError: If ``folds`` is smaller than 1 or ``dataset`` is empty.
    """
    num_samples = len(dataset)
    folds = int(folds)
    if folds < 1:
        raise ValueError("folds must be at least 1")
    if num_samples == 0:
        raise ValueError("cannot cross-validate an empty dataset")

    # Spread the samples as evenly as possible: the first `remainder` folds
    # get one extra sample. This always yields exactly `folds` folds, which a
    # fixed "int(n / folds) + 1" step size does not guarantee.
    folds = min(folds, num_samples)
    base_size, remainder = divmod(num_samples, folds)

    metric_names = ('precision', 'recall', 'f1_score', 'accuracy')
    results = []
    start = 0

    for fold_number in range(1, folds + 1):
        end = start + base_size + (1 if fold_number <= remainder else 0)

        # Split the data into the held-out fold and the training remainder
        held_out = dataset[start:end]
        training = dataset[:start] + dataset[end:]
        start = end

        # Train classifier on training data
        classifier = train_classifier(training)

        # Get true labels and predictions for the held-out fold
        true_labels = [label for _, label in held_out]
        predictions = predict_labels([features for features, _ in held_out], classifier)

        # Calculate metrics for this fold (each one exactly once)
        precision = precision_score(true_labels, predictions, pos_label='positive')
        recall = recall_score(true_labels, predictions, pos_label='positive')
        f1 = f1_score(true_labels, predictions, pos_label='positive')
        accuracy = accuracy_score(true_labels, predictions)

        results.append({
            'Fold': fold_number,
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'accuracy': accuracy,
        })

        print("Fold %d - Precision: %.4f, Recall: %.4f, F1 Score: %.4f, Accuracy: %.4f" %
              (fold_number, precision, recall, f1, accuracy))

        print(f"Fold {fold_number} Classification Report:")
        # labels= pins the row order so target_names cannot be mismatched
        print(classification_report(true_labels, predictions,
                                    labels=['negative', 'positive'],
                                    target_names=['negative', 'positive']))

    # Average only the numeric metrics; the 'Fold' column is a label
    avg_results = {'Fold': 'Average'}
    for metric in metric_names:
        avg_results[metric] = np.mean([fold[metric] for fold in results])
    results.append(avg_results)  # Append the average row to the results

    print("\nAverage Cross-Validation Results")
    print("Precision: %.4f, Recall: %.4f, F1 Score: %.4f, Accuracy: %.4f" %
          (avg_results['precision'], avg_results['recall'], avg_results['f1_score'], avg_results['accuracy']))
    return results


In [None]:
# PREDICTING LABELS GIVEN A CLASSIFIER

def predict_labels(samples, classifier):
    """Return the labels the classifier predicts for already pre-processed samples.

    Args:
        samples: The pre-processed samples, forwarded to ``classify_many``.
        classifier: A trained model exposing ``classify_many``.

    Returns:
        Whatever ``classifier.classify_many(samples)`` returns.
    """
    predicted = classifier.classify_many(samples)
    return predicted

def predict_label_from_raw(sample, classifier):
    """Return the label the classifier predicts for one raw text.

    The text goes through the same ``pre_process`` and ``to_feature_vector``
    steps as the training data before it is classified.

    Args:
        sample: The raw, untokenised text.
        classifier: A trained model exposing ``classify``.

    Returns:
        Whatever ``classifier.classify`` returns for the sample's features.
    """
    # Use this file's pre_process and the `sample` argument; the previous
    # names (preProcess / reviewSample) are not defined anywhere in this file.
    return classifier.classify(to_feature_vector(pre_process(sample)))

In [None]:
# MAIN

# Global containers filled in by load_data and split_and_preprocess_data:
#   raw_data   - (text, label) pairs read from the dataset file
#   train_data - pre-processed training portion of the dataset
#   test_data  - pre-processed test portion of the dataset
raw_data, train_data, test_data = [], [], []

# location of the dataset file
data_file_path = 'sentiment-dataset.tsv'

# Report the (still empty) containers, then parse the dataset into raw_data
print("Now %d rawData, %d trainData, %d testData" % (len(raw_data), len(train_data), len(test_data)))
print("Preparing the dataset...")

load_data(data_file_path)

# Report the sizes after loading, then split 80/20 into training and test data.
# Cross-validation is done on the 80% training portion.
print("Now %d rawData, %d trainData, %d testData" % (len(raw_data), len(train_data), len(test_data)))
print("Preparing training and test data...")

split_and_preprocess_data(0.8)

# Report the sizes after the split together with the number of training
# samples and the number of distinct features collected
print("After split, %d rawData, %d trainData, %d testData" % (len(raw_data), len(train_data), len(test_data)))
print("Training Samples: ")
print(len(train_data))
print("Features: ")
print(len(global_feature_dict))


In [None]:
# Cross-validate on the training portion only, using 10 folds; the list that
# cross_validate returns is kept in `results`.
results = cross_validate(train_data, 10)

In [None]:
import pandas as pd  # imported here because no earlier cell brings pandas into scope

# Tabulate the cross-validation result rows, one row per entry in `results`
results_df = pd.DataFrame(results)
print(results_df)


# Question 4: Error Analysis (20 marks)

In [None]:
from sklearn import metrics
import matplotlib.pyplot as plt
# a function to make the confusion matrix readable and pretty
def confusion_matrix_heatmap(y_test, preds, labels):
    """Plot a confusion matrix as an annotated heatmap and show it.

    Args:
        y_test: The true labels.
        preds: The predicted labels, aligned with ``y_test``.
        labels: The label values in the order they should appear on the axes.
    """
    # pass labels to the confusion matrix function to ensure right order
    cm = metrics.confusion_matrix(y_test, preds, labels=labels)
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111)
    cax = ax.matshow(cm)
    plt.title('Confusion matrix of the classifier')
    fig.colorbar(cax)

    tick_positions = np.arange(len(labels))
    ax.set_xticks(tick_positions)
    ax.set_yticks(tick_positions)
    ax.set_xticklabels(labels, rotation=45)
    ax.set_yticklabels(labels)

    # write the count into every cell (x is the column, y is the row)
    n_rows, n_cols = cm.shape
    for i in range(n_rows):
        for j in range(n_cols):
            ax.text(j, i, cm[i, j], ha="center", va="center", color="w")

    plt.xlabel('Predicted')
    plt.ylabel('True')

    # Workaround for a matplotlib bug that cut off the top/bottom rows:
    # widen the y-limits by half a cell on each side.
    # NOTE(review): may no longer be needed on current matplotlib - confirm.
    b, t = plt.ylim()  # discover the values for bottom and top
    plt.ylim(b + 0.5, t - 0.5)
    plt.show()

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report
import numpy as np

def error_output(dataset, folds):
    """Write out and plot the classifier's mistakes on the first cross-validation fold.

    The first of ``folds`` contiguous folds is held out, a classifier is
    trained on the rest of ``dataset``, and every held-out sample whose
    predicted label differs from its true label is written to
    ``false_predictions_fold_1.txt``. A confusion-matrix heatmap for that same
    fold is then shown, so the file and the plot describe the same
    predictions. Only the first fold is trained, because it is the only fold
    whose errors are reported.

    Args:
        dataset: List of ``(features, label)`` pairs. It is sliced, so it must
            behave like a list.
        folds: Number of folds the dataset is divided into; it determines the
            size of the held-out first fold.

    Returns:
        A list with one dict per misclassified sample of the first fold, with
        keys ``'Text'`` (the sample as stored in ``dataset``), ``'True Label'``
        and ``'Predicted Label'``.

    Raises:
        ValueError: If ``folds`` is smaller than 1 or ``dataset`` is empty.
    """
    num_samples = len(dataset)
    folds = int(folds)
    if folds < 1:
        raise ValueError("folds must be at least 1")
    if num_samples == 0:
        raise ValueError("cannot analyse errors on an empty dataset")

    # Size of the first fold when the samples are spread as evenly as possible
    folds = min(folds, num_samples)
    base_size, remainder = divmod(num_samples, folds)
    end = base_size + (1 if remainder else 0)

    # Hold out the first fold and train on everything after it
    held_out = dataset[:end]
    classifier = train_classifier(dataset[end:])

    # Get true labels and predictions for the held-out fold
    texts = [text for text, _ in held_out]
    true_labels = [label for _, label in held_out]
    predictions = predict_labels(texts, classifier)

    false_predictions = [
        {'Text': text, 'True Label': true_label, 'Predicted Label': pred}
        for text, true_label, pred in zip(texts, true_labels, predictions)
        if true_label != pred
    ]

    # Write false predictions from the first fold to a file; the explicit
    # encoding keeps non-ASCII tokens from failing on platforms whose default
    # encoding cannot represent them.
    with open("false_predictions_fold_1.txt", "w", encoding="utf-8") as f:
        for fp in false_predictions:
            f.write(f"Text: {fp['Text']}\nTrue Label: {fp['True Label']}\nPredicted Label: {fp['Predicted Label']}\n\n")

    confusion_matrix_heatmap(true_labels, predictions, labels=['positive', 'negative'])
    return false_predictions


In [None]:
# Run the error analysis over the training portion, dividing it into 10 folds
error_output (train_data, 10)

In [None]:
# Split the misclassified samples into false positives and false negatives
# with respect to the 'positive' label, print them and save them to a file.
# NOTE(review): test_texts, test_labels and predicted_labels are not defined
# anywhere in the visible notebook - presumably three aligned sequences
# (sample, true label, predicted label) produced by an earlier cell; confirm
# they exist before running this cell, otherwise it raises NameError.
false_positives = []
false_negatives = []

# Loop to collect false positives and false negatives for the positive label;
# each entry is stored as a (text, true_label, pred_label) tuple
for text, true_label, pred_label in zip(test_texts, test_labels, predicted_labels):
    if true_label == 'negative' and pred_label == 'positive':  # False Positive
        false_positives.append((text, true_label, pred_label))
    elif true_label == 'positive' and pred_label == 'negative':  # False Negative
        false_negatives.append((text, true_label, pred_label))

# Print both groups, false positives first
print("False Positives:")
for fp in false_positives:
    print(f"Text: {fp[0]} | True Label: {fp[1]} | Predicted Label: {fp[2]}")

print("\nFalse Negatives:")
for fn in false_negatives:
    print(f"Text: {fn[0]} | True Label: {fn[1]} | Predicted Label: {fn[2]}")

# Save to file for further analysis: same line format as the printout, false
# positives followed by false negatives, with no heading between the groups
with open("error_analysis.txt", "w") as f:
    for fp in false_positives:
        f.write(f"Text: {fp[0]} | True Label: {fp[1]} | Predicted Label: {fp[2]}\n")
    for fn in false_negatives:
        f.write(f"Text: {fn[0]} | True Label: {fn[1]} | Predicted Label: {fn[2]}\n")

# Question 5: Optimising pre-processing and feature extraction (30 marks)

**Note:** it is advisable to implement question 5 in a separate notebook where you further develop the pre-processing and feature extraction functions you implemented above.

In [None]:
# Final check: train on all of the training data and evaluate on the held-out
# test set. This only works once every function above is complete.
functions_complete = False  # flip to True once you are happy with your cross-validation methods
if functions_complete:
    # peek at the first test instance
    print(test_data[0])

    # fit on the complete training portion
    classifier = train_classifier(train_data)

    # ground-truth labels and the model's predictions for the test portion
    test_true = [label for _, label in test_data]
    test_pred = predict_labels([features for features, _ in test_data], classifier)

    # weighted-average precision / recall / F-score over the test set
    final_scores = precision_recall_fscore_support(
        test_true,
        test_pred,
        average='weighted',
    )
    print("Done training!")
    print("Precision: %f\nRecall: %f\nF Score:%f" % final_scores[:3])