In [None]:
import os
import re
import json
import random
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
import seaborn as sns

import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

from PIL import Image, ImageEnhance
from collections import Counter

from sklearn.metrics import classification_report, confusion_matrix
from tqdm import tqdm


In [None]:

# ==========================================
# STEP 0: PARAMETERS & PATHS (adjust as needed)
# ==========================================
PROCESSED_DIR = "../dataset_processed"  # output: train/ val/ test/ folders, one sub-folder per class
MODEL_DIR = "../models"
MODEL_ID = 1

# Reproducibility: seed the Python, NumPy and TensorFlow RNGs once, up front,
# so the stochastic steps below (shuffling, augmentation, weight
# initialisation) start from a fixed state on every Restart & Run All.
# Hardware-level nondeterminism (e.g. some GPU kernels) is not removed by this.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

IMG_SIZE = (224, 224)   # tuple: target_size for flow_from_directory
IMG_SIDE = IMG_SIZE[0]  # integer side length (for make_square, which is not defined in this section, and the model input_shape)
BATCH_SIZE = 16

# Training (phase 1: frozen backbone)
TRAIN_EPOCHS = 5
TRAIN_LEARNING_RATE = 0.0005

# Fine-tuning (phase 2: partially unfrozen backbone)
TUNING_EPOCHS = 10
TUNING_LEARNING_RATE = 1e-5

# Fractions of the dataset assigned to each split.
SPLIT_RATIO = {
    "train": 0.8,
    "val": 0.1,
    "test": 0.1
}

# Make sure both folders exist before anything tries to read from / write to them.
os.makedirs(PROCESSED_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)

print("PARAMETER siap:", IMG_SIZE, "BATCH_SIZE=", BATCH_SIZE)


In [None]:

# ==========================================
# STEP 4: Data generators
# - train: augmentation + rescale
# - val/test: rescale only
# ==========================================

# Random geometric augmentations, applied to training images only.
augmentation_options = dict(
    rotation_range=30,
    width_shift_range=0.2,
    height_shift_range=0.2,
    zoom_range=0.2,
    shear_range=0.2,
    horizontal_flip=True,
)

train_datagen = ImageDataGenerator(rescale=1./255, **augmentation_options)
valtest_datagen = ImageDataGenerator(rescale=1./255)


def _flow_split(datagen, split, shuffle):
    """Build a categorical batch iterator over PROCESSED_DIR/<split>.

    All splits share the same target size, batch size and label encoding;
    only the source folder, the datagen and the shuffle flag differ.
    """
    return datagen.flow_from_directory(
        os.path.join(PROCESSED_DIR, split),
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        class_mode='categorical',
        shuffle=shuffle,
    )


# Only the training stream is shuffled; val/test keep directory order so that
# predictions line up with `generator.classes`.
train_generator = _flow_split(train_datagen, "train", shuffle=True)
val_generator = _flow_split(valtest_datagen, "val", shuffle=False)
test_generator = _flow_split(valtest_datagen, "test", shuffle=False)

print("Generators siap:")
print(" - train samples:", train_generator.samples)
print(" - val samples:", val_generator.samples)
print(" - test samples:", test_generator.samples)
print("Classes:", train_generator.class_indices)


In [None]:

# ==========================================
# (Optional) print counts per class for sanity check
# ==========================================

def count_images_per_class(generator):
    """Count the images of each class held by a directory iterator.

    The counts are taken from `generator.classes` (one integer class index
    per image) rather than from the file paths, so they do not depend on how
    deep an image sits below its class folder.

    Args:
        generator: object exposing `classes` (iterable of integer class
            indices) and `class_indices` (class name -> integer index), such
            as the iterators created with `flow_from_directory`.

    Returns:
        Counter mapping every class name in `generator.class_indices` to its
        image count (0 when the split holds no image of that class).
    """
    per_index = Counter(int(class_index) for class_index in generator.classes)
    return Counter({
        class_name: per_index[class_index]
        for class_name, class_index in generator.class_indices.items()
    })


train_counts = count_images_per_class(train_generator)
val_counts = count_images_per_class(val_generator)
# The test split gets its own variable and its own class mapping.
test_counts = count_images_per_class(test_generator)

for split_name, split_counts in (
    ("train", train_counts),
    ("val", val_counts),
    ("test", test_counts),
):
    print(f"\nCounts per class ({split_name}):")
    for cls, n_images in split_counts.items():
        print(f" - {cls}: {n_images}")


In [None]:

# ==========================================
# STEP 5: Build model (MobileNetV2 transfer learning)
# ==========================================

MODEL_NAME = "MobileNetV2"
num_classes = train_generator.num_classes

# ImageNet-pretrained backbone, loaded without its original classification head.
base_model = MobileNetV2(
    input_shape=(IMG_SIDE, IMG_SIDE, 3),
    include_top=False,
    weights='imagenet',
)

# Freeze every backbone layer: phase 1 trains only the new head.
for backbone_layer in base_model.layers:
    backbone_layer.trainable = False

# New head: global average pooling followed by a softmax over our classes.
pooled_features = GlobalAveragePooling2D()(base_model.output)
preds = Dense(num_classes, activation='softmax')(pooled_features)
model = Model(inputs=base_model.input, outputs=preds)

model.compile(
    loss='categorical_crossentropy',
    optimizer=Adam(learning_rate=TRAIN_LEARNING_RATE),
    metrics=['accuracy'],
)

print("\nModel siap. Jumlah kelas:", num_classes)


In [None]:

# ==========================================
# STEP 6: Callbacks (recommended)
# ==========================================

checkpoint_path = os.path.join(MODEL_DIR, "best_model.h5")

# Write the model to checkpoint_path whenever val_loss improves.
best_checkpoint = ModelCheckpoint(
    checkpoint_path,
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)

# Stop after 6 epochs without val_loss improvement and roll back to the best weights.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=6,
    restore_best_weights=True,
    verbose=1,
)

# Halve the learning rate after 3 epochs without val_loss improvement.
lr_on_plateau = ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=3,
    verbose=1,
)

callbacks = [best_checkpoint, early_stopping, lr_on_plateau]


In [None]:
import matplotlib.pyplot as plt
from IPython.display import clear_output
import tensorflow as tf


class LiveTrainPlot(tf.keras.callbacks.Callback):
    """Keras callback that redraws per-batch loss and accuracy while training.

    After each training batch the running `loss` and `accuracy` values from
    the Keras logs are appended to the history and the notebook output is
    replaced by a fresh two-panel figure. State is reset at the start of every
    `fit()` call, so each training phase gets its own curves.

    Args:
        update_every: redraw the figure every N batches. The default of 1
            redraws on every batch; larger values reduce plotting overhead.
            Values below 1 are treated as 1.
    """

    def __init__(self, update_every=1):
        super().__init__()
        self.update_every = max(1, int(update_every))

    def on_train_begin(self, logs=None):
        """Reset the recorded curves at the start of a `fit()` call."""
        self.batch_count = []
        self.loss = []
        self.acc = []
        self.step = 0
        self.current_epoch = 1

    def on_epoch_begin(self, epoch, logs=None):
        """Track the running epoch (Keras passes a 0-based index).

        Updating at epoch *begin* keeps the label correct for the batches of
        that epoch; updating at epoch end made it lag by one from the second
        epoch onwards.
        """
        self.current_epoch = epoch + 1

    def on_batch_end(self, batch, logs=None):
        """Record the latest batch metrics and redraw the live figure."""
        logs = logs or {}  # Keras may pass None
        self.step += 1
        self.batch_count.append(self.step)
        self.loss.append(logs.get('loss'))
        self.acc.append(logs.get('accuracy'))

        # Skip the redraw between refresh points.
        if self.step % self.update_every:
            return

        clear_output(wait=True)
        fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(13, 5))
        self._draw_panel(loss_ax, self.loss, 'red', 'Loss', 'Loss (per batch)')
        self._draw_panel(acc_ax, self.acc, 'blue', 'Accuracy', 'Accuracy (per batch)')
        fig.tight_layout()
        plt.show()
        # Release the figure explicitly so figures do not pile up over a long run.
        plt.close(fig)

    def _draw_panel(self, ax, values, color, label, title):
        """Plot one metric curve with the current-epoch annotation on `ax`."""
        ax.plot(self.batch_count, values, color=color, label=label)
        ax.text(0.01, 0.95, f'Current Epoch: {self.current_epoch}',
                transform=ax.transAxes,
                fontsize=11, color=color, weight='bold')
        ax.set_title(title)
        ax.set_xlabel('Batch')
        ax.legend()
        ax.grid(True)


liveplot = LiveTrainPlot()


In [None]:

# ==========================================
# STEP 7: Training
# ==========================================

# Phase 1: train with the backbone frozen. The live plot runs first,
# followed by the shared checkpoint / early-stopping / LR callbacks.
phase1_callbacks = [liveplot, *callbacks]

history = model.fit(
    train_generator,
    epochs=TRAIN_EPOCHS,
    validation_data=val_generator,
    callbacks=phase1_callbacks,
)



In [None]:
# ==========================================
# STEP 7b: Fine-tuning (second phase)
# ==========================================

# Unfreeze the last 50 backbone layers, but keep BatchNormalization layers
# frozen. A frozen BatchNormalization layer runs in inference mode, so its
# pretrained moving mean/variance are not overwritten by statistics from our
# small batches (see the Keras transfer-learning guide).
for layer in base_model.layers[-50:]:
    if isinstance(layer, tf.keras.layers.BatchNormalization):
        continue
    layer.trainable = True

# Re-compile so the new trainable flags take effect, with a much smaller
# learning rate to avoid destroying the pretrained features.
model.compile(
    optimizer=Adam(learning_rate=TUNING_LEARNING_RATE),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

# Continue training (fine-tuning) with the same callbacks as phase 1.
history_finetune = model.fit(
    train_generator,
    validation_data=val_generator,
    epochs=TUNING_EPOCHS,
    callbacks=[liveplot] + callbacks
)


In [None]:

# ==========================================
# STEP 8: Save final model & metadata
# ==========================================

# --- Merge the histories (phase 1 + phase 2) ---
# so that plots and statistics cover the whole training run
phase1_log = history.history
phase2_log = history_finetune.history
combined_history = {
    metric: phase1_log[metric] + phase2_log.get(metric, [])
    for metric in phase1_log.keys()
}

# --- Mean accuracy over the final (fine-tuning) phase ---
# Falls back to the phase-1 values, then to [-1], when a metric is missing.
train_acc_avg = np.mean(
    phase2_log.get('accuracy', phase1_log.get('accuracy', [-1]))
)
val_acc_avg = np.mean(
    phase2_log.get('val_accuracy', phase1_log.get('val_accuracy', [-1]))
)

# Truncated whole-number percentages.
train_acc_int, val_acc_int = int(train_acc_avg * 100), int(val_acc_avg * 100)

# Work out the next version number for this model id by scanning MODEL_DIR
# for folders named model<id>-<version>-<train_acc>...
pattern = re.compile(r"model(\d+)-(\d+)-(\d+)")
model_id = MODEL_ID

existing_dirs = [
    entry
    for entry in os.listdir(MODEL_DIR)
    if os.path.isdir(os.path.join(MODEL_DIR, entry))
]

# Keep the version field of every folder that belongs to this model id.
folder_matches = (pattern.match(folder_name) for folder_name in existing_dirs)
versions = [
    int(hit.group(2))
    for hit in folder_matches
    if hit and int(hit.group(1)) == model_id
]
next_version = max(versions, default=0) + 1

# --- Create the new output folder ---
save_dir = os.path.join(
    MODEL_DIR,
    f"model{model_id}-{next_version}-{train_acc_int}-{val_acc_int}",
)
os.makedirs(save_dir, exist_ok=True)

# --- Save the final model (after fine-tuning) ---
final_model_path = os.path.join(save_dir, "model.h5")
model.save(final_model_path)

# --- Save the class labels as index -> class name ---
labels = dict(
    (class_index, class_name)
    for class_name, class_index in train_generator.class_indices.items()
)
with open(os.path.join(save_dir, "labels.json"), "w") as labels_file:
    json.dump(labels, labels_file, indent=4)

# --- Save a metadata summary of this run ---
summary = dict(
    model_id=model_id,
    model_name=MODEL_NAME,
    version=next_version,
    train_acc_avg=round(float(train_acc_avg) * 100, 2),
    val_acc_avg=round(float(val_acc_avg) * 100, 2),
    epochs_phase1=len(history.history.get('accuracy', [])),
    epochs_phase2=len(history_finetune.history.get('accuracy', [])),
    model_path=final_model_path,
)
with open(os.path.join(save_dir, "summary.json"), "w") as summary_file:
    json.dump(summary, summary_file, indent=4)

def to_serializable(obj):
    """Recursively convert NumPy values into plain Python types for JSON.

    Handles every NumPy scalar type (any float/int width, unsigned ints,
    `np.bool_`, ...) via `np.generic`, arrays of any rank (including 0-d),
    and lists, tuples and dicts nested to any depth. NumPy scalars used as
    dict keys are converted as well, since `json.dump` rejects them.

    Args:
        obj: value to convert; anything that is not a NumPy value or one of
            the containers above is returned unchanged.

    Returns:
        The same structure built from Python scalars, lists and dicts.
        Tuples come back as lists, which is how JSON encodes them anyway.
    """
    if isinstance(obj, np.ndarray):
        # tolist() unwraps numeric dtypes; recurse in case of object arrays.
        return to_serializable(obj.tolist())
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, (list, tuple)):
        return [to_serializable(item) for item in obj]
    if isinstance(obj, dict):
        return {
            (key.item() if isinstance(key, np.generic) else key): to_serializable(value)
            for key, value in obj.items()
        }
    return obj


# --- Save the training histories (phase 1, phase 2, and combined) ---
history_dumps = (
    ("history_phase1.json", history.history),
    ("history_finetune.json", history_finetune.history),
    ("history_combined.json", combined_history),
)
for history_filename, history_payload in history_dumps:
    # Convert NumPy values first; json cannot encode them directly.
    with open(os.path.join(save_dir, history_filename), "w") as history_file:
        json.dump(to_serializable(history_payload), history_file, indent=4)


print(f"üìÅ Semua file metadata dan history disimpan di: {save_dir}")


In [None]:
# ==========================================
# STEP 9: Plot & save metrics (FINAL)
# ==========================================

# Per-epoch curves over both phases, taken from combined_history (STEP 8)
acc      = combined_history.get('accuracy', [])
val_acc  = combined_history.get('val_accuracy', [])
loss     = combined_history.get('loss', [])
val_loss = combined_history.get('val_loss', [])

epochs_range = range(1, len(acc) + 1)

fig, (acc_ax, loss_ax) = plt.subplots(1, 2, figsize=(12, 5))

# --- Accuracy panel ---
acc_ax.plot(epochs_range, acc, label='Train Accuracy', marker='o')
acc_ax.plot(epochs_range, val_acc, label='Val Accuracy', marker='s')
acc_ax.set_title("Training & Validation Accuracy")
acc_ax.set_xlabel("Epoch")
acc_ax.set_ylabel("Accuracy")
acc_ax.legend()
acc_ax.grid(True, linestyle='--', alpha=0.5)

# --- Loss panel ---
loss_ax.plot(epochs_range, loss, label='Train Loss', marker='o')
loss_ax.plot(epochs_range, val_loss, label='Val Loss', marker='s')
loss_ax.set_title("Training & Validation Loss")
loss_ax.set_xlabel("Epoch")
loss_ax.set_ylabel("Loss")
loss_ax.legend()
loss_ax.grid(True, linestyle='--', alpha=0.5)

fig.tight_layout()

# --- Save the figure next to the model, then display it ---
metrics_path = os.path.join(save_dir, "metrics.png")
fig.savefig(metrics_path)
plt.show()
plt.close(fig)

print(f"üìà Grafik akurasi & loss disimpan di: {metrics_path}")


In [None]:

# ==========================================
# STEP 10: Evaluation (validation set)
# ==========================================
print("\nüöÄ Evaluasi Model pada Validation Set...")

# --- Reset the generator so predictions start from the first sample ---
val_generator.reset()

# --- Predict ---
y_pred = model.predict(val_generator, verbose=0)
y_pred_classes = np.argmax(y_pred, axis=1)
y_true = val_generator.classes

# --- Class labels, ordered explicitly by class index ---
class_labels = sorted(val_generator.class_indices, key=val_generator.class_indices.get)
# Passing the full index list keeps the report and the confusion matrix
# aligned with class_labels even when a class is absent from this split
# (otherwise scikit-learn infers the labels from the data and the
# target_names length no longer matches).
label_ids = list(range(len(class_labels)))
report_options = dict(labels=label_ids, target_names=class_labels, zero_division=0)

# --- Classification report (dict for JSON, text for display) ---
print("\nüìä Classification Report (Validation):")
report_val = classification_report(
    y_true,
    y_pred_classes,
    output_dict=True,
    **report_options
)
print(classification_report(y_true, y_pred_classes, **report_options))

# --- Save the evaluation report as JSON ---
eval_val_path = os.path.join(save_dir, "evaluation_val.json")
with open(eval_val_path, "w") as f:
    json.dump(report_val, f, indent=4)
print(f"‚úÖ Laporan evaluasi validasi disimpan: {eval_val_path}")

# --- Confusion matrix (rows = true class, columns = predicted class) ---
cm = confusion_matrix(y_true, y_pred_classes, labels=label_ids)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=class_labels,
            yticklabels=class_labels)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix - Validation Set')

# --- Save the confusion matrix ---
cm_val_path = os.path.join(save_dir, "confusion_matrix_val.png")
plt.tight_layout()
plt.savefig(cm_val_path)
plt.show()
plt.close()

print(f"üß© Confusion matrix validasi disimpan di: {cm_val_path}")

In [None]:

# ==========================================
# STEP 11: Testing (test set)
# ==========================================
print("\nüöÄ Evaluasi Model pada Test Set...")

# --- Reset the generator ---
test_generator.reset()

# --- Direct evaluation (loss + accuracy, as compiled) ---
test_loss, test_acc = model.evaluate(test_generator, verbose=0)
print(f"\nüìà Test Accuracy: {test_acc*100:.2f}%  |  Test Loss: {test_loss:.4f}")

# --- Predict ---
# Reset again: evaluate() has already iterated the generator, and the
# predictions must line up with test_generator.classes from the first sample.
test_generator.reset()
y_pred_test = model.predict(test_generator, verbose=0)
y_pred_test_classes = np.argmax(y_pred_test, axis=1)
y_true_test = test_generator.classes

# --- Class labels, ordered explicitly by class index ---
class_labels = sorted(test_generator.class_indices, key=test_generator.class_indices.get)
# Passing the full index list keeps the report and the confusion matrix
# aligned with class_labels even when a class is absent from this split
# (otherwise scikit-learn infers the labels from the data and the
# target_names length no longer matches).
label_ids = list(range(len(class_labels)))
report_options = dict(labels=label_ids, target_names=class_labels, zero_division=0)

# --- Classification report (dict for JSON, text for display) ---
print("\nüìä Classification Report (Test):")
report_test = classification_report(
    y_true_test,
    y_pred_test_classes,
    output_dict=True,
    **report_options
)
print(classification_report(y_true_test, y_pred_test_classes, **report_options))

# --- Save the evaluation report as JSON ---
eval_test_path = os.path.join(save_dir, "evaluation_test.json")
with open(eval_test_path, "w") as f:
    json.dump(report_test, f, indent=4)
print(f"‚úÖ Laporan evaluasi testing disimpan: {eval_test_path}")

# --- Confusion matrix (rows = true class, columns = predicted class) ---
cm_test = confusion_matrix(y_true_test, y_pred_test_classes, labels=label_ids)

plt.figure(figsize=(8, 6))
sns.heatmap(cm_test, annot=True, fmt='d', cmap='Greens',
            xticklabels=class_labels,
            yticklabels=class_labels)
plt.xlabel('Predicted Label')
plt.ylabel('True Label')
plt.title('Confusion Matrix - Test Set')

# --- Save the confusion matrix ---
cm_test_path = os.path.join(save_dir, "confusion_matrix_test.png")
plt.tight_layout()
plt.savefig(cm_test_path)
plt.show()
plt.close()

print(f"üß© Confusion matrix testing disimpan di: {cm_test_path}")

# --- Add the test results to summary.json (rewrites the file from STEP 8) ---
summary["test_accuracy"] = round(float(test_acc) * 100, 2)
summary["test_loss"] = round(float(test_loss), 4)

with open(os.path.join(save_dir, "summary.json"), "w") as f:
    json.dump(summary, f, indent=4)

print(f"üìò Summary diperbarui dengan hasil test: {os.path.join(save_dir, 'summary.json')}")
