# Imports

In [1]:
import numpy as np
import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Config

In [2]:
"""
Maps each number of labeled training samples (the key) to its experiment settings.
Each value is a dict whose "M_BUCKET" list gives the numbers of unlabeled samples to try for that fixed number of labeled samples.
""" 
TRAINING_CONFIG = {
    10: {
        # Unlabeled-sample counts to try with 10 labeled samples.
        "M_BUCKET": [10, 100, 1000, 10000],
    },
    20: {
        "M_BUCKET": [20, 200, 2000, 10000],
    },
    40: {
        "M_BUCKET": [40, 400, 4000, 10000],
    }
}

# Search spaces of the linear model and the robust linear model.
# Each entry is [upper, lower]: get_tuned_model draws every hyperparameter
# uniformly between element 1 (lower bound) and element 0 (upper bound).

LINEAR_PARAM_GRID = {
    'gamma': [15, 0.001],
    'weight_decay': [1, 0],
}

ROBUST_LINEAR_PARAM_GRID = {
    'gamma': [20, 0.001],
    'gamma_unlabeled': [20, 0.0001],
    'l': [20, 0],
    'weight_decay': [1, 0],
}

# Perturbation ratio: generate_unlabeled_data shifts the class mean by
# ALPHA_RATIO * ||mu|| along a random unit direction.
ALPHA_RATIO = 0.5

# Number of labeled samples used to train the upper-bound linear model.
N_MAX = 10000

# Dimension of data
DIM = 200

# Per-coordinate standard deviation of the Gaussian samples (covariance is STD**2 * I).
STD = 1
# Number of samples in the generated test set.
TEST_SIZE = 10000

# Number of random hyperparameter draws evaluated for each model
ROBUST_LINEAR_NUM_COMBINATION = 4000
LINEAR_NUM_COMBINATION = 500

# Hyperparameter Tuning

In [3]:
import itertools
import random

def get_tuned_model(model_class, X_train, y_train, X_test, y_test, param_grid, d, num_combinations, X_unlabeled=None):
    """Random-search hyperparameters and return the best-scoring fitted model.

    Args:
        model_class: Callable invoked as ``model_class(d, **params)``; the
            result must expose ``fit`` and ``score``.
        X_train, y_train: Labeled data passed to ``fit``.
        X_test, y_test: Data passed to ``score``; candidates are ranked on it.
        param_grid: Maps each hyperparameter name to ``[upper, lower]`` bounds.
            Values are drawn uniformly between the two bounds.
        d: Forwarded as the first positional argument of ``model_class``.
        num_combinations: Number of random draws to evaluate.
        X_unlabeled: Optional unlabeled data; when given it is passed as the
            third argument of ``fit``.

    Returns:
        ``(best_model, best_params, best_accuracy)``. When
        ``num_combinations`` is 0 this is ``(None, None, 0)``.
    """
    def _draw():
        return {k: random.uniform(v[1], v[0]) for k, v in param_grid.items()}

    # For the robust model the unlabeled gamma must stay strictly below gamma,
    # so draws violating that ordering are rejected and redrawn.
    constrained = 'gamma_unlabeled' in param_grid

    param_combinations = []
    for _ in range(num_combinations):
        params = _draw()
        while constrained and not params['gamma_unlabeled'] < params['gamma']:
            params = _draw()
        param_combinations.append(params)

    best_params = None
    best_model = None
    best_accuracy = 0
    for params in param_combinations:
        model = model_class(d, **params)
        if X_unlabeled is not None:
            model.fit(X_train, y_train, X_unlabeled)
        else:
            model.fit(X_train, y_train)

        accuracy = model.score(X_test, y_test)

        # Always keep the first candidate: with a strict ">" against an initial
        # 0, a search where every candidate scores 0 would return no model.
        if best_model is None or accuracy > best_accuracy:
            best_params = params
            best_accuracy = accuracy
            best_model = model

    print("Best Parameters: ", best_params)
    return best_model, best_params, best_accuracy

# Models

In [13]:
class LinearModel(torch.nn.Module):
    """Bias-free linear map from ``input_size`` features to one output."""

    def __init__(self, input_size):
        super().__init__()
        self.linear = nn.Linear(in_features=input_size, out_features=1, bias=False)
        self.init_weights()

    def init_weights(self):
        """Overwrite the layer's weight with Xavier-uniform values."""
        nn.init.xavier_uniform_(self.linear.weight)

    def forward(self, x):
        """Return the raw (unsquashed) linear output for ``x``."""
        output = self.linear(x)
        return output

class Trainer():
    """Shared scaffolding for the trainers built on ``LinearModel``.

    Stores the optimization settings, builds a fresh model/optimizer/scheduler
    triple, and provides ``predict``/``score`` on ``self.best_model``.
    Subclasses supply ``loss_fn``, ``run`` and ``fit``; ``fit`` is expected to
    set ``self.best_model``.
    """

    def __init__(self, num_features, learning_rate=0.8, gamma=2, weight_decay=0.0001, num_epochs=10, num_iters=5):
        self.learning_rate = learning_rate
        self.num_features = num_features
        self.gamma = gamma
        self.weight_decay = weight_decay
        self.num_epochs = num_epochs
        self.num_iters = num_iters
        self.best_model = None

    def get_model_and_optimizers(self):
        """Return a new ``(model, optimizer, scheduler)`` triple."""
        model = LinearModel(self.num_features)

        optimizer = optim.Adam(
            model.parameters(),
            lr=self.learning_rate,
            weight_decay=self.weight_decay,
        )
        # `verbose` is intentionally not passed: False is already the default,
        # and the keyword is deprecated in recent PyTorch releases.
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=3)
        return model, optimizer, scheduler

    def loss_fn(self, *args):
        """Placeholder; subclasses define the training loss."""
        pass

    def run(self, *args):
        """Placeholder; subclasses define one training run."""
        pass

    def fit(self, *args):
        """Placeholder; subclasses define fitting and set ``self.best_model``."""
        pass

    def predict(self, X):
        """Return the sign of the model output for each row of ``X`` as a numpy array."""
        self.best_model.eval()
        X = torch.tensor(X, dtype=torch.float32, requires_grad=False)
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            y_pred = torch.sign(self.best_model(X)).squeeze()
        return y_pred.numpy()

    def score(self, X, y):
        """Return the fraction of rows of ``X`` whose predicted sign equals ``y``."""
        y_pred = self.predict(X)
        return (y_pred == y).mean()
    

In [14]:
class LinearTrainer(Trainer):
    """Trains a ``LinearModel`` on labeled data with a clamped margin loss."""

    def __init__(self, num_features, learning_rate=0.07, gamma=2, weight_decay=0.0001, num_epochs=20, num_iters=15):
        super().__init__(
            num_features,
            learning_rate,
            gamma,
            weight_decay,
            num_epochs,
            num_iters,
        )

    def loss_fn(self, signed_dists):
        """Mean of ``1 - gamma * signed_dists`` clamped to ``[0, 1]``."""
        return torch.clamp(1 - self.gamma * signed_dists, min=0, max=1).mean()

    def run(self, X, y):
        """Train one freshly initialized model and keep its lowest-loss weights.

        Args:
            X: Float tensor of inputs.
            y: Float tensor of labels, multiplied elementwise with the model output.

        Returns:
            ``(best_model, best_loss)``: a ``LinearModel`` holding the weights
            that produced the lowest loss, and that loss as a 0-d tensor.
            If no epoch produced a comparable loss (zero epochs or NaN), the
            model is freshly initialized and the loss is ``inf``.
        """
        model, optimizer, scheduler = self.get_model_and_optimizers()
        model.train()
        best_weights = None
        best_loss = np.inf
        for _ in range(self.num_epochs):
            optimizer.zero_grad()
            loss = self.loss_fn(y * model(X))
            loss_value = loss.item()

            # Snapshot BEFORE the optimizer step so the saved weights are the
            # ones that produced this loss. state_dict() tensors share storage
            # with the live parameters, hence the explicit clone.
            if loss_value <= best_loss:
                best_loss = loss_value
                best_weights = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }

            loss.backward()
            optimizer.step()
            scheduler.step(loss_value)

        best_model = LinearModel(self.num_features)
        if best_weights is not None:
            best_model.load_state_dict(best_weights)

        return best_model, torch.tensor(best_loss)

    def fit(self, X, y):
        """Run ``num_iters`` independent trainings and keep the lowest-loss model.

        Sets ``self.best_model``; it is left unchanged when ``num_iters`` is 0.
        """
        X = torch.tensor(X, dtype=torch.float32, requires_grad=False)
        y = torch.tensor(y, dtype=torch.float32, requires_grad=False).unsqueeze(1)
        best_loss = np.inf
        chosen = None
        for _ in range(self.num_iters):
            model, loss = self.run(X, y)
            loss_value = float(loss)
            improved = loss_value < best_loss
            # Fall back to the first run so a model is always available, even
            # when no run reports a finite loss.
            if chosen is None or improved:
                chosen = model
            if improved:
                best_loss = loss_value
        if chosen is not None:
            self.best_model = chosen

In [15]:
class RobustTrainer(Trainer):
    """Trains a ``LinearModel`` on labeled plus unlabeled data.

    The loss is the clamped margin loss on labeled data plus ``l`` times the
    same clamped loss (with ``gamma_unlabeled``) on the absolute model output
    for the unlabeled data.
    """

    def __init__(self, num_features, learning_rate=0.07, gamma=2, weight_decay=0.7, num_epochs=20, num_iters=20, gamma_unlabeled=1, l=0.01):
        super().__init__(
            num_features,
            learning_rate,
            gamma,
            weight_decay,
            num_epochs,
            num_iters,
        )
        self.gamma_unlabeled = gamma_unlabeled
        self.l = l

    def loss_fn(self, signed_dists, unlabeled_dists):
        """Labeled clamped loss, plus the weighted unlabeled term when any unlabeled rows exist."""
        loss = torch.clamp(1 - self.gamma * signed_dists, min=0, max=1).mean()
        if len(unlabeled_dists):
            unlabeled_term = torch.clamp(1 - self.gamma_unlabeled * unlabeled_dists, min=0, max=1).mean()
            loss = loss + self.l * unlabeled_term
        return loss

    def run(self, X, y, X_unlabeled):
        """Train one freshly initialized model and keep its lowest-loss weights.

        Args:
            X: Float tensor of labeled inputs.
            y: Float tensor of labels, multiplied elementwise with the model output.
            X_unlabeled: Float tensor of unlabeled inputs.

        Returns:
            ``(best_model, best_loss)``: a ``LinearModel`` holding the weights
            that produced the lowest loss, and that loss as a 0-d tensor.
            If no epoch produced a comparable loss (zero epochs or NaN), the
            model is freshly initialized and the loss is ``inf``.
        """
        model, optimizer, scheduler = self.get_model_and_optimizers()
        model.train()
        best_weights = None
        best_loss = np.inf
        for _ in range(self.num_epochs):
            optimizer.zero_grad()

            # Labeled data: label times model output.
            signed_distances = y * model(X)
            # Unlabeled data: magnitude of the model output.
            abs_unlabeled_distances = torch.abs(model(X_unlabeled))

            loss = self.loss_fn(signed_distances, abs_unlabeled_distances)
            loss_value = loss.item()

            # Snapshot BEFORE the optimizer step so the saved weights are the
            # ones that produced this loss. state_dict() tensors share storage
            # with the live parameters, hence the explicit clone.
            if loss_value <= best_loss:
                best_loss = loss_value
                best_weights = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }

            loss.backward()
            optimizer.step()
            scheduler.step(loss_value)

        best_model = LinearModel(self.num_features)
        if best_weights is not None:
            best_model.load_state_dict(best_weights)
        return best_model, torch.tensor(best_loss)

    def fit(self, X, y, X_unlabeled):
        """Run ``num_iters`` independent trainings and keep the lowest-loss model.

        Sets ``self.best_model``; it is left unchanged when ``num_iters`` is 0.
        """
        X = torch.tensor(X, dtype=torch.float32)
        X_unlabeled = torch.tensor(X_unlabeled, dtype=torch.float32)
        y = torch.tensor(y, dtype=torch.float32).unsqueeze(1)

        best_loss = np.inf
        chosen = None
        for _ in range(self.num_iters):
            model, loss = self.run(X, y, X_unlabeled)
            loss_value = float(loss)
            improved = loss_value < best_loss
            # Fall back to the first run so a model is always available, even
            # when no run reports a finite loss.
            if chosen is None or improved:
                chosen = model
            if improved:
                best_loss = loss_value
        if chosen is not None:
            self.best_model = chosen

In [16]:
def generate_mu(d, std):
    """Draw a random mean vector of dimension ``d`` with unit Euclidean norm.

    ``std`` is accepted but not used.
    """
    direction = np.random.randn(d)
    return direction / np.linalg.norm(direction)

def generate_multivariate_gaussian_samples(n_samples, mu, std):
    """Draw ``n_samples`` rows from a Gaussian with mean ``mu`` and covariance ``std**2 * I``."""
    variance = std ** 2
    isotropic_cov = np.eye(len(mu)) * variance
    return np.random.multivariate_normal(mu, isotropic_cov, n_samples)


def generate_data(n_samples, mu, std):
    """Generate a labeled two-class Gaussian dataset.

    The first ``n_samples // 2`` rows are drawn around ``mu`` and labeled +1;
    the remaining rows are drawn around ``-mu`` and labeled -1.

    Args:
        n_samples: Total number of rows to generate.
        mu: Mean vector of the positive class.
        std: Per-coordinate standard deviation.

    Returns:
        ``(X, y)`` where ``X`` has ``n_samples`` rows and ``y`` is an integer
        array of length ``n_samples``.
    """
    n_pos = n_samples // 2
    # Give the negative class the remainder so X and y stay the same length
    # when n_samples is odd.
    n_neg = n_samples - n_pos

    X_pos = generate_multivariate_gaussian_samples(n_pos, mu, std)
    X_neg = generate_multivariate_gaussian_samples(n_neg, -1 * mu, std)
    X = np.concatenate((X_pos, X_neg))

    y = np.ones(n_samples, dtype=int)
    y[n_pos:] = -1
    return X, y


def generate_unlabeled_data(n_samples, mu, std, alpha_ratio=ALPHA_RATIO):
    """Generate two-class Gaussian data around a randomly shifted mean.

    The mean used is ``mu + alpha_ratio * ||mu|| * v`` for a random unit
    vector ``v``. The first ``n_samples // 2`` rows are drawn around that
    shifted mean and the remaining rows around its negation.

    Args:
        n_samples: Total number of rows to generate.
        mu: Unshifted mean vector.
        std: Per-coordinate standard deviation.
        alpha_ratio: Size of the shift relative to ``||mu||``.

    Returns:
        ``(X, y)`` where ``X`` has ``n_samples`` rows and ``y`` is an integer
        array holding +2 for the first group and -2 for the second.
    """
    d = len(mu)

    v = np.random.randn(d)
    v /= np.linalg.norm(v)

    shifted_mu = mu + alpha_ratio * np.linalg.norm(mu) * v

    n_pos = n_samples // 2
    # Give the second group the remainder so X and y stay the same length
    # when n_samples is odd.
    n_neg = n_samples - n_pos

    X_pos = generate_multivariate_gaussian_samples(n_pos, shifted_mu, std)
    X_neg = generate_multivariate_gaussian_samples(n_neg, -1 * shifted_mu, std)
    X = np.concatenate((X_pos, X_neg))

    y = np.full(n_samples, 2, dtype=int)
    y[n_pos:] = -2
    return X, y

# Training

In [None]:
# Draw the class mean once; every dataset below is generated around it.
mu = generate_mu(DIM, STD)

X_test, y_test = generate_data(TEST_SIZE , mu, STD)
# NOTE(review): X_unlabeled is reassigned inside the loop below before it is
# ever read, and y_unlabeled is not referenced in the visible cells.
X_unlabeled, y_unlabeled = generate_unlabeled_data(TEST_SIZE, mu, STD)

# Maps each labeled-sample count n to the accuracies obtained for it.
training_outputs = {}
for n, value in TRAINING_CONFIG.items():
    X_train, y_train = generate_data(n, mu, STD)
    m_bucket = value['M_BUCKET']
    
    # Baseline: linear model trained on the n labeled samples only.
    # get_tuned_model ranks candidates on (X_test, y_test), so the accuracy
    # reported here is the best score found on that same set.
    _, _, linear_accuracy = get_tuned_model(
        LinearTrainer,
        X_train,
        y_train,
        X_test,
        y_test,
        LINEAR_PARAM_GRID,
        DIM,
        LINEAR_NUM_COMBINATION,
    )
    
    print(f'linear model accuracy at N={n} = {round(linear_accuracy, 4)}')
    robust_linear_accuracies = []
    # Robust model: same labeled set, plus m freshly generated unlabeled samples.
    for m in m_bucket:
        X_unlabeled, _ = generate_unlabeled_data(m, mu, STD)
        _, _, robust_linear_accuracy = get_tuned_model(
            RobustTrainer,
            X_train,
            y_train,
            X_test,
            y_test,
            ROBUST_LINEAR_PARAM_GRID,
            DIM,
            ROBUST_LINEAR_NUM_COMBINATION,
            X_unlabeled
        )
        robust_linear_accuracies.append(robust_linear_accuracy)
        print(f'robust linear model accuracy at N={n}, M={m} is equal to {round(robust_linear_accuracy, 4)}')
    
    
    # NOTE(review): this empty dict is overwritten by the very next statement.
    training_outputs[n] = {}
    training_outputs[n] = {
        'linear_model_accuracy': linear_accuracy,
        'robust_linear_model_accuracies': robust_linear_accuracies
    }
    print('***********************************')

Best Parameters:  {'gamma': 9.601166867580856, 'weight_decay': 0.3589276260764547}
linear model accuracy at N=10 = 0.6174


## Calculate Upper Bound Accuracy

In [None]:
# Upper bound: the plain linear model trained on N_MAX labeled samples and
# tuned with the same search space and budget as the small-N baselines.
X_train_max, y_train_max = generate_data(N_MAX, mu, STD)
_, _, upper_bound_accuracy = get_tuned_model(
    LinearTrainer,
    X_train_max,
    y_train_max,
    X_test,
    y_test,
    LINEAR_PARAM_GRID,
    DIM,
    LINEAR_NUM_COMBINATION,  # was misspelled LINEAR_NUM_COMINATION (NameError)
)
print(f'upper bound accuracy  = {round(upper_bound_accuracy, 4)}')

In [None]:
import seaborn as sns

def plot_result(training_outputs, upper_bound_accuracy, xscale=None):
    """Plot accuracy against the number of unlabeled samples.

    For every labeled-sample count in ``training_outputs`` this draws a solid
    horizontal line for the linear model and a dashed line with markers for
    the robust linear model. A black horizontal line marks the upper bound.

    Args:
        training_outputs: Maps each labeled-sample count ``n`` to a dict with
            keys ``'linear_model_accuracy'`` and
            ``'robust_linear_model_accuracies'``; ``n`` must also be a key of
            ``TRAINING_CONFIG``, whose ``'M_BUCKET'`` supplies the x values.
        upper_bound_accuracy: Height of the upper-bound line.
        xscale: Optional x-axis scale name (e.g. ``'linear'`` or ``'log'``);
            when ``None`` the axis scale is left at its default.
    """
    fig, ax = plt.subplots(figsize=(12, 8))
    colors = ['blue', 'red', 'green']
    whole_interval = [0, 10000]

    for i, (n, value) in enumerate(training_outputs.items()):
        # Cycle through the palette so more than three configurations do not
        # run past the end of the color list.
        color = f'tab:{colors[i % len(colors)]}'
        y, y_0 = value['robust_linear_model_accuracies'], value['linear_model_accuracy']
        x = TRAINING_CONFIG[n]['M_BUCKET']
        sns.lineplot(x=whole_interval, y=[y_0] * len(whole_interval), ax=ax, color=color, label=f'N={n}, linear')
        sns.lineplot(x=x, y=y, ax=ax, linestyle='dashed', color=color, label=f'N={n}, robust linear')
        sns.scatterplot(x=x, y=y, ax=ax, color=color, s=10)

    sns.lineplot(
        x=whole_interval,
        y=[upper_bound_accuracy] * len(whole_interval),
        ax=ax,
        color='black',
        label=f'N={N_MAX}, linear',
    )
    ax.set(xlabel='Number OF Unlabeled', ylabel='accuracy', title=f'accuracy vs unlabeled count for dim={DIM}, ε=0')
    # Only touch the axis scale when one was requested; None is not a valid scale name.
    if xscale is not None:
        ax.set_xscale(xscale)
    plt.legend(loc='best')
    plt.show()


    
# Render the same comparison twice: with a linear x-axis, then with a log x-axis.
plot_result(training_outputs, upper_bound_accuracy, xscale='linear')
plot_result(training_outputs, upper_bound_accuracy, xscale='log')