In [None]:
import os
import numpy as np
import cv2
import json
import matplotlib.pyplot as plt
from PIL import Image
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import mean_squared_error
from tqdm import tqdm
import time
from datetime import datetime

# Install packages jika belum ada
try:
    import lpips
except ImportError:
    !pip install lpips

try:
    from skimage.metrics import structural_similarity as ssim
    from skimage.metrics import peak_signal_noise_ratio as psnr
except ImportError:
    !pip install scikit-image

import lpips
from skimage.metrics import structural_similarity as ssim
from skimage.metrics import peak_signal_noise_ratio as psnr

: 

In [None]:
# Konfigurasi
DATASET_PATH = "processed_dataset"
MODEL_PATH = "models"
RESULTS_PATH = "results"
SCALE_FACTOR = 4
BATCH_SIZE = 16
EPOCHS = 100
LEARNING_RATE = 0.001

# Buat folder untuk model dan hasil
os.makedirs(MODEL_PATH, exist_ok=True)
os.makedirs(RESULTS_PATH, exist_ok=True)

# GPU configuration
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        tf.config.experimental.set_memory_growth(gpus[0], True)
        print("✅ GPU configured successfully")
    except RuntimeError as e:
        print(f"GPU configuration error: {e}")
else:
    print("⚠️  No GPU found, using CPU")

In [None]:
def load_dataset_info():
    """Load informasi dataset"""
    with open(os.path.join(DATASET_PATH, "dataset_info.json"), "r") as f:
        return json.load(f)

dataset_info = load_dataset_info()
print("Dataset Info:")
for key, value in dataset_info.items():
    print(f"  {key}: {value}")

In [None]:
def create_data_generator(split="train", batch_size=BATCH_SIZE):
    """Generator untuk loading data secara batch"""
    high_res_path = os.path.join(DATASET_PATH, split, "high_res")
    low_res_path = os.path.join(DATASET_PATH, split, "low_res")
    
    # Dapatkan list file
    files = sorted(os.listdir(high_res_path))
    
    def generator():
        while True:
            # Shuffle files untuk training
            if split == "train":
                np.random.shuffle(files)
            
            for i in range(0, len(files), batch_size):
                batch_files = files[i:i+batch_size]
                
                lr_batch = []
                hr_batch = []
                
                for filename in batch_files:
                    # Load low-res image
                    lr_img = cv2.imread(os.path.join(low_res_path, filename))
                    lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2RGB)
                    lr_img = lr_img.astype(np.float32) / 255.0
                    
                    # Load high-res image
                    hr_img = cv2.imread(os.path.join(high_res_path, filename))
                    hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2RGB)
                    hr_img = hr_img.astype(np.float32) / 255.0
                    
                    lr_batch.append(lr_img)
                    hr_batch.append(hr_img)
                
                yield np.array(lr_batch), np.array(hr_batch)
    
    return generator, len(files)


In [None]:
def build_espcn_model():
    """Build model ESPCN"""
    inputs = keras.Input(shape=(64, 64, 3))
    
    # Feature extraction layers
    x = layers.Conv2D(64, 5, padding='same', activation='relu')(inputs)
    x = layers.Conv2D(64, 3, padding='same', activation='relu')(x)
    x = layers.Conv2D(32, 3, padding='same', activation='relu')(x)
    
    # Sub-pixel convolution layer
    # For 4x upscaling, we need 4^2 * 3 = 48 channels
    x = layers.Conv2D(SCALE_FACTOR * SCALE_FACTOR * 3, 3, padding='same')(x)
    
    # Sub-pixel shuffle (pixel shuffle)
    outputs = tf.nn.depth_to_space(x, SCALE_FACTOR)
    
    model = keras.Model(inputs, outputs, name="ESPCN")
    return model

# Build model
model = build_espcn_model()
model.summary()

In [None]:
# Compile model
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss='mse',
    metrics=['mae']
)

In [None]:
# Setup data generators
train_generator, train_steps = create_data_generator("train", BATCH_SIZE)
test_generator, test_steps = create_data_generator("test", BATCH_SIZE)

print(f"Training steps per epoch: {train_steps // BATCH_SIZE}")
print(f"Testing steps per epoch: {test_steps // BATCH_SIZE}")


In [None]:
# Callbacks
checkpoint_callback = keras.callbacks.ModelCheckpoint(
    os.path.join(MODEL_PATH, "espcn_best.h5"),
    monitor='val_loss',
    save_best_only=True,
    verbose=1
)

early_stopping = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=15,
    restore_best_weights=True,
    verbose=1
)

lr_scheduler = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=10,
    min_lr=1e-7,
    verbose=1
)


In [None]:
# Training model
print("🚀 Starting training...")
start_time = time.time()

history = model.fit(
    train_generator(),
    steps_per_epoch=train_steps // BATCH_SIZE,
    epochs=EPOCHS,
    validation_data=test_generator(),
    validation_steps=test_steps // BATCH_SIZE,
    callbacks=[checkpoint_callback, early_stopping, lr_scheduler],
    verbose=1
)

training_time = time.time() - start_time
print(f"✅ Training completed in {training_time:.2f} seconds")


In [None]:
def plot_training_history(history):
    """Plot training dan validation loss"""
    plt.figure(figsize=(12, 4))
    
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'], label='Training Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(history.history['mae'], label='Training MAE')
    plt.plot(history.history['val_mae'], label='Validation MAE')
    plt.title('Model MAE')
    plt.xlabel('Epoch')
    plt.ylabel('MAE')
    plt.legend()
    
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_PATH, "training_history.png"))
    plt.show()

plot_training_history(history)

In [None]:
# Load best model
model.load_weights(os.path.join(MODEL_PATH, "espcn_best.h5"))
print("✅ Best model loaded")

In [None]:
# Implementasi baseline methods untuk perbandingan
def bicubic_upscale(img, scale_factor):
    """Bicubic upscaling baseline"""
    h, w = img.shape[:2]
    new_h, new_w = h * scale_factor, w * scale_factor
    return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_CUBIC)

def bilinear_upscale(img, scale_factor):
    """Bilinear upscaling baseline"""
    h, w = img.shape[:2]
    new_h, new_w = h * scale_factor, w * scale_factor
    return cv2.resize(img, (new_w, new_h), interpolation=cv2.INTER_LINEAR)

In [None]:
# Evaluasi metrics
def calculate_psnr(img1, img2):
    """Calculate PSNR"""
    img1 = (img1 * 255).astype(np.uint8)
    img2 = (img2 * 255).astype(np.uint8)
    return psnr(img1, img2, data_range=255)

def calculate_ssim(img1, img2):
    """Calculate SSIM"""
    img1 = (img1 * 255).astype(np.uint8)
    img2 = (img2 * 255).astype(np.uint8)
    return ssim(img1, img2, data_range=255, channel_axis=2)

def calculate_lpips(img1, img2, lpips_model):
    """Calculate LPIPS"""
    # Convert to tensor format for LPIPS
    img1_tensor = tf.convert_to_tensor(img1 * 2.0 - 1.0)  # Normalize to [-1, 1]
    img2_tensor = tf.convert_to_tensor(img2 * 2.0 - 1.0)
    
    # Add batch dimension
    img1_tensor = tf.expand_dims(img1_tensor, 0)
    img2_tensor = tf.expand_dims(img2_tensor, 0)
    
    # Transpose to NCHW format
    img1_tensor = tf.transpose(img1_tensor, [0, 3, 1, 2])
    img2_tensor = tf.transpose(img2_tensor, [0, 3, 1, 2])
    
    return lpips_model(img1_tensor, img2_tensor).numpy().item()

# Initialize LPIPS model
lpips_model = lpips.LPIPS(net='alex')

In [None]:
def evaluate_model():
    """Evaluasi model pada test set"""
    test_files = sorted(os.listdir(os.path.join(DATASET_PATH, "test", "high_res")))
    
    espcn_psnr = []
    espcn_ssim = []
    espcn_lpips = []
    
    bicubic_psnr = []
    bicubic_ssim = []
    bicubic_lpips = []
    
    bilinear_psnr = []
    bilinear_ssim = []
    bilinear_lpips = []
    
    print("🔍 Evaluating model...")
    
    for filename in tqdm(test_files[:50]):  # Evaluate first 50 images
        # Load images
        hr_path = os.path.join(DATASET_PATH, "test", "high_res", filename)
        lr_path = os.path.join(DATASET_PATH, "test", "low_res", filename)
        
        hr_img = cv2.imread(hr_path)
        lr_img = cv2.imread(lr_path)
        
        hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        
        # ESPCN prediction
        lr_batch = np.expand_dims(lr_img, axis=0)
        espcn_pred = model.predict(lr_batch, verbose=0)[0]
        espcn_pred = np.clip(espcn_pred, 0, 1)
        
        # Baseline methods
        bicubic_pred = bicubic_upscale(lr_img, SCALE_FACTOR)
        bilinear_pred = bilinear_upscale(lr_img, SCALE_FACTOR)
        
        # Calculate metrics
        # ESPCN
        espcn_psnr.append(calculate_psnr(hr_img, espcn_pred))
        espcn_ssim.append(calculate_ssim(hr_img, espcn_pred))
        espcn_lpips.append(calculate_lpips(hr_img, espcn_pred, lpips_model))
        
        # Bicubic
        bicubic_psnr.append(calculate_psnr(hr_img, bicubic_pred))
        bicubic_ssim.append(calculate_ssim(hr_img, bicubic_pred))
        bicubic_lpips.append(calculate_lpips(hr_img, bicubic_pred, lpips_model))
        
        # Bilinear
        bilinear_psnr.append(calculate_psnr(hr_img, bilinear_pred))
        bilinear_ssim.append(calculate_ssim(hr_img, bilinear_pred))
        bilinear_lpips.append(calculate_lpips(hr_img, bilinear_pred, lpips_model))
    
    # Calculate averages
    results = {
        "ESPCN": {
            "PSNR": np.mean(espcn_psnr),
            "SSIM": np.mean(espcn_ssim),
            "LPIPS": np.mean(espcn_lpips)
        },
        "Bicubic": {
            "PSNR": np.mean(bicubic_psnr),
            "SSIM": np.mean(bicubic_ssim),
            "LPIPS": np.mean(bicubic_lpips)
        },
        "Bilinear": {
            "PSNR": np.mean(bilinear_psnr),
            "SSIM": np.mean(bilinear_ssim),
            "LPIPS": np.mean(bilinear_lpips)
        }
    }
    
    return results

# Run evaluation
evaluation_results = evaluate_model()

In [None]:
# Print evaluation results
print("="*60)
print("EVALUATION RESULTS")
print("="*60)

for method, metrics in evaluation_results.items():
    print(f"\n{method}:")
    print(f"  PSNR: {metrics['PSNR']:.4f} dB")
    print(f"  SSIM: {metrics['SSIM']:.4f}")
    print(f"  LPIPS: {metrics['LPIPS']:.4f}")

# Save results
with open(os.path.join(RESULTS_PATH, "evaluation_results.json"), "w") as f:
    json.dump(evaluation_results, f, indent=2)

In [None]:
def plot_metrics_comparison():
    """Plot comparison metrics"""
    methods = list(evaluation_results.keys())
    psnr_values = [evaluation_results[method]["PSNR"] for method in methods]
    ssim_values = [evaluation_results[method]["SSIM"] for method in methods]
    lpips_values = [evaluation_results[method]["LPIPS"] for method in methods]
    
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # PSNR
    axes[0].bar(methods, psnr_values, color=['red', 'blue', 'green'])
    axes[0].set_title('PSNR Comparison')
    axes[0].set_ylabel('PSNR (dB)')
    axes[0].set_ylim(0, max(psnr_values) * 1.1)
    
    # SSIM
    axes[1].bar(methods, ssim_values, color=['red', 'blue', 'green'])
    axes[1].set_title('SSIM Comparison')
    axes[1].set_ylabel('SSIM')
    axes[1].set_ylim(0, 1)
    
    # LPIPS (lower is better)
    axes[2].bar(methods, lpips_values, color=['red', 'blue', 'green'])
    axes[2].set_title('LPIPS Comparison (Lower is Better)')
    axes[2].set_ylabel('LPIPS')
    axes[2].set_ylim(0, max(lpips_values) * 1.1)
    
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_PATH, "metrics_comparison.png"))
    plt.show()

plot_metrics_comparison()

In [None]:
# Visualisasi hasil prediksi
def visualize_predictions(num_samples=3):
    """Visualisasi prediksi model"""
    test_files = sorted(os.listdir(os.path.join(DATASET_PATH, "test", "high_res")))
    
    fig, axes = plt.subplots(num_samples, 5, figsize=(20, 4*num_samples))
    fig.suptitle("Model Predictions Comparison", fontsize=16)
    
    for i in range(num_samples):
        filename = test_files[i]
        
        # Load images
        hr_path = os.path.join(DATASET_PATH, "test", "high_res", filename)
        lr_path = os.path.join(DATASET_PATH, "test", "low_res", filename)
        
        hr_img = cv2.imread(hr_path)
        lr_img = cv2.imread(lr_path)
        
        hr_img = cv2.cvtColor(hr_img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
        
        # Predictions
        lr_batch = np.expand_dims(lr_img, axis=0)
        espcn_pred = model.predict(lr_batch, verbose=0)[0]
        espcn_pred = np.clip(espcn_pred, 0, 1)
        
        bicubic_pred = bicubic_upscale(lr_img, SCALE_FACTOR)
        bilinear_pred = bilinear_upscale(lr_img, SCALE_FACTOR)
        
        # Plot
        axes[i, 0].imshow(lr_img)
        axes[i, 0].set_title("Low-Res Input")
        axes[i, 0].axis('off')
        
        axes[i, 1].imshow(bilinear_pred)
        axes[i, 1].set_title("Bilinear")
        axes[i, 1].axis('off')
        
        axes[i, 2].imshow(bicubic_pred)
        axes[i, 2].set_title("Bicubic")
        axes[i, 2].axis('off')
        
        axes[i, 3].imshow(espcn_pred)
        axes[i, 3].set_title("ESPCN")
        axes[i, 3].axis('off')
        
        axes[i, 4].imshow(hr_img)
        axes[i, 4].set_title("Ground Truth")
        axes[i, 4].axis('off')
    
    plt.tight_layout()
    plt.savefig(os.path.join(RESULTS_PATH, "predictions_comparison.png"), dpi=150)
    plt.show()

visualize_predictions(3)

In [None]:
model.save(os.path.join(MODEL_PATH, "espcn_final.h5"))
print("✅ Model saved for GUI application")


In [None]:
# Training summary
def save_training_summary():
    """Save training summary"""
    summary = {
        "model_name": "ESPCN",
        "scale_factor": SCALE_FACTOR,
        "training_time": training_time,
        "epochs_trained": len(history.history['loss']),
        "best_val_loss": min(history.history['val_loss']),
        "evaluation_results": evaluation_results,
        "dataset_info": dataset_info,
        "hyperparameters": {
            "batch_size": BATCH_SIZE,
            "learning_rate": LEARNING_RATE,
            "optimizer": "Adam",
            "loss_function": "MSE"
        },
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    }
    
    with open(os.path.join(RESULTS_PATH, "training_summary.json"), "w") as f:
        json.dump(summary, f, indent=2)
    
    print("Training Summary:")
    print(f"  Model: {summary['model_name']}")
    print(f"  Scale Factor: {summary['scale_factor']}x")
    print(f"  Training Time: {summary['training_time']:.2f} seconds")
    print(f"  Epochs: {summary['epochs_trained']}")
    print(f"  Best Validation Loss: {summary['best_val_loss']:.6f}")
    print(f"  Timestamp: {summary['timestamp']}")

save_training_summary()


In [None]:
# Test inference speed
def test_inference_speed():
    """Test inference speed"""
    test_file = sorted(os.listdir(os.path.join(DATASET_PATH, "test", "low_res")))[0]
    lr_path = os.path.join(DATASET_PATH, "test", "low_res", test_file)
    
    lr_img = cv2.imread(lr_path)
    lr_img = cv2.cvtColor(lr_img, cv2.COLOR_BGR2RGB).astype(np.float32) / 255.0
    lr_batch = np.expand_dims(lr_img, axis=0)
    
    # Warm up
    for _ in range(5):
        _ = model.predict(lr_batch, verbose=0)
    
    # Measure inference time
    times = []
    for _ in range(100):
        start_time = time.time()
        _ = model.predict(lr_batch, verbose=0)
        times.append(time.time() - start_time)
    
    avg_time = np.mean(times)
    fps = 1.0 / avg_time
    
    print(f"Inference Speed:")
    print(f"  Average time: {avg_time:.4f} seconds")
    print(f"  FPS: {fps:.2f}")
    
    return avg_time, fps

inference_time, fps = test_inference_speed()