## create a r-cnn model for object detection and train it with the Aquarium datase

### https://public.roboflow.com/object-detection/aquarium/2#

### https://chatgpt.com/c/5f97037a-3b6c-4521-bc2f-8070b6ee8b50

## "Aquarium.coco"

In [None]:
# !pip install roboflow

from roboflow import Roboflow
rf = Roboflow(api_key="fqSAzeINBX9oPfwLixDj")
project = rf.workspace("brad-dwyer").project("aquarium")
version = project.version(2)
dataset = version.download("coco")

In [1]:
import torch
import torchvision
from torch import nn
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms as T
import numpy as np
import os
import json
from PIL import Image

In [14]:
from torchvision import transforms

class RoboFlowDataset(Dataset):
    def __init__(self, root, transforms=None):
        self.root = root
        self.transforms = transforms
        self.imgs = list(sorted(os.listdir(os.path.join(root, "train"))))
        self.annotations = json.load(open(os.path.join(root, "train/_annotations.coco.json")))
        
    def __getitem__(self, idx):
        img_path = os.path.join(self.root, "train", self.imgs[idx])
        img = Image.open(img_path).convert("RGB")
        
        ann = [a for a in self.annotations['annotations'] if a['image_id'] == idx]
        boxes = [a['bbox'] for a in ann]
        labels = [a['category_id'] for a in ann]
        
        # Convert to tensors
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        
        if self.transforms is not None:
            img = self.transforms(img)
        
        return img, target
    
    def __len__(self):
        return len(self.imgs)

def get_transform(train):    
    if train:
        return transforms.Compose([
            transforms.Resize((512, 512)),  # Resize to 512x512
            transforms.RandomHorizontalFlip(),  # Random horizontal flip for augmentation
            transforms.ToTensor(),  # Convert image to PyTorch tensor
            transforms.Normalize(mean=[0.485, 0.456, 0.406],  # Normalize to the same mean and std used by pretrained models
                                 std=[0.229, 0.224, 0.225]),
        ])
    else:
        return transforms.Compose([
            transforms.Resize((512, 512)),  # Resize to 512x512
            transforms.ToTensor(),  # Convert image to PyTorch tensor
            transforms.Normalize(mean=[0.485, 0.456, 0.406],  # Normalize to the same mean and std used by pretrained models
                                 std=[0.229, 0.224, 0.225]),
        ])


#def get_transform(train):
#    transforms = []
#    transforms.append(T.ToTensor())
#    if train:
#        transforms.append(T.RandomHorizontalFlip(0.5))
#    return T.Compose(transforms)

In [3]:
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=3, stride=2, padding=1),  # [16, H/2, W/2]
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),  # [16, H/4, W/4]
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),  # [32, H/8, W/8]
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),  # [32, H/16, W/16]
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),  # [64, H/32, W/32]
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),  # [64, H/64, W/64]
        )
        self.out_channels = 64

    def forward(self, x):
        return self.features(x)

In [4]:
class DetectionHead(nn.Module):
    def __init__(self, in_channels, num_classes):
        super(DetectionHead, self).__init__()
        self.num_classes = num_classes
        self.cls_head = nn.Sequential(
            nn.Linear(in_channels * 7 * 7, 1024),
            nn.ReLU(),
            nn.Linear(1024, num_classes)
        )
        self.reg_head = nn.Sequential(
            nn.Linear(in_channels * 7 * 7, 1024),
            nn.ReLU(),
            nn.Linear(1024, 4)  # 4 for bounding box regression
        )

    def forward(self, x):
        x = x.flatten(start_dim=1)
        cls_logits = self.cls_head(x)
        bbox_reg = self.reg_head(x)
        return cls_logits, bbox_reg


In [30]:
class SimpleRCNN(nn.Module):
    def __init__(self, backbone, num_classes):
        super(SimpleRCNN, self).__init__()
        self.backbone = backbone
        self.head = DetectionHead(backbone.out_channels, num_classes)

    def forward(self, images, targets=None):
        features = self.backbone(images)
        pooled_features = nn.functional.adaptive_max_pool2d(features, (7, 7))

        cls_logits, bbox_reg = self.head(pooled_features)
        
        if self.training:
            # targets is a list of dictionaries            
            print(f"cls_logits: {cls_logits} - shape: {cls_logits.shape}")
            for target in targets:
                print(f"lables: {target['labels']} - shape: {target['labels'].shape}")
            
            # Compute losses
            loss_cls = sum(nn.CrossEntropyLoss()(cls_logits[i], target["labels"]) for i, target in enumerate(targets))
            loss_bbox = sum(nn.MSELoss()(bbox_reg[i], target["boxes"]) for i, target in enumerate(targets))
            losses = {"loss_cls": loss_cls, "loss_bbox": loss_bbox}
            return losses            

        return cls_logits, bbox_reg

In [31]:
# Anzahl der Klassen im Roboflow-Dataset (einschließlich Hintergrund)
num_classes = len(json.load(open('Aquarium.coco/train/_annotations.coco.json'))['categories']) + 1

dataset = RoboFlowDataset('Aquarium.coco', get_transform(train=True))
dataset_test = RoboFlowDataset('Aquarium.coco', get_transform(train=False))

data_loader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=lambda x: tuple(zip(*x))) #, num_workers=4, collate_fn=lambda x: tuple(zip(*x)))
data_loader_test = DataLoader(dataset_test, batch_size=2, shuffle=False, collate_fn=lambda x: tuple(zip(*x))) #, num_workers=4, collate_fn=lambda x: tuple(zip(*x)))

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(device)

#for images, targets in data_loader:
#    print(images)
#    print(targets)

cuda


In [32]:
backbone = SimpleCNN()
model = SimpleRCNN(backbone, num_classes)
model.to(device)

# Optimierer definieren
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training
num_epochs = 2
print(f"Running for {num_epochs} epochs")

for epoch in range(num_epochs):
    model.train()
    print("Started Training")
    counter = 0
    for images, targets in data_loader:        
        images = torch.stack(images).to(device)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        print(f"Run: {counter} - After targets")
        
        losses = model(images, targets)        
        loss = sum(loss for loss in losses.values())
        print(f"Run: {counter} - After loss")
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        #if counter % 100 == 0:
        print(f"Epoch: {epoch} - Run: {counter}")
        counter += counter
    
    # Lernrate anpassen
    lr_scheduler.step()
    
    print(f"Epoch: {epoch}, Loss: {loss.item()}")


Running for 2 epochs
Started Training
Run: 0 - After targets
cls_logits: tensor([[ 0.0302, -0.0080, -0.0271,  0.0696, -0.0113, -0.0542, -0.0287,  0.0647,
          0.0115],
        [ 0.0176, -0.0194, -0.0162,  0.0824,  0.0014, -0.0327, -0.0335,  0.0325,
         -0.0050]], device='cuda:0', grad_fn=<AddmmBackward0>) - shape: torch.Size([2, 9])
lables: tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2], device='cuda:0') - shape: torch.Size([10])
lables: tensor([1, 5, 1, 1, 1, 1, 1, 1, 1], device='cuda:0') - shape: torch.Size([9])


RuntimeError: size mismatch (got input: [9], target: [10])

In [None]:
torch.save(model.state_dict(), "simple_rcnn.pth")

# Um das Modell später zu laden
backbone = SimpleCNN()
model = SimpleRCNN(backbone, num_classes)
model.load_state_dict(torch.load("simple_rcnn.pth"))
model.to(device)

In [None]:
import torch
from torchvision.transforms import functional as F
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image

# Laden Sie das Modell
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

backbone = SimpleCNN()
model = SimpleRCNN(backbone, num_classes)  # num_classes entspricht der Anzahl der Klassen in Ihrem Dataset
model.load_state_dict(torch.load("simple_rcnn.pth"))
model.to(device)
model.eval()


In [None]:
def load_image(image_path):
    image = Image.open(image_path).convert("RGB")
    image_tensor = F.to_tensor(image).unsqueeze(0)  # Fügen Sie eine Batch-Dimension hinzu
    return image, image_tensor

image_path = 'path_to_your_image.jpg'
image, image_tensor = load_image(image_path)
image_tensor = image_tensor.to(device)


In [None]:
with torch.no_grad():
    cls_logits, bbox_reg = model(image_tensor)

# Konvertieren Sie die Ergebnisse in numpy Arrays und bringen Sie sie auf die CPU
cls_logits = cls_logits.cpu().numpy()
bbox_reg = bbox_reg.cpu().numpy()

# Wählen Sie die Klasse mit der höchsten Wahrscheinlichkeit
predicted_class = np.argmax(cls_logits, axis=1)
predicted_boxes = bbox_reg

# Umwandeln der Bounding Box-Koordinaten
predicted_boxes[:, 0] = predicted_boxes[:, 0] * image.width
predicted_boxes[:, 1] = predicted_boxes[:, 1] * image.height
predicted_boxes[:, 2] = predicted_boxes[:, 2] * image.width
predicted_boxes[:, 3] = predicted_boxes[:, 3] * image.height


In [None]:
def plot_results(image, boxes, labels):
    fig, ax = plt.subplots(1)
    ax.imshow(image)
    
    for box, label in zip(boxes, labels):
        # Zeichnen Sie die Bounding Box
        rect = patches.Rectangle((box[0], box[1]), box[2], box[3], linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(rect)
        # Textbeschriftung hinzufügen
        plt.text(box[0], box[1], f'Class: {label}', bbox=dict(facecolor='yellow', alpha=0.5))

    plt.show()

plot_results(image, predicted_boxes, predicted_class)