In [None]:
import numpy as np
import pandas as pd 

import os
# Walk ./archive recursively and print each directory path; the per-directory
# file lists (`filenames`) are collected by os.walk but not used.
# NOTE(review): the dataset itself is read from DATA_DIR (a /kaggle/input path),
# not from ./archive - confirm this listing is still wanted.
for dirname, _, filenames in os.walk('./archive'):
    print(dirname)


In [None]:
import torch
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import segmentation_models_pytorch as smp
from pathlib import Path


# --- Configuration ---
# Encoder backbone name handed to smp.Unet.
ENCODER='resnext101_32x16d'
# Name of the pretrained weight set for the encoder.
ENCODER_WEIGHTS = 'instagram' 
# Number of output channels of the segmentation head
# (presumably one lesion-vs-background channel - confirm).
N_CLASSES = 1
# Final activation applied by the model; with 'sigmoid' the downstream code
# thresholds the outputs at 0.5.
ACTIVATION = 'sigmoid' 
# Root folder expected to contain the 'benign', 'malignant' and 'normal' sub-folders.
DATA_DIR = Path("/kaggle/input/breast-ultrasound-images-dataset/Dataset_BUSI_with_GT/")
# Target size passed to transforms.Resize for both images and masks.
INPUT_SIZE = (256, 256)
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
DEVICE= 'cuda' if torch.cuda.is_available() else 'cpu'
# Bare expression: shows the selected device as the cell output.
DEVICE

In [None]:
# Per-channel mean / std used by Normalize below; the visualisation code uses
# the same two lists to undo the normalisation before plotting.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]
    
# Image preprocessing pipeline: resize to INPUT_SIZE, convert the PIL image to
# a float tensor, then normalise each channel with the statistics above.
transform=transforms.Compose([
    transforms.Resize(INPUT_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=imagenet_mean,std=imagenet_std)
])
def image_tensor(img_path):
    """Read an image file and return it as a preprocessed batch of one.

    The file is opened with PIL, converted to 3-channel RGB, run through the
    module-level ``transform`` pipeline and given a leading batch axis.

    Args:
        img_path: Path (or path-like) of the image file to read.

    Returns:
        The transformed image with one extra leading dimension, [1, C, H, W].
    """
    rgb_image = Image.open(img_path).convert("RGB")
    transformed = transform(rgb_image)
    return transformed.unsqueeze(0)

# Mask preprocessing pipeline: same resize as the images, but no normalisation.
# mask_tensor() re-binarises the result with a 0.5 threshold after this runs.
mask_transform = transforms.Compose([
    transforms.Resize(INPUT_SIZE),
    transforms.ToTensor()  # Converts to [0, 1] float tensor
])
def mask_tensor(mask_path):
    """Read a mask file and return it as a binary batch of one.

    The file is opened with PIL as single-channel greyscale, passed through
    the module-level ``mask_transform`` pipeline and thresholded at 0.5 so the
    result only holds 0.0 and 1.0.

    Args:
        mask_path: Path (or path-like) of the mask file to read.

    Returns:
        The binarised float mask with one extra leading dimension.
    """
    grey_mask = Image.open(mask_path).convert('L')  # single channel
    scaled = mask_transform(grey_mask)
    binary = scaled.gt(0.5).float()
    return binary.unsqueeze(0)


def dice_loss(pred_mask, actual_mask, smooth=1e-6):
    """Soft Dice loss computed over every element of the two tensors at once.

    Both tensors are flattened, so the score is a single value for the whole
    batch rather than an average of per-sample scores.

    Args:
        pred_mask: Predicted mask values (the notebook passes sigmoid outputs).
        actual_mask: Ground-truth mask with the same number of elements.
        smooth: Small constant added to numerator and denominator so that two
            all-zero masks give a Dice score of 1 instead of 0/0.

    Returns:
        A scalar tensor ``1 - dice``; 0 means perfect overlap.
    """
    # reshape() instead of view(): view() raises a RuntimeError on
    # non-contiguous inputs (e.g. after permute/transpose/slicing), whereas
    # reshape() copies only when it has to and stays differentiable.
    pred_flat = pred_mask.reshape(-1)
    actual_flat = actual_mask.reshape(-1)

    intersection = (pred_flat * actual_flat).sum()
    dice = (2. * intersection + smooth) / (pred_flat.sum() + actual_flat.sum() + smooth)

    return 1 - dice

In [None]:
# Open one benign sample as an RGB PIL image ...
IMAGE_PATH=DATA_DIR/"benign/benign (10).png"
img_pil = Image.open(IMAGE_PATH).convert("RGB")

# ... and its mask as a single-channel ('L') PIL image.
# NOTE(review): img_pil / out_img_pil are not referenced by any later cell
# visible in this notebook - confirm whether this cell is still needed.
MASK_PATH=DATA_DIR/"benign/benign (10)_mask.png"
out_img_pil = Image.open(MASK_PATH).convert("L")

In [None]:
from torch.utils.data import Dataset
from pathlib import Path
from PIL import Image
import torch

# Assuming image_tensor and mask_tensor are already defined
# and INPUT_SIZE, imagenet_mean/std are declared outside this class.

class BUSI(Dataset):
    """In-memory dataset of ultrasound images paired with binary lesion masks.

    The constructor scans the ``benign``, ``malignant`` and ``normal``
    sub-folders of ``data_dir`` for ``*.png`` files, loads every image that has
    a ``<stem>_mask.png`` companion through ``image_tensor`` / ``mask_tensor``
    and keeps the resulting tensors in two parallel lists.

    Files whose stem contains ``_mask`` are always treated as masks. Additional
    masks named ``<stem>_mask_<suffix>.png`` are merged (element-wise maximum,
    i.e. the union of the binary masks) into the mask of ``<stem>.png`` instead
    of being mistaken for images without a mask.
    NOTE(review): BUSI is understood to ship such ``_mask_1`` files for images
    with several lesions - verify against your copy of the dataset.

    Files are processed in sorted order so that sample indices (and therefore
    any seeded split) do not depend on the filesystem's listing order.

    Args:
        data_dir: Root folder containing the class sub-folders.
    """

    def __init__(self, data_dir):
        self.data_dir = Path(data_dir)
        self.images = []
        self.masks = []

        subDirs = ['benign', 'malignant', 'normal']
        print(f"Scanning folders: {subDirs} in {self.data_dir}")

        for subdir in subDirs:
            classDir = self.data_dir / subdir
            if not classDir.is_dir():
                print(f"Path Problem {classDir}")
                continue

            # Single pass over the folder: separate images from mask files and
            # group every mask under the stem of the image it belongs to.
            image_files = []
            masks_by_stem = {}
            for file in sorted(classDir.glob('*.png')):
                base, marker, _ = file.stem.partition('_mask')
                if marker:
                    masks_by_stem.setdefault(base, []).append(file)
                else:
                    image_files.append(file)

            for file in image_files:
                maskPath = file.parent / (file.stem + '_mask.png')
                if not maskPath.exists():
                    print(f"Path Problem {maskPath}")
                    continue

                image = image_tensor(file)
                mask = mask_tensor(maskPath)
                # Fold any extra masks of the same image into the primary one.
                for extra in masks_by_stem.get(file.stem, []):
                    if extra != maskPath:
                        mask = torch.maximum(mask, mask_tensor(extra))

                self.images.append(image.squeeze(0))  # Remove batch dim
                self.masks.append(mask.squeeze(0))    # Remove batch dim

        print(f"Found {len(self.images)} image-mask pairs.")

    def __len__(self):
        """Return the number of image/mask pairs that were loaded."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the pair at ``idx`` as ``{'image': ..., 'mask': ...}``.

        A tensor index is converted to a plain Python number first.
        """
        if torch.is_tensor(idx):
            idx = idx.item()
        return {
            'image': self.images[idx],  # shape: [3, H, W]
            'mask': self.masks[idx]     # shape: [1, H, W]
        }


In [None]:
# Build the dataset. The BUSI constructor loads and preprocesses every
# image/mask pair it finds under DATA_DIR up front and keeps them in memory.
dataset = BUSI(DATA_DIR)

In [None]:
from torch.utils.data import random_split, DataLoader

test_ratio = 0.2
# Fixed seed so random_split draws the same index permutation on every run.
SPLIT_SEED = 42

dataset_size = len(dataset)
if dataset_size == 0:
    # Fail here with a clear message instead of letting the shuffling
    # DataLoader raise a cryptic "num_samples=0" error further down.
    raise RuntimeError(f"No image-mask pairs were loaded from {DATA_DIR}")
test_size = int(test_ratio * dataset_size)
train_size = dataset_size - test_size  # remainder, so the two sizes always sum to dataset_size

# Split the dataset with a dedicated, seeded generator (independent of the
# global RNG state left behind by earlier cells).
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=split_generator
)

# Create DataLoaders: shuffle only the training data.
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

In [None]:
# Sanity check: pull only the first training batch and print the tensor shapes.
for batch in train_loader:
    images, masks = batch['image'], batch['mask']  # [B, 3, H, W] and [B, 1, H, W]
    print(images.shape, masks.shape)
    break


In [None]:
# U-Net built entirely from the configuration constants, so changing the
# encoder weights or the number of classes in the config cell takes effect here.
model = smp.Unet(
            encoder_name=ENCODER,
            encoder_weights=ENCODER_WEIGHTS,
            in_channels=3,
            classes=N_CLASSES,
            activation=ACTIVATION
         )

In [None]:
from tqdm import tqdm
import copy
import torch.optim as optim

# Weights used to blend the average train and test losses into the single
# score (`combined_loss`) that decides which epoch's weights are kept.
TRAIN_LOSS_WEIGHT = 0.2
TEST_LOSS_WEIGHT = 0.8

# --- Training Configuration ---
EPOCHS = 20
LEARNING_RATE = 1e-4
CHECKPOINT_PATH = "./best_model.pth"

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
criterion = dice_loss

# Training loop
# Start from "infinitely bad" so the first epoch always becomes the best so far.
best_combined_loss = float('inf')
# Snapshot of the initial weights (taken before the model is moved to DEVICE);
# replaced whenever an epoch improves the combined loss.
best_model_wts = copy.deepcopy(model.state_dict())

model.to(DEVICE)
for epoch in range(EPOCHS):
    print(f"Epoch {epoch + 1}/{EPOCHS}")
    
    # Training phase
    model.train()
    train_loss = 0.0
    train_loader_tqdm = tqdm(train_loader, desc="Training", leave=False)
    for batch in train_loader_tqdm:
        images = batch['image'].to(DEVICE)
        masks = batch['mask'].to(DEVICE)

        # Standard optimisation step: clear gradients, forward, backward, update.
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, masks)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
        train_loader_tqdm.set_postfix(loss=loss.item())

    # Mean of the per-batch losses (each batch counts equally, whatever its size).
    avg_train_loss = train_loss / len(train_loader)
    print(f"Epoch {epoch + 1} Train Loss: {avg_train_loss:.4f}")

    # Validation phase (test loss)
    # eval() + no_grad(): inference-mode layers and no gradient bookkeeping.
    model.eval()
    test_loss = 0.0
    with torch.no_grad():
        for batch in test_loader:
            images = batch['image'].to(DEVICE)
            masks = batch['mask'].to(DEVICE)
            outputs = model(images)
            loss = criterion(outputs, masks)
            test_loss += loss.item()

    avg_test_loss = test_loss / len(test_loader)
    print(f"Epoch {epoch + 1} Test Loss: {avg_test_loss:.4f}")

    # Calculate combined loss
    # NOTE(review): the checkpoint is selected using the loss on test_loader,
    # and the later evaluation cells report metrics on that same loader, so
    # those metrics are not measured on data unseen during model selection.
    combined_loss = (TRAIN_LOSS_WEIGHT * avg_train_loss) + (TEST_LOSS_WEIGHT * avg_test_loss)

    # Save the best model if combined loss is lower
    if combined_loss < best_combined_loss:
        best_combined_loss = combined_loss
        # Keep an in-memory copy and also persist it to CHECKPOINT_PATH.
        best_model_wts = copy.deepcopy(model.state_dict())
        torch.save(model.state_dict(), CHECKPOINT_PATH)
        print(f"Best model saved with Combined Loss: {combined_loss:.4f} (Train Loss: {avg_train_loss:.4f}, Test Loss: {avg_test_loss:.4f})")

# Load the best model weights
# (the loop leaves the last epoch's weights in `model`; restore the best ones).
model.load_state_dict(best_model_wts)
print("Training complete. Best model loaded.")

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, jaccard_score
import numpy as np

def compute_iou(preds, labels):
    """Intersection-over-Union (Jaccard index) of two binary masks.

    Args:
        preds: Predicted mask as an array or array-like (list, tuple, ...);
            any non-zero value counts as foreground.
        labels: Ground-truth mask with the same shape as ``preds``.

    Returns:
        IoU as a Python float in [0, 1]. When both masks are empty the union
        is zero and the masks agree perfectly, so 1.0 is returned.
    """
    preds = np.asarray(preds).astype(bool)
    labels = np.asarray(labels).astype(bool)

    union = np.logical_or(preds, labels).sum()
    if union == 0:
        # An empty union implies an empty intersection, so there is only one
        # possible outcome here: both masks are empty and identical.
        return 1.0

    intersection = np.logical_and(preds, labels).sum()
    return float(intersection / union)

def compute_dice(preds, labels):
    """Dice coefficient of two binary masks.

    Args:
        preds: Predicted mask as an array or array-like (list, tuple, ...);
            any non-zero value counts as foreground.
        labels: Ground-truth mask with the same shape as ``preds``.

    Returns:
        ``2 * |preds & labels| / (|preds| + |labels|)`` as a Python float in
        [0, 1]. When both masks are empty the score is defined as 1.0.
    """
    preds = np.asarray(preds).astype(bool)
    labels = np.asarray(labels).astype(bool)

    total = preds.sum() + labels.sum()
    if total == 0:
        # Two empty masks agree perfectly; also avoids a 0/0 division.
        return 1.0

    intersection = np.logical_and(preds, labels).sum()
    return float((2.0 * intersection) / total)

def evaluate_model(model, test_loader, device, threshold=0.5):
    """Evaluate a segmentation model on a loader and print pixel-level metrics.

    The model is moved to ``device`` and put in eval mode. For every batch the
    Dice loss is accumulated and the outputs are binarised with ``threshold``.
    All metrics except the loss are computed once over the pixels of the whole
    loader pooled together (not averaged per image).

    Args:
        model: Model mapping an image batch to per-pixel scores.
        test_loader: Iterable of dicts with ``'image'`` and ``'mask'`` tensors.
        device: Device the model and batches are moved to.
        threshold: Cut-off above which an output pixel counts as foreground.

    Returns:
        Tuple ``(avg_loss, accuracy, precision, recall, f1, iou, dice)``.

    Raises:
        ZeroDivisionError: If ``test_loader`` yields no batches.
    """
    model.to(device)
    model.eval()

    total_loss = 0.0
    # One flat array per batch. Extending a Python list pixel by pixel would
    # create one Python object per pixel (tens of millions for a full test
    # set); keeping arrays and concatenating once gives the same values.
    pred_chunks = []
    label_chunks = []

    with torch.no_grad():
        for batch in test_loader:
            images = batch['image'].to(device)  # [B, 3, H, W]
            masks = batch['mask'].to(device)    # [B, 1, H, W]

            # Forward pass
            outputs = model(images)  # [B, 1, H, W]
            loss = dice_loss(outputs, masks)
            total_loss += loss.item()

            # Threshold predictions
            predictions = (outputs > threshold).float()

            pred_chunks.append(predictions.cpu().numpy().ravel())
            label_chunks.append(masks.cpu().numpy().ravel())

    # Mean of the per-batch losses; computed first so that an empty loader
    # fails here with the same ZeroDivisionError as before.
    avg_loss = total_loss / len(test_loader)

    all_preds = np.concatenate(pred_chunks)
    all_labels = np.concatenate(label_chunks)

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, zero_division=0)
    recall = recall_score(all_labels, all_preds, zero_division=0)
    f1 = f1_score(all_labels, all_preds, zero_division=0)
    iou = compute_iou(all_preds, all_labels)
    dice = compute_dice(all_preds, all_labels)

    print(f"Test Loss: {avg_loss:.4f}")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"IoU: {iou:.4f}")
    print(f"Dice Coefficient: {dice:.4f}")

    return avg_loss, accuracy, precision, recall, f1, iou, dice

In [None]:
import matplotlib.pyplot as plt

# Load the trained model
# map_location remaps the stored tensors onto the current DEVICE, so a
# checkpoint written on a GPU can still be loaded on a CPU-only runtime.
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE))
model.eval()

# Function to visualize images, masks, and predictions
def visualize_predictions(model, test_loader, device, num_samples=5):
    """Plot input image, ground-truth mask and predicted mask side by side.

    The model is moved to ``device`` and put in eval mode. Batches are read
    from ``test_loader`` in order, outputs are thresholded at 0.5, and one
    three-panel figure is shown per sample until ``num_samples`` figures have
    been drawn or the loader is exhausted.

    Args:
        model: Model mapping an image batch to per-pixel scores.
        test_loader: Iterable of dicts with ``'image'`` and ``'mask'`` tensors.
        device: Device the model and batches are moved to.
        num_samples: Maximum number of figures to show.

    Returns:
        None.
    """
    model.to(device)
    model.eval()

    # Normalisation statistics, used below to map images back to [0, 1].
    channel_std = np.array(imagenet_std)
    channel_mean = np.array(imagenet_mean)
    shown = 0

    with torch.no_grad():
        for batch in test_loader:
            images = batch['image'].to(device)  # [B, 3, H, W]
            masks = batch['mask'].to(device)    # [B, 1, H, W]
            binary_preds = (model(images) > 0.5).float()  # [B, 1, H, W]

            for image, mask, pred in zip(images, masks, binary_preds):
                if shown >= num_samples:
                    return

                # Channels-last numpy image with the normalisation undone.
                image_np = image.cpu().permute(1, 2, 0).numpy()
                image_np = np.clip(image_np * channel_std + channel_mean, 0, 1)

                panels = (
                    (image_np, "Image", None),
                    (mask.cpu().squeeze(0).numpy(), "Ground Truth Mask", "gray"),
                    (pred.cpu().squeeze(0).numpy(), "Predicted Mask", "gray"),
                )

                plt.figure(figsize=(12, 4))
                for position, (pixels, title, cmap) in enumerate(panels, start=1):
                    plt.subplot(1, 3, position)
                    plt.imshow(pixels, cmap=cmap)
                    plt.title(title)
                    plt.axis("off")
                plt.show()

                shown += 1

# Visualize predictions


In [None]:
# Rebuild the architecture from the config constants and restore the trained
# weights from the checkpoint written by the training loop.
# encoder_weights=None: every parameter and buffer is overwritten by
# load_state_dict below, so fetching the pretrained encoder weights again
# would only cost a large download without changing the resulting model.
model2 = smp.Unet(
            encoder_name=ENCODER,
            encoder_weights=None,
            in_channels=3,
            classes=N_CLASSES,
            activation=ACTIVATION
         )
# map_location lets a GPU-saved checkpoint load on whatever DEVICE is active.
model2.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE))
evaluate_model(model2, test_loader, DEVICE)
visualize_predictions(model2, test_loader, DEVICE, num_samples=10)

In [None]:
# Restore the best checkpoint into the original model object and evaluate it.
# CHECKPOINT_PATH is the file the training loop saved to; map_location lets a
# GPU-saved checkpoint load on whatever DEVICE is active.
model.load_state_dict(torch.load(CHECKPOINT_PATH, map_location=DEVICE))
evaluate_model(model, test_loader, DEVICE)
visualize_predictions(model, test_loader, DEVICE, num_samples=5)