In [1]:
!git clone https://github.com/mszczesniak02/bachlor_google

Cloning into 'bachlor_google'...
remote: Enumerating objects: 1133, done.[K
remote: Counting objects: 100% (27/27), done.[K
remote: Compressing objects: 100% (24/24), done.[K
remote: Total 1133 (delta 8), reused 12 (delta 3), pack-reused 1106 (from 1)[K
Receiving objects: 100% (1133/1133), 66.92 MiB | 19.62 MiB/s, done.
Resolving deltas: 100% (245/245), done.


In [2]:
!cp -r /content/bachlor_google/DeepCrack/ .

In [3]:
!pip install segmentation-models-pytorch

Collecting segmentation-models-pytorch
  Downloading segmentation_models_pytorch-0.5.0-py3-none-any.whl.metadata (17 kB)
Downloading segmentation_models_pytorch-0.5.0-py3-none-any.whl (154 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m154.8/154.8 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: segmentation-models-pytorch
Successfully installed segmentation-models-pytorch-0.5.0


In [4]:
import albumentations as A                              # for augmentation transform

import numpy as np                                      # sci kit specials ;D
import matplotlib.pyplot as plt                         # plots
from PIL import Image                                   # for opening images as numpy arrays or torch tensors

from datetime import datetime                           # for model timestamp

import torch

from torch.utils.data import Dataset                    # preset class for creating a dataset
from torch.utils.data import random_split               # for splitting datasets into training, test, validation
from torch.utils.data import DataLoader                 # self-explanitory
import segmentation_models_pytorch as smp               # preset model

from tqdm import tqdm                                   # for the progress bar
import os                                               # for accessing files and setting proper paths to   them

from torch.utils.tensorboard import SummaryWriter       # tensorboard srv
from torch.nn.functional import binary_cross_entropy

In [13]:
DEBUG = False

if DEBUG==True:

  MASK_TRAIN_PATH = "../assets/datasets/DeepCrack/train_lab"
  IMG_TRAIN_PATH = "../assets/datasets/DeepCrack/train_img"
  MASK_TEST_PATH = "../assets/datasets/DeepCrack/test_lab"
  IMG_TEST_PATH = "../assets/datasets/DeepCrack/test_img"
  DEVICE = "cpu"
  BATCH_SIZE = 2
  WORKERS = 4

else:
  MASK_TRAIN_PATH = "/content/DeepCrack/train_lab"
  IMG_TRAIN_PATH = "/content/DeepCrack/train_img"
  MASK_TEST_PATH = "/content/DeepCrack/test_lab"
  IMG_TEST_PATH = "/content/DeepCrack/test_img"
  BATCH_SIZE = 16


  DEVICE="cuda"
  WORKERS = 2

PIN_MEMORY = True
LEARNING_RATE = 1e-4
WEIGHT_DECAY = 1e-5
EPOCHS = 10

EARLY_STOPPING_PATIENCE = 15

SCHEDULER_PATIENCE = 5
SCHEDULER_FACTOR = 0.5

In [6]:
def fetch_data(path) -> list:
  """Return files as their paths+filename in an array"""

  assert (os.path.exists(path) == True),  "Failure during data fetching"

  result = []
  for file in tqdm(os.listdir(path), desc=f"Loading files from {path} ",unit="File", leave=True):
    fpath = os.path.join(path,file)
    result.append(fpath)

  print(f"{path} - len({len(result)})")
  return result


class DeepCrackDataset(Dataset):
  def __init__(self, img_dir, mask_dir, transform=None):

    self.img_dir = img_dir
    self.mask_dir = mask_dir
    self.transform = transform

    # sort values so the file names corespoding to each other are loaded in order
    self.images = sorted([os.path.join(img_dir, file) for file in os.listdir(img_dir)] )
    self.masks = sorted([os.path.join(mask_dir, file) for file in os.listdir(mask_dir)])

  def __len__(self):
    return len(self.images)

  def __getitem__(self, index):
    np_image = np.array(Image.open(self.images[index]))
    np_mask = np.array(Image.open(self.masks[index]))


    if len(np_mask.shape) == 3:
      np_mask = np_mask[:,:,0]

    np_mask = (np_mask > 127).astype(np.uint8)

    if self.transform: # if using transforms
      t = self.transform(image=np_image, mask=np_mask)
      np_image = t["image"]
      np_mask = t["mask"]

    # conversion from numpy array convention to tensor via permute,
    #     then normalizing to [0,1] range, same for mask, only using binary data
    tensor_image = torch.from_numpy(np_image).permute(2, 0, 1).float() / 255.0
    tensor_mask = torch.from_numpy(np_mask).unsqueeze(0).float()

    return tensor_image,tensor_mask


def get_dataset(img_path, mask_path, transform_train = None ):

  dataset = DeepCrackDataset(img_path, mask_path, transform=transform_train)
  return dataset

def split_dataset(dataset: DeepCrackDataset, test_factor:float, val_factor:float )->list:
  """Split exising dataset given percentages as [0,1] floats, return list of  """
  return random_split(dataset, [test_factor, val_factor])

In [7]:
transofrm_train = A.Compose([
    A.Resize(512, 512),  # ← Stały rozmiar, bez crop
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.3),
    A.Rotate(limit=10, p=0.3),  # ← Mniejszy kąt
    A.RandomBrightnessContrast(brightness_limit=0.1, contrast_limit=0.1, p=0.3),  # ← Mniej agresywne
])

transform_val = A.Compose([A.Resize(512,512)])

In [8]:
class DiceLoss(torch.nn.Module):
  def __init__(self, smooth=1e-6):
    super(DiceLoss,self).__init__()
    self.smooth = smooth
  def forward(self, predictions, targets):
    predictions = torch.sigmoid(predictions)

    predictions = predictions.view(-1)
    targets = targets.view(-1)

    intersection = (predictions * targets).sum()
    dice = (2. * intersection  + self.smooth) / (predictions.sum() + targets.sum() + self.smooth)

    return 1-dice


# binary cross entropy

class BCrossEntropyLoss(torch.nn.Module):
  def __init__(self, smooth=1e-6):
    super(BCrossEntropyLoss,self).__init__()
    self.smooth = smooth
  def forward(self, predictions, targets):
    predictions = torch.sigmoid(predictions)
    loss = binary_cross_entropy(predictions, targets)
    return loss



# sigmoid focal loss


# Jaccard index  loss (IoU)




In [9]:
def calculate_metrics(predictions, targets, threshold=0.5):

    # Binaryzacja
    preds = (predictions > threshold).float()
    targets = targets.float()

    # Flatten
    preds_flat = preds.view(-1)
    targets_flat = targets.view(-1)

    # True/False Positives/Negatives
    TP = ((preds_flat == 1) & (targets_flat == 1)).sum().float()
    TN = ((preds_flat == 0) & (targets_flat == 0)).sum().float()
    FP = ((preds_flat == 1) & (targets_flat == 0)).sum().float()
    FN = ((preds_flat == 0) & (targets_flat == 1)).sum().float()

    conf_table = [[TP,FP],[FN, TN]]

    # Metryki
    epsilon = 1e-7  # Unikaj dzielenia przez zero

    accuracy = (TP + TN) / (TP + TN + FP + FN + epsilon)
    precision = TP / (TP + FP + epsilon)
    recall = TP / (TP + FN + epsilon)
    f1_score = 2 * (precision * recall) / (precision + recall + epsilon)
    specificity = TN / (TN + FP + epsilon)

    # IoU (Intersection over Union) - NAJWAŻNIEJSZA dla segmentacji!
    intersection = (preds * targets).sum()
    union = preds.sum() + targets.sum() - intersection
    iou = intersection / (union + epsilon)

    # Dice Coefficient
    dice = (2 * intersection) / (preds.sum() + targets.sum() + epsilon)

    return {
        'accuracy': accuracy.item(),
        'precision': precision.item(),
        'recall': recall.item(),
        'f1_score': f1_score.item(),
        'specificity': specificity.item(),
        'iou': iou.item(),
        'dice': dice.item(),
        'confusion_table': conf_table
    }

In [26]:
def train_epoch(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = .0
    metrics = {
        'iou': [], 'dice': [], 'recall': [],
        'precision': [], 'f1_score': [], 'confusion_table' :[]
    }
    loop = tqdm(train_loader, desc="Training", leave=False)

    for batch_idx, (images, masks) in enumerate(loop):
        # move to adequete memory
        images = images.to(device)
        masks = masks.to(device)

        # Forward pass
        predictions = model(images)
        loss = criterion(predictions, masks)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        with torch.no_grad():
            predictions_sigmoid = torch.sigmoid(predictions)
            batch_metrics = calculate_metrics(predictions_sigmoid, masks)

            for key in metrics.keys():
                metrics[key].append(batch_metrics[key])

        loop.set_postfix({'loss': loss.item()})

    loop.close()

    avg_loss = running_loss / len(train_loader)
    avg_metrics = {k: np.mean(v) for k, v in metrics.items()}
    avg_metrics['loss'] = avg_loss


    return avg_loss

def validate(model, val_loader, criterion,device):
  model.eval()
  running_loss = 0.0
  metrics = {
        'iou': [], 'dice': [], 'recall': [],
        'precision': [], 'f1_score': [], 'confusion_table' :[]
    }
  with torch.no_grad():
    for images,masks in tqdm(val_loader, desc="Validation", leave=False):
      images = images.to(device)
      masks = masks.to(device)

      predictions = model(images)
      loss = criterion(predictions, masks)

      running_loss += loss.item()

      predictions_sigmoid = torch.sigmoid(predictions)
      batch_metrics = calculate_metrics(predictions_sigmoid, masks)

      for key in metrics.keys():
          metrics[key].append(batch_metrics[key])

    avg_loss = running_loss / len(val_loader)
    # avg_metrics = {k: np.mean(v) for k, v in metrics.items()}
    # avg_metrics['loss'] = avg_loss

    return avg_loss


In [27]:
def main()-> int:

    its_training_time = datetime.now().strftime('H%M') # Jared Leto likes it
    writer = SummaryWriter(f"runs/model_{its_training_time}")

    train_set = get_dataset(IMG_TRAIN_PATH, MASK_TRAIN_PATH, transform_train=transofrm_train)
    testing_dataset = get_dataset(IMG_TEST_PATH, MASK_TEST_PATH, transform_train=transform_val)


    test_set, val_set = split_dataset(testing_dataset, .5, .5 )

    train_loader = DataLoader( train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=WORKERS, pin_memory=PIN_MEMORY)
    test_loader = DataLoader( test_set, batch_size=BATCH_SIZE, shuffle=False, num_workers=WORKERS, pin_memory=PIN_MEMORY)
    val_loader = DataLoader( val_set, batch_size=BATCH_SIZE , shuffle=False, num_workers=WORKERS    , pin_memory=PIN_MEMORY)

    print("Datasets sizes: ")
    print(f"\t  Train: {len(train_set)}")
    print(f"\t    Val: {len(val_set)}")
    print(f"\t   Test: {len(test_set)}")

    device = torch.device(DEVICE if torch.cuda.is_available() else "cpu")

    model = smp.Unet(
        encoder_name="resnet34",
        encoder_weights="imagenet",
        in_channels=3,
        classes=1,
        activation=None,
    )

    model = model.to(device)


    total_params = sum(p.numel() for p in model.parameters())
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

    print(f"\t    Total parameters: {total_params:,}")
    print(f"\tTrainable parameters: {trainable_params:,}")
    print(f"\t          Model size: ~{total_params * 4 / 1e6:.1f} MB")

    # criterion = DiceLoss()
    criterion = BCrossEntropyLoss()

    optimizer = torch.optim.Adam(
        model.parameters(),
        lr=LEARNING_RATE,
        weight_decay=WEIGHT_DECAY,
    )

    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer,
        mode='max',  # Maksymalizuj IoU
        factor=SCHEDULER_FACTOR,
        patience=SCHEDULER_PATIENCE,
        min_lr=1e-7
    )

    print(f"\nTraining configuration:")
    print(f"\t       Optimizer: Adam")
    print(f"\t   Learning rate: {LEARNING_RATE}")
    print(f"\t    Weight decay: {WEIGHT_DECAY}")
    print(f"\t       Scheduler: ReduceLROnPlateau (patience={SCHEDULER_PATIENCE})")
    print(f"\t  Early stopping: patience={EARLY_STOPPING_PATIENCE}")
    print(f"\t          Epochs: {EPOCHS}")

    epochs = EPOCHS
    best_val_iou = 0.0
    best_epoch = 0
    patience_counter = 0

    writer.add_text("Hparams",f"""
    -           Learning Rate: {LEARNING_RATE}
    -              Batch Size: {BATCH_SIZE}
    -            Weight Decay: {WEIGHT_DECAY}
    -                  Epochs: {EPOCHS}
    -      Scheduler Patience: {SCHEDULER_PATIENCE}
    - Early Stopping Patience: {EARLY_STOPPING_PATIENCE}
    -                  Device: {DEVICE}
""")



    for epoch in range(epochs):

        train_metrics = train_epoch(model, train_loader, criterion, optimizer, device)

        val_metrics = validate(model, val_loader, criterion, device)

        current_lr = optimizer.param_groups[0]['lr']
        scheduler.step(val_metrics['iou'])

        # Loss
        writer.add_scalars('Loss', {
            'train': train_metrics['loss'],
            'val': val_metrics['loss']
        }, epoch)

        # IoU
        writer.add_scalars('IoU', {
            'train': train_metrics['iou'],
            'val': val_metrics['iou']
        }, epoch)

        # Dice
        writer.add_scalars('Dice', {
            'train': train_metrics['dice'],
            'val': val_metrics['dice']
        }, epoch)

        # Precision
        writer.add_scalars('Precision', {
            'train': train_metrics['precision'],
            'val': val_metrics['precision']
        }, epoch)

        # Recall
        writer.add_scalars('Recall', {
            'train': train_metrics['recall'],
            'val': val_metrics['recall']
        }, epoch)

        # F1-Score
        writer.add_scalars('F1-Score', {
            'train': train_metrics['f1_score'],
            'val': val_metrics['f1_score']
        }, epoch)

        # Learning Rate
        writer.add_scalar('Learning_Rate', current_lr, epoch)

        # Accuracy (tylko validation)
        writer.add_scalar('Validation/Accuracy', val_metrics['accuracy'], epoch)

    # ========================================
        # PRINT METRICS
        # ========================================
        # print(f"\nTraining:")
        # print(f"   Loss: {train_metrics['loss']:.4f}")
        # print(f"   IoU:  {train_metrics['iou']:.4f}")
        # print(f"   Dice: {train_metrics['dice']:.4f}")

        # print(f"\nValidation:")
        # print(f"   Loss:      {val_metrics['loss']:.4f}")
        # print(f"   IoU:       {val_metrics['iou']:.4f} {'✅ NEW BEST!' if val_metrics['iou'] > best_val_iou else ''}")
        # print(f"   Dice:      {val_metrics['dice']:.4f}")
        # print(f"   Recall:    {val_metrics['recall']:.4f}")
        # print(f"   Precision: {val_metrics['precision']:.4f}")
        # print(f"   F1-Score:  {val_metrics['f1_score']:.4f}")

        # print(f"\n LR: {current_lr:.6f}")


        # SAVE BEST MODEL
        if val_metrics['iou'] > best_val_iou:
            best_val_iou = val_metrics['iou']
            best_epoch = epoch
            patience_counter = 0

            checkpoint = {
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'best_val_iou': best_val_iou,
                'val_metrics': val_metrics,
                'train_metrics': train_metrics,
            }

            torch.save(checkpoint, f'best_model_iou_{best_val_iou:.4f}.pth')
            print(f"\n Model saved: best_model_iou_{best_val_iou:.4f}.pth")
        else:
            patience_counter += 1
            print(f"\n No improvement for {patience_counter} epoch(s)")

        # ========================================
        # EARLY STOPPING
        # ========================================
        if patience_counter >= EARLY_STOPPING_PATIENCE:
            print(f"\n  Early stopping triggered!")
            print(f"   No improvement for {EARLY_STOPPING_PATIENCE} epochs")
            print(f"   Best IoU: {best_val_iou:.4f} at epoch {best_epoch + 1}")
            break

        try:
            val_images, val_masks = next(iter(val_loader))
            val_images = val_images.to(device)
            val_masks = val_masks.to(device)

            with torch.no_grad():
                val_outputs = model(val_images)
                val_preds = torch.sigmoid(val_outputs)

            # Weź pierwsze 4 obrazy
            writer.add_images('Images/Input', val_images[:4], epoch)
            writer.add_images('Images/Ground_Truth', val_masks[:4], epoch)
            writer.add_images('Images/Prediction', val_preds[:4], epoch)

        except Exception as e:
            print(f"Failure - Could not log images: {e}")

    # ========================================
    # FINAL EVALUATION ON TEST SET
    # ========================================

    writer.add_text('Final_Test_Metrics', f"""
    Test Set Results (Best Model from Epoch {best_epoch + 1})

-       IoU: {test_metrics['iou']:.4f}
-      Dice: {test_metrics['dice']:.4f}
-    Recall: {test_metrics['recall']:.4f}
- Precision: {test_metrics['precision']:.4f}
-  F1-Score: {test_metrics['f1_score']:.4f}
-  Accuracy: {test_metrics['accuracy']:.4f}
Best Validation IoU: {best_val_iou:.4f}
    """)

    # Dodaj finalne metryki jako skalary
    writer.add_scalar('Final/Test_IoU', test_metrics['iou'], 0)
    writer.add_scalar('Final/Test_Dice', test_metrics['dice'], 0)
    writer.add_scalar('Final/Test_Recall', test_metrics['recall'], 0)
    writer.add_scalar('Final/Test_Precision', test_metrics['precision'], 0)
    writer.add_scalar('Final/Test_F1', test_metrics['f1_score'], 0)

    # Zapisz hyperparametry vs metryki (do porównania w TensorBoard)
    writer.add_hparams(
        {
            'lr': LEARNING_RATE,
            'batch_size': BATCH_SIZE,
            'weight_decay': WEIGHT_DECAY,
            'scheduler_patience': SCHEDULER_PATIENCE,
        },
        {
            'hparam/test_iou': test_metrics['iou'],
            'hparam/test_dice': test_metrics['dice'],
            'hparam/test_f1': test_metrics['f1_score'],
        }
    )


    writer.close()

    # Load best model
    checkpoint = torch.load(f'best_model_iou_{best_val_iou:.4f}.pth', weights_only=False)
    model.load_state_dict(checkpoint['model_state_dict'])

    test_metrics = validate(model, test_loader, criterion, device)

    print(f"\nTest Results:")
    print(f"   IoU:       {test_metrics['iou']:.4f}")
    print(f"   Dice:      {test_metrics['dice']:.4f}")
    print(f"   Recall:    {test_metrics['recall']:.4f}")
    print(f"   Precision: {test_metrics['precision']:.4f}")
    print(f"   F1-Score:  {test_metrics['f1_score']:.4f}")
    print(f"   Accuracy:  {test_metrics['accuracy']:.4f}")



    print(f"\n{'=' * 80}")
    print(" TRAINING COMPLETED!")
    print(f"   Best Validation IoU: {best_val_iou:.4f} (epoch {best_epoch + 1})")
    print(f"   Test IoU: {test_metrics['iou']:.4f}")
    print(f"   Model saved as: best_model_iou_{best_val_iou:.4f}.pth")
    print(f"{'=' * 80}\n")
    return 0

In [28]:
main()

Datasets sizes: 
	  Train: 300
	    Val: 118
	   Test: 119
	    Total parameters: 24,436,369
	Trainable parameters: 24,436,369
	          Model size: ~97.7 MB

Training configuration:
	       Optimizer: Adam
	   Learning rate: 0.0001
	    Weight decay: 1e-05
	       Scheduler: ReduceLROnPlateau (patience=5)
	  Early stopping: patience=15
	          Epochs: 10




TypeError: can't convert cuda:0 device type tensor to numpy. Use Tensor.cpu() to copy the tensor to host memory first.

In [None]:
import gc
gc.collect()
# torch.cuda.empty_cache()

AttributeError: module 'torch' has no attribute 'empty_cache'

In [None]:

def visualize_prediction(img:np.array, prediction, threshold=0.5):
    """
    Wizualizacja predykcji

    Args:
        image_path: ścieżka do oryginalnego obrazu
        prediction: maska prawdopodobieństwa [H, W]
        threshold: próg binaryzacji (default: 0.5)
    """
    # Wczytaj oryginalny obraz
    image = img
    # np.array(Image.open(image_path))

    # Binaryzacja predykcji
    binary_mask = (prediction > threshold).astype(np.uint8)

    # Wizualizacja
    fig, axes = plt.subplots(1, 4, figsize=(20, 5))

    # Oryginalny obraz
    axes[0].imshow(image)
    axes[0].set_title("Original Image", fontsize=14)
    axes[0].axis('off')

    # Heatmapa prawdopodobieństwa
    im1 = axes[1].imshow(prediction, cmap='hot', vmin=0, vmax=1)
    axes[1].set_title("Probability Heatmap", fontsize=14)
    axes[1].axis('off')
    plt.colorbar(im1, ax=axes[1], fraction=0.046)

    # Binarna maska
    axes[2].imshow(binary_mask, cmap='gray', vmin=0, vmax=1)
    axes[2].set_title(f"Binary Mask (threshold={threshold})", fontsize=14)
    axes[2].axis('off')

    # Overlay
    axes[3].imshow(image)
    axes[3].imshow(prediction, cmap='Reds', alpha=0.5, vmin=0, vmax=1)
    axes[3].set_title("Overlay", fontsize=14)
    axes[3].axis('off')

    plt.tight_layout()
    plt.show()

    # Statystyki
    crack_pixels = binary_mask.sum()
    total_pixels = binary_mask.size
    crack_percentage = (crack_pixels / total_pixels) * 100

    print(f"📊 Statistics:")
    print(f"   Crack pixels: {crack_pixels:,} ({crack_percentage:.2f}% of image)")
    print(f"   Max probability: {prediction.max():.3f}")
    print(f"   Mean probability: {prediction.mean():.3f}")


In [None]:
checkpoint = torch.load("best_model_iou_0.5603.pth", weights_only=False)

device = torch.device(DEVICE if torch.cuda.is_available() else "cpu")

model = smp.Unet(
        encoder_name="resnet34",
        encoder_weights="imagenet",
        in_channels=3,
        classes=1,
        activation=None,
    )

model = model.to(device)

model.load_state_dict(checkpoint['model_state_dict'])


# visualize_prediction(image_path, prediction, threshold=0.5)

<All keys matched successfully>

In [None]:
testing_dataset = get_dataset("/content/DeepCrack/test_img","/content/DeepCrack/test_lab" )