In [14]:
import os
import shutil

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import numpy as np

In [8]:
%load_ext autoreload
%autoreload 2

---
### 1. Prepare Data

In the Yolov8 models, images and labels are required to be in distinct folders. However, when constructing a model from scratch with PyTorch, the label data must reside in the same directory as the images. Therefore, we establish a new directory named `img_with_panels_and_labels` and duplicate the images and label data into this location.

In [17]:
dataset_name = "kasmi_solar"

In [25]:
# Define the main directories
main_dirs = ["train", "test", "val"]

for main_dir in main_dirs:
    # Define the source directories
    img_dir = f"datasets/{dataset_name}/{main_dir}/img_with_panels"
    label_dir = f"datasets/{dataset_name}/{main_dir}/labels_segmentation/normalized"

    # Define the destination directory
    dest_dir = f"datasets/{dataset_name}/{main_dir}/img_with_label/solar_panel"

    # Create the destination directory if it doesn't exist
    os.makedirs(dest_dir, exist_ok=True)

    # Copy all files from the image and label directories to the destination directory
    for filename in os.listdir(img_dir):
        shutil.copy(os.path.join(img_dir, filename), dest_dir)

    for filename in os.listdir(label_dir):
        shutil.copy(os.path.join(label_dir, filename), dest_dir)

In [26]:
# function to count files in directory because we will use that often to doublecheck
def count_files_in_directory(directory):
    return len(
        [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
    )

In [29]:
# Define the directories
dirs = ["train", "test", "val"]

for dir in dirs:
    for root, subdirs, files in os.walk(
        f"datasets/{dataset_name}/{dir}/img_with_label"
    ):
        for subdir in subdirs:
            subdir_path = os.path.join(root, subdir)
            num_files = sum([len(files) for r, d, files in os.walk(subdir_path)])
            print(
                f"Number of files in {subdir_path}: {num_files}. That means {num_files / 2} images and {num_files / 2} labels."
            )

Number of files in datasets/kasmi_solar/train/img_with_label/solar_panel: 18624. That means 9312.0 images and 9312.0 labels.
Number of files in datasets/kasmi_solar/test/img_with_label/solar_panel: 3990. That means 1995.0 images and 1995.0 labels.
Number of files in datasets/kasmi_solar/val/img_with_label/solar_panel: 3992. That means 1996.0 images and 1996.0 labels.


The images and labels for the train, test, and validation datasets are now consolidated into their respective folders.

---
### 2. Building the Model

In [30]:
# Define the segmentation model
class SegmentationModel(nn.Module):
    def __init__(self):
        super().__init__()

        # Define the encoder-decoder architecture
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.MaxPool2d(2, stride=2),
        )

        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(
                512, 256, kernel_size=3, stride=2, padding=1, bias=False
            ),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.ConvTranspose2d(
                256, 128, kernel_size=3, stride=2, padding=1, bias=False
            ),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 1, kernel_size=3, stride=1, padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
        features = self.encoder(x)
        outputs = self.decoder(features)
        return outputs


# Define the data loading and preprocessing pipeline
def label_to_polygons(image, label):
    # Split the label string into segments
    segments = label.split(" ")

    # Extract class information (should be 0 for solar panel)
    class_id = int(segments[0])

    # Extract polygon coordinates
    coordinates = [float(value) for value in segments[1:]]

    # Convert polygon coordinates to shape (n_points, 2)
    polygon = np.array(coordinates).reshape(-1, 2)

    # Convert polygon coordinates to normalized coordinates within the image
    polygon_normalized = polygon / torch.tensor([image.size, image.size])

    return class_id, polygon_normalized


def pil_to_tensor(pil_image):
    image = np.array(pil_image)
    image = torch.from_numpy(image)
    image = image.float()
    image = image.permute((2, 0, 1))
    return image


def get_dataloader(data_dir, transform, batch_size=16, shuffle=True):
    dataset = torchvision.datasets.ImageFolder(data_dir, transform=transform)
    dataloader = torch.utils.data.DataLoader(
        dataset, batch_size=batch_size, shuffle=shuffle
    )
    return dataloader


# Define the training loop
def train(model, criterion, optimizer, train_loader, val_loader):
    for epoch in range(num_epochs):
        train_loss = 0.0
        val_loss = 0.0

---
### 3. Load and Transform the Data

The transformations you use can depend on the specific requirements of your task. Here are some common transformations you might consider:

1. `Resize`: Resizes the input image to the given size.
2. `ToTensor`: Converts a PIL Image or numpy.ndarray to a tensor.
3. `Normalize`: Normalizes a tensor image with mean and standard deviation.
4. `RandomHorizontalFlip`: Horizontally flip the given image randomly with a given probability.
5. `RandomVerticalFlip`: Vertically flip the given image randomly with a given probability.
6. `RandomRotation`: Rotates the image by a random angle.
7. `ColorJitter`: Randomly change the brightness, contrast, and saturation of an image.

You can find the full list in the [torchvision.transforms documentation](https://pytorch.org/vision/stable/transforms.html).

In [35]:
# Define the transformations
transform = torchvision.transforms.Compose(
    [
        torchvision.transforms.Resize((224, 224)),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.RandomRotation(10),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(
            mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]
        ),
    ]
)

# Get the train_loader and val_loader
train_loader = get_dataloader(
    f"datasets/{dataset_name}/train/img_with_label", transform=transform
)
val_loader = get_dataloader(
    f"datasets/{dataset_name}/val/img_with_label", transform=transform
)

---
### 4. Train the Model

If you use a Mac computer you can verify mps support using a simple Python script:

In [34]:
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print(x)
else:
    print("MPS device not found.")

tensor([1.], device='mps:0')


The output should show:
```python
tensor([1.], device='mps:0')
```

In [39]:
# Check that MPS is available
if not torch.backends.mps.is_available():
    if not torch.backends.mps.is_built():
        print(
            "MPS not available because the current PyTorch install was not "
            "built with MPS enabled."
        )
    else:
        print(
            "MPS not available because the current MacOS version is not 12.3+ "
            "and/or you do not have an MPS-enabled device on this machine."
        )

else:
    mps_device = torch.device("mps")

In [51]:
import torch

model = SegmentationModel()
# device = torch.device("mps")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Number of epochs
num_epochs = 10

# Loop over the epochs
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode

    # Training loop
    for i, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device).float()  # Convert inputs to torch.FloatTensor
        labels = labels.to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")

    model.eval()  # Set the model to evaluation mode

    # Validation loop
    with torch.no_grad():
        correct = 0
        total = 0
        for inputs, labels in val_loader:
            inputs = inputs.to(device).float()  # Convert inputs to torch.FloatTensor
            labels = labels.to(device)

            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        print(f"Validation accuracy: {100 * correct / total}%")

ValueError: Using a target size (torch.Size([32])) that is different to the input size (torch.Size([32, 1, 105, 105])) is deprecated. Please ensure they have the same size.