<a href="https://colab.research.google.com/github/mustafabayrak0/Imagenius/blob/main/comvis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [184]:
import os
import sys
from pathlib import Path
from PIL import Image
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt

In [185]:
from google.colab import drive

# Mount Google Drive
drive.mount('/content/drive')

# Set the working directory to the project folder
project_path = '/content/drive/MyDrive/Main'
os.chdir(project_path)

# Add the project directory to the Python path
sys.path.append(project_path)

# Assuming config is defined somewhere, replace this with actual paths
data_tra_path = '/content/drive/MyDrive/Main/DeepCrack/train'  # Replace with your actual path
data_val_path = '/content/drive/MyDrive/Main/DeepCrack/test'    # Replace with your actual path

DIR_IMG_tra = os.path.join(data_tra_path, 'images')
DIR_MASK_tra = os.path.join(data_tra_path, 'masks')
DIR_IMG_val = os.path.join(data_val_path, 'images')
DIR_MASK_val = os.path.join(data_val_path, 'masks')

img_names_tra = [path.name for path in Path(DIR_IMG_tra).glob('*.jpg')]
mask_names_tra = [path.name for path in Path(DIR_MASK_tra).glob('*.png')]
img_names_val = [path.name for path in Path(DIR_IMG_val).glob('*.jpg')]
mask_names_val = [path.name for path in Path(DIR_MASK_val).glob('*.png')]

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [186]:
def load_images_and_masks(img_dir, mask_dir):
    img_names = sorted(os.listdir(img_dir))  # Ensure the list is sorted
    mask_names = sorted(os.listdir(mask_dir))  # Ensure the list is sorted

    images = []
    masks = []
    i = 0
    for img_name, mask_name in zip(img_names, mask_names):
        img_path = os.path.join(img_dir, img_name)
        mask_path = os.path.join(mask_dir, mask_name)

        image = Image.open(img_path).convert("RGB")
        mask = Image.open(mask_path).convert("L")  # Convert mask to grayscale

        images.append(np.array(image))
        masks.append(np.array(mask))
        i += 1
        if i == 10:
            break

    return images, masks

# Directories
DIR_IMG_tra = '/content/drive/MyDrive/Main/DeepCrack/train/images'
DIR_MASK_tra = '/content/drive/MyDrive/Main/DeepCrack/train/masks'
DIR_IMG_val = '/content/drive/MyDrive/Main/DeepCrack/test/images'
DIR_MASK_val = '/content/drive/MyDrive/Main/DeepCrack/test/masks'

# Load training and validation images and masks
train_images, train_masks = load_images_and_masks(DIR_IMG_tra, DIR_MASK_tra)
val_images, val_masks = load_images_and_masks(DIR_IMG_val, DIR_MASK_val)

# Print the shapes to verify
print(f'Loaded {len(train_images)} training images and {len(train_masks)} training masks.')
print(f'Loaded {len(val_images)} validation images and {len(val_masks)} validation masks.')

Loaded 10 training images and 10 training masks.
Loaded 10 validation images and 10 validation masks.


In [187]:
"""
# Verify image and mask pairs to ensure they are correctly aligned
def verify_image_mask_pairs(images, masks, num_pairs=5):
    fig, axs = plt.subplots(num_pairs, 2, figsize=(10, 5 * num_pairs))

    for i in range(num_pairs):
        img = images[i]
        mask = masks[i]

        axs[i, 0].imshow(img)
        axs[i, 0].set_title('Image')
        axs[i, 0].axis('off')

        axs[i, 1].imshow(mask, cmap='gray')
        axs[i, 1].set_title('Mask')
        axs[i, 1].axis('off')

    plt.tight_layout()
    plt.show()

# Verify the first few pairs of images and masks
verify_image_mask_pairs(train_images, train_masks, num_pairs=5)
"""

"\n# Verify image and mask pairs to ensure they are correctly aligned\ndef verify_image_mask_pairs(images, masks, num_pairs=5):\n    fig, axs = plt.subplots(num_pairs, 2, figsize=(10, 5 * num_pairs))\n    \n    for i in range(num_pairs):\n        img = images[i]\n        mask = masks[i]\n        \n        axs[i, 0].imshow(img)\n        axs[i, 0].set_title('Image')\n        axs[i, 0].axis('off')\n        \n        axs[i, 1].imshow(mask, cmap='gray')\n        axs[i, 1].set_title('Mask')\n        axs[i, 1].axis('off')\n    \n    plt.tight_layout()\n    plt.show()\n\n# Verify the first few pairs of images and masks\nverify_image_mask_pairs(train_images, train_masks, num_pairs=5)\n"

In [196]:
class PatchCrackSegmentationDataset(Dataset):
    def __init__(self, images, masks, patch_size=(960, 540), transform=None):
        self.images = images
        self.masks = masks
        self.patch_size = patch_size
        self.transform = transform

        # Precompute all patches
        self.image_patches = []
        self.mask_patches = []
        for image, mask in zip(self.images, self.masks):
            img_patches, msk_patches = self.divide_image_into_patches(image, mask, patch_size)
            self.image_patches.extend(img_patches)
            self.mask_patches.extend(msk_patches)

    def __len__(self):
        return len(self.image_patches)

    def __getitem__(self, idx):
        image_patch = self.image_patches[idx]
        mask_patch = self.mask_patches[idx]

        if self.transform:
            image_patch = self.transform(image_patch)
            mask_patch = torch.tensor(mask_patch, dtype=torch.float32).unsqueeze(0)

        return image_patch, mask_patch

    def pad_image(self, image, patch_size):
        h, w = image.shape[:2]
        patch_h, patch_w = patch_size

        # Calculate the padding needed
        pad_h = (patch_h - h % patch_h) % patch_h
        pad_w = (patch_w - w % patch_w) % patch_w

        if len(image.shape) == 3:
            # For images with channels
            padded_image = np.pad(image, ((0, pad_h), (0, pad_w), (0, 0)), mode='constant')
        else:
            # For masks without channels
            padded_image = np.pad(image, ((0, pad_h), (0, pad_w)), mode='constant')

        return padded_image

    def divide_image_into_patches(self, image, mask, patch_size=(960, 540)):
        image = self.pad_image(image, patch_size)
        mask = self.pad_image(mask, patch_size)

        image_patches = []
        mask_patches = []

        h, w = image.shape[:2]
        patch_h, patch_w = patch_size

        for i in range(0, h, patch_h):
            for j in range(0, w, patch_w):
                image_patch = image[i:i+patch_h, j:j+patch_w]
                mask_patch = mask[i:i+patch_h, j:j+patch_w]

                image_patches.append(image_patch)
                mask_patches.append(mask_patch)

        return image_patches, mask_patches

# Define transformations (if any)
transform = transforms.Compose([
    transforms.ToTensor(),
])

# Create dataset instances for training and validation data using patches
train_patch_dataset = PatchCrackSegmentationDataset(train_images, train_masks, transform=transform)
val_patch_dataset = PatchCrackSegmentationDataset(val_images, val_masks, transform=transform)

# Create data loaders
patch_batch_size = 2  # Adjust batch size as needed

train_patch_loader = DataLoader(train_patch_dataset, batch_size=patch_batch_size, shuffle=True)
val_patch_loader = DataLoader(val_patch_dataset, batch_size=patch_batch_size, shuffle=False)

# Print the number of batches
print(f'Number of training batches with patches: {len(train_patch_loader)}')
print(f'Number of validation batches with patches: {len(val_patch_loader)}')


Number of training batches with patches: 10
Number of validation batches with patches: 8


In [197]:
# Model definition ensuring raw logits output
class SimpleCNN(nn.Module):
    def __init__(self, in_channels=3, out_channels=1):
        super(SimpleCNN, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2, 2),
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(64, out_channels, kernel_size=2, stride=2, padding=1, output_padding=1),
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

In [198]:
def pixel_level_accuracy(y_true, y_pred):
    """
    Calculate the pixel-level accuracy between the ground truth mask and the predicted mask.

    Args:
    y_true (numpy.ndarray): Ground truth mask.
    y_pred (numpy.ndarray): Predicted mask.

    Returns:
    float: Pixel-level accuracy.
    """
    assert y_true.shape == y_pred.shape, "The shape of the ground truth and predicted masks must match."

    y_true = y_true.flatten()
    y_pred = y_pred.flatten()

    correct = np.sum(y_true == y_pred)
    total = len(y_true)

    accuracy = correct / total
    return accuracy

In [199]:
# Training function
def train_patch_model(model, dataloader, criterion, optimizer, num_epochs=25):
    model.train()
    train_losses = []
    for epoch in range(num_epochs):
        running_loss = 0.0
        for inputs, masks in dataloader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, masks)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * inputs.size(0)

        epoch_loss = running_loss / len(dataloader.dataset)
        train_losses.append(epoch_loss)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

    return model, train_losses

# Evaluation function with pixel-level accuracy
def evaluate_patch_model_with_accuracy(model, dataloader, criterion):
    model.eval()
    val_loss = 0.0
    total_accuracy = 0.0
    num_samples = 0

    with torch.no_grad():
        for inputs, masks in dataloader:
            outputs = model(inputs)
            loss = criterion(outputs, masks)
            val_loss += loss.item() * inputs.size(0)

            outputs = torch.sigmoid(outputs)
            outputs = (outputs > 0.5).float()
            outputs = outputs.cpu().numpy()
            masks = masks.cpu().numpy()

            for i in range(len(outputs)):
                accuracy = pixel_level_accuracy(masks[i], outputs[i])
                total_accuracy += accuracy
                num_samples += 1

    val_loss /= num_samples
    avg_accuracy = total_accuracy / num_samples
    print(f'Validation Loss: {val_loss:.4f}')
    print(f'Average Pixel-level Accuracy: {avg_accuracy:.4f}')
    return val_loss, avg_accuracy

model = SimpleCNN()
# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train the model with patches and capture loss history
num_epochs = 5
trained_patch_model, train_losses = train_patch_model(model, train_patch_loader, criterion, optimizer, num_epochs=num_epochs)

# Evaluate the model with patches and capture validation loss and accuracy
val_loss, avg_accuracy = evaluate_patch_model_with_accuracy(trained_patch_model, val_patch_loader, criterion)



ValueError: Target size (torch.Size([2, 1, 960, 540])) must be the same as input size (torch.Size([2, 1, 953, 529]))

In [None]:
# Plot training and validation loss
def plot_loss_curves(train_losses, val_loss):
    epochs = range(1, len(train_losses) + 1)
    plt.figure(figsize=(10, 5))
    plt.plot(epochs, train_losses, label='Training Loss')
    plt.axhline(y=val_loss, color='r', linestyle='-', label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Training and Validation Loss')
    plt.show()

# Plot the loss curves
plot_loss_curves(train_losses, val_loss)


In [None]:
# Visualize first image in the train data
image, mask = train_patch_dataset[0]
plt.imshow(image.permute(1, 2, 0))
plt.title('Image')
plt.show()
plt.imshow(mask.squeeze(), cmap='gray')
plt.title('Mask')
plt.show()

In [None]:
# Show first image's resolution values
image, mask = train_patch_dataset[0]
print(f'Image resolution: {image.shape}')
print(f'Mask resolution: {mask.shape}')

In [None]:
# Function to predict and visualize samples
def predict_and_visualize(model, dataset, num_samples=5):
    model.eval()
    indices = np.random.choice(len(dataset), num_samples, replace=False)
    fig, axs = plt.subplots(num_samples, 3, figsize=(15, 5 * num_samples))

    for i, idx in enumerate(indices):
        image_patch, mask_patch = dataset[idx]
        with torch.no_grad():
            output = model(image_patch.unsqueeze(0)).squeeze(0)
            output = torch.sigmoid(output)
            output = (output > 0.5).float()

        axs[i, 0].imshow(image_patch.permute(1, 2, 0))
        axs[i, 0].set_title('Input Image')
        axs[i, 1].imshow(mask_patch.squeeze(), cmap='gray')
        axs[i, 1].set_title('Ground Truth Mask')
        axs[i, 2].imshow(output.squeeze(), cmap='gray')
        axs[i, 2].set_title('Predicted Mask')

        for ax in axs[i]:
            ax.axis('off')

    plt.tight_layout()
    plt.show()

# Predict and visualize samples from the validation dataset
predict_and_visualize(trained_patch_model, val_patch_dataset, num_samples=5)
