In [1]:
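# One-time environment setup (uncomment, run once, then comment out again):
# %pip install textblob
# %pip install gensim
# import nltk; nltk.download('stopwords'); nltk.download('wordnet')  # wordnet is needed by textblob's lemmatizer
# !python -m spacy download en_core_web_sm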

In [2]:
import re
import numpy as np
import pandas as pd
from textblob import Word
from nltk.corpus import stopwords

In [3]:
# Load the train / validation / test splits.
#
# The TSV files have no header row: their first line is already a sample.
# Reading them with pandas' default `header=0` turns that sample into column
# names (which pandas may also mangle, e.g. de-duplicating repeated values),
# so it then has to be stitched back in by hand. Passing `header=None` with
# explicit `names` keeps every line as data and names the columns in one step.

# NOTE(review): absolute, machine-specific path — consider making this configurable.
data_folder_path = "C:\\Users\\swapn\\OneDrive\\Desktop\\NLP\\Assignment2\\NLP_AUTUMN_ASSIGNMENT_DATA\\"

train_file_path = data_folder_path + "NLP_ass_train.tsv"
val_file_path = data_folder_path + "NLP_ass_valid.tsv"
test_file_path = data_folder_path + "NLP_ass_test.tsv"

SPLIT_COLUMNS = ["Text", "Label"]


def load_split(file_path):
    """Read one header-less, tab-separated split file.

    Parameters
    ----------
    file_path : str
        Path to a two-column TSV file (text, label) without a header line.

    Returns
    -------
    pandas.DataFrame
        Frame with columns ``"Text"`` and ``"Label"`` and a default
        ``RangeIndex``; the first line of the file is row 0.
    """
    return pd.read_csv(file_path, delimiter='\t', header=None, names=SPLIT_COLUMNS)


train_df = load_split(train_file_path)
val_df = load_split(val_file_path)
test_df = load_split(test_file_path)

In [4]:
# Lazily-built cache of the NLTK English stop-word set. Building it reads the
# corpus file from disk, so it must not happen once per processed sentence.
_STOP_WORDS_CACHE = None


def _english_stop_words():
    """Return the English stop-word set, loading it from NLTK on first use only."""
    global _STOP_WORDS_CACHE
    if _STOP_WORDS_CACHE is None:
        _STOP_WORDS_CACHE = set(stopwords.words('english'))
    return _STOP_WORDS_CACHE


def preprocess_text(text):
    """Clean a single sentence for downstream tokenization.

    Steps, in order:
      1. lowercase;
      2. drop every character that is not a letter, digit or whitespace;
      3. mark negations: the token following a standalone ``not`` gets a
         ``not_`` prefix (the underscore survives because the character
         filter has already run);
      4. remove English stop words (this also removes the bare ``not``);
      5. lemmatize each remaining token with TextBlob.

    Parameters
    ----------
    text : str
        Raw sentence. A missing value (``None`` / NaN, as pandas produces for
        an empty cell) is treated as an empty string; any other non-string
        scalar is converted with ``str()``.

    Returns
    -------
    str
        The cleaned tokens joined by single spaces (possibly empty).
    """
    # Guard against empty cells so `.apply(preprocess_text)` does not crash
    # with an AttributeError on a float NaN.
    if not isinstance(text, str):
        text = "" if pd.isna(text) else str(text)

    # Lowercase, then strip punctuation / symbols.
    text = text.lower()
    text = re.sub(r'[^a-zA-Z0-9\s]', '', text)

    # Negation handling: prefix the word that follows "not".
    words = text.split()
    for i in range(len(words) - 1):
        if words[i] == 'not':
            words[i + 1] = 'not_' + words[i + 1]

    # Stop-word removal, using the cached set.
    stop_words = _english_stop_words()
    words = [word for word in words if word not in stop_words]

    # Reduce each token to its base form and re-assemble the sentence.
    normalized_words = [Word(word).lemmatize() for word in words]
    return ' '.join(normalized_words)


In [5]:
# Clean the "Text" column of every split with the same preprocessing function.
for split_frame in (train_df, val_df, test_df):
    split_frame['Text'] = split_frame['Text'].apply(preprocess_text)

In [6]:
from sklearn.preprocessing import LabelEncoder

# Pull the cleaned sentences (first column) and raw labels (second column)
# out of each split as plain Python lists.
X_train = train_df[train_df.columns[0]].tolist()
X_val = val_df[val_df.columns[0]].tolist()
X_test = test_df[test_df.columns[0]].tolist()
y_train = train_df[train_df.columns[1]].tolist()
y_val = val_df[val_df.columns[1]].tolist()
y_test = test_df[test_df.columns[1]].tolist()

# Learn the label -> integer mapping from the training labels ONLY, then reuse
# that exact mapping for validation and test. Calling `fit_transform` on each
# split would re-fit the encoder every time, so a split with a missing or extra
# class would silently get different ids for the same label.
# `transform` raises ValueError if a split contains a label unseen in training.
label_encoder = LabelEncoder()
y_train = label_encoder.fit_transform(y_train)
y_val = label_encoder.transform(y_val)
y_test = label_encoder.transform(y_test)

In [7]:
import torch
from transformers import BertTokenizer, BertForSequenceClassification
# Load the tokenizer that belongs to the "bert-base-uncased" checkpoint; it is
# used below both to measure sentence lengths and to encode the datasets.
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
# NOTE(review): `model` is rebound to a CustomBERTClassifier in the
# initialization cell (In [17]) and nothing in between appears to use this
# instance — this looks like a redundant checkpoint load; confirm before removing.
model = BertForSequenceClassification.from_pretrained("bert-base-uncased")

  from .autonotebook import tqdm as notebook_tqdm
Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [8]:
# Tokenize every training sentence and record how many tokens each one yields.
train_tokenized_sentences = list(map(tokenizer.tokenize, X_train))
lengths = list(map(len, train_tokenized_sentences))

# Longest training sentence, in tokens as produced by `tokenizer.tokenize`.
max_length = max(lengths)

print("Maximum length:", max_length)

Maximum length: 105


In [9]:
# Same measurement for the validation sentences.
val_tokenized_sentences = list(map(tokenizer.tokenize, X_val))
lengths = list(map(len, val_tokenized_sentences))

# Longest validation sentence, in tokens as produced by `tokenizer.tokenize`.
val_max_length = max(lengths)

print("Maximum length:", val_max_length)

Maximum length: 81


In [10]:
# Same measurement for the test sentences.
test_tokenized_sentences = list(map(tokenizer.tokenize, X_test))
lengths = list(map(len, test_tokenized_sentences))

# Longest test sentence, in tokens as produced by `tokenizer.tokenize`.
test_max_length = max(lengths)

print("Maximum length:", test_max_length)

Maximum length: 69


In [11]:
from sklearn.preprocessing import LabelEncoder

# The labels were already integer-encoded in the earlier encoding cell (In [6]).
# Fitting a fresh LabelEncoder on those integers would leave y_train / y_val /
# y_test unchanged (they are already 0..k-1) but would overwrite
# `label_encoder.classes_` with the integers themselves, discarding the
# original label names that `inverse_transform` needs.
# So instead of encoding a second time, just report the mapping held by the
# existing encoder.
label_mapping = {label: index for index, label in enumerate(label_encoder.classes_)}
print("Label -> id mapping:", label_mapping)

In [12]:
# Show which encoded label ids occur in each split.
label_report = [
    ("Unique labels in training set:", y_train),
    ("Unique labels in validation set:", y_val),
    ("Unique labels in test set:", y_test),
]
for message, split_labels in label_report:
    print(message, set(split_labels))


Unique labels in training set: {0, 1, 2}
Unique labels in validation set: {0, 1, 2}
Unique labels in test set: {0, 1, 2}


In [13]:
from torch.utils.data import DataLoader, Dataset

class CustomDataset(Dataset):
    """Map-style dataset that encodes all texts once, up front.

    `tokenizer` is called a single time on the full `texts` collection with
    truncation and padding enabled and the given `max_length`; the result is
    kept in `self.encodings`. Each item is a dict holding one tensor per
    encoding field plus a ``'labels'`` tensor for that sample.
    """

    def __init__(self, texts, labels, tokenizer, max_length):
        # Encode everything eagerly; per-item access then only slices the result.
        self.encodings = tokenizer(
            texts,
            truncation=True,
            padding=True,
            max_length=max_length,
        )
        self.labels = labels

    def __len__(self):
        # One sample per label.
        return len(self.labels)

    def __getitem__(self, idx):
        sample = {}
        for field_name, field_values in self.encodings.items():
            sample[field_name] = torch.tensor(field_values[idx])
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample

batch_size = 32

# The per-split maximum lengths were measured with `tokenizer.tokenize`, which
# does not count the special tokens ([CLS] / [SEP]) that the tokenizer adds
# when encoding. The `max_length` budget used for truncation DOES include
# them, so without this allowance the longest sentences would lose their
# final tokens.
num_special_tokens = tokenizer.num_special_tokens_to_add(pair=False)

# Training batches are reshuffled every epoch.
train_dataset = CustomDataset(X_train, y_train, tokenizer, max_length + num_special_tokens)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Evaluation loaders keep a fixed order: shuffling does not change the metrics
# and only makes per-sample predictions harder to line up with the data.
val_dataset = CustomDataset(X_val, y_val, tokenizer, val_max_length + num_special_tokens)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

test_dataset = CustomDataset(X_test, y_test, tokenizer, test_max_length + num_special_tokens)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [14]:
import torch
from sklearn.metrics import accuracy_score

def evaluate(model, dataloader, split_name="Test"):
    """Compute and print classification accuracy of `model` over `dataloader`.

    Parameters
    ----------
    model : torch.nn.Module
        Called as ``model(input_ids, attention_mask)`` and expected to return
        a logits tensor whose dim 1 indexes the classes.
    dataloader : iterable
        Yields dict batches with ``'input_ids'``, ``'attention_mask'`` and
        ``'labels'`` tensors.
    split_name : str, optional
        Name used in the printed line. Defaults to ``"Test"``, which
        reproduces the original message.

    Returns
    -------
    float
        Accuracy in ``[0, 1]`` as returned by ``accuracy_score``.

    Notes
    -----
    The model is switched to eval mode and is NOT switched back; callers that
    continue training must call ``model.train()`` again.
    """
    # Run on whatever device the model already lives on instead of relying on
    # a global `device` that is only defined in a later cell.
    first_param = next(model.parameters(), None)
    eval_device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(eval_device)
            attention_mask = batch['attention_mask'].to(eval_device)

            logits = model(input_ids, attention_mask)
            predictions = torch.argmax(logits, dim=1)

            # Labels are only compared on the host, so they never need to be
            # moved to the model's device.
            all_predictions.extend(predictions.cpu().tolist())
            all_labels.extend(batch['labels'].tolist())

    accuracy = accuracy_score(all_labels, all_predictions)
    print(f"{split_name} Accuracy: {accuracy * 100:.2f}%")
    return accuracy


In [15]:
import re
import numpy as np
import pandas as pd
from textblob import Word
from nltk.corpus import stopwords
import torch
from sklearn.preprocessing import LabelEncoder
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.optim as optim
from transformers import get_linear_schedule_with_warmup
from sklearn.metrics import accuracy_score
from tqdm import tqdm

In [16]:
num_classes = 3


class CustomBERTClassifier(nn.Module):
    """Thin wrapper around ``BertForSequenceClassification`` that returns raw logits.

    Parameters
    ----------
    num_labels : int
        Number of output classes of the classification head.
    dropout_prob : float, optional
        Dropout probability applied before the classification layer. It is
        forwarded as the config's ``classifier_dropout``. The default of 0.1
        equals the checkpoint's hidden dropout (visible as ``Dropout(p=0.1)``
        in the model repr), so default behaviour is unchanged; previously this
        argument was accepted but silently ignored.
    """

    def __init__(self, num_labels, dropout_prob=0.1):
        super(CustomBERTClassifier, self).__init__()
        self.bert = BertForSequenceClassification.from_pretrained(
            "bert-base-uncased",
            num_labels=num_labels,
            classifier_dropout=dropout_prob,
        )

    def forward(self, input_ids, attention_mask):
        """Return the classification logits for a batch of encoded inputs."""
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        return outputs.logits

In [17]:
# Initialize the model, optimizer, scheduler, and criterion

# Fix the torch RNG so the freshly initialised classifier head and the
# shuffling order of the training loader are reproducible across runs.
SEED = 42
torch.manual_seed(SEED)

# Fine-tuning hyperparameters.
# A learning rate of 1e-3 is far above the range normally used when updating
# all BERT weights (the BERT paper recommends 2e-5 .. 5e-5) and typically makes
# fine-tuning diverge or collapse to a single class.
LEARNING_RATE = 2e-5
# Warm-up is expressed as a fraction of the schedule rather than a fixed step
# count, so it stays sensible if the dataset size, batch size or number of
# epochs changes (a fixed 1500 steps can exceed the whole schedule).
WARMUP_FRACTION = 0.1

model = CustomBERTClassifier(num_labels=num_classes)
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
num_epochs = 10
num_train_steps = len(train_loader) * num_epochs  # one scheduler step per batch
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=int(WARMUP_FRACTION * num_train_steps),
    num_training_steps=num_train_steps,
)
criterion = nn.CrossEntropyLoss()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


CustomBERTClassifier(
  (bert): BertForSequenceClassification(
    (bert): BertModel(
      (embeddings): BertEmbeddings(
        (word_embeddings): Embedding(30522, 768, padding_idx=0)
        (position_embeddings): Embedding(512, 768)
        (token_type_embeddings): Embedding(2, 768)
        (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (encoder): BertEncoder(
        (layer): ModuleList(
          (0-11): 12 x BertLayer(
            (attention): BertAttention(
              (self): BertSelfAttention(
                (query): Linear(in_features=768, out_features=768, bias=True)
                (key): Linear(in_features=768, out_features=768, bias=True)
                (value): Linear(in_features=768, out_features=768, bias=True)
                (dropout): Dropout(p=0.1, inplace=False)
              )
              (output): BertSelfOutput(
                (dense): Linear(in_features=768, out_features

In [18]:
# Fine-tune with early stopping on validation accuracy.
CHECKPOINT_PATH = "BERT_Model.pth"

# Start below any reachable accuracy so the first epoch always writes a
# checkpoint; with 0.0 as the start value, a run whose validation accuracy
# stayed at 0.0 would never save and the final load would fail.
best_accuracy = float("-inf")
n_no_improve = 0  # consecutive epochs without a new best validation accuracy
patience = 3      # stop after this many such epochs

for epoch in range(num_epochs):
    # evaluate() leaves the model in eval mode, so re-enable training mode
    # at the start of every epoch.
    model.train()
    for batch in tqdm(train_loader):
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()  # the LR schedule advances once per batch

    # Evaluate on the validation set. evaluate() already switches to eval mode
    # and disables gradient tracking, so no extra wrappers are needed here.
    validation_accuracy = evaluate(model, val_loader)

    print(f"Epoch {epoch+1}/{num_epochs}, Validation Accuracy: {validation_accuracy}")

    if validation_accuracy > best_accuracy:
        best_accuracy = validation_accuracy
        n_no_improve = 0
        torch.save(model.state_dict(), CHECKPOINT_PATH)
    else:
        n_no_improve += 1

    if n_no_improve >= patience:
        break

# Load the best model weights. `map_location` keeps the load working when the
# checkpoint was written on a different device than the current one.
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=device))

  4%|▎         | 18/481 [18:02<7:36:35, 59.17s/it] 

In [None]:
# Accuracy on the held-out test split.
test_accuracy = evaluate(model, test_loader)
print(f"Test Accuracy: {test_accuracy}")

from sklearn.metrics import f1_score

# evaluate() only returns the accuracy, so the per-sample labels and
# predictions needed for F1 are collected here. Both lists are filled in the
# same pass over the loader, so they stay aligned regardless of batch order.
model.eval()
true_labels = []
predicted_labels = []
with torch.no_grad():
    for batch in test_loader:
        logits = model(batch['input_ids'].to(device), batch['attention_mask'].to(device))
        predicted_labels.extend(torch.argmax(logits, dim=1).cpu().tolist())
        true_labels.extend(batch['labels'].tolist())

# Calculate the macro F1 score (unweighted mean of the per-class F1 scores).
macro_f1 = f1_score(true_labels, predicted_labels, average='macro')

print("Macro F1 Score:", macro_f1)

In [None]:
def calculate_intersection(set1, set2):
    """Return how many distinct elements the two collections have in common."""
    shared_items = set(set1).intersection(set2)
    return len(shared_items)

# Distinct (preprocessed) sentences in each split.
train_set = set(train_df["Text"])
validation_set = set(val_df["Text"])
test_set = set(test_df["Text"])

# Count the sentences shared by each pair of splits.
intersection_train_validation = calculate_intersection(train_set, validation_set)
intersection_train_test = calculate_intersection(train_set, test_set)
intersection_validation_test = calculate_intersection(validation_set, test_set)

# Report the pairwise overlaps.
overlap_report = [
    ("Intersection between train and validation sets:", intersection_train_validation),
    ("Intersection between train and test sets:", intersection_train_test),
    ("Intersection between validation and test sets:", intersection_validation_test),
]
for overlap_message, overlap_count in overlap_report:
    print(overlap_message, overlap_count)