<a href="https://colab.research.google.com/github/karans17s/Practical_Implementation_Of_Deep_learning/blob/main/PHASE_6_IMAGE_CAPTIONING_USING_RESNET50.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Step_1 : Import Library



### 1. PIL (Python Imaging Library) -- handles and processes image files (also provides functionality to open, manipulate, and convert image formats)

### 2. Resize -- resizes images to a uniform 224x224 size so they can be fed into the neural network

### 3. ToTensor -- converts the image into a numeric tensor that the model can process

### 4. Normalize -- Scales these numbers to make the training stable

### 5. compose -- Combines multiple transformations into a single pipeline.

### 6. resnet50 -- the pre-trained ResNet-50 model, a 50-layer deep residual network commonly used as an encoder for image captioning (it extracts high-level features from images)

### 7. ResNet50_Weights -- the pre-trained weights; they improve performance because the model has already learned general image features

### 8. io -- opens files for reading and writing (e.g. loading the caption text that is later tokenized into sequences)

### 9. unicodedata -- important for simplifying accented characters (e.g. stripping diacritical marks)

### 10. re -- ( regular expression module ) pattern matching and cleaning text

In [None]:
import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import Dataset, DataLoader, random_split
from PIL import Image
from torchvision.transforms import Resize, ToTensor, Normalize, Compose
from matplotlib import pyplot as plt
from torchvision.models import resnet50,ResNet50_Weights
from io import open
import unicodedata # for text normalization
import re # for cleaning text data

# Step_2 : Select the Compute Device (GPU if Available, Otherwise CPU)

In [None]:
# Prefer the first CUDA GPU when one is visible; otherwise fall back to the CPU.
device_type = 'cuda' if torch.cuda.is_available() else 'cpu'
device = torch.device(type=device_type, index=0)

# Step_3 : Text Processing / Text Normalization




## 3-steps follow

## 1. Unicode Normalization

### Converts text into a consistent Unicode format and removes diacritical marks

## 2. Character Filtering

### Removes unwanted characters (e.g., numbers, symbols) while keeping only alphabets and selected punctuation marks.


## 3. Whitespace Normalization

### Replaces multiple spaces with a single space and trims leading/trailing spaces.

## NFD and NFC -- Normalization Form Decomposed , Normalization Form Composed

## Mn -- Mark Nonspacing ( identify and remove those combining marks like accents )

In [None]:
def normalizeString(s):
    """Return ``s`` reduced to ASCII letters and the punctuation ``! ? ,``.

    Steps:
      1. Decompose to Unicode NFD and drop every combining mark
         (category ``Mn``), which strips accents from letters.
      2. Replace each run of characters other than ``a-z``, ``A-Z``,
         ``!``, ``?`` and ``,`` with a single space.
      3. Strip leading and trailing whitespace.
    """
    decomposed = unicodedata.normalize('NFD', s)
    without_marks = "".join(
        ch for ch in decomposed if unicodedata.category(ch) != 'Mn'
    )
    cleaned = re.sub(r"[^a-zA-Z!?,]+", r" ", without_marks)
    return cleaned.strip()

# Step_4 : Data Extraction.....

In [None]:
import zipfile
import os
# Unpack the image archive into its own directory (created if missing).
zip_path = "/content/Images.zip"
extract_path = "/content/image/"
os.makedirs(extract_path, exist_ok=True)
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_path)
print(f"Files extracted to: {extract_path}")

Files extracted to: /content/image/


In [None]:
import zipfile
import os
# Unpack the captions archive into its own directory (created if missing).
zip_path = "/content/captions.txt.zip"
extract_path = "/content/caption"
os.makedirs(extract_path, exist_ok=True)
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_path)
print(f"Files extracted to: {extract_path}")

Files extracted to: /content/caption


In [None]:
capt_file_path="/content/caption/captions.txt"
images_dir_path="/content/image/Images/"

# Read the captions file once. The context manager guarantees the handle is
# closed, and the explicit encoding keeps decoding independent of the locale.
with open(capt_file_path, encoding="utf-8") as capt_file:
    data=capt_file.read().strip().split('\n')
data=data[1:]  # the first line is skipped (NOTE(review): presumably a header row — confirm)

# Parallel lists: img_filenames_list[k] is the image for captions_list[k].
img_filenames_list=[]
captions_list=[]
for line in data:
    if not line.strip():
        # A blank line carries neither a filename nor a caption.
        continue
    # Everything before the first comma is the filename; the rest (which may
    # itself contain commas) is the caption.
    img_path, _, raw_caption = line.partition(",")
    # Only the caption is lowercased: the filename must keep its original
    # case so it still matches the file on a case-sensitive filesystem.
    caption=normalizeString(raw_caption.lower())
    img_filenames_list.append(img_path)
    captions_list.append(caption)

# Step_5 : Vocabulary Creation

## word2index -- convert words to numbers

## index2word -- convert number to words

## word2count -- count occurrences of words

In [None]:
# Maximum number of words allowed in one caption.
# CustomDataset.get_input_ids sizes every id sequence as max_cap_length + 2
# entries (SOS + words + EOS, remaining slots left at 0), and eval_one_epoch
# decodes at most max_cap_length + 1 steps.
# NOTE(review): 73 is hard-coded; presumably the longest caption in this
# dataset after normalization — confirm against captions_list.
max_cap_length=73

In [None]:
class Vocab:
    """Bidirectional word <-> integer-id mapping with word frequencies.

    Ids 0 and 1 are reserved for the ``'SOS'`` and ``'EOS'`` tokens; every
    other word receives the next free id the first time it is seen.
    """

    def __init__(self):
        self.word2index={'SOS':0, 'EOS':1}  # word -> id
        self.index2word={0:'SOS', 1:'EOS'}  # id -> word
        self.word2count={}                  # word -> number of occurrences seen
        self.nwords=2                       # number of distinct ids assigned so far

    def buildVocab(self,s):
        """Add every space-separated word of ``s`` to the vocabulary.

        New words get the next free id; every occurrence (new or repeated)
        increments the word's count.
        """
        for word in s.split(" "):
            if word not in self.word2index:
                self.word2index[word]=self.nwords
                self.index2word[self.nwords]=word
                self.nwords+=1
            # .get() rather than "+= 1": the reserved tokens are present in
            # word2index but start without a word2count entry, so an
            # unconditional increment would raise KeyError for them.
            self.word2count[word]=self.word2count.get(word,0)+1

## Splits the caption into words.
## Adds each word to the vocabulary if it is not already present.
##Updates the frequency count of each word.

In [None]:
# Feed every normalized caption through the vocabulary, then report its size
# (the count includes the two reserved SOS/EOS entries).
vocab = Vocab()
for caption_text in captions_list:
    vocab.buildVocab(caption_text)
print("Vocab Length:", vocab.nwords)

Vocab Length: 8446


# STEP : 6 = custom data


## Why use a custom Dataset? It handles non-standard data formats, automates data extraction, converts text to tensors, and gives easy indexed access to samples (via `__getitem__`)

In [None]:
class CustomDataset(Dataset):
    """Dataset of (image tensor, caption id tensor) pairs.

    Args:
        images_dir_path: Directory prefix that is concatenated directly with
            each filename (so it must end with a path separator).
        img_filenames_list: Image filenames, parallel to ``captions_list``.
        captions_list: Normalized caption strings, words separated by spaces.
        vocab: Object exposing ``word2index`` (must contain 'SOS', 'EOS' and
            every caption word).
        max_cap_length: Maximum number of caption words kept per sample.
    """

    def __init__(self,images_dir_path, img_filenames_list, captions_list, vocab, max_cap_length):
        super().__init__()
        self.images_dir_path=images_dir_path
        self.img_filenames_list=img_filenames_list
        self.captions_list=captions_list
        self.length=len(self.captions_list)
        # Resize to 224x224, convert to a tensor, then normalize per channel
        # with the mean/std values below.
        self.transform=Compose([Resize((224,224), antialias=True), ToTensor(), Normalize(mean=[0.485, 0.456, 0.406],std=[0.229, 0.224, 0.225])])
        self.vocab=vocab
        self.max_cap_length=max_cap_length

    def __len__(self):
        """Number of (image, caption) samples."""
        return self.length

    def get_input_ids(self, sentence,vocab):
        """Encode ``sentence`` as a fixed-length id tensor.

        Layout: ``[SOS, w1, ..., wn, EOS, 0, 0, ...]`` with total length
        ``max_cap_length + 2``. Captions with more than ``max_cap_length``
        words are truncated so the layout always fits (previously they
        raised IndexError).

        Raises:
            KeyError: if a word is missing from ``vocab.word2index``.
        """
        words=sentence.split(" ")[:self.max_cap_length]
        input_ids=[vocab.word2index['SOS']]
        input_ids.extend(vocab.word2index[word] for word in words)
        input_ids.append(vocab.word2index['EOS'])
        # Pad with literal 0. This is the same id as 'SOS'; the training loss
        # in this notebook is built with ignore_index=0, so padded target
        # positions do not contribute to the loss.
        input_ids.extend([0]*(self.max_cap_length+2-len(input_ids)))
        return torch.tensor(input_ids)

    def __getitem__(self,idx):
        """Return ``(transformed image, caption id tensor)`` for sample ``idx``."""
        imgfname=self.images_dir_path+self.img_filenames_list[idx]
        # The context manager closes the file handle. convert("RGB") forces
        # exactly three channels so the 3-channel Normalize also works for
        # grayscale / RGBA / CMYK source files.
        with Image.open(imgfname) as raw_img:
            img=self.transform(raw_img.convert("RGB"))

        caption=self.get_input_ids(self.captions_list[idx],self.vocab)

        return img,caption

In [None]:
#driver code part 3
dataset=CustomDataset(images_dir_path, img_filenames_list, captions_list, vocab, max_cap_length)

# Seed the split so the same samples land in train/test on every run;
# without a generator the partition depends on the global RNG state.
split_generator=torch.Generator().manual_seed(42)
train_dataset,test_dataset=random_split(dataset,[0.999,0.001],generator=split_generator)

batch_size=64
train_dataloader=DataLoader(dataset=train_dataset,batch_size=batch_size, shuffle=True)
# batch_size=1 here: eval_one_epoch decodes one caption at a time.
test_dataloader=DataLoader(dataset=test_dataset,batch_size=1, shuffle=False)

# STEP : 7 = Encoder

### The encoder transforms input images into feature vectors that can be used for further processing

### forward pass : Converts raw image data into meaningful representations that can be used by the decoder (text generator)

### ResNet-50 is a deep convolutional neural network that extracts useful image features.


In [None]:
class Encoder(nn.Module):
    """Wraps a feature-extractor module and exposes it as the image encoder.

    Args:
        pretrained_feature_extractor: The module that ``forward`` delegates to.
    """

    def __init__(self, pretrained_feature_extractor):
        super(Encoder, self).__init__()
        self.pretrained_feature_extractor = pretrained_feature_extractor

    def forward(self, x):
        """Return the wrapped extractor's output for input ``x``."""
        return self.pretrained_feature_extractor(x)

### Replaces the fully connected (FC) layer of ResNet-50.

###Original ResNet-50 has fc = nn.Linear(2048, 1000), which maps features to 1000 ImageNet classes.

###We change it to nn.Linear(2048, 1024), reducing the feature vector size from 2048 to 1024.

### The new 1024-dimensional feature vector matches the decoder's GRU hidden size (1024), so it can be used directly as the decoder's initial hidden state.

In [None]:
#driver code part 4
# Load ResNet-50 with its default pre-trained weights, then replace the
# classification head with a fresh linear layer producing 1024 features.
pretrained_feature_extractor = resnet50(weights=ResNet50_Weights.DEFAULT)
pretrained_feature_extractor.fc = nn.Linear(
    pretrained_feature_extractor.fc.in_features,  # 2048 for ResNet-50
    1024,
)

encoder = Encoder(pretrained_feature_extractor).to(device)

Downloading: "https://download.pytorch.org/models/resnet50-11ad3fa6.pth" to /root/.cache/torch/hub/checkpoints/resnet50-11ad3fa6.pth
100%|██████████| 97.8M/97.8M [00:04<00:00, 22.8MB/s]


# STEP : 8 = Decoder

### output_size: Number of words in the vocabulary.

### embed_size: Size of word embeddings (vector representation of words).

###hidden_size: Number of hidden units in the GRU (memory for the decoder).

### Why? The decoder converts numerical input sequences (word indices) into meaningful text sentences.

### Defines a GRU (Gated Recurrent Unit) layer.

###Takes word embeddings (embed_size) as input.

###Outputs a hidden representation (hidden_size) that captures the context of the sentence.

### Why? The GRU remembers past words while predicting the next word.

### Converts GRU output into probabilities for each word in the vocabulary.

### self.lsoftmax = nn.LogSoftmax(dim=-1) -- applies log-softmax over the vocabulary dimension, producing the log-probabilities that NLLLoss expects

In [None]:
class Decoder(nn.Module):
    """GRU decoder mapping word ids to log-probabilities over the vocabulary.

    Args:
        output_size: Vocabulary size (embedding rows and output classes).
        embed_size: Dimension of each word embedding.
        hidden_size: Dimension of the GRU hidden state.
    """

    def __init__(self, output_size, embed_size, hidden_size):
        super(Decoder, self).__init__()
        self.e = nn.Embedding(output_size, embed_size)
        self.relu = nn.ReLU()
        self.gru = nn.GRU(embed_size, hidden_size, batch_first=True)
        self.lin = nn.Linear(hidden_size, output_size)
        self.lsoftmax = nn.LogSoftmax(dim=-1)

    def forward(self, x, prev_hidden):
        """Run the decoder over the id sequence ``x``.

        Args:
            x: Word ids fed to the embedding layer.
            prev_hidden: Initial hidden state passed to the GRU.

        Returns:
            ``(log_probs, hidden)``: log-softmax scores over the vocabulary
            for each position, and the GRU's final hidden state.
        """
        embedded = self.relu(self.e(x))
        gru_out, hidden = self.gru(embedded, prev_hidden)
        log_probs = self.lsoftmax(self.lin(gru_out))
        return log_probs, hidden

In [None]:
#driver code part 5
# hidden_size must equal the encoder's output width, because the image
# features are reshaped into the GRU's initial hidden state during training.
embed_size, hidden_size = 300, 1024

decoder = Decoder(
    output_size=vocab.nwords,
    embed_size=embed_size,
    hidden_size=hidden_size,
).to(device)

# STEP : 9 = training

In [None]:
def train_one_epoch():
    """Train encoder and decoder for one pass over ``train_dataloader``.

    Uses the module-level ``encoder``, ``decoder``, ``loss_fn``, ``opte``,
    ``optd`` and ``device``. Prints the running mean loss every 50 batches.

    Returns:
        Mean per-batch loss over the epoch.
    """
    encoder.train()
    decoder.train()
    running_total=0

    for step, (imgs,t_ids) in enumerate(train_dataloader, start=1):
        imgs, t_ids = imgs.to(device), t_ids.to(device)

        # The image features become the GRU's initial hidden state, reshaped
        # to (1, batch, -1). They are not detached, so the loss also
        # back-propagates into the encoder (which opte then updates).
        features=encoder(imgs)
        hidden0=torch.reshape(features,(1,features.shape[0],-1))

        # Teacher forcing: feed every token except the last and predict the
        # sequence shifted left by one position.
        log_probs, _ = decoder(t_ids[:,:-1],hidden0)
        targets=t_ids[:,1:].reshape(-1)
        flat_log_probs=log_probs.view(-1,log_probs.shape[-1])

        loss=loss_fn(flat_log_probs,targets)
        running_total+=loss.item()

        for optimizer in (opte, optd):
            optimizer.zero_grad()

        loss.backward()

        for optimizer in (opte, optd):
            optimizer.step()

        if (step-1)%50==0:
            print("Mini Batch=", step," Running Loss=",running_total/step, sep="")

    return running_total/len(train_dataloader)

In [None]:
def ids2Sentence(ids,vocab):
    """Convert a tensor of word ids into a space-terminated sentence string.

    Ids equal to 0 (SOS / padding) are skipped. Decoding stops after the
    first id equal to 1 (EOS), whose word is included in the output. Every
    emitted word is followed by a single space.

    Args:
        ids: Tensor of word ids; it is flattened before decoding, so a
            ``(1, L)`` batch, a 1-D tensor, and a single-element tensor are
            all accepted.
        vocab: Object exposing ``index2word`` (id -> word).

    Returns:
        The decoded sentence (empty string if no id produced a word).
    """
    words=[]
    # reshape(-1) instead of squeeze(): squeeze() turns a single-element
    # tensor into a 0-d tensor, which cannot be iterated. tolist() yields
    # plain ints, so the comparisons below are ordinary Python ones.
    for token_id in ids.reshape(-1).tolist():
        if token_id==0:
            continue
        words.append(vocab.index2word[token_id])
        if token_id==1:
            break
    return "".join(word + " " for word in words)

# STEP : 10 = Model Evaluation

In [None]:
#eval loop (written assuming batch_size=1)
def eval_one_epoch():
    """Greedy-decode a caption for every test image, show it, and score it.

    Uses the module-level ``encoder``, ``decoder``, ``test_dataloader``,
    ``vocab``, ``loss_fn``, ``max_cap_length`` and ``device``. Requires
    ``test_dataloader`` to use batch_size=1 (``.item()`` is called on the
    predicted id). For each sample it displays the de-normalized image and
    prints the ground-truth and predicted sentences.

    Returns:
        Mean per-sample loss, or NaN when ``test_dataloader`` is empty.
    """
    encoder.eval()
    decoder.eval()
    track_loss=0

    # Per-channel statistics matching the dataset's Normalize transform,
    # shaped (3, 1, 1) so they broadcast over a (3, H, W) image.
    norm_mean=torch.tensor([0.485, 0.456, 0.406], device=device).view(3,1,1)
    norm_std=torch.tensor([0.229, 0.224, 0.225], device=device).view(3,1,1)

    with torch.no_grad():

        for imgs,t_ids in test_dataloader:

            imgs=imgs.to(device)
            t_ids=t_ids.to(device)

            extracted_features=encoder(imgs)

            # Image features seed the GRU hidden state: (1, batch, -1).
            decoder_hidden=torch.reshape(extracted_features,(1,extracted_features.shape[0],-1))

            input_ids=t_ids[:,0]  # first token of the target sequence (SOS)
            yhats=[]
            pred_words=[]

            # Greedy decoding: feed the previous prediction back in, for at
            # most max_cap_length+1 steps or until EOS (id 1) is produced.
            for _ in range(max_cap_length+1):
                probs, decoder_hidden = decoder(input_ids.unsqueeze(1),decoder_hidden)
                yhats.append(probs)
                _,top_idx=torch.topk(probs,1,dim=-1)
                # (batch, 1, 1) -> (batch,). Plain indexing works on every
                # PyTorch version, unlike squeezing two dims in one call.
                input_ids=top_idx[:,0,0]
                token_id=input_ids.item() #batch_size=1
                pred_words.append(vocab.index2word[token_id])
                if token_id == 1:
                    break

            n_steps=len(yhats)
            pred_sentence="".join(word + " " for word in pred_words)
            gt_sentence=ids2Sentence(t_ids,vocab)

            print("Input Image:")
            # Undo the normalization on a copy (the input batch is left
            # untouched) and clamp to [0, 1], the range imshow expects for
            # float images.
            display_img=(imgs[0]*norm_std+norm_mean).clamp(0,1)
            plt.imshow(torch.permute(display_img,(1,2,0)).detach().cpu())
            plt.show()

            print("GT Sentence:",gt_sentence)

            print("Predicted Sentence:",pred_sentence)

            # Score only the steps that were actually decoded against the
            # equally long prefix of the shifted ground truth.
            yhats_cat=torch.cat(yhats,dim=1)
            yhats_reshaped=yhats_cat.reshape(-1,yhats_cat.shape[-1])
            gt=t_ids[:,1:n_steps+1].reshape(-1)

            loss=loss_fn(yhats_reshaped,gt)
            track_loss+=loss.item()


        print("-----------------------------------")
        n_batches=len(test_dataloader)
        # An empty test split has no defined mean loss; report NaN instead
        # of failing with ZeroDivisionError.
        return track_loss/n_batches if n_batches else float("nan")

In [None]:
#driver code part 5

# ignore_index=0: target positions holding id 0 (padding) add nothing to the loss.
loss_fn = nn.NLLLoss(ignore_index=0).to(device)
lr = 0.001

# One Adam optimizer per module; both share the same learning rate.
optd = optim.Adam(params=decoder.parameters(), lr=lr)
opte = optim.Adam(params=encoder.parameters(), lr=lr)

n_epochs = 5

# Training: report the mean loss after each epoch.
for epoch_no in range(1, n_epochs + 1):
    train_loss = round(train_one_epoch(), 4)
    print("Epoch=", epoch_no, " Loss=", train_loss, sep="")

# Evaluation: a single pass over the test split.
eval_loss = round(eval_one_epoch(), 4)
print("Epoch=", 1, " Loss=", eval_loss, sep="")

Mini Batch=1 Running Loss=9.064282417297363
Mini Batch=51 Running Loss=5.202331524269254
Mini Batch=101 Running Loss=4.7045413031436425
Mini Batch=151 Running Loss=4.39785737075553
Mini Batch=201 Running Loss=4.199654518668331
Mini Batch=251 Running Loss=4.051222480150808
Mini Batch=301 Running Loss=3.9371926396392114
Mini Batch=351 Running Loss=3.844473129663712
Mini Batch=401 Running Loss=3.7623244865874104
Mini Batch=451 Running Loss=3.6928043645659994
Mini Batch=501 Running Loss=3.6376399156338204
