In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.datasets import make_blobs
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import random

In [2]:
def generate_data(seed=15, *, num_train=2000, num_test=20000, data_dim=5, threshold=1.645):
    """Generate a synthetic regression dataset with a rare, shifted subgroup.

    Features are i.i.d. standard-normal. For every row the label is the sum of
    all features plus Gaussian noise (std 0.01). Rows whose first feature is
    strictly greater than ``threshold`` (the "minority") get the first feature
    added one more time, so the two groups follow different linear rules.
    With the default threshold of 1.645 roughly 5% of standard-normal draws
    fall in the minority.

    Args:
        seed: Seed passed to ``np.random.seed``. Note that this reseeds the
            *global* NumPy RNG as a side effect.
        num_train: Number of training rows (taken from the start of the draw).
        num_test: Number of test rows (taken from the rest of the draw).
        data_dim: Number of features per row.
        threshold: Cut-off on the first feature separating majority
            (``<= threshold``) from minority (``> threshold``).

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test, is_majority)`` where
        ``X_train`` / ``y_train`` are float32 torch tensors of shape
        ``(num_train, data_dim)`` / ``(num_train, 1)``, ``X_test`` / ``y_test``
        are float64 NumPy arrays of shape ``(num_test, data_dim)`` /
        ``(num_test, 1)``, and ``is_majority`` is a list of ``num_test`` Python
        bools, ``True`` where the test row belongs to the majority group.
    """
    total_points = num_train + num_test

    # Seed once, then draw in a fixed order (features, train noise, test noise)
    # so that a given seed always reproduces the same dataset.
    np.random.seed(seed)
    X = np.random.normal(loc=0, scale=1, size=(total_points, data_dim))
    eps_train = np.random.normal(loc=0, scale=0.01, size=(num_train, 1))
    eps_test = np.random.normal(loc=0, scale=0.01, size=(num_test, 1))
    theta = np.ones(data_dim)

    def _noisy_labels(features, noise):
        # Vectorised form of the piecewise rule: minority rows get their first
        # feature added a second time, majority rows get nothing extra.
        first = features[:, 0]
        minority_shift = np.where(first > threshold, first, 0.0)
        return (features.dot(theta) + minority_shift).reshape(-1, 1) + noise

    X_train_np = X[:num_train]
    X_test = X[num_train:]

    # Training split is handed back as torch tensors; the test split stays in
    # NumPy (callers convert it when evaluating).
    X_train = torch.tensor(X_train_np, dtype=torch.float)
    y_train = torch.tensor(_noisy_labels(X_train_np, eps_train), dtype=torch.float)
    y_test = _noisy_labels(X_test, eps_test)

    # Group indicator for the test rows, as plain Python bools.
    is_majority = (X_test[:, 0] <= threshold).tolist()

    return X_train, X_test, y_train, y_test, is_majority

In [3]:
# Draw the dataset with the default seed (15) and bind the train/test splits and the
# per-test-row majority flags as notebook-level variables.
X_train, X_test, y_train, y_test, is_majority = generate_data()

In [4]:
import sys
# NOTE(review): absolute, machine-specific path — the imports below only resolve when the
# `objectives` directory exists at exactly this location. Consider a relative or
# configurable path before sharing the notebook.
sys.path.insert(0, '/Users/william/Documents/CMU/Research/RiskSensitiveLearning/Supervised Learning/objectives')
# Risk-objective modules, presumably provided by the directory added above; their classes
# are instantiated later as training criteria.
import cvar, human_aligned_risk, entropic_risk, trimmed_risk, mean_variance

In [5]:
# Network and training hyperparameters read by the cells below.
input_size = 5  # number of input features fed to the first linear layer
h1 = 50  # width of the first hidden layer
h2 = 25  # width of the second hidden layer
epochs = 200  # number of optimisation steps, each on the full training set

class RegressionMLP(nn.Module):
    """Fully connected regressor: two ReLU hidden layers and a scalar output.

    Args:
        input_size: Number of input features.
        h1: Width of the first hidden layer.
        h2: Width of the second hidden layer.
    """

    def __init__(self, input_size, h1, h2):
        super().__init__()
        # Layers are created in forward order (fc1, fc2, fc3), which also fixes
        # the order in which their initial weights are drawn from the RNG.
        self.fc1 = nn.Linear(input_size, h1)
        self.fc2 = nn.Linear(h1, h2)
        self.fc3 = nn.Linear(h2, 1)

    def forward(self, x):
        """Map inputs of shape ``(..., input_size)`` to outputs of shape ``(..., 1)``."""
        hidden = x
        for layer in (self.fc1, self.fc2):
            hidden = F.relu(layer(hidden))
        # The output layer is linear: no activation on the regression target.
        return self.fc3(hidden)

In [35]:
# Seeds over which every objective is trained and evaluated; reported numbers
# are averages across these seeds.
seeds = [15, 30]

# Training objectives under comparison. All but the plain MSE wrap a
# per-sample (reduction='none') MSE and aggregate it themselves.
objectives = {
    'Expected Value': nn.MSELoss(reduction='mean'),
    'CVaR': cvar.CVaR(a=0.35, criterion=nn.MSELoss(reduction='none')), # We look at bottom alpha% of losses
    'Entropic Risk': entropic_risk.EntropicRisk(t=0.14, criterion=nn.MSELoss(reduction='none')),
    'Human-Aligned Risk': human_aligned_risk.HumanAlignedRisk(a=0.5, b=0, criterion=nn.MSELoss(reduction='none')),
    'Inverted CVaR': cvar.CVaR(a=0.99, inverted=True, criterion=nn.MSELoss(reduction='none')), # We look at top alpha% of losses
    'Mean-Variance': mean_variance.MeanVariance(c=0.5, criterion=nn.MSELoss(reduction='none')),
    'Trimmed Risk': trimmed_risk.TrimmedRisk(a=0.01, criterion=nn.MSELoss(reduction='none')),
}


def _fit_full_batch(model, criterion, inputs, targets, num_epochs):
    """Train ``model`` in place with full-batch SGD (lr=0.01, momentum=0.9).

    ``criterion`` is called as ``criterion(predictions, targets)`` on flattened
    1-D tensors and must return a scalar loss that supports ``backward()``.
    """
    model.train()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)

    # The targets do not change between steps, so flatten/cast them once.
    flat_targets = targets.flatten().type(torch.float32)

    for _ in range(num_epochs):
        # Clear the gradients so they do not accumulate across steps.
        optimizer.zero_grad()
        loss = criterion(model(inputs).flatten(), flat_targets)
        loss.backward()
        optimizer.step()


def _group_mse(model, X_test, y_test, is_majority):
    """Return ``(majority_mse, minority_mse, overall_mse)`` on the test set.

    The whole test set is scored in one batched forward pass; per-sample
    squared errors are then averaged over each group via a boolean mask.
    An empty group yields ``nan`` for its mean.
    """
    model.eval()
    inputs = torch.tensor(X_test, dtype=torch.float32)
    targets = torch.tensor(y_test, dtype=torch.float32).reshape(-1, 1)
    majority_mask = torch.tensor(is_majority, dtype=torch.bool)

    with torch.no_grad():
        squared_errors = nn.MSELoss(reduction='none')(model(inputs), targets).flatten()

    return (
        squared_errors[majority_mask].mean().item(),
        squared_errors[~majority_mask].mean().item(),
        squared_errors.mean().item(),
    )


for objective, criterion in objectives.items():
    majority_risks = []
    minority_risks = []
    overall_risks = []

    for seed in seeds:
        X_train, X_test, y_train, y_test, is_majority = generate_data(seed=seed)

        # Reset random seeds for comparison: every objective starts from the
        # same initial weights for a given seed.
        torch.manual_seed(seed)
        random.seed(seed)
        np.random.seed(seed)

        model = RegressionMLP(input_size, h1, h2)

        # Train with the risk objective, then always evaluate with plain MSE
        # so the numbers are comparable across objectives.
        _fit_full_batch(model, criterion, X_train, y_train, epochs)
        majority_risk, minority_risk, overall_risk = _group_mse(model, X_test, y_test, is_majority)

        majority_risks.append(majority_risk)
        minority_risks.append(minority_risk)
        overall_risks.append(overall_risk)
    print('Objective: {}, Majority MSE: {}, Minority MSE: {}, Overall MSE: {}'.format(objective, np.mean(majority_risks), np.mean(minority_risks), np.mean(overall_risks)))

Objective: Inverted CVaR, Majority MSE: 0.03819034807384014, Minority MSE: 0.8747345209121704, Overall MSE: 0.08006656542420387
Objective: Mean-Variance, Majority MSE: 0.030599155463278294, Minority MSE: 0.3587324619293213, Overall MSE: 0.04704507440328598
Objective: Trimmed Risk, Majority MSE: 0.038540150970220566, Minority MSE: 0.8289810419082642, Overall MSE: 0.07810477539896965
