Mô hình: https://www.kaggle.com/code/vangphamhuu/classifier-model?scriptVersionId=226940874

# Cài đặt và import thư viện

In [None]:
!pip install -q underthesea

In [None]:
import pandas as pd 
import numpy as np
import matplotlib.pyplot as plt
import os
import json
import random
from sklearn.model_selection import train_test_split
from PIL import Image
import underthesea
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import models, transforms
import pickle
from tqdm import tqdm
from collections import Counter

# Đọc dữ liệu

In [None]:
def load_image_caption(caption_path):
    with open(caption_path, 'r', encoding="utf-8") as f:
        data_captions = json.load(f)

    image_dict = {}
    for item in data_captions:
        image_path = item["path_file"]
        image_path = image_path.split("/")
        image_path = image_path[0] + "/gen_caption(Nhap)" + "/" + image_path[1] + "/" + image_path[2]
        captions = item["captions"] 
        
        if image_path not in image_dict:
            image_dict[image_path] = []
        
        image_dict[image_path].extend(captions)

    return image_dict

    

In [None]:
def split_data(image_dict, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15):
    image_paths = list(image_dict.keys())
    random.shuffle(image_paths)
    
    train_images, temp_images = train_test_split(image_paths, test_size=(1 - train_ratio), random_state=42)
    val_images, test_images = train_test_split(temp_images, test_size=(test_ratio / (val_ratio + test_ratio)), random_state=42)

    train_data = [(img, cap) for img in train_images for cap in image_dict[img]]
    val_data = [(img, cap) for img in val_images for cap in image_dict[img]]
    test_data = [(img, cap) for img in test_images for cap in image_dict[img]]

    return train_data, val_data, test_data

In [None]:
image_dict = load_image_caption("/kaggle/input/imagecaptioning/output.json")
train_data, val_data, test_data = split_data(image_dict)

# Tiền xử lý ảnh + văn bản

In [None]:

class ImageCaptionDataset(Dataset):
    def __init__(self, image_root, captions_file, transform=None, max_length=50):
        self.transform = transform
        self.max_length = max_length

        with open(captions_file, 'r', encoding="utf-8") as f:
            data_captions = json.load(f)
        
        # Tạo danh sách ảnh & captions
        self.annotations = {}
        self.class_labels = {}
        
        for item in data_captions:
            image_path = item["path_file"]
            image_path = image_path.split("/")
            full_img_path = image_root + "/"+ image_path[0] + "/gen_caption(Nhap)" + "/" + image_path[1] + "/" + image_path[2]
            self.annotations[full_img_path] = item["captions"]
            
            # Lấy nhãn phân loại từ thư mục cha của ảnh
            class_label = image_path[1]  # Giả sử thư mục thứ 2 là nhãn
            self.class_labels[full_img_path] = class_label
        
        self.img_ids = list(self.annotations.keys())

        # Xây dựng danh sách nhãn phân loại
        self.unique_classes = list(set(self.class_labels.values()))
        self.class_to_idx = {cls: idx for idx, cls in enumerate(self.unique_classes)}

        # Xây dựng từ điển từ captions
        self.vocab = self.build_vocabulary()

    def build_vocabulary(self, threshold=4):
        """
        Xây dựng từ điển từ các caption
        """
        counter = Counter()
        
        for captions in self.annotations.values():
            for caption in captions:
                tokens = underthesea.word_tokenize(caption.lower())  # Dùng Underthesea để tách từ
                counter.update(tokens)
        
        words = [word for word, count in counter.items() if count >= threshold]
        
        vocab = {'<pad>': 0, '<start>': 1, '<end>': 2, '<unk>': 3}
        vocab.update({word: idx + 4 for idx, word in enumerate(words)})
        
        return vocab

    def caption_to_indices(self, caption):
        """
        Chuyển đổi caption thành chuỗi indices
        """
        tokens = underthesea.word_tokenize(caption.lower())  # Dùng Underthesea để tách từ
        caption_indices = [self.vocab['<start>']] + \
                          [self.vocab.get(token, self.vocab['<unk>']) for token in tokens] + \
                          [self.vocab['<end>']]
        
        if len(caption_indices) > self.max_length:
            caption_indices = caption_indices[:self.max_length]
        else:
            caption_indices += [self.vocab['<pad>']] * (self.max_length - len(caption_indices))
        
        return torch.tensor(caption_indices, dtype=torch.long)

    def __len__(self):
        return len(self.img_ids)

    def __getitem__(self, idx):
        img_path = self.img_ids[idx]

        # Mở ảnh
        img = Image.open(img_path)
        
        image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)

        # Chọn caption ngẫu nhiên
        captions = self.annotations[img_path]
        caption = np.random.choice(captions)

        # Chuyển caption thành tensor
        caption_tensor = self.caption_to_indices(caption)

        # Lấy nhãn phân loại của ảnh
        class_label = self.class_labels[img_path]
        class_idx = self.class_to_idx[class_label]

        # One-hot encoding cho nhãn phân loại
        class_tensor = torch.zeros(len(self.unique_classes), dtype=torch.float)
        class_tensor[class_idx] = 1.0

        return image, caption_tensor, class_tensor, caption

        return image, caption_tensor, class_tensor, caption


# Model

In [None]:
class EncoderCNN(nn.Module):
    """
    Mô hình CNN để trích xuất đặc trưng từ ảnh
    """
    def __init__(self, embed_size):
        super(EncoderCNN, self).__init__()
        # Sử dụng mô hình ResNet50 pretrained
        resnet = models.resnet50(pretrained=True)
        
        # Loại bỏ lớp cuối cùng (fc)
        modules = list(resnet.children())[:-1]
        self.resnet = nn.Sequential(*modules)
        
        # Lớp projection để giảm chiều đặc trưng
        self.embed = nn.Linear(resnet.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size)
        self.dropout = nn.Dropout(0.5)
        
        # Freezing các lớp của ResNet
        for param in self.resnet.parameters():
            param.requires_grad = False
            
    def forward(self, images):
        # Trích xuất đặc trưng với ResNet
        with torch.no_grad():
            features = self.resnet(images)
        
        # Reshape đặc trưng
        features = features.view(features.size(0), -1)
        
        # Chiếu đặc trưng về không gian embed_size
        features = self.embed(features)
        features = self.bn(features)
        features = self.dropout(features)
        
        return features

In [None]:
class DecoderRNN(nn.Module):
    """
    Mô hình LSTM để sinh caption từ đặc trưng ảnh
    """
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super(DecoderRNN, self).__init__()
        
        # Lớp embedding cho các từ
        self.embed = nn.Embedding(vocab_size, embed_size)
        
        # LSTM layer
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        
        # Lớp fully connected để dự đoán từ tiếp theo
        self.linear = nn.Linear(hidden_size, vocab_size)
        
        # Dropout
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, features, captions):
        # Embedding captions
        embeddings = self.embed(captions)
        
        # Ghép đặc trưng ảnh với embedding của caption
        # features: (batch_size, embed_size)
        # embeddings: (batch_size, caption_length, embed_size)
        features = features.unsqueeze(1)  # (batch_size, 1, embed_size)
        
        # Ghép đặc trưng ảnh với tất cả các từ trừ từ cuối cùng
        embeddings = torch.cat((features, embeddings[:, :-1, :]), dim=1)
        
        # Đưa qua LSTM
        hiddens, _ = self.lstm(embeddings)
        
        # Đưa qua lớp fully connected
        outputs = self.linear(hiddens)
        
        return outputs

In [None]:
class ClassificationHead(nn.Module):
    """
    Mô hình phân loại ảnh từ đặc trưng ảnh
    """
    def __init__(self, embed_size, num_classes):
        super(ClassificationHead, self).__init__()
        
        # Các lớp fully connected
        self.fc1 = nn.Linear(embed_size, 512)
        self.fc2 = nn.Linear(512, num_classes)
        
        # Các lớp khác
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)
        
    def forward(self, features):
        # Đưa đặc trưng qua các lớp fully connected
        x = self.fc1(features)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        
        return x

In [None]:
class MultitaskModel(nn.Module):
    """
    Mô hình multitask học đồng thời phân loại và sinh caption
    """
    def __init__(self, embed_size, hidden_size, vocab_size, num_classes):
        super(MultitaskModel, self).__init__()
        
        # CNN encoder
        self.encoder = EncoderCNN(embed_size)
        
        # LSTM decoder cho sinh caption
        self.decoder = DecoderRNN(embed_size, hidden_size, vocab_size)
        
        # Classification head cho phân loại
        self.classifier = ClassificationHead(embed_size, num_classes)
        
    def forward(self, images, captions):
        # Trích xuất đặc trưng từ ảnh
        features = self.encoder(images)
        
        # Task 1: Phân loại
        classification_outputs = self.classifier(features)
        
        # Task 2: Sinh caption
        caption_outputs = self.decoder(features, captions)
        
        return classification_outputs, caption_outputs

In [None]:
def train_model(model, train_loader, criterion_cls, criterion_cap, optimizer, num_epochs=10, device='cuda'):
    """
    Huấn luyện mô hình multitask
    """
    model.train()
    
    for epoch in range(num_epochs):
        running_loss = 0.0
        running_cls_acc = 0.0
        running_cap_acc = 0.0
        
        for i, (images, captions, class_labels, _) in enumerate(tqdm(train_loader)):
            # Chuyển dữ liệu sang device
            images = images.to(device)
            captions = captions.to(device)
            class_labels = class_labels.to(device)
            
            # Xóa gradient
            optimizer.zero_grad()
            
            # Forward pass
            cls_outputs, cap_outputs = model(images, captions)
            
            # Tính loss cho task phân loại
            cls_loss = criterion_cls(cls_outputs, class_labels)
            
            # Tính loss cho task sinh caption
            # Reshape cap_outputs và captions cho CrossEntropyLoss
            cap_outputs = cap_outputs.view(-1, cap_outputs.size(2))
            captions = captions.view(-1)
            cap_loss = criterion_cap(cap_outputs, captions)
            
            # Tổng hợp loss
            loss = 0.4 * cls_loss + 0.6 * cap_loss
            
            # Backward pass
            loss.backward()
            
            # Cập nhật trọng số
            optimizer.step()
            
            # Thống kê
            running_loss += loss.item()
            
            # Tính accuracy cho phân loại
            _, predicted_cls = torch.max(cls_outputs.data, 1)
            _, true_cls = torch.max(class_labels.data, 1)
            cls_acc = (predicted_cls == true_cls).float().mean()
            running_cls_acc += cls_acc.item()
            
            # Tính accuracy cho sinh caption
            _, predicted_cap = torch.max(cap_outputs.data, 1)
            cap_acc = (predicted_cap == captions).float().mean()
            running_cap_acc += cap_acc.item()
            
            # In thông tin sau mỗi 100 batch
            if (i+1) % 100 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], '
                      f'Loss: {loss.item():.4f}, Cls Acc: {cls_acc.item():.4f}, Cap Acc: {cap_acc.item():.4f}')
        
        # In thông tin sau mỗi epoch
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}, '
              f'Cls Acc: {running_cls_acc/len(train_loader):.4f}, Cap Acc: {running_cap_acc/len(train_loader):.4f}')
    
    return model

In [None]:
def predict_caption(model, image_path, vocab, transform, max_length=50, device='cuda'):
    """
    Dự đoán caption cho một ảnh mới
    """
    # Tiền xử lý ảnh
    image = Image.open(image_path).convert('RGB')
    image = transform(image).unsqueeze(0).to(device)
    
    # Tắt chế độ training
    model.eval()
    
    # Tạo từ điển ngược để chuyển từ index sang từ
    idx_to_word = {idx: word for word, idx in vocab.items()}
    
    # Bắt đầu với token <start>
    start_token = torch.tensor([vocab['<start>']]).unsqueeze(0).to(device)
    
    # Trích xuất đặc trưng từ ảnh
    with torch.no_grad():
        features = model.encoder(image)
        
        # Dự đoán phân loại
        cls_outputs = model.classifier(features)
        _, predicted_cls = torch.max(cls_outputs.data, 1)
        
        # Dự đoán caption
        captions = []
        input_word = start_token
        
        # Chuẩn bị hidden state cho LSTM
        h = torch.zeros(1, 1, model.decoder.lstm.hidden_size).to(device)
        c = torch.zeros(1, 1, model.decoder.lstm.hidden_size).to(device)
        
        # Sinh từng từ một cho đến khi gặp token <end> hoặc đạt max_length
        for i in range(max_length):
            embeddings = model.decoder.embed(input_word).squeeze(0)
            
            # Ghép với đặc trưng ảnh cho từ đầu tiên
            if i == 0:
                embeddings = features
            
            # Reshape cho LSTM
            embeddings = embeddings.unsqueeze(0).unsqueeze(0)
            
            # Dự đoán từ tiếp theo
            lstm_out, (h, c) = model.decoder.lstm(embeddings, (h, c))
            outputs = model.decoder.linear(lstm_out.squeeze(0))
            _, predicted = torch.max(outputs.data, 1)
            
            # Thêm từ vào caption
            word_idx = predicted.item()
            captions.append(word_idx)
            
            # Nếu gặp token <end>, dừng lại
            if word_idx == vocab['<end>']:
                break
            
            # Cập nhật input_word cho bước tiếp theo
            input_word = predicted.unsqueeze(0)
    
    # Chuyển từ indices sang từ
    caption = ' '.join([idx_to_word[idx] for idx in captions if idx not in [vocab['<start>'], vocab['<end>'], vocab['<pad>']]])
    
    return predicted_cls.item(), caption

# Main

In [None]:
def main():
    # Thiết lập các tham số
    embed_size = 256
    hidden_size = 512
    batch_size = 32
    num_epochs = 100
    learning_rate = 0.001
    
    # Thiết lập device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    
    # Tiền xử lý ảnh
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    
    # Tạo dataset
    dataset = ImageCaptionDataset(image_root="/kaggle/input/imagecaptioning", captions_file='/kaggle/input/imagecaptioning/output.json', transform=transform)
    
    # Chia dataset
    train_size = int(0.8 * len(dataset))
    val_size = len(dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(dataset, [train_size, val_size])
    
    # Tạo dataloader
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=4)
    
    # Tạo mô hình
    vocab_size = len(dataset.vocab)
    num_classes = len(dataset.unique_classes)
    model = MultitaskModel(embed_size, hidden_size, vocab_size, num_classes).to(device)
    
    # Định nghĩa loss function và optimizer
    criterion_cls = nn.BCEWithLogitsLoss()  # Binary Cross Entropy cho phân loại
    criterion_cap = nn.CrossEntropyLoss(ignore_index=dataset.vocab['<pad>'])  # Cross Entropy cho sinh caption
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    
    # Huấn luyện mô hình
    model = train_model(model, train_loader, criterion_cls, criterion_cap, optimizer, num_epochs, device)
    
    # Lưu mô hình
    torch.save(model.state_dict(), 'multitask_model.pth')
    
    # Lưu vocabulary
    with open('vocab.pkl', 'wb') as f:
        pickle.dump(dataset.vocab, f)

In [None]:
main()

In [None]:
  # Dự đoán caption cho một ảnh mới
    test_image_path = 'path/to/test/image.jpg'
    predicted_class, caption = predict_caption(model, test_image_path, dataset.vocab, transform, device=device)
    print(f'Predicted class: {dataset.unique_classes[predicted_class]}')
    print(f'Generated caption: {caption}')