<a href="https://colab.research.google.com/github/visiont3lab/deep-learning-course/blob/main/colab/Regressione.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Importa Librerie

In [1]:
import copy
import os
from datetime import datetime

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import torch
import torch.nn.functional as F
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset,Dataset
from torchsummary import summary
#!pip install torchsummary
from torchvision import datasets
from torchvision import transforms
# Loss function pytorch: https://neptune.ai/blog/pytorch-loss-functions

## Load Data

In [2]:
# Synthetic 1-D regression problem: 1000 evenly spaced inputs on [-2, 8] and their targets
X = np.linspace(start=-2, stop=8, num=1000)
Y = np.exp(0.2 * X) * np.sin(3 * X) - 10 * np.cos(X)

# Mean and standard deviation of the inputs, used later to standardise them
R_mean = X.mean()
R_std = X.std()

# float32 column tensors, shape (N, 1), built from the full numpy arrays
Xt = torch.from_numpy(X).float().reshape(-1, 1)
Yt = torch.from_numpy(Y).float().unsqueeze(1)
print(f"X Tensor data shape: ", Xt.shape)
print(f"Y Tensor data shape: ", Yt.shape)

# Shuffled 70/30 train/test split with a fixed random_state
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.3, shuffle=True, random_state=4
)
print(f"X Train shape: {X_train.shape} , X Test shape: {X_test.shape}")


X Tensor data shape:  torch.Size([1000, 1])
Y Tensor data shape:  torch.Size([1000, 1])
X Train shape: (700,) , X Test shape: (300,)


In [3]:
# Plot the full curve together with the train and test samples
fig = go.Figure()
for xs, ys, label in (
    (X, Y, "Real data"),
    (X_train, Y_train, "Train data"),
    (X_test, Y_test, "Test data"),
):
    fig.add_traces(
        go.Scatter(x=xs, y=ys, hovertemplate='x: %{x} <br>y: %{y}', mode="markers", name=label)
    )

fig.update_layout(title="Funzione di stimare")
fig.show()

In [4]:
# Dataset wrapper that standardises numpy inputs and exposes them as PyTorch tensors
class CustomTensorDataset(Dataset):
    """Map-style dataset built from a pair of numpy arrays.

    The inputs are standardised as ``(x - mean) / std``; inputs and targets are
    then stored as float32 tensors with a new axis inserted at position 1.
    """

    def __init__(self, x,y,mean,std):
        """Standardise ``x`` and convert ``x`` and ``y`` to tensors.

        Args:
            x: numpy array of inputs.
            y: numpy array of targets.
            mean: value subtracted from ``x``.
            std: value that ``x - mean`` is divided by.
        """
        self.x = self._as_column((x - mean) / std)
        self.y = self._as_column(y)

    @staticmethod
    def _as_column(values):
        """Return ``values`` as a float32 tensor with an extra axis at dim 1."""
        return torch.from_numpy(values).float().unsqueeze(1)

    def __getitem__(self, index):
        """Return the ``(x, y)`` pair stored at ``index``."""
        return self.x[index], self.y[index]

    def __len__(self):
        """Return the number of samples (size of the first axis of ``x``)."""
        return self.x.size(0)

# Build the train and test datasets; both are standardised with the same mean / std
train_ds = CustomTensorDataset(x=X_train, y=Y_train, mean=R_mean, std=R_std)
test_ds = CustomTensorDataset(x=X_test, y=Y_test, mean=R_mean, std=R_std)

## Neural Network

In [5]:
class RegressionNet(nn.Module):
    """Fully connected regressor: num_inputs -> 100 -> 50 -> 1, tanh on the hidden layers."""

    def __init__(self,num_inputs):
        """Create the three linear layers.

        Args:
            num_inputs: number of features of each input sample.
        """
        super().__init__()
        self.fc1 = nn.Linear(in_features=num_inputs, out_features=100)
        self.fc2 = nn.Linear(in_features=100, out_features=50)
        self.fc3 = nn.Linear(in_features=50, out_features=1)

    def forward(self,x):
        """Apply the network to ``x``; the last layer's output is returned without activation."""
        # Other activations that can be tried on the hidden layers: torch.sigmoid, torch.relu
        hidden = self.fc1(x).tanh()
        hidden = self.fc2(hidden).tanh()
        return self.fc3(hidden)

# Instantiate the network for a single input feature and print a per-layer summary on the CPU
model = RegressionNet(num_inputs=1)
summary(model, input_size=(1, 1), batch_size=-1, device='cpu')

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Linear-1               [-1, 1, 100]             200
            Linear-2                [-1, 1, 50]           5,050
            Linear-3                 [-1, 1, 1]              51
Total params: 5,301
Trainable params: 5,301
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.02
Estimated Total Size (MB): 0.02
----------------------------------------------------------------


## Training


In [6]:
# validation: metric regression
def metrics_func_regression(target, output):
    """Return the sum of squared errors between ``output`` and ``target``.

    The result is a batch total (a 0-d tensor), not a mean: a caller that wants
    a mean squared error has to divide by the number of samples. The closer to
    zero, the better the fit.
    """
    residual = output - target
    return residual.pow(2).sum()

# validation metric classification
def metrics_func_classification(target, output):
    """Return minus the number of correct predictions in the batch.

    The predicted class is the argmax of ``output`` along its last axis. The
    count is negated so that, like a loss, the best result is the most
    negative one.
    """
    predicted = torch.argmax(output, dim=-1, keepdim=True)
    hits = torch.eq(predicted, target.reshape(predicted.shape))
    return -hits.sum().item()

# training: loss calculation and backward step
def loss_batch(loss_func,metric_func, xb,yb,yb_h, opt=None):
    """Compute loss and metric for one batch and, if ``opt`` is given, take one optimizer step.

    Args:
        loss_func: callable invoked as ``loss_func(yb_h, yb)``; must return a scalar tensor.
        metric_func: callable invoked as ``metric_func(yb, yb_h)`` with autograd disabled.
        xb: batch inputs (not used here; kept so existing callers keep working).
        yb: batch targets.
        yb_h: model output for the batch.
        opt: optimizer; when ``None`` no backward pass or parameter update is done.

    Returns:
        Tuple ``(loss_value, metric_b)``: the loss as a Python number and
        whatever ``metric_func`` returned.
    """
    # obtain loss
    loss = loss_func(yb_h, yb)
    # obtain performance metric (no gradient tracking needed for it)
    with torch.no_grad():
        metric_b = metric_func(yb,yb_h)
    if opt is not None:
        # Clear gradients BEFORE backward: gradients left over from earlier work
        # (e.g. an interrupted notebook run) must not be added to this step.
        opt.zero_grad()
        loss.backward()
        opt.step()
    return loss.item(), metric_b

# one epoch training
def loss_epoch(model, loss_func,metric_func, dataset_dl, sanity_check,opt, device):
    """Run one pass over ``dataset_dl`` and return the per-sample average loss and metric.

    Args:
        model: callable module applied to every input batch.
        loss_func: loss callable forwarded to ``loss_batch``.
        metric_func: metric callable forwarded to ``loss_batch``.
        dataset_dl: iterable of ``(xb, yb)`` tensor batches exposing a ``dataset`` attribute.
        sanity_check: when ``True`` only the first batch is processed.
        opt: optimizer forwarded to ``loss_batch``; ``None`` means evaluation only.
        device: device the batches are moved to before the forward pass.

    Returns:
        Tuple ``(loss, metric)`` of Python numbers: the accumulated batch
        values divided by the number of samples that were processed.
    """
    total_loss = 0.0
    total_metric = 0.0
    n_samples = 0
    # get batch data
    for xb, yb in dataset_dl:
        # send the batch to the target device
        xb = xb.to(device)
        yb = yb.to(device)
        # Call the module itself rather than .forward() so registered hooks run
        yb_h = model(xb)
        # loss and metric calculation (plus optimizer step when opt is given)
        loss_b, metric_b = loss_batch(loss_func, metric_func, xb, yb, yb_h, opt)
        total_loss += loss_b
        if metric_b is not None:
            # A metric may come back as a one-element tensor (possibly on the GPU);
            # accumulate it as a plain number so the history can be plotted/serialised.
            if torch.is_tensor(metric_b) and metric_b.numel() == 1:
                metric_b = metric_b.item()
            total_metric += metric_b
        n_samples += xb.shape[0]
        if sanity_check is True:
            break
    # Average over the samples actually seen: with sanity_check only one batch
    # is processed, so dividing by the full dataset size would understate both values.
    if n_samples == 0:
        # No batch was produced: fall back to the dataset size as the divisor
        n_samples = len(dataset_dl.dataset)
    total_loss /= n_samples
    total_metric /= n_samples
    return total_loss, total_metric

# get learning rate from optimizer
def get_lr(opt):
    """Return the learning rate of the optimizer's first parameter group.

    ``None`` is returned when the optimizer has no parameter groups.
    """
    first_group = next(iter(opt.param_groups), None)
    return None if first_group is None else first_group["lr"]

# training - test loop
def train_test(params):
    """Train ``params["model"]`` and evaluate it on the test loader after every epoch.

    The weights with the lowest test loss seen so far are kept in memory and
    written to ``params["path2weigths"]``; they are loaded back into the model
    before returning.

    Args:
        params: dict with the keys ``model``, ``loss_func``, ``metric_func``,
            ``num_epochs``, ``optimizer``, ``lr_scheduler``, ``train_dl``,
            ``test_dl``, ``device``, ``continue_training`` (start from the
            weights stored at ``path2weigths`` if that file exists),
            ``sanity_check`` (forwarded to ``loss_epoch``) and ``path2weigths``.

    Returns:
        Tuple ``(model, loss_history, metric_history)``; both histories are
        dicts with ``"train"`` and ``"test"`` lists holding one value per epoch.
    """
    # --> extract params
    model = params["model"]
    loss_func = params["loss_func"]
    metric_func = params["metric_func"]
    num_epochs = params["num_epochs"]
    opt = params["optimizer"]
    lr_scheduler = params["lr_scheduler"]
    train_dl = params["train_dl"]
    test_dl = params["test_dl"]
    device = params["device"]
    continue_training = params["continue_training"]
    sanity_check = params["sanity_check"]
    path2weigths = params["path2weigths"]
    # --> send model to device and print device
    model = model.to(device)
    print("--> training device %s" % (device))
    # --> if continue_training is set and a weight file exists, start from it
    if continue_training and os.path.isfile(path2weigths):
        print("--> continue training  from last best weights")
        # map_location lets weights saved on one device be loaded on another
        # (e.g. a GPU checkpoint resumed on a CPU-only machine)
        weights = torch.load(path2weigths, map_location=device)
        model.load_state_dict(weights)
    # --> history of loss values in each epoch
    loss_history = {"train": [], "test": []}
    # --> history of metric values in each epoch
    metric_history = {"train": [], "test": []}
    # --> a deep copy of weights for the best performing model
    best_model_weights = copy.deepcopy(model.state_dict())
    # --> initialize best loss to a large value so the first epoch always counts as an improvement
    best_loss = float("inf")
    # --> main loop
    for epoch in range(num_epochs):
        # --> get learning rate
        lr = get_lr(opt)
        print("----\nEpoch %s/%s, lr=%.6f" % (epoch+1,num_epochs,lr))
        # --> train model on training dataset
        # Switch the model to train mode: layers such as dropout and batchnorm
        # behave differently between training and evaluation.
        model.train()
        train_loss, train_metric = loss_epoch(model, loss_func, metric_func, train_dl, sanity_check, opt, device)
        # --> collect loss and metric for training dataset
        loss_history["train"].append(train_loss)
        metric_history["train"].append(train_metric)
        # --> switch the model to evaluation mode; no gradients are needed here
        model.eval()
        with torch.no_grad():
            test_loss, test_metric = loss_epoch(model, loss_func, metric_func, test_dl, sanity_check, opt=None, device=device)
        # --> collect loss and metric for test dataset
        loss_history["test"].append(test_loss)
        metric_history["test"].append(test_metric)
        # --> store best model
        if test_loss < best_loss:
            print("--> model improved! --> saved to %s" %(path2weigths))
            best_loss = test_loss
            best_model_weights = copy.deepcopy(model.state_dict())
            # --> store weights into local file
            torch.save(model.state_dict(), path2weigths)
        # --> learning rate scheduler
        lr_scheduler.step()
        print("--> train_loss: %.6f, test_loss: %.6f, train_metric: %.3f, test_metric: %.3f" % (train_loss,test_loss,train_metric,test_metric))
    # --> load best weights
    model.load_state_dict(best_model_weights)
    return model, loss_history, metric_history


In [10]:

# Use the first CUDA device when one is available, otherwise stay on the CPU
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Regression model, summed squared-error loss, Adam optimizer and mini-batch loaders
model = RegressionNet(num_inputs=1).to(device)
loss_func = nn.MSELoss(reduction="sum")
opt = optim.Adam(model.parameters(), lr=0.005)
train_dl = DataLoader(train_ds, batch_size=100, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=50, shuffle=True)

# Exponential decay: each scheduler step multiplies the learning rate by gamma
lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.999)

# Everything train_test needs, collected in a single dict
params = {
    "model": model,
    "loss_func": loss_func,
    "metric_func": metrics_func_regression,
    "num_epochs": 2500,
    "optimizer": opt,
    "lr_scheduler": lr_scheduler,
    "train_dl": train_dl,
    "test_dl": test_dl,
    "device": device,
    # continue training from the last saved weights
    "continue_training": False,
    # if True only one batch per epoch is processed
    "sanity_check": False,
    "path2weigths": "./weights_regression.pt",
}
model, loss_history, metric_history = train_test(params)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
--> train_loss: 0.041124, test_loss: 0.045541, train_metric: 0.041, test_metric: 0.046
----
Epoch 934/2500, lr=0.001966
--> train_loss: 0.041467, test_loss: 0.046051, train_metric: 0.041, test_metric: 0.046
----
Epoch 935/2500, lr=0.001964
--> model improved! --> saved to ./weights_regression.pt
--> train_loss: 0.041172, test_loss: 0.045117, train_metric: 0.041, test_metric: 0.045
----
Epoch 936/2500, lr=0.001962
--> train_loss: 0.041392, test_loss: 0.045664, train_metric: 0.041, test_metric: 0.046
----
Epoch 937/2500, lr=0.001960
--> train_loss: 0.040724, test_loss: 0.045230, train_metric: 0.041, test_metric: 0.045
----
Epoch 938/2500, lr=0.001958
--> model improved! --> saved to ./weights_regression.pt
--> train_loss: 0.040592, test_loss: 0.045010, train_metric: 0.041, test_metric: 0.045
----
Epoch 939/2500, lr=0.001956
--> train_loss: 0.040624, test_loss: 0.046151, train_metric: 0.041, test_metric: 0.046
----
Epoch 940

In [11]:
# Epoch numbers 1..num_epochs, shared x axis of both figures
x = list(range(1, params["num_epochs"] + 1))

# Train / test loss per epoch
fig_loss = go.Figure()
for split, label in (("train", "train loss"), ("test", "test loss")):
    fig_loss.add_traces(go.Scatter(x=x, y=loss_history[split], name=label, mode="lines+markers"))
fig_loss.update_layout(title="Loss Results", xaxis_title="epochs", hovermode="x")
fig_loss.show()

# Train / test metric per epoch
fig_metric = go.Figure()
for split, label in (("train", "train metric"), ("test", "test_metric")):
    fig_metric.add_traces(go.Scatter(x=x, y=metric_history[split], name=label, mode="lines+markers"))
fig_metric.update_layout(title="Metric Results", xaxis_title="epochs", hovermode="x")
fig_metric.show()

## Test

In [12]:
# Run on cpu
device = torch.device("cpu")

# Load the trained regression weights into a fresh network
model = RegressionNet(num_inputs=1)
# map_location remaps tensors saved during a GPU run, so the checkpoint also
# loads on a machine without CUDA
weights = torch.load("weights_regression.pt", map_location=device)
model.load_state_dict(weights)
model = model.to(device)
# Evaluation mode: makes inference independent of train-only layer behaviour
model.eval()

# Predict over the full input range, standardised with the same statistics used for training
Xt = torch.from_numpy(X).type(torch.float32).unsqueeze(1)
Xt = (Xt - R_mean) / R_std
# No autograd graph is needed for inference; call the module (not .forward) so hooks run
with torch.no_grad():
    Y_hat = model(Xt).numpy().reshape(-1)

# Visualize real curve vs. network prediction
fig = go.Figure()
fig.add_traces( go.Scatter(x=X, y=Y, name="Real",hovertemplate='x: %{x} <br>y: %{y}') )
fig.add_traces( go.Scatter(x=X, y=Y_hat, name="Predicted",hovertemplate='x: %{x} <br>y: %{y}') )
fig.update_layout(title="Regression Results",hovermode="x")
fig.show()