In [None]:
#!pip install torch torchvision torchaudio

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader

%matplotlib inline

In [None]:
# load dataset
ds = datasets.fetch_california_housing()
X = ds.data.astype(np.float32)
y = ds.target.astype(np.float32)

# remove very cheap or very expensive homes (saturates =< 0.15 or >= 5)
ind = (y > 0.15) & (y < 5)
X = X[ind,:]
y = y[ind]

# transform target - more Gaussian
y = np.log(y)

# scale input attributes
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# split data into train and test - !!! added valid dataset for pytorch
X_train_valid, X_test, y_train_valid, y_test = train_test_split(X_scaled, y, test_size=0.33, random_state=0)
X_train, X_valid, y_train, y_valid = train_test_split(X_train_valid, y_train_valid, test_size=0.33, random_state=0)

In [None]:
# info about the dataset
print(ds.DESCR)

In [None]:
# -------------------------------------------------------------
# backprop for traning & evaluation for validation
# https://pytorch.org/tutorials/beginner/nn_tutorial.html
# -------------------------------------------------------------
def loss_batch(model, loss_func, xb, yb, opt=None):
    loss = loss_func(model(xb), yb)

    if opt is not None:
        loss.backward()
        opt.step()
        opt.zero_grad()

    return loss.item(), len(xb)

# -------------------------------------------------------------
# custom PyTorch fit function
# https://pytorch.org/tutorials/beginner/nn_tutorial.html
# -------------------------------------------------------------
def fit(epochs, model, loss_func, opt, train_dl, valid_dl, prt_every_epoch=100):
    
    learning_curve = []
    
    for epoch in range(1,epochs+1):
        model.train()
        for xb, yb in train_dl:
            loss_batch(model, loss_func, xb, yb, opt)

        model.eval()
        with torch.no_grad():
            train_losses, train_nums = zip(
                *[loss_batch(model, loss_func, xb, yb) for xb, yb in train_dl]
            )            
            valid_losses, valid_nums = zip(
                *[loss_batch(model, loss_func, xb, yb) for xb, yb in valid_dl]
            )
        train_loss = np.sum(np.multiply(train_losses, train_nums)) / np.sum(train_nums)
        valid_loss = np.sum(np.multiply(valid_losses, valid_nums)) / np.sum(valid_nums)
        learning_curve.append([epoch, train_loss, valid_loss])

        if epoch % prt_every_epoch == 0:
            print(f'epoch # {epoch:<6}\t train_loss = {train_loss:>6.4f}\t valid_loss = {valid_loss:>6.4f}')
        
    return np.array(learning_curve)

In [None]:
# -------------------------------------------------------------
# Step 0 - preparations for pytorch
# -------------------------------------------------------------

# 0.1 - training settings
batch_size = 512
epochs = 200
learning_rate = 0.001

# 0.2 - prepare data for pytorch

# tensorize data
ts_X_train, ts_y_train, ts_X_valid, ts_y_valid, ts_X_test, ts_y_test = map(
    torch.tensor, (X_train, y_train.reshape(-1,1), X_valid, y_valid.reshape(-1,1), X_test, y_test.reshape(-1,1))
)

# batch management
train_ds = TensorDataset(ts_X_train, ts_y_train)
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)

valid_ds = TensorDataset(ts_X_valid, ts_y_valid)
valid_dl = DataLoader(valid_ds, batch_size=batch_size * 2)

# -------------------------------------------------------------
# Step 1 - create pytorch nn object
# -------------------------------------------------------------
model = nn.Sequential(
    nn.Linear(8,100),
    nn.BatchNorm1d(100),
    nn.ELU(),
    nn.Linear(100,50),
    nn.BatchNorm1d(50),
    nn.ELU(),    
    nn.Linear(50,2),
    nn.BatchNorm1d(2),
    nn.ELU(),        
    nn.Linear(2,1),
)

# -------------------------------------------------------------
# Step 2 - train model
# -------------------------------------------------------------

# 2.1 - define the loss function
loss_func = nn.MSELoss()

# 2.2 - choose the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# 2.3 - perform the optimization
learning_curve = fit(epochs, model, loss_func, optimizer, train_dl, valid_dl, prt_every_epoch=10)

# -------------------------------------------------------------
# Step 3 - make predictions
# -------------------------------------------------------------
model.eval()
with torch.no_grad():
    y_pred = model(ts_X_test).numpy()

In [None]:
# loss function
plt.figure(figsize=(10,10))
plt.plot(learning_curve[:,1], 'r', label='train')
plt.plot(learning_curve[:,2], 'b' ,label='val')
plt.xlabel('Epoch', fontsize=20)
plt.ylabel('Loss - MSE', fontsize=20)
plt.legend()
plt.tick_params(labelsize=20)

In [None]:
# info about the model
print('Model arch:\n', model)
# print('Model params:')
# for name, param in model.named_parameters():
#     if param.requires_grad:
#         print(name, ':', param)
print('Test RMSE:\n', np.sqrt(mean_squared_error(y_test, y_pred)))
print('R2:\n', r2_score(y_test, y_pred))

In [None]:
# Plot actual vs predicted
plt.figure(figsize=(10,10))
plt.scatter(y_test, y_pred, c='red')

p1 = max(max(y_pred), max(y_test))
p2 = min(min(y_pred), min(y_test))
plt.plot([p1, p2], [p1, p2], 'b-')
plt.xlabel('Actual', fontsize=15)
plt.ylabel('Predictions', fontsize=15)
plt.axis('equal')
plt.show()

In [None]:
# save model
torch.save(model, 'nonlin_reg.nn')