Imports and Setup

In [1]:
# Imports
import torch
import torch.nn as nn  # All neural network modules, nn.Linear, nn.Conv2d, BatchNorm, Loss functions
import torch.nn.functional as F
import torch.optim as optim  # For all Optimization algorithms, SGD, Adam, etc.
import torchvision.transforms as transforms  # Transformations we can perform on our dataset
import os
import cv2
import pandas as pd
from skimage import io
from torch.optim.lr_scheduler import StepLR
import numpy as np
import random

# import scripts
from scripts.saveResults import  *
from torch.utils.data import (
    Dataset,
    DataLoader,
)  # Gives easier dataset managment and creates mini batches

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
seed = 42
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)

# When running on the CuDNN backend, two further options must be set
# This ensures Reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

if torch.cuda.is_available():
    device = torch.device("cuda") 
else:
    device = torch.device("cpu")

print(device)

cuda


Dataset 

In [None]:

class SolarRadiance(Dataset):
    def __init__(self, root_dir, labels, transform):
        self.root_dir = root_dir
        self.labels = labels
        self.transform = transform
        # self.data = self.load_dataset()

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        img_path = self.labels.iloc[index,0]
        target = self.labels.iloc[index,1]

        ''' While using the traditional upscaling methods
            the following image processing codes are applied.
            The upscaling itself is perfromed here'''
        
        # Load the image
        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)

        # Resize the image
        # fx and fy values determine the scaling ratio, 4 means 4x upscaling is applied
        # interpolation methods can be cv2.INTER_NEAREST, cv2.INTER_LINEAR, cv2.INTER_CUBIC
        image = cv2.resize(image, None, fx = 4, fy = 4, interpolation = cv2.INTER_NEAREST)

        # Normalize the image
        image = cv2.normalize(image, None, 0, 255, cv2.NORM_MINMAX)



        ''' While using the deep learning based upscaling methods
            the following image processing code is applied.
            The images are only read and normalized '''
        
        # Load the image
        image = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        # Normalize the image
        image = cv2.normalize(image, None, 0, 255, cv2.NORM_MINMAX)


        if self.transform:
             image = self.transform(image)

        y_label = torch.tensor(target)

        return image, y_label

In [None]:
def load_dataset(root_dir):
        ds = pd.DataFrame()
        dates = os.listdir(root_dir)

        try:
            for date in dates:
                infrared_folder = os.path.join(root_dir, date, "infrared")
                pyranometer_folder = os.path.join(root_dir, date, "pyranometer")
                csv_path = os.path.join(pyranometer_folder, "{date}.csv".format(date=date))
                if not os.path.exists(csv_path):
                    print("Skipping date {date} because it does not have both infrared and pyranometer folders".format(date=date))
                    continue
                ds_temp = getDs(infrared_folder, csv_path)
                
                # append the dataframe in the final dataframe
                # ds = ds.append(ds_temp)
                ds = pd.concat([ds, ds_temp], ignore_index=True)

                ds['name'] = ds['name'].apply(lambda img: os.path.join(root_dir, date, 'infrared', img))
        except Exception as e:
            print(e)
        return ds

def getDs(path, labels):
    pyranometer = pd.read_csv(labels)
    images = os.listdir(path)
    
   
    #convert column 1 to int
    X = pyranometer.iloc[:,0].astype(int)

    #convert to image names
    pyranometer.iloc[:,0] = X.apply(lambda x: str(x) + 'IR.png')
    
    # Filter pyranometer DataFrame based on the 'x' column

    filtered_pyranometer = pyranometer[pyranometer.iloc[:,0].isin(images)]
    # Display the result
    filtered_pyranometer.columns = ['name', 'value']

    filtered_pyranometer = filtered_pyranometer.drop_duplicates(subset='name')
    return filtered_pyranometer


Load Data

In [5]:
in_channel = 1
batch_size = 256
num_epochs = 100
loss = 1 # if loss = 0 the model will be trained with RMSE loss and vice versa
lr=0.01 # learning rate
result_dir = 'results'

In [6]:
result_dir 

'results'

In [None]:
train_dir = 'C:/Users/yeara/OneDrive/Desktop/IR Upscaling/datasets/GIRASOL Dataset Extracted/train/'
train_data  = load_dataset(train_dir)
train_set = SolarRadiance(root_dir= train_dir, labels=train_data, transform = transforms.Compose([transforms.ToTensor()]))
train_loader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)

val_dir = 'C:/Users/yeara/OneDrive/Desktop/IR Upscaling/datasets/GIRASOL Dataset Extracted/val/'
val_data  = load_dataset(val_dir)
val_set = SolarRadiance(root_dir= val_dir, labels=val_data, transform = transforms.Compose([transforms.ToTensor()]))
val_loader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=False)

In [8]:
print(len(train_loader.dataset))
print(len(val_loader.dataset))

46527
11574


Model

In [10]:

class CNNRegression(nn.Module):
    def __init__(self, num_channels=1):
        super(CNNRegression, self).__init__()
        self.conv1 = nn.Conv2d(num_channels, 16, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU()
        self.conv4 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.relu4 = nn.ReLU()
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(128, 1)  # Output layer for regression

    def forward(self, x):
        x = self.relu1(self.conv1(x))
        x = self.relu2(self.conv2(x))
        x = self.relu3(self.conv3(x))
        x = self.relu4(self.conv4(x))
        x = self.global_avg_pool(x)
        x = x.view(x.size(0), -1)  # Flatten the tensor
        x = self.fc(x)
        return x
    
model = CNNRegression()
model.to(device)

CNNRegression(
  (conv1): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu1): ReLU()
  (conv2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu2): ReLU()
  (conv3): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu3): ReLU()
  (conv4): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
  (relu4): ReLU()
  (global_avg_pool): AdaptiveAvgPool2d(output_size=(1, 1))
  (fc): Linear(in_features=128, out_features=1, bias=True)
)

Loss and Optimizers

In [12]:
if loss == 0:
    # create a function (this my favorite choice)
    def RMSELoss(yhat,y):
        return torch.sqrt(torch.mean((yhat-y)**2))
    criterion = RMSELoss
else:
    # Define the model, loss function, and optimizer
    criterion = nn.MSELoss()
    

optimizer = torch.optim.Adam(model.parameters(), lr=lr)
# Create a StepLR scheduler
scheduler = StepLR(optimizer, step_size=1, gamma=0.5)

# set initial loswest_loss to an infinite number
lowest_loss = float('inf')


Train and Save

In [None]:
losses = []

from tqdm import tqdm
# Train the model
for epoch in range(num_epochs):
    scheduler.step()
    print('Epoch ',epoch)
    for i, (inputs, targets) in enumerate(tqdm(train_loader)):
        # Zero the gradients
        optimizer.zero_grad()
        inputs = inputs.to(device)
        targets = targets.to(device)
        # Forward pass
        outputs = model(inputs)

        loss = criterion(outputs[:,0], targets.float())

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

    # Evaluate the model on the test data
    with torch.no_grad():
        total_loss = 0
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs[:,0], targets.float())
            total_loss += loss.item()
        mean_loss = total_loss / len(val_loader)
        print(f'val Loss: {mean_loss:.4f}')
        losses.append(mean_loss)
        if  mean_loss<lowest_loss:
            print('mean_loss: '+str(mean_loss)+' lowest_loss: '+str(lowest_loss))
            lowest_loss = mean_loss
            torch.save(model.state_dict(), os.path.join(result_dir,'model-save.pth'))
            print('---saved a model due to lower loss---')
            