# Combining Image Classification and NLP Models Together

## Import Libraries

In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import gc

from tqdm import tqdm
from dataset import MultiLabelDataset
from tools import get_data, load_data
from torch.utils.data import DataLoader
from torchvision import transforms, models
from torchvision.models import ResNet18_Weights, MobileNet_V3_Large_Weights
from sklearn.feature_extraction.text import CountVectorizer



## Remove unused memory in GPU

In [2]:
gc.collect()
torch.cuda.empty_cache()

## Load the Data

In [3]:
# import data
train_data = get_data("./dataset/train.csv")
test_data = get_data("./dataset/test.csv")

# perform text cleaning and get the pandas' dataframe
train_data = load_data(train_data)
test_data = load_data(test_data, has_label=False)

# join the data together
for_nlp_data = pd.concat((train_data['caption'], test_data['caption']), ignore_index=True)

In [4]:
print(f"Number of training instances: {train_data.shape[0]}")
print(f"Number of testing instances:  {test_data.shape[0]}")

Number of training instances: 30000
Number of testing instances:  10000


## Preprocessing for Images and Caption

In [5]:
# define the image transformation: currently following resnet18
transform = transforms.Compose([
    transforms.Resize((232, 232)),
    transforms.CenterCrop(224),
    transforms.ToTensor(), # converts images to [0, 1]
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    )
])

# define count vectorizer
vectorizer = CountVectorizer(lowercase=True)
vectorizer.fit(for_nlp_data)

## Create Dataset and DataLoader

In [6]:
# initialize the dataset
train_dataset = MultiLabelDataset(
    csv_file=train_data,
    root_dir='./dataset/data/',
    vectorizer=vectorizer,
    transform=transform,
)
test_dataset = MultiLabelDataset(
    csv_file=test_data,
    root_dir='./dataset/data/',
    vectorizer=vectorizer,
    transform=transform,
)

BATCH_SIZE=16

# load the dataset into batches 
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

## Create the Combined Model

### Create the LSTM model

In [7]:
class LSTM(nn.Module):
    
	def __init__(self, num_embeddings: int, embedding_dim: int, hidden_size: int, output_size: int) -> None:
		super(LSTM, self).__init__()

		# Add the word embedding layer
		self.embedding_layer = nn.Embedding(num_embeddings=num_embeddings+1, embedding_dim=embedding_dim)

		# Add the LSTM Layer
		self.lstm_layer = nn.LSTM(input_size=embedding_dim*num_embeddings, hidden_size=hidden_size)

		# Add the Output Layer
		self.fc_layer1 = nn.Linear(in_features=hidden_size, out_features=output_size)
		
	def forward(self, x):
		embeds = self.embedding_layer(x)
		lstm_out, _ = self.lstm_layer(embeds.view(len(x), -1))
		tag_space = self.fc_layer1(lstm_out)		
		tag_scores = F.tanh(tag_space) # tanh is currently the best
		return tag_scores

In [8]:
class CombinedModel(nn.Module):

    def __init__(self) -> None:
        super(CombinedModel, self).__init__()

        # self.cnn_model = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
        # n_features = self.cnn_model.fc.in_features
        # n_out = 350
        # self.cnn_model.fc = nn.Sequential(
        #     nn.Linear(in_features=n_features, out_features=n_out),
        # )
        self.cnn_model = models.mobilenet_v3_large(weights=MobileNet_V3_Large_Weights.IMAGENET1K_V2)
        n_out = 650
        self.cnn_model.classifier = nn.Sequential(
            nn.Linear(960, 1280),
            nn.Hardswish(inplace=True),
            nn.Dropout(p=0.2, inplace=True),
            nn.Linear(1280, n_out),
        )

        self.lstm_model = LSTM(
            num_embeddings=8075,
            embedding_dim=2,
            hidden_size=1000,
            output_size=n_out,
        )
        self.last_layer = nn.Linear(n_out, 19)

    def forward(self, x, y):
        x = self.cnn_model(x)
        y = self.lstm_model(y)
        output = x * y # element wise multiplication
        output = F.normalize(self.last_layer(output))
        return output

## Define the model

In [9]:
EPOCHS = 5
THRESHOLD = 0.5

model = CombinedModel()
loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=0.01,
)

# utilise GPU
if torch.cuda.is_available():
    print('using GPU')
    model = model.to('cuda')

using GPU


## Train the model

In [10]:
train_losses = []
train_accs = []
for epoch in range(EPOCHS):

	n_total = 0
	n_correct = 0
	train_loss = 0.
	model.train()
	for _, images, captions, labels in tqdm(train_dataloader, desc=f"Epoch {epoch+1} Training: "):

		if torch.cuda.is_available():
			images = images.to('cuda')
			captions = captions.to('cuda')
			labels = labels.to('cuda')

		y_pred = model(images, captions)

		# backward
		loss = loss_fn(y_pred, labels)
		loss.backward()

		# update
		optimizer.step()

		# compare
		predicted = (y_pred > THRESHOLD).int()

		train_loss += loss.item()
		n_correct += torch.all(torch.eq(predicted, labels), dim=1).sum()
		n_total += labels.shape[0]

	train_losses.append(train_loss / len(train_dataloader))
	train_accs.append(n_correct / n_total)

	print("Epoch {:d}, Train Loss: {:.7f}, Train Accuracy: {:.3f}%".format(epoch+1, train_losses[-1], train_accs[-1]*100))

  labels = torch.Tensor(self.df.iloc[idx, 2:])
Epoch 1 Training: 100%|██████████| 1875/1875 [04:00<00:00,  7.79it/s]


Epoch 1, Train Loss: 0.5968456, Train Accuracy: 0.000%


Epoch 2 Training: 100%|██████████| 1875/1875 [03:54<00:00,  7.98it/s]


Epoch 2, Train Loss: 0.5967483, Train Accuracy: 0.000%


Epoch 3 Training:  24%|██▎       | 441/1875 [00:54<02:58,  8.02it/s]


KeyboardInterrupt: 

: 