In [2]:
import torchvision
import torchvision.models as models
from torchvision import transforms
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from pycocotools.coco import COCO
import torch
import os
import cv2

print("Torch version:", torchvision.__version__)
print(torch.__version__)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')



Torch version: 0.16.1
2.1.1


In [3]:
#Transformation from the paper 
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

from torchvision.transforms import v2 as T


def get_transform(train):
    transforms = []
    if train:
        transforms.append(T.RandomHorizontalFlip(0.5))
    transforms.append(T.ToDtype(torch.float, scale=True))
    transforms.append(T.ToPureTensor())
    return T.Compose(transforms)

In [4]:
import torch
from torchvision.io import read_image
from torchvision.ops.boxes import masks_to_boxes
from torchvision import tv_tensors
from torchvision.transforms.v2 import functional as F

class RoadDamageDataset(Dataset):
     def __init__(self, root, annFile, transform=None, target_transform=None, include_img_without_cracks=True):
        
        self.root = root
        self.coco = COCO(annFile)
        self.ids = list(self.coco.imgs.keys())
        self.transform = transform
        self.target_transform = target_transform
        self.crackless = include_img_without_cracks

     def __len__(self):
           return len(self.ids)
     
     def __getitem__(self, index):
        coco = self.coco
        img_id = self.ids[index]
        ann_ids = coco.getAnnIds(imgIds=img_id)
        target = coco.loadAnns(ann_ids)
        targets = {}
        boxes = []
        labels = []
        area = []
        path = coco.loadImgs(img_id)[0]['file_name']
        img = read_image(os.path.join(self.root, path))
        img = tv_tensors.Image(img)
        iscrowd = []
        #img = img.resize((1221, 4040))
        #img = tv_tensors.Image(img)
        for t in target:
            x_min, y_min, width, height = t["bbox"]
            x_max, y_max = x_min + width, y_min + height
            boxes.append([x_min, y_min, x_max, y_max])
            labels.append(t["category_id"])
            area.append(t["area"])
            iscrowd.append(t["iscrowd"])
        if len(target) == 0:
            if self.crackless:
                boxes.append([0,0,1,1])
                area.append(0)
                labels.append(0)
            else:
                return 
        targets["image_id"] = torch.as_tensor(index)
        targets["boxes"] = tv_tensors.BoundingBoxes(boxes, format="XYXY", canvas_size=F.get_size(img))
        targets["labels"] = torch.tensor(labels, dtype=torch.int64)
        targets["area"] = torch.as_tensor(area)

        
        if self.transform is not None:
            img, targets= self.transform(img, targets)
        
          
        

        return img, targets


In [5]:
from torch.utils.data import random_split

annFile = '../coco_annotations.json'
root = "train/images/"
annFile_val = '../coco_annotations_validation.json'
root_val = "validate/images/"
def collate_fn(batch):
    return tuple(zip(*batch))

road_damage_dataset = RoadDamageDataset(root, annFile, transform=get_transform(train=True), target_transform=None)
road_damage_dataset_validate = RoadDamageDataset(root_val, annFile_val, transform=get_transform(train=False), target_transform=None )
mini_dataset_size = 50  # Set the desired size for your mini-dataset


# Create a random split to get a mini-dataset
mini_dataset, _ = random_split(road_damage_dataset, [mini_dataset_size, len(road_damage_dataset) - mini_dataset_size])
mini_dataset_val, _ = random_split(road_damage_dataset, [mini_dataset_size, len(road_damage_dataset) - mini_dataset_size])

# Create a DataLoader
train_loader = data_loader = torch.utils.data.DataLoader(
    road_damage_dataset,
    batch_size=16,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn
)
val_loader = data_loader = torch.utils.data.DataLoader(
    road_damage_dataset_validate,
    batch_size=16,
    shuffle=True,
    num_workers=0,
    collate_fn=collate_fn
)
data_loader = torch.utils.data.DataLoader(
    mini_dataset,
    batch_size=5,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_fn
)
data_loader_1 = torch.utils.data.DataLoader(
    mini_dataset_val,
    batch_size=5,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_fn
)

loading annotations into memory...
Done (t=0.10s)
creating index...
index created!
loading annotations into memory...
Done (t=0.00s)
creating index...
index created!


In [29]:
sample = road_damage_dataset[10]
img, target = sample
print(f"{type(img) = }\n{type(target) = }\n{target.keys() = }")
print(f"{type(target['boxes']) = }\n{type(target['labels']) = }")
print(target["boxes"])
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


type(img) = <class 'torch.Tensor'>
type(target) = <class 'dict'>
target.keys() = dict_keys(['image_id', 'boxes', 'labels', 'area'])
type(target['boxes']) = <class 'torch.Tensor'>
type(target['labels']) = <class 'torch.Tensor'>
tensor([[ 514.0000,  811.6000,  641.0000,  922.6000],
        [  84.0000,  924.6000,  420.0000, 1217.6000],
        [1765.0000,  597.6000, 2318.0000, 1210.6000],
        [1652.0000,  626.6000, 1741.0000,  700.6000]])


In [7]:
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# load a model pre-trained on COCO
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

# replace the classifier with a new one, that has
# num_classes which is user-defined
num_classes = 5  # 1 class (person) + background
# get number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features
# replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

In [7]:
from engine import train_one_epoch, evaluate
# train on the GPU or on the CPU, if a GPU is not available

model.to(device)
# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005
)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1
)

# let's train it just for 2 epochs
num_epochs = 2

for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq=10)
    print("1")
    # update the learning rate
    lr_scheduler.step()
    print("2")
    # evaluate on the test dataset
    evaluate(model, data_loader, device=device)

print("That's it!")

Epoch: [0]  [0/2]  eta: 0:00:01  lr: 0.005000  loss: 2.7486 (2.7486)  loss_classifier: 2.0895 (2.0895)  loss_box_reg: 0.0073 (0.0073)  loss_objectness: 0.4339 (0.4339)  loss_rpn_box_reg: 0.2179 (0.2179)  time: 0.6960  data: 0.0750  max mem: 3758
Epoch: [0]  [1/2]  eta: 0:00:00  lr: 0.005000  loss: 2.7486 (4.4105)  loss_classifier: 2.0895 (2.1162)  loss_box_reg: 0.0073 (0.0110)  loss_objectness: 0.4339 (1.5499)  loss_rpn_box_reg: 0.2179 (0.7333)  time: 0.4815  data: 0.0840  max mem: 3918
Epoch: [0] Total time: 0:00:00 (0.4815 s / it)
1
2


In [56]:
from matplotlib import pyplot as plt
import numpy as np
from torch import Tensor, tensor
import torch.nn as nn
import torch.optim as optim
from itertools import islice
save_path = "model"
# Set up loss function, optimizer, and device
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Move the model to the specified device
model.to(device)
def train_model(model, num_epochs, train_loader, val_loader, criterion, optimizer, save_model=False, save_plots=False):
    best_loss = 100
    train_losses = []
    val_batch_loss = []

    for epoch in range(num_epochs):
    
        epoch_loss = 0
        model.train()
        i = 0    
        for imgs, annotations in train_loader:
            i += 1
            imgs = list(img.to(device) for img in imgs)
            annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
    
            loss_dict = model(imgs, annotations)
            losses = sum(loss for loss in loss_dict.values())
            epoch_loss += losses
            optimizer.zero_grad()
            losses.backward()
            optimizer.step()
        train_losses.append(epoch_loss/len(train_loader))
            
            
        val_loss = 0
        with torch.no_grad():
            for val_imgs, val_annotations in val_loader:
                val_imgs = list(img.to(device) for img in val_imgs)
                val_annotations = [{k: v.to(device) for k, v in t.items()} for t in val_annotations]
                val_loss_dict = model(val_imgs, val_annotations)
                val_losses = sum(val_loss for val_loss in val_loss_dict.values())
                val_loss += val_losses

            avg_val_loss = val_loss / len(val_loader)
            val_batch_loss.append(avg_val_loss)
            print(f'Epoch [{epoch + 1}/{num_epochs}], Trainig Loss: {epoch_loss/len(train_loader):.4f}, Validation Loss: {avg_val_loss:.4f}')
            if save_model:
            # Save the best model
                if avg_val_loss < best_loss:
                    best_loss = avg_val_loss
                    torch.save(model.state_dict(), save_path+"best.pth")
                    print("Best model saved.")

            # Save the model every 10 epochs
            if (epoch + 1) % 10 == 0 and save_model:
                torch.save(model.state_dict(), f"model_epoch_{epoch + 1}.pth")
                print(f"Model saved at epoch {epoch + 1}.")
        if save_plots and (epoch + 1) % 10 == 0:
            # Save loss plot
            val_batch_loss_np = np.array([val.cpu().numpy() for val in val_batch_loss])
            train_batch_loss_np = np.array([train.detach().cpu().numpy() for train in train_losses])
            plt.plot(train_batch_loss_np, label='Training Loss')
            plt.plot(val_batch_loss_np, label='Validation Loss')
            plt.xlabel('Epoch')
            plt.ylabel('Loss')
            plt.legend()
            plt.savefig(f'loss_plot_epoch_{epoch + 1}.png')
            plt.close()
                
                # Save training progress to a text file
            with open("training_log.txt", "a") as f:
                f.write(f'Epoch [{epoch + 1}/{num_epochs}], Total Loss: {epoch_loss:.4f}, Validation Loss: {avg_val_loss:.4f}\n')

train_model(model, 250, train_loader, val_loader, criterion, optimizer, True, True)

Epoch [1/250], Trainig Loss: 1.0686, Validation Loss: 0.2544
Best model saved.
Epoch [2/250], Trainig Loss: 0.2452, Validation Loss: 0.2330
Best model saved.
Epoch [3/250], Trainig Loss: 0.2315, Validation Loss: 0.2205
Best model saved.
Epoch [4/250], Trainig Loss: 0.2165, Validation Loss: 0.2113
Best model saved.
Epoch [5/250], Trainig Loss: 0.2116, Validation Loss: 0.2236
Epoch [6/250], Trainig Loss: 0.2091, Validation Loss: 0.1919
Best model saved.
Epoch [7/250], Trainig Loss: 0.2075, Validation Loss: 0.2078
Epoch [8/250], Trainig Loss: 0.2065, Validation Loss: 0.1883
Best model saved.
Epoch [9/250], Trainig Loss: 0.2059, Validation Loss: 0.1919
Epoch [10/250], Trainig Loss: 0.2037, Validation Loss: 0.2182
Model saved at epoch 10.
Epoch [11/250], Trainig Loss: 0.2013, Validation Loss: 0.2004
Epoch [12/250], Trainig Loss: 0.2013, Validation Loss: 0.1854
Best model saved.
Epoch [13/250], Trainig Loss: 0.2174, Validation Loss: 0.2623
Epoch [14/250], Trainig Loss: 98390.6094, Validation

In [10]:
from torchvision import transforms as torchtrans  

def apply_nms(orig_prediction, iou_thresh=0.5):
    
    # torchvision returns the indices of the bboxes to keep
    keep = torchvision.ops.nms(orig_prediction['boxes'], orig_prediction['scores'], iou_thresh)
    
    final_prediction = orig_prediction
    final_prediction['boxes'] = final_prediction['boxes'][keep]
    final_prediction['scores'] = final_prediction['scores'][keep]
    final_prediction['labels'] = final_prediction['labels'][keep]
    
    return final_prediction

def get_good_pred(orig_prediction, t=0.3):
    final_pred = apply_nms(orig_prediction)
    good_pred =[]
    if len(final_pred) == 0 :
        return []
    for i in range(len(final_pred["boxes"])):
        if final_pred["scores"][i] > t:
             good_pred.append({
                "boxes": final_pred["boxes"][i],
                "scores": final_pred["scores"][i],
                "labels": final_pred["labels"][i]
            })
    return good_pred

In [1]:
import matplotlib.pyplot as plt
import numpy as np
from torchvision.utils import draw_bounding_boxes, draw_segmentation_masks
i = 0
model_finetuned = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False)
num_classes = 5  # Assuming the model was trained on COCO dataset
# Replace the classifier with a new one, with the appropriate number of classes
in_features = model_finetuned.roi_heads.box_predictor.cls_score.in_features
model_finetuned.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Load the state dictionary
state_dict = torch.load("modelbest.pth")

# Load the state dictionary into the model
model_finetuned.load_state_dict(state_dict)
model_finetuned.to(device)
model_finetuned.eval()


for i in range(100):
    image = read_image(f"train/images/Norway_000{501+i}.jpg")
    image_pil = Image.open(f"train/images/Norway_000{501+i}.jpg")
    eval_transform = get_transform(train=False)
    # Convert PIL image to a NumPy array
    image_np = np.array(image_pil)
    with torch.no_grad():
        x = eval_transform(image)
        # convert RGBA -> RGB and move to device
        x = x[:3, ...].to(device)
        predictions = model_finetuned([x, ])
        
        pred = predictions[0]
    
    if len(pred["boxes"] > 0):
        image = (255.0 * (image - image.min()) / (image.max() - image.min())).to(torch.uint8)
        image = image[:3, ...]
        pred_labels = [f"{label}: {score:.3f}" for label, score in zip(pred["labels"], pred["scores"])]
        pred_boxes = pred["boxes"]
        filtered_labels = []
        filtered_boxes = []
        filtered_scores = []
        for label, score, box in zip(pred["labels"], pred["scores"], pred["boxes"]):
            if score >= 0.3:
                filtered_labels.append(f"{label}: {score:.3f}")
                filtered_boxes.append(box.unsqueeze(0))
                filtered_scores.append(score)
        if (len(filtered_boxes) > 0):
            filtered_boxes = torch.cat(filtered_boxes, dim=0)
            filtered_scores = [tensor.unsqueeze(0) for tensor in filtered_scores]
            filtered_scores = torch.cat(filtered_scores, dim=0)

            keep = torchvision.ops.nms(filtered_boxes, filtered_scores.to(device), iou_threshold=0.2)
            boxes_to_plot = torch.cat([filtered_boxes[i].unsqueeze(0) for i in keep], dim=0)
            labels_to_plot = [filtered_labels[i] for i in keep]
            output_image = draw_bounding_boxes(image, boxes_to_plot, labels_to_plot, colors="red")




            plt.figure(figsize=(12, 12))
            plt.imshow(output_image.permute(1, 2, 0))

NameError: name 'torchvision' is not defined