In [None]:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils import data
from torch.utils.data import DataLoader, TensorDataset, SubsetRandomSampler
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from pandas import DataFrame

In [None]:
# Read ./data/train.csv into a DataFrame; the path is relative to the kernel's working directory.
raw_data = pd.read_csv('./data/train.csv')

In [None]:
# Bare expression: the notebook renders the loaded frame as this cell's output.
raw_data

In [None]:
# Numeric feature candidates. 'SalePrice' is deliberately kept in this list for now.
# Dropped labels: 'Id', plus the three columns the notebook flags as NaN columns.
numerical_columns = list(
    raw_data
    .select_dtypes(include=np.number)
    .columns
    .drop(['Id', 'LotFrontage', 'MasVnrArea', 'GarageYrBlt'])
)

# Columns stored with the generic object dtype.
string_columns = list(raw_data.select_dtypes(include=['object']).columns)

In [None]:
# Restrict to the selected numeric columns, discard every column that still
# contains a missing value, then discard any row with a missing value
# (once the column drop has run there are no NaNs left for the row drop to remove).
numerical_df = (
    raw_data[numerical_columns]
    .dropna(axis='columns', how='any')
    .dropna(axis='index', how='any')
)

# Keep the column list in sync with the columns that survived the drops.
numerical_columns = numerical_df.columns.tolist()

In [None]:
# Per-column statistics of the training frame, keyed by column name.
# The 'SalePrice' entries are reused later to map predictions back to prices.
data_means = {name: numerical_df[name].mean() for name in numerical_df}
data_maxs = {name: numerical_df[name].max() for name in numerical_df}
data_mins = {name: numerical_df[name].min() for name in numerical_df}

# Mean-centre every column and scale it by its range (max - min).
normalized_numerical_df = numerical_df.sub(numerical_df.mean()).div(
    numerical_df.max() - numerical_df.min()
)


In [None]:
# Target column(s); every other entry of numerical_columns is a model feature.
numerical_y_columns = ['SalePrice']

# Rebuild the feature list instead of calling list.remove(): remove() mutated the
# shared list in place and raised ValueError when this cell was executed a second
# time. Filtering gives the same result on the first run and is safe to re-run.
numerical_columns = [col for col in numerical_columns if col not in numerical_y_columns]

normalized_numerical_x_df = normalized_numerical_df[numerical_columns]
normalized_numerical_y_df = normalized_numerical_df[numerical_y_columns]

# float32 tensors: features are (rows, n_features), targets are (rows, 1).
normalized_numerical_x = torch.tensor(normalized_numerical_x_df.values, dtype=torch.float)
normalized_numerical_y = torch.tensor(normalized_numerical_y_df.values, dtype=torch.float)

In [None]:
# Fraction of rows used for fitting; the remaining rows form the validation set.
# NOTE(review): 0.1 trains on only 10% of the rows -- confirm this is intentional.
train_ratio = 0.1
train_size = int(len(normalized_numerical_x) * train_ratio)

# A fixed random_state keeps the split identical from run to run.
x_train, x_val, y_train, y_val = train_test_split(
    normalized_numerical_x,
    normalized_numerical_y,
    train_size=train_size,
    random_state=12345,
)

In [None]:
class LinearRegression(nn.Module):
    """Fully connected regressor producing one output value per input row.

    Despite the name, this is a stack of ``nn.Linear`` layers with an
    activation after every hidden layer, followed by a final
    ``nn.Linear(..., 1)`` that has no activation.

    Args:
        input_dim: Number of input features.
        dimensions: Widths of the hidden layers, in order. May be empty, in
            which case the model is a single linear layer.
    """

    def __init__(self, input_dim, dimensions):
        super().__init__()
        self.layers = nn.ModuleList()
        # Other choices tried here: nn.ReLU(), nn.Softmax(), or None for no activation.
        self.activation_function = nn.LeakyReLU()

        # widths[i] -> widths[i + 1] describes the i-th hidden Linear layer.
        widths = [input_dim, *dimensions]
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            self.layers.append(nn.Linear(fan_in, fan_out))
            if self.activation_function is not None:
                # The same activation instance is reused after every hidden layer.
                self.layers.append(self.activation_function)

        # Output head: a single regression value, no activation.
        self.layers.append(nn.Linear(widths[-1], 1))

    def forward(self, x):
        """Apply every layer in order and return the result."""
        out = x
        for module in self.layers:
            out = module(out)
        return out


# Hidden-layer widths used when this model is instantiated in the notebook.
dimensions = [2, 4, 8, 4, 2]

In [None]:
class CNNRegression(nn.Module):
    """1-D convolutional regressor producing one output value per input row.

    The feature vector is treated as a single-channel sequence of length
    ``input_dim``. It passes through a stack of ``nn.Conv1d`` layers (default
    stride, no padding), each followed by the activation, and is then
    flattened and mapped to one value by a final ``nn.Linear``.

    Args:
        input_dim: Number of input features (the sequence length seen by the
            first convolution).
        cnn_dimensions: Output channel count of each convolution, in order.
        kernel_sizes: Kernel size of each convolution; its length determines
            how many convolutions are built.

    Raises:
        ValueError: If the kernels consume the whole input, i.e. the sequence
            length after the last convolution would be zero or negative.
    """

    def __init__(self, input_dim, cnn_dimensions, kernel_sizes):
        super(CNNRegression, self).__init__()
        self.layers = nn.ModuleList()

        # Other choices tried here: nn.LeakyReLU(), nn.Softmax().
        self.activation_function = nn.ReLU()

        # forward() adds exactly one channel, so the first conv has in_channels=1.
        prev_dim = 1
        for i, kernel_size in enumerate(kernel_sizes):
            self.layers.append(nn.Conv1d(prev_dim, cnn_dimensions[i], kernel_size))
            # Every convolution -- including the first -- is followed by the
            # activation; without it the first two convolutions would collapse
            # into a single linear operation.
            if self.activation_function is not None:
                self.layers.append(self.activation_function)
            prev_dim = cnn_dimensions[i]

        # Each unpadded, stride-1 convolution shortens the sequence by (kernel - 1).
        conv_output_length = input_dim - sum(kernel_sizes) + len(kernel_sizes)
        if conv_output_length <= 0:
            raise ValueError(
                f'kernel_sizes {list(kernel_sizes)} leave no output positions '
                f'for input_dim={input_dim}'
            )

        self.layers.append(nn.Flatten())
        self.layers.append(nn.Linear(prev_dim * conv_output_length, 1))

    def forward(self, x):
        """Run ``x`` of shape (batch, input_dim) through the network."""
        # Insert the channel axis expected by Conv1d: (batch, 1, input_dim).
        x = x.unsqueeze(1)
        for layer in self.layers:
            x = layer(x)
        return x


# Channel counts and kernel sizes used when this model is instantiated in the notebook.
cnn_dimensions = [7, 11, 15]
kernel_sizes = [3 for _ in cnn_dimensions]

In [None]:
learning_rate = 0.003
input_dim = x_train.shape[1]

# Seed PyTorch before the model is built so its initial weights -- and therefore
# the loss curves below -- are the same on every Restart & Run All. The value
# matches the random_state used for the train/validation split.
torch.manual_seed(12345)

# Model under test; the commented lines are alternatives kept for quick switching.
# model = LinearRegression(input_dim, dimensions)
model = CNNRegression(input_dim, cnn_dimensions, kernel_sizes)
# model = GNNLinearRegression(num_features, hidden_channels, num_layers)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

num_epochs = 100
train_losses = []
val_losses = []

for epoch in range(num_epochs):
    # One full-batch optimisation step on the training split.
    model.train()
    optimizer.zero_grad()
    outputs = model(x_train)
    train_loss = criterion(outputs, y_train)
    train_loss.backward()
    optimizer.step()

    # Score the held-out split without tracking gradients.
    model.eval()
    with torch.no_grad():
        val_outputs = model(x_val)
        val_loss = criterion(val_outputs, y_val)

    train_losses.append(train_loss.item())
    val_losses.append(val_loss.item())

    # print(f'Epoch [{epoch + 1} / {num_epochs}], Train Loss: {train_loss.item():.4f}, Val Loss: {val_loss.item():.4f}')

# Loss curves for both splits.
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')

# plt.gca().yaxis.set_major_formatter(plt.FormatStrFormatter('%.4f'))
plt.locator_params(axis='y', nbins=20)

# Info box summarising the run configuration, positioned in axes coordinates.
activation_function_name = model.activation_function.__class__.__name__ if model.activation_function else 'None'
print_dimensions = dimensions if model.__class__.__name__ == 'LinearRegression' else cnn_dimensions
plt.text(0.975, 0.75, f'Learning Rate: {learning_rate}\nDimensions: {print_dimensions}\nTrain Ratio: {train_ratio}\nOptimizer: {str(type(optimizer).__name__)}\nModel: {model.__class__.__name__}\nActivate function: {activation_function_name}',
         transform=plt.gca().transAxes, ha='right', va='top', fontsize=12, bbox=dict(facecolor='white', alpha=0.8))

# Lowest loss reached on each curve and the epoch index where it occurred.
min_train_loss = np.min(train_losses)
min_train_epoch = np.argmin(train_losses)
min_val_loss = np.min(val_losses)
min_val_epoch = np.argmin(val_losses)

# Shift the two labels in opposite vertical directions (by 0.01 in loss units),
# presumably so they do not sit on top of each other.
if min_train_epoch < min_val_epoch:
    train_annotation_pos = (min_train_epoch, min_train_loss - 0.01)
    val_annotation_pos = (min_val_epoch, min_val_loss + 0.01)
else:
    train_annotation_pos = (min_train_epoch, min_train_loss + 0.01)
    val_annotation_pos = (min_val_epoch, min_val_loss - 0.01)

plt.annotate(f'Min: {min_train_loss:.4f}', xy=(min_train_epoch, min_train_loss), xytext=train_annotation_pos,
             arrowprops=dict(facecolor='black', arrowstyle='->'), fontsize=10)
plt.annotate(f'Min: {min_val_loss:.4f}', xy=(min_val_epoch, min_val_loss), xytext=val_annotation_pos,
             arrowprops=dict(facecolor='black', arrowstyle='->'), fontsize=10)

plt.legend()
plt.tight_layout()
plt.show()

In [None]:
# Final fit: train a fresh LinearRegression model on ALL normalised rows.
input_dim = normalized_numerical_x.shape[1]

# Seed PyTorch before the model is built so its initial weights -- and therefore
# the fitted model and the predictions made from it -- are the same on every
# Restart & Run All. The value matches the random_state of the data split.
torch.manual_seed(12345)
model = LinearRegression(input_dim, dimensions)

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
num_epochs = 75
train_losses = []
val_losses = []

for epoch in range(num_epochs):
    # Training: one full-batch step over every row.
    model.train()
    optimizer.zero_grad()
    outputs = model(normalized_numerical_x)
    train_loss = criterion(outputs, normalized_numerical_y)
    train_loss.backward()
    optimizer.step()

    # Validation
    # NOTE: x_val / y_val were split off from normalized_numerical_x / _y, so these
    # rows are also part of the training data above; this curve is a sanity check
    # on a subset, not an estimate of generalisation error.
    model.eval()
    with torch.no_grad():
        val_outputs = model(x_val)
        val_loss = criterion(val_outputs, y_val)

    train_losses.append(train_loss.item())
    val_losses.append(val_loss.item())

    # print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss.item():.4f}, Val Loss: {val_loss.item():.4f}")

# Plot the training and validation losses
plt.figure(figsize=(10, 5))
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
test_data = pd.read_csv('./data/test.csv')

# Training-set statistics for the feature columns. The model was fitted on
# features scaled with these values, so the test features must be imputed and
# scaled with the SAME values; using the test set's own mean/range would put
# the inputs on a different scale from the one the model learned.
train_feature_means = pd.Series(data_means)[numerical_columns]
train_feature_ranges = (pd.Series(data_maxs) - pd.Series(data_mins))[numerical_columns]

# Missing test values are replaced by the training mean of their column
# (which maps to 0 after normalisation).
test_numerical_df = test_data[numerical_columns]
test_numerical_df = test_numerical_df.fillna(train_feature_means)

test_normalized_numerical_df = (test_numerical_df - train_feature_means) / train_feature_ranges
test_normalized_numerical_x_df = test_normalized_numerical_df[numerical_columns]
assert not test_normalized_numerical_x_df.isna().any().any(), 'NaN left in normalised test features'

test_normalized_numerical_x = torch.tensor(test_normalized_numerical_x_df.values, dtype=torch.float)

model.eval()
with torch.no_grad():
    test_outputs = model(test_normalized_numerical_x)

# Undo the target normalisation: y = y_norm * (max - min) + mean.
test_predictions = test_outputs.numpy().flatten()
test_predictions = test_predictions * (data_maxs['SalePrice'] - data_mins['SalePrice']) + data_means['SalePrice']

submission_df = pd.DataFrame({'Id': test_data['Id'], 'SalePrice': test_predictions})

# NOTE: the ./output directory must already exist; to_csv does not create it.
submission_df.to_csv('./output/submission.csv', index=False)