[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/tunnel-ai/way/blob/main/notebooks/06_00_main_v3.ipynb)

# 06_00_main: CNNs on EuroSAT (direct download)

**Module 6: Neural Networks (vision anchor)**

In this notebook we will:
- Load **EuroSAT RGB** via a direct download (Zenodo) rather than **TensorFlow Datasets (TFDS)**
- Compare a **dense network baseline** (flattened pixels) vs a **CNN**
- Add one “guardrail” (early stopping) to stabilize training
- Do lightweight **error analysis** (confusions + example mistakes)

> Teaching intent: the *same data* behaves very differently depending on whether the architecture respects spatial structure.


In [None]:
# Colab note: TensorFlow is preinstalled. Avoid upgrading it unless you have a reason.
# We do NOT use TFDS for EuroSAT here because the original TFDS download URL can return 403
# and checksum validation can fail if a mirror zip is used.
#
# If you see any import errors, restart runtime: Runtime → Restart runtime

import os
import numpy as np
import matplotlib.pyplot as plt

import tensorflow as tf

print("TensorFlow:", tf.__version__)

# Reproducibility (enough for interpretable curves)
tf.keras.utils.set_random_seed(1955)

# (Optional) confirm GPU
print("GPU available:", bool(tf.config.list_physical_devices("GPU")))


## 1) Get EuroSAT RGB (direct download) and build datasets

We download **EuroSAT RGB** from a stable archival host and load it using `image_dataset_from_directory`.
This avoids TFDS download-host issues (403) and checksum mismatches when mirrors are used.

We then carve **train/val/test** splits (roughly 80/10/10) out of the batched dataset; the rest of the notebook follows the same flow as the earlier TFDS-based version.
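
As a rough check on the split sizes, the arithmetic can be sketched in plain Python. This assumes the commonly cited EuroSAT RGB size of 27,000 images and the batch size of 64 used in this notebook. `split_batches` is a hypothetical helper for illustration, not part of the notebook's pipeline.

```python
import math

def split_batches(num_images, batch_size, val_fraction=0.10, test_fraction=0.10):
    """Return (train, val, test) batch counts for a take/skip split over batches."""
    num_batches = math.ceil(num_images / batch_size)  # the last batch may be partial
    num_test = int(num_batches * test_fraction)
    num_val = int(num_batches * val_fraction)
    num_train = num_batches - num_val - num_test
    return num_train, num_val, num_test

# Assumption: 27,000 images in EuroSAT RGB, batch size 64.
train_b, val_b, test_b = split_batches(27_000, 64)
print(train_b, val_b, test_b)  # 338 42 42
```

Because the split is made over batches rather than images, the 10% fractions are approximate.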


In [None]:
import pathlib, zipfile

# -----------------------------
# 1) Download + unzip EuroSAT RGB
# -----------------------------
# We use the Zenodo archival source for EuroSAT RGB (stable).
# This avoids TFDS download host issues and checksum mismatches.

DATA_ROOT = "/content/data"
ZIP_PATH = f"{DATA_ROOT}/EuroSAT_RGB.zip"
EXTRACT_DIR = f"{DATA_ROOT}/EuroSAT_RGB"

pathlib.Path(DATA_ROOT).mkdir(parents=True, exist_ok=True)

# Download (quiet). If the file already exists, wget overwrites; that's fine for reproducibility.
!wget -q -O "{ZIP_PATH}" "https://zenodo.org/records/7711810/files/EuroSAT_RGB.zip"

# Extract once
if not os.path.exists(EXTRACT_DIR):
    with zipfile.ZipFile(ZIP_PATH, "r") as z:
        z.extractall(EXTRACT_DIR)

print("Folder preview (to confirm class directory):")
!find "{EXTRACT_DIR}" -maxdepth 2 -type d | head -n 30

# -----------------------------
# Locate the class directory automatically
# -----------------------------
# The EuroSAT RGB zip may unpack into slightly different folder layouts depending on source/version.
# We detect the directory that contains the class subfolders (e.g., AnnualCrop, Forest, Highway, ...).

def find_class_dir(root):
    for current_root, dirs, files in os.walk(root):
        # EuroSAT has ~10 class folders; choose a directory with many subdirectories.
        if len(dirs) >= 8:
            return current_root
    raise RuntimeError(f"Could not locate class directory automatically under: {root}")

CLASS_DIR = find_class_dir(EXTRACT_DIR)
print("Detected CLASS_DIR:", CLASS_DIR)
print("Class folders:", sorted(os.listdir(CLASS_DIR)))


# -----------------------------
# 2) Build tf.data datasets from directory
# -----------------------------
BATCH_SIZE = 64
IMG_SIZE = (64, 64)   # EuroSAT patches are 64×64
SEED = 1955

# Load the full labeled dataset (shuffled)
full_ds = tf.keras.utils.image_dataset_from_directory(
    CLASS_DIR,
    labels="inferred",
    label_mode="int",
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
    seed=SEED,
)

label_names = full_ds.class_names
num_classes = len(label_names)
print("Classes:", num_classes)
print(label_names)

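# -----------------------------
# 3) Create train/val/test splits (by batches)
# -----------------------------
# Caveat: with shuffle=True the loader may reshuffle on each pass over the data,
# so take/skip boundaries are not guaranteed to select the same images every epoch.
# Treat these splits as approximate; for a strict hold-out, split the file paths first.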
val_fraction = 0.10
test_fraction = 0.10

num_batches = tf.data.experimental.cardinality(full_ds).numpy()
num_test = int(num_batches * test_fraction)
num_val = int(num_batches * val_fraction)
num_train = num_batches - num_val - num_test

ds_train_raw = full_ds.take(num_train)
ds_val_raw   = full_ds.skip(num_train).take(num_val)
ds_test_raw  = full_ds.skip(num_train + num_val).take(num_test)

print(f"Batches — train: {num_train}, val: {num_val}, test: {num_test} (total: {num_batches})")

# Confirm shapes
for xb, yb in ds_train_raw.take(1):
    print("X batch:", xb.shape, xb.dtype)
    print("y batch:", yb.shape, yb.dtype)
    IMG_SHAPE = xb.shape[1:]


## 2) Quick visual sanity-check

We want to see what the labels *mean* at 64×64 resolution, and whether any classes look naturally confusable.


In [None]:
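# Grab a few individual images for inspection.
# ds_train_raw is already batched, so unbatch before taking 20 examples.
sample_images = []
sample_labels = []
for img, lab in ds_train_raw.unbatch().take(20):
    # Pixels are float32 in [0, 255]; cast to uint8 so imshow renders them correctly.
    sample_images.append(img.numpy().astype("uint8"))
    sample_labels.append(int(lab.numpy()))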

# Plot a grid
cols = 5
rows = int(np.ceil(len(sample_images) / cols))
plt.figure(figsize=(12, 8))
for i, (img, lab) in enumerate(zip(sample_images, sample_labels), start=1):
    plt.subplot(rows, cols, i)
    plt.imshow(img)
    plt.title(label_names[lab], fontsize=9)
    plt.axis("off")
plt.tight_layout()
plt.show()


## 3) Data pipeline (tf.data)

We’ll keep preprocessing minimal and explicit:
- Scale pixel values from **[0, 255]** to **[0, 1]** (the loader already returns float32)
- Shuffle the training batches
- Prefetch

The raw datasets are already batched by `image_dataset_from_directory`, so we do not batch again. No helper abstractions here; everything stays visible.


In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# Preprocess inline using Dataset.map with a lambda (keeps the logic visible).
# The loader yields float32 pixels in [0, 255]; tf.image.convert_image_dtype does not
# rescale float inputs, so we divide by 255 explicitly.
ds_train = ds_train_raw.map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y), num_parallel_calls=AUTOTUNE)
ds_val   = ds_val_raw.map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y), num_parallel_calls=AUTOTUNE)
ds_test  = ds_test_raw.map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y), num_parallel_calls=AUTOTUNE)

# The raw datasets are already batched (BATCH_SIZE was applied at load time),
# so calling .batch() again would add an extra dimension. shuffle() here reorders whole batches.
ds_train = ds_train.shuffle(64).prefetch(AUTOTUNE)
ds_val   = ds_val.prefetch(AUTOTUNE)
ds_test  = ds_test.prefetch(AUTOTUNE)

# Confirm shapes and value range
for xb, yb in ds_train.take(1):
    print("X batch:", xb.shape, xb.dtype)
    print("y batch:", yb.shape, yb.dtype)
    print("pixel range:", float(tf.reduce_min(xb)), float(tf.reduce_max(xb)))
    IMG_SHAPE = xb.shape[1:]


## 4) Baseline model (intentionally “wrong”): Dense NN on flattened pixels

Flattening treats the image as just a long vector and destroys locality.  
We use this baseline to create contrast: *what do we lose by ignoring spatial structure?*
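
To see what "destroys locality" means in practice, here is a small sketch of where neighbouring pixels land after flattening. It assumes row-major, channels-last flattening of a 64×64×3 image; `flat_index` is a hypothetical helper for illustration.

```python
def flat_index(row, col, channel, width=64, channels=3):
    """Position of pixel (row, col, channel) after row-major flattening of an H×W×C image."""
    return (row * width + col) * channels + channel

center = flat_index(10, 10, 0)
right = flat_index(10, 11, 0)   # horizontal neighbour
below = flat_index(11, 10, 0)   # vertical neighbour

print(right - center)  # 3
print(below - center)  # 192
```

Two pixels that touch vertically end up 192 positions apart in the flattened vector. Nothing in a dense layer tells it that those two inputs are related, so any such relationship has to be learned from data.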


In [None]:
dense_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=IMG_SHAPE),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(256, activation="relu"),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dense(num_classes, activation="softmax"),
], name="dense_baseline")

dense_model.summary()

dense_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)


In [None]:
EPOCHS_BASELINE = 8

history_dense = dense_model.fit(
    ds_train,
    validation_data=ds_val,
    epochs=EPOCHS_BASELINE,
    verbose=1
)


In [None]:
# Plot learning curves (baseline)
plt.figure(figsize=(10, 4))
plt.plot(history_dense.history["accuracy"], label="train acc")
plt.plot(history_dense.history["val_accuracy"], label="val acc")
plt.xlabel("epoch"); plt.ylabel("accuracy"); plt.title("Dense baseline accuracy")
plt.legend(); plt.show()

plt.figure(figsize=(10, 4))
plt.plot(history_dense.history["loss"], label="train loss")
plt.plot(history_dense.history["val_loss"], label="val loss")
plt.xlabel("epoch"); plt.ylabel("loss"); plt.title("Dense baseline loss")
plt.legend(); plt.show()


## 5) CNN model: same data, different inductive bias

A CNN assumes:
- nearby pixels matter together (locality)
- learned filters can be reused across the image (weight sharing)

This is the structural “match” for vision.
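
Weight sharing shows up directly in the parameter counts. The sketch below tallies trainable parameters by hand for the dense baseline and the small CNN used in this notebook, assuming 64×64×3 inputs and 10 classes. `dense_params` and `conv2d_params` are hypothetical helpers, and pooling, flatten and dropout layers contribute no parameters.

```python
def dense_params(n_in, n_out):
    """Weights plus biases of a fully connected layer."""
    return n_in * n_out + n_out

def conv2d_params(kernel, c_in, c_out):
    """Weights plus biases of a square-kernel Conv2D layer."""
    return kernel * kernel * c_in * c_out + c_out

num_classes = 10  # assumption: the 10 EuroSAT classes

dense_total = (
    dense_params(64 * 64 * 3, 256)
    + dense_params(256, 128)
    + dense_params(128, num_classes)
)
cnn_total = (
    conv2d_params(3, 3, 32)
    + conv2d_params(3, 32, 64)
    + conv2d_params(3, 64, 128)
    + dense_params(128, 128)   # head after global average pooling (128 channels)
    + dense_params(128, num_classes)
)
print(dense_total, cnn_total)  # 3180170 111050
```

Under these assumptions the dense baseline carries roughly 29 times as many parameters as the CNN, most of them in its first layer. These totals can be compared against the output of `model.summary()`.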


In [None]:
cnn_model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=IMG_SHAPE),

    tf.keras.layers.Conv2D(32, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPooling2D(),

    tf.keras.layers.Conv2D(64, 3, padding="same", activation="relu"),
    tf.keras.layers.MaxPooling2D(),

    tf.keras.layers.Conv2D(128, 3, padding="same", activation="relu"),
    tf.keras.layers.GlobalAveragePooling2D(),

    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dropout(0.25),
    tf.keras.layers.Dense(num_classes, activation="softmax"),
], name="cnn_small")

cnn_model.summary()

cnn_model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)


### 5.1 One training guardrail: Early stopping

We’ll stop training when validation loss stops improving. This is a pragmatic default for live experimentation.
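
The patience rule can be sketched in a few lines of plain Python. This is a simplified illustration of the idea, not Keras's actual implementation. `early_stopping_trace` is a hypothetical helper and the loss values are made up.

```python
def early_stopping_trace(val_losses, patience=2):
    """Return (best_epoch, stopped_epoch) for a list of per-epoch validation losses."""
    best_loss = float("inf")
    best_epoch = None
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best_loss:
            best_loss, best_epoch, wait = loss, epoch, 0
        else:
            wait += 1
            if wait >= patience:
                # Stop here; with restore_best_weights, weights from best_epoch are kept.
                return best_epoch, epoch
    return best_epoch, len(val_losses) - 1  # ran all epochs without triggering

losses = [1.00, 0.80, 0.70, 0.72, 0.71, 0.60]  # made-up values for illustration
print(early_stopping_trace(losses, patience=2))  # (2, 4)
```

In this made-up trace, training stops at epoch 4 and never reaches the lower loss at epoch 5. That is the trade-off of a small patience value: less wasted compute, but a noisy plateau can end training early.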


In [None]:
early_stop = tf.keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=2,
    restore_best_weights=True
)

EPOCHS_CNN = 15

history_cnn = cnn_model.fit(
    ds_train,
    validation_data=ds_val,
    epochs=EPOCHS_CNN,
    callbacks=[early_stop],
    verbose=1
)


In [None]:
# Plot learning curves (CNN)
plt.figure(figsize=(10, 4))
plt.plot(history_cnn.history["accuracy"], label="train acc")
plt.plot(history_cnn.history["val_accuracy"], label="val acc")
plt.xlabel("epoch"); plt.ylabel("accuracy"); plt.title("CNN accuracy")
plt.legend(); plt.show()

plt.figure(figsize=(10, 4))
plt.plot(history_cnn.history["loss"], label="train loss")
plt.plot(history_cnn.history["val_loss"], label="val loss")
plt.xlabel("epoch"); plt.ylabel("loss"); plt.title("CNN loss")
plt.legend(); plt.show()


## 6) Test-set evaluation

Accuracy is not the only story, but it’s a good first check.


In [None]:
dense_test = dense_model.evaluate(ds_test, verbose=0)
cnn_test   = cnn_model.evaluate(ds_test, verbose=0)

print("Dense baseline — test loss, test acc:", dense_test)
print("CNN          — test loss, test acc:", cnn_test)


## 7) Confusion matrix + quick error analysis

We’ll inspect:
- which classes the model confuses
- a handful of high-confidence mistakes (useful for discussion)
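
For discussion, it helps to know how a confusion matrix is built and how to pull the most frequent confusions out of it. The sketch below uses plain Python and made-up labels for three classes. `confusion_matrix` and `most_confused_pairs` are hypothetical helpers, separate from the `tf.math.confusion_matrix` call used in this notebook.

```python
def confusion_matrix(y_true, y_pred, num_classes):
    """Rows are true classes, columns are predicted classes."""
    cm = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(y_true, y_pred):
        cm[t][p] += 1
    return cm

def most_confused_pairs(cm, top_k=3):
    """Off-diagonal (true, pred, count) triples, largest count first."""
    pairs = [
        (t, p, cm[t][p])
        for t in range(len(cm))
        for p in range(len(cm))
        if t != p and cm[t][p] > 0
    ]
    return sorted(pairs, key=lambda tpc: -tpc[2])[:top_k]

# Made-up labels for three classes, for illustration only.
y_true = [0, 0, 1, 1, 2, 2, 2]
y_pred = [0, 1, 1, 1, 2, 0, 0]

cm = confusion_matrix(y_true, y_pred, num_classes=3)
print(cm)                       # [[1, 1, 0], [0, 2, 0], [2, 0, 1]]
print(most_confused_pairs(cm))  # [(2, 0, 2), (0, 1, 1)]
```

Reading the pairs as (true, predicted, count) gives a compact list of confusions to look up in the image grid.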


In [None]:
# Collect predictions on the test set
y_true = []
y_pred = []
y_prob = []

for xb, yb in ds_test:
    probs = cnn_model.predict(xb, verbose=0)
    preds = np.argmax(probs, axis=1)

    y_true.extend(yb.numpy().tolist())
    y_pred.extend(preds.tolist())
    y_prob.extend(np.max(probs, axis=1).tolist())

y_true = np.array(y_true)
y_pred = np.array(y_pred)
y_prob = np.array(y_prob)

cm = tf.math.confusion_matrix(y_true, y_pred, num_classes=num_classes).numpy()

plt.figure(figsize=(10, 8))
plt.imshow(cm)
plt.title("CNN confusion matrix (test set)")
plt.xlabel("predicted")
plt.ylabel("true")
plt.colorbar()
plt.xticks(range(num_classes), label_names, rotation=90, fontsize=8)
plt.yticks(range(num_classes), label_names, fontsize=8)
plt.tight_layout()
plt.show()


In [None]:
# Show a few high-confidence mistakes
# We'll scan the test set again, keep images for mistakes.
mistake_imgs = []
mistake_true = []
mistake_pred = []
mistake_conf = []

for xb, yb in ds_test:
    probs = cnn_model.predict(xb, verbose=0)
    preds = np.argmax(probs, axis=1)
    confs = np.max(probs, axis=1)

    for i in range(xb.shape[0]):
        true_i = int(yb[i].numpy())
        pred_i = int(preds[i])
        conf_i = float(confs[i])
        if pred_i != true_i:
            mistake_imgs.append(xb[i].numpy())
            mistake_true.append(true_i)
            mistake_pred.append(pred_i)
            mistake_conf.append(conf_i)

# Sort mistakes by confidence (descending)
idx = np.argsort(-np.array(mistake_conf))

top_k = 12
idx = idx[:min(top_k, len(idx))]

cols = 4
rows = int(np.ceil(len(idx) / cols))
plt.figure(figsize=(12, 8))
for j, k in enumerate(idx, start=1):
    plt.subplot(rows, cols, j)
    plt.imshow(mistake_imgs[k])
    t = label_names[mistake_true[k]]
    p = label_names[mistake_pred[k]]
    c = mistake_conf[k]
    plt.title(f"true: {t}\npred: {p} ({c:.2f})", fontsize=8)
    plt.axis("off")
plt.tight_layout()
plt.show()

print(f"Total test mistakes found: {len(mistake_imgs)}")


## 8) (Optional) Transfer learning

If you have time in a live session, transfer learning is a useful “industrial default” demonstration:
- Resize inputs to match a pretrained backbone
- Freeze backbone, train a small head
- Compare stability and accuracy
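
One detail worth knowing before reusing a pretrained backbone is that it expects inputs in the range it was trained on. For Keras MobileNetV2, `tf.keras.applications.mobilenet_v2.preprocess_input` scales pixels to [-1, 1]. The mapping is sketched below in plain Python for pixels in [0, 255]; `scale_to_unit_range` is a hypothetical name used only for illustration.

```python
def scale_to_unit_range(pixel):
    """Map a pixel value in [0, 255] to [-1, 1]."""
    return pixel / 127.5 - 1.0

for value in (0, 127.5, 255):
    print(value, "->", scale_to_unit_range(value))
```

Feeding a backbone inputs on a different scale (for example [0, 1]) will still run, but the frozen features may be less useful than they could be.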

**Toggle the flag below** if you want to run this section.


In [None]:
RUN_TRANSFER = False  # set True if you want to run this section


In [None]:
if RUN_TRANSFER:
    # We'll use MobileNetV2 as a lightweight pretrained backbone.
    # It expects larger images scaled to [-1, 1], so we resize and apply its
    # preprocess_input on the fly. The raw datasets are already batched and hold
    # float32 pixels in [0, 255], which is the range preprocess_input expects,
    # so we neither rescale by 255 nor call .batch() again.
    TARGET_SIZE = (160, 160)
    mnv2_preprocess = tf.keras.applications.mobilenet_v2.preprocess_input

    ds_train_tl = ds_train_raw.map(
        lambda x, y: (mnv2_preprocess(tf.image.resize(x, TARGET_SIZE)), y),
        num_parallel_calls=AUTOTUNE
    ).shuffle(64).prefetch(AUTOTUNE)

    ds_val_tl = ds_val_raw.map(
        lambda x, y: (mnv2_preprocess(tf.image.resize(x, TARGET_SIZE)), y),
        num_parallel_calls=AUTOTUNE
    ).prefetch(AUTOTUNE)

    ds_test_tl = ds_test_raw.map(
        lambda x, y: (mnv2_preprocess(tf.image.resize(x, TARGET_SIZE)), y),
        num_parallel_calls=AUTOTUNE
    ).prefetch(AUTOTUNE)

    backbone = tf.keras.applications.MobileNetV2(
        input_shape=TARGET_SIZE + (3,),
        include_top=False,
        weights="imagenet"
    )
    backbone.trainable = False

    tl_model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=TARGET_SIZE + (3,)),
        backbone,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(num_classes, activation="softmax")
    ], name="mobilenetv2_transfer")

    tl_model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    history_tl = tl_model.fit(
        ds_train_tl,
        validation_data=ds_val_tl,
        epochs=10,
        callbacks=[early_stop],
        verbose=1
    )

    tl_test = tl_model.evaluate(ds_test_tl, verbose=0)
    print("Transfer learning — test loss, test acc:", tl_test)


## 9) Wrap-up (what to say out loud)

- The dense baseline *can* learn something, but it wastes capacity re-learning spatial structure.
- The CNN improves by building the right bias into the architecture.
- Training is not “set and forget”: early stopping (and other guardrails) helps avoid wasted compute and overfitting.
- Confusions are not just mistakes—they are clues about what the data makes inherently hard.
