**Flower Recognition with Transfer Learning**

# Summary
In this notebook we develop an AI-based system to automatically classify two flower species — **daisy** and **dandelion** — using transfer learning on ResNet-50 via the **timm** library.  
We optimize for **macro F1-score** on a held-out test set, reaching ≈0.89 F1 and 0.90 accuracy.

The code below is adapted from one of my earlier projects on deep learning with PyTorch ([Colab notebook](https://colab.research.google.com/drive/1tp_s8WTgODxTt7DAPiNTyVMlEYY6GgsA)).

# Introduction & Methodology
**Objective:**  
Automatically recognize **daisy** and **dandelion** from RGB images to support GreenTech Solutions in crop health monitoring.

**Approach:**  
- **Transfer learning** with ImageNet-pretrained ResNet-50 via `timm.create_model`.
- **Data augmentations** (RandAugment + Random Erasing) from `timm.data.create_transform` to improve robustness.
- **Two-phase fine-tuning**:  
  1. Freeze backbone, train only classifier head;
  2. Unfreeze `layer4`, fine-tune last block at lower LR.
- **Evaluation metric:** macro F1-score + accuracy on validation and test.

**Key design decisions:**  
- ResNet-50: trade-off between accuracy and compute cost.  
- `rand-m9-mstd0.5-inc1`: strong augmentations for color/shape variation.  
- `ReduceLROnPlateau` scheduler to adapt LR on validation loss.  
- `is_valid_file` filter to skip corrupted or macOS “._” files.

# Requirements

In [None]:
import os
import urllib.request
import tarfile

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models  # torchvision.transforms is an alternative to the timm transforms used here
from torch.utils.data import DataLoader, Subset, random_split

import timm
from timm.data import create_transform

import copy
from sklearn.metrics import f1_score, classification_report, confusion_matrix
from tqdm import tqdm

# Download & Extract dataset

In [None]:
def download_and_extract_dataset(url: str, dest_folder: str):

    """
    Download and extract a tar.gz archive from a URL.

    Args:
        url (str): URL pointing to the .tar.gz dataset.
        dest_folder (str): Local folder where files will be extracted.
    """

    os.makedirs(dest_folder, exist_ok=True)

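    # Skip download if the splits already exist, either directly in dest_folder
    # or one level below it (the archive unpacks into its own subfolder)
    splits = ['test', 'train', 'valid']
    roots = [dest_folder] + [os.path.join(dest_folder, d) for d in os.listdir(dest_folder)]
    if any(all(os.path.isdir(os.path.join(root, s)) for s in splits) for root in roots):
        print("Dataset already exists, skipping download.")
        return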

    archive_path = os.path.join(dest_folder, 'flowers_dataset.tar.gz')
    print(f"Downloading dataset from {url}...")
    urllib.request.urlretrieve(url, archive_path)
    print("Download complete.")

    print("Extracting files...")
    with tarfile.open(archive_path, 'r:gz') as tar:
        tar.extractall(path=dest_folder)
    print("Extraction complete.")

    os.remove(archive_path)
    print("Cleaned up archive.")

In [None]:
# Base URL
DATASET_URL = "https://proai-datasets.s3.eu-west-3.amazonaws.com/progetto-finale-flowes.tar.gz"

DATASET_FOLDER = "/content/data/flowers"

# Download and extract (skip if already done)
download_and_extract_dataset(DATASET_URL, DATASET_FOLDER)

Downloading dataset from https://proai-datasets.s3.eu-west-3.amazonaws.com/progetto-finale-flowes.tar.gz...
Download complete.
Extracting files...
Extraction complete.
Cleaned up archive.


# Transform, Dataset & Dataloader

In [None]:
# 1) Define timm-based transforms

def get_timm_transforms(input_size: int = 224):

    """
    Create train/valid/test transforms using timm's create_transform.
    """

    # ImageNet normalization
    normalize = (0.485, 0.456, 0.406), (0.229, 0.224, 0.225)

    # Training transform: random resized crop and horizontal flip (timm defaults),
    # RandAugment, Random Erasing, normalization
    train_tf = create_transform(
        input_size=input_size,
        is_training=True,
        # RandAugment policy for strong color and geometric variations
        auto_augment='rand-m9-mstd0.5-inc1',
        # Random Erasing to simulate occlusions and improve generalization
        re_prob=0.25, # random erase probability
        re_mode='pixel',
        re_count=1,
        mean=normalize[0],
        std=normalize[1]
    )

    # Validation/test transform: resize + center crop + normalization
    val_tf = create_transform(
        input_size=input_size,
        is_training=False,
        mean=normalize[0],
        std=normalize[1]
    )

    return {'train': train_tf, 'valid': val_tf, 'test': val_tf}
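
As a small worked example of the normalization step, the sketch below applies `(x - mean) / std` per channel to single RGB pixels already scaled to `[0, 1]`. The helper `normalize_pixel` is hypothetical and only illustrates the arithmetic; it is not part of timm.

```python
# ImageNet channel statistics, the same values used in get_timm_transforms
MEAN = (0.485, 0.456, 0.406)
STD = (0.229, 0.224, 0.225)

def normalize_pixel(rgb):
    """Apply (x - mean) / std per channel to one RGB pixel scaled to [0, 1]."""
    return tuple((x - m) / s for x, m, s in zip(rgb, MEAN, STD))

# A pure white pixel ends up a bit above 2 in every channel
white = normalize_pixel((1.0, 1.0, 1.0))
# A pixel equal to the channel means maps to zero
mean_pixel = normalize_pixel(MEAN)

print([round(v, 3) for v in white])
print(mean_pixel)
```

After normalization the inputs are roughly zero-centered with unit scale, which matches the statistics the ImageNet-pretrained weights were trained on.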

In [None]:
# 2) Define valid-file filter to skip non-image or macOS “._” files

IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif')

def is_valid_file(path: str) -> bool:

    """
    Return True only if:
      - the file has a valid image extension
      - the filename does NOT start with '._'
    """

    fname = os.path.basename(path)
    return fname.lower().endswith(IMG_EXTENSIONS) and not fname.startswith('._')
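
To see what the filter keeps and drops, here is a quick check on a few made-up paths. The paths are hypothetical and need not exist on disk, since the function only inspects the file name.

```python
import os

IMG_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.bmp', '.gif')

def is_valid_file(path: str) -> bool:
    # Same logic as the notebook cell: valid extension and no '._' prefix
    fname = os.path.basename(path)
    return fname.lower().endswith(IMG_EXTENSIONS) and not fname.startswith('._')

# Hypothetical example paths
examples = [
    "train/daisy/0001.jpg",    # regular image
    "train/daisy/0002.JPG",    # upper-case extension is accepted
    "train/daisy/._0001.jpg",  # macOS resource-fork file, rejected
    "train/daisy/Thumbs.db",   # not an image, rejected
]
results = {p: is_valid_file(p) for p in examples}
for path, keep in results.items():
    print(f"{path!r}: {keep}")
```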

In [None]:
# 3) Prepare transforms
INPUT_SIZE = 224
transforms_dict = get_timm_transforms(input_size=INPUT_SIZE)

# 4) Create ImageFolder datasets with is_valid_file filter
DATASET_FOLDER = "/content/data/flowers/progetto-finale-flowes"
datasets_dict = {}
for split in ['train', 'valid', 'test']:
    split_dir = os.path.join(DATASET_FOLDER, split)
    datasets_dict[split] = datasets.ImageFolder(
        root=split_dir,
        transform=transforms_dict[split],
        is_valid_file=is_valid_file
    )

# 5) Wrap datasets in DataLoaders
BATCH_SIZE = 32
NUM_WORKERS = 2
dataloaders = {}
for split, ds in datasets_dict.items():
    dataloaders[split] = DataLoader(
        ds,
        batch_size=BATCH_SIZE,
        shuffle=(split == 'train'),
        num_workers=NUM_WORKERS,
        pin_memory=True
    )

# 6) Inspect classes
print("Classes:", datasets_dict['train'].classes)

Classes: ['daisy', 'dandelion']


# Model

## Initialize

**Model & Hyperparameters**

- **Architecture:** ResNet-50 from `timm`  
- **Batch size:** 32  
- **Learning rates:** 1e-4 (phase1), 1e-5 (phase2)  
- **Weight decay:** 1e-5  
- **Epochs:** 5 (head only) + 5 (fine-tuning)  
- **Scheduler:** `ReduceLROnPlateau` on validation loss (factor=0.1, patience=3)  
- **Metrics:** macro F1-score primary, also track accuracy
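
To get a feel for how `patience=3` interacts with a 5-epoch phase, the sketch below re-implements the core plateau rule in plain Python. It is a simplified stand-in for `ReduceLROnPlateau` (minimization mode, relative threshold, no cooldown or minimum LR), and the loss sequences are hypothetical, not taken from this notebook's runs.

```python
def simulate_plateau(losses, lr=1e-4, factor=0.1, patience=3, threshold=1e-4):
    """Simplified plateau rule: cut the LR after more than `patience` bad epochs in a row."""
    best = float("inf")
    bad_epochs = 0
    history = []
    for loss in losses:
        if loss < best * (1.0 - threshold):
            # Meaningful improvement: remember it and reset the counter
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:
            lr *= factor
            bad_epochs = 0
        history.append(lr)
    return history

# Hypothetical validation losses
improving = [0.60, 0.50, 0.45, 0.42, 0.40]
stalled = [0.50, 0.60, 0.60, 0.60, 0.60]

print(simulate_plateau(improving))  # LR is never reduced
print(simulate_plateau(stalled))    # LR is reduced only at the fifth epoch
```

Under these assumptions the LR can only drop within a 5-epoch phase if every epoch after the first fails to improve, so a smaller patience may be worth trying when phases are this short.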

In [None]:
def initialize_timm_model(model_name: str,
                          num_classes: int,
                          feature_extract: bool = True,
                          pretrained: bool = True):

    """
    Create a timm model, replace its classifier head, and optionally freeze backbone.
    """

    model = timm.create_model(model_name, pretrained=pretrained, num_classes=num_classes)
    if feature_extract:
        # 1) Freeze all
        for param in model.parameters():
            param.requires_grad = False
        # 2) Unfreeze only the classifier head
        if hasattr(model, 'fc'):           # Typical ResNet
            for p in model.fc.parameters():
                p.requires_grad = True
        elif hasattr(model, 'classifier'):
            for p in model.classifier.parameters():
                p.requires_grad = True
        elif hasattr(model, 'head'):
            for p in model.head.parameters():
                p.requires_grad = True
        else:
            raise ValueError("Cannot find fc/classifier/head in model to unfreeze.")
    return model

## Train one epoch

In [None]:
def train_one_epoch(model, loader, criterion, optimizer, device):

    """
    Train the model for one epoch; return:
      - epoch_loss: average loss over all samples
      - epoch_f1:   macro F1-score over the epoch
      - epoch_acc: accuracy over the epoch
    """

    model.train() # Switch to training mode (enables dropout, batchnorm updates)
    running_loss = 0.0
    all_preds, all_labels = [], []
    correct = 0
    total = 0

    # Iterate over the DataLoader batches
    for inputs, labels in tqdm(loader, desc="train"):
        # Move inputs and labels to the computation device (CPU or GPU)
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad() # Reset gradients from previous step
        outputs = model(inputs) # Forward pass: compute logits
        loss = criterion(outputs, labels) # Compute classification loss
        loss.backward() # Backward pass: compute gradients
        optimizer.step() # Update model parameters

        # Accumulate batch loss (loss * number of samples)
        running_loss += loss.item() * inputs.size(0)
        # Compute predictions: index of max logit per sample
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())

        # Update correct prediction count for accuracy
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    # Average loss over entire epoch
    epoch_loss = running_loss / len(loader.dataset)
    # Compute macro F1-score across all classes
    epoch_f1   = f1_score(all_labels, all_preds, average='macro')
    # Compute accuracy as total correct / total samples
    epoch_acc  = correct / total
    return epoch_loss, epoch_f1, epoch_acc
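
The reason for multiplying each batch loss by the batch size is that the last batch is usually smaller than the others. A short worked example with hypothetical batch losses and sizes shows how the sample-weighted average differs from a plain mean over batches:

```python
# Hypothetical per-batch mean losses and batch sizes (last batch is smaller)
batch_losses = [0.5, 0.5, 1.0]
batch_sizes = [32, 32, 8]

# Sample-weighted average, as computed in train_one_epoch
weighted = sum(l * n for l, n in zip(batch_losses, batch_sizes)) / sum(batch_sizes)

# Naive mean of batch means, which over-weights the small last batch
naive = sum(batch_losses) / len(batch_losses)

print(f"weighted: {weighted:.4f}, naive: {naive:.4f}")
```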

## Validate one epoch

In [None]:
@torch.no_grad() # Disable gradient computation for efficiency
def validate_one_epoch(model, loader, criterion, device):
    """
    Validate the model for one epoch; return:
      - epoch_loss: average loss over all samples
      - epoch_f1:   macro F1-score over the epoch
      - epoch_acc:  accuracy over the epoch
      - all_labels: list of true labels (for detailed reports)
      - all_preds:  list of predicted labels
    """
    model.eval() # Switch to evaluation mode (disable dropout, use running stats for batchnorm)
    running_loss = 0.0
    all_preds, all_labels = [], []
    correct = 0
    total = 0

    # Iterate over the validation DataLoader batches
    for inputs, labels in tqdm(loader, desc="valid"):
        # Move inputs and labels to the computation device
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        # Compute classification loss
        loss = criterion(outputs, labels)

        # Accumulate batch loss (loss * number of samples)
        running_loss += loss.item() * inputs.size(0)
        # Compute predictions: index of max logit per sample
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())
        # Update correct prediction count for accuracy
        correct += (preds == labels).sum().item()
        total += labels.size(0)

    epoch_loss = running_loss / len(loader.dataset)
    epoch_f1   = f1_score(all_labels, all_preds, average='macro')
    epoch_acc  = correct / total
    return epoch_loss, epoch_f1, epoch_acc, all_labels, all_preds

## Fit & Validate

We perform **two-phase training**:
1. **Phase 1**: only train the new head (fast convergence).  
2. **Phase 2**: unfreeze last block (`layer4`) and fine-tune at lower LR.

At each epoch we record **F1-macro** and **accuracy** on both train and validation sets.
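
The unfreezing rule in Phase 2 is a simple prefix match on parameter names. The sketch below illustrates it on a handful of names in the usual ResNet layout. The list is an assumed, abbreviated sample rather than the full output of `model.named_parameters()`, and `trainable_names` is a hypothetical helper.

```python
# Abbreviated sample of parameter names in the usual ResNet layout (assumed)
param_names = [
    "conv1.weight", "bn1.weight",
    "layer3.5.conv3.weight",
    "layer4.0.conv1.weight", "layer4.2.bn3.bias",
    "fc.weight", "fc.bias",
]

def trainable_names(names, prefixes):
    """Return the names whose prefix marks them as trainable."""
    return [n for n in names if n.startswith(tuple(prefixes))]

# Phase 1: only the classifier head is trainable
phase1 = trainable_names(param_names, ["fc"])
# Phase 2: the head stays trainable and layer4 is added
phase2 = trainable_names(param_names, ["fc", "layer4"])

print("Phase 1:", phase1)
print("Phase 2:", phase2)
```

Everything before `layer4` stays frozen in both phases, which keeps the number of updated parameters, and the risk of degrading the pretrained features, comparatively low.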


In [None]:
def fit(model, dataloaders, device, criterion,
        init_lr=1e-4, ft_lr=1e-5, weight_decay=1e-5,
        init_epochs=5, ft_epochs=5, unfreeze_layer='layer4'):

    """
    Two-phase training:
      Phase 1: train head only.
      Phase 2: unfreeze last block and fine-tune.
    """

    # save initial weights to restore best model later
    best_model_wts = copy.deepcopy(model.state_dict())
    best_f1 = 0.0 # track best validation F1-score

    # Phase 1: head only
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()),
                           lr=init_lr, weight_decay=weight_decay)
    # scheduler reduces LR when validation loss plateaus
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer,
                                                     mode='min',
                                                     factor=0.1,
                                                     patience=3)
    for epoch in range(1, init_epochs+1):
        print(f"\n[Phase1] Epoch {epoch}/{init_epochs}")
        # train and validate one epoch
        tr_loss, tr_f1, tr_acc = train_one_epoch(model, dataloaders['train'], criterion, optimizer, device)
        val_loss, val_f1, val_acc, _, _ = validate_one_epoch(model, dataloaders['valid'], criterion, device)
        # update scheduler based on validation loss
        scheduler.step(val_loss)
        print(
            f" train F1: {tr_f1:.4f}, acc: {tr_acc:.4f} | "
            f"valid F1: {val_f1:.4f}, acc: {val_acc:.4f} | "
            f"lr={scheduler.get_last_lr()[0]:.1e}"
        )
        # if validation F1 improves, save model weights
        if val_f1 > best_f1:
            best_f1 = val_f1
            best_model_wts = copy.deepcopy(model.state_dict())

    # Phase 2: unfreeze last block
    for name, param in model.named_parameters():
        if name.startswith(unfreeze_layer):
            param.requires_grad = True
    print(f"\nUnfroze {unfreeze_layer}, fine-tuning last block...")
    # restore weights from best validation epoch before fine-tuning
    model.load_state_dict(best_model_wts)

    # reconfigure optimizer & scheduler to include the newly unfrozen parameters
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()),
                           lr=ft_lr, weight_decay=weight_decay)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer,
                                                     mode='min',
                                                     factor=0.1,
                                                     patience=3)
    for epoch in range(1, ft_epochs+1):
        print(f"\n[Phase2] Epoch {epoch}/{ft_epochs}")
        tr_loss, tr_f1, tr_acc = train_one_epoch(model, dataloaders['train'], criterion, optimizer, device)
        val_loss, val_f1, val_acc, _, _ = validate_one_epoch(model, dataloaders['valid'], criterion, device)
        scheduler.step(val_loss)
        print(
            f" train F1: {tr_f1:.4f}, acc: {tr_acc:.4f} | "
            f"valid F1: {val_f1:.4f}, acc: {val_acc:.4f} | "
            f"lr={scheduler.get_last_lr()[0]:.1e}"
        )
        if val_f1 > best_f1:
            best_f1 = val_f1
            best_model_wts = copy.deepcopy(model.state_dict())

    # load best overall weights before returning the model
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Select device: prefer GPU if available, otherwise CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Number of output classes inferred from the training dataset
num_classes = len(datasets_dict['train'].classes)

# Initialize the ResNet-50 model with timm:
# - feature_extract=True freezes the backbone initially
# - pretrained=True loads ImageNet weights
model = initialize_timm_model('resnet50', num_classes,
                              feature_extract=True, pretrained=True).to(device) # move model to the chosen device

# Define the classification loss function (cross-entropy)
criterion = nn.CrossEntropyLoss()

# Run the two-phase training routine:
# Phase 1: train only the new classifier head (fast initial convergence)
# Phase 2: unfreeze 'layer4' and fine-tune at a lower learning rate
model = fit(
    model,
    dataloaders,
    device,
    criterion,
    init_lr=1e-4,       # learning rate for head training
    ft_lr=1e-5,         # learning rate for fine-tuning last block
    weight_decay=1e-5,  # L2 regularization strength
    init_epochs=5,      # number of epochs for phase 1
    ft_epochs=5,        # number of epochs for phase 2
    unfreeze_layer='layer4'  # which block to unfreeze in phase 2
)



[Phase1] Epoch 1/5


train: 100%|██████████| 40/40 [00:07<00:00,  5.21it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.07it/s]


 train F1: 0.5927, acc: 0.6298 | valid F1: 0.5157, acc: 0.6264 | lr=1.0e-04

[Phase1] Epoch 2/5


train: 100%|██████████| 40/40 [00:07<00:00,  5.42it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.11it/s]


 train F1: 0.5038, acc: 0.6400 | valid F1: 0.6630, acc: 0.7143 | lr=1.0e-04

[Phase1] Epoch 3/5


train: 100%|██████████| 40/40 [00:07<00:00,  5.51it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.46it/s]


 train F1: 0.6664, acc: 0.7263 | valid F1: 0.8572, acc: 0.8654 | lr=1.0e-04

[Phase1] Epoch 4/5


train: 100%|██████████| 40/40 [00:06<00:00,  6.27it/s]
valid: 100%|██████████| 12/12 [00:02<00:00,  4.34it/s]


 train F1: 0.7429, acc: 0.7796 | valid F1: 0.8938, acc: 0.8984 | lr=1.0e-04

[Phase1] Epoch 5/5


train: 100%|██████████| 40/40 [00:06<00:00,  6.02it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.36it/s]


 train F1: 0.7552, acc: 0.7867 | valid F1: 0.9089, acc: 0.9121 | lr=1.0e-04

Unfroze layer4, fine-tuning last block...

[Phase2] Epoch 1/5


train: 100%|██████████| 40/40 [00:08<00:00,  4.80it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.29it/s]


 train F1: 0.7882, acc: 0.8102 | valid F1: 0.9235, acc: 0.9258 | lr=1.0e-05

[Phase2] Epoch 2/5


train: 100%|██████████| 40/40 [00:07<00:00,  5.08it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.09it/s]


 train F1: 0.8032, acc: 0.8220 | valid F1: 0.9267, acc: 0.9286 | lr=1.0e-05

[Phase2] Epoch 3/5


train: 100%|██████████| 40/40 [00:06<00:00,  5.84it/s]
valid: 100%|██████████| 12/12 [00:02<00:00,  4.53it/s]


 train F1: 0.8130, acc: 0.8306 | valid F1: 0.9295, acc: 0.9313 | lr=1.0e-05

[Phase2] Epoch 4/5


train: 100%|██████████| 40/40 [00:07<00:00,  5.70it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.20it/s]


 train F1: 0.8226, acc: 0.8384 | valid F1: 0.9323, acc: 0.9341 | lr=1.0e-05

[Phase2] Epoch 5/5


train: 100%|██████████| 40/40 [00:07<00:00,  5.05it/s]
valid: 100%|██████████| 12/12 [00:01<00:00,  6.14it/s]

 train F1: 0.8478, acc: 0.8604 | valid F1: 0.9354, acc: 0.9368 | lr=1.0e-05





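**Summary:**  
- The two-phase schedule lets the classifier head converge first, then refines the deeper representations in `layer4`;
- Validation macro F1 and accuracy improve at every epoch, peaking in the last Phase 2 epoch (0.9354 F1, 0.9368 accuracy);
- No clear signs of overfitting: validation scores keep rising alongside the training scores. Training scores stay below validation scores (0.85 vs. 0.94 F1), most likely because RandAugment and Random Erasing are applied to training batches only;
- The learning rate stayed at its initial value in both phases (1e-4, then 1e-5), so `ReduceLROnPlateau` never triggered; with `patience=3` and 5 epochs per phase it has little room to act;
- Final validation performance (~0.94 F1/acc) suggests that strong augmentation and two-phase fine-tuning work well here. Since scores were still rising at the last epoch, longer training may help further.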

## Final evaluation on test set

In [None]:
test_loss, test_f1, test_acc, y_true, y_pred = validate_one_epoch(
    model, dataloaders['test'], criterion, device)
print(f"\nTest Loss: {test_loss:.4f}, Test F1-macro: {test_f1:.4f}, Test acc: {test_acc:.4f}")
print("Classification Report:\n", classification_report(y_true, y_pred,
                                                        target_names=datasets_dict['train'].classes,
                                                        digits=4, zero_division=0))
print("Confusion Matrix:\n", confusion_matrix(y_true, y_pred))

valid: 100%|██████████| 6/6 [00:02<00:00,  2.63it/s]


Test Loss: 0.4357, Test F1-macro: 0.8943, Test acc: 0.9011
Classification Report:
               precision    recall  f1-score   support

       daisy     1.0000    0.7662    0.8676        77
   dandelion     0.8537    1.0000    0.9211       105

    accuracy                         0.9011       182
   macro avg     0.9268    0.8831    0.8943       182
weighted avg     0.9156    0.9011    0.8985       182

Confusion Matrix:
 [[ 59  18]
 [  0 105]]





## Test Set Performance & Future Improvements

**Test Set Results:**  
- **Loss:** 0.4357  
- **F1-macro:** 0.8943  
- **Accuracy:** 0.9011  

**Classification Report:**  
- **Daisy:** Precision 1.00, Recall 0.77 → The model makes no false positives on daisies (very precise), but misses about 23% of actual daisies (lower recall).  
- **Dandelion:** Precision 0.85, Recall 1.00 → All true dandelions are correctly identified (perfect recall), but about 15% of its positive predictions are actually daisies (some false positives).

**Confusion Matrix:**
- Daisy (77 images): 59 classified correctly, 18 misclassified as dandelion  
- Dandelion (105 images): all 105 classified correctly, 0 misclassified  
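
As a cross-check, the reported precision, recall, macro F1, and accuracy can be recomputed by hand from the confusion matrix above. The sketch below does this in plain Python; `per_class_metrics` is a hypothetical helper, with rows taken as true labels and columns as predictions, as in scikit-learn's `confusion_matrix`.

```python
# Confusion matrix from the test run: rows = true class, columns = predicted class
cm = [[59, 18],
      [0, 105]]
classes = ["daisy", "dandelion"]

def per_class_metrics(cm, k):
    """Return (precision, recall, f1) for class index k."""
    tp = cm[k][k]
    fn = sum(cm[k]) - tp                  # true class k, predicted as something else
    fp = sum(row[k] for row in cm) - tp   # predicted k, actually something else
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1

metrics = {name: per_class_metrics(cm, k) for k, name in enumerate(classes)}
macro_f1 = sum(m[2] for m in metrics.values()) / len(metrics)
accuracy = sum(cm[k][k] for k in range(len(cm))) / sum(map(sum, cm))

for name, (p, r, f1) in metrics.items():
    print(f"{name}: precision={p:.4f} recall={r:.4f} f1={f1:.4f}")
print(f"macro F1={macro_f1:.4f} accuracy={accuracy:.4f}")
```

Macro F1 is the unweighted mean of the per-class F1 scores, so the weaker daisy score pulls it below the accuracy.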

Every error is a daisy predicted as a dandelion, the more frequent class in the test set. Addressing the class imbalance (for example with class weights or resampling) and exploring stronger augmentation or ensembling could raise daisy recall, and with it the overall F1 and accuracy.
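
One common way to address class imbalance is to weight the loss inversely to class frequency. The sketch below computes "balanced" weights in plain Python. The training-set class counts are not reported in this notebook, so the test-set support values are used purely for illustration, and `balanced_class_weights` is a hypothetical helper.

```python
def balanced_class_weights(counts):
    """Weight each class by n_samples / (n_classes * class_count)."""
    total = sum(counts)
    return [total / (len(counts) * c) for c in counts]

# Illustrative counts only: test-set support, not the training distribution
support = {"daisy": 77, "dandelion": 105}
weights = balanced_class_weights(list(support.values()))

for name, w in zip(support, weights):
    print(f"{name}: {w:.4f}")

# The weights could then be passed to the loss, for example:
#   criterion = nn.CrossEntropyLoss(weight=torch.tensor(weights, dtype=torch.float32).to(device))
```

The rarer class gets a weight above 1, so its errors cost more during training. Whether this helps here would need to be checked on the validation set.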