In [None]:
import os
import json
import time
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.metrics import (
    confusion_matrix, classification_report,
    accuracy_score, precision_score, recall_score,
    f1_score, roc_auc_score
)
from sklearn.preprocessing import label_binarize

In [None]:
# -----------------------
# Configuration
# -----------------------
# Paths: the dataset root is resolved against the current working directory.
BASE_DIR = os.getcwd()
DATASET_DIR = os.path.join(BASE_DIR, "dataset_classification")
ANNOTATION_JSON = "augmented_output/annotations.json"

# Input pipeline and training hyper-parameters.
IMG_SIZE = (224, 224)  # target size every image is resized to
BATCH_SIZE = 32
EPOCHS = 10

# True  -> train on the directory images plus the images listed in ANNOTATION_JSON.
# False -> train only on the images listed in ANNOTATION_JSON.
USE_COMBINED = True

In [None]:
# -----------------------
# Read class names
# -----------------------
# Every sub-directory of the training folder is one class. Hidden directories
# (for example the ".ipynb_checkpoints" folder Jupyter may create) are skipped
# so they are not counted as classes and do not inflate num_classes.
train_folder = os.path.join(DATASET_DIR, "train")
class_names = sorted(
    entry for entry in os.listdir(train_folder)
    if not entry.startswith(".")
    and os.path.isdir(os.path.join(train_folder, entry))
)
num_classes = len(class_names)
if num_classes == 0:
    # Fail here with a clear message instead of later inside the model code.
    raise RuntimeError(f"No class sub-directories found in {train_folder}")
print("Detected classes:", class_names)

# Class name -> integer label, following the sorted order above.
class_to_idx = {c: i for i, c in enumerate(class_names)}

In [None]:
# -----------------------
# Load annotations.json
# -----------------------
# Collect (image path, class index) pairs from the annotation file. Each entry
# may name an "original" and an "augmented" image. Entries whose class is not
# in class_to_idx and paths that do not exist on disk are skipped.
image_paths = []
labels_idx = []
if os.path.exists(ANNOTATION_JSON):
    with open(ANNOTATION_JSON, "r", encoding="utf-8") as f:
        ann = json.load(f)
    # The same path can be referenced by several entries (e.g. one original
    # shared by many augmented variants); keep each file only once so it is
    # not over-represented in the dataset.
    seen_paths = set()
    for item in ann:
        cls = item.get("class")
        if cls not in class_to_idx:
            continue
        idx = class_to_idx[cls]
        for key in ("original", "augmented"):
            path = item.get(key)
            if not isinstance(path, str) or not path:
                continue
            if path in seen_paths or not os.path.exists(path):
                continue
            seen_paths.add(path)
            image_paths.append(path)
            labels_idx.append(idx)
else:
    print(f"Warning: {ANNOTATION_JSON} tidak ditemukan.")
print(f"Total images from annotations: {len(image_paths)}")

In [None]:
# -----------------------
# Preprocess function (can be passed straight to Dataset.map)
# -----------------------
def preprocess_path_label(path, label):
    """Load the image stored at `path` and pair it with a one-hot `label`.

    The file is read, decoded to 3 channels (animations are not expanded),
    converted to float32, and resized to IMG_SIZE.

    Returns:
        (image, one_hot) where one_hot is tf.one_hot(label, num_classes).
    """
    raw_bytes = tf.io.read_file(path)
    decoded = tf.image.decode_image(raw_bytes, channels=3, expand_animations=False)
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)  # 0-1
    resized = tf.image.resize(as_float, IMG_SIZE)
    one_hot_label = tf.one_hot(label, num_classes)
    return resized, one_hot_label

In [None]:
# -----------------------
# Build aug_ds (no tf.py_function needed)
# -----------------------
if len(image_paths) > 0:
    paths_tensor = tf.constant(image_paths)
    labels_tensor = tf.constant(labels_idx, dtype=tf.int32)
    aug_ds = tf.data.Dataset.from_tensor_slices((paths_tensor, labels_tensor))
    # Shuffle the lightweight (path, label) pairs before decoding, so the
    # shuffle buffer holds strings and ints rather than image tensors.
    aug_ds = aug_ds.shuffle(len(image_paths), reshuffle_each_iteration=True)
    # The preprocess function is graph-compatible and can be mapped directly.
    aug_ds = aug_ds.map(preprocess_path_label, num_parallel_calls=tf.data.AUTOTUNE)
    aug_ds = aug_ds.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
else:
    # No annotated images were found. Build an EMPTY dataset that still has the
    # same element structure as the populated pipeline above (a batch of
    # float32 images plus a batch of one-hot labels), so code that combines it
    # with other datasets sees compatible elements.
    aug_ds = tf.data.Dataset.from_tensor_slices((
        tf.zeros((0, IMG_SIZE[0], IMG_SIZE[1], 3), dtype=tf.float32),
        tf.zeros((0, num_classes), dtype=tf.float32),
    )).batch(BATCH_SIZE)

In [None]:
# -----------------------
# Combine with the original training images
# -----------------------
if USE_COMBINED:
    ds_original = tf.keras.utils.image_dataset_from_directory(
        train_folder,
        # Pin the label order to class_names so the one-hot indices here agree
        # with the indices used for the annotation-based images.
        class_names=class_names,
        image_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        label_mode="categorical",
        shuffle=True
    )
    # Scale pixel values to 0-1, matching preprocess_path_label.
    ds_original = ds_original.map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y),
                                  num_parallel_calls=tf.data.AUTOTUNE)
    if len(image_paths) > 0:
        # Both sources are already batched and shuffled. Interleave their
        # batches at random instead of concatenate(...).shuffle(1000): a
        # 1000-element buffer of batches (32 x 224 x 224 x 3 float32, about
        # 19 MB each) can hold many gigabytes of image data in memory.
        train_ds = tf.data.Dataset.sample_from_datasets([ds_original, aug_ds])
    else:
        # Nothing was loaded from the annotation file; use the directory data only.
        train_ds = ds_original
    train_ds = train_ds.prefetch(tf.data.AUTOTUNE)
else:
    train_ds = aug_ds

In [None]:
# -----------------------
# Validation & Test
# -----------------------
def _load_eval_split(split):
    """Load DATASET_DIR/<split> as an unshuffled, 0-1 scaled, batched dataset.

    The label order is pinned to class_names so one-hot indices match the
    training data even if the split's folders would be inferred differently.
    """
    ds = tf.keras.utils.image_dataset_from_directory(
        os.path.join(DATASET_DIR, split),
        class_names=class_names,
        image_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        label_mode="categorical",
        shuffle=False  # keep a stable order for evaluation
    )
    ds = ds.map(lambda x, y: (tf.cast(x, tf.float32) / 255.0, y),
                num_parallel_calls=tf.data.AUTOTUNE)
    return ds.prefetch(tf.data.AUTOTUNE)


val_ds = _load_eval_split("valid")
test_ds = _load_eval_split("test")

In [None]:
# -----------------------
# Build model (input shape follows IMG_SIZE)
# -----------------------
def build_model(num_classes):
    """Build and compile a small convolutional classifier.

    Architecture: three Conv2D(3x3, relu) + MaxPooling2D(2x2) stages with
    32, 64 and 128 filters, then Flatten, Dense(64, relu) and a softmax
    layer with `num_classes` units.

    Returns:
        The model compiled with the 'adam' optimizer,
        'categorical_crossentropy' loss and the 'accuracy' metric.
    """
    height, width = IMG_SIZE[0], IMG_SIZE[1]
    inputs = layers.Input(shape=(height, width, 3))

    features = inputs
    for n_filters in (32, 64, 128):
        features = layers.Conv2D(n_filters, (3,3), activation='relu')(features)
        features = layers.MaxPooling2D((2,2))(features)

    features = layers.Flatten()(features)
    features = layers.Dense(64, activation='relu')(features)
    outputs = layers.Dense(num_classes, activation='softmax')(features)

    model = models.Model(inputs, outputs)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

model = build_model(num_classes)
model.summary()

In [None]:
# -----------------------
# 7) Training
# -----------------------
# Fit on the training pipeline for EPOCHS epochs, validating on val_ds.
fit_kwargs = dict(validation_data=val_ds, epochs=EPOCHS)
history = model.fit(train_ds, **fit_kwargs)

# Persist the trained model to disk.
model.save("model_with_augmentations.h5")

In [None]:
# -----------------------
# 8) Plot: training curves
# -----------------------
# Two side-by-side panels: accuracy on the left, loss on the right.
fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(12,5))
panels = (
    (ax_acc, 'accuracy', 'val_accuracy', 'Train Acc', 'Val Acc',
     "Training vs Validation Accuracy", "Accuracy"),
    (ax_loss, 'loss', 'val_loss', 'Train Loss', 'Val Loss',
     "Training vs Validation Loss", "Loss"),
)
for ax, train_key, val_key, train_label, val_label, title, ylabel in panels:
    ax.plot(history.history[train_key], label=train_label)
    ax.plot(history.history[val_key], label=val_label)
    ax.set_title(title)
    ax.set_xlabel("Epoch")
    ax.set_ylabel(ylabel)
    ax.legend()
fig.tight_layout()
plt.show()

# -----------------------
# 9) Evaluate on the test set
# -----------------------
test_loss, test_acc = model.evaluate(test_ds)
print(f"\nTest Accuracy: {test_acc:.4f}  | Test Loss: {test_loss:.4f}")

# Collect, per test image: true class index, predicted class index and the
# full probability vector.
y_true = []
y_pred = []
y_prob = []

start_time = time.time()
for x, y in test_ds:
    # predict_on_batch performs a single forward pass on this batch. Calling
    # model.predict() once per batch inside a Python loop adds per-call setup
    # overhead and prints a progress bar for every batch, which also distorts
    # the timing measured here.
    probs = np.asarray(model.predict_on_batch(x))
    preds = np.argmax(probs, axis=1)
    y_true.extend(np.argmax(y.numpy(), axis=1))
    y_pred.extend(preds)
    y_prob.extend(probs)
end_time = time.time()
# Wall-clock time of the loop (includes reading batches from test_ds),
# averaged over the number of images; max(1, ...) avoids division by zero.
inference_time = (end_time - start_time) / max(1, len(y_true))
print(f"Rata-rata waktu inferensi per gambar: {inference_time:.4f} detik")

# -----------------------
# 10) Confusion Matrix
# -----------------------
# Pass every class index explicitly so the matrix is always
# num_classes x num_classes, even if a class never occurs in the test data.
cm = confusion_matrix(y_true, y_pred, labels=list(range(num_classes)))
plt.figure(figsize=(8,6))
plt.imshow(cm, cmap="Blues")
plt.title("Confusion Matrix")
plt.xlabel("Predicted"); plt.ylabel("True")
plt.xticks(range(num_classes), class_names, rotation=45)
plt.yticks(range(num_classes), class_names)
# Cells above half of the maximum count are drawn dark by the "Blues"
# colormap; use white text there so the numbers stay readable.
text_threshold = cm.max() / 2.0 if cm.size else 0
for i in range(num_classes):
    for j in range(num_classes):
        text_color = "white" if cm[i,j] > text_threshold else "black"
        plt.text(j, i, cm[i,j], ha="center", va="center", color=text_color)
plt.colorbar()
plt.tight_layout()
plt.show()

# -----------------------
# 11) Classification report + per-class metrics
# -----------------------
# Always report on the full label set. Without `labels`, sklearn derives the
# label set from the data and raises when its size does not match
# `target_names` (e.g. a class that is absent from the test set and never
# predicted).
all_labels = list(range(num_classes))

print("\nClassification Report:\n")
report_dict = classification_report(y_true, y_pred, labels=all_labels, target_names=class_names, output_dict=True, zero_division=0)
print(classification_report(y_true, y_pred, labels=all_labels, target_names=class_names, zero_division=0))

# Macro averages over the same label set as the report above.
acc = accuracy_score(y_true, y_pred)
precision_macro = precision_score(y_true, y_pred, labels=all_labels, average='macro', zero_division=0)
recall_macro = recall_score(y_true, y_pred, labels=all_labels, average='macro', zero_division=0)
f1_macro = f1_score(y_true, y_pred, labels=all_labels, average='macro', zero_division=0)
print(f"Accuracy: {acc:.4f} | Precision(macro): {precision_macro:.4f} | Recall(macro): {recall_macro:.4f} | F1(macro): {f1_macro:.4f}")

# -----------------------
# 12) ROC AUC (One-vs-Rest) - multi-class
# -----------------------
y_prob_arr = np.array(y_prob)
y_true_bin = label_binarize(y_true, classes=list(range(num_classes)))

# Make sure y_true_bin and y_prob_arr both have one column per class.
if num_classes == 2:
    # If y_prob_arr has shape (N, 1), expand it to (N, 2).
    if y_prob_arr.shape[1] == 1:
        y_prob_arr = np.hstack([1 - y_prob_arr, y_prob_arr])
    # If y_true_bin has shape (N, 1), add the complementary column.
    if y_true_bin.shape[1] == 1:
        y_true_bin = np.hstack([1 - y_true_bin, y_true_bin])

# Compute AUC per class; roc_auc_score raises ValueError when it cannot
# compute a score for a column, which is recorded as NaN.
auc_per_class = {}
for c in range(num_classes):
    try:
        auc_score = roc_auc_score(y_true_bin[:, c], y_prob_arr[:, c])
    except ValueError:
        auc_score = np.nan
    auc_per_class[class_names[c]] = auc_score

print("\nAUC per class (One-vs-Rest):")
for k, v in auc_per_class.items():
    print(f"{k}: {v}")

# ROC curve / AUC plotting
plt.figure(figsize=(6,6))
if num_classes == 2:
    from sklearn.metrics import roc_curve
    y_score = y_prob_arr[:, 1]  # column of the positive class
    # Reuse the guarded per-class result instead of calling roc_auc_score
    # again unguarded, which would raise in the undefined case.
    auc_score = auc_per_class[class_names[1]]
    if np.isnan(auc_score):
        plt.title("ROC Curve (AUC undefined)")
    else:
        fpr, tpr, _ = roc_curve(y_true_bin[:, 1], y_score)
        plt.plot(fpr, tpr, label=f"AUC = {auc_score:.4f}")
        plt.title("ROC Curve")
        plt.legend()
    plt.plot([0,1], [0,1], 'k--')
    plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate")
else:
    # Multi-class: show a bar chart of the per-class AUC (NaN drawn as 0).
    plt.bar(list(auc_per_class.keys()), [0 if np.isnan(v) else v for v in auc_per_class.values()])
    plt.xticks(rotation=45)
    plt.ylim(0,1)
    plt.title("AUC per Class (One-vs-Rest)")
plt.tight_layout()
plt.show()


# -----------------------
# 13) Bar chart: precision/recall/f1 per class
# -----------------------
# Pull the three per-class metrics out of the classification report dict.
precisions, recalls, f1s = (
    [report_dict[c][metric] for c in class_names]
    for metric in ('precision', 'recall', 'f1-score')
)

x = np.arange(len(class_names))
width = 0.25
plt.figure(figsize=(10,6))
# One group of three bars per class: precision left, recall centre, F1 right.
bar_series = (
    (x - width, precisions, 'Precision'),
    (x, recalls, 'Recall'),
    (x + width, f1s, 'F1-Score'),
)
for positions, values, series_label in bar_series:
    plt.bar(positions, values, width, label=series_label)
plt.xticks(x, class_names, rotation=45)
plt.ylabel("Score")
plt.ylim(0,1)
plt.title("Per-Class Precision / Recall / F1")
plt.legend()
plt.tight_layout()
plt.show()
