Cell 1 — Imports, CONFIG, Seed, Mixed Precision

In [None]:
import os
os.environ["KERAS_BACKEND"] = "tensorflow"   # in case a different Keras backend was selected earlier
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"     # reduce TensorFlow log verbosity

# Set the image data format *before* importing applications / building any model
from tensorflow.keras import backend as K
K.set_image_data_format("channels_last")

# ---- General imports
import math, json, random
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple
import matplotlib.pyplot as plt

# ---- TensorFlow / Keras core
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint

# (ยังไม่ import EfficientNetB0 ตรงนี้ เพื่อเลี่ยง state เก่า; จะ import ใน Cell โมเดล)
# from tensorflow.keras.applications import EfficientNetB0  # <-- อย่า import ตรงนี้

# ---- Sklearn utilities
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, label_binarize
from sklearn.metrics import (classification_report, confusion_matrix, roc_auc_score,
                             roc_curve, auc)

In [None]:

# Central run configuration read by the cells below.
CONFIG = {
    # Folders scanned (top level only) for image files.
    "IMG_DIRS": [
         r"C:\Users\User\Desktop\ds533\ham\HAM10000_images_part_1",
        r"C:\Users\User\Desktop\ds533\ham\HAM10000_images_part_2",
    ],
    "METADATA_CSV": r"C:\Users\User\Desktop\ds533\ham\HAM10000_metadata.csv",  # columns include: image_id, dx, age, sex, localization
    "IMG_SIZE": 224, 
    "BATCH": 32, 
    "EPOCHS": 50, #50(B0)  -- epoch budget passed to the stage-2 (fine-tuning) fit
    "BACKBONE": "EfficientNetB0", # Keep B0 for VRAM 16GB (NOTE(review): informational only; the model cell hard-codes EfficientNetB0)
    "MIXED_PRECISION": True,     # Set False if you hit numeric issues
    "OUTPUT_DIR": r"C:\Users\User\Desktop\ds533\outputs_ham10000 case4",  # checkpoints, plots and reports are written here
    "SEED": 42,                   # shared by the RNG seeds, the data split and the tf.data shuffles
    "UNFREEZE_TOP_N": 50          # fine-tune top N layers (like Tajerian)
}


# Make sure the artifact folder exists before any later cell writes into it.
os.makedirs(CONFIG["OUTPUT_DIR"], exist_ok=True)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

# Seed NumPy, Python's `random` and TensorFlow from the single configured value.
np.random.seed(CONFIG["SEED"])
random.seed(CONFIG["SEED"])
tf.random.set_seed(CONFIG["SEED"])

# Mixed precision (if available): best effort, a failure only prints a warning.
if CONFIG["MIXED_PRECISION"]:
    try:
        from tensorflow.keras import mixed_precision
        mixed_precision.set_global_policy("mixed_float16")
    except Exception as e:
        print("[WARN] Mixed precision not available:", e)
    else:
        print("[INFO] Mixed precision enabled.")

print("TF:", tf.__version__)
print("Output dir:", CONFIG["OUTPUT_DIR"])

Cell 3 — Load images & metadata, basic cleaning, path mapping

In [None]:
# ===== Cell 3: load metadata & image paths (NO dx_type) =====
# Diagnosis codes kept for training; a code's position in the list is its integer label.
CLASSES = ["akiec", "bcc", "bkl", "df", "mel", "nv", "vasc"]
CLASS2IDX = dict(zip(CLASSES, range(len(CLASSES))))

def collect_image_paths(img_dirs):
    """Return the full paths of all .jpg/.jpeg/.png entries directly inside ``img_dirs``.

    Entries of ``img_dirs`` that are not existing directories are skipped silently.
    Only the top level of each directory is listed (no recursion) and the extension
    check is case-insensitive. Paths come back in directory order, then in the order
    ``os.listdir`` reports them.
    """
    image_exts = (".jpg", ".jpeg", ".png")
    return [
        os.path.join(folder, entry)
        for folder in img_dirs
        if os.path.isdir(folder)
        for entry in os.listdir(folder)
        if entry.lower().endswith(image_exts)
    ]

all_paths = collect_image_paths(CONFIG["IMG_DIRS"])
print(f"[INFO] Found {len(all_paths)} image files.")

# Load metadata (expects: image_id, dx, age, sex, localization)
md = pd.read_csv(CONFIG["METADATA_CSV"])
print("[HEAD] metadata:\n", md.head(3))

# Keep only rows whose diagnosis is one of the 7 known classes and attach the integer label.
md = md[md["dx"].isin(CLASSES)].copy()
md["label_idx"] = md["dx"].map(CLASS2IDX).astype(int)

# Clean fields: missing ages get the column median (45 if every age is missing);
# missing categorical values become "unknown"; text is lower-cased.
age_fill = 45 if md["age"].isna().all() else md["age"].median()
md["age"] = md["age"].fillna(age_fill)
for text_col in ("sex", "localization"):
    md[text_col] = md[text_col].fillna("unknown").str.lower()

# (NO dx_type) — removed to avoid any leakage-like columns

# Map file stem (name without extension) -> full path; a later duplicate stem wins.
name2path: Dict[str, str] = {
    os.path.splitext(os.path.basename(p))[0]: p for p in all_paths
}

# Attach each row's file path and drop rows whose image file was not found.
md["image_path"] = md["image_id"].map(name2path)
before = len(md)
md = md[md["image_path"].notna()].copy()
print(f"[INFO] Rows with missing files dropped: {before - len(md)}")
print("[INFO] Total usable images:", len(md))
print("[CLASS COUNTS]\n", md["dx"].value_counts().reindex(CLASSES, fill_value=0))


Cell 4 — Stratified split 80/10/10 + ตรวจ split distribution

In [None]:
# ===== Cell 4: stratified split 80/10/10 (grouped by lesion when possible) =====
def _stratified_group_split(frame, test_size, seed):
    """Split ``frame`` into two parts, stratified by ``label_idx``.

    If ``frame`` has a fully populated ``lesion_id`` column, the split is drawn over
    unique lesions, so every image of a lesion lands in the same part. This avoids
    near-duplicate images of one lesion appearing in both the training data and the
    evaluation data. Each lesion's class is taken from its first row.
    (Assumes a lesion has a single ``dx`` — TODO confirm for this CSV.)

    Without a usable ``lesion_id`` column the function falls back to a plain
    per-image stratified split.

    Args:
        frame: DataFrame with a ``label_idx`` column (and optionally ``lesion_id``).
        test_size: Fraction of lesions (or of rows, in the fallback) for the second part.
        seed: ``random_state`` forwarded to ``train_test_split``.

    Returns:
        ``(first_df, second_df)``.
    """
    has_lesions = "lesion_id" in frame.columns and frame["lesion_id"].notna().all()
    if not has_lesions:
        first, second = train_test_split(
            frame, test_size=test_size, stratify=frame["label_idx"], random_state=seed
        )
        return first, second

    lesion_labels = frame.groupby("lesion_id")["label_idx"].first()
    _, held_out_ids = train_test_split(
        lesion_labels.index.to_numpy(),
        test_size=test_size,
        stratify=lesion_labels.to_numpy(),
        random_state=seed,
    )
    in_second = frame["lesion_id"].isin(held_out_ids)
    return frame[~in_second].copy(), frame[in_second].copy()


train_df, temp_df = _stratified_group_split(md, 0.2, CONFIG["SEED"])
val_df, test_df = _stratified_group_split(temp_df, 0.5, CONFIG["SEED"])

# Guard against lesion-level leakage whenever lesion ids are available.
if "lesion_id" in md.columns and md["lesion_id"].notna().all():
    _train_ids, _val_ids, _test_ids = (
        set(part["lesion_id"]) for part in (train_df, val_df, test_df)
    )
    assert not (_train_ids & _val_ids), "lesions shared between train and val"
    assert not (_train_ids & _test_ids), "lesions shared between train and test"
    assert not (_val_ids & _test_ids), "lesions shared between val and test"
else:
    print("[WARN] No usable lesion_id column; split is per image and may leak lesions across splits.")

print(f"[SPLIT] train={len(train_df)}  val={len(val_df)}  test={len(test_df)}")

def show_split_counts(df, name):
    """Print the number of rows of ``df`` per diagnosis class, listed in ``CLASSES`` order.

    Classes that do not occur in ``df`` are reported with a count of 0. ``name`` is
    only used as the label in the printed header.
    """
    raw_counts = df["dx"].value_counts()
    vc = raw_counts.reindex(CLASSES, fill_value=0)
    print(f"[{name}] class counts:\n{vc}\n")

# Report the class distribution of every split.
for split_name, split_df in (("TRAIN", train_df), ("VAL", val_df), ("TEST", test_df)):
    show_split_counts(split_df, split_name)


Cell 5 — Build metadata encoders (sex, localization; no dx_type) + age scaler + ตัวอย่าง meta vector

In [None]:
# ===== Cell 5: metadata encoders & scaler (no dx_type) =====
# Category vocabularies come from the full table so every value present in any split
# has its own one-hot slot (this uses category names only, not labels).
sex_values = sorted(md["sex"].unique().tolist())
loc_values = sorted(md["localization"].unique().tolist())

SEX2IDX = {s: i for i, s in enumerate(sex_values)}
LOC2IDX = {l: i for i, l in enumerate(loc_values)}

# Fit the age scaler on the TRAINING split only, so validation/test statistics
# do not leak into the features the model is trained on.
age_scaler = StandardScaler()
age_scaler.fit(train_df[["age"]].values.astype(np.float32))

# Metadata vector layout: 1 scaled age + one-hot sex + one-hot localization.
meta_dim = 1 + len(SEX2IDX) + len(LOC2IDX)
print(f"[META] meta_dim = {meta_dim} (age + {len(SEX2IDX)} sex + {len(LOC2IDX)} loc)")

def row_to_meta(row) -> np.ndarray:
    """Encode one metadata row as a float32 vector.

    Layout: ``[scaled age | one-hot sex | one-hot localization]``. ``row`` must provide
    the keys "age", "sex" and "localization". A category that is missing from
    ``SEX2IDX`` / ``LOC2IDX`` falls back to index 0.
    """
    def _one_hot(value, vocab):
        # A single 1.0 at the category's slot (slot 0 for unseen categories).
        encoded = np.zeros(len(vocab), dtype=np.float32)
        encoded[vocab.get(value, 0)] = 1.0
        return encoded

    age_scaled = age_scaler.transform([[float(row["age"])]])[0, 0]
    pieces = [
        [age_scaled],
        _one_hot(row["sex"], SEX2IDX),
        _one_hot(row["localization"], LOC2IDX),
    ]
    return np.concatenate(pieces).astype(np.float32)

# Preview: encode the first training row to sanity-check the vector layout.
first_train_row = train_df.iloc[0]
sample_vec = row_to_meta(first_train_row)
print("[SAMPLE META] shape:", sample_vec.shape)
print("[SAMPLE META] first 10 values:", np.round(sample_vec[:10], 3))


Cell 6 — Build NumPy arrays (paths, metas, labels) + ตรวจ shapes

In [None]:
# ===== Cell 6: build numpy arrays =====
def build_np_arrays(df):
    """Convert a split DataFrame into row-aligned ``(paths, metas, labels)`` arrays.

    Returns:
        paths:  array of the ``image_path`` values.
        metas:  2-D array with one ``row_to_meta`` vector per row of ``df``.
        labels: int32 array of the ``label_idx`` values.
    """
    meta_rows = []
    for _, record in df.iterrows():
        meta_rows.append(row_to_meta(record))
    metas = np.vstack(meta_rows)

    labels = df["label_idx"].astype(np.int32).values
    path_list = df["image_path"].tolist()
    return np.array(path_list), metas, labels

# Materialise each split as (paths, metadata vectors, labels).
train_paths, train_meta, train_y = build_np_arrays(train_df)
val_paths,   val_meta,   val_y   = build_np_arrays(val_df)
test_paths,  test_meta,  test_y  = build_np_arrays(test_df)

print("[SHAPES]")
for split_label, split_arrays in (
    (" train:", (train_paths, train_meta, train_y)),
    (" val  :", (val_paths, val_meta, val_y)),
    (" test :", (test_paths, test_meta, test_y)),
):
    print(split_label, *(arr.shape for arr in split_arrays))

# Save label mapping (class name -> integer index) next to the other artifacts.
class_to_index = {class_name: index for index, class_name in enumerate(CLASSES)}
label_map_path = os.path.join(CONFIG["OUTPUT_DIR"], "class_indices.json")
with open(label_map_path, "w", encoding="utf-8") as f:
    json.dump(class_to_index, f, ensure_ascii=False, indent=2)
print("[INFO] Saved class_indices.json")


Cell 7 — tf.data pipelines + Augmentation (Tajerian) + ตรวจหนึ่ง batch

In [None]:
# ===== Cell 7: tf.data + augmentation (robust types) =====
IMG_SIZE = CONFIG["IMG_SIZE"]

@tf.function
def preprocess_image(path):
    """Load the image file at ``path`` as a float32 ``(IMG_SIZE, IMG_SIZE, 3)`` tensor.

    The file is read, decoded to 3 channels and resized bilinearly. No value
    rescaling is applied here.
    """
    decoded = tf.image.decode_jpeg(tf.io.read_file(path), channels=3)
    resized = tf.image.resize(
        decoded, (IMG_SIZE, IMG_SIZE), method=tf.image.ResizeMethod.BILINEAR
    )
    return tf.cast(resized, tf.float32)

# Random flip + translation are always used; shear is added only when this Keras
# build provides a RandomShear layer.
# NOTE(review): with the global mixed_float16 policy these layers may compute in
# float16 — confirm the dtype printed by the sanity peek is what you expect.
augment_layers = [
    layers.RandomFlip("horizontal_and_vertical"),
    layers.RandomTranslation(0.2, 0.2, fill_mode="nearest"),
]
try:
    shear_layer = layers.RandomShear(0.2, fill_mode="nearest")
except Exception as e:
    print("[WARN] RandomShear not available. Skipping shear.", e)
else:
    augment_layers.append(shear_layer)

augment = tf.keras.Sequential(augment_layers, name="augment")

from tensorflow.keras.applications.efficientnet import preprocess_input as effnet_preprocess

def make_dataset(paths, metas, labels, training: bool, batch_size: int):
    """Build a batched, prefetched ``tf.data`` pipeline over the given arrays.

    Each element is ``({"image": ..., "meta": ...}, label)``: the image is loaded with
    ``preprocess_image``, augmented only when ``training`` is true, and passed through
    the EfficientNet ``preprocess_input``; ``meta`` is cast to float32 and the label
    to int32. When ``training`` is true the examples are shuffled (buffer 4096,
    reshuffled every iteration) before being mapped.
    """
    def _load_example(path, meta, y):
        image = preprocess_image(path)
        if training:
            image = augment(image, training=True)
        features = {
            "image": effnet_preprocess(image),   # expects float32
            "meta": tf.cast(meta, tf.float32),   # force float32
        }
        return features, tf.cast(y, tf.int32)    # int32 labels for sparse CE

    dataset = tf.data.Dataset.from_tensor_slices((paths, metas, labels))
    if training:
        dataset = dataset.shuffle(4096, seed=CONFIG["SEED"], reshuffle_each_iteration=True)
    dataset = dataset.map(_load_example, num_parallel_calls=tf.data.AUTOTUNE)
    dataset = dataset.batch(batch_size, drop_remainder=False)
    return dataset.prefetch(tf.data.AUTOTUNE)

def make_per_class_datasets(paths, metas, labels, batch):
    """Build an endless, class-balanced training pipeline.

    One shuffled, repeating dataset is created per class in ``CLASSES``; examples are
    then sampled from those streams with equal weights, loaded, augmented, run through
    the EfficientNet ``preprocess_input`` and batched. Because every per-class stream
    repeats, callers must bound training with ``steps_per_epoch``.

    Returns a dataset of ``({"image": ..., "meta": ...}, label)`` batches.
    """
    paths, metas, labels = np.array(paths), np.array(metas), np.array(labels)
    num_classes = len(CLASSES)

    def _class_stream(class_id):
        # All examples of one class, reshuffled on each pass and repeated forever.
        rows = np.where(labels == class_id)[0]
        stream = tf.data.Dataset.from_tensor_slices((paths[rows], metas[rows], labels[rows]))
        stream = stream.shuffle(max(1, len(rows)), seed=CONFIG["SEED"], reshuffle_each_iteration=True)
        return stream.repeat()

    def _load_example(path, meta, y):
        image = augment(preprocess_image(path), training=True)
        features = {
            "image": effnet_preprocess(image),
            "meta": tf.cast(meta, tf.float32),
        }
        return features, tf.cast(y, tf.int32)

    per_class = [_class_stream(class_id) for class_id in range(num_classes)]
    weights = [1.0/num_classes] * num_classes

    balanced = tf.data.Dataset.sample_from_datasets(per_class, weights=weights, seed=CONFIG["SEED"])
    balanced = balanced.map(_load_example, num_parallel_calls=tf.data.AUTOTUNE)
    balanced = balanced.batch(batch, drop_remainder=False)
    return balanced.prefetch(tf.data.AUTOTUNE)

# Training uses the endless class-balanced stream; val/test are single-pass and unaugmented.
train_ds_balanced = make_per_class_datasets(train_paths, train_meta, train_y, CONFIG["BATCH"])
val_ds, test_ds = (
    make_dataset(split_paths, split_meta, split_y, training=False, batch_size=CONFIG["BATCH"])
    for split_paths, split_meta, split_y in (
        (val_paths, val_meta, val_y),
        (test_paths, test_meta, test_y),
    )
)

# Sanity peek: pull one training batch and check input names, shapes and dtypes.
bx = next(iter(train_ds_balanced))
x_b, y_b = bx
assert set(x_b.keys()) == {"image","meta"}, f"Input keys mismatch: {x_b.keys()}"
print("[BATCH] image:", x_b["image"].shape, x_b["image"].dtype,
      " meta:", x_b["meta"].shape, x_b["meta"].dtype,
      " labels:", y_b.shape, y_b.dtype)
print("[BATCH] label sample (first 8):", y_b[:8].numpy())


Cell 8 — Model (EfficientNetB0 + ImageNet) + metadata fusion + summary

In [None]:
# ===== Cell 8: model (EfficientNetB0 + ImageNet) — input name fixed to 'image' =====
import tensorflow as tf
from tensorflow.keras import backend as K
from tensorflow.keras import layers, Model
from tensorflow.keras.applications import EfficientNetB0

# Clear any previous Keras state and confirm the image data format
tf.keras.backend.clear_session()
assert K.image_data_format() == "channels_last", f"image_data_format is {K.image_data_format()} (must be channels_last)"

IMG_SIZE = CONFIG["IMG_SIZE"]  # 224

# 1) Create explicitly named inputs ("image", "meta") so they match the dict keys
#    produced by the tf.data pipelines
img_in = layers.Input(shape=(IMG_SIZE, IMG_SIZE, 3), name="image")
meta_in = layers.Input(shape=(train_meta.shape[1],), name="meta")

# (Small probe intended to confirm that the first layer sees 3 input channels)
# NOTE(review): _probe is never connected to the model output and its kernel shape
# is not actually inspected here; the print below is informational only.
_probe = layers.Conv2D(32, 3, padding="same")(img_in)
print("[PROBE] conv kernel shape should be (?, ?, ?, 32) — in-channels locked to 3.")

# 2) Build EfficientNetB0 on top of input_tensor=img_in (locks the input to
#    3 channels and keeps the input name 'image')
try:
    base = EfficientNetB0(
        include_top=False,
        weights="imagenet",
        input_tensor=img_in,   # <<< important: reuse the input we named ourselves
        pooling="avg",
    )
except ValueError as e:
    # Fallback: if loading ImageNet weights raises ValueError, build the same
    # architecture without pretrained weights (training then starts from scratch).
    print("[WARN] ImageNet mismatch. Falling back to weights=None :", e)
    base = EfficientNetB0(
        include_top=False,
        weights=None,
        input_tensor=img_in,
        pooling="avg",
    )

# 3) Image branch + metadata branch + fusion
img_feat = layers.Dropout(0.2, name="img_dropout")(base.output)

# Metadata branch: two small dense layers with dropout in between.
m = layers.Dense(64, activation="relu")(meta_in)
m = layers.Dropout(0.2)(m)
m = layers.Dense(32, activation="relu")(m)

# Fusion head: concatenate image and metadata features, then one dense layer.
h = layers.Concatenate(name="fuse")([img_feat, m])
h = layers.Dense(128, activation="relu")(h)
h = layers.Dropout(0.3)(h)

# The softmax output is pinned to float32 regardless of the global dtype policy.
out = layers.Dense(len(CLASSES), activation="softmax", dtype="float32", name="pred")(h)

model = Model(inputs=[img_in, meta_in], outputs=out)

# 4) Freeze the whole base for Stage 1 (only the metadata branch and head train)
for layer in base.layers:
    layer.trainable = False

model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

print("[INPUT NAMES]", [t.name.split(":")[0] for t in model.inputs])  # expected: ['image', 'meta']
model.summary(line_length=120, expand_nested=False)
print("[INFO] Base trainable layers (stage 1):", sum(int(l.trainable) for l in base.layers))


Cell 9 — Training Stage 1 (Feature Extraction)

In [None]:
# ===== Cell 9: training stage 1 (robust) =====
# steps_per_epoch: number of training rows / batch size. It must be given explicitly
# because train_ds_balanced repeats forever and therefore has no natural epoch end.
steps_per_epoch = max(1, math.ceil(len(train_df) / CONFIG["BATCH"]))
print(f"[TRAIN] steps_per_epoch={steps_per_epoch}, batches_per_val={math.ceil(len(val_df)/CONFIG['BATCH'])}")

# Stage-1 epoch budget: configurable through CONFIG["EPOCHS_STAGE1"], default 15.
stage1_epochs = int(CONFIG.get("EPOCHS_STAGE1", 15))

ckpt_path1 = os.path.join(CONFIG["OUTPUT_DIR"], "best_efficientnetb0_meta_stage1.keras")

# Stop early / checkpoint on validation accuracy; lower the LR when validation loss plateaus.
callbacks_stage1 = [
    EarlyStopping(patience=5, restore_best_weights=True, monitor="val_accuracy", verbose=1),
    ReduceLROnPlateau(patience=3, factor=0.1, monitor="val_loss", min_lr=1e-5, verbose=1),
    ModelCheckpoint(ckpt_path1, monitor="val_accuracy", save_best_only=True, verbose=1)
]

# extra: print layer dtypes to catch mixed precision issues
for l in model.layers[-5:]:
    print(f"[LAYER] {l.name:20s} dtype={getattr(l, 'dtype', None)}")

history1 = model.fit(
    train_ds_balanced,
    validation_data=val_ds,
    epochs=stage1_epochs,
    steps_per_epoch=steps_per_epoch,
    callbacks=callbacks_stage1,
    verbose=1
)

# Convert the history to plain Python floats so it can be JSON-serialised later.
hist1 = {k: [float(x) for x in v] for k, v in history1.history.items()}
print("[HIST1] keys:", list(hist1.keys()))
print("[HIST1] last epoch:", {k: v[-1] for k, v in hist1.items()})


Cell 10 — Training Stage 2 (Fine-tuning top-N) + Learning Curves

In [None]:
# ===== Cell 10: fine-tune top-N & learning curves =====
# 1) Learning rate for the fine-tuning phase (lower than stage 1). It is handed
#    directly to the new optimizer below, so the stage-1 optimizer is left alone and
#    no backend set_value/get_value helpers are needed.
fine_tune_lr = float(CONFIG.get("FINE_TUNE_LR", 1e-4))

# 2) Unfreeze the last N layers of the base (N is clamped to a valid range).
#    BatchNormalization layers are kept frozen so their pretrained moving statistics
#    are not overwritten by small-batch updates during fine-tuning.
N = int(CONFIG["UNFREEZE_TOP_N"])
N = max(1, min(N, len(base.layers)))
for layer in base.layers[-N:]:
    layer.trainable = not isinstance(layer, layers.BatchNormalization)

num_trainable = sum(int(l.trainable) for l in base.layers)
print(f"[INFO] Base trainable layers (stage 2): {num_trainable}/{len(base.layers)}  "
      f"(unfrozen last {N}, BatchNorm kept frozen)")

# 3) *** Important ***: recompile after changing trainable flags so they take effect
model.compile(
    optimizer=tf.keras.optimizers.Adam(fine_tune_lr),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy"]
)

# 4) callbacks
ckpt_path2 = os.path.join(CONFIG["OUTPUT_DIR"], "best_efficientnetb0_meta_stage2.keras")
callbacks_stage2 = [
    EarlyStopping(patience=8, restore_best_weights=True, monitor="val_accuracy", verbose=1),
    ReduceLROnPlateau(patience=4, factor=0.2, monitor="val_loss", min_lr=1e-6, verbose=1),
    ModelCheckpoint(ckpt_path2, monitor="val_accuracy", save_best_only=True, verbose=1),
]

# 5) Reuse the same steps_per_epoch (train_ds_balanced repeats forever)
history2 = model.fit(
    train_ds_balanced,
    validation_data=val_ds,
    epochs=CONFIG["EPOCHS"],
    steps_per_epoch=steps_per_epoch,
    callbacks=callbacks_stage2,
    verbose=1
)

# 6) Collect both stage histories and save them as JSON
hist2 = {k: [float(x) for x in v] for k, v in history2.history.items()}
with open(os.path.join(CONFIG["OUTPUT_DIR"], "train_history_efficientnetb0_meta.json"), "w") as f:
    json.dump({"stage1": hist1, "stage2": hist2}, f, indent=2)

# 7) Plot learning curves
def plot_learning_curves(h1, h2, out_dir):
    """Save accuracy and loss learning curves covering both training stages.

    ``h1`` and ``h2`` are history dicts (metric name -> list of per-epoch values); the
    stage-2 values are appended after the stage-1 values. Metrics missing from a dict
    are treated as empty. Writes ``learning_curve_acc.png`` and
    ``learning_curve_loss.png`` into ``out_dir``.
    """
    def _joined(metric):
        # Stage-1 values followed by stage-2 values for one metric.
        return h1.get(metric, []) + h2.get(metric, [])

    def _save_curve(train_vals, val_vals, train_label, val_label, title, ylabel, filename):
        # Draw one train-vs-validation figure, save it and return the file path.
        plt.figure(figsize=(8,6))
        plt.plot(train_vals, label=train_label)
        plt.plot(val_vals, label=val_label)
        plt.title(title)
        plt.xlabel("Epoch")
        plt.ylabel(ylabel)
        plt.legend()
        plt.tight_layout()
        target = os.path.join(out_dir, filename)
        plt.savefig(target, dpi=150)
        plt.close()
        return target

    acc, val_acc = _joined("accuracy"), _joined("val_accuracy")
    loss, val_loss = _joined("loss"), _joined("val_loss")

    p1 = _save_curve(acc, val_acc, "train_acc", "val_acc",
                     "Learning Curve (Accuracy)", "Acc", "learning_curve_acc.png")
    p2 = _save_curve(loss, val_loss, "train_loss", "val_loss",
                     "Learning Curve (Loss)", "Loss", "learning_curve_loss.png")
    print("[SAVED] ", p1, " & ", p2)

plot_learning_curves(hist1, hist2, CONFIG["OUTPUT_DIR"])


Cell 11 — Evaluation (Accuracy/Report/CM/AUC/ROC) + Save Model

In [None]:
# ===== Cell 11: evaluation & artifacts =====
print("[EVAL] on test set ...")
# return_dict=True lets Keras label each value itself. Pairing the result list with
# model.metrics_names is fragile: on some Keras versions that list is
# ['loss', 'compile_metrics'], which would mislabel accuracy.
test_metrics = model.evaluate(test_ds, verbose=1, return_dict=True)
test_results = list(test_metrics.values())  # kept as a plain list for later cells
print(test_metrics)

# Probabilities / predictions. test_ds is not shuffled, so rows line up with test_y.
y_prob = model.predict(test_ds, verbose=1)
y_pred = np.argmax(y_prob, axis=1)
y_true = test_y

# Classification report
report = classification_report(y_true, y_pred, target_names=CLASSES, digits=4)
print(report)
with open(os.path.join(CONFIG["OUTPUT_DIR"], "classification_report.txt"), "w", encoding="utf-8") as f:
    f.write(report)

# Confusion matrix (rows = true class, columns = predicted class, in CLASSES order)
cm = confusion_matrix(y_true, y_pred, labels=list(range(len(CLASSES))))
np.savetxt(os.path.join(CONFIG["OUTPUT_DIR"], "confusion_matrix.csv"), cm, fmt="%d", delimiter=",")
print("[CM]\n", cm)

# AUC macro/micro (OvR) computed on one-hot encoded labels
y_true_oh = label_binarize(y_true, classes=list(range(len(CLASSES))))
auc_macro = roc_auc_score(y_true_oh, y_prob, average="macro", multi_class="ovr")
auc_micro = roc_auc_score(y_true_oh, y_prob, average="micro", multi_class="ovr")
print(f"[AUC] macro={auc_macro:.4f} | micro={auc_micro:.4f}")

# ROC curves per-class
def plot_roc_curves(y_true_oh, y_prob, class_names: List[str], out_png: str):
    """Plot one-vs-rest ROC curves, one per class, and save the figure to ``out_png``.

    Column ``i`` of ``y_true_oh`` and of ``y_prob`` is used for ``class_names[i]``.
    Each legend entry shows the class name and its AUC; a dashed diagonal marks the
    chance level.
    """
    plt.figure(figsize=(8,6))
    for col, cls in enumerate(class_names):
        truth_col = y_true_oh[:, col]
        score_col = y_prob[:, col]
        fpr, tpr, _ = roc_curve(truth_col, score_col)
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f"{cls} (AUC={roc_auc:.3f})")
    plt.plot([0,1], [0,1], linestyle="--")
    plt.title("ROC Curves (OvR)")
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.legend(fontsize=8)
    plt.tight_layout()
    plt.savefig(out_png, dpi=150)
    plt.close()
    print("[SAVED]", out_png)

roc_png_path = os.path.join(CONFIG["OUTPUT_DIR"], "roc_ovr.png")
plot_roc_curves(y_true_oh, y_prob, CLASSES, roc_png_path)

# Save model
# NOTE(review): the file name says "case3" while OUTPUT_DIR says "case4" — confirm
# which one is intended.
final_path = os.path.join(CONFIG["OUTPUT_DIR"], "final_efficientnetb0_case3.keras")
model.save(final_path)
print("[SAVED MODEL]", final_path)

# Quick sample predictions (first 5 from test set order). The range is capped so a
# test set with fewer than 5 rows does not raise IndexError.
print("[SAMPLE PRED] first 5:")
for i in range(min(5, len(y_true))):
    print(f"  gt={CLASSES[y_true[i]]:>5s}  pred={CLASSES[y_pred[i]]:>5s}  p={y_prob[i, y_pred[i]]:.3f}")
