# Assignment 3

# Q1. Use your own Audio samples and get the aggregate WER using the Whisper  Model. Create at least 5 samples. How can you achieve a better error rate?

## Import the necessary libraries

In [None]:
!pip install opendatasets
!pip install datasets
!pip install torchmetrics

In [None]:
#Import the necessary libraries
import random
import opendatasets as od
import librosa
import numpy as np
import matplotlib.pyplot as plt
import librosa.display
from IPython.display import Audio
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from tqdm import tqdm
from IPython.display import Audio as aud

## Load and Prepare the Dataset

In [None]:
# Download the Common Voice dataset from Kaggle
COMMON_VOICE_URL = "https://www.kaggle.com/datasets/mozillaorg/common-voice"
od.download(COMMON_VOICE_URL)

In [None]:
from datasets import load_dataset, DatasetDict

# Folder that holds the downloaded Common Voice CSV files
data_dir = "/content/common-voice"

# CSV files combined into each split (the "valid" and "other" subsets together)
split_csv_names = {
    "train": ("cv-valid-train.csv", "cv-other-train.csv"),
    "validation": ("cv-valid-dev.csv", "cv-other-dev.csv"),
    "test": ("cv-valid-test.csv", "cv-other-test.csv"),
}

common_voice = DatasetDict()
for split_name, csv_names in split_csv_names.items():
    csv_paths = [f"{data_dir}/{csv_name}" for csv_name in csv_names]
    common_voice[split_name] = load_dataset(
        "csv",
        data_files={split_name: csv_paths},
        split=split_name,
    )

print(common_voice)

## Correct the audio file path

In [None]:
# Function to prepend data_dir to the filename
def add_data_dir(example):
    """Rewrite ``example["filename"]`` as a path under ``data_dir``.

    The first ``/``-separated component of the original filename is repeated
    as an extra directory level, so the result has the form
    ``<data_dir>/<first component>/<original filename>``.
    The example is updated in place and returned.
    """
    relative_path = example['filename']
    top_folder, _, _ = relative_path.partition('/')
    example["filename"] = f"{data_dir}/{top_folder}/{relative_path}"
    return example

# Rewrite the filename column of every split with add_data_dir
for split_name in ("train", "validation", "test"):
    common_voice[split_name] = common_voice[split_name].map(add_data_dir)

# Show the dataset after the path rewrite
print(common_voice)

### Select only Audio and Text files

In [None]:
# Import the language codes
from transformers.models.whisper.tokenization_whisper import TO_LANGUAGE_CODE
from transformers import WhisperProcessor, WhisperForConditionalGeneration

## Load the whisper Preprocessor

In [None]:
# Whisper processor configured for English transcription
processor = WhisperProcessor.from_pretrained(
    "openai/whisper-small",
    task="transcribe",
    language="english",
)
# Sampling rate of the processor's feature extractor; reused when loading audio
Sampling_rate = processor.feature_extractor.sampling_rate

## Pre-Process the Data

In [None]:
def preprcess(audio_sample):
  """Run one audio sample through the global ``processor``.

  ``audio_sample`` must provide a ``'filename'`` entry (path of the audio
  file) and a ``'text'`` entry (its transcript). The audio is loaded with
  librosa at ``Sampling_rate`` and handed to the processor together with the
  transcript; the processor's output is returned unchanged.
  """
  waveform, rate = librosa.load(audio_sample['filename'], sr=Sampling_rate)
  return processor(
    waveform,
    sampling_rate=rate,
    text=audio_sample['text'],
    return_tensors="pt",
  )

Print out the input features and their labels after preparing the sample audio initialised above.

In [None]:
# Paths of the recorded clips, one per reference transcript below
test_file_path = [
    '/content/audio-1.m4a',
    '/content/audio-2.m4a',
    '/content/audio-3.m4a',
    '/content/audio-4.m4a',
    '/content/audio-5.m4a',
    '/content/audio-6.m4a',
]
random_index = random.randint(0, len(common_voice["train"]) - 1)

# Reference transcripts, in the same order as test_file_path
reference_texts = [
    "Once upon a time there was a king.",
    "In the light of the recent events, there is a huge debate on social media calling for a quick change in mentality in general public.",
    "This is to inform you that the room number L201 has been assigned to the students for the student related activities.",
    "The State Chattisgarh has opportunities, land and very talented people but they are not making use of it.",
    "Sometimes there are many assignment and timetable and schedule becomes very hectic.",
    "Many companies ask DSA questions to qualify the candidates in the open assessment rounds but they might not help in actual job which is based on development.",
]

# Each sample pairs an audio path with its transcript
sample = [
    {'filename': clip_path, 'text': transcript}
    for clip_path, transcript in zip(test_file_path, reference_texts)
]

## Load the Whisper Model

In [None]:
# Choose the GPU when one is available, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Pre-trained Whisper checkpoint used for transcription
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

In [None]:
# Pick one of the prepared samples at random (index range follows the list size)
ran = random.randint(0, len(sample) - 1)

# Turn the chosen sample into model inputs with the function defined above
data = preprcess(sample[ran])

# The previous max_length=20 cut off every transcript longer than 20 tokens,
# which shows up as deleted words in the WER. Allow enough new tokens for a
# full sentence instead.
predicted_text_tn = model.generate(
    data['input_features'],
    max_new_tokens=128,
    num_return_sequences=1,
)
predicted_text_tn

## Evaluate the model

In [None]:
# Decode the generated ids to text. Special tokens are skipped so that
# Whisper's control tokens are not scored as word errors against the reference.
translated = processor.batch_decode(predicted_text_tn, skip_special_tokens=True)
print(translated, "\n", sample[ran]['text'])

from torchmetrics.text import WordErrorRate as WER
wer = WER()
# Predictions and references are both passed as lists of equal length
wer(translated, [sample[ran]['text']])

# Q2. For Vision Transformer, use ROBOFLOW to get a completely new dataset and run a Multi-Label Image Classification task. Try using a different model (like DeiT, etc.)

In [None]:
!pip install -q git+https://github.com/huggingface/transformers

# Importing the dataset

In [None]:
!pip install transformers
!pip install roboflow

In [None]:
import os
from getpass import getpass

from roboflow import Roboflow

# The API key is a secret and must not be stored in the notebook. Read it from
# the ROBOFLOW_API_KEY environment variable, or prompt for it when unset.
api_key = os.environ.get("ROBOFLOW_API_KEY") or getpass("Roboflow API key: ")

# Download version 1 of the project in "folder" format
rf = Roboflow(api_key=api_key)
project = rf.workspace("mem-g72lg").project("labelled-classification")
version = project.version(1)
dataset = version.download("folder")

In [None]:
import torchvision
from torchvision.transforms import ToTensor
from torch.utils.data import DataLoader
from transformers import DeiTModel
import torch.nn as nn
from transformers.modeling_outputs import SequenceClassifierOutput
from torch.nn import functional as func
from transformers import DeiTFeatureExtractor
import torch

BATCH_SIZE = 16


def _make_loader(image_ds, shuffle):
    """Wrap a dataset in a DataLoader with the shared batch size and worker count."""
    return DataLoader(image_ds, batch_size=BATCH_SIZE, shuffle=shuffle, num_workers=2)


# One ImageFolder per split; every image is converted to a tensor on load
train_ds = torchvision.datasets.ImageFolder(
    '/content/labelled-classification-1/train', transform=ToTensor()
)
valid_ds = torchvision.datasets.ImageFolder(
    '/content/labelled-classification-1/valid', transform=ToTensor()
)
test_ds = torchvision.datasets.ImageFolder(
    '/content/labelled-classification-1/test', transform=ToTensor()
)

# Only the training loader shuffles its samples
train_loader = _make_loader(train_ds, shuffle=True)
valid_loader = _make_loader(valid_ds, shuffle=False)
test_loader = _make_loader(test_ds, shuffle=False)

# Report split sizes and the class-to-index mapping
print(f"Number of train samples: {len(train_ds)}")
print(f"Number of validation samples: {len(valid_ds)}")
print(f"Number of test samples: {len(test_ds)}")
print(f"Detected Classes are: {test_ds.class_to_idx}")

In [None]:
class DeiTForImageClassification(nn.Module):
    """Pretrained DeiT backbone followed by dropout and a linear classifier."""

    def __init__(self, num_labels):
        """Load the backbone and build a classifier with ``num_labels`` outputs."""
        super().__init__()

        checkpoint = "facebook/deit-base-distilled-patch16-224"
        self.model = DeiTModel.from_pretrained(checkpoint)

        # Classification head: dropout, then a linear layer on the hidden size
        self.dropout = nn.Dropout(0.5)
        self.classifier = nn.Linear(self.model.config.hidden_size, num_labels)

    def forward(self, px, labels=None):
        """Return ``(logits, loss)`` for the pixel batch ``px``.

        ``loss`` is the cross-entropy between the logits and ``labels``, or
        ``None`` when no labels are given.
        """
        # Classify from the first token of the last hidden layer
        first_token = self.model(px).last_hidden_state[:, 0]
        logits = self.classifier(self.dropout(first_token))

        if labels is None:
            return logits, None

        criterion = nn.CrossEntropyLoss()
        flat_logits = logits.view(-1, self.classifier.out_features)
        return logits, criterion(flat_logits, labels.view(-1))

# Build the classifier with one output per class folder found in the training set.
# Note: this rebinds the global name `model`, which earlier held the Whisper model.
num_labels = len( train_ds.classes )
model = DeiTForImageClassification(num_labels)

In [None]:
# Only 2 epochs are run because training takes a long time
# NOTE(review): 2e-3 looks high for fine-tuning a pretrained transformer — confirm
LEARNING_RATE = 2e-3
EPOCHS = 2
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# AdamW optimizer over all model parameters. The loss used for training is the
# one returned by the model's forward pass; loss_func is defined but not used
# in this loop.
optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
loss_func = nn.CrossEntropyLoss()

# The datasets apply ToTensor(), so pixel values already lie in [0, 1].
# Turn off the processor's own 1/255 rescale so images are not scaled twice
# before normalisation. The evaluation cell reuses this same object.
feature_extractor = DeiTFeatureExtractor.from_pretrained(
    "facebook/deit-base-distilled-patch16-224",
    do_rescale=False,
)

# Training
for epoch in range(EPOCHS):
    model.train()
    for step, (x, y) in enumerate(train_loader):

        # Resize / crop / normalise the batch to the model's expected input
        x = feature_extractor(images=x, return_tensors="pt")['pixel_values']
        x, y = x.to(device), y.to(device)

        # Forward pass; the model returns (logits, loss) when labels are given
        optimizer.zero_grad()
        logits, loss = model(x, y)

        # Backward pass and parameter update
        loss.backward()
        optimizer.step()

        # Progress is printed every step; the loss value only every 50th step
        if step % 50 == 0:
            print(f'Epoch [{epoch+1}/{EPOCHS}], Step [{step+1}/{len(train_loader)}], Loss: {loss.item():.4f}')
        else:
            print(f'Epoch [{epoch+1}/{EPOCHS}], Step [{step+1}/{len(train_loader)}]')

In [None]:
# Measure the model's accuracy on the test split

model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, targets in test_loader:
        pixel_values = feature_extractor(images=images, return_tensors="pt")['pixel_values']
        pixel_values = pixel_values.to(device)
        targets = targets.to(device)

        # Forward pass without labels: only the logits are needed
        logits, _ = model(pixel_values)
        predicted = logits.argmax(dim=1)

        correct += (predicted == targets).sum().item()
        total += targets.size(0)

print(f'Accuracy of the model on test images: {100 * correct / total:.2f}%')