In [1]:
import os, json, yaml, random, pickle, sys, math
from pathlib import Path
from datetime import datetime

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report, roc_auc_score, roc_curve, accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt

# ------------------ Config & Paths ------------------
# Project root; the artifacts folder is created directly beneath it.
BASE_DIR = Path(r"C:\Users\NXTWAVE\Downloads\Bone Cancer Detection")
# Class name -> folder that is scanned for that class's images.
DATA_DIRS = dict(
    Normal=Path(r"C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\archive\Research dataset\Normal"),
    Cancer=Path(r"C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\archive\Research dataset\Cancer"),
)
ART = BASE_DIR.joinpath("artifacts")
ART.mkdir(parents=True, exist_ok=True)

# Run configuration shared by the data pipeline, the model and the training loop.
CFG = dict(
    seed=42,
    image_size=224,
    batch_size=16,
    epochs=20,
    val_split=0.1,      # from remaining after test split
    test_split=0.1,
    augment=True,
    optimizer=dict(name="adam", lr=1e-3),
    loss="binary_crossentropy",
    metrics=["accuracy"],
    note="Binary CNN classifier for Normal vs Cancer bone images.",
)

# Seed the Python, NumPy and TensorFlow random generators with the same value.
for _seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_fn(CFG["seed"])

# ------------------ Helpers ------------------
# Extensions accepted by the dataset scan. TIFF is deliberately not listed:
# the loader decodes files with tf.io.decode_image, which only understands
# BMP, GIF, JPEG and PNG, so a .tif/.tiff file would abort the input pipeline.
ALLOWED = {".png", ".jpg", ".jpeg", ".bmp"}


def list_images(folder: Path):
    """Return the image files found under ``folder``, searched recursively.

    Args:
        folder: Directory to scan.

    Returns:
        A sorted list of paths to regular files whose lower-cased suffix is in
        ``ALLOWED``; an empty list when ``folder`` does not exist.
    """
    if not folder.exists():
        return []
    # Sorting makes the result independent of filesystem enumeration order, so
    # the later seeded shuffle/split is reproducible. is_file() drops
    # directories that merely happen to be named like an image.
    return sorted(
        p for p in folder.rglob("*")
        if p.is_file() and p.suffix.lower() in ALLOWED
    )

def to_py(obj):
    """Recursively convert ``obj`` into plain Python types for JSON/YAML dumps.

    Handles dicts (keys and values), lists, tuples, sets, NumPy scalars and
    arrays, TensorFlow tensors (when TensorFlow can be imported) and
    ``pathlib`` paths. Anything else is returned unchanged.

    Args:
        obj: Arbitrary, possibly nested, object.

    Returns:
        A structure built from dict/list/str/int/float/bool/None wherever a
        conversion is known; tuples and sets come back as lists.
    """
    import numpy as _np
    from pathlib import PurePath as _PurePath
    try:
        import tensorflow as _tf
    except Exception:
        # TensorFlow is optional here; a missing or broken install only
        # disables the tensor branch below.
        _tf = None

    def _key(key):
        # json.dump rejects NumPy scalars and paths as dict keys.
        if isinstance(key, _np.generic):
            return key.item()
        if isinstance(key, _PurePath):
            return str(key)
        return key

    def _convert(value):
        if isinstance(value, dict):
            return {_key(k): _convert(v) for k, v in value.items()}
        if isinstance(value, (list, tuple, set, frozenset)):
            return [_convert(v) for v in value]
        if isinstance(value, _np.generic):
            return value.item()
        if isinstance(value, _np.ndarray):
            return value.tolist()
        if _tf is not None and isinstance(value, _tf.Tensor):
            return value.numpy().tolist()
        if isinstance(value, _PurePath):
            return str(value)
        return value

    # The imports above are resolved once; the recursion happens in _convert.
    return _convert(obj)

# ------------------ Scan dataset ------------------
# One row per image file: its path and the class name of the folder it came from.
records = [
    {"path": str(p), "label": label}
    for label, d in DATA_DIRS.items()
    for p in list_images(d)
]

df = pd.DataFrame(records)
# Explicit raise instead of assert: asserts are stripped when Python runs with -O.
if df.empty:
    raise RuntimeError("No images found. Check your dataset paths.")

label_counts = df["label"].value_counts()
empty_classes = [name for name in DATA_DIRS if label_counts.get(name, 0) == 0]
if empty_classes:
    # A binary classifier cannot be trained or evaluated with a class missing.
    raise RuntimeError(f"No images found for class(es): {empty_classes}. Check your dataset paths.")

df = df.sample(frac=1.0, random_state=CFG["seed"]).reset_index(drop=True)
print(f"[INFO] Total images: {len(df)} | Normal={label_counts.get('Normal', 0)} | Cancer={label_counts.get('Cancer', 0)}")

# ------------------ Encode labels ------------------
le = LabelEncoder()
# LabelEncoder assigns ids in sorted class-name order: Cancer->0, Normal->1.
# Class 1 (what the sigmoid output models) is therefore "Normal".
df["label_id"] = le.fit_transform(df["label"])
with open(ART / "label_encoder.pkl", "wb") as f:
    pickle.dump(le, f)
print("[INFO] Classes:", list(le.classes_))

# ------------------ Split ------------------
# First carve out the test set, then take the validation set from the remainder;
# both splits are stratified on the encoded label.
df_trainval, df_test = train_test_split(
    df, test_size=CFG["test_split"], stratify=df["label_id"], random_state=CFG["seed"]
)
df_train, df_val = train_test_split(
    df_trainval, test_size=CFG["val_split"], stratify=df_trainval["label_id"], random_state=CFG["seed"]
)
print(f"[SPLIT] train={len(df_train)}  val={len(df_val)}  test={len(df_test)}")

# ------------------ tf.data pipeline ------------------
IMG_SIZE = CFG["image_size"]
AUTO = tf.data.AUTOTUNE


def _read_image(path):
    """Load the file at ``path`` as a 3-channel float32 image resized to IMG_SIZE x IMG_SIZE."""
    raw_bytes = tf.io.read_file(path)
    # expand_animations=False keeps the decoded result a single 3-D image
    pixels = tf.io.decode_image(raw_bytes, channels=3, expand_animations=False)
    pixels = tf.image.convert_image_dtype(pixels, tf.float32)  # [0,1]
    return tf.image.resize(pixels, (IMG_SIZE, IMG_SIZE))

def _augment(img):
    """Apply light random augmentation to one image tensor.

    Args:
        img: Float image tensor; the loader in this file produces values in [0,1].

    Returns:
        The image after a random horizontal flip, a small brightness shift and
        a small contrast change, clipped back into [0,1].
    """
    # light augmentations suited for X-rays
    img = tf.image.random_flip_left_right(img)
    img = tf.image.random_brightness(img, max_delta=0.05)
    img = tf.image.random_contrast(img, 0.95, 1.05)
    # Brightness/contrast jitter can push pixels slightly below 0 or above 1;
    # clip so training inputs stay in the same range as validation/test inputs.
    return tf.clip_by_value(img, 0.0, 1.0)

def _parse(path, label, training=False):
    """Turn one (path, label) pair into an (image, float32 label) pair.

    Augmentation is applied only when ``training`` is true and CFG["augment"] is enabled.
    """
    image = _read_image(path)
    use_augmentation = training and CFG["augment"]
    if use_augmentation:
        image = _augment(image)
    target = tf.cast(label, tf.float32)
    return image, target

def make_ds(df_split: pd.DataFrame, training=False, batch_size=8):
    """Build a batched, prefetched ``tf.data`` pipeline from a dataframe split.

    Args:
        df_split: Dataframe with a ``path`` column (image file paths) and a
            ``label_id`` column (encoded labels).
        training: When true, the samples are reshuffled every epoch and
            ``_parse`` is called in training mode.
        batch_size: Number of samples per batch.

    Returns:
        A ``tf.data.Dataset`` yielding ``(images, labels)`` batches. With
        ``training=False`` the samples keep the row order of ``df_split``.
    """
    paths = df_split["path"].values
    label_ids = df_split["label_id"].values
    ds = tf.data.Dataset.from_tensor_slices((paths, label_ids))
    if training:
        # Shuffle the (path, label) pairs BEFORE decoding: the shuffle buffer then
        # holds short strings instead of decoded float32 images, so it can cover
        # the whole split cheaply and give a uniform shuffle.
        ds = ds.shuffle(max(1, len(df_split)), seed=CFG["seed"], reshuffle_each_iteration=True)
    ds = ds.map(lambda p, y: _parse(p, y, training=training), num_parallel_calls=AUTO)
    return ds.batch(batch_size).prefetch(AUTO)

# Build the three pipelines; only the training split is created in training mode.
train_ds, val_ds, test_ds = (
    make_ds(split_df, training=is_training, batch_size=CFG["batch_size"])
    for split_df, is_training in ((df_train, True), (df_val, False), (df_test, False))
)

# ------------------ Model (small CNN; no external weights needed) ------------------
def build_model(input_shape=(224,224,3)):
    """Assemble the small CNN binary classifier.

    Three Conv-BatchNorm-MaxPool stages (32, 64, 128 filters) are followed by a
    256-filter Conv-BatchNorm stage, global average pooling, and a dense head
    ending in a single sigmoid unit.
    """
    inputs = layers.Input(shape=input_shape)

    features = inputs
    for n_filters in (32, 64, 128):
        features = layers.Conv2D(n_filters, 3, padding="same", activation="relu")(features)
        features = layers.BatchNormalization()(features)
        features = layers.MaxPool2D()(features)

    # Last convolutional stage is pooled globally instead of by a 2x2 max-pool
    features = layers.Conv2D(256, 3, padding="same", activation="relu")(features)
    features = layers.BatchNormalization()(features)
    features = layers.GlobalAveragePooling2D()(features)

    head = layers.Dropout(0.3)(features)
    head = layers.Dense(128, activation="relu")(head)
    head = layers.Dropout(0.2)(head)
    outputs = layers.Dense(1, activation="sigmoid")(head)  # binary
    return models.Model(inputs, outputs, name="BoneCancerCNN")

model = build_model((IMG_SIZE, IMG_SIZE, 3))
# NOTE(review): CFG["optimizer"]["name"] is not consulted; Adam is always used.
opt = tf.keras.optimizers.Adam(learning_rate=CFG["optimizer"]["lr"])
model.compile(optimizer=opt, loss=CFG["loss"], metrics=CFG["metrics"])
model.summary()

# ------------------ Callbacks & Train ------------------
ckpt_path = ART / "bone_cancer_cnn.h5"
cbs = [
    # Stop once val_accuracy has not improved for 5 epochs
    tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", patience=5, mode="max", restore_best_weights=True),
    # Halve the learning rate after 2 epochs without val_loss improvement
    tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2, min_lr=1e-5),
    # Write the model to ckpt_path whenever val_accuracy reaches a new best
    tf.keras.callbacks.ModelCheckpoint(str(ckpt_path), monitor="val_accuracy", mode="max", save_best_only=True),
]

hist = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=CFG["epochs"],
    callbacks=cbs,
    verbose=1
)

# Save training history
with open(ART / "history.json", "w") as f:
    json.dump(to_py(hist.history), f, indent=2)
pd.DataFrame(hist.history).to_csv(ART / "history.csv", index=False)

# Ensure best model saved.
# ModelCheckpoint has already written the best-val_accuracy model to ckpt_path.
# Re-saving the in-memory model could overwrite it with later weights: depending
# on the Keras version, EarlyStopping only restores the best weights when it
# actually stops training early. Reload the checkpoint so the file and the
# model evaluated afterwards are the same, best, model.
if ckpt_path.exists():
    model = tf.keras.models.load_model(str(ckpt_path))
else:
    # No checkpoint was written (e.g. val_accuracy was never reported).
    model.save(str(ckpt_path))

# ------------------ Evaluation & Reports ------------------
# test_ds is built without shuffling, so one predict() call over the whole
# dataset returns probabilities in df_test row order and the true labels can be
# read straight from the dataframe (no per-batch predict loop needed).
y_prob = model.predict(test_ds, verbose=0).ravel().tolist()
y_true = df_test["label_id"].astype(int).tolist()

# threshold 0.5 on the sigmoid output, i.e. on P(class id 1)
y_pred = (np.array(y_prob) >= 0.5).astype(int)

class_names = list(le.classes_)
# Precision/recall/F1 are reported for the Cancer class, wherever the encoder
# placed it. LabelEncoder sorts class names, so Cancer is id 0 here and the
# default pos_label=1 would silently score "Normal" instead.
pos_label = class_names.index("Cancer") if "Cancer" in class_names else 1

# Basic metrics
acc = accuracy_score(y_true, y_pred)
prec, rec, f1, _ = precision_recall_fscore_support(
    y_true, y_pred, pos_label=pos_label, average="binary", zero_division=0
)
try:
    # AUC is the same whichever class is called positive, as long as labels and
    # scores refer to the same class (both refer to class id 1 here).
    auc = roc_auc_score(y_true, y_prob)
except ValueError:
    # Raised when the test labels contain a single class
    auc = float("nan")

metrics = {
    "datetime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "image_size": IMG_SIZE,
    "splits": {"train": int(len(df_train)), "val": int(len(df_val)), "test": int(len(df_test))},
    "test": {"accuracy": acc, "precision": prec, "recall": rec, "f1": f1, "auc": auc},
    "positive_class": str(class_names[pos_label]),
    "classes": class_names,
}
with open(ART / "metrics.json", "w") as f:
    json.dump(to_py(metrics), f, indent=2)

# Confusion Matrix & Classification Report
cm = confusion_matrix(y_true, y_pred, labels=[0,1])
# labels=[0,1] keeps the report aligned with target_names even if one class is
# absent from both y_true and y_pred.
report = classification_report(
    y_true, y_pred, labels=[0,1], target_names=class_names, digits=4, zero_division=0
)
with open(ART / "classification_report.txt", "w", encoding="utf-8") as f:
    f.write(report)

# Plot curves (matplotlib only)
def save_curves():
    """Plot train/validation accuracy and loss from history.json and save each as a PNG."""
    with open(ART / "history.json", "r") as f:
        history = json.load(f)

    # (history key, train legend, val legend, y-axis label, title, output file)
    panels = (
        ("accuracy", "Train Acc", "Val Acc", "Accuracy", "Accuracy Curve", "accuracy_curve.png"),
        ("loss", "Train Loss", "Val Loss", "Loss", "Loss Curve", "loss_curve.png"),
    )
    for key, train_legend, val_legend, y_label, title, filename in panels:
        val_key = "val_" + key
        plt.figure(figsize=(7,4.5))
        # Each series is drawn only if it was recorded during training
        if key in history:
            plt.plot(history[key], marker="o", label=train_legend)
        if val_key in history:
            plt.plot(history[val_key], marker="s", label=val_legend)
        plt.xlabel("Epoch")
        plt.ylabel(y_label)
        plt.title(title)
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.tight_layout()
        plt.savefig(ART / filename, dpi=220)
        plt.close()

save_curves()

# Confusion matrix heatmap
def save_cm(cm, class_names):
    """Render ``cm`` as an annotated heatmap and save it as confusion_matrix.png.

    Args:
        cm: Square integer confusion matrix (rows = true, columns = predicted).
        class_names: Tick labels, one per row/column of ``cm``.
    """
    plt.figure(figsize=(5.6,4.5))
    heatmap = plt.imshow(cm, interpolation="nearest")
    plt.title("Confusion Matrix")
    plt.colorbar(heatmap, fraction=0.046, pad=0.04)

    positions = np.arange(len(class_names))
    plt.xticks(positions, class_names, rotation=45, ha="right")
    plt.yticks(positions, class_names)

    # Cells above half of the largest count get white text, the rest black
    peak = cm.max()
    thresh = peak / 2.0 if peak > 0 else 0.5
    for row, col in np.ndindex(cm.shape):
        count = cm[row, col]
        text_color = "white" if count > thresh else "black"
        plt.text(col, row, format(count, "d"),
                 horizontalalignment="center",
                 verticalalignment="center",
                 color=text_color)

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()
    plt.savefig(ART / "confusion_matrix.png", dpi=220)
    plt.close()

save_cm(cm, list(le.classes_))

# ROC Curve (if both classes present)
def save_roc(y_true, y_prob):
    """Plot the ROC curve for the given labels/scores and save it as roc_curve.png.

    The AUC shown in the legend is computed from the arguments, so the function
    does not depend on any module-level metric. When the curve cannot be
    computed (for example only one class in ``y_true``) a warning is printed
    and no file is written.

    Args:
        y_true: Binary ground-truth labels (0/1).
        y_prob: Scores for class 1, aligned with ``y_true``.
    """
    try:
        fpr, tpr, _ = roc_curve(y_true, y_prob)
        roc_auc = roc_auc_score(y_true, y_prob)
    except ValueError as err:
        # Report the reason instead of silently skipping the plot
        print(f"[WARN] ROC curve not saved: {err}")
        return

    fig = plt.figure(figsize=(5.6,4.5))
    try:
        plt.plot(fpr, tpr, label=f"AUC={roc_auc:.4f}")
        plt.plot([0,1], [0,1], linestyle="--")  # chance diagonal
        plt.xlabel("False Positive Rate")
        plt.ylabel("True Positive Rate")
        plt.title("ROC Curve")
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.tight_layout()
        plt.savefig(ART / "roc_curve.png", dpi=220)
    finally:
        # Always release the figure, even if drawing or saving failed
        plt.close(fig)

save_roc(y_true, y_prob)

# Save per-image predictions CSV (path, true, pred, prob)
# The test dataset is built without shuffling, so predictions are in df_test row order.
test_paths = df_test["path"].tolist()
test_labels = df_test["label_id"].tolist()
if len(y_prob) != len(test_paths):
    # Guard against writing a CSV whose rows pair images with the wrong predictions
    raise RuntimeError(
        f"Got {len(y_prob)} predictions for {len(test_paths)} test images; cannot align them."
    )

class_list = list(le.classes_)
# y_prob is the sigmoid output, i.e. P(class id 1). LabelEncoder sorts class
# names, so id 1 is "Normal" when the classes are Cancer/Normal; convert so the
# "pred_prob_cancer" column really holds the probability of Cancer.
prob_class1 = np.asarray(y_prob, dtype=float)
if "Cancer" in class_list and class_list.index("Cancer") == 0:
    prob_cancer = 1.0 - prob_class1
else:
    prob_cancer = prob_class1

pred_df = pd.DataFrame({
    "path": test_paths,
    "true_label": [class_list[i] for i in test_labels],
    "pred_label": [class_list[i] for i in y_pred],
    "pred_prob_cancer": prob_cancer,
})
pred_df.to_csv(ART / "test_predictions.csv", index=False, encoding="utf-8")

# ------------------ Save config ------------------
# Artifact key -> file name inside ART (the model checkpoint is added separately
# because its path is already held in ckpt_path).
_artifact_files = (
    ("label_encoder_pkl", "label_encoder.pkl"),
    ("history_json", "history.json"),
    ("history_csv", "history.csv"),
    ("metrics_json", "metrics.json"),
    ("classification_report_txt", "classification_report.txt"),
    ("accuracy_curve_png", "accuracy_curve.png"),
    ("loss_curve_png", "loss_curve.png"),
    ("confusion_matrix_png", "confusion_matrix.png"),
    ("roc_curve_png", "roc_curve.png"),
    ("test_predictions_csv", "test_predictions.csv"),
)
_artifacts = {"model_h5": str(ckpt_path)}
for _key, _filename in _artifact_files:
    _artifacts[_key] = str(ART / _filename)

# Run configuration plus dataset folders, artifact locations and class order
CFG_SAVE = dict(CFG)
CFG_SAVE["paths"] = {name: str(folder) for name, folder in DATA_DIRS.items()}
CFG_SAVE["artifacts"] = _artifacts
CFG_SAVE["classes"] = list(le.classes_)

with open(ART / "config.yaml", "w") as f:
    yaml.safe_dump(to_py(CFG_SAVE), f, sort_keys=False)

print("\n[DONE] Artifacts in:", ART)
for k, v in CFG_SAVE["artifacts"].items():
    print(" -", k, ":", v)



[INFO] Total images: 50 | Normal=25 | Cancer=25
[INFO] Classes: ['Cancer', 'Normal']
[SPLIT] train=40  val=5  test=5


Model: "BoneCancerCNN"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 224, 224, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 224, 224, 32)      896       
                                                                 
 batch_normalization (Batch  (None, 224, 224, 32)      128       
 Normalization)                                                  
                                                                 
 max_pooling2d (MaxPooling2  (None, 112, 112, 32)      0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 112, 112, 64)      18496     

  saving_api.save_model(


Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20

[DONE] Artifacts in: C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts
 - model_h5 : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\bone_cancer_cnn.h5
 - label_encoder_pkl : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\label_encoder.pkl
 - history_json : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\history.json
 - history_csv : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\history.csv
 - metrics_json : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\metrics.json
 - classification_report_txt : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\classification_report.txt
 - accuracy_curve_png : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\accuracy_curve.png
 - loss_curve_png : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\loss_curve.png
 - confusion_matrix_png : C:\Users\NXTWAVE\Downloads\Bone Cancer Detection\artifacts\confusion_matrix.p