In [7]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchmetrics

from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import torch.utils.data as data
from PIL import Image
import torch.nn.functional as F

In [4]:
if torch.cuda.is_available():
    # Get the number of available GPUs
    num_gpus = torch.cuda.device_count()
    print(f"GPU is available with {num_gpus} device(s).")
    
    # Get the name of the current GPU
    current_gpu = torch.cuda.get_device_name(0)  # Assuming the first GPU is used
    print(f"Current GPU: {current_gpu}")
else:
    print("GPU is not available. Using CPU.")

GPU is available with 1 device(s).
Current GPU: NVIDIA GeForce GTX 1660


In [5]:
if torch.cuda.is_available():
    device = torch.device("cuda")  # Use the first available GPU
else:
    device = torch.device("cpu")  # If no GPU is available, use CPU

In [17]:
current_directory = os.getcwd()
models_dir = current_directory + '/models'
os.makedirs(models_dir, exist_ok=True)
df = pd.read_csv('filtered_data.csv')
images_dir = '/run/media/victor/victor/cv_project/SolarPanelSoilingImageDataset/Solar_Panel_Soiling_Image_dataset/PanelImages'

In [8]:
train_df, test_df = train_test_split(df, test_size=0.2)

In [23]:
class SolarPanelDataset(data.Dataset):
    #data_dir: The directory containing the image files
    #image_shape: The shape of the images (height, width, channels)
    #augmentation_copies: The number of copies of each image to create for data augmentation
    def __init__(self, df, data_dir, image_shape=(3, 192, 192), augmentation_copies=1):
        self.copies = 1 + augmentation_copies
        self.image_shape = image_shape
        self.df = df #df: A pandas DataFrame containing the image filenames and labels
        self.labels = []
        self.transform = transforms.Compose([
            transforms.Resize((self.image_shape[1], self.image_shape[2])),
            transforms.ToTensor(),
        ])

        self.augmentation_transform = self.make_augmentation_transform()
        self.dataset_size = len(df)
        self.images = torch.zeros((self.dataset_size, *self.image_shape), dtype=torch.float32)

        for i, img_name in enumerate(self.df['original_title']):
            img_path = os.path.join(data_dir, img_name)
            image = Image.open(img_path).convert("RGB")
            if self.transform:
                image = self.transform(image)
            self.images[i] = image
            label = df.iloc[i]['loss_percentage']
            self.labels.append(label)  # Add corresponding label here

    def __len__(self):
        return len(self.images)*self.copies

    def make_augmentation_transform(self):
        return transforms.Compose([
            transforms.RandomHorizontalFlip(p=0.2),
            transforms.RandomVerticalFlip(p=0.2),
            transforms.RandomRotation(degrees=20),
            transforms.ColorJitter(brightness=(0.8, 1.2), contrast=(0.8, 1.2), saturation=(0.8, 1.2), hue=(-0.1, 0.1)),
            transforms.GaussianBlur(kernel_size=3),  # You can adjust the kernel size
            transforms.RandomApply([transforms.Lambda(lambda x: x + 0.01 * torch.randn_like(x))], p=0.2),
        ])
        
    def __getitem__(self, idx):
        if (idx >= len(self.images) ):
            image = self.images[idx % len(self.images)]
            label = self.labels[idx % len(self.labels)]
            return self.augmentation_transform(image), torch.tensor(label, dtype=torch.float32)
            
            
        image = self.images[idx]
        label = self.labels[idx]
        return image, torch.tensor(label, dtype=torch.float32)

In [27]:
class Regression_CNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(32400, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 1)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


In [58]:
class CNN_2(nn.Module):
    def __init__(self):
        super(RegressionCNN, self).__init__()

        # Convolutional layers
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)

        # Batch normalization for improved training stability
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.batch_norm2 = nn.BatchNorm2d(128)
        self.batch_norm3 = nn.BatchNorm2d(256)

        # Max pooling layer
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)

        # Dropout for regularization
        self.dropout = nn.Dropout(0.5)

        # Fully connected layers
        self.fc1 = nn.Linear(256 * 48 * 48, 512)
        self.fc2 = nn.Linear(512, 1)

    def forward(self, x):
        x = self.pool(F.leaky_relu(self.batch_norm1(self.conv1(x))))
        x = self.pool(F.leaky_relu(self.batch_norm2(self.conv2(x))))
        x = self.pool(F.leaky_relu(self.batch_norm3(self.conv3(x))))

        x = x.view(-1, 256 * 48 * 48)

        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)

        return x

In [59]:

# Function to save the model
def saveModel(model:nn.Module, path):
    torch.save(model.state_dict(), path)

# Training function. We simply have to loop over our data iterator and feed the inputs to the network and optimize.
def train(model: torch.nn.Module,
          dataloader: torch.utils.data.DataLoader,
          loss_fn: torch.nn.Module,
          optimizer: torch.optim.Optimizer,
          metric: torchmetrics.Metric,
          device: torch.device,
          num_epochs,
          path_model,
          report: pd.DataFrame,
          verbatim):
    model.train()
    model.to(device)
    metric.to(device)
    best_accuracy = 0.0
    best_loss = 0.0
    best_epoch = 0
    metric_algorithm = metric.__class__.__name__
    loss_algorithm = loss_fn.__class__.__name__
    best_metric = 100
    for epoch in range(num_epochs):  # loop over the dataset multiple times

        train_acc = 0.0
        train_loss = 0.0
        for ibatch, (images, labels) in enumerate(dataloader, 0):
            images = images.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            y_pred = model(images)
            y_pred = y_pred.squeeze(dim=1) # Output is [Batch size, 1], but we want [Batch size]
            loss = loss_fn(y_pred, labels)
            train_loss += loss.item()

            metric(y_pred, labels)
            loss.backward()
            optimizer.step()

            print(loss)
            # Calculate and accumulate accuracy metric across all batches
            #y_pred_class = torch.argmax(torch.softmax(y_pred, dim=1), dim=1)
            #train_acc += (y_pred_class == labels).sum().item()/len(y_pred)

        # Adjust metrics to get average loss and accuracy per batch
        train_loss = train_loss / len(dataloader)

        # we want to save the model if the accuracy is the best

        path = "./myModel_" +str(epoch)+ ".pth"
        #saveModel(model, path = path)

        batch_metric = metric.compute()
        report_line = {'epoch': epoch,
                       'batch_loss': train_loss,
                       'metric_algorithm': metric_algorithm,
                       'batch_metric': batch_metric.item(),
                       'loss_algorithm': loss_algorithm
                      }
        report = pd.concat([report, pd.DataFrame([report_line])], ignore_index=True)

        if batch_metric < best_metric:
            path = str(path_model + '/bestModel.pth')
            saveModel(model, path = path)
            best_loss = train_loss
            best_metric = batch_metric
            best_epoch = epoch
            if verbatim:
              print('Best Epoch #', epoch,' Loss=', best_loss, " Accu=", best_accuracy )

    return best_loss, best_accuracy, best_epoch, report

In [60]:
model = Regression_CNN()
metric = torchmetrics.MeanAbsolutePercentageError()
dataset = SolarPanelDataset(train_df.sample(1000), images_dir)
train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, shuffle=True)
loss_fn = torch.nn.MSELoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.005, momentum=0.9)
num_epochs = 10
path_model = models_dir + f'/{model.__class__.__name__}_{loss_fn.__class__.__name__}'
os.makedirs(path_model, exist_ok=True)
report = pd.DataFrame()
_, _, _, report = train(model, train_dataloader, loss_fn, optimizer, metric, device, num_epochs, path_model, report, verbatim=True)

tensor(0.1987, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1058, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1087, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1263, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1268, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0741, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1042, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0934, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1184, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1150, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0913, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0759, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0637, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0718, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0924, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.0861, device='cuda:0', grad_fn=<MseLossBackward0>)
tensor(0.1056, device='cuda:0', grad_fn=

In [61]:
report

Unnamed: 0,epoch,batch_loss,metric_algorithm,batch_metric,loss_algorithm
0,0,0.091464,MeanAbsolutePercentageError,117.332161,MSELoss
1,1,0.072445,MeanAbsolutePercentageError,165.558563,MSELoss
2,2,0.065278,MeanAbsolutePercentageError,176.601837,MSELoss
3,3,0.056931,MeanAbsolutePercentageError,171.393692,MSELoss
4,4,0.049591,MeanAbsolutePercentageError,157.379044,MSELoss
5,5,0.045264,MeanAbsolutePercentageError,151.991791,MSELoss
6,6,0.041277,MeanAbsolutePercentageError,152.175613,MSELoss
7,7,0.035795,MeanAbsolutePercentageError,146.323624,MSELoss
8,8,0.0327,MeanAbsolutePercentageError,144.19928,MSELoss
9,9,0.029662,MeanAbsolutePercentageError,145.461533,MSELoss


In [55]:
# Function to test the model with the test dataset and print the accuracy for the test images
def test(model: torch.nn.Module,
         dataloader: torch.utils.data.DataLoader,
         loss_fn: torch.nn.Module,
         metric: torchmetrics.Metric,
         device: torch.device,
         verbatim = True):

    model.eval()
    metric = metric.to(device)

    # Setup test loss and test accuracy values
    test_loss = 0
    test_metric = 100
    pred_labels = []
    label_list = []
    with torch.no_grad():
        for data in dataloader:
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)

            y_pred = model(images)
            y_pred = y_pred.squeeze(dim=1) # Output is [Batch size, 1], but we want [Batch size]
            pred_labels+=y_pred.tolist()
            label_list += labels.tolist()
            
            loss = loss_fn(y_pred, labels)
            test_loss += loss.item()
            metric.update(y_pred, labels)

    eval_df = pd.DataFrame({'labels': label_list, 'predictions': pred_labels})
    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_metric = metric.compute()

    if verbatim:
      print("Loss =", test_loss, f'  Metric ({metric.__class__.__name__})=', test_metric.item())
    return pred_labels, test_loss, test_metric, eval_df

In [56]:
metric = torchmetrics.MeanAbsolutePercentageError()
test_dataset = SolarPanelDataset(test_df.sample(200), images_dir)
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=True)
_, _, _, eval_df = test(model, test_dataloader, loss_fn, metric, device, verbatim=True)

Loss = 0.044260744292002455   Metric (MeanAbsolutePercentageError)= 523.5400390625


In [57]:
eval_df

Unnamed: 0,labels,predictions
0,0.770407,0.164719
1,0.055255,0.137564
2,0.040302,0.114739
3,0.397067,0.128074
4,0.016211,0.116730
...,...,...
395,0.572883,0.129467
396,0.638548,0.635345
397,0.145178,0.120767
398,0.075883,0.170131


In [33]:

def tensor_to_image(tensor):

    """
    Converts a tensor to an image.

    Args:
        tensor: A tensor of shape (C, H, W).

    Returns:
        An image.
    """

    image = tensor.cpu().detach().numpy()
    image = image.transpose(1, 2, 0)  # Convert from (C, H, W) to (H, W, C)
    image = (image * 255).astype('uint8')  # Convert pixel values from [0, 1] to [0, 255]

    return image
