# Specific Test Gravitational Lens Finding

In [1]:
import gdown

gdown.download(id="1doUhVoq1-c9pamZVLpvjW1YRDMkKO1Q5")

!unzip lens-finding-test.zip -d ./lens_dataset

Downloading...
From (original): https://drive.google.com/uc?id=1doUhVoq1-c9pamZVLpvjW1YRDMkKO1Q5
From (redirected): https://drive.google.com/uc?id=1doUhVoq1-c9pamZVLpvjW1YRDMkKO1Q5&confirm=t&uuid=a304613f-367b-432c-97fc-ede7196664cd
To: /content/lens-finding-test.zip
100%|██████████| 2.11G/2.11G [00:54<00:00, 38.7MB/s]


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: ./lens_dataset/test_nonlenses/17760.npy  
  inflating: ./lens_dataset/__MACOSX/test_nonlenses/._17760.npy  
  inflating: ./lens_dataset/test_nonlenses/18453.npy  
  inflating: ./lens_dataset/__MACOSX/test_nonlenses/._18453.npy  
  inflating: ./lens_dataset/test_nonlenses/5227.npy  
  inflating: ./lens_dataset/__MACOSX/test_nonlenses/._5227.npy  
  inflating: ./lens_dataset/test_nonlenses/4139.npy  
  inflating: ./lens_dataset/__MACOSX/test_nonlenses/._4139.npy  
  inflating: ./lens_dataset/test_nonlenses/3656.npy  
  inflating: ./lens_dataset/__MACOSX/test_nonlenses/._3656.npy  
  inflating: ./lens_dataset/test_nonlenses/11311.npy  
  inflating: ./lens_dataset/__MACOSX/test_nonlenses/._11311.npy  
  inflating: ./lens_dataset/test_nonlenses/3130.npy  
  inflating: ./lens_dataset/__MACOSX/test_nonlenses/._3130.npy  
  inflating: ./lens_dataset/test_nonlenses/11477.npy  
  inflating: ./lens_dataset/__MACOSX/test

## Image Dataset Loading  


The dataset is highly imbalanced, with much fewer lens images. To address this, I apply oversampling using data augmentation.  

**Augmentations applied:**  
- Vertical flip  
- Horizontal flip  
- Combined vertical & horizontal flip  
- 90° rotation  
- 270° rotation  

In [2]:
import os
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler

# Check for CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Custom Dataset Class with Flipping Augmentation for Lenses
class LensingDataset(Dataset):
    def __init__(self, data_dir, data_type="train", transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.data = []
        self.labels = []

        # Mapping based on data_type (train or test)
        mapping = {
            f"{data_type}_nonlenses": 0,
            f"{data_type}_lenses": 1
        }

        for class_name, label in mapping.items():
            class_dir = os.path.join(data_dir, class_name)
            if not os.path.exists(class_dir):
                print(f"Warning: Directory {class_dir} not found, skipping.")
                continue
            for img_file in os.listdir(class_dir):
                img_path = os.path.join(class_dir, img_file)
                img = np.load(img_path)

                # Add the original image
                self.data.append(img)
                self.labels.append(label)

                # If this is a "lenses" image (label 1), add augmented versions
                if label == 1 and data_type == 'train':
                    # Vertical flip (flip along the height axis)
                    img_vflip = np.fliplr(img).copy()
                    self.data.append(img_vflip)
                    self.labels.append(label)

                    # Horizontal flip (flip along the width axis)
                    img_hflip = np.flipud(img).copy()
                    self.data.append(img_hflip)
                    self.labels.append(label)

                    img_mflip = np.flipud(np.fliplr(img)).copy()
                    self.data.append(img_mflip)
                    self.labels.append(label)

                    # 2. Rotation Augmentations (90°, 270°)
                    img_rot90 = np.rot90(img, k=1, axes=(1, 2)).copy()  # 90° clockwise
                    self.data.append(img_rot90)
                    self.labels.append(label)

                    img_rot270 = np.rot90(img, k=3, axes=(1, 2)).copy()  # 270° clockwise
                    self.data.append(img_rot270)
                    self.labels.append(label)

        if len(self.data) == 0:
            raise ValueError(f"No data found for {data_type} in {data_dir}, check folder structure.")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img = self.data[idx]
        label = self.labels[idx]
        img = torch.tensor(img, dtype=torch.float32)
        img = img.to(device)
        label = torch.tensor(label, dtype=torch.long).to(device)
        label = label.to(device)
        return img, label

The dataset is structured into train and test folders. The train set is further split into 90% training and 10% validation to evaluate model performance before testing. This ensures proper generalization assessment and prevents overfitting.

In [3]:
# Load and split
data_path = "./lens_dataset"
full_train_dataset = LensingDataset(data_path, data_type="train")
test_dataset = LensingDataset(data_path, data_type="test")

# Simple random split for train/val
n_train = int(len(full_train_dataset) * 0.9)  # 90% train
indices = torch.randperm(len(full_train_dataset)).tolist()
train_sampler = SubsetRandomSampler(indices[:n_train])
val_sampler = SubsetRandomSampler(indices[n_train:])

# DataLoaders
train_loader = DataLoader(full_train_dataset, batch_size=32, sampler=train_sampler)
val_loader = DataLoader(full_train_dataset, batch_size=128, sampler=val_sampler)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

# Verify sizes
print(f"Train size: {len(train_sampler)}")
print(f"Val size: {len(val_sampler)}")
print(f"Test size: {len(test_dataset)}")

# Optional: Print class distribution to verify augmentation
train_labels = [full_train_dataset.labels[i] for i in indices[:n_train]]
val_labels = [full_train_dataset.labels[i] for i in indices[n_train:]]
test_labels = test_dataset.labels

print(f"Train class distribution: {np.bincount(train_labels)}")
print(f"Val class distribution: {np.bincount(val_labels)}")
print(f"Test class distribution: {np.bincount(test_labels)}")

Train size: 35149
Val size: 3906
Test size: 19650
Train class distribution: [25748  9401]
Val class distribution: [2927  979]
Test class distribution: [19455   195]


## Model Architecture

The CNN model has three main parts:  

1. **Stem Layer**: A `16 Channel` Conv2D (`3x3 Filter, stride=2`), BatchNorm, PReLU.  
2. **Feature Extraction**: Two **Basic CNN Blocks** (`16→32, 32→64`), where I opted to use **Depthwise Separable Convolutions** for efficiency, followed by BatchNorm, PReLU, and Dropout (0.5).  
3. **Classification Head**: A `1x1` Conv layer (`64→128`), followed by Adaptive Average Pooling. The output is pooled along the channel dimension, then passed through Dropout (`0.3`) and fully connected layers (`128→32→2`) for classification. <br>


**Other Features**
Dropout: Applied at different stages to reduce overfitting.

PReLU: Used instead of ReLU for better performance adaptability.

In [4]:
import torch.optim as optim
import torch.nn as nn
import torch
from torch.utils.data import DataLoader
import torch.nn.functional as F

class BasicBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride):
        super(BasicBlock, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride, padding=1, groups=in_channels, bias=False),
            nn.BatchNorm2d(in_channels),
            nn.PReLU(in_channels),
            nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
            nn.BatchNorm2d(out_channels),
        )
        self.prelu = nn.PReLU(out_channels)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.conv(x)
        x = self.prelu(x)
        x = self.dropout(x)
        return x

class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.stem = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(16),
            nn.PReLU(16)
        )
        self.features = nn.Sequential(
            BasicBlock(16, 32, stride=2),
            BasicBlock(32, 64, stride=2),
        )
        self.head = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=1, bias=False),
            nn.BatchNorm2d(128),
            nn.PReLU(128),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Dropout(0.3),
            nn.Linear(128, 32),
            nn.Linear(32, 2)
        )

    def forward(self, x):
        x = self.stem(x)
        x = self.features(x)
        x = self.head(x)
        return x

## Model Training

To handle class imbalance, I used Focal Loss, which helps the model focus on hard samples by reducing the impact of easy examples.<br><br>
Alpha (α = 4.0): Balances class importance. Higher values give more weight to the minority class.<br>
Gamma (γ = 3.0): Controls focus on hard samples. Higher values down-weight easy samples more aggressively.

In [5]:
def focal_loss(outputs, labels, alpha=4.0, gamma=3.0):
    ce_loss = F.cross_entropy(outputs, labels, reduction='none')
    pt = torch.exp(-ce_loss)
    focal_loss = alpha * (1 - pt) ** gamma * ce_loss
    return focal_loss

The training pipeline uses **Adam** (`lr=0.001`) as the optimiser. It tracks training and validation loss, accuracy, precision, recall, and F1-score, ensuring detailed performance evaluation.<br>

**Early stopping** is implemented to prevent overfitting by monitoring validation loss and stopping training if no improvement is observed for **20 consecutive epochs** (patience). <br>

The best model weights are saved based on the lowest validation loss. Finally, the trained model is stored as `best_model.pth`.

In [6]:
import torch.optim as optim
import torch.nn as nn
import torch
from sklearn.metrics import precision_recall_fscore_support
import copy

def train_model(model, train_loader, val_loader, criterion, optimizer, epochs=30, device='cuda',
                patience=5, min_delta=0.001):
    model.to(device)

    # Early stopping variables
    best_val_loss = float('inf')
    best_model_weights = copy.deepcopy(model.state_dict())
    early_stop_counter = 0

    for epoch in range(epochs):
        # Training phase
        model.train()
        total_train_loss = 0.0
        train_preds = []
        train_labels = []

        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            train_preds.extend(predicted.cpu().numpy())
            train_labels.extend(labels.cpu().numpy())

        train_loss = total_train_loss / len(train_loader)

        # Calculate training metrics
        train_prec, train_rec, train_f1, _ = precision_recall_fscore_support(
            train_labels, train_preds, average='weighted', zero_division=0
        )
        train_acc = sum(p == l for p, l in zip(train_preds, train_labels)) / len(train_labels)

        # Validation phase
        model.eval()
        total_val_loss = 0.0
        val_preds = []
        val_labels = []

        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                total_val_loss += loss.item()

                _, predicted = torch.max(outputs, 1)
                val_preds.extend(predicted.cpu().numpy())
                val_labels.extend(labels.cpu().numpy())

        val_loss = total_val_loss / len(val_loader)

        # Calculate validation metrics
        val_prec, val_rec, val_f1, _ = precision_recall_fscore_support(
            val_labels, val_preds, average='weighted', zero_division=0
        )
        val_acc = sum(p == l for p, l in zip(val_preds, val_labels)) / len(val_labels)

        # Calculate per-class metrics for validation
        per_class_metrics = precision_recall_fscore_support(
            val_labels, val_preds, average=None, zero_division=0
        )

        # Print detailed metrics
        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"Train - Loss: {train_loss:.4f}, Acc: {train_acc:.4f}")
        print(f"Val   - Loss: {val_loss:.4f}, Acc: {val_acc:.4f}")
        print(f"Class 1 - Prec: {per_class_metrics[0][1]:.4f}, "
              f"Rec: {per_class_metrics[1][1]:.4f}, F1: {per_class_metrics[2][1]:.4f}")


        # Early stopping and best model saving
        if val_loss < best_val_loss - min_delta:
            best_val_loss = val_loss
            best_model_weights = copy.deepcopy(model.state_dict())
            early_stop_counter = 0
            print(f"New best model saved with val_loss: {best_val_loss:.4f}")
        else:
            early_stop_counter += 1
            print(f"Early stopping counter: {early_stop_counter}/{patience}")

        if early_stop_counter >= patience:
            print("Early stopping triggered!")
            break

    # Load best weights
    model.load_state_dict(best_model_weights)
    print(f"Training completed. Best validation loss: {best_val_loss:.4f}")
    return model

model = CNNModel().to('cuda')

criterion  = focal_loss

optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train with LR annealing and early stopping
trained_model = train_model(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    epochs=100,
    device='cuda',
    patience=20,  # Stop if no improvement for 5 epochs
    min_delta=0.0001  # Minimum improvement required
)

torch.save(trained_model.state_dict(), 'Test_II_Model.pth')


Epoch 1/100
Train - Loss: 44.2328, Acc: 0.8406
Val   - Loss: 138.1152, Acc: 0.8789
Class 1 - Prec: 0.7196, Rec: 0.8468, F1: 0.7780
New best model saved with val_loss: 138.1152

Epoch 2/100
Train - Loss: 34.5847, Acc: 0.8868
Val   - Loss: 122.7977, Acc: 0.9063
Class 1 - Prec: 0.7739, Rec: 0.8846, F1: 0.8255
New best model saved with val_loss: 122.7977

Epoch 3/100
Train - Loss: 30.5803, Acc: 0.9043
Val   - Loss: 106.0034, Acc: 0.9219
Class 1 - Prec: 0.8317, Rec: 0.8631, F1: 0.8471
New best model saved with val_loss: 106.0034

Epoch 4/100
Train - Loss: 28.5601, Acc: 0.9101
Val   - Loss: 97.4947, Acc: 0.9237
Class 1 - Prec: 0.8485, Rec: 0.8468, F1: 0.8476
New best model saved with val_loss: 97.4947

Epoch 5/100
Train - Loss: 26.6376, Acc: 0.9169
Val   - Loss: 95.4101, Acc: 0.9240
Class 1 - Prec: 0.9220, Rec: 0.7610, F1: 0.8338
New best model saved with val_loss: 95.4101

Epoch 6/100
Train - Loss: 26.0224, Acc: 0.9178
Val   - Loss: 93.4997, Acc: 0.9286
Class 1 - Prec: 0.8365, Rec: 0.8887,

## Model Evaluation

In [9]:
import numpy as np
import plotly.graph_objs as go
import plotly.figure_factory as ff
from sklearn.metrics import roc_curve, auc
import torch

def plot_roc_curve(y_true, y_pred_proba):
    """
    Plot ROC curve and calculate AUC score using Plotly

    Parameters:
    y_true (array-like): True binary labels
    y_pred_proba (array-like): Predicted probabilities for positive class

    Returns:
    plotly Figure object
    """
    # Compute ROC curve and ROC area
    fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
    roc_auc = auc(fpr, tpr)

    # Create Plotly figure
    fig = go.Figure()

    # ROC Curve
    fig.add_trace(go.Scatter(
        x=fpr,
        y=tpr,
        mode='lines',
        name=f'ROC Curve (AUC = {roc_auc:.2f})',
        line=dict(color='darkorange', width=2)
    ))

    # Diagonal line
    fig.add_trace(go.Scatter(
        x=[0, 1],
        y=[0, 1],
        mode='lines',
        name='Random Classifier',
        line=dict(color='navy', dash='dash')
    ))

    # Update layout
    fig.update_layout(
        title='Receiver Operating Characteristic (ROC) Curve',
        xaxis_title='False Positive Rate',
        yaxis_title='True Positive Rate',
        width=800,
        height=600
    )

    return fig

# Example usage function
def evaluate_model(model, test_loader, device):
    """
    Evaluate model and generate visualizations

    Parameters:
    model (torch.nn.Module): Trained PyTorch model
    test_loader (DataLoader): Test data loader
    device (torch.device): Computing device

    Returns:
    Tuple of ROC curve figure, Confusion matrix figure, and metrics
    """
    model.eval()
    y_true, y_pred_proba, y_pred = [], [], []

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)

            outputs = torch.sigmoid(model(images))
            y_true.extend(labels.cpu().numpy())
            y_pred_proba.extend(outputs.cpu().numpy())
            y_pred.extend((outputs > 0.5).int().cpu().numpy())

    # Convert to numpy arrays
    y_true = np.array(y_true)
    y_pred_proba = np.array(y_pred_proba)[:, 1]
    y_pred = np.array(y_pred)[:,1]

    # Generate plots
    roc_fig = plot_roc_curve(y_true, y_pred_proba)

    return roc_fig, {
        'AUC': auc(*(roc_curve(y_true, y_pred_proba)[:2])),
        'Accuracy': np.mean(y_true == y_pred)
    }

roc_figure, metrics = evaluate_model(model, test_loader, device)
print(metrics)
roc_figure.show()


{'AUC': 0.9628338901739056, 'Accuracy': 0.9721628498727736}
