In [None]:
# Mount Google Drive at /content/drive (Colab-only helper); the book JSON
# loaded further down is read from a path under this mount.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# ---------------------------------------------
# 1. Install the required libraries (uncomment in Colab if needed)
# ---------------------------------------------
!pip install transformers pillow torch torchvision pyngrok tensorboardX
!pip install flask-ngrok
!pip install flask-cors
!pip install sentencepiece
!pip install accelerate
!pip install bitsandbytes



In [None]:
import os
import json
import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from transformers import AutoModel, AutoTokenizer
from PIL import Image
import torch.nn as nn
from torchvision import transforms

In [None]:
# TensorBoard
from tensorboardX import SummaryWriter

In [None]:
# ---------------------------------------------
# 2. Define the Dataset
# ---------------------------------------------
class BookCoverDataset(Dataset):
    """Dataset pairing a book-cover image with its tokenized text fields.

    Each record is a dict with the keys 'image_path', 'title', 'author'
    and 'publisher'. Indexing returns the resized image tensor together
    with the token ids of the three text fields.
    """

    def __init__(self, data, tokenizer, max_length=128):
        """
        data: list of dicts: {'image_path', 'title', 'author', 'publisher'}
        tokenizer: a HuggingFace tokenizer
        max_length: max sequence length for text fields
        """
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.image_transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor()  # shape: [3, 224, 224]
        ])

    def __len__(self):
        """Number of records in the dataset."""
        return len(self.data)

    def _encode(self, text):
        """Tokenize one text field, padded/truncated to `max_length`.

        Returns the 'input_ids' entry with its leading batch dimension
        removed via `squeeze(0)`.
        """
        return self.tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )['input_ids'].squeeze(0)

    def __getitem__(self, idx):
        """Return the image tensor and token ids for record `idx`.

        Returns a dict with keys 'image', 'title_tokens', 'author_tokens'
        and 'publisher_tokens'.
        """
        item = self.data[idx]

        # Tokenize each field with the same settings.
        title_tokens = self._encode(item['title'])
        author_tokens = self._encode(item['author'])
        publisher_tokens = self._encode(item['publisher'])

        # Open the file in a context manager so its handle is closed as soon
        # as the RGB copy exists, instead of lingering until garbage collection
        # (matters when a DataLoader walks many files).
        with Image.open(item['image_path']) as cover:
            image = cover.convert('RGB')
        image_tensor = self.image_transform(image)  # [3, 224, 224]

        return {
            'image': image_tensor,
            'title_tokens': title_tokens,
            'author_tokens': author_tokens,
            'publisher_tokens': publisher_tokens
        }

In [None]:
# ---------------------------------------------
# 3. Load JSON Data
# ---------------------------------------------
# Absolute path under the '/content/drive' mount point, so this cell does not
# depend on the kernel's current working directory. The encoding is given
# explicitly because JSON text is UTF-8 and the platform default may differ.
with open('/content/drive/MyDrive/json_book/book_data.json', 'r', encoding='utf-8') as file:
    custom_data = json.load(file)

In [None]:
# ---------------------------------------------
# 4. Set Device
# ---------------------------------------------
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

Using device: cuda


In [None]:
# ---------------------------------------------
# 5. Load Base Model and Tokenizer
# ---------------------------------------------
# Checkpoint identifier on the Hugging Face Hub; used for both the model and
# its tokenizer so the two stay in sync.
base_model_name = 'openbmb/MiniCPM-Llama3-V-2_5-int4'
# trust_remote_code=True lets the checkpoint repository's own Python code run
# locally when the model is built — only use with repositories you trust.
base_model = AutoModel.from_pretrained(base_model_name, trust_remote_code=True)
# NOTE(review): the checkpoint name ends in "-int4" and the load log reports a
# quantized model; some transformers/bitsandbytes versions reject `.to(...)` on
# quantized models — confirm this call is needed and supported.
base_model.to(device)

tokenizer = AutoTokenizer.from_pretrained(base_model_name, trust_remote_code=True)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
Unused kwargs: ['_load_in_4bit', '_load_in_8bit', 'quant_method']. These kwargs are not used in <class 'transformers.utils.quantization_config.BitsAndBytesConfig'>.
`low_cpu_mem_usage` was None, now default to True since model is quantized.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:
# ---------------------------------------------
# 6. Create Dataset and DataLoader
# ---------------------------------------------
from sklearn.model_selection import train_test_split

# Carve off 20% of the records as a holdout pool (80% stays for training),
# then halve that pool into validation and test (10% each).
train_data, temp_data = train_test_split(
    custom_data,
    test_size=0.2,
    random_state=42,
)
val_data, test_data = train_test_split(
    temp_data,
    test_size=0.5,
    random_state=42,
)

# Report the size of every split.
for split_label, split_records in (
    ("length train", train_data),
    ("length val:", val_data),
    ("length test:", test_data),
):
    print(split_label, len(split_records))

# One dataset per split, all sharing the same tokenizer.
train_dataset, val_dataset, test_dataset = (
    BookCoverDataset(records, tokenizer)
    for records in (train_data, val_data, test_data)
)

length train 72
length val: 9
length test: 9


In [None]:
# The train/val/test split is seeded with random_state=42, but an unseeded
# shuffle would still make the batch order differ between runs. A dedicated,
# seeded generator makes the training order reproducible after a kernel restart.
train_loader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    generator=torch.Generator().manual_seed(42),
)
# Evaluation loaders keep the dataset order (no shuffling).
val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=2, shuffle=False)

In [None]:
# ---------------------------------------------
# 7. Define Custom Model
# ---------------------------------------------
class CustomModel(nn.Module):
    """Base model wrapped with three linear heads: title, author, publisher.

    The image is projected pixel-by-pixel from 3 channels to 128 features and
    handed to the base model together with the token ids of one text field at
    a time. Each head maps the hidden state at sequence position 0 to
    vocabulary-sized logits.

    NOTE(review): this assumes the base model's forward accepts a `data=` dict
    with the keys 'input_ids', 'pixel_values' and 'tgt_sizes' and returns an
    object exposing `last_hidden_state` — confirm against the checkpoint's
    remote code.
    """

    def __init__(self, base_model, vocab_size=None):
        """
        base_model: module exposing `config.hidden_size`
        vocab_size: output size of each head. When omitted, falls back to the
            notebook-level `tokenizer.vocab_size` (the original behaviour), so
            existing `CustomModel(base_model)` calls keep working.
        """
        super().__init__()
        self.base_model = base_model
        if vocab_size is None:
            vocab_size = tokenizer.vocab_size
        hidden_size = self.base_model.config.hidden_size

        self.title_head = nn.Linear(hidden_size, vocab_size)
        self.author_head = nn.Linear(hidden_size, vocab_size)
        self.publisher_head = nn.Linear(hidden_size, vocab_size)

        # We'll map each pixel from 3 -> 128 dimension
        self.image_transform = nn.Linear(3, 128)

    def _field_logits(self, head, input_ids, pixel_values_list, tgt_sizes):
        """Run the base model on one text field and apply `head` at position 0."""
        field_data = {
            "input_ids": input_ids,              # [bsz, seq_len]
            "pixel_values": pixel_values_list,   # list of lists
            "tgt_sizes": tgt_sizes
        }
        outputs = self.base_model(data=field_data)
        # last_hidden_state[:, 0, :] selects the first sequence position.
        return head(outputs.last_hidden_state[:, 0, :])  # [bsz, vocab_size]

    def forward(self, image, title_ids, author_ids, publisher_ids):
        """
        image: [batch_size, 3, 224, 224]
        title_ids, author_ids, publisher_ids: [batch_size, seq_len]

        Returns (title_logits, author_logits, publisher_logits), each
        [batch_size, vocab_size].
        """
        # Unpacking four values also enforces a 4-D [bsz, channels, H, W] input.
        bsz, channels, _, _ = image.shape

        # Channels last, then flatten the spatial grid: [bsz, H*W, channels]
        image = image.permute(0, 2, 3, 1).contiguous()
        image = image.view(bsz, -1, channels)

        # Per-pixel projection => [bsz, H*W, 128]
        image_emb = self.image_transform(image)

        # pixel_values is passed as a "list of lists": one sub-list per sample,
        # each holding that sample's single [H*W, 128] embedding.
        pixel_values_list = [[sample_emb] for sample_emb in image_emb]

        tgt_sizes = torch.tensor([128], device=image.device)  # dummy

        title_logits = self._field_logits(
            self.title_head, title_ids, pixel_values_list, tgt_sizes
        )
        author_logits = self._field_logits(
            self.author_head, author_ids, pixel_values_list, tgt_sizes
        )
        publisher_logits = self._field_logits(
            self.publisher_head, publisher_ids, pixel_values_list, tgt_sizes
        )

        return title_logits, author_logits, publisher_logits

# Wrap the loaded base model with the three output heads and move the wrapper
# (heads and pixel projection included) to the selected device.
model = CustomModel(base_model).to(device)

In [None]:
# ---------------------------------------------
# 8. Define Loss & Optimizer
# ---------------------------------------------
loss_fn = nn.CrossEntropyLoss()

def compute_loss(title_out, author_out, publisher_out,
                 title_labels, author_labels, publisher_labels):
    """
    Sum of the cross-entropy losses of the three fields.

    Each out is [..., vocab_size] (e.g. [batch_size, vocab_size]).
    Each label holds class indices with the matching leading shape
    (e.g. [batch_size]).

    The class dimension is read from the logits themselves, so the function
    does not depend on a global tokenizer and stays correct when the heads
    are built with a different output size. Leading dimensions are flattened,
    so [batch_size, seq_len, vocab_size] logits with [batch_size, seq_len]
    labels also work.
    """
    def field_loss(logits, labels):
        # Flatten to [N, vocab_size] vs [N]; reshape also accepts
        # non-contiguous inputs.
        return loss_fn(logits.reshape(-1, logits.size(-1)), labels.reshape(-1))

    title_loss = field_loss(title_out, title_labels)
    author_loss = field_loss(author_out, author_labels)
    publisher_loss = field_loss(publisher_out, publisher_labels)
    return title_loss + author_loss + publisher_loss

# AdamW over everything `model.parameters()` yields, with learning rate 1e-4.
# NOTE(review): this presumably includes the wrapped base model's parameters as
# well as the new heads — confirm that updating the base model is intended.
optimizer = AdamW(model.parameters(), lr=1e-4)

In [None]:
# ---------------------------------------------
# 9. Quick Shape Check
# ---------------------------------------------
# Pull the first training example and report the shapes of its title tokens
# and image tensor as a quick sanity check.
sample = train_dataset[0]
for shape_label, sample_key in (
    ("[Sample 0] Title tokens:", 'title_tokens'),
    ("[Sample 0] Image shape:", 'image'),
):
    print(shape_label, sample[sample_key].shape)

[Sample 0] Title tokens: torch.Size([128])
[Sample 0] Image shape: torch.Size([3, 224, 224])


In [None]:
# ---------------------------------------------
# 10. Training Loop
# ---------------------------------------------
writer = SummaryWriter(log_dir="./runs")
num_epochs = 2
best_val_loss = float("inf")  # lowest average validation loss seen so far


def _batch_loss(batch):
    """Move one batch to `device`, run `model` on it and return the combined loss.

    Shared by the training and validation passes so both compute the loss in
    exactly the same way. The label of each field is the first token id of
    that field's sequence (`ids[:, 0]`).
    """
    images = batch['image'].to(device)
    title_ids = batch['title_tokens'].to(device)
    author_ids = batch['author_tokens'].to(device)
    publisher_ids = batch['publisher_tokens'].to(device)

    title_out, author_out, publisher_out = model(
        image=images,
        title_ids=title_ids,
        author_ids=author_ids,
        publisher_ids=publisher_ids
    )

    # NOTE(review): the label is sliced from the same ids that are fed to the
    # model as input, so the target token is visible to the model — confirm
    # this is the intended training objective.
    return compute_loss(
        title_out, author_out, publisher_out,
        title_ids[:, 0], author_ids[:, 0], publisher_ids[:, 0]
    )


try:
    for epoch in range(num_epochs):
        # ---------- TRAINING LOOP ----------
        model.train()
        running_train_loss = 0.0

        for step, batch in enumerate(train_loader):
            optimizer.zero_grad()

            loss = _batch_loss(batch)
            loss.backward()
            optimizer.step()

            running_train_loss += loss.item()
            # Global step counts batches across epochs so the curve is continuous.
            global_step = epoch * len(train_loader) + step
            writer.add_scalar("Loss/train", loss.item(), global_step)

            if (step+1) % 10 == 0:
                print(f"[Train] Epoch [{epoch+1}/{num_epochs}], step {step+1}/{len(train_loader)}, loss = {loss.item():.4f}")

        avg_train_loss = running_train_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}] TRAIN loss = {avg_train_loss:.4f}")

        # ---------- VALIDATION LOOP ----------
        model.eval()
        running_val_loss = 0.0

        with torch.no_grad():
            for batch in val_loader:
                running_val_loss += _batch_loss(batch).item()

        avg_val_loss = running_val_loss / len(val_loader)
        writer.add_scalar("Loss/val", avg_val_loss, epoch)
        print(f"Epoch [{epoch+1}/{num_epochs}] VAL loss = {avg_val_loss:.4f}")

        # Keep only the checkpoint with the lowest validation loss so far.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), "best_model.pt")
            print(">>> Best model saved (val_loss improved).")
finally:
    # Push pending events to disk even if training is interrupted; the writer
    # itself stays open in case later cells keep logging to it.
    writer.flush()

print("Training done!")