In [None]:
# Mount Google Drive at /content/drive (requires the Colab runtime, which
# provides the google.colab package).
# NOTE(review): no cell visible in this notebook reads from the mount -- the
# dataset is downloaded with gdown in the next cell -- so this step may be
# unnecessary; confirm before removing.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import sys
!{sys.executable} -m pip install gdown

# Download dataset dari Google Drive
file_id = '1KP0OjF7MkeOaOGQDdqPx2AuHk8s9stV3'
!gdown --id $file_id --output dataset.zip

In [None]:
import zipfile
import os

zip_file = 'dataset.zip'
extract_dir = '.'  # Extract into the current working directory

# Unpack the archive; the context manager closes the file handle afterwards.
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
    zip_ref.extractall(extract_dir)

print(f"'{zip_file}' has been unzipped to '{extract_dir}'")

# The archive is expected to contain a top-level 'DATASET' folder. Stop here
# with a clear error if it is missing, instead of printing a message and
# letting a later cell fail when it tries to read the images.
dataset_path = os.path.join(extract_dir, 'DATASET')
if not os.path.isdir(dataset_path):
    raise FileNotFoundError(f"Error: '{dataset_path}' directory was not created.")

print(f"'{dataset_path}' directory created successfully.")
print("Contents of 'DATASET' directory:")
# Show the folder tree with a file count per folder rather than one line per
# file, which would flood the cell output for an image dataset.
for root, dirs, files in os.walk(dataset_path):
    dirs.sort()  # os.walk honours in-place edits, so this fixes the visit order
    rel = os.path.relpath(root, dataset_path)
    level = 0 if rel == os.curdir else rel.count(os.sep) + 1
    indent = ' ' * 4 * level
    print(f'{indent}{os.path.basename(root)}/ ({len(files)} files)')

# Ensemble Model Training: Swin-T + EfficientNetV2-S + ConvNeXt-T

Notebook ini menggunakan **Ensemble Learning** dengan 3 model state-of-the-art:
1. **Swin Transformer Tiny** (Vision Transformer)
2. **EfficientNetV2-S** (CNN Efficient)
3. **ConvNeXt Tiny** (Modern CNN)

Plus **Test Time Augmentation (TTA)** untuk boost akurasi maksimal.

Dataset: Klasifikasi 3 jenis nyamuk
- Aedes aegypti
- Aedes albopictus  
- Culex quinquefasciatus

## 1. Import Libraries

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
from IPython.display import display
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models

from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from sklearn.metrics import precision_score, recall_score, f1_score

# Report the installed PyTorch version and whether a CUDA device is available.
print("PyTorch:", torch.__version__)
print("Device:", "CUDA" if torch.cuda.is_available() else "CPU")

## 2. Configuration

In [None]:
# Reproducibility: fix the NumPy and PyTorch global RNG seeds.
np.random.seed(42)
torch.manual_seed(42)

# Data and optimisation settings
DATA_DIR = 'DATASET'
BATCH_SIZE = 16  # smaller than usual because three models process every batch
NUM_EPOCHS = 50
LEARNING_RATE = 0.0005
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Ensemble settings
ENSEMBLE_WEIGHTS = [0.35, 0.35, 0.30]  # order: Swin, EfficientNet, ConvNeXt
TTA_ENABLE = True
TTA_TRANSFORMS = 5  # number of augmented views used for test-time augmentation

# Echo the effective configuration, one item per line, framed by rulers.
print(
    "=" * 60,
    "ENSEMBLE CONFIGURATION",
    "=" * 60,
    "Models: Swin-T + EfficientNetV2-S + ConvNeXt-T",
    f"Ensemble Weights: {ENSEMBLE_WEIGHTS}",
    f"TTA: {'Enabled' if TTA_ENABLE else 'Disabled'} ({TTA_TRANSFORMS} transforms)",
    f"Batch Size: {BATCH_SIZE}",
    f"Epochs: {NUM_EPOCHS}",
    f"LR: {LEARNING_RATE}",
    f"Device: {DEVICE}",
    "=" * 60,
    sep="\n",
)

## 3. Data Preparation

In [None]:
# Training transform with strong augmentation
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(p=0.3),
    transforms.RandomRotation(20),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Deterministic transform for validation / test
val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# Load datasets.
# The 'train' folder is opened twice: a Subset shares its parent dataset's
# transform, so splitting a single ImageFolder would make the validation
# images go through the random training augmentations. The second view
# (val_dataset) reads the same files through val_transform instead.
train_dataset = datasets.ImageFolder(os.path.join(DATA_DIR, 'train'), transform=train_transform)
val_dataset = datasets.ImageFolder(os.path.join(DATA_DIR, 'train'), transform=val_transform)
test_dataset = datasets.ImageFolder(os.path.join(DATA_DIR, 'test'), transform=val_transform)

# Split train -> train + val (80/20) with a dedicated seeded generator, so the
# partition is the same every time this cell runs, independent of how much
# global RNG state earlier cells consumed.
train_size = int(0.8 * len(train_dataset))
val_size = len(train_dataset) - train_size
split_generator = torch.Generator().manual_seed(42)
shuffled_indices = torch.randperm(len(train_dataset), generator=split_generator).tolist()
train_subset = torch.utils.data.Subset(train_dataset, shuffled_indices[:train_size])
val_subset = torch.utils.data.Subset(val_dataset, shuffled_indices[train_size:])

# DataLoaders (only the training loader shuffles)
train_loader = DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_subset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

class_names = train_dataset.classes

# Label indices are assigned per folder listing; if the test split had a
# different class list, its labels would silently mean different classes.
if test_dataset.classes != class_names:
    raise ValueError(
        f"Train/test class folders differ: {class_names} vs {test_dataset.classes}"
    )

print(f"\nClasses: {class_names}")
print(f"Train: {len(train_subset)} | Val: {len(val_subset)} | Test: {len(test_dataset)}")

## 4. Build Ensemble Models

In [None]:
num_classes = len(class_names)


def _mlp_head(in_features, leading_dropout=False):
    """Build the classification head: Linear(in, 512) -> ReLU -> Dropout(0.3) -> Linear(512, num_classes).

    When ``leading_dropout`` is true an extra Dropout(0.3) is placed in front.
    """
    layers = [nn.Dropout(0.3)] if leading_dropout else []
    layers += [
        nn.Linear(in_features, 512),
        nn.ReLU(),
        nn.Dropout(0.3),
        nn.Linear(512, num_classes),
    ]
    return nn.Sequential(*layers)


# Model 1: Swin Transformer Tiny -- freeze parameters under features.0 / .1 / .2
print("Loading Swin Transformer Tiny...")
swin_model = models.swin_t(weights='DEFAULT')
frozen_swin_stages = ('features.0', 'features.1', 'features.2')
for param_name, param in swin_model.named_parameters():
    if any(stage in param_name for stage in frozen_swin_stages):
        param.requires_grad = False
swin_model.head = _mlp_head(swin_model.head.in_features)
swin_model = swin_model.to(DEVICE)

# Model 2: EfficientNetV2-S -- freeze all but the last 30 parameter tensors,
# then swap in a new classifier (with a leading dropout)
print("Loading EfficientNetV2-S...")
efficientnet_model = models.efficientnet_v2_s(weights='DEFAULT')
for param in list(efficientnet_model.parameters())[:-30]:
    param.requires_grad = False
efficientnet_model.classifier = _mlp_head(
    efficientnet_model.classifier[1].in_features, leading_dropout=True
)
efficientnet_model = efficientnet_model.to(DEVICE)

# Model 3: ConvNeXt Tiny -- freeze all but the last 30 parameter tensors,
# then replace only the final layer (index 2) of the classifier
print("Loading ConvNeXt Tiny...")
convnext_model = models.convnext_tiny(weights='DEFAULT')
for param in list(convnext_model.parameters())[:-30]:
    param.requires_grad = False
convnext_model.classifier[2] = _mlp_head(convnext_model.classifier[2].in_features)
convnext_model = convnext_model.to(DEVICE)

models_list = [swin_model, efficientnet_model, convnext_model]
model_names = ['Swin-T', 'EfficientNetV2-S', 'ConvNeXt-T']

# Summary: total vs trainable parameter counts per ensemble member
print("\n" + "=" * 60)
print("ENSEMBLE MODELS LOADED")
print("=" * 60)
for position, (net, label) in enumerate(zip(models_list, model_names), start=1):
    n_total = 0
    n_trainable = 0
    for p in net.parameters():
        n_total += p.numel()
        if p.requires_grad:
            n_trainable += p.numel()
    print(f"{position}. {label:20s} | Total: {n_total:,} | Trainable: {n_trainable:,}")
print("=" * 60)

## 5. Training Functions

In [None]:
def train_ensemble_epoch(models, criterion, optimizers, data_loader, device, weights):
    """Train all ensemble members for one epoch on their weighted-average output.

    Every model sees the same batch; the per-model outputs are combined as
    ``sum(w_i * out_i)`` and a single loss on that combination is
    back-propagated, so each model receives gradients scaled by its weight.

    Args:
        models: iterable of ``nn.Module`` ensemble members. (Inside this
            function the name shadows the ``torchvision.models`` import.)
        criterion: loss callable applied to ``(combined_output, labels)``.
            The epoch loss is sample-weighted assuming the criterion returns
            a per-batch mean (the default for ``nn.CrossEntropyLoss``).
        optimizers: one optimizer per model.
        data_loader: iterable yielding ``(inputs, labels)`` batches.
        device: device the batches are moved to.
        weights: one scalar weight per model.

    Returns:
        ``(mean_loss_per_sample, accuracy_percent)`` over the epoch.

    Raises:
        ValueError: if ``data_loader`` yields no samples.
    """
    for model in models:
        model.train()

    loss_sum = 0.0  # running sum of per-sample losses
    correct = 0
    total = 0

    for inputs, labels in data_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        for optimizer in optimizers:
            optimizer.zero_grad()

        # Forward pass through every member, then weighted average
        ensemble_outputs = [model(inputs) for model in models]
        combined_output = sum(w * out for w, out in zip(weights, ensemble_outputs))

        # One loss, one backward pass through all members
        loss = criterion(combined_output, labels)
        loss.backward()

        for optimizer in optimizers:
            optimizer.step()

        # Weight each batch by its size so a short final batch does not skew
        # the epoch average.
        batch_size = labels.size(0)
        loss_sum += loss.item() * batch_size
        predicted = combined_output.detach().argmax(dim=1)
        total += batch_size
        correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("data_loader yielded no samples")

    return loss_sum / total, 100 * correct / total

def validate_ensemble(models, criterion, data_loader, device, weights):
    """Evaluate the weighted-average ensemble on a data loader without gradients.

    Args:
        models: iterable of ``nn.Module`` ensemble members; each is put into
            eval mode. (Inside this function the name shadows the
            ``torchvision.models`` import.)
        criterion: loss callable applied to ``(combined_output, labels)``.
            The reported loss is sample-weighted assuming the criterion
            returns a per-batch mean (the default for ``nn.CrossEntropyLoss``).
        data_loader: iterable yielding ``(inputs, labels)`` batches.
        device: device the batches are moved to.
        weights: one scalar weight per model.

    Returns:
        ``(mean_loss_per_sample, accuracy_percent)`` over the loader.

    Raises:
        ValueError: if ``data_loader`` yields no samples.
    """
    for model in models:
        model.eval()

    loss_sum = 0.0  # running sum of per-sample losses
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Weighted average of the member outputs
            ensemble_outputs = [model(inputs) for model in models]
            combined_output = sum(w * out for w, out in zip(weights, ensemble_outputs))

            loss = criterion(combined_output, labels)

            # Weight each batch by its size so a short final batch does not
            # skew the average.
            batch_size = labels.size(0)
            loss_sum += loss.item() * batch_size
            predicted = combined_output.argmax(dim=1)
            total += batch_size
            correct += (predicted == labels).sum().item()

    if total == 0:
        raise ValueError("data_loader yielded no samples")

    return loss_sum / total, 100 * correct / total

print("Training functions ready")

## 6. Execute Training

In [None]:
# One shared loss; one Adam optimizer and one plateau scheduler per model
criterion = nn.CrossEntropyLoss()
optimizers = [optim.Adam(model.parameters(), lr=LEARNING_RATE) for model in models_list]
schedulers = [optim.lr_scheduler.ReduceLROnPlateau(opt, 'min', factor=0.5, patience=5)
              for opt in optimizers]

# Per-epoch history, reset every time this cell runs
train_losses, val_losses = [], []
train_accs, val_accs = [], []
best_val_acc = 0.0

print("\n" + "=" * 60)
print("MULAI TRAINING ENSEMBLE")
print("=" * 60)

for epoch in range(NUM_EPOCHS):
    # Train
    train_loss, train_acc = train_ensemble_epoch(
        models_list, criterion, optimizers, train_loader, DEVICE, ENSEMBLE_WEIGHTS
    )
    train_losses.append(train_loss)
    train_accs.append(train_acc)

    # Validate
    val_loss, val_acc = validate_ensemble(
        models_list, criterion, val_loader, DEVICE, ENSEMBLE_WEIGHTS
    )
    val_losses.append(val_loss)
    val_accs.append(val_acc)

    # Every scheduler watches the same ensemble validation loss
    for scheduler in schedulers:
        scheduler.step(val_loss)

    print(f"Epoch [{epoch+1}/{NUM_EPOCHS}] - "
          f"Loss: {train_loss:.4f}/{val_loss:.4f} | "
          f"Acc: {train_acc:.2f}%/{val_acc:.2f}%")

    # Checkpoint all members whenever the ensemble validation accuracy improves.
    # File names come from the model name: lower-cased, '-' replaced by '_'.
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        for model, name in zip(models_list, model_names):
            torch.save(model.state_dict(), f'best_{name.lower().replace("-", "_")}.pth')
        # The arrow was stored as mojibake ("â†’"); restored to the intended character.
        print(f"  → Models saved (Val Acc: {val_acc:.2f}%)")

print(f"\nTraining selesai! Best Val Acc: {best_val_acc:.2f}%")

## 7. Test Time Augmentation (TTA) Function

In [None]:
def get_tta_transforms(n=5):
    """Return the first ``n`` test-time-augmentation pipelines.

    Five pipelines are defined, in this order: original view, horizontal
    flip, vertical flip, rotation by +10 degrees, rotation by -10 degrees.
    Each one resizes to 224x224, applies its (optional) augmentation, converts
    to a tensor and normalises with the mean/std used elsewhere in the notebook.
    The result is ``pipelines[:n]``, so asking for more than five yields five.
    """
    augmentations = (
        None,                                     # original view
        transforms.RandomHorizontalFlip(p=1.0),   # always flip horizontally
        transforms.RandomVerticalFlip(p=1.0),     # always flip vertically
        transforms.RandomRotation((10, 10)),      # degree range collapsed to +10
        transforms.RandomRotation((-10, -10)),    # degree range collapsed to -10
    )

    pipelines = []
    for augmentation in augmentations:
        steps = [transforms.Resize((224, 224))]
        if augmentation is not None:
            steps.append(augmentation)
        steps.append(transforms.ToTensor())
        steps.append(transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]))
        pipelines.append(transforms.Compose(steps))

    return pipelines[:n]

def predict_with_tta(models, image, tta_transforms, weights, device):
    """Predict class probabilities for one image using ensemble + TTA.

    For every transform, the transformed image is run through each model, the
    outputs are combined as ``sum(w_i * out_i)`` and soft-maxed; the per-view
    probabilities are then averaged.

    Args:
        models: iterable of ``nn.Module`` ensemble members; each is put into
            eval mode. (Inside this function the name shadows the
            ``torchvision.models`` import.)
        image: a single input accepted by every callable in ``tta_transforms``.
        tta_transforms: non-empty sequence of callables, each returning a
            tensor for one sample (a batch dimension is added here).
        weights: one scalar weight per model.
        device: device the transformed image is moved to.

    Returns:
        Tensor with a leading batch dimension of 1 holding the averaged
        class probabilities.

    Raises:
        ValueError: if ``tta_transforms`` is empty.
    """
    if not tta_transforms:
        raise ValueError("tta_transforms must contain at least one transform")

    # Mode switch and no-grad context are loop-invariant, so do them once.
    for model in models:
        model.eval()

    view_probabilities = []
    with torch.no_grad():
        for transform in tta_transforms:
            img_transformed = transform(image).unsqueeze(0).to(device)

            # Weighted average of the member outputs for this view
            ensemble_outputs = [model(img_transformed) for model in models]
            combined = sum(w * out for w, out in zip(weights, ensemble_outputs))
            view_probabilities.append(torch.softmax(combined, dim=1))

    # Average the probabilities over all TTA views
    return torch.stack(view_probabilities).mean(dim=0)

# Report how many pipelines will actually be used: get_tta_transforms slices a
# fixed list, so the configured TTA_TRANSFORMS can exceed what it returns.
print(f"TTA transforms ready: {len(get_tta_transforms(TTA_TRANSFORMS))} augmentations")

## 8. Evaluate Ensemble + TTA on Test Set

In [None]:
# Restore the best checkpoint of every ensemble member and switch to eval mode.
# map_location=DEVICE lets checkpoints written on a GPU runtime load on a
# CPU-only one as well.
for model, name in zip(models_list, model_names):
    checkpoint_path = f'best_{name.lower().replace("-", "_")}.pth'
    model.load_state_dict(torch.load(checkpoint_path, map_location=DEVICE))
    model.eval()

print("\n" + "=" * 60)
print("EVALUASI TEST SET DENGAN ENSEMBLE + TTA")
print("=" * 60)

all_preds = []
all_labels = []

if TTA_ENABLE:
    tta_transforms = get_tta_transforms(TTA_TRANSFORMS)

    # Per-image evaluation with TTA. The dataset is opened without a
    # transform because predict_with_tta applies each TTA pipeline itself.
    test_dataset_pil = datasets.ImageFolder(os.path.join(DATA_DIR, 'test'))

    for img, label in tqdm(test_dataset_pil, desc="Testing with TTA"):
        pred = predict_with_tta(models_list, img, tta_transforms, ENSEMBLE_WEIGHTS, DEVICE)
        all_preds.append(torch.argmax(pred, dim=1).item())
        all_labels.append(label)
else:
    # Evaluation without TTA (plain weighted ensemble, batched)
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(DEVICE)
            ensemble_outputs = [model(inputs) for model in models_list]
            combined = sum(w * out for w, out in zip(ENSEMBLE_WEIGHTS, ensemble_outputs))
            predicted = combined.argmax(dim=1)
            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(labels.tolist())

# Calculate metrics.
# Pin the label set to every known class so the confusion matrix and report
# keep one row per class even if a class never appears in labels/predictions
# (later cells index the matrix by class position).
label_ids = list(range(len(class_names)))
accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, average='weighted')
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')
cm = confusion_matrix(all_labels, all_preds, labels=label_ids)
report = classification_report(all_labels, all_preds, labels=label_ids,
                               target_names=class_names, output_dict=True)

print(f"\n{'='*60}")
print("HASIL EVALUASI")
print(f"{'='*60}")
print(f"Test Accuracy:  {accuracy*100:.2f}%")
print(f"Precision:      {precision:.4f}")
print(f"Recall:         {recall:.4f}")
print(f"F1-Score:       {f1:.4f}")
print(f"{'='*60}")

## 9. Visualizations

In [None]:
def _plot_history(ax, train_values, val_values, metric, ylabel):
    """Draw the train (blue) and validation (red) curves of one metric on ``ax``."""
    ax.plot(train_values, 'b-', label=f'Train {metric}', linewidth=2)
    ax.plot(val_values, 'r-', label=f'Val {metric}', linewidth=2)
    ax.set_xlabel('Epoch', fontweight='bold')
    ax.set_ylabel(ylabel, fontweight='bold')
    ax.set_title(f'Ensemble Training/Validation {metric}', fontweight='bold', fontsize=14)
    ax.legend()
    ax.grid(alpha=0.3)


def _plot_confusion(ax, matrix, fmt, cmap, title):
    """Draw an annotated confusion-matrix heatmap labelled with the class names."""
    sns.heatmap(matrix, annot=True, fmt=fmt, cmap=cmap,
                xticklabels=class_names, yticklabels=class_names,
                ax=ax, annot_kws={'size': 14, 'weight': 'bold'})
    ax.set_xlabel('Predicted', fontweight='bold', fontsize=12)
    ax.set_ylabel('True', fontweight='bold', fontsize=12)
    ax.set_title(title, fontweight='bold', fontsize=14)


# Training history: loss on the left, accuracy on the right
fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(16, 5))
_plot_history(loss_ax, train_losses, val_losses, 'Loss', 'Loss')
_plot_history(acc_ax, train_accs, val_accs, 'Accuracy', 'Accuracy (%)')

plt.tight_layout()
plt.savefig('ensemble_training_history.png', dpi=300, bbox_inches='tight')
plt.show()

# Confusion matrix: raw counts on the left, row-normalised percentages on the right
fig, (count_ax, pct_ax) = plt.subplots(1, 2, figsize=(18, 6))
_plot_confusion(count_ax, cm, 'd', 'Blues', 'Confusion Matrix (Count)')

# Each row is divided by its own total, i.e. percentages per true class
cm_pct = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100
_plot_confusion(pct_ax, cm_pct, '.1f', 'Greens', 'Confusion Matrix (%)')

plt.tight_layout()
plt.savefig('ensemble_confusion_matrix.png', dpi=300, bbox_inches='tight')
plt.show()

print("Visualizations saved")

## 10. Detailed Metrics

In [None]:
# Per-class TP / TN / FP / FN derived from the confusion matrix
print("\n" + "=" * 70)
print("DETAIL CONFUSION MATRIX (TP, TN, FP, FN)")
print("=" * 70)

# Vectorised over classes: the diagonal holds the true positives, column sums
# minus the diagonal give false positives, row sums minus the diagonal give
# false negatives, and everything else is a true negative.
true_pos = np.diag(cm)
false_pos = cm.sum(axis=0) - true_pos
false_neg = cm.sum(axis=1) - true_pos
true_neg = cm.sum() - (true_pos + false_pos + false_neg)

tp_tn_data = []
for idx, class_name in enumerate(class_names):
    row = {
        'Class': class_name,
        'TP': true_pos[idx],
        'TN': true_neg[idx],
        'FP': false_pos[idx],
        'FN': false_neg[idx],
    }
    tp_tn_data.append(row)

    print(f"\n{class_name}:")
    print(f"  TP: {row['TP']} | TN: {row['TN']} | FP: {row['FP']} | FN: {row['FN']}")

tp_tn_df = pd.DataFrame(tp_tn_data)
display(tp_tn_df)

# Metrics table: one overall row followed by one row per class
overall_row = {
    'Class': 'Overall',
    'Accuracy': f"{accuracy*100:.2f}%",
    'Precision': f"{precision:.4f}",
    'Recall': f"{recall:.4f}",
    'F1-Score': f"{f1:.4f}",
    'Support': len(all_labels),
}
per_class_rows = [
    {
        'Class': class_name,
        'Accuracy': '-',
        'Precision': f"{report[class_name]['precision']:.4f}",
        'Recall': f"{report[class_name]['recall']:.4f}",
        'F1-Score': f"{report[class_name]['f1-score']:.4f}",
        'Support': int(report[class_name]['support']),
    }
    for class_name in class_names
]
metrics_data = [overall_row] + per_class_rows

metrics_df = pd.DataFrame(metrics_data)
print("\n" + "=" * 80)
print("METRICS SUMMARY")
print("=" * 80)
display(metrics_df)

# Plain-text classification report
print("\n" + "=" * 80)
print("CLASSIFICATION REPORT")
print("=" * 80)
print(classification_report(all_labels, all_preds, target_names=class_names))

## 11. Save Results

In [None]:
# Number of TTA views to report (0 when TTA is switched off)
tta_count = TTA_TRANSFORMS if TTA_ENABLE else 0

# Assemble the run summary section by section; key order matches the JSON layout.
ensemble_info = {
    'models': model_names,
    'weights': ENSEMBLE_WEIGHTS,
    'tta_enabled': TTA_ENABLE,
    'tta_transforms': tta_count,
    'num_epochs': NUM_EPOCHS,
    'batch_size': BATCH_SIZE,
    'learning_rate': LEARNING_RATE,
    'device': str(DEVICE),
}
training_history = {
    'train_losses': list(map(float, train_losses)),
    'val_losses': list(map(float, val_losses)),
    'train_accuracies': list(map(float, train_accs)),
    'val_accuracies': list(map(float, val_accs)),
    'best_val_accuracy': float(best_val_acc),
}
test_metrics = {
    'accuracy': float(accuracy),
    'precision': float(precision),
    'recall': float(recall),
    'f1_score': float(f1),
}
results = {
    'ensemble_info': ensemble_info,
    'classes': class_names,
    'training_history': training_history,
    'test_metrics': test_metrics,
    'confusion_matrix': cm.tolist(),
    'classification_report': report,
}

# Persist the summary as JSON next to the notebook
with open('ensemble_results.json', 'w') as f:
    json.dump(results, f, indent=4)

# Final console summary
ruler = "=" * 80
print("\n" + ruler)
print("HASIL AKHIR ENSEMBLE + TTA")
print(ruler)
print(f"Models: {', '.join(model_names)}")
print(f"Ensemble Weights: {ENSEMBLE_WEIGHTS}")
print(f"TTA: {'Yes' if TTA_ENABLE else 'No'} ({tta_count} augmentations)")
print(f"\nBest Val Acc: {best_val_acc:.2f}%")
print(f"Test Accuracy: {accuracy*100:.2f}%")
print(f"Test Precision: {precision:.4f}")
print(f"Test Recall: {recall:.4f}")
print(f"Test F1-Score: {f1:.4f}")
print("\nFiles generated:")
for generated_file in (
    "best_swin_t.pth",
    "best_efficientnetv2_s.pth",
    "best_convnext_t.pth",
    "ensemble_training_history.png",
    "ensemble_confusion_matrix.png",
    "ensemble_results.json",
):
    print(f"  - {generated_file}")
print(ruler)