Table 2

In [None]:
import requests
import pandas as pd
import seaborn as sns
from io import BytesIO
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import os

# Mount Google Drive: the Real Estate CSV is read from, and the result CSVs
# are written to, paths under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Run models and tensors on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

EPOCHS = 5000  # number of training epochs

# AutoMPG dataset
def load_auto_mpg():
    """Load seaborn's "mpg" dataset, trimmed for regression.

    Returns:
        DataFrame with the 'name' and 'origin' columns removed and every row
        containing a missing value dropped.
    """
    frame = sns.load_dataset("mpg")
    # Drop the 'name' and 'origin' columns, then discard incomplete rows.
    trimmed = frame.drop(columns=['name', 'origin'])
    return trimmed.dropna()

# Liver Disorders dataset
def load_liver_disorders():
    """Download the UCI Liver Disorders file (bupa.data) and drop 'selector'.

    The file is read without a header row; column names are supplied here.

    Returns:
        DataFrame with columns mcv, alkphos, sgpt, sgot, gammagt and drinks.
    """
    source_url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/liver-disorders/bupa.data'
    header = ['mcv', 'alkphos', 'sgpt', 'sgot', 'gammagt', 'drinks', 'selector']
    frame = pd.read_csv(source_url, names=header)
    # 'selector' is removed so that it is not used as a feature.
    return frame.drop(columns=['selector'])

# Real Estate Evaluation dataset
def load_real_estate():
    """Read the Real Estate CSV from Google Drive without date/coordinate columns.

    Returns:
        DataFrame with 'X1 transaction date', 'X5 latitude' and
        'X6 longitude' removed; every other column is kept.
    """
    frame = pd.read_csv('/content/drive/MyDrive/ARTL/Real_estate.csv')
    unused = ['X1 transaction date', 'X5 latitude', 'X6 longitude']
    # NOTE(review): if the CSV also carries a row-number column (e.g. 'No'),
    # it is kept here and will be used as a feature downstream - confirm.
    return frame.drop(columns=unused)

def load_datasets():
    """Load all benchmark datasets together with their target column names.

    Returns:
        Dict mapping a display name to ``(DataFrame, target_column)``, or
        ``None`` if any loader raises (the error is printed, not re-raised).
    """
    try:
        # (display name, loader, target column, success message), in load order.
        plan = (
            ('Auto MPG', load_auto_mpg, 'mpg',
             "Auto MPG dataset loaded successfully."),
            ('Liver Disorders', load_liver_disorders, 'drinks',
             "Liver Disorders dataset loaded successfully."),
            ('Real Estate', load_real_estate, 'Y house price of unit area',
             "Real Estate dataset loaded successfully."),
        )
        loaded = []
        for title, loader, target, message in plan:
            frame = loader()
            print(message)
            loaded.append((title, (frame, target)))
        # The mapping is only assembled once every loader has succeeded.
        return dict(loaded)
    except Exception as e:
        print(f"Error loading datasets: {e}")
        return None

class NeuralNetwork(nn.Module):
    """Fully connected regressor with four activated 100-unit layers.

    Args:
        input_dim: number of input features.
        activation_func: 'sigmoid' selects ``nn.Sigmoid``; any other value
            selects ``nn.ReLU``.
    """

    def __init__(self, input_dim, activation_func='sigmoid'):
        super().__init__()
        width = 100
        # Attribute names and creation order are kept stable so state_dict
        # keys and seeded initialisation do not change.
        self.first = nn.Linear(input_dim, width)
        self.hidden1 = nn.Linear(width, width)
        self.hidden2 = nn.Linear(width, width)
        self.hidden3 = nn.Linear(width, width)
        self.output = nn.Linear(width, 1)
        self.activation = nn.Sigmoid() if activation_func == 'sigmoid' else nn.ReLU()

    def forward(self, x):
        """Apply each activated layer in turn, then the linear output layer."""
        for layer in (self.first, self.hidden1, self.hidden2, self.hidden3):
            x = self.activation(layer(x))
        return self.output(x)

# HOVR
def hovr_regularization(model, x, k=2, q=2, M=10):
    """Evaluate the HOVR smoothness penalty at randomly sampled input points.

    ``M`` points are drawn uniformly from the axis-aligned box spanned by the
    column-wise minimum and maximum of ``x``. For every input coordinate the
    k-th order partial derivative of the model output with respect to that
    coordinate is obtained with autograd, and ``|derivative| ** q`` is summed
    over the sampled points. The per-coordinate sums are averaged over the
    coordinates.

    Args:
        model: callable mapping an ``(M, d)`` float32 tensor to one output
            per row.
        x: 2-D tensor ``(n, d)``; only its per-column range and device are used.
        k: derivative order. Values of 1 or less use the first derivative.
        q: exponent applied to the absolute derivative.
        M: number of random evaluation points.

    Returns:
        Scalar tensor on ``x.device`` that is differentiable with respect to
        the model parameters.
    """
    n_dims = x.shape[1]
    lower = x.min(0)[0].detach().cpu().numpy()
    upper = x.max(0)[0].detach().cpu().numpy()
    samples = np.random.uniform(lower, upper, (M, n_dims))
    # Create the points directly on the device of `x` as a leaf tensor, rather
    # than relying on a module-level `device` and a post-hoc `.to()` copy.
    random_points = torch.tensor(samples, dtype=torch.float32,
                                 device=x.device, requires_grad=True)

    preds = model(random_points)
    # create_graph=True keeps the derivative graph so that the penalty can be
    # differentiated again (higher orders, and back-propagation to the weights).
    grads = torch.autograd.grad(preds, random_points, torch.ones_like(preds),
                                create_graph=True)[0]

    hovr_term = torch.zeros((), dtype=torch.float32, device=x.device)
    for i in range(n_dims):
        deriv = grads[:, i]
        for _ in range(k - 1):
            # Rows are independent samples, so differentiating the column sum
            # and taking column i gives the next pure partial along axis i.
            deriv = torch.autograd.grad(deriv, random_points,
                                        torch.ones_like(deriv),
                                        create_graph=True)[0][:, i]
        hovr_term = hovr_term + (1 / n_dims) * torch.sum(torch.abs(deriv) ** q)
    return hovr_term

# TTL+HOVR
def transformed_ttl_hovr_loss(model, xi, x, y, h, lambd, k, q):
    """Compute the augmented trimmed loss with the HOVR penalty.

    The value is
    ``(1/n) * sum((r - xi)^2) + (1/n) * sum(h smallest xi^2) + lambd * HOVR``
    where ``r = y - model(x)`` and ``n = x.shape[0]``.

    Args:
        model: network being trained.
        xi: trainable per-sample offsets; subtracted from the residuals
            reshaped to ``(-1, 1)``.
        x: training inputs.
        y: training targets.
        h: number of smallest squared offsets kept in the trimmed term.
        lambd: weight of the HOVR penalty.
        k: derivative order passed to ``hovr_regularization``.
        q: exponent passed to ``hovr_regularization``.

    Returns:
        Scalar loss tensor.
    """
    n_samples = x.shape[0]
    scale = 1 / n_samples

    # Data-fit term on the residuals shifted by the per-sample offsets.
    shifted = (y - model(x)).view(-1, 1) - xi
    fit_term = scale * torch.sum(shifted ** 2)

    # Trimmed term: only the h smallest squared offsets are penalised.
    smallest_sq = torch.topk(xi.view(-1) ** 2, h, largest=False)[0]
    trimmed_term = scale * torch.sum(smallest_sq)

    smooth_term = lambd * hovr_regularization(model, x, k, q)
    return fit_term + trimmed_term + smooth_term

def create_directory(path):
    """Create ``path`` and any missing parent directories.

    An already existing directory is left untouched. ``exist_ok=True``
    replaces the previous check-then-create sequence, which could fail if
    the directory appeared between the check and the creation.

    Args:
        path: directory path to create.

    Raises:
        FileExistsError: if ``path`` exists but is not a directory.
    """
    os.makedirs(path, exist_ok=True)

def save_results(data_name, trial, method_name, pmse):
    """Write the PMSE of one method/trial to its own CSV on Google Drive.

    Args:
        data_name: dataset display name (part of the file name).
        trial: trial number (part of the file name).
        method_name: method display name (part of the file name).
        pmse: value stored in the single-row 'pmse' column.
    """
    out_dir = '/content/drive/MyDrive/ARTL/benchmark_experiment'
    create_directory(out_dir)
    target = f'{out_dir}/{data_name}_{method_name}_trial_{trial}.csv'
    pd.DataFrame({'pmse': [pmse]}).to_csv(target, index=False)

def evaluate_methods(data_name, X, y, y_col):
    """Benchmark every method on five contaminated train/test splits.

    For each trial the data is split 70/30 (seeded with the trial number),
    5% of the training targets are shifted upwards by two standard
    deviations to act as outliers, and every method is trained and scored
    on the clean test set. Each score is also written to disk via
    ``save_results``.

    Args:
        data_name: dataset display name used in logs and result records.
        X: feature DataFrame.
        y: target Series.
        y_col: target column name (accepted for the caller's convenience;
            not used here).

    Returns:
        List of dicts with keys 'dataset', 'method', 'trial' and 'pmse'.
    """
    pmse_results = []
    input_dim = X.shape[1]

    # Baselines
    methods = [
        ('NN with Huber Loss', nn_huber_loss),
        ('NN with Tukey Loss', nn_tukey_loss),
        ('NN with RANSAC', nn_ransac_like),
        ('NN with Label Noise Reg.', label_noise_regularization),
    ]
    # TTL+HOVR parameters
    lambdas = [1e-3, 1e-4]
    ks = [1, 2]
    for lambd in lambdas:
        for k in ks:
            method_name = f'NN with TTL+HOVR(k={k}, λ={lambd})'
            # k and lambd are bound as default arguments. A plain closure
            # looks the loop variables up at call time, so every entry would
            # silently run with the last pair (k=2, lambd=1e-4).
            run_ttl_hovr = (
                lambda X_train, y_train, X_test, y_test, trial, outlier_indices, input_dim,
                k=k, lambd=lambd:
                ttl_hovr(X_train, y_train, X_test, y_test, trial,
                         outlier_indices, k=k, lambd=lambd, input_dim=input_dim)
            )
            methods.append((method_name, run_ttl_hovr))

    for trial in range(1, 6):
        print(f"Dataset: {data_name}, Trial: {trial}")
        # Split dataset into train/test
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=trial)
        # Contaminate an explicit copy so the caller's `y` can never be
        # modified and pandas has no reason to warn about chained assignment.
        y_train = y_train.copy()
        n_outliers = int(0.05 * len(y_train))
        # NOTE(review): this draw uses NumPy's global RNG and is not seeded,
        # so outlier positions differ between runs although the split is fixed.
        outlier_indices = np.random.choice(len(y_train), n_outliers, replace=False)
        std_y = y_train.std()
        y_train.iloc[outlier_indices] += 2 * std_y

        for method_name, method_function in methods:
            print(f"Running method: {method_name}")
            pmse = method_function(X_train, y_train, X_test, y_test, trial, outlier_indices, input_dim)
            save_results(data_name, trial, method_name, pmse)
            pmse_results.append({'dataset': data_name,
                                 'method': method_name,
                                 'trial': trial,
                                 'pmse': pmse})
    return pmse_results

# Baselines
def nn_huber_loss(X_train, y_train, X_test, y_test, trial, outlier_indices, input_dim):
    """Train the network with ``nn.SmoothL1Loss`` and return the test MSE.

    Uses full-batch Adam (lr 0.01, halved every 1000 steps) for ``EPOCHS``
    steps.

    Args:
        X_train, y_train: training features (DataFrame) and targets (Series).
        X_test, y_test: held-out features and targets used for scoring.
        trial, outlier_indices: accepted for a uniform method signature;
            not used here.
        input_dim: number of input features.

    Returns:
        Mean squared error of the predictions on the test set.
    """
    net = NeuralNetwork(input_dim).to(device)
    huber = nn.SmoothL1Loss()
    adam = optim.Adam(net.parameters(), lr=0.01)
    lr_decay = torch.optim.lr_scheduler.StepLR(adam, step_size=1000, gamma=0.5)

    features = torch.tensor(X_train.values, dtype=torch.float32).to(device)
    targets = torch.tensor(y_train.values.reshape(-1, 1), dtype=torch.float32).to(device)

    for _ in range(EPOCHS):
        adam.zero_grad()
        huber(net(features), targets).backward()
        adam.step()
        lr_decay.step()

    test_features = torch.tensor(X_test.values, dtype=torch.float32).to(device)
    with torch.no_grad():
        predictions = net(test_features).cpu().numpy().flatten()
    return mean_squared_error(y_test, predictions)

def nn_tukey_loss(X_train, y_train, X_test, y_test, trial, outlier_indices, input_dim):
    """Train the network with Tukey's biweight loss and return the test MSE.

    Uses full-batch Adam (lr 0.01, halved every 1000 steps) for ``EPOCHS``
    steps.

    Args:
        X_train, y_train: training features (DataFrame) and targets (Series).
        X_test, y_test: held-out features and targets used for scoring.
        trial, outlier_indices: accepted for a uniform method signature;
            not used here.
        input_dim: number of input features.

    Returns:
        Mean squared error of the predictions on the test set.
    """
    def tukey_loss(output, target, c=4.685):
        """Mean biweight loss; residuals beyond ``c`` contribute ``c**2 / 6``."""
        residual = target - output
        inside = torch.abs(residual) <= c
        plateau = c**2 / 6
        per_sample = torch.zeros_like(residual)
        # Masked assignment keeps out-of-range residuals out of the graph.
        per_sample[inside] = plateau * (1 - (1 - (residual[inside] / c) ** 2) ** 3)
        per_sample[~inside] = plateau
        return torch.mean(per_sample)

    net = NeuralNetwork(input_dim).to(device)
    adam = optim.Adam(net.parameters(), lr=0.01)
    lr_decay = torch.optim.lr_scheduler.StepLR(adam, step_size=1000, gamma=0.5)

    features = torch.tensor(X_train.values, dtype=torch.float32).to(device)
    targets = torch.tensor(y_train.values.reshape(-1, 1), dtype=torch.float32).to(device)

    for _ in range(EPOCHS):
        adam.zero_grad()
        tukey_loss(net(features), targets).backward()
        adam.step()
        lr_decay.step()

    test_features = torch.tensor(X_test.values, dtype=torch.float32).to(device)
    with torch.no_grad():
        predictions = net(test_features).cpu().numpy().flatten()
    return mean_squared_error(y_test, predictions)

def nn_ransac_like(X_train, y_train, X_test, y_test, trial, outlier_indices, input_dim):
    """Two-stage RANSAC-style fit: train, drop the worst 10%, then refit.

    Stage 1 trains on all samples with MSE for 1000 steps. Samples whose
    absolute residual exceeds the 0.90 quantile are then discarded, and
    stage 2 continues training the same model (same optimizer and
    learning-rate schedule) on the remaining samples for ``EPOCHS`` steps.

    Args:
        X_train, y_train: training features (DataFrame) and targets (Series).
        X_test, y_test: held-out features and targets used for scoring.
        trial, outlier_indices: accepted for a uniform method signature;
            not used here.
        input_dim: number of input features.

    Returns:
        Mean squared error of the predictions on the test set.
    """
    model = NeuralNetwork(input_dim).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.01)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.5)
    criterion = nn.MSELoss()
    X_t = torch.tensor(X_train.values, dtype=torch.float32).to(device)
    y_t = torch.tensor(y_train.values.reshape(-1, 1), dtype=torch.float32).to(device)

    # Stage 1: preliminary fit on every sample.
    for epoch in range(1000):
        optimizer.zero_grad()
        loss = criterion(model(X_t), y_t)
        loss.backward()
        optimizer.step()
        scheduler.step()

    with torch.no_grad():
        residuals = torch.abs(y_t - model(X_t))
    threshold = torch.quantile(residuals, 0.90)
    # Use a 1-D row mask for both tensors so the targets keep their (m, 1)
    # shape. Indexing y_t with the 2-D mask flattens it to (m,), and MSELoss
    # would then broadcast (m, 1) against (m,) into an (m, m) comparison.
    inlier_rows = (residuals <= threshold).view(-1)
    X_inliers = X_t[inlier_rows]
    y_inliers = y_t[inlier_rows]

    # Stage 2: refit on the retained samples.
    for epoch in range(EPOCHS):
        optimizer.zero_grad()
        loss = criterion(model(X_inliers), y_inliers)
        loss.backward()
        optimizer.step()
        scheduler.step()

    with torch.no_grad():
        X_test_tensor = torch.tensor(X_test.values, dtype=torch.float32).to(device)
        y_pred = model(X_test_tensor).cpu().numpy().flatten()
    pmse = mean_squared_error(y_test, y_pred)
    return pmse

def label_noise_regularization(X_train, y_train, X_test, y_test, trial, outlier_indices, input_dim):
    """Train with MSE against mean-shrunk targets and return the test MSE.

    Every training target is replaced by ``0.9 * y + 0.1 * mean(y)``. Uses
    full-batch Adam (lr 0.01, halved every 1000 steps) for ``EPOCHS`` steps.

    Args:
        X_train, y_train: training features (DataFrame) and targets (Series).
        X_test, y_test: held-out features and targets used for scoring.
        trial, outlier_indices: accepted for a uniform method signature;
            not used here.
        input_dim: number of input features.

    Returns:
        Mean squared error of the predictions on the test set.
    """
    net = NeuralNetwork(input_dim).to(device)
    adam = optim.Adam(net.parameters(), lr=0.01)
    lr_decay = torch.optim.lr_scheduler.StepLR(adam, step_size=1000, gamma=0.5)
    mse = nn.MSELoss()

    features = torch.tensor(X_train.values, dtype=torch.float32).to(device)
    targets = torch.tensor(y_train.values.reshape(-1, 1), dtype=torch.float32).to(device)

    smoothing = 0.1
    # The shrunk targets depend only on the fixed training labels, so they
    # are the same tensor values at every step.
    shrunk_targets = targets * (1 - smoothing) + smoothing * targets.mean()

    for _ in range(EPOCHS):
        adam.zero_grad()
        mse(net(features), shrunk_targets).backward()
        adam.step()
        lr_decay.step()

    test_features = torch.tensor(X_test.values, dtype=torch.float32).to(device)
    with torch.no_grad():
        predictions = net(test_features).cpu().numpy().flatten()
    return mean_squared_error(y_test, predictions)

def ttl_hovr(X_train, y_train, X_test, y_test, trial, outlier_indices, k, lambd, input_dim):
    """Train with the transformed trimmed loss plus HOVR and return the test MSE.

    One trainable offset per training sample (``xi``, initialised to zero)
    is optimised jointly with the network weights. The trimmed term keeps
    the ``int(0.95 * n)`` smallest squared offsets and the HOVR exponent is
    fixed to 2. Uses full-batch Adam (lr 0.01, halved every 1000 steps) for
    ``EPOCHS`` steps.

    Args:
        X_train, y_train: training features (DataFrame) and targets (Series).
        X_test, y_test: held-out features and targets used for scoring.
        trial, outlier_indices: accepted for a uniform method signature;
            not used here.
        k: derivative order of the HOVR penalty.
        lambd: weight of the HOVR penalty.
        input_dim: number of input features.

    Returns:
        Mean squared error of the predictions on the test set.
    """
    net = NeuralNetwork(input_dim).to(device)
    n_samples = X_train.shape[0]
    # Augmented parameter: one offset per training sample.
    xi = torch.zeros(n_samples, 1, requires_grad=True, device=device)
    adam = optim.Adam([*net.parameters(), xi], lr=0.01)
    lr_decay = torch.optim.lr_scheduler.StepLR(adam, step_size=1000, gamma=0.5)

    features = torch.tensor(X_train.values, dtype=torch.float32).to(device)
    targets = torch.tensor(y_train.values.reshape(-1, 1), dtype=torch.float32).to(device)

    keep_count = int(0.95 * n_samples)
    exponent = 2
    for _ in range(EPOCHS):
        adam.zero_grad()
        objective = transformed_ttl_hovr_loss(net, xi, features, targets,
                                              keep_count, lambd, k, exponent)
        objective.backward()
        adam.step()
        lr_decay.step()

    test_features = torch.tensor(X_test.values, dtype=torch.float32).to(device)
    with torch.no_grad():
        predictions = net(test_features).cpu().numpy().flatten()
    return mean_squared_error(y_test, predictions)

def main():
    """Run the full benchmark and print mean/std PMSE per dataset and method.

    Features and target of every dataset are standardised (zero mean, unit
    standard deviation, computed on the whole dataset) before evaluation.
    Returns early, without raising, when the datasets could not be loaded
    or when no result was produced.
    """
    datasets = load_datasets()
    if datasets is None:
        # load_datasets() reports the underlying error itself and returns
        # None; iterating over None would only add an AttributeError.
        print("No datasets available; aborting benchmark.")
        return

    all_results = []
    for data_name, (df, y_col) in datasets.items():
        if df is None:
            continue
        print(f"Processing dataset: {data_name}")
        X = df.drop(y_col, axis=1)
        y = df[y_col]
        X = (X - X.mean()) / X.std()
        y = (y - y.mean()) / y.std()
        results = evaluate_methods(data_name, X, y, y_col)
        all_results.extend(results)

    if not all_results:
        # An empty frame has no 'dataset'/'method' columns to group by.
        print("No results were produced.")
        return

    df_results = pd.DataFrame(all_results)
    summary = df_results.groupby(['dataset', 'method'])['pmse'].agg(['mean', 'std']).reset_index()
    print(summary)

# Run the benchmark only when this file is executed directly, not on import.
if __name__ == '__main__':
    main()