## Training Script

In [2]:
import torch
import pandas as pd
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification, TrainingArguments, Trainer
from sklearn.preprocessing import LabelEncoder
from datasets import Dataset
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
import torch.nn as nn

# ========== 1️⃣ Load Train, Validation, and Test Data ==========
# One CSV per split; the "label" column is encoded below and the "text"
# column is tokenized further down.
train_df, val_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in ("train_set_1.csv", "val_set_1.csv", "test_set_1.csv")
)

import numpy as np
from sklearn.utils.class_weight import compute_class_weight
from collections import Counter

# Encode labels as integer ids. The encoder is fitted on the training split
# only and then re-used, so every split shares the same label -> id mapping.
label_encoder = LabelEncoder()
train_df["label"] = label_encoder.fit_transform(train_df["label"])
for split_df in (val_df, test_df):
    split_df["label"] = label_encoder.transform(split_df["label"])

# Class distribution of the (encoded) training labels
class_counts = Counter(train_df['label'])
classes = np.unique(train_df['label'])

# 'balanced' class weights, keyed by encoded class id
class_weights = compute_class_weight('balanced', classes=classes, y=train_df['label'])
class_weight_dict = dict(zip(classes, class_weights))

# ========== 2️⃣ Tokenization ==========
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# Prefer the GPU when one is visible to PyTorch
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

# Weights in class-id order, ready to be handed to the loss function
class_weights_tensor = torch.tensor(
    [class_weight_dict[class_id] for class_id in classes],
    dtype=torch.float,
    device=device,
)

def encode_texts(examples):
    """Tokenize the ``"text"`` entries of a batch with the notebook's tokenizer.

    Every sequence is padded to, and truncated at, 512 tokens.

    Args:
        examples: Mapping with a ``"text"`` entry (a batch when used with
            ``Dataset.map(..., batched=True)``).

    Returns:
        Whatever the tokenizer call returns for that batch.
    """
    batch_texts = examples["text"]
    return tokenizer(
        batch_texts,
        padding="max_length",
        truncation=True,
        max_length=512,
    )

# Convert each pandas split to a Hugging Face Dataset and tokenize it in batches
train_dataset = Dataset.from_pandas(train_df).map(encode_texts, batched=True)
val_dataset = Dataset.from_pandas(val_df).map(encode_texts, batched=True)
test_dataset = Dataset.from_pandas(test_df).map(encode_texts, batched=True)

# Expose only the model inputs and the label, as PyTorch tensors
torch_columns = ("input_ids", "attention_mask", "label")
for tokenized_split in (train_dataset, val_dataset, test_dataset):
    tokenized_split.set_format(type="torch", columns=list(torch_columns))

# ========== 3️⃣ Load Pre-trained BERT Model ==========
# The classification head gets one output per class seen by the label encoder
num_classes = len(label_encoder.classes_)
model = BertForSequenceClassification.from_pretrained(
    "bert-base-uncased",
    num_labels=num_classes,
)

# ========== 4️⃣ Define Training Arguments ==========
training_args = TrainingArguments(
    # Where checkpoints and logs are written
    output_dir="./bert_classification",
    logging_dir="./logs",
    logging_steps=100,
    # Evaluate and checkpoint once per epoch
    evaluation_strategy="epoch",
    save_strategy="epoch",
    # Optimisation settings
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    weight_decay=0.01,
    # Checkpoint selection is driven by the "accuracy" metric
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
)

class CustomTrainer(Trainer):
    """``Trainer`` variant whose loss is a class-weighted cross-entropy.

    Args:
        *args: Positional arguments forwarded unchanged to ``Trainer``.
        class_weights: Optional tensor handed to ``nn.CrossEntropyLoss`` as its
            ``weight`` argument (one entry per class). When ``None`` the
            model's own loss is used.
        **kwargs: Keyword arguments forwarded unchanged to ``Trainer``.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights  # Save class weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the training/evaluation loss for one batch.

        Args:
            model: The model to run the forward pass with.
            inputs: Batch mapping passed to ``model(**inputs)``; its
                ``"labels"`` entry, when present, supplies the targets.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                just the loss.
            num_items_in_batch: Accepted so the override keeps working with
                Trainer versions that pass this keyword; it is not used
                because the loss is averaged over the batch.

        Returns:
            The loss, or ``(loss, outputs)`` when ``return_outputs`` is true.
        """
        labels = inputs.get("labels")
        outputs = model(**inputs)

        if self.class_weights is not None and labels is not None:
            logits = outputs.get("logits")
            # Follow the logits' device rather than a notebook-level global so
            # the weights and labels always sit next to the model output.
            weights = self.class_weights.to(logits.device)
            targets = labels.to(logits.device)
            loss_fct = nn.CrossEntropyLoss(weight=weights)
            loss = loss_fct(logits.view(-1, logits.size(-1)), targets.view(-1))
        else:
            # No weights (or no labels): fall back to the model's own loss.
            loss = outputs.loss

        return (loss, outputs) if return_outputs else loss

# ========== 5️⃣ Define Metrics Function ==========
def compute_metrics(eval_pred):
    """Score a batch of evaluation predictions.

    Args:
        eval_pred: Pair ``(logits, labels)``; the predicted class of each row
            is the arg-max over the last axis of ``logits``.

    Returns:
        Dict with ``accuracy`` plus ``precision``, ``recall`` and ``f1``
        computed with ``average="weighted"``.
    """
    scores, references = eval_pred
    predicted = np.argmax(scores, axis=-1)

    metrics = {"accuracy": accuracy_score(references, predicted)}
    weighted = precision_recall_fscore_support(references, predicted, average="weighted")
    # The fourth element (support) is not reported.
    metrics["precision"] = weighted[0]
    metrics["recall"] = weighted[1]
    metrics["f1"] = weighted[2]
    return metrics

# ========== 6️⃣ Train Model using Trainer ==========
# CustomTrainer applies the class weights in its loss; the validation split is
# used for the per-epoch evaluation and compute_metrics supplies the scores.
trainer = CustomTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    class_weights=class_weights_tensor
)

# Runs the fine-tuning loop configured by training_args.
trainer.train()

# ========== 7️⃣ Save the Trained Model ==========
# Written to a separate directory from the training checkpoints.
trainer.save_model("./bert_text_classifier")
print("Model saved successfully!")

# ========== 8️⃣ Evaluate on Test Set ==========
# The returned metric names carry an "eval_" prefix, hence "eval_accuracy".
test_results = trainer.evaluate(test_dataset)
print("Test Accuracy:", test_results["eval_accuracy"])


Map:   0%|          | 0/21861 [00:00<?, ? examples/s]

Map:   0%|          | 0/2733 [00:00<?, ? examples/s]

Map:   0%|          | 0/2733 [00:00<?, ? examples/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,0.2182,0.18757,0.962312,0.962613,0.962312,0.962383
2,0.1525,0.153029,0.964874,0.965489,0.964874,0.964961
3,0.0812,0.142295,0.972192,0.972307,0.972192,0.972212


Model saved successfully!


Test Accuracy: 0.9612147822905233


In [None]:
# !python -m spacy download en_core_web_sm
import re
import nltk
import spacy

# Load spaCy's English model
# (needs the "en_core_web_sm" package; see the commented-out download command
# at the top of this cell). `nlp` is used by lemmatize_text below.

nlp = spacy.load("en_core_web_sm")

# Download NLTK stopwords
# The corpus import comes after the download so the data is requested first;
# `stopwords` is used by remove_stopwords below.
nltk.download('stopwords')
from nltk.corpus import stopwords

# Stop-word sets already loaded from the NLTK corpus, keyed by language, so the
# corpus is read once instead of on every call.
_STOP_WORDS_BY_LANGUAGE = {}


def remove_stopwords(text):
    """Drop English stop words from ``text``.

    The text is split on whitespace and each word is compared
    case-insensitively against NLTK's English stop-word list; the surviving
    words are re-joined with single spaces.

    Args:
        text: Input string.

    Returns:
        The text without stop words (``""`` when nothing survives).
    """
    stop_words = _STOP_WORDS_BY_LANGUAGE.get('english')
    if stop_words is None:
        stop_words = frozenset(stopwords.words('english'))
        _STOP_WORDS_BY_LANGUAGE['english'] = stop_words

    kept_words = [word for word in text.split() if word.lower() not in stop_words]
    return " ".join(kept_words)

def lemmatize_text(text):
    """Replace every token of ``text`` with its lemma.

    The text is run through the notebook's spaCy pipeline (``nlp``) and the
    ``lemma_`` of each resulting token is joined with single spaces.

    Args:
        text: Input string.

    Returns:
        Space-separated lemmas.
    """
    lemmas = (token.lemma_ for token in nlp(text))
    return " ".join(lemmas)

def lowercase_text(text):
    """Return ``text`` converted to lower case."""
    return text.lower()


def remove_whitespaces(text):
    """Normalise whitespace and terminate the text with a full stop.

    Runs of whitespace collapse to one space, leading/trailing whitespace is
    stripped, and a ``"."`` is always appended (even to empty input or to text
    that already ends with one).

    Args:
        text: Input string.

    Returns:
        The normalised text followed by ``"."``.
    """
    single_spaced = re.sub(r'\s+', ' ', text)
    return single_spaced.strip() + '.'

# 7. Removing URLs
def remove_urls(text):
    """Delete URL-like tokens from ``text``.

    Anything starting with ``http``, ``www`` or ``https`` is removed up to the
    next whitespace character. Surrounding whitespace is left as is, and the
    match is case-sensitive.

    Args:
        text: Input string.

    Returns:
        The text with the matched tokens removed.
    """
    url_pattern = re.compile(r'http\S+|www\S+|https\S+')
    return url_pattern.sub('', text)

# 8. Replace Ampersand (&) with 'and' and Similar Substitutions
def replace_ampersand(text):
    """Spell out or strip special symbols.

    A fixed list of literal substitutions is applied in order (``"'s"`` is
    handled before the bare apostrophe), then a set of remaining symbols and
    brackets is replaced with spaces.

    Args:
        text: Input string.

    Returns:
        The text after all substitutions.
    """
    # Order matters: "'s" must be replaced before "'" removes the apostrophe.
    literal_swaps = (
        ("&", "and"),
        ("%", "percent"),
        ("$", "dollar"),
        ("₹", 'rs.'),
        ("@", ""),
        ("*", "x"),
        ("#", ''),
        ('"', ' '),
        ("'s", ' '),
        ("'", ''),
        ("_", ' '),
        ("=", ' '),
        ("|", ' '),
    )

    cleaned = text
    for symbol, replacement in literal_swaps:
        cleaned = cleaned.replace(symbol, replacement)

    # Remaining marks, slashes and brackets become a single space each.
    return re.sub(r'[©®™~^<>\\/`\[\]\(\)\{\}]', ' ', cleaned)

# 9. Replace Model Numbers or Part Numbers
def replace_model_numbers(text):
    """Replace model/part-number style tokens with the ``<MODEL>`` placeholder.

    A candidate token is a run of two or more ASCII letters/digits, optionally
    joined by single ``-`` or ``_`` separators (e.g. ``ABC123``, ``123-XYZ``,
    ``ABC-1234``). A candidate is replaced only when it contains at least one
    digit, so ordinary and hyphenated words are left alone. Tokens are always
    matched whole, wherever they occur in the text, and adjacent punctuation
    is kept.

    Args:
        text: Input string.

    Returns:
        The text with every digit-bearing candidate replaced by ``<MODEL>``.
    """
    # The lookarounds pin the match to the full alphanumeric run so a token is
    # never clipped; each repetition consumes exactly one letter/digit, which
    # keeps matching linear.
    model_number_pattern = (
        r'(?<![A-Za-z0-9])[A-Za-z0-9](?:[-_]?[A-Za-z0-9])+(?![A-Za-z0-9])'
    )

    def _mask_if_numeric(match):
        """Return ``<MODEL>`` for tokens containing a digit, else the token."""
        token = match.group(0)
        return '<MODEL>' if any(char.isdigit() for char in token) else token

    return re.sub(model_number_pattern, _mask_if_numeric, text)

def remove_repeated_phrases(text):
    """Drop repeated words, keeping the first occurrence of each.

    The text is split on whitespace; a word is kept only the first time it
    appears anywhere in the text (comparison is case-sensitive), and the kept
    words are re-joined with single spaces.

    Args:
        text: Input string.

    Returns:
        The de-duplicated text.
    """
    # dict keys keep first-insertion order, which yields "first occurrence wins".
    first_occurrences = dict.fromkeys(text.split())
    return ' '.join(first_occurrences)
# Combining All Preprocessing Steps
def preprocess_text(text):
    """Run the full text-cleaning pipeline on ``text``.

    Steps, in order: stop-word removal, lemmatisation, lower-casing, URL
    removal, symbol substitution, whitespace normalisation (which also appends
    a full stop), model-number masking and repeated-word removal.

    URL removal runs right after lower-casing and before symbol substitution:
    the URL pattern is case-sensitive, and the substitution step rewrites
    characters such as ``/``, ``=`` and ``%`` that URLs are made of, after
    which a URL can no longer be recognised as one token.

    Args:
        text: Raw input string.

    Returns:
        The cleaned text.
    """
    pipeline = (
        remove_stopwords,
        lemmatize_text,
        lowercase_text,
        remove_urls,
        replace_ampersand,
        remove_whitespaces,
        replace_model_numbers,
        remove_repeated_phrases,
    )
    for step in pipeline:
        text = step(text)
    return text


In [None]:
# from api.model.utils.clean_text import preprocess_text
import torch.nn.functional as F
def predict_class(text):
    """Classify a single text with the notebook's fine-tuned model.

    The text is cleaned with ``preprocess_text``, tokenized (truncated at 512
    tokens) and run through ``model`` without gradient tracking.

    Note: the model is switched to evaluation mode so that dropout cannot make
    predictions vary between calls.

    Args:
        text: One input string.

    Returns:
        Tuple ``(predicted_class, probabilities)``: the index of the highest
        logit as an ``int``, and a 1-D NumPy array with the softmax
        probability of every class.
    """
    text = preprocess_text(text)

    encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)

    # Move tensors to the same device as model (GPU/CPU)
    inputs = {key: value.to(model.device) for key, value in encoded.items()}

    # Inference only: disable dropout and skip gradient bookkeeping.
    model.eval()
    with torch.no_grad():
        logits = model(**inputs).logits

    predicted_class = torch.argmax(logits, dim=1).item()
    # Softmax over the class axis, computed in float32; squeeze(0) drops the
    # batch axis of the single input.
    probabilities = F.softmax(logits.float(), dim=-1).squeeze(0).cpu().numpy()

    return predicted_class, probabilities

# Example usage
input_text = "I love computers with mac insalled"

predicted_class, probabilities = predict_class(input_text)

# Map class indices back to label names with the encoder fitted during
# training. The model's class ids are the encoder's ids, so `classes_` is the
# only ordering guaranteed to match; a hand-written list can silently mislabel
# every prediction.
labels = [str(name) for name in label_encoder.classes_]

predicted_label = labels[predicted_class]

probabilities_with_labels = list(zip(labels, probabilities))

# Sorting probabilities with labels in descending order to see the top classes
probabilities_with_labels = sorted(probabilities_with_labels, key=lambda x: x[1], reverse=True)

print(f"Predicted class: {predicted_label}")
print(f"Class probabilities with labels: {probabilities_with_labels}")