# Loading Dataset

In [30]:
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision.datasets import ImageFolder
from torchvision import transforms
import cv2
import matplotlib.pyplot as plt
import random
import torch

# Seed Python's and PyTorch's RNGs so the shuffled split below is repeatable.
random.seed(42)
torch.manual_seed(42)

# Preprocessing applied to every image: resize to 224x224, then convert to a tensor.
transforms_ = transforms.Compose(
    [transforms.Resize((224, 224)), transforms.ToTensor()]
)

# Root folder of the dataset, read through ImageFolder with the pipeline above.
data_path = r'/kaggle/input/surface-crack-detection/'
dataset = ImageFolder(data_path, transform=transforms_)

# Shuffle every sample index once; the three splits are contiguous slices of it.
total_length = len(dataset)
indices = list(range(total_length))
random.shuffle(indices)

# 70% train, 15% validation, and whatever remains (about 15%) for test.
train_size = int(0.70 * total_length)
val_size = int(0.15 * total_length)

train_idx, val_idx, test_idx = (
    indices[:train_size],
    indices[train_size:train_size + val_size],
    indices[train_size + val_size:],
)

# Index-based views over the same underlying dataset.
train_subset, val_subset, test_subset = (
    Subset(dataset, split_indices)
    for split_indices in (train_idx, val_idx, test_idx)
)

# Only the test loader is created here; it is iterated in order (no shuffling).
testset = DataLoader(
    test_subset,
    batch_size=32,
    shuffle=False,
    num_workers=4,
    pin_memory=True,
)


# Model Class

In [None]:
# Nothing to install here: `torch.nn` is a submodule shipped inside the `torch`
# package (already imported in the first cell), not a separate pip distribution.
# `pip install torch.nn` would at best fail and at worst fetch an unrelated package.

In [23]:
import torch.nn.functional as F
import torch.nn as nn
import torch
import torchvision
import numpy as np
import torch.optim as optim

class CNN(nn.Module):
    """Two-block convolutional binary classifier for 3x224x224 images.

    Architecture: (Conv3x3 -> BatchNorm -> ReLU -> MaxPool2) twice, then a
    hidden fully connected layer and a single-logit output layer.

    Args:
        trial: Optional Optuna-style trial. When given, ``out_1``, ``out_2`` and
            ``layers`` are sampled from it (via ``suggest_int``) and any values
            passed explicitly are ignored.
        out_1: Number of output channels of the first convolution.
        out_2: Number of output channels of the second convolution.
        layers: Width of the hidden fully connected layer.

    Raises:
        ValueError: If ``trial`` is None and any of ``out_1``, ``out_2`` or
            ``layers`` is missing.
    """

    def __init__(self, trial = None, out_1 = None, out_2 = None, layers = None):
        super().__init__()

        # Search space: a trial, when supplied, decides all three sizes.
        if trial is not None:
            out_1 = trial.suggest_int('out_1', 16, 64, step = 8)
            out_2 = trial.suggest_int('out_2', 64, 128, step = 8)
            layers = trial.suggest_int('layers', 64, 128, step = 8)

        # Fail early with a readable message instead of a cryptic error from nn.Conv2d.
        missing = [
            name
            for name, value in (('out_1', out_1), ('out_2', out_2), ('layers', layers))
            if value is None
        ]
        if missing:
            raise ValueError(
                "CNN needs either a trial or explicit values for: " + ", ".join(missing)
            )

        # Keep the chosen hyper-parameters on the instance for later inspection.
        self.out_1 = out_1
        self.out_2 = out_2
        self.layers = layers

        # Convolution block 1
        self.conv1 = nn.Conv2d(3, out_1, kernel_size = 3, padding = 1)
        self.bn1 = nn.BatchNorm2d(out_1)
        self.relu1 = nn.ReLU()
        self.max2d_1 = nn.MaxPool2d(kernel_size = 2)

        # Convolution block 2
        self.conv2 = nn.Conv2d(out_1, out_2, kernel_size = 3, padding = 1)
        self.bn2 = nn.BatchNorm2d(out_2)
        self.relu2 = nn.ReLU()
        self.max2d_2 = nn.MaxPool2d(kernel_size = 2)

        # Infer the flattened feature count with a probe through the layers that
        # change the tensor shape (conv and pool only). BatchNorm and ReLU are
        # skipped on purpose: a freshly built module is in training mode, and
        # BatchNorm would update its running statistics from the probe even under
        # no_grad. A zeros probe also leaves the global RNG state untouched.
        with torch.no_grad():
            probe = torch.zeros(1, 3, 224, 224)
            probe = self.max2d_1(self.conv1(probe))
            probe = self.max2d_2(self.conv2(probe))
            n_features = probe.view(1, -1).shape[1]

        # Dense head
        self.fc1 = nn.Linear(n_features, layers)
        self.relu3 = nn.ReLU()

        self.fc2 = nn.Linear(layers, 1)

    def forward(self, x):
        """Return raw logits of shape (batch, 1); no sigmoid is applied here.

        The dense head is sized for 224x224 inputs (see ``__init__``).
        """
        x = self.relu1(self.bn1(self.conv1(x)))
        x = self.max2d_1(x)

        x = self.relu2(self.bn2(self.conv2(x)))
        x = self.max2d_2(x)

        # Flatten everything except the batch dimension.
        x = x.view(x.size(0), -1)

        x = self.relu3(self.fc1(x))
        x = self.fc2(x)

        return x
        

In [None]:
import torch_xla
import torch_xla.core.xla_model as xm
import torch_xla.core.xla_model as xm
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_multiprocessing as xmp

# Ask torch_xla for an XLA device handle.
# NOTE(review): `objective` (defined in a later cell) picks its own device via
# torch.cuda.is_available(), and the model-loading cell reassigns `device`,
# so this XLA device does not appear to be used for training — confirm intent.
device = xm.xla_device()

# Show the devices torch_xla reports, then the one selected above.
print(xm.get_xla_supported_devices())
print(f"Using device: {device}")

In [None]:
!pip install optuna

In [13]:
import torch
import torch.nn as nn
import torch.optim as optim
import os
import optuna

def train_one_epoch(model, trainset, optimizer, criterion, device):
    """Train ``model`` for one pass over ``trainset``.

    Args:
        model: Network producing one logit per sample, shape (batch, 1).
        trainset: Iterable yielding ``(images, labels)`` batches.
        optimizer: Optimizer stepped once per batch.
        criterion: Loss called as ``criterion(logits, targets)``.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen, or ``(0.0, 0.0)`` when
        ``trainset`` yields no samples.
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for batch_imgs, batch_labels in trainset:
        batch_imgs = batch_imgs.to(device)
        # The loss needs float targets with the same (batch, 1) shape as the logits.
        targets = batch_labels.to(device).float().unsqueeze(1)

        logits = model(batch_imgs)
        batch_loss = criterion(logits, targets)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so the final value is a per-sample mean.
        loss_sum += batch_loss.item() * batch_imgs.size(0)
        hard_preds = (torch.sigmoid(logits) > 0.5).long()
        n_correct += (hard_preds == targets.long()).sum().item()
        n_seen += targets.size(0)

    if n_seen == 0:
        return 0.0, 0.0
    return loss_sum / n_seen, n_correct / n_seen


def validate(model, valset, criterion, device):
    """Evaluate ``model`` on ``valset`` without updating its parameters.

    Args:
        model: Network producing one logit per sample, shape (batch, 1).
        valset: Iterable yielding ``(images, labels)`` batches.
        criterion: Loss called as ``criterion(logits, targets)``.
        device: Device the batches are moved to.

    Returns:
        ``(mean_loss, accuracy)`` over all samples seen, or ``(0.0, 0.0)`` when
        ``valset`` yields no samples (same convention as ``train_one_epoch``).
    """
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for imgs, labels in valset:
            imgs, labels = imgs.to(device), labels.to(device)
            # The loss needs float targets with the same (batch, 1) shape as the logits.
            labels = labels.float().unsqueeze(1)

            output = model(imgs)
            loss = criterion(output, labels)

            # Weight by batch size so the result is a per-sample mean.
            running_loss += loss.item() * imgs.size(0)
            preds = (torch.sigmoid(output) > 0.5).long()
            correct += (preds == labels.long()).sum().item()
            total += labels.size(0)

    # Guard against an empty loader instead of raising ZeroDivisionError.
    if total == 0:
        return 0.0, 0.0
    return running_loss / total, correct / total


def objective(trial, trainset, valset):
    """Optuna objective: train a ``CNN`` sampled from ``trial`` and return its best validation loss.

    Sampled hyper-parameters (besides those sampled inside ``CNN``): batch size
    (16 or 32), learning rate (log scale, 1e-5..1e-1), optimizer (Adam, SGD or
    Adagrad), weight decay (log scale, 1e-6..1e-2) and, for SGD only, momentum.

    Training runs for at most 10 epochs and stops early after 3 consecutive
    epochs without an improvement of more than 1e-4 in validation loss. The
    validation loss is reported to the trial every epoch so a pruner can stop it.

    Args:
        trial: Optuna trial used for sampling, reporting and user attributes.
        trainset: Dataset wrapped in a shuffled ``DataLoader`` for training.
        valset: Dataset wrapped in an ordered ``DataLoader`` for validation.

    Returns:
        The lowest validation loss observed. If no epoch ever improves on the
        initial ``inf`` (e.g. the loss is NaN), ``inf`` is returned and no
        checkpoint or metric attributes are recorded.

    Raises:
        optuna.exceptions.TrialPruned: When the pruner asks to stop the trial.

    Side effects:
        Writes ``models/best_model_trial_<n>.pt`` and stores ``model_path``,
        ``train_loss``, ``train_acc``, ``val_loss`` and ``val_acc`` as trial user
        attributes for the best epoch.
    """
    criterion = nn.BCEWithLogitsLoss()
    epochs = 10
    patience = 3
    early_stop_counter = 0
    best_val_loss = float('inf')
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Build model from trial
    model = CNN(trial).to(device)

    # Hyperparameter suggestions
    batch_size = trial.suggest_categorical('batch_size', [16, 32])
    lr = trial.suggest_float('lr', 1e-5, 1e-1, log=True)
    optimizer_name = trial.suggest_categorical('optimizers', ["Adam", "SGD", "Adagrad"])
    weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)

    if optimizer_name == "SGD":
        momentum = trial.suggest_float('momentum', 0.5, 0.99, log=True)
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay)
    else:
        optimizer_class = getattr(optim, optimizer_name)
        optimizer = optimizer_class(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Data loaders use the sampled batch size.
    train_loader = DataLoader(trainset, shuffle=True, batch_size=batch_size, num_workers=8, pin_memory=True)
    val_loader = DataLoader(valset, shuffle=False, batch_size=batch_size, num_workers=8, pin_memory=True)

    # Snapshot of the best epoch; both stay None until some epoch improves.
    best_state_dict = None
    best_metrics = None

    for epoch in range(epochs):
        train_loss, train_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = validate(model, val_loader, criterion, device)

        print(f"Epoch [{epoch + 1}/{epochs}] | Train Loss: {train_loss:.4f} Acc: {train_acc:.4f} | Val Loss: {val_loss:.4f} Acc: {val_acc:.4f}")

        trial.report(val_loss, step=epoch)

        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

        if val_loss < best_val_loss - 1e-4:
            best_val_loss = val_loss
            best_metrics = {
                "train_loss": train_loss,
                "train_acc": train_acc,
                "val_loss": val_loss,
                "val_acc": val_acc,
            }
            early_stop_counter = 0
            # state_dict() returns references to the live parameters, which later
            # epochs keep updating in place. Clone them (onto the CPU) so the
            # snapshot really is the best epoch and loads on any machine.
            best_state_dict = {
                name: tensor.detach().cpu().clone()
                for name, tensor in model.state_dict().items()
            }
        else:
            early_stop_counter += 1
            if early_stop_counter >= patience:
                print("Early stopping triggered.")
                break

    # Save the best snapshot once at the end, together with its metrics.
    if best_state_dict is not None:
        os.makedirs("models", exist_ok=True)
        model_path = f"models/best_model_trial_{trial.number}.pt"
        torch.save(best_state_dict, model_path)
        trial.set_user_attr('model_path', model_path)

        trial.set_user_attr("train_loss", best_metrics["train_loss"])
        trial.set_user_attr("train_acc", best_metrics["train_acc"])
        trial.set_user_attr("val_loss", best_metrics["val_loss"])
        trial.set_user_attr("val_acc", best_metrics["val_acc"])

    return best_val_loss

# Training and Validation

In [16]:
import optuna

def objective_wrapper(trainset, valset):
    """Bind the datasets so Optuna can call the objective with only a trial.

    Returns a one-argument callable ``f(trial)`` that forwards to
    ``objective(trial, trainset, valset)``.
    """
    return lambda trial: objective(trial, trainset, valset)

# Minimise the validation loss returned by the objective; the median pruner
# is configured with 3 warm-up steps before it may stop a trial.
study = optuna.create_study(
    pruner=optuna.pruners.MedianPruner(n_warmup_steps=3),
    direction='minimize',
)

# Run 10 trials on the train/validation subsets with a progress bar.
study.optimize(
    objective_wrapper(train_subset, val_subset),
    show_progress_bar=True,
    n_trials=10,
)

[I 2025-06-12 04:24:47,663] A new study created in memory with name: no-name-308bc5fb-4680-4c4e-bdcb-38ae67968e6c


  0%|          | 0/10 [00:00<?, ?it/s]

Epoch [1/10] | Train Loss: 0.4184 Acc: 0.9365 | Val Loss: 0.0818 Acc: 0.9670
Epoch [2/10] | Train Loss: 0.1187 Acc: 0.9615 | Val Loss: 0.0686 Acc: 0.9823
Epoch [3/10] | Train Loss: 0.0975 Acc: 0.9759 | Val Loss: 0.0291 Acc: 0.9925
Epoch [4/10] | Train Loss: 0.0500 Acc: 0.9875 | Val Loss: 0.0283 Acc: 0.9915
Epoch [5/10] | Train Loss: 0.0395 Acc: 0.9899 | Val Loss: 0.0339 Acc: 0.9888
Epoch [6/10] | Train Loss: 0.0330 Acc: 0.9915 | Val Loss: 0.0404 Acc: 0.9898
Epoch [7/10] | Train Loss: 0.0267 Acc: 0.9926 | Val Loss: 0.0346 Acc: 0.9917
Early stopping triggered.
[I 2025-06-12 04:34:29,754] Trial 0 finished with value: 0.028286033366496363 and parameters: {'out_1': 64, 'out_2': 104, 'layers': 120, 'batch_size': 16, 'lr': 0.0022682026914930455, 'optimizers': 'Adam', 'weight_decay': 0.0002809753042472424}. Best is trial 0 with value: 0.028286033366496363.
Epoch [1/10] | Train Loss: 1.8994 Acc: 0.9679 | Val Loss: 0.4472 Acc: 0.9800
Epoch [2/10] | Train Loss: 0.2296 Acc: 0.9842 | Val Loss: 0.36

KeyboardInterrupt: 

# Loading Saved Model

In [31]:
import torch
from sklearn.metrics import classification_report

# Rebuild the architecture of the study's best trial and restore its saved weights.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Take both the hyper-parameters and the checkpoint path from the same trial so
# the architecture always matches the weights being loaded.
best_trial = study.best_trial
best_params = best_trial.params

model = CNN(
    trial=None,
    out_1=best_params['out_1'],
    out_2=best_params['out_2'],
    layers=best_params['layers'],
)
# map_location lets a checkpoint written on a GPU be restored on a CPU-only machine.
model.load_state_dict(torch.load(best_trial.user_attrs['model_path'], map_location=device))
model = model.to(device)
model.eval()

# Evaluate on the test set, collecting per-batch predictions and labels on the CPU.
all_preds = []
all_labels = []

with torch.no_grad():
    for inputs, labels in testset:
        inputs = inputs.to(device)
        labels = labels.to(device)

        outputs = model(inputs)
        probs = torch.sigmoid(outputs)  # model returns logits (trained with BCEWithLogitsLoss)

        # Drop the trailing singleton dimension so predictions are (batch,) like the labels.
        preds = (probs > 0.5).long().squeeze(1)

        all_preds.append(preds.cpu())
        all_labels.append(labels.cpu())

# Test Scores

In [32]:
# Concatenate the per-batch tensors into flat arrays under new names, so the
# lists built by the evaluation loop stay intact and this cell can be re-run.
# ravel() makes both arrays 1-D regardless of whether predictions were (N, 1) or (N,).
y_true = torch.cat(all_labels).numpy().ravel()
y_pred = torch.cat(all_preds).numpy().ravel()

# Per-class precision / recall / F1 on the test split.
print(classification_report(y_true, y_pred))


              precision    recall  f1-score   support

           0       1.00      0.99      0.99      2977
           1       0.99      1.00      0.99      3023

    accuracy                           0.99      6000
   macro avg       0.99      0.99      0.99      6000
weighted avg       0.99      0.99      0.99      6000



# Saving samples for test - Streamlit.


In [40]:
# Export the first few test samples (at most 32) as plain NumPy arrays.
X_test = []
y_test = []

# Never index past the end of the split if it holds fewer than 32 samples.
n_export = min(32, len(test_subset))

for i in range(n_export):
    features, label = test_subset[i]  # index the Subset directly, not the DataLoader
    X_test.append(features.numpy())   # image tensor as an ndarray
    y_test.append(label)              # class index for this sample

# Every keyword passed to np.savez names an array to store, so only X and y are
# passed: both are plain numeric arrays and need no pickling.
np.savez("test_samples.npz", X=np.asarray(X_test), y=np.asarray(y_test))


In [39]:
# Debug print: `features` is the last sample tensor left over from the sample-saving
# loop, while `labels` is the last batch of targets left over from the test-evaluation
# loop, so the two shapes describe different objects (one image vs. one batch of labels).
print(f"features.shape: {features.shape}, labels.shape: {labels.shape}")


features.shape: torch.Size([3, 224, 224]), labels.shape: torch.Size([32])
