In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
import torchvision.transforms as transforms
from torch.optim import lr_scheduler
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import os
import zipfile
from sklearn.metrics import classification_report, confusion_matrix
import seaborn as sns
import pandas as pd
import time
import copy
from tqdm.auto import tqdm
import warnings
warnings.filterwarnings('ignore')

In [None]:
def set_seed(seed=42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f"✓ Random seed: {seed}")

SEED = 42
set_seed(SEED)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}')


✓ Random seed: 42
Device: cuda


In [None]:
zip_path = '/content/neu-surface.zip'
extract_path = '/content/neu_dataset'

if not os.path.exists(extract_path):
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)

dataset_base = os.path.join(extract_path, 'NEU-DET')

class NEUSurfaceDefectDataset(Dataset):
    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        if self.transform:
            image = self.transform(image)
        return image, label

def load_dataset_from_folders(base_path, split='train'):
    image_paths = []
    labels = []
    split_path = os.path.join(base_path, split, 'images')

    if not os.path.exists(split_path):
        return None, None, None

    class_folders = sorted([d for d in os.listdir(split_path)
                           if os.path.isdir(os.path.join(split_path, d))])

    class_names = class_folders
    class_to_idx = {class_name: idx for idx, class_name in enumerate(class_names)}

    for class_name in class_folders:
        class_path = os.path.join(split_path, class_name)
        images = [f for f in os.listdir(class_path)
                 if f.endswith(('.jpg', '.png', '.bmp'))]

        for img_file in images:
            img_path = os.path.join(class_path, img_file)
            image_paths.append(img_path)
            labels.append(class_to_idx[class_name])

    return image_paths, labels, class_names

train_paths, train_labels, class_names = load_dataset_from_folders(dataset_base, 'train')
val_paths, val_labels, _ = load_dataset_from_folders(dataset_base, 'validation')

print(f"Classes: {class_names}")
print(f"Train: {len(train_paths)} | Val: {len(val_paths)}")

Classes: ['crazing', 'inclusion', 'patches', 'pitted_surface', 'rolled-in_scale', 'scratches']
Train: 1440 | Val: 360


In [None]:
IMG_SIZE = 224

train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomRotation(degrees=30),
    transforms.RandomPerspective(distortion_scale=0.3, p=0.5),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.2, hue=0.1),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
    transforms.RandomErasing(p=0.3, scale=(0.02, 0.15)),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [None]:
def create_weighted_sampler(labels, class_names):

    class_counts = {}
    for label in labels:
        class_counts[label] = class_counts.get(label, 0) + 1

    weights = []
    for label in labels:
        class_name = class_names[label]

        # INCLUSION ve PITTED_SURFACE için 2x weight
        if class_name in ['inclusion', 'pitted_surface']:
            weight = 2.0 / class_counts[label]
        else:
            weight = 1.0 / class_counts[label]

        weights.append(weight)

    sampler = WeightedRandomSampler(
        weights=weights,
        num_samples=len(weights),
        replacement=True
    )

    return sampler

# Weighted sampler oluştur
train_sampler = create_weighted_sampler(train_labels, class_names)

print("\n✓ WEIGHTED SAMPLING ACTIVE")
print("  - INCLUSION: 2x more frequent")
print("  - PITTED_SURFACE: 2x more frequent")


✓ WEIGHTED SAMPLING ACTIVE
  - INCLUSION: 2x more frequent
  - PITTED_SURFACE: 2x more frequent


In [None]:
train_dataset = NEUSurfaceDefectDataset(
    image_paths=train_paths,
    labels=train_labels,
    transform=train_transform
)

val_dataset = NEUSurfaceDefectDataset(
    image_paths=val_paths,
    labels=val_labels,
    transform=val_transform
)

BATCH_SIZE = 32
NUM_WORKERS = 0

# ÖNEMLI: sampler kullanıldığında shuffle=False olmalı
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    sampler=train_sampler,  # ← shuffle yerine sampler
    num_workers=NUM_WORKERS,
    pin_memory=True if torch.cuda.is_available() else False,
    persistent_workers=False
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True if torch.cuda.is_available() else False,
    persistent_workers=False
)

print(f"\n✓ Train batches: {len(train_loader)}")
print(f"✓ Val batches: {len(val_loader)}")



✓ Train batches: 45
✓ Val batches: 12


In [None]:

class Create_cnn_module(nn.Module):

    def __init__(self, num_classes=6):
        super(Create_cnn_module, self).__init__()

        # Convolutional layers
        self.features = nn.Sequential(
            # Block 1
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),

            # Block 2
            nn.Conv2d(64, 128, kernel_size=5, padding=2),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),

            # Block 3
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),

            # Block 4
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
        )

        # Adaptive pooling
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        # Classifier
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.4),
            nn.Linear(256 * 7 * 7, 1024),
            nn.ReLU(inplace=True),

            nn.Dropout(p=0.4),
            nn.Linear(1024, 512),
            nn.ReLU(inplace=True),

            nn.Linear(512, num_classes)
        )

        self._initialize_weights()

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)

In [None]:
def train_one_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    running_corrects = 0
    total_samples = 0

    pbar = tqdm(dataloader, desc='Training', leave=False)

    for inputs, labels in pbar:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

        loss.backward()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        optimizer.step()

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        total_samples += inputs.size(0)

        pbar.set_postfix({
            'loss': f'{loss.item():.4f}',
            'acc': f'{running_corrects.double() / total_samples:.4f}'
        })

    epoch_loss = running_loss / total_samples
    epoch_acc = running_corrects.double() / total_samples

    return epoch_loss, epoch_acc.item()

def validate(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    total_samples = 0

    all_preds = []
    all_labels = []

    pbar = tqdm(dataloader, desc='Validation', leave=False)

    with torch.no_grad():
        for inputs, labels in pbar:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
            total_samples += inputs.size(0)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

            pbar.set_postfix({
                'loss': f'{loss.item():.4f}',
                'acc': f'{running_corrects.double() / total_samples:.4f}'
            })

    epoch_loss = running_loss / total_samples
    epoch_acc = running_corrects.double() / total_samples

    return epoch_loss, epoch_acc.item(), all_preds, all_labels

def train_model(model, train_loader, val_loader, criterion, optimizer,
                scheduler, num_epochs, device, early_stopping_patience=15):

    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    best_loss = float('inf')
    epochs_no_improve = 0

    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': []
    }

    for epoch in range(num_epochs):
        print(f'\nEpoch {epoch+1}/{num_epochs}')
        print('-' * 70)

        train_loss, train_acc = train_one_epoch(
            model, train_loader, criterion, optimizer, device
        )

        val_loss, val_acc, _, _ = validate(
            model, val_loader, criterion, device
        )

        scheduler.step(val_loss)

        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        print(f'Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}')
        print(f'Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}')
        current_lr = optimizer.param_groups[0]['lr']
        print(f'LR: {current_lr:.6f}')

        if val_acc > best_acc:
            best_acc = val_acc
            best_loss = val_loss
            best_model_wts = copy.deepcopy(model.state_dict())
            epochs_no_improve = 0
            print(f'✓ Best model updated! (Val Acc: {val_acc:.4f})')
        else:
            epochs_no_improve += 1
            print(f'⚠ {epochs_no_improve}/{early_stopping_patience} - No improvement')

        if epochs_no_improve >= early_stopping_patience:
            print(f'\nEarly Stopping at epoch {epoch+1}')
            break

    time_elapsed = time.time() - since
    print(f'\nTraining completed: {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best Val Accuracy: {best_acc:.4f}')
    print(f'Best Val Loss: {best_loss:.4f}')

    model.load_state_dict(best_model_wts)
    return model, history


In [None]:
class FocalLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, label_smoothing=0.1):
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma
        self.label_smoothing = label_smoothing

    def forward(self, inputs, targets):
        ce_loss = nn.functional.cross_entropy(
            inputs, targets, reduction='none',
            label_smoothing=self.label_smoothing
        )
        pt = torch.exp(-ce_loss)
        focal_loss = self.alpha * (1-pt)**self.gamma * ce_loss
        return focal_loss.mean()

criterion = FocalLoss(alpha=1, gamma=2, label_smoothing=0.1)

LEARNING_RATE = 0.0001
WEIGHT_DECAY = 5e-4

optimizer = optim.AdamW(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY
)

scheduler = lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5,
    min_lr=1e-7
)

print(f"\n✓ Loss: FocalLoss (gamma=2, smoothing=0.1)")
print(f"✓ Optimizer: AdamW (lr={LEARNING_RATE}, wd={WEIGHT_DECAY})")


✓ Loss: FocalLoss (gamma=2, smoothing=0.1)
✓ Optimizer: AdamW (lr=0.0001, wd=0.0005)


In [None]:
NUM_EPOCHS = 50
EARLY_STOPPING_PATIENCE = 10

print(f"\n{'='*70}")
print("TRAINING STARTED")
print(f"{'='*70}")

trained_model, history = train_model(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=NUM_EPOCHS,
    device=device,
    early_stopping_patience=EARLY_STOPPING_PATIENCE
)


In [None]:
val_loss, val_acc, val_preds, val_labels = validate(
    trained_model, val_loader, criterion, device
)

print(f"\n{'='*70}")
print("FINAL RESULTS")
print(f"{'='*70}")
print(f"Final Val Accuracy: {val_acc:.4f} ({val_acc*100:.2f}%)")
print(f"Final Val Loss: {val_loss:.4f}")

# Confusion Matrix
cm = confusion_matrix(val_labels, val_preds)
print(f"\n{'='*70}")
print("CONFUSION MATRIX")
print(f"{'='*70}")
print(cm)

# Classification Report
print(f"\n{'='*70}")
print("CLASSIFICATION REPORT")
print(f"{'='*70}")
report = classification_report(
    val_labels,
    val_preds,
    target_names=[c.upper() for c in class_names],
    digits=4
)
print(report)

# Per-class accuracy
per_class_acc = cm.diagonal() / cm.sum(axis=1)
print(f"\n{'='*70}")
print("PER-CLASS ACCURACY")
print(f"{'='*70}")
for i, class_name in enumerate(class_names):
    print(f"{class_name.upper()}: {per_class_acc[i]:.4f} ({per_class_acc[i]*100:.2f}%)")

print(f"\n{'='*70}")
print("✓✓✓ TRAINING COMPLETED SUCCESSFULLY ✓✓✓")
print(f"{'='*70}")

Validation:   0%|          | 0/12 [00:00<?, ?it/s]


FINAL RESULTS
Final Val Accuracy: 0.9639 (96.39%)
Final Val Loss: 0.1280

CONFUSION MATRIX
[[60  0  0  0  0  0]
 [ 0 52  0  8  0  0]
 [ 1  0 59  0  0  0]
 [ 0  1  1 58  0  0]
 [ 0  0  0  0 60  0]
 [ 0  0  1  0  1 58]]

CLASSIFICATION REPORT
                 precision    recall  f1-score   support

        CRAZING     0.9836    1.0000    0.9917        60
      INCLUSION     0.9811    0.8667    0.9204        60
        PATCHES     0.9672    0.9833    0.9752        60
 PITTED_SURFACE     0.8788    0.9667    0.9206        60
ROLLED-IN_SCALE     0.9836    1.0000    0.9917        60
      SCRATCHES     1.0000    0.9667    0.9831        60

       accuracy                         0.9639       360
      macro avg     0.9657    0.9639    0.9638       360
   weighted avg     0.9657    0.9639    0.9638       360


PER-CLASS ACCURACY
CRAZING: 1.0000 (100.00%)
INCLUSION: 0.8667 (86.67%)
PATCHES: 0.9833 (98.33%)
PITTED_SURFACE: 0.9667 (96.67%)
ROLLED-IN_SCALE: 1.0000 (100.00%)
SCRATCHES: 0.9667 (96