In [13]:
import optuna
import torch
import os
from datetime import datetime
import pandas as pd
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
#from torchvision import datasets
from torchvision import transforms
import matplotlib.pyplot as plt
from torch.utils.data import random_split, Dataset, DataLoader
from PIL import Image

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [9]:
class GalaxyJungle(Dataset): # sarebbe interessante implementare un rescale/crop
    
    #the init function initializes the directory containing the image,
    #the annotations file,
    #and both transforms
    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform


    #returns number of samples in the dataset
    def __len__(self):
        return (self.img_labels).shape[0]

    #loads a sample from the dataset
    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, str(self.img_labels.iloc[idx, 0])) + '.jpg'
        #retrieves the image
        image = Image.open(img_path)
        if not is_rgb: image = image.convert('L')
        #retrieves corresponding label
        label = self.img_labels.iloc[idx, 1:]
        #if possible, transform the image and the label into a tensor.
        if self.transform:
            image = self.transform(image)
        label = torch.tensor(label.values, dtype=torch.float32)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label, self.img_labels.iloc[idx, 0]
    

transfs = transforms.Compose([
    transforms.ToTensor(), #fa già la normalizzazione se l'immagine non è un tensore
    # sarebbe interessante implementare un random crop prima del center crop per decentrare un poco le immagini????
    transforms.RandomHorizontalFlip(), # horizontal flip
    transforms.RandomVerticalFlip(), # vertical flip
    transforms.CenterCrop(324)          #CROP
    ]) #transforms.compose per fare una pipe di transformazioni



DS = GalaxyJungle('../data/training/training_solutions_rev1.csv', '../data/training/', transfs)
training, test = random_split(DS, [.8, .2])
train_loader = DataLoader(training, batch_size=(batch_size_train := 128), shuffle=True, num_workers=8)
test_loader = DataLoader(test, batch_size=(batch_size_test := 128), shuffle=False, num_workers=8)

In [None]:
class GalaxyNet(nn.Module):
    def __init__(self, n_conv_layers, num_filters,num_neurons1, num_neurons2):
        super().__init__()
        self.rgb = 1
        self.input_size = 324
        self.num_labels = 37
        self.loss_dict = { 'batch' : [], 'epoch' : [], 'vbatch' : [], 'vepoch' : []}


        stride = 2
        kernel_size = 3
        kernel_size_pool = 2
        ## convolutional layers
        self.convs = nn.ModuleList([nn.Conv2d(self.rgb, num_filters[0], kernel_size=kernel_size)])
        self.convs.append(nn.BatchNorm2d(num_filters[0]))
        output_size = (self.input_size - kernel_size + stride) // (stride*kernel_size_pool)
        for i in range(1,n_conv_layers):
            self.convs.append(nn.Conv2d(num_filters[i-1], num_filters[i], kernel_size=kernel_size)) # num filters are the number of channel the conv layer otputs.
            self.convs.append(nn.BatchNorm2d(num_filters[i]))
            output_size = (output_size - kernel_size + stride) // (stride*kernel_size_pool) #padding 0, dilation = 1
        self.pool = nn.MaxPool2d(kernel_size=kernel_size_pool)
        #self.convs.append(nn.dropout(p= value)) ## to be added in the future to test claims of BatchnOrm paper
        self.out_feature = num_filters[-1] *output_size * output_size # output size of the last conv layer
        self.fc1 = nn.Linear(self.out_feature, num_neurons1) # fully connected layer
        # dropout here if you want
        self.fc2 = nn.Linear(num_neurons1, num_neurons2)
        #dropout here if u want
        self.fc3 = nn.Linear(num_neurons2, self.num_labels)

    def init_weights(self,  mode, n_conv_layers):
        if mode == 'relu': # perchè kaiming normal e non uniform??1
            for i in range(0, n_conv_layers*2, 2):
                nn.init.kaiming_normal_(self.convs[i].weight, nonlinearity=mode)
                if self.convs[i].bias is not None:
                    nn.init.constant_(self.convs[i].bias,0)
            nn.init.kaiming_normal_(self.fc1.weight, nonlinearity=mode)
            if self.fc1.bias is not None:
                nn.init.constant_(self.fc1.bias,0)
            nn.init.kaiming_normal_(self.fc2.weight, nonlinearity=mode)
            if self.fc2.bias is not None:
                nn.init.constant_(self.fc2.bias,0)
        elif mode == 'leaky_relu':
            for i in range(0, n_conv_layers*2, 2):
                nn.init.kaiming_normal_(self.convs[i].weight, a = 0.01, nonlinearity=mode)
                if self.convs[i].bias is not None:
                    nn.init.constant_(self.convs[i].bias,0)
            nn.init.kaiming_normal_(self.fc1.weight, nonlinearity=mode)
            if self.fc1.bias is not None:
                nn.init.constant_(self.fc1.bias,0)
            nn.init.kaiming_normal_(self.fc2.weight, nonlinearity=mode)
            if self.fc2.bias is not None:
                nn.init.constant_(self.fc2.bias,0) 
        nn.init.xavier_uniform_(self.fc3.weight)
        if self.fc3.bias is not None:
            nn.init.constant_(self.fc3.bias,0)
        return print('weights initialized with {}'.format(mode))        
        

    def forward(self,x, activation):
        for i in range(0, len(self.convs),2):
            x = self.pool(activation(self.convs[i](x)))
            x = self.convs[i+1](x) # batch norm layer
        x = torch.flatten(x,1) # flatten operation -> 1 dimensional
        x = activation(self.fc1(x)) # apply relu al'output dei fully connected
        x = activation(self.fc2(x)) # idem sopra
        x = F.linear(self.fc3(x)) # output di fc3, 37 neuroni -> 37 classi ideally 
        ## MAPPING ALSO HERE

        return x
    
    def set_rgb(self,bool):
        if bool:
            self.rgb = 3
        else:
            self.rgb = 1
        return self.rgb
    
    def log_the_loss(self, item,epoch=False): # per avere una history della loss???
        train = self.__getstate__()['training']
        print(train)
        if epoch and train:
            self.loss_dict['epoch'].append(item) ### get state of the model so you can ditch the validation parameter
        elif not epoch and train:
            self.loss_dict['batch'].append(item)
        elif not train and epoch:
            self.loss_dict['vepoch'].append(item)
        elif not train and not epoch:
            self.loss_dict['vbatch'].append(item)
        return item

            

In [14]:
#n_conv_layers, num_filters,num_neurons1, num_neurons2):

prova = GalaxyNet(2,[3,6,9],50, 45).to(device)

In [15]:
prova.set_rgb(True)

3

## TRAINING + VALIDATION

In [None]:
loss_function = nn.MSELoss()
optimizer = optim.SGD(prova.parameters(), lr=0.001, momentum=0.9)

In [None]:
def one_epoch_train(verbose=False):
    running_loss = 0
    last_loss = 0
    prova.train()
    for i, data in enumerate(train_loader):
        inputs,labels, _ = data
        inputs,labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad(set_to_none=True)
        outputs = prova(inputs, activation=F.relu)
        loss=loss_function(outputs, labels)
        loss.backward()
        optimizer.step() # fa update del parameter
        



    
    