# AneRBC-I Binary Classification (Anemic vs Healthy) with VGG16

This notebook implements a reproducible TensorFlow/Keras binary classification pipeline using **AneRBC-I `Original_images` only**.

## Cases Compared
- **Case 1**: VGG16 from scratch (`weights=None`, trainable base)
- **Case 2**: Transfer learning (`weights='imagenet'`, convolutional base frozen; only the new classification head is trained)

## Fixed Split (seed=42)
- Train: 700 images (350 Healthy / 350 Anemic)
- Validation: 100 images (50 Healthy / 50 Anemic)
- Test: 200 images (100 Healthy / 100 Anemic)


## Environment (Python 3.11)

Use the project virtualenv before running this notebook:

```bash
cd <project-root>          # e.g. cd ~/Downloads/fyp
source venv/bin/activate
pip install -r requirements.txt
```

> The notebook auto-detects the project root at runtime via `find_project_root()` — no hardcoded paths needed.


In [2]:
import os
import random
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import train_test_split

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.applications import VGG16
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Experiment configuration shared by every cell below.
SEED = 42
IMAGE_SIZE = (224, 224)
BATCH_SIZE = 16
MAX_EPOCHS = 30
LABEL_TO_ID = {"Healthy": 0, "Anemic": 1}

# Seed every RNG source used in this notebook (Python, NumPy, TensorFlow/Keras).
random.seed(SEED)
np.random.seed(SEED)
tf.keras.utils.set_random_seed(SEED)

# Op determinism is requested on a best-effort basis: a failure must not stop the
# notebook, but it must be visible because it weakens the reproducibility claim.
try:
    tf.config.experimental.enable_op_determinism()
except Exception as exc:  # deliberately broad: best-effort, never fatal
    print(
        "WARNING: could not enable TensorFlow op determinism "
        f"({exc!r}); results may differ between runs."
    )


def find_project_root(start: Path) -> Path:
    """Locate the project root by walking upward from ``start``.

    The root is the first directory, starting at the resolved ``start`` and
    moving through its ancestors, that contains ``AneRBC_dataset/AneRBC-I``.

    Args:
        start: Directory from which the upward search begins.

    Returns:
        The resolved path of the first matching directory.

    Raises:
        FileNotFoundError: If neither ``start`` nor any ancestor contains the dataset folder.
    """
    current = start.resolve()
    while True:
        if current.joinpath("AneRBC_dataset", "AneRBC-I").exists():
            return current
        parent = current.parent
        if parent == current:
            # Reached the filesystem root without a match.
            break
        current = parent
    raise FileNotFoundError("Could not locate project root containing AneRBC_dataset/AneRBC-I")


# Project root is discovered from the current working directory.
PROJECT_ROOT = find_project_root(Path.cwd())

# Input data: only the `Original_images` folders of AneRBC-I are used.
DATASET_ROOT = PROJECT_ROOT.joinpath("AneRBC_dataset", "AneRBC-I")
ANEMIC_DIR = DATASET_ROOT.joinpath("Anemic_individuals", "Original_images")
HEALTHY_DIR = DATASET_ROOT.joinpath("Healthy_individuals", "Original_images")

# Output locations for everything this notebook writes.
ARTIFACTS_DIR = PROJECT_ROOT.joinpath("Code", "ImageClassification", "artifacts")
SPLITS_DIR = ARTIFACTS_DIR.joinpath("splits")
MODELS_DIR = ARTIFACTS_DIR.joinpath("models")
METRICS_DIR = ARTIFACTS_DIR.joinpath("metrics")
PLOTS_DIR = ARTIFACTS_DIR.joinpath("plots")

for directory in (ARTIFACTS_DIR, SPLITS_DIR, MODELS_DIR, METRICS_DIR, PLOTS_DIR):
    directory.mkdir(parents=True, exist_ok=True)

# Split manifests.
TRAIN_CSV = SPLITS_DIR.joinpath("train_split.csv")
VAL_CSV = SPLITS_DIR.joinpath("val_split.csv")
TEST_CSV = SPLITS_DIR.joinpath("test_split.csv")

# Best-model checkpoints, one per case.
SCRATCH_CKPT = MODELS_DIR.joinpath("vgg16_scratch_best.keras")
TRANSFER_CKPT = MODELS_DIR.joinpath("vgg16_transfer_frozen_best.keras")

# Metrics table.
COMPARISON_CSV = METRICS_DIR.joinpath("comparison_metrics.csv")

# Figures.
TRAINING_CURVES_PNG = PLOTS_DIR.joinpath("training_curves.png")
CONFUSION_MATRICES_PNG = PLOTS_DIR.joinpath("confusion_matrices.png")
ACCURACY_F1_PNG = PLOTS_DIR.joinpath("accuracy_f1_comparison.png")

print("TensorFlow version:", tf.__version__)
print("Project root:", PROJECT_ROOT)
print("Artifacts dir:", ARTIFACTS_DIR)


TensorFlow version: 2.18.1
Project root: /Users/kiran/Downloads/fyp
Artifacts dir: /Users/kiran/Downloads/fyp/Code/ImageClassification/artifacts


In [3]:
VALID_EXTENSIONS = {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff"}


def _collect_image_paths(folder: Path) -> list[Path]:
    return sorted(
        [p for p in folder.iterdir() if p.is_file() and p.suffix.lower() in VALID_EXTENSIONS],
        key=lambda p: p.name,
    )


def build_dataframe(anemic_dir: str, healthy_dir: str) -> pd.DataFrame:
    """Build the full image manifest for both classes.

    Args:
        anemic_dir: Folder holding the images labelled ``"Anemic"``.
        healthy_dir: Folder holding the images labelled ``"Healthy"``.

    Returns:
        A DataFrame with columns ``filepath`` (path relative to ``PROJECT_ROOT``,
        written with forward slashes) and ``label``, sorted by ``filepath`` with
        a fresh 0..n-1 index. The frame is empty, but still has both columns,
        when neither folder contains an image.

    Raises:
        ValueError: If an image does not live under ``PROJECT_ROOT``
            (raised by ``Path.relative_to``).
    """
    anemic_paths = _collect_image_paths(Path(anemic_dir))
    healthy_paths = _collect_image_paths(Path(healthy_dir))

    rows = []
    for label, paths in (("Healthy", healthy_paths), ("Anemic", anemic_paths)):
        for path in paths:
            relative = path.resolve().relative_to(PROJECT_ROOT)
            # Forward slashes keep the saved split CSVs usable on any operating system.
            rows.append({"filepath": relative.as_posix(), "label": label})

    # Explicit columns keep the schema, and the sort below, valid for an empty manifest.
    df = pd.DataFrame(rows, columns=["filepath", "label"])
    return df.sort_values("filepath").reset_index(drop=True)


def resolve_paths(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` whose ``filepath`` values are joined onto ``PROJECT_ROOT``.

    The input frame is left untouched; every other column is carried over as-is.
    """

    def _to_absolute(relative_path):
        return str(PROJECT_ROOT / relative_path)

    resolved = df.copy()
    resolved["filepath"] = resolved["filepath"].apply(_to_absolute)
    return resolved


def make_splits(df: pd.DataFrame, seed: int = 42) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split ``df`` into stratified train / validation / test frames.

    The data is first cut 70% / 30%; the 30% hold-out is then cut into one
    third (validation) and two thirds (test), i.e. 70 / 10 / 20 overall.
    Both cuts shuffle with ``seed`` and stratify on the label ids from
    ``LABEL_TO_ID``.

    Args:
        df: Manifest with a ``label`` column.
        seed: Random state passed to both ``train_test_split`` calls.

    Returns:
        ``(train_df, val_df, test_df)``, each with a reset 0..n-1 index.
    """

    def _stratified_cut(frame: pd.DataFrame, holdout_fraction: float):
        # Stratify on the integer ids, not the strings, so class order follows LABEL_TO_ID.
        return train_test_split(
            frame,
            test_size=holdout_fraction,
            random_state=seed,
            shuffle=True,
            stratify=frame["label"].map(LABEL_TO_ID),
        )

    train_part, holdout_part = _stratified_cut(df, 0.30)
    val_part, test_part = _stratified_cut(holdout_part, 2 / 3)

    return tuple(
        part.reset_index(drop=True) for part in (train_part, val_part, test_part)
    )


In [4]:
def assert_split_integrity(train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame) -> None:
    """Assert that the three splits match the fixed protocol of this notebook.

    Checks, in order: total size per split (700 / 100 / 200), per-class counts
    (350+350 / 50+50 / 100+100), that no ``filepath`` is shared between two
    splits, and that every ``filepath`` (relative to ``PROJECT_ROOT``) is an
    existing, readable file.

    Raises:
        AssertionError: On the first violated check.
    """
    splits = {"train": train_df, "val": val_df, "test": test_df}

    # 1) Exact sample counts.
    expected_sizes = {"train": 700, "val": 100, "test": 200}
    for name, frame in splits.items():
        assert len(frame) == expected_sizes[name], (
            f"Expected {expected_sizes[name]} {name} samples, found {len(frame)}"
        )

    # 2) Exact per-class counts.
    expected_per_class = {
        "train": {"Healthy": 350, "Anemic": 350},
        "val": {"Healthy": 50, "Anemic": 50},
        "test": {"Healthy": 100, "Anemic": 100},
    }
    for name, frame in splits.items():
        observed = frame["label"].value_counts().to_dict()
        wanted = expected_per_class[name]
        assert observed == wanted, (
            f"{name} class counts mismatch. Expected {wanted}, got {observed}"
        )

    # 3) No filepath may appear in two splits (compared as relative paths).
    path_sets = {name: set(frame["filepath"]) for name, frame in splits.items()}
    for left, right in (("train", "val"), ("train", "test"), ("val", "test")):
        assert path_sets[left].isdisjoint(path_sets[right]), (
            f"Leakage detected between {left} and {right}"
        )

    # 4) Every referenced file exists and is readable.
    def _is_readable_file(relative_path) -> bool:
        absolute = PROJECT_ROOT / relative_path
        return absolute.is_file() and os.access(str(absolute), os.R_OK)

    all_paths = path_sets["train"] | path_sets["val"] | path_sets["test"]
    bad_paths = [path for path in all_paths if not _is_readable_file(path)]
    assert not bad_paths, f"Found missing/unreadable files: {len(bad_paths)}"


# Build the full manifest from both class folders; the protocol expects exactly 1000 images.
full_df = build_dataframe(str(ANEMIC_DIR), str(HEALTHY_DIR))
assert full_df.shape[0] == 1000, f"Expected 1000 rows, found {full_df.shape[0]}"

# Create the fixed-seed split and verify sizes, class balance, disjointness and file access.
train_df, val_df, test_df = make_splits(full_df, seed=SEED)
assert_split_integrity(train_df, val_df, test_df)

# Reproducibility check: rerunning with the same seed yields identical split memberships
train_df_2, val_df_2, test_df_2 = make_splits(full_df, seed=SEED)
assert set(train_df["filepath"]) == set(train_df_2["filepath"])
assert set(val_df["filepath"]) == set(val_df_2["filepath"])
assert set(test_df["filepath"]) == set(test_df_2["filepath"])

# Stable output ordering
# (the frames are rebound here, so later cells see the filepath-sorted versions)
train_df = train_df.sort_values("filepath").reset_index(drop=True)
val_df = val_df.sort_values("filepath").reset_index(drop=True)
test_df = test_df.sort_values("filepath").reset_index(drop=True)

# Persist the split manifests (filepaths are stored relative to PROJECT_ROOT).
train_df.to_csv(TRAIN_CSV, index=False)
val_df.to_csv(VAL_CSV, index=False)
test_df.to_csv(TEST_CSV, index=False)

print("Saved split files:")
print(" -", TRAIN_CSV)
print(" -", VAL_CSV)
print(" -", TEST_CSV)

# Human-readable summary: total rows per split ...
print("\nSplit summary:")
summary_df = pd.DataFrame({
    "split": ["train", "val", "test"],
    "count": [len(train_df), len(val_df), len(test_df)],
})
print(summary_df)

# ... and the label distribution inside each split.
print("\nPer-class counts:")
for split_name, split_df in {"train": train_df, "val": val_df, "test": test_df}.items():
    print(split_name)
    print(split_df["label"].value_counts().sort_index())
    print()


Saved split files:
 - /Users/kiran/Downloads/fyp/Code/ImageClassification/artifacts/splits/train_split.csv
 - /Users/kiran/Downloads/fyp/Code/ImageClassification/artifacts/splits/val_split.csv
 - /Users/kiran/Downloads/fyp/Code/ImageClassification/artifacts/splits/test_split.csv

Split summary:
   split  count
0  train    700
1    val    100
2   test    200

Per-class counts:
train
label
Anemic     350
Healthy    350
Name: count, dtype: int64

val
label
Anemic     50
Healthy    50
Name: count, dtype: int64

test
label
Anemic     100
Healthy    100
Name: count, dtype: int64



In [5]:
def make_generators(train_df, val_df, test_df, image_size=(224, 224), batch_size=16):
    """Create the Keras image generators for the three splits.

    All generators rescale pixels by 1/255, load RGB images resized to
    ``image_size`` and emit binary labels with the class order
    ``["Healthy", "Anemic"]``. Only the training generator shuffles
    (seeded with ``SEED``); validation and test keep the dataframe order.

    Args:
        train_df, val_df, test_df: Manifests with ``filepath`` (relative to
            ``PROJECT_ROOT``) and ``label`` columns.
        image_size: Target ``(height, width)`` passed as ``target_size``.
        batch_size: Batch size used by every generator.

    Returns:
        ``(train_gen, val_gen, test_gen)``.
    """
    flow_options = dict(
        x_col="filepath",
        y_col="label",
        target_size=image_size,
        color_mode="rgb",
        class_mode="binary",
        classes=["Healthy", "Anemic"],
        batch_size=batch_size,
        validate_filenames=True,
    )

    # Keras needs absolute paths; the manifests store them relative to the project root.
    train_abs, val_abs, test_abs = (
        resolve_paths(frame) for frame in (train_df, val_df, test_df)
    )

    train_datagen = ImageDataGenerator(rescale=1.0 / 255.0)
    eval_datagen = ImageDataGenerator(rescale=1.0 / 255.0)

    train_gen = train_datagen.flow_from_dataframe(
        dataframe=train_abs, shuffle=True, seed=SEED, **flow_options
    )
    # Evaluation generators stay unshuffled so predictions line up with the dataframe order.
    val_gen = eval_datagen.flow_from_dataframe(
        dataframe=val_abs, shuffle=False, **flow_options
    )
    test_gen = eval_datagen.flow_from_dataframe(
        dataframe=test_abs, shuffle=False, **flow_options
    )

    return train_gen, val_gen, test_gen


# Instantiate the generators once; the same objects are reused by the training and evaluation cells.
train_gen, val_gen, test_gen = make_generators(
    train_df,
    val_df,
    test_df,
    image_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
)

# Label encoding test
# (Anemic must be the positive class, id 1, because the metrics below use pos_label=1)
expected_class_indices = {"Healthy": 0, "Anemic": 1}
assert train_gen.class_indices == expected_class_indices, (
    f"Class indices mismatch. Expected {expected_class_indices}, got {train_gen.class_indices}"
)

# Input shape/range test
# Pulls one real batch from the training generator to validate shape, pixel scaling and label rank.
x_batch, y_batch = next(train_gen)
assert x_batch.shape[1:] == (224, 224, 3), f"Unexpected input shape: {x_batch.shape}"
assert np.min(x_batch) >= 0.0 and np.max(x_batch) <= 1.0, "Input pixel range should be [0, 1]"
assert y_batch.ndim == 1, f"Expected binary labels as rank-1 vector, got shape {y_batch.shape}"
# Rewind the generator after the probe batch.
# NOTE(review): presumably reset() only rewinds the batch index — confirm whether the shuffle state is also restored.
train_gen.reset()

print("Generator checks passed.")
print("Class indices:", train_gen.class_indices)
print("Sample batch shape:", x_batch.shape, "Label shape:", y_batch.shape)
print("Pixel range:", float(np.min(x_batch)), "to", float(np.max(x_batch)))


Found 700 validated image filenames belonging to 2 classes.
Found 100 validated image filenames belonging to 2 classes.
Found 200 validated image filenames belonging to 2 classes.
Generator checks passed.
Class indices: {'Healthy': 0, 'Anemic': 1}
Sample batch shape: (16, 224, 224, 3) Label shape: (16,)
Pixel range: 0.04313725605607033 to 0.9568628072738647


In [6]:
def build_vgg16_binary(case: str) -> tf.keras.Model:
    """Build and compile a VGG16-based binary classifier for one experiment case.

    Args:
        case: ``"scratch"`` (random initial weights, trainable base) or
            ``"transfer_frozen"`` (ImageNet weights, frozen base).

    Returns:
        A compiled model named ``vgg16_{case}_binary``: VGG16 without its top,
        global average pooling, Dense(256, relu), Dropout(0.5) and a single
        sigmoid unit; Adam (lr 1e-4), binary cross-entropy, accuracy metric.

    Raises:
        ValueError: If ``case`` is not one of the two supported values.

    NOTE(review): Keras documents ``vgg16.preprocess_input`` as the input
    preprocessing matching the ImageNet weights; the generators in this
    notebook rescale to [0, 1] instead — confirm this is intended for the
    transfer-learning case.
    """
    initial_weights = {"scratch": None, "transfer_frozen": "imagenet"}
    if case not in initial_weights:
        raise ValueError("case must be one of {'scratch', 'transfer_frozen'}")

    backbone = VGG16(
        weights=initial_weights[case],
        include_top=False,
        input_shape=(224, 224, 3),
    )
    # Only the from-scratch case updates the convolutional base.
    backbone.trainable = case == "scratch"

    inputs = tf.keras.Input(shape=(224, 224, 3))
    features = backbone(inputs, training=backbone.trainable)
    pooled = layers.GlobalAveragePooling2D()(features)
    hidden = layers.Dense(256, activation="relu")(pooled)
    regularized = layers.Dropout(0.5)(hidden)
    outputs = layers.Dense(1, activation="sigmoid")(regularized)

    model = models.Model(inputs=inputs, outputs=outputs, name=f"vgg16_{case}_binary")
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
        loss="binary_crossentropy",
        metrics=["accuracy"],
    )
    return model


# Model configuration tests
# Build both variants once, only to verify the trainable flag of the VGG16 base in each case.
scratch_model = build_vgg16_binary("scratch")
transfer_model = build_vgg16_binary("transfer_frozen")

# The base is looked up by the layer name "vgg16".
assert scratch_model.get_layer("vgg16").trainable is True, "Case 1 base should be trainable"
assert transfer_model.get_layer("vgg16").trainable is False, "Case 2 base should be frozen"

print("Model configuration checks passed.")
print("Case 1 base trainable:", scratch_model.get_layer("vgg16").trainable)
print("Case 2 base trainable:", transfer_model.get_layer("vgg16").trainable)

# Drop the probe models and clear the Keras session before the real training runs.
del scratch_model, transfer_model
tf.keras.backend.clear_session()


Model configuration checks passed.
Case 1 base trainable: True
Case 2 base trainable: False


In [None]:
def train_case(case: str, train_gen, val_gen, epochs: int = 30):
    """Train one experiment case and checkpoint its best epoch.

    The Keras session is cleared, a fresh model is built for ``case`` and
    fitted on ``train_gen`` with ``val_gen`` as validation data.

    Two callbacks run during training:
      * early stopping on ``val_loss`` (patience 5, best weights restored);
      * a checkpoint written whenever ``val_accuracy`` improves.
    NOTE(review): the two callbacks monitor different metrics, so the returned
    in-memory model and the saved checkpoint may hold weights from different
    epochs — confirm this is intended.

    Args:
        case: ``"scratch"`` or ``"transfer_frozen"``.
        train_gen: Training data generator.
        val_gen: Validation data generator.
        epochs: Upper bound on the number of epochs.

    Returns:
        ``(model, history, checkpoint_path)``.
    """
    tf.keras.backend.clear_session()
    model = build_vgg16_binary(case)

    # Every case other than "scratch" writes to the transfer-learning checkpoint.
    checkpoint_path = TRANSFER_CKPT
    if case == "scratch":
        checkpoint_path = SCRATCH_CKPT

    stop_on_plateau = EarlyStopping(
        monitor="val_loss",
        patience=5,
        restore_best_weights=True,
        verbose=1,
    )
    keep_best = ModelCheckpoint(
        filepath=str(checkpoint_path),
        monitor="val_accuracy",
        mode="max",
        save_best_only=True,
        verbose=1,
    )

    history = model.fit(
        train_gen,
        validation_data=val_gen,
        epochs=epochs,
        callbacks=[stop_on_plateau, keep_best],
        verbose=1,
    )

    return model, history, checkpoint_path


# Experiment cases as (config key, human-readable description); later cells iterate this list too.
cases = [
    ("scratch", "Case 1: VGG16 without transfer learning"),
    ("transfer_frozen", "Case 2: VGG16 transfer learning (frozen base)"),
]

# Per-case outputs: the fitted model with its checkpoint path, and the per-epoch metric history.
trained_models = {}
histories = {}

# Both cases are trained on the same train_gen / val_gen objects, one after the other.
for case, description in cases:
    print("\n" + "=" * 80)
    print(description)
    model, history, checkpoint_path = train_case(case, train_gen, val_gen, epochs=MAX_EPOCHS)
    # Keeps a reference to each fitted model, so both stay in memory after the loop.
    trained_models[case] = {"model": model, "checkpoint": checkpoint_path}
    # Only the plain dict of metric lists is kept; the plotting cell reads it.
    histories[case] = history.history
    print(f"Saved best checkpoint to: {checkpoint_path}")



Case 1: VGG16 without transfer learning
Epoch 1/30
[1m15/44[0m [32m━━━━━━[0m[37m━━━━━━━━━━━━━━[0m [1m1:47[0m 4s/step - accuracy: 0.4870 - loss: 0.6937

In [None]:
def _predict_labels(model: tf.keras.Model, test_gen):
    test_gen.reset()
    y_true = test_gen.classes.astype(int)
    y_prob = model.predict(test_gen, verbose=0).ravel()
    y_pred = (y_prob >= 0.5).astype(int)
    return y_true, y_pred, y_prob


def evaluate_case(model, test_gen) -> dict[str, float]:
    """Compute test-set accuracy, F1, precision and recall for ``model``.

    Predictions come from ``_predict_labels``; class id 1 is the positive
    class. All three ratio metrics use ``zero_division=0`` so that a model
    predicting a single class yields 0.0 without an undefined-metric warning.

    Returns:
        Dict with float values under the keys ``accuracy``, ``f1``,
        ``precision`` and ``recall``.
    """
    y_true, y_pred, _ = _predict_labels(model, test_gen)
    return {
        "accuracy": float(accuracy_score(y_true, y_pred)),
        # zero_division=0 matches precision/recall below; the returned value is unchanged.
        "f1": float(f1_score(y_true, y_pred, pos_label=1, zero_division=0)),
        "precision": float(precision_score(y_true, y_pred, pos_label=1, zero_division=0)),
        "recall": float(recall_score(y_true, y_pred, pos_label=1, zero_division=0)),
    }


In [None]:
# Per-case test results: scalar metrics, confusion matrix and text report.
results = {}

for case, description in cases:
    # Evaluate the checkpoint on disk (best val_accuracy), not the in-memory model.
    checkpoint_path = SCRATCH_CKPT if case == "scratch" else TRANSFER_CKPT
    if not checkpoint_path.exists():
        raise FileNotFoundError(f"Checkpoint not found: {checkpoint_path}")

    best_model = tf.keras.models.load_model(checkpoint_path)

    # Run inference once and derive every metric from the same predictions
    # (previously the test set was predicted twice per case).
    y_true, y_pred, y_prob = _predict_labels(best_model, test_gen)
    metrics = {
        "accuracy": float(accuracy_score(y_true, y_pred)),
        "f1": float(f1_score(y_true, y_pred, pos_label=1, zero_division=0)),
        "precision": float(precision_score(y_true, y_pred, pos_label=1, zero_division=0)),
        "recall": float(recall_score(y_true, y_pred, pos_label=1, zero_division=0)),
    }

    # labels=[0, 1] forces a 2x2 matrix even if one class is never predicted.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    report = classification_report(
        y_true,
        y_pred,
        labels=[0, 1],
        target_names=["Healthy", "Anemic"],
        digits=4,
        zero_division=0,
    )

    # Evaluation tests
    assert cm.shape == (2, 2), f"Confusion matrix for {case} must be 2x2, got {cm.shape}"
    assert np.all(np.isfinite(list(metrics.values()))), f"Non-finite metric values detected for {case}"

    results[case] = {
        **metrics,
        "confusion_matrix": cm,
        "classification_report": report,
    }

    print("\n" + "=" * 80)
    print(description)
    print("Test metrics:")
    print({k: round(v, 4) for k, v in metrics.items()})
    print("\nConfusion matrix:")
    print(cm)
    print("\nClassification report:")
    print(report)


In [None]:
# Short display names for the comparison table and the bar chart.
case_name_map = {
    "scratch": "Case 1",
    "transfer_frozen": "Case 2",
}

# One row per case: identifiers first, then the four scalar test metrics.
comparison_rows = []
for case, _ in cases:
    case_results = results[case]
    comparison_rows.append(
        {
            "case": case_name_map[case],
            "config": case,
            **{
                metric_name: case_results[metric_name]
                for metric_name in ("accuracy", "f1", "precision", "recall")
            },
        }
    )

# Rank by accuracy, ties broken by F1 (both descending); rank 1 is the best case.
comparison_df = (
    pd.DataFrame(comparison_rows)
    .sort_values(by=["accuracy", "f1"], ascending=[False, False])
    .reset_index(drop=True)
)
comparison_df.insert(0, "rank", np.arange(1, len(comparison_df) + 1))

comparison_df.to_csv(COMPARISON_CSV, index=False)
comparison_df


In [None]:
# Training curves per case: one row per case, loss on the left, accuracy on the right.
fig, axes = plt.subplots(2, 2, figsize=(14, 10))

# (column index, train-history key, validation-history key, train label, validation label, axis title word)
curve_panels = (
    (0, "loss", "val_loss", "train_loss", "val_loss", "Loss"),
    (1, "accuracy", "val_accuracy", "train_accuracy", "val_accuracy", "Accuracy"),
)

for row_idx, (case, description) in enumerate(cases):
    history = histories[case]

    for col_idx, train_key, val_key, train_label, val_label, title_word in curve_panels:
        panel_ax = axes[row_idx, col_idx]
        # Missing history keys plot as empty lines instead of raising.
        panel_ax.plot(history.get(train_key, []), label=train_label)
        panel_ax.plot(history.get(val_key, []), label=val_label)
        panel_ax.set_title(f"{description} - {title_word}")
        panel_ax.set_xlabel("Epoch")
        panel_ax.set_ylabel(title_word)
        panel_ax.legend()

plt.tight_layout()
plt.savefig(TRAINING_CURVES_PNG, dpi=200, bbox_inches="tight")
plt.show()
print("Saved:", TRAINING_CURVES_PNG)


In [None]:
# Confusion matrices per case, side by side (rows = true class, columns = predicted class).
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

for idx, (case, description) in enumerate(cases):
    panel_ax = axes[idx]
    cm = results[case]["confusion_matrix"]
    heatmap_options = dict(
        annot=True,
        fmt="d",
        cmap="Blues",
        cbar=False,
        xticklabels=["Healthy (0)", "Anemic (1)"],
        yticklabels=["Healthy (0)", "Anemic (1)"],
    )
    sns.heatmap(cm, ax=panel_ax, **heatmap_options)
    panel_ax.set_title(description)
    panel_ax.set_xlabel("Predicted")
    panel_ax.set_ylabel("True")

plt.tight_layout()
plt.savefig(CONFUSION_MATRICES_PNG, dpi=200, bbox_inches="tight")
plt.show()
print("Saved:", CONFUSION_MATRICES_PNG)


In [None]:
# Accuracy/F1 comparison bar chart
# Long format: one row per (case, metric) pair so seaborn can group bars by metric.
plot_df = comparison_df[["case", "accuracy", "f1"]].copy().melt(
    id_vars="case",
    var_name="metric",
    value_name="value",
)

fig, ax = plt.subplots(figsize=(8, 5))
sns.barplot(data=plot_df, x="case", y="value", hue="metric", ax=ax)
ax.set_ylim(0, 1)  # both metrics live in [0, 1]
ax.set_title("Case Comparison: Accuracy vs F1")
ax.set_ylabel("Score")
ax.set_xlabel("Case")
ax.legend(title="Metric")
fig.tight_layout()
fig.savefig(ACCURACY_F1_PNG, dpi=200, bbox_inches="tight")
plt.show()
print("Saved:", ACCURACY_F1_PNG)


In [None]:
# Every file this notebook is expected to have produced.
artifacts = [
    TRAIN_CSV,
    VAL_CSV,
    TEST_CSV,
    SCRATCH_CKPT,
    TRANSFER_CKPT,
    COMPARISON_CSV,
    TRAINING_CURVES_PNG,
    CONFUSION_MATRICES_PNG,
    ACCURACY_F1_PNG,
]

# Report presence and size; missing files are listed with a size of 0 rather than raising.
print("Artifact check:")
for path in artifacts:
    if path.exists():
        status, size = "OK", path.stat().st_size
    else:
        status, size = "MISSING", 0
    print(f" - {path}: {status} (bytes={size})")
