[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/marketing-and-customer-insight/AI_For_Marketing/blob/main/AI%20for%20Image%20Classification/Run_VisionTransformer.ipynb)

In Google Colab, select a GPU runtime before running the notebook: Runtime -> Change runtime type -> T4 GPU.

# Vision Transformer Image Classification

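This notebook demonstrates how to fine-tune and use a pre-trained ConvNeXt model for image classification with the Hugging Face transformers library. ConvNeXt is a convolutional network whose design borrows ideas from vision transformers, rather than a transformer itself. The workflow includes: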
- Loading and preparing image datasets
- Hyperparameter tuning with K-fold cross-validation
- Training the model on your custom dataset
- Making predictions on new, unlabeled images

## 1. Import Required Libraries

The following cell imports all necessary libraries for:
- Data manipulation (pandas, numpy)
- Deep learning (torch, transformers)
- Image processing (PIL, datasets)
- Model evaluation and metrics
- Cross-validation and data splitting

In [None]:
!pip install evaluate --quiet

In [None]:
import os
import glob
import time
import random

import numpy as np
import pandas as pd
import torch
import transformers
import datasets

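from PIL import Image
# Do not import Image from datasets by name: it would shadow PIL's Image class.
# The code below refers to it as datasets.Image() instead.
from datasets import Dataset, Features, Value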
from sklearn.model_selection import KFold
from transformers import (
    AutoImageProcessor,
    AutoModelForImageClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback,
)
from evaluate import load
from tqdm import tqdm

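Download the image datasets. The sparse checkout fetches only the `AI for Image Classification/Datasets` folder from the repository and moves it to `/content`.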

In [None]:
!git clone --depth 1 --filter=blob:none --sparse \
https://github.com/marketing-and-customer-insight/AI_For_Marketing.git
%cd AI_For_Marketing
!git sparse-checkout set "AI for Image Classification/Datasets"
!mv "AI for Image Classification/Datasets" /content

## 2. Configuration Settings

Configure the core settings for training:
- **MODEL_HUGGINGFACE**: The pre-trained model to use (ConvNeXt from Hugging Face Model Hub)
- **DATASET_PATH**: Path to your dataset CSV file (must contain 'image_path' and 'label' columns)
- **PERFORMANCE_SUMMARY_PATH**: Where to save the training results summary
- **MODEL_OUT_DIR**: Directory for training checkpoints and the final trained model

In [None]:
"""
Settings

Where should the trained model be saved, where is your dataset located?

"""

# Check if running in Google Colab to set appropriate paths
try:
    import google.colab
    IN_COLAB = True
    BASE_PATH = '/content'
except ImportError:
    IN_COLAB = False
    BASE_PATH = '.'

MODEL_HUGGINGFACE = 'facebook/convnext-base-384-22k-1k'
MODEL_NAME = 'ConvNeXt'

PERFORMANCE_SUMMARY_PATH = 'training_summary_vision_transformer.csv'
DATASET_PATH = f'{BASE_PATH}/Datasets/Brand_Selfies/dataset.csv'
MODEL_OUT_DIR = f'{BASE_PATH}/model_output'

print(f"Dataset path: {DATASET_PATH}")
print(f"Model output directory: {MODEL_OUT_DIR}")

## 3. Training Hyperparameters

Set the hyperparameters for model training. The training cell runs a grid search over every combination of:
- **BATCH_SIZES**: Batch sizes for training
- **LEARNING_RATES**: Learning rates for the optimizer

**EPOCHS** is not part of the grid. It sets the maximum number of training epochs (32), and early stopping can end a run sooner.

The best combination is identified through K-fold cross-validation. The training cell uses 3 folds (`n_splits=3`).
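
To make the cost of the search concrete, the following sketch enumerates a small grid with `itertools.product` and counts the resulting training runs. The candidate values and the fold count are illustrative assumptions, not the notebook's settings. The point is only that each added value multiplies the number of runs.

```python
from itertools import product

# Illustrative candidate values (assumptions, not the notebook's defaults).
example_batch_sizes = [8, 16]
example_learning_rates = [1e-5, 5e-5, 1e-4]
example_n_folds = 3  # hypothetical number of cross-validation folds

# Every (learning rate, batch size) pair is one hyperparameter combination.
grid = list(product(example_learning_rates, example_batch_sizes))

# Each combination is trained once per fold.
total_runs = len(grid) * example_n_folds

for learning_rate, batch_size in grid:
    print(f"lr={learning_rate:g}, batch_size={batch_size}")
print(f"{len(grid)} combinations x {example_n_folds} folds = {total_runs} training runs")
```

With a single batch size and a single learning rate, as in the default settings, the grid contains one combination, so the number of runs equals the number of folds.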

In [None]:
"""
Settings

You can modify these settings for your training process

"""

EPOCHS = 32
BATCH_SIZES = [16]
LEARNING_RATES = [1e-5]

## 4. Helper Functions

This cell defines two key functions:

**`create_classification_dataset()`**:
- Converts your CSV dataset into a Hugging Face dataset format
- Maps class labels to numeric IDs
- Loads images and applies the image processor

**`train_hf_classification_model()`**:
- Configures and trains the model using Hugging Face Trainer
- Computes performance metrics (precision, recall, F1, accuracy)
- Implements early stopping to prevent overfitting
- Returns trained model and evaluation metrics
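
The label mapping step can be sketched without any deep learning dependencies. The helper name `build_label_maps` and the example labels are hypothetical. The sketch only mirrors the idea of sorting the unique labels so that the numbering is reproducible.

```python
def build_label_maps(labels):
    """Return (label2id, id2label) for an iterable of raw class labels."""
    # Cast to str and sort so the numbering is deterministic across runs.
    unique_labels = sorted({str(label) for label in labels})
    label2id = {label: idx for idx, label in enumerate(unique_labels)}
    id2label = {idx: label for label, idx in label2id.items()}
    return label2id, id2label


# Hypothetical labels, for illustration only.
raw_labels = ["logo", "selfie", "logo", "packshot"]
label2id, id2label = build_label_maps(raw_labels)
encoded = [label2id[label] for label in raw_labels]

print(label2id)
print(encoded)
```

Keeping both directions of the mapping matters at inference time, when the predicted numeric ID has to be translated back into a readable class name.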

In [None]:
def create_classification_dataset(DF_DATASET, MODELNAME):
    df = DF_DATASET.copy()
    unique_labels = sorted(df["label"].astype(str).unique().tolist())
    label2id = {label: i for i, label in enumerate(unique_labels)}
    df["labels"] = df["label"].astype(str).map(label2id)
    img_path_list = df.image_path.to_list()
    label_id_list = df.labels.to_list()
    assert len(img_path_list) == len(label_id_list)

    features = datasets.Features({
        "image_path": datasets.Value("string"),
        "img": datasets.Image(),
        "labels": datasets.ClassLabel(names=unique_labels),
    })

    ds = datasets.Dataset.from_dict(
        {
            "img": img_path_list,
            "image_path": img_path_list,
            "labels": label_id_list,
        },
        features=features,
    )
    processor = AutoImageProcessor.from_pretrained(MODELNAME)

    def hf_transform(example_batch):
        inputs = processor(
            [x.convert("RGB") for x in example_batch["img"]],
            return_tensors="pt",
        )
        inputs["labels"] = example_batch["labels"]
        inputs["image_path"] = example_batch["image_path"]
        return inputs

    prepared_ds = ds.with_transform(hf_transform)
    return prepared_ds, label2id, processor

def train_hf_classification_model(outdir, epochs, batch_size, learning_rate, train_dataset, test_dataset, MODEL_NAME):
        def collate_fn(batch):
            return {
            'pixel_values': torch.stack([x['pixel_values'] for x in batch]),
            'labels': torch.tensor([x['labels'] for x in batch])
            }
        def custom_metrics(eval_pred):
            metric1 = load("precision")
            metric2 = load("recall")
            metric3 = load("f1")
            metric4 = load("accuracy")

            logits, labels = eval_pred
            predictions = np.argmax(logits, axis=-1)

            precision = metric1.compute(predictions=predictions, references=labels, average="weighted")["precision"]
            recall = metric2.compute(predictions=predictions, references=labels, average="weighted")["recall"]
            f1 = metric3.compute(predictions=predictions, references=labels, average="weighted")["f1"]
            accuracy = metric4.compute(predictions=predictions, references=labels)["accuracy"]

            return {"precision": precision, "recall": recall, "f1": f1, "accuracy": accuracy}

        labels = train_dataset.features['labels'].names
        processor = AutoImageProcessor.from_pretrained(MODEL_NAME)
        model = AutoModelForImageClassification.from_pretrained(MODEL_NAME, num_labels=len(labels), ignore_mismatched_sizes=True, id2label={str(i): c for i, c in enumerate(labels)}, label2id={c: str(i) for i, c in enumerate(labels)})

        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        model.to(device)

        early_stopping_patience_epochs = 3
        early_stopping = EarlyStoppingCallback(early_stopping_patience=early_stopping_patience_epochs)

        training_args = TrainingArguments(
        output_dir = outdir,
        disable_tqdm=True,
        per_device_train_batch_size=batch_size,
        save_strategy="epoch",
        eval_strategy="epoch",
        num_train_epochs=epochs,
        learning_rate=learning_rate,
        weight_decay=0.01,
        save_total_limit=2,
        remove_unused_columns=False,
        push_to_hub=False,
        load_best_model_at_end=True,
        # Mixed precision requires a CUDA device; fall back to full precision on CPU.
        fp16=torch.cuda.is_available()
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            data_collator=collate_fn,
            compute_metrics=custom_metrics,
            train_dataset=train_dataset,
            eval_dataset=test_dataset,
            callbacks=[early_stopping]
        )

        train_results = trainer.train()
        log_history = trainer.state.log_history
        best_epoch = None
        eval_entries = [e for e in log_history if "eval_loss" in e or "eval_accuracy" in e]
        if eval_entries:
            if any("eval_loss" in e for e in eval_entries):
                best_entry = min(
                    (e for e in eval_entries if "eval_loss" in e),
                    key=lambda x: x["eval_loss"],
                )
            else:
                best_entry = max(
                    eval_entries,
                    key=lambda x: x.get("eval_accuracy", float("-inf")),
                )
            best_epoch = best_entry.get("epoch")
        if best_epoch is None:
            best_epoch = trainer.state.epoch
        optimal_epochs_trained = int(round(best_epoch)) if best_epoch is not None else None
        metrics = trainer.evaluate(test_dataset)
        return trainer, model, metrics, optimal_epochs_trained
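
The best-epoch lookup in `train_hf_classification_model()` reads `trainer.state.log_history`, which is a list of dictionaries. The sketch below isolates that step. The helper name `pick_best_epoch` is hypothetical and the log entries are made up to mimic the structure; they are not results from a real run.

```python
def pick_best_epoch(log_history):
    """Return the epoch with the lowest eval_loss, or None if none was logged."""
    eval_entries = [entry for entry in log_history if "eval_loss" in entry]
    if not eval_entries:
        return None
    best_entry = min(eval_entries, key=lambda entry: entry["eval_loss"])
    return int(round(best_entry["epoch"]))


# Made-up entries that mimic the structure of a Trainer log history.
example_log = [
    {"epoch": 1.0, "loss": 0.90},       # training-only entry, ignored
    {"epoch": 1.0, "eval_loss": 0.70},
    {"epoch": 2.0, "eval_loss": 0.55},  # lowest validation loss
    {"epoch": 3.0, "eval_loss": 0.61},
]

print(pick_best_epoch(example_log))
```

Selecting by validation loss is consistent with `load_best_model_at_end=True`, which uses the loss as its default criterion when no other metric is specified.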

## 5. Load Your Dataset

Load the CSV file containing your image paths and labels. Your dataset CSV must contain:
- **image_path**: Path to each image file
- **label**: The class label for that image

The dataset is then displayed to verify it loaded correctly.
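
Before training, it can help to confirm that every path in the CSV points to an existing file. The following is a minimal, dependency-free sketch. The helper name `check_dataset_csv` is hypothetical, and relative paths are resolved against the current working directory, which is an assumption about how your CSV was written.

```python
import csv
import os

REQUIRED_COLUMNS = {"image_path", "label"}


def check_dataset_csv(csv_path):
    """Return the image paths listed in csv_path that do not exist on disk."""
    with open(csv_path, newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        missing_columns = REQUIRED_COLUMNS - set(reader.fieldnames or [])
        if missing_columns:
            raise ValueError(f"Missing columns: {sorted(missing_columns)}")
        # Collect every referenced image that cannot be found.
        return [
            row["image_path"]
            for row in reader
            if not os.path.isfile(row["image_path"])
        ]
```

Calling `check_dataset_csv(DATASET_PATH)` should return an empty list for a clean dataset. Any paths it returns would otherwise cause an error only later, when the images are first loaded during training.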

In [None]:
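import os
# The datasets were moved to /content in Colab; stay in the current directory otherwise.
if IN_COLAB:
    os.chdir('/content')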

In [None]:
"""

Loading your dataset

"""

df = pd.read_csv(DATASET_PATH)

required_cols = {'image_path', 'label'}
assert required_cols.issubset(df.columns), \
    'Please make sure that your dataset contains both columns: image_path and label.'
df.head()

## 6. Hyperparameter Tuning and Model Training

This is the main training cell that:

1. **Grid Search**: Tests all combinations of batch sizes and learning rates
2. **K-Fold Cross-Validation**: Trains one model per fold (3 folds, set by `n_splits`), each with a different train/validation split, so the results do not depend on a single split
3. **Tracks Performance**: Saves accuracy, precision, recall, and F1 scores for each fold
4. **Finds Best Hyperparameters**: Identifies the combination that yields the highest mean validation accuracy across folds
5. **Final Training**: Trains a final model using the best hyperparameters on 90% of the data, with the remaining 10% held out for evaluation and early stopping
6. **Saves Model**: Stores the trained model and processor for later inference

The results are saved to a CSV file for easy review.
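
The idea behind K-fold splitting can be illustrated with a small hand-rolled splitter. This is not scikit-learn's `KFold` implementation, which the notebook uses, and the shuffled order will differ from it. The function name `k_fold_indices` and the sample count are assumptions for illustration.

```python
import random


def k_fold_indices(n_samples, n_splits, seed=1):
    """Yield (train_indices, val_indices) pairs for a shuffled K-fold split."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    # Spread the remainder so fold sizes differ by at most one sample.
    fold_sizes = [
        n_samples // n_splits + (1 if i < n_samples % n_splits else 0)
        for i in range(n_splits)
    ]
    start = 0
    for size in fold_sizes:
        val = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, val
        start += size


folds = list(k_fold_indices(n_samples=10, n_splits=3))
for fold_number, (train, val) in enumerate(folds, start=1):
    print(f"fold {fold_number}: {len(train)} train / {len(val)} validation")
```

Every sample lands in the validation set exactly once, so averaging the per-fold scores uses the whole dataset for evaluation without ever scoring a model on images it was trained on.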

In [None]:
"""

Fine-tune the model with your dataset

"""

df_train_summary = pd.DataFrame()
print('Training models to find optimal hyperparameters.')
for LEARNING_RATE in LEARNING_RATES:
    for BATCH_SIZE in BATCH_SIZES:
        dataset, label2id, processor = create_classification_dataset(df, MODELNAME=MODEL_HUGGINGFACE)
        ds = dataset.shuffle(seed=1)

        fold_counter = 0
        kf = KFold(n_splits=3, shuffle=True, random_state=1)
        for train_index, val_index in kf.split(ds):
            fold_counter += 1
            train_dataset = ds.select(train_index)
            val_dataset = ds.select(val_index)

            trainer, model, metrics, optimal_epochs_trained = train_hf_classification_model(outdir=MODEL_OUT_DIR, epochs=EPOCHS, batch_size=BATCH_SIZE, learning_rate=LEARNING_RATE, train_dataset=train_dataset, test_dataset=val_dataset, MODEL_NAME=MODEL_HUGGINGFACE)

            i = df_train_summary.shape[0]
            df_train_summary.at[i, 'Batch_Size'] = BATCH_SIZE
            df_train_summary.at[i, 'Learning_Rate'] = LEARNING_RATE
            df_train_summary.at[i, 'Fold'] = fold_counter
            df_train_summary.at[i, 'Epochs'] = optimal_epochs_trained
            df_train_summary.at[i, 'Accuracy'] = np.round(metrics['eval_accuracy'], 4)
            df_train_summary.at[i, 'Precision'] = np.round(metrics['eval_precision'], 4)
            df_train_summary.at[i, 'Recall'] = np.round(metrics['eval_recall'], 4)
            df_train_summary.at[i, 'F1'] = np.round(metrics['eval_f1'], 4)
            df_train_summary.to_csv(PERFORMANCE_SUMMARY_PATH, index=False)

print('Training the best model for inference.')
df_avg = (
    df_train_summary
    .groupby(["Batch_Size", "Learning_Rate"], as_index=False)
    .agg(
        AVG_ACCURACY=("Accuracy", "mean"),
        STD_ACCURACY=("Accuracy", "std"),
        N_FOLDS=("Accuracy", "count"),
        AVG_OPTIMAL_EPOCHS=("Epochs", "mean"),
    )
)
df_avg = df_avg.sort_values("AVG_ACCURACY", ascending=False)

best_row = df_avg.iloc[0]
best_batch_size = int(best_row["Batch_Size"])
best_learning_rate = float(best_row["Learning_Rate"])

split = dataset.train_test_split(test_size=0.1, seed=1)
train_dataset = split["train"]
val_dataset = split["test"]

trainer, model, metrics, optimal_epochs_trained = train_hf_classification_model(outdir=MODEL_OUT_DIR, epochs=EPOCHS, batch_size=best_batch_size, learning_rate=best_learning_rate, train_dataset=train_dataset, test_dataset=val_dataset, MODEL_NAME=MODEL_HUGGINGFACE)
model.config.label2id = label2id
model.config.id2label = {v: k for k, v in label2id.items()}

FINAL_MODEL_DIR = os.path.join(MODEL_OUT_DIR, "trained_tvm")
trainer.save_model(FINAL_MODEL_DIR)
processor.save_pretrained(FINAL_MODEL_DIR)


## 7. Inference with Trained Model

The following cells show how to load your trained model and make predictions on new, unlabeled images.

## 8. Load the Trained Model

Load the saved model and processor that were trained in the previous section. The model is moved to the available device (GPU if available, CPU otherwise) and set to evaluation mode.

In [None]:
"""

Use the trained model for prediction of unlabeled images

"""

import torch
from transformers import AutoModelForImageClassification, AutoImageProcessor

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
FINAL_MODEL_DIR = os.path.join(MODEL_OUT_DIR, "trained_tvm")

model = AutoModelForImageClassification.from_pretrained(FINAL_MODEL_DIR)
processor = AutoImageProcessor.from_pretrained(FINAL_MODEL_DIR)

model.to(device)
model.eval()

## 9. Prepare Images for Prediction

Load all images from the directory containing unseen/unlabeled images that you want to classify. A DataFrame is created to store the image paths and predictions.
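
A wildcard such as `*` also matches non-image files and subfolders, which would make `Image.open` fail later. The sketch below filters by file extension first. The helper name `list_image_files` and the set of extensions are assumptions; adjust them to the formats in your folder.

```python
import glob
import os

# Assumed set of image extensions; extend as needed.
IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}


def list_image_files(folder):
    """Return sorted paths in folder whose extension looks like an image."""
    candidates = glob.glob(os.path.join(folder, "*"))
    return sorted(
        path
        for path in candidates
        if os.path.isfile(path)
        and os.path.splitext(path)[1].lower() in IMAGE_EXTENSIONS
    )
```

Sorting the result also makes the order of rows in the predictions file reproducible, since `glob` does not guarantee any particular order.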

In [None]:
import glob
from PIL import Image

# Use the BASE_PATH from the configuration settings
prediction_image_paths = glob.glob(f'{BASE_PATH}/Datasets/Brand_Selfies/Unseen_Samples/*')
df_predictions = pd.DataFrame(prediction_image_paths, columns=['image_path'])

print(f"Found {len(prediction_image_paths)} images for prediction")

## 10. Run Inference on Unlabeled Images

This cell processes each image through the trained model to generate predictions:
1. Opens each image and converts it to RGB format
2. Processes the image using the trained processor
3. Runs the model in inference mode (no gradient calculation)
4. Extracts the predicted class label
5. Saves all predictions to a CSV file for review

The output CSV contains the image paths and their corresponding predicted labels.
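
Step 4 can be illustrated in plain Python. The sketch picks the highest-scoring class and also converts the logits to a probability with a softmax, which the notebook itself does not store. The function name `predict_label`, the logits, and the label names are made up for illustration.

```python
import math


def predict_label(logits, id2label):
    """Return (label, probability) for the highest-scoring class."""
    # Subtract the maximum logit for numerical stability before exponentiating.
    max_logit = max(logits)
    exps = [math.exp(value - max_logit) for value in logits]
    total = sum(exps)
    probabilities = [value / total for value in exps]
    best_id = max(range(len(logits)), key=lambda idx: logits[idx])
    return id2label[best_id], probabilities[best_id]


# Made-up logits and label names, for illustration only.
example_id2label = {0: "brand_selfie", 1: "other"}
label, confidence = predict_label([2.0, 0.5], example_id2label)
print(label, round(confidence, 3))
```

In the notebook, the equivalent probability could be obtained with `torch.softmax(logits, dim=-1)`. Storing it next to the predicted label would make it possible to review low-confidence predictions separately.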

In [None]:
for i in tqdm(df_predictions.index):
    image = Image.open(df_predictions.at[i, 'image_path']).convert("RGB")

    inputs = processor(image, return_tensors="pt")
    inputs = {k: v.to(device) for k, v in inputs.items()}

    with torch.no_grad():
        logits = model(**inputs).logits

    # One image per forward pass, so the argmax yields a single class ID.
    pred_id = logits.argmax(dim=-1).item()
    df_predictions.at[i, 'prediction'] = model.config.id2label[pred_id]

output_path = f'{BASE_PATH}/prediction_unseen_data_vision_transformer.csv'
df_predictions.to_csv(output_path, index=False)
print(f"\n✓ Predictions saved to: {output_path}")