# Text Mining Final Assignment - Sentiment Analysis

## SemEval 2017 Task 4: Sentiment Analysis in Twitter (Subtask A)

Subtask A: sentiment classification on a 3-point scale

# Pre-trained Sentiment Analysis Models Based on the Hugging Face Tutorial

Importing the required libraries

In [2]:
!pip install -q transformers

In [3]:
from transformers import pipeline

Example of Sentiment Analysis

In [4]:
sentiment_pipeline = pipeline("sentiment-analysis")
data = ["I love you", "I hate you"]
sentiment_pipeline(data)

No model was supplied, defaulted to distilbert-base-uncased-finetuned-sst-2-english and revision af0f99b (https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english).
Using a pipeline without specifying a model name and revision in production is not recommended.



[{'label': 'POSITIVE', 'score': 0.9998656511306763},
 {'label': 'NEGATIVE', 'score': 0.9991129040718079}]

Sentiment analysis with a BERTweet-based model for tweets

In [5]:
specific_model = pipeline(model="finiteautomata/bertweet-base-sentiment-analysis")
specific_model(data)


emoji is not installed, thus not converting emoticons or emojis into text. Install emoji: pip3 install emoji==0.6.0


[{'label': 'POS', 'score': 0.9916695356369019},
 {'label': 'NEG', 'score': 0.9806600213050842}]

Sentiment analysis with a RoBERTa model fine-tuned on tweets. It assigns labels as follows:

*   LABEL_0: Negative
*   LABEL_1: Neutral
*   LABEL_2: Positive

In [7]:
specific_model = pipeline(model="cardiffnlp/twitter-roberta-base-sentiment")
specific_model(data)


[{'label': 'LABEL_2', 'score': 0.955704927444458},
 {'label': 'LABEL_0', 'score': 0.965427041053772}]
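
The `LABEL_n` strings returned by this checkpoint can be translated into readable names with a small lookup that follows the label list given for the model. This is a minimal sketch: `ID2NAME` and `readable` are hypothetical names introduced for illustration, and the input simply repeats the pipeline output shown above.

```python
# Hypothetical lookup, following the label list given for this model.
ID2NAME = {"LABEL_0": "negative", "LABEL_1": "neutral", "LABEL_2": "positive"}

def readable(results):
    """Return a copy of pipeline-style results with human-readable labels."""
    return [{"label": ID2NAME[r["label"]], "score": r["score"]} for r in results]

# Output copied from the cell above.
raw = [{"label": "LABEL_2", "score": 0.955704927444458},
       {"label": "LABEL_0", "score": 0.965427041053772}]
print(readable(raw))
```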

Sentiment analysis on our test dataset using this model

In [7]:
# Only the imports used below are kept
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import numpy as np
from scipy.special import softmax

In [8]:
MODEL = "cardiffnlp/twitter-roberta-base-sentiment"
tokenizer = AutoTokenizer.from_pretrained(MODEL)
model = AutoModelForSequenceClassification.from_pretrained(MODEL)


In [9]:
# Preprocess text (username and link placeholders)
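def preprocess(text):
    new_text = []
    for t in text.split(" "):
        # Replace user mentions (but not a lone '@') with a generic placeholder
        t = '@user' if t.startswith('@') and len(t) > 1 else t
        # Replace links with a generic placeholder
        t = 'http' if t.startswith('http') else t
        new_text.append(t)
    return " ".join(new_text)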
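
A quick check of the placeholder substitution may help here. The function is repeated so that the snippet runs on its own, and the sample tweet is invented for illustration.

```python
def preprocess(text):
    """Replace user mentions and links with generic placeholders."""
    new_text = []
    for t in text.split(" "):
        t = '@user' if t.startswith('@') and len(t) > 1 else t
        t = 'http' if t.startswith('http') else t
        new_text.append(t)
    return " ".join(new_text)

# Invented example: one mention, one link, and a lone '@' that is left alone.
sample = "@alice loved the show, details at https://example.com/review @"
print(preprocess(sample))
```

Note that the split is on single spaces only, so a mention or link preceded by a tab or newline would not be replaced.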

In [10]:
# Function to classify text
def classify_text(text):
    text = preprocess(text)
    encoded_input = tokenizer(text, return_tensors='pt')
    output = model(**encoded_input)
    scores = output.logits[0].detach().numpy()
    scores = softmax(scores)
    return scores
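
`classify_text` turns the model's raw logits into probabilities with `scipy.special.softmax`. The sketch below writes out the same computation in plain Python to show what that step does. The logits are made up for illustration, and `softmax_list` is a hypothetical name, not part of any library.

```python
import math

def softmax_list(logits):
    """Numerically stable softmax over a list of floats."""
    m = max(logits)                      # subtract the max to avoid overflow
    exps = [math.exp(x - m) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

logits = [-1.2, 0.3, 2.5]                # invented logits, one per class
probs = softmax_list(logits)
predicted = max(range(len(probs)), key=probs.__getitem__)  # same role as np.argmax
print(probs, predicted)
```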

In [11]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

# Path to your input file containing test data
input_file_path = "/content/drive/MyDrive/Sentiment_analysis/twitter-2016test-A.txt"

# Initialize variables for evaluation
total_samples = 0
correct_predictions = 0
true_labels = []
predicted_labels = []

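# Mapping of label strings to numerical values.
# Caution: the model's own output order is 0 = negative, 1 = neutral, 2 = positive
# (see the label list above), so with this mapping a predicted index of 1 or 2
# does not refer to the same class as the gold label with that value.
label_map = {'negative': 0, 'positive': 1, 'neutral': 2}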

# Read input data from the file and classify each text
with open(input_file_path, 'r', encoding='utf-8') as file:
    lines = file.readlines()
    for line in lines:
        # Split each line into three fields: identifier, label, text
        fields = line.strip().split('\t')
        if len(fields) == 3:
            identifier, label, text = fields
            label = label.lower()  # Convert label to lowercase for consistency
            if label in label_map:
                # Classify the tweet text with classify_text (defined above)
                scores = classify_text(text)
                predicted_label = np.argmax(scores)  # Predicted label index with highest score

                # Update evaluation metrics
                true_labels.append(label_map[label])
                predicted_labels.append(predicted_label)
                total_samples += 1
                if label_map[label] == predicted_label:
                    correct_predictions += 1

# Calculate accuracy
accuracy = accuracy_score(true_labels, predicted_labels)
print(f"Accuracy: {accuracy:.4f}")

# Calculate precision, recall, F1-score for each class
precision, recall, f1, _ = precision_recall_fscore_support(true_labels, predicted_labels, average=None)

# Calculate average recall across classes
average_recall = np.mean(recall)

# Calculate macro-averaged F1 over the positive and negative labels
positive_negative_f1 = np.mean([f1[label_map['positive']], f1[label_map['negative']]])

# Compute the same average explicitly as a cross-check (identical by construction)
average_positive_negative_f1 = (f1[label_map['positive']] + f1[label_map['negative']]) / 2

# Print the computed metrics
print(f"Average Recall: {average_recall:.4f}")
print(f"Macro-averaged F1 (Pos/Neg): {positive_negative_f1:.4f}")
print(f"Average F1 (Pos/Neg): {average_positive_negative_f1:.4f}")


Accuracy: 0.2409
Average Recall: 0.3814
Macro-averaged F1 (Pos/Neg): 0.4771
Average F1 (Pos/Neg): 0.4771
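
The model emits indices in the order 0 = negative, 1 = neutral, 2 = positive, while `label_map` encodes positive as 1 and neutral as 2. This mismatch may account for the low accuracy reported above. One way to compare like with like is to translate each predicted index into the `label_map` encoding before scoring. The sketch below assumes that approach: `MODEL_INDEX_TO_NAME` and `to_gold_index` are hypothetical names, and the predictions are invented. No scores are claimed for the real test set.

```python
# Gold-label encoding used in the evaluation cell above.
label_map = {'negative': 0, 'positive': 1, 'neutral': 2}

# Output order of the pre-trained model, per the label list given for it.
MODEL_INDEX_TO_NAME = {0: 'negative', 1: 'neutral', 2: 'positive'}

def to_gold_index(model_index):
    """Translate a model output index into the label_map encoding."""
    return label_map[MODEL_INDEX_TO_NAME[int(model_index)]]

model_predictions = [2, 1, 0, 2]         # invented argmax outputs
aligned = [to_gold_index(p) for p in model_predictions]
print(aligned)
```

In the evaluation loop, this would amount to appending `to_gold_index(np.argmax(scores))` instead of the raw argmax.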


Fine-tuning this model using the training and the validation datasets

In [15]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch
from torch.utils.data import Dataset, DataLoader

In [16]:
# Define a custom dataset class
class CustomDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.encodings = tokenizer(texts, truncation=True, padding='max_length', max_length=max_length)
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)
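
The dataset class relies on `truncation=True` and `padding='max_length'` so that every example has the same length. The sketch below illustrates the idea on plain lists of token ids. It is a simplification, not the tokenizer's actual implementation: `pad_or_truncate` is a hypothetical helper, the token ids are invented, and the default `pad_id=1` is an assumption about this tokenizer's padding id. The real tokenizer also keeps its special tokens when truncating, which this sketch ignores.

```python
def pad_or_truncate(token_ids, max_length, pad_id=1):
    """Return (input_ids, attention_mask), both of length max_length."""
    ids = list(token_ids[:max_length])   # cut sequences that are too long
    mask = [1] * len(ids)                # 1 marks real tokens
    n_pad = max_length - len(ids)
    return ids + [pad_id] * n_pad, mask + [0] * n_pad  # 0 marks padding

ids, mask = pad_or_truncate([0, 100, 200, 2], max_length=6)
print(ids, mask)

long_ids, long_mask = pad_or_truncate(list(range(10)), max_length=6)
print(long_ids, long_mask)
```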

In [3]:
# Load the pre-trained model and tokenizer
MODEL = "cardiffnlp/twitter-roberta-base-sentiment"
tokenizer = AutoTokenizer.from_pretrained(MODEL)
model = AutoModelForSequenceClassification.from_pretrained(MODEL)


In [18]:
# Define paths to your training and validation files
train_file_path = "/content/drive/MyDrive/Sentiment_analysis/twitter-2016train-A.txt"
validation_file_path = "/content/drive/MyDrive/Sentiment_analysis/twitter-2016dev-A.txt"

# Read training data from the file and split it into texts and labels
train_texts = []
train_labels = []
with open(train_file_path, 'r', encoding='utf-8') as train_file:
    train_lines = train_file.readlines()
    for line in train_lines:
        fields = line.strip().split('\t')
        if len(fields) == 3:
            train_texts.append(fields[2])  # Text is in the third column
            train_labels.append(fields[1].lower())  # Label is in the second column

# Map string labels to numerical values
label_map = {'negative': 0, 'positive': 1, 'neutral': 2}
train_labels = [label_map[label] for label in train_labels]

# Read validation data from the file and split it into texts and labels
val_texts = []
val_labels = []
with open(validation_file_path, 'r', encoding='utf-8') as val_file:
    val_lines = val_file.readlines()
    for line in val_lines:
        fields = line.strip().split('\t')
        if len(fields) == 3:
            val_texts.append(fields[2])  # Text is in the third column
            val_labels.append(fields[1].lower())  # Label is in the second column

# Map string labels to numerical values for validation data
val_labels = [label_map[label] for label in val_labels]

# Define dataset and data loaders for training and validation
train_dataset = CustomDataset(train_texts, train_labels, tokenizer, max_length=128)
val_dataset = CustomDataset(val_texts, val_labels, tokenizer, max_length=128)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16)
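
The training, validation, and test files are all read the same way: each line holds a tab-separated identifier, label, and text. A reusable reader could look like the sketch below. `parse_semeval_lines` is a hypothetical helper, and the sample lines are invented rather than taken from the dataset.

```python
label_map = {'negative': 0, 'positive': 1, 'neutral': 2}

def parse_semeval_lines(lines, label_map):
    """Return (texts, labels) from 'id<TAB>label<TAB>text' lines.

    Lines without exactly three fields, or with an unknown label, are skipped.
    """
    texts, labels = [], []
    for line in lines:
        fields = line.strip().split('\t')
        if len(fields) != 3:
            continue
        _, label, text = fields
        label = label.lower()
        if label in label_map:
            texts.append(text)
            labels.append(label_map[label])
    return texts, labels

sample = [
    "1\tpositive\tGreat match tonight!\n",
    "2\tNEUTRAL\tThe game starts at 8.\n",
    "malformed line without tabs\n",
]
texts, labels = parse_semeval_lines(sample, label_map)
print(texts, labels)
```

Unlike the cell above, this version skips unknown labels instead of raising a `KeyError` during the mapping step.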

In [19]:
# Define the optimizer; the loss is computed by the model itself when labels are passed
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-5)

# Fine-tuning loop
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    for batch in train_loader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()

    model.eval()
    val_accuracy = 0
    total_val_samples = 0
    with torch.no_grad():
        for batch in val_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask)
            logits = outputs.logits
            predictions = torch.argmax(logits, dim=1)
            val_accuracy += torch.sum(predictions == labels).item()
            total_val_samples += len(labels)

    val_accuracy /= total_val_samples
    print(f"Epoch {epoch + 1}/{num_epochs} - Validation Accuracy: {val_accuracy:.4f}")

# Save the fine-tuned model
model.save_pretrained("/content/drive/MyDrive/Sentiment_analysis")

Epoch 1/3 - Validation Accuracy: 0.7539
Epoch 2/3 - Validation Accuracy: 0.7459
Epoch 3/3 - Validation Accuracy: 0.7479
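
Validation accuracy is highest after the first epoch in the run above, while the loop saves the weights only once, after the final epoch. The sketch below shows how the best epoch could be tracked so that a checkpoint is written whenever validation accuracy improves. The bookkeeping is replayed on the recorded accuracies, and the variable names are hypothetical.

```python
recorded = [0.7539, 0.7459, 0.7479]      # validation accuracies printed above

best_acc = float("-inf")
best_epoch = None
for epoch, acc in enumerate(recorded, start=1):
    if acc > best_acc:
        best_acc, best_epoch = acc, epoch
        # In the training loop, model.save_pretrained(...) would be called here
        # so that the saved weights correspond to the best validation score.

print(f"Best epoch: {best_epoch} (validation accuracy {best_acc:.4f})")
```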


Testing the fine-tuned model on the test data

In [4]:
from transformers import AutoModelForSequenceClassification, AutoTokenizer
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import recall_score, f1_score, accuracy_score

# Define a custom dataset class
class CustomDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.encodings = tokenizer(texts, truncation=True, padding='max_length', max_length=max_length)
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

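# Load the saved fine-tuned model; only the model weights were saved to this
# directory, so the tokenizer is loaded from the original checkpoint
MODEL = "/content/drive/MyDrive/Sentiment_analysis"
tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment")
model = AutoModelForSequenceClassification.from_pretrained(MODEL)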

# Define path to your test file
test_file_path = "/content/drive/MyDrive/Sentiment_analysis/twitter-2016test-A.txt"

# Read test data from the file
test_texts = []
test_labels = []

# Read test data from the file and split it into texts and labels
with open(test_file_path, 'r', encoding='utf-8') as test_file:
    test_lines = test_file.readlines()
    for line in test_lines:
        fields = line.strip().split('\t')
        if len(fields) == 3:
            test_texts.append(fields[2])  # Text is in the third column
            test_labels.append(fields[1].lower())  # Label is in the second column

# Map string labels to numerical values
label_map = {'negative': 0, 'positive': 1, 'neutral': 2}
test_labels = [label_map[label] for label in test_labels]


# Define dataset and data loader for the test set
test_dataset = CustomDataset(test_texts, test_labels, tokenizer, max_length=128)
test_loader = DataLoader(test_dataset, batch_size=16)

# Evaluate the model on the test set
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
model.eval()

# Collect gold and predicted labels for the scikit-learn metrics
true_labels = []
predicted_labels = []

with torch.no_grad():
    for batch in test_loader:
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=1)

        true_labels.extend(labels.cpu().numpy())
        predicted_labels.extend(predictions.cpu().numpy())

# Calculate accuracy
accuracy = accuracy_score(true_labels, predicted_labels)

# Compute recall for each class separately (with a single label, the 'weighted'
# average reduces to that class's recall), then average over the three classes
recall_negative = recall_score(true_labels, predicted_labels, labels=[0], average='weighted')
recall_positive = recall_score(true_labels, predicted_labels, labels=[1], average='weighted')
recall_neutral = recall_score(true_labels, predicted_labels, labels=[2], average='weighted')
avg_recall = (recall_negative + recall_positive + recall_neutral) / 3

# Compute F1 score for positive and negative labels
f1_positive = f1_score(true_labels, predicted_labels, labels=[1], average='weighted')
f1_negative = f1_score(true_labels, predicted_labels, labels=[0], average='weighted')
avg_f1_pos_neg = (f1_positive + f1_negative) / 2

# Print the evaluation metrics
print(f"Accuracy: {accuracy:.4f}")
print(f"Average Recall: {avg_recall:.4f}")
print(f"Average F1 (positive and negative): {avg_f1_pos_neg:.4f}")


Accuracy: 0.7386
Average Recall: 0.7624
Average F1 (positive and negative): 0.7702
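
The two measures reported above are recall averaged over the three classes and F1 averaged over the positive and negative classes. Both can be written out directly from true-positive, false-positive, and false-negative counts. The sketch below does this in plain Python on invented labels. The function names are hypothetical, and the scikit-learn calls above remain the ones that produced the reported numbers.

```python
def per_class_counts(true, pred, cls):
    """Return (tp, fp, fn) for one class."""
    tp = sum(t == cls and p == cls for t, p in zip(true, pred))
    fp = sum(t != cls and p == cls for t, p in zip(true, pred))
    fn = sum(t == cls and p != cls for t, p in zip(true, pred))
    return tp, fp, fn

def recall(true, pred, cls):
    tp, _, fn = per_class_counts(true, pred, cls)
    return tp / (tp + fn) if tp + fn else 0.0

def f1(true, pred, cls):
    tp, fp, fn = per_class_counts(true, pred, cls)
    denom = 2 * tp + fp + fn
    return 2 * tp / denom if denom else 0.0

NEG, POS, NEU = 0, 1, 2                  # same encoding as label_map
true = [0, 0, 1, 1, 2, 2]                # invented gold labels
pred = [0, 1, 1, 1, 2, 0]                # invented predictions

avg_recall = sum(recall(true, pred, c) for c in (NEG, POS, NEU)) / 3
f1_pn = (f1(true, pred, POS) + f1(true, pred, NEG)) / 2
print(f"Average Recall: {avg_recall:.4f}")
print(f"Average F1 (positive and negative): {f1_pn:.4f}")
```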
