In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the dataset paths used below live under
# '/content/drive/My Drive/'.
drive.mount('/content/drive')

In [None]:
# Location of the image archive on the mounted Google Drive.
zip_file_path = '/content/drive/My Drive/hm_fashion_dataset/images.zip'
# Unzip images to the local disk of the Colab virtual machine
unzip_destination_path = '/content/'

# Unzip
print("Starting to unzip images.zip...")
# Shell magic: {var} interpolates the Python variables above.
# -q = quiet, -n = never overwrite files that already exist (makes re-runs cheap).
!unzip -q -n "{zip_file_path}" -d "{unzip_destination_path}"
print("Unzipping completed!")

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Article metadata (one row per article) read from the mounted Google Drive.
articles_df = pd.read_csv('/content/drive/My Drive/hm_fashion_dataset/articles.csv')

# View basic information
# DataFrame.info() writes its report to stdout and returns None, so it is called
# directly; wrapping it in print() would append a stray "None" line to the output.
articles_df.info()
print(articles_df.head())

In [None]:
# Number of articles per product group, most frequent first.
category_counts = articles_df['product_group_name'].value_counts()
print(category_counts)

# Visualize category distribution
fig, ax = plt.subplots(figsize=(12, 6))
category_counts.plot(kind='bar', ax=ax)
ax.set_title('Distribution of Product Categories')
ax.set_xlabel('Category')
ax.set_ylabel('Count')
plt.show()

In [None]:
# Keep only categories with more than 1000 samples
keep_categories = category_counts.loc[lambda counts: counts > 1000].index.tolist()

# Row mask selecting the articles whose product group survived the threshold.
in_kept_category = articles_df['product_group_name'].isin(keep_categories)
articles_df_filtered = articles_df.loc[in_kept_category].copy()

print(f"\nOriginal data size: {len(articles_df)}")
print(f"Data size after filtering: {len(articles_df_filtered)}")

In [None]:
# Undersampling
sample_limit = 8000

def limit_samples_per_group(df, group_col, limit, random_state=42):
    """Cap every group of ``df`` at ``limit`` randomly chosen rows.

    Groups are taken in sorted key order. Groups with at most ``limit`` rows
    are kept whole (in shuffled order); larger groups are down-sampled without
    replacement.

    Args:
        df: Source DataFrame.
        group_col: Column (or list of columns) to group by.
        limit: Maximum number of rows to keep per group.
        random_state: Seed passed to ``DataFrame.sample`` for every group so
            the result is reproducible.

    Returns:
        A new DataFrame with all original columns (including ``group_col``)
        and a fresh 0..n-1 index.
    """
    # Sample each group explicitly instead of using groupby().apply(): apply()
    # on the grouping column is deprecated and newer pandas versions drop that
    # column from the result, which would lose `group_col` here.
    sampled_groups = [
        group.sample(n=min(len(group), limit), random_state=random_state)
        for _, group in df.groupby(group_col)
    ]
    if not sampled_groups:
        # pd.concat() rejects an empty list; return an empty frame with the same columns.
        return df.iloc[0:0].reset_index(drop=True)
    return pd.concat(sampled_groups).reset_index(drop=True)

# Cap every product group at `sample_limit` rows.
articles_df_sampled = limit_samples_per_group(
    df=articles_df_filtered,
    group_col='product_group_name',
    limit=sample_limit,
)

# View the new category distribution
sampled_category_counts = articles_df_sampled['product_group_name'].value_counts()
print("Category distribution after undersampling:")
print(sampled_category_counts)

In [None]:
# Working table: one row per sampled article with its product group.
data = articles_df_sampled[['article_id', 'product_group_name']].copy()

# Encode each product group as an integer class id (the categorical code).
product_group_categorical = data['product_group_name'].astype('category')
data['label'] = product_group_categorical.cat.codes

# Class id -> product group name, built from the same categorical so that id i
# always names the group encoded as i. Later cells read `label_map` (model head
# size, confusion-matrix tick labels, prediction titles) but nothing in the
# notebook defined it, so a fresh "Restart & Run All" failed with NameError.
label_map = dict(enumerate(product_group_categorical.cat.categories))

# Construct image paths
GDRIVE_ROOT = '/content/drive/My Drive/'
DATASET_FOLDER = 'hm_fashion_dataset'
IMAGE_FOLDER_PATH = '/content'
def get_image_path(article_id):
    """Return the expected .jpg path for an article.

    The id is rendered as text and left-padded with zeros to 10 characters;
    its first three characters name the sub-folder under IMAGE_FOLDER_PATH.
    """
    padded_id = str(article_id).zfill(10)
    return '/'.join((IMAGE_FOLDER_PATH, padded_id[:3], padded_id + '.jpg'))

# Derive each article's expected image file path from its article id.
data['image_path'] = data['article_id'].apply(get_image_path)
print(data.head())

In [None]:
# Split into 80% training set and 20% validation set
from sklearn.model_selection import train_test_split

# Stratify on the integer label so both splits keep the same class proportions;
# the fixed random_state makes the split reproducible.
train_df, val_df = train_test_split(data, test_size=0.2, random_state=42, stratify=data['label'])

for split_name, split_df in (("Training", train_df), ("Validation", val_df)):
    print(f"{split_name} set size: {len(split_df)}")

In [None]:
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import torch

# Image preprocessing
# Deterministic pipeline (no random augmentation), applied in this order:
# resize to 224x224, convert the PIL image to a tensor, then normalize each RGB
# channel with the mean/std below.
# NOTE(review): these values look like the standard ImageNet channel statistics - confirm.
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

class FashionDataset(Dataset):
    """Image classification dataset backed by a DataFrame.

    Each row must provide an ``image_path`` (file to open) and a ``label``
    (integer class id).

    Args:
        dataframe: Table with ``image_path`` and ``label`` columns.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, dataframe, transform=None):
        self.df = dataframe
        self.transform = transform

    def __len__(self):
        """Number of rows (samples) in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the sample at positional index ``idx``.

        Returns:
            ``(image, label)`` where ``label`` is a ``torch.long`` tensor, or
            ``(None, None)`` when the image file is missing or cannot be read,
            so the collate function can drop the sample instead of the whole
            epoch crashing.
        """
        row = self.df.iloc[idx]
        image_path = row['image_path']
        label = row['label']

        try:
            # The context manager closes the file handle deterministically;
            # convert() returns an independent, fully loaded image.
            with Image.open(image_path) as raw_image:
                image = raw_image.convert('RGB')
        except FileNotFoundError:
            print(f"Warning: Image not found at {image_path}, skipping.")
            return None, None
        except OSError as err:
            # Covers truncated/corrupt files (Pillow's UnidentifiedImageError is
            # an OSError): skip them the same way as missing files.
            print(f"Warning: Could not read image at {image_path} ({err}), skipping.")
            return None, None

        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(label, dtype=torch.long)

# Filter out samples that failed to load (returned None)
def collate_fn(batch):
    """Collate a batch after dropping samples whose image is None.

    Returns ``(None, None)`` when no sample survives, otherwise whatever the
    default PyTorch collate produces for the remaining samples.
    """
    loaded_samples = [sample for sample in batch if sample[0] is not None]
    if len(loaded_samples) == 0:
        return None, None
    return torch.utils.data.dataloader.default_collate(loaded_samples)

# Create Dataset instances
# The training split temporarily uses the same transform as validation.
train_dataset = FashionDataset(dataframe=train_df, transform=val_transform)
val_dataset = FashionDataset(dataframe=val_df, transform=val_transform)

# Create DataLoader instances
# Both loaders share the batch size and the None-filtering collate function;
# only the training loader shuffles.
shared_loader_kwargs = dict(batch_size=32, collate_fn=collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **shared_loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **shared_loader_kwargs)

In [None]:
# CNN
import torch.nn as nn
import torch.nn.functional as F

class SimpleCNN(nn.Module):
    """Small baseline CNN: two conv + max-pool stages followed by two linear layers.

    The first linear layer is sized for 32 x 56 x 56 features, i.e. inputs of
    shape (batch, 3, 224, 224) after two 2x2 poolings.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(32 * 56 * 56, 128)
        self.fc2 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes)."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # Flatten per sample. Unlike view(-1, 32 * 56 * 56), this never folds
        # the batch dimension into the features: a wrong input resolution
        # raises a shape error in fc1 instead of silently yielding a smaller batch.
        x = x.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

In [None]:
def train_model(model, model_name, train_loader, val_loader, criterion, optimizer, num_epochs=5):
    """Train ``model`` and report validation accuracy after every epoch.

    Batches for which the loader yields ``inputs is None`` (every sample failed
    to load) are skipped. Loss and accuracy are averaged over the samples that
    were actually processed, so skipped samples do not deflate the metrics.

    Args:
        model: Network to train; moved to CUDA when available, else CPU.
        model_name: Prefix of the checkpoint file names.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable ``criterion(outputs, labels)``.
        optimizer: Optimizer updating the model's trainable parameters.
        num_epochs: Number of passes over ``train_loader``.

    Side effects:
        Prints one metrics line per epoch and saves the model's state dict to
        ``./{model_name}_epoch_{epoch}.pth`` after every epoch.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)

    for epoch in range(num_epochs):
        # Training mode
        model.train()
        running_loss = 0.0
        train_samples_seen = 0
        for inputs, labels in train_loader:
            if inputs is None:
                continue  # Skip empty batches
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            batch_size = inputs.size(0)
            # loss is a per-batch mean; weight it by batch size for a per-sample average.
            running_loss += loss.item() * batch_size
            train_samples_seen += batch_size

        # NaN (rather than a misleading 0.0) when no training sample could be loaded.
        epoch_loss = running_loss / train_samples_seen if train_samples_seen else float('nan')

        # Evaluation mode
        model.eval()
        corrects = 0
        val_samples_seen = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                if inputs is None:
                    continue
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                # .item() keeps the counter a plain Python int.
                corrects += (preds == labels).sum().item()
                val_samples_seen += labels.size(0)

        epoch_acc = corrects / val_samples_seen if val_samples_seen else float('nan')
        print(f'Epoch {epoch}/{num_epochs-1} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
        save_path = f'./{model_name}_epoch_{epoch}.pth'
        torch.save(model.state_dict(), save_path)
        print(f"Model saved to {save_path}")

In [None]:
import torch.optim as optim

# One output logit per distinct class id. Derived from the encoded labels
# themselves, because `label_map` is not defined by any earlier cell and made
# this cell fail with NameError on a fresh kernel.
num_classes = int(data['label'].nunique())
baseline_model = SimpleCNN(num_classes)

# Calculate class weights based on the training set
# Inverse-frequency weights. sort_index() orders the counts by group name;
# NOTE(review): this is assumed to match the order of the categorical codes
# stored in `label`, so weight i applies to class id i - confirm.
class_counts = train_df['product_group_name'].value_counts()
class_weights = 1. / torch.tensor(class_counts.sort_index().values, dtype=torch.float)
# Send weights to the GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class_weights = class_weights.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(baseline_model.parameters(), lr=0.001)

# Start training
train_model(baseline_model, "baseline_cnn", train_loader, val_loader, criterion, optimizer, num_epochs=5)

In [None]:
# Data Augmentation
# Training-only pipeline: same resize / tensor conversion / normalization as the
# validation transform, plus random horizontal flips, rotations of up to
# 10 degrees and brightness/contrast jitter applied before ToTensor.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Same training rows as before, now read through the augmenting transform.
train_dataset_aug = FashionDataset(train_df, transform=train_transform)
train_loader_aug = DataLoader(train_dataset_aug, batch_size=32, shuffle=True, collate_fn=collate_fn)

In [None]:
# Transfer Learning
from torchvision import models

# `pretrained=True` is deprecated in torchvision; the explicit weights enum
# below selects the same ImageNet checkpoint that the flag used to load.
resnet_model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

# Freeze all pre-trained layers
for param in resnet_model.parameters():
    param.requires_grad = False

# Replace the last layer
# The new layer is created after the freeze loop, so its parameters keep the
# default requires_grad=True and are the only trainable ones.
num_ftrs = resnet_model.fc.in_features
resnet_model.fc = nn.Linear(num_ftrs, num_classes) # num_classes is your number of categories

# Define a new optimizer that only optimizes the parameters of the new layer
optimizer_resnet = optim.Adam(resnet_model.fc.parameters(), lr=0.001)

# Start training with the new DataLoader and new model
print("\nTraining ResNet50 with Transfer Learning...")
train_model(resnet_model, "resnet50", train_loader_aug, val_loader, criterion, optimizer_resnet, num_epochs=5)

In [None]:
# Confusion Matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns
import numpy as np

def get_all_preds(model, loader):
    """Run ``model`` over ``loader`` and collect predictions and true labels.

    Batches whose inputs are ``None`` (nothing could be loaded) are skipped.

    Args:
        model: Classifier returning logits of shape (batch, num_classes).
        loader: Iterable of ``(inputs, labels)`` batches.

    Returns:
        ``(predictions, labels)`` as 1-D NumPy arrays in loader order.
        Predictions are int64 class ids; labels keep their integer dtype.
        Both are empty int64 arrays when no batch was usable.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()

    # Collect per-batch chunks and concatenate once at the end. Concatenating
    # onto an empty float tensor promoted the integer ids to float32 and
    # re-copied everything gathered so far on every batch.
    pred_chunks = []
    label_chunks = []
    with torch.no_grad():
        for inputs, labels in loader:
            if inputs is None:
                continue
            outputs = model(inputs.to(device))
            _, preds = torch.max(outputs, 1)
            pred_chunks.append(preds.cpu())
            label_chunks.append(labels.cpu())

    if not pred_chunks:
        # torch.cat() rejects an empty list.
        return (
            torch.empty(0, dtype=torch.long).numpy(),
            torch.empty(0, dtype=torch.long).numpy(),
        )
    return torch.cat(pred_chunks).numpy(), torch.cat(label_chunks).numpy()

y_pred, y_true = get_all_preds(resnet_model, val_loader)

# Pin the class order explicitly. Without `labels=`, confusion_matrix only
# covers classes that occur in y_true/y_pred, so a class missing from the
# validation results would shrink the matrix and shift every tick label.
# NOTE(review): assumes label_map maps integer class id -> display name.
class_ids = list(label_map.keys())
class_names = [label_map[class_id] for class_id in class_ids]
# Cast to int so the ids match `class_ids` even if the arrays arrive as floats.
cm = confusion_matrix(y_true.astype(int), y_pred.astype(int), labels=class_ids)

# Visualize
plt.figure(figsize=(15, 12))
sns.heatmap(cm, annot=True, fmt='d', xticklabels=class_names, yticklabels=class_names)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

In [None]:
# Preparation before visualization
# Prefer the first CUDA GPU when one is available, otherwise run on CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Will run model on this device: {device}")

# Sanity-check that the notebook globals used below exist. A missing name is
# only reported here (not re-raised), so later statements that use it still fail.
try:
    print(f"Checking 'label_map': Success! Contains {len(label_map)} categories.")
    print(f"Checking 'val_loader': Success! Contains {len(val_loader)} batches.")
except NameError as e:
    print(f"Error: Required variable not defined - {e}")

from torchvision import models
import torch.nn as nn
import torch

# Create the "empty shell" for the ResNet50 model
# Same architecture as the fine-tuned network: a ResNet50 without pretrained
# weights whose final layer is resized to the number of classes.
num_classes = len(label_map)
best_model = models.resnet50()
num_ftrs = best_model.fc.in_features
best_model.fc = nn.Linear(num_ftrs, num_classes)

# Define the path to the best model
# NOTE(review): train_model() saves checkpoints to './{model_name}_epoch_{epoch}.pth'
# in the working directory; this Drive path only exists if the file was copied
# there manually - confirm.
best_model_path = '/content/drive/My Drive/hm_fashion_dataset/saved_models/resnet50_epoch_4.pth'

# Load weights
try:
    print(f"Loading model from {best_model_path}...")
    best_model.to(device) # First, move the model structure to the device
    # map_location remaps the stored tensors onto the current device, so a
    # checkpoint saved from a GPU session also loads on a CPU-only runtime.
    best_model.load_state_dict(torch.load(best_model_path, map_location=device))
except FileNotFoundError:
    print(f"Error: Model file not found at path {best_model_path}!")

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import random

# Helper function: to convert a PyTorch Tensor image back to a displayable format
def imshow(inp, title=None):
    """Draw a normalized channel-first image tensor on the current axes.

    The tensor is moved to height x width x channel layout, the per-channel
    normalization (mean/std below) is undone, and values are clipped to [0, 1]
    before plotting. An optional title is drawn at font size 10 and the axis
    frame is hidden.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # (C, H, W) -> (H, W, C), the layout matplotlib expects.
    height_width_channel = np.transpose(inp.numpy(), (1, 2, 0))
    # The image was normalized, so un-normalize it and clamp to the displayable range.
    restored = np.clip(channel_std * height_width_channel + channel_mean, 0, 1)

    plt.imshow(restored)
    if title is not None:
        plt.title(title, fontsize=10)
    plt.axis('off')

def visualize_model_predictions(model, dataloader, label_map, num_images=10):
    """Visualize model's prediction results

    Takes the first batch from ``dataloader`` that contains at least one loaded
    image, predicts its classes and draws up to ``num_images`` images in a grid
    five columns wide. Each title shows the true and the predicted label and is
    green when they agree, red otherwise.

    Args:
        model: Classifier returning logits of shape (batch, num_classes).
        dataloader: Iterable of ``(images, labels)`` batches; batches whose
            images are ``None`` are skipped.
        label_map: Mapping from integer class id to display name.
        num_images: Maximum number of images to draw.

    Raises:
        StopIteration: If the dataloader yields no batch with loaded images.
    """
    model.eval()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
    model.to(device)

    # Skip batches in which every image failed to load (they arrive as
    # (None, None)). If nothing usable is left, next() raises StopIteration,
    # which is the exception the calling cell handles.
    images, labels = next(batch for batch in dataloader if batch[0] is not None)
    images, labels = images.to(device), labels.to(device)

    # Get model's prediction results
    with torch.no_grad():
        outputs = model(images)
        _, preds = torch.max(outputs, 1)

    # Move data back to CPU for visualization
    images = images.cpu()
    labels = labels.cpu()
    preds = preds.cpu()

    # A batch can be shorter than num_images (dropped samples, last batch),
    # so never index past what was actually loaded.
    shown = min(num_images, images.size(0))
    n_cols = 5
    # Keep the two-row layout for up to 10 images and grow it for more.
    n_rows = max(2, -(-shown // n_cols))

    # Set canvas size
    plt.figure(figsize=(15, 12))

    for i in range(shown):
        plt.subplot(n_rows, n_cols, i + 1)

        # Get the text names for true and predicted labels
        true_label = label_map[labels[i].item()]
        pred_label = label_map[preds[i].item()]

        # Set the title, green if prediction is correct, red if incorrect
        title_text = f"True: {true_label}\nPred: {pred_label}"
        color = "green" if true_label == pred_label else "red"

        # Display image and title
        imshow(images[i], title=title_text)
        plt.gca().title.set_color(color)

    plt.tight_layout()
    plt.show()

In [None]:
from torch.utils.data import DataLoader

# Small shuffled loader over the validation set: each run of this cell draws a
# different random batch of up to 10 images.
vis_loader = DataLoader(val_dataset, batch_size=10, shuffle=True, collate_fn=collate_fn)

# Report (instead of crashing) when no batch could be drawn from the loader.
try:
    visualize_model_predictions(best_model, vis_loader, label_map, num_images=10)
except StopIteration:
    print("An empty batch was generated, possibly because all images in this batch could not be found. Please re-run this cell to try a new random batch.")