https://gist.github.com/loevlie/5044e62aea2ce625b70d6d6d75113d25

https://neptune.ai/blog/pytorch-loss-functions

In [7]:
import torch
import torch.nn.functional as F
from scipy.stats import multivariate_normal
import math
import numpy as np, scipy.stats as st
import matplotlib.pyplot as plt
from matplotlib import cm
%matplotlib inline
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random
#from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from tqdm import tqdm

In [8]:
def sample(mu, var, nb_samples=500):
    """
    sample guassian random variable
    :param mu: torch.Tensor (features)
    :param var: torch.Tensor (features) (note: zero covariance)
    :return: torch.Tensor (nb_samples, features)
    """
    out = []
    for i in range(nb_samples):
        out += [
            torch.normal(mu, var.sqrt())
        ]
    return torch.stack(out, dim=0)

In [9]:
class Linear_net_sig(nn.Module):
    '''
    Linear binary classifier with unit init
    '''
    def __init__(self, input_dim, out_dim = 1):
        super(Linear_net_sig, self).__init__()
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(input_dim, 1)
        torch.nn.init.ones_(self.fc1.weight)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.fc1(x)
        x = self.sigmoid(x)
        return x        


In [10]:
def run_classifier_sig(net, data_x, data_y):
    '''
    trains binary classifier using SGD
    '''
    BCE = torch.nn.BCELoss()
    optimizer = optim.SGD(net.parameters(), lr=0.1, momentum=0)
    for epoch in range(100):  # loop over the dataset multiple times

        running_loss = 0.0
        # get the inputs; data is a list of [inputs, labels]
        inputs = data_x
        labels = data_y
        order = np.array(range(len(data_x)))
        np.random.shuffle(order)
        # in-place changing of values
        inputs[np.array(range(len(data_x)))] = inputs[order]
        labels[np.array(range(len(data_x)))] = labels[order]

        # zero the parameter gradients
        #scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, len(inputs)*100)

        # forward + backward + optimize
        outputs = net(inputs)
        #loss = -labels*torch.log2(outputs) - (1-labels)*torch.log2(1-outputs) #BCE(outputs, labels)
        #loss = torch.sum(loss)/ len(inputs)
        loss = BCE(outputs, labels) 
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        #scheduler.step()
        running_loss += loss.item()
    #print("loss " + str(loss.item()))

    #print('Finished Training')

In [11]:
def test_classifier_sig(net, data_x, data_y):
    '''
    tests binary classifier and prints accuracy
    '''
    correct = 0
    total = 0
    with torch.no_grad():
        inputs =  data_x
        labels = data_y
        outputs = net(inputs)
        predicted = torch.round(outputs.data)
        total = labels.size(0)
        for i in range(total):
            correct += predicted[i].item() == labels[i].item()
        #correct = (predicted == labels).sum()
    print('Accuracy of the network on the  test examples: %d %%' % (
        100 * correct / total))

In [12]:
class Linear_net_rej(nn.Module):
    '''
    Linear Classifier to be used for the L_CE loss
    '''
    def __init__(self, input_dim, out_dim):
        super(Linear_net_rej, self).__init__()
        # an affine operation: y = Wx + b
        self.fc = nn.Linear(input_dim, out_dim+1)
        self.fc_rej = nn.Linear(input_dim, 1)
        torch.nn.init.ones_(self.fc.weight)
        torch.nn.init.ones_(self.fc_rej.weight)
        self.softmax = nn.Softmax()

    def forward(self, x):
        out = self.fc(x)
        rej = self.fc_rej(x)
        #out = torch.cat([out,rej],1)
        out = self.softmax(out)
        return out

In [13]:
def reject_CrossEntropyLoss(outputs, m, labels, m2, n_classes):
    '''
    Implmentation of L_{CE}^{\alpha}
        outputs: network outputs
        m: cost of deferring to expert cost of classifier predicting (I_{m =y})
        labels: target
        m2:  cost of classifier predicting (alpha* I_{m\neq y} + I_{m =y})
        n_classes: number of classes
    '''    
    batch_size = outputs.size()[0]            # batch_size
    rc = [n_classes] * batch_size
    rc = torch.tensor(rc)
    outputs =  -m*torch.log2( outputs[range(batch_size), rc]) - m2*torch.log2(outputs[range(batch_size), labels])   # pick the values corresponding to the labels
    return torch.sum(outputs)/batch_size

In [32]:
def run_classifier_rej(net, net_exp, data_x, data_y, alpha):
    '''
    training script for L_{CE}
        net: classifier and rejector model
        net_exp: expert model
        data_x: numpy x data
        data_y: numpy y data
        alpha: hyperparam alpha for loss L_CE^{\alpha}
    '''
    optimizer = optim.SGD(net.parameters(), lr=0.1)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, len(data_x)*50)

    for epoch in range(1):  # loop over the dataset multiple times
        running_loss = 0.0
        inputs = data_x
        labels = data_y
        order = np.array(range(len(data_x)))
        np.random.shuffle(order)
        # in-place changing of values
        inputs[np.array(range(len(data_x)))] = inputs[order]
        labels[np.array(range(len(data_x)))] = labels[order]
        x_batches = torch.split(inputs,64)
        y_batches = torch.split(labels,64)  
        for inputs, labels in zip(x_batches, y_batches):
            # get the inputs; data is a list of [inputs, labels]


            #order = np.array(range(len(data_x)))
            #np.random.shuffle(order)
            # in-place changing of values
            #inputs[np.array(range(len(data_x)))] = inputs[order]
            #labels[np.array(range(len(data_x)))] = labels[order]
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            m = net_exp(inputs)
            _, predicted = torch.max(m.data, 1)
            m = (predicted==labels)*1
            m2 = [0] * len(inputs)
            for j in range (0,len(inputs)):
                if m[j]:
                    m2[j] = alpha
                else:
                    m2[j] = 1
            m = torch.tensor(m)
            m2 = torch.tensor(m2)
            outputs = net(inputs)
            loss = reject_CrossEntropyLoss(outputs, m, labels, m2, 2)
            #loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()
            running_loss += loss.item()

    #print('Finished Training')

In [15]:
def test_classifier_rej(net, net_exp, data_x, data_y):
    '''
    Testing script for L_{CE} loss
    '''
    correct = 0
    correct_sys = 0
    exp = 0
    exp_total = 0
    total = 0
    real_total = 0
    alone_correct = 0
    with torch.no_grad():
        inputs =  data_x
        labels = data_y
        m = net_exp(inputs)
        _, predicted_exp = torch.max(m.data, 1)
        outputs = net(inputs)
        _, predicted = torch.max(outputs.data, 1)
        for i in range(len(inputs)):
            r = (predicted[i] == 2).item() # if 2, then defer to expert
            if r:
                exp += (predicted_exp[i] == labels[i]).item()
                correct_sys += (predicted_exp[i] == labels[i]).item()
                exp_total += 1
            else:
                correct += (predicted[i] == labels[i]).item() 
                correct_sys += (predicted[i] == labels[i]).item()
                total += 1
        real_total += labels.size(0)
    cov = str(total) + str(" out of") + str(real_total)

    print("coverage", cov)
    print("system accuracy", 100 * correct_sys / real_total)
    print("expert accuracy",100* exp/(exp_total+0.0002))
    print("classifier accuracy", 100 * correct / (total + 0.0001))
    #print("alone classifier", 100 * alone_correct / real_total)
    print()

    return [100*total/real_total,  100*correct_sys/real_total, 100* exp/(exp_total+0.0002),100*correct/(total+0.0001) ]


In [33]:
experimental_data_rej1 = []
experimental_data_rej5 = []
experimental_data_rej0 = []

trials = 1

TO_PRINT = False
for exp in tqdm(range(0,trials)):
    d = 10
    total_samples = 1000
    '''
    group_proportion = np.random.uniform()
    if group_proportion <= 0.02:
        group_proportion = 0.02
    if group_proportion >= 0.98:
        group_proportion = 0.98
    #group_proportion = 0.4
    '''
    group_proportion = 0.5

    cluster1_mean = torch.rand(d)*d
    cluster1_var = torch.rand(d)*d
    cluster1 = sample(
        cluster1_mean,
        cluster1_var,
        nb_samples= math.floor(total_samples * group_proportion * 0.5 )
    )
    cluster1_labels = torch.ones([math.floor(total_samples * group_proportion * 0.5 )], dtype=torch.long)
    cluster2_mean = torch.rand(d)*d
    cluster2_var = torch.rand(d)*d
    cluster2 = sample(
        cluster2_mean,
        cluster2_var,
        nb_samples= math.floor(total_samples * group_proportion * 0.5 )
    )
    cluster2_labels = torch.zeros([math.floor(total_samples * group_proportion * 0.5 )], dtype=torch.long)
    cluster3_mean = torch.rand(d)*d
    cluster3_var = torch.rand(d)*d
    cluster3 = sample(
        cluster3_mean,
        cluster3_var,
        nb_samples= math.floor(total_samples * (1-group_proportion) * 0.5 )
    )
    cluster3_labels = torch.ones([math.floor(total_samples * (1-group_proportion) * 0.5 )], dtype=torch.long)
    
    cluster4_mean = torch.rand(d)*d
    cluster4_var = torch.rand(d)*d
    cluster4 = sample(
        cluster4_mean,
        cluster4_var,
        nb_samples= math.floor(total_samples * (1-group_proportion) * 0.5 )
    )
    cluster4_labels = torch.zeros([math.floor(total_samples * (1-group_proportion) * 0.5 )], dtype=torch.long)
    
    # test data
    cluster1_test = sample(
        cluster1_mean,
        cluster1_var,
        nb_samples= math.floor(total_samples * group_proportion * 0.5 )
    )
    cluster1_labels_test = torch.ones([math.floor(total_samples * group_proportion * 0.5 )], dtype=torch.long)
    
    cluster2_test = sample(
        cluster2_mean,
        cluster2_var,
        nb_samples= math.floor(total_samples * group_proportion * 0.5 )
    )
    cluster2_labels_test = torch.zeros([math.floor(total_samples * group_proportion * 0.5 )], dtype=torch.long)

    cluster3_test = sample(
        cluster3_mean,
        cluster3_var,
        nb_samples= math.floor(total_samples * (1-group_proportion) * 0.5 )
    )
    cluster3_labels_test = torch.ones([math.floor(total_samples * (1-group_proportion) * 0.5 )], dtype=torch.long)
    
    cluster4_test = sample(
        cluster4_mean,
        cluster4_var,
        nb_samples= math.floor(total_samples * (1-group_proportion) * 0.5 )
    )
    cluster4_labels_test = torch.zeros([math.floor(total_samples * (1-group_proportion) * 0.5 )], dtype=torch.long)
    data_x_test = torch.cat([cluster1_test, cluster2_test, cluster3_test, cluster4_test])
    data_y_test = torch.cat([cluster1_labels_test, cluster2_labels_test, cluster3_labels_test, cluster4_labels_test])


100%|██████████| 1/1 [00:00<00:00, 45.59it/s]


In [34]:
# expert0 model
net_exp = Linear_net_sig(d,2)
data_x = torch.cat([cluster4])
data_y = cluster4_labels.view(-1, 1).type(torch.float)
run_classifier_sig(net_exp, data_x, data_y)

#reject
data_x = torch.cat([cluster1, cluster2, cluster3, cluster4])
data_y = torch.cat([cluster1_labels, cluster2_labels, cluster3_labels, cluster4_labels])
net_rej = Linear_net_rej(d,2)
alpha = 0
run_classifier_rej(net_rej, net_exp, data_x, data_y, alpha)
print(test_classifier_sig(net_exp,data_x,data_y))
batch_data = test_classifier_rej(net_rej, net_exp, data_x_test, data_y_test)
experimental_data_rej0.append(batch_data)

net_rej = Linear_net_rej(d,2)
alpha = 0.5
run_classifier_rej(net_rej, net_exp, data_x, data_y, alpha)
print(test_classifier_sig(net_exp,data_x,data_y))
batch_data = test_classifier_rej(net_rej, net_exp, data_x_test, data_y_test)
print(test_classifier_sig(net_exp,data_x,data_y))
experimental_data_rej5.append(batch_data)
    
net_rej = Linear_net_rej(d,2)
alpha = 1
run_classifier_rej(net_rej, net_exp, data_x, data_y, alpha)
batch_data = test_classifier_rej(net_rej, net_exp, data_x_test, data_y_test)
print(test_classifier_sig(net_exp,data_x,data_y))
experimental_data_rej1.append(batch_data)

Accuracy of the network on the  test examples: 50 %
None
coverage 512 out of1000
system accuracy 95.4
expert accuracy 96.5163538867402
classifier accuracy 94.33591907501581

Accuracy of the network on the  test examples: 50 %
None
coverage 1000 out of1000
system accuracy 93.5
expert accuracy 0.0
classifier accuracy 93.49999065000094

Accuracy of the network on the  test examples: 50 %
None
coverage 485 out of1000
system accuracy 95.3
expert accuracy 93.98054602697242
classifier accuracy 96.70101098948227

Accuracy of the network on the  test examples: 50 %
None


  m = torch.tensor(m)


In [35]:
# expert1 model

net_exp = Linear_net_sig(d,2)
data_x = torch.cat([cluster3])
data_y = cluster3_labels.view(-1, 1).type(torch.float)
run_classifier_sig(net_exp, data_x, data_y)


#reject
data_x = torch.cat([cluster1, cluster2, cluster3, cluster4])
data_y = torch.cat([cluster1_labels, cluster2_labels, cluster3_labels, cluster4_labels])
net_rej = Linear_net_rej(d,2)
alpha = 0
run_classifier_rej(net_rej, net_exp, data_x, data_y, alpha)
print(test_classifier_sig(net_exp,data_x,data_y))
batch_data = test_classifier_rej(net_rej, net_exp, data_x_test, data_y_test)
experimental_data_rej0.append(batch_data)

net_rej = Linear_net_rej(d,2)
alpha = 0.5
run_classifier_rej(net_rej, net_exp, data_x, data_y, alpha)
print(test_classifier_sig(net_exp,data_x,data_y))
batch_data = test_classifier_rej(net_rej, net_exp, data_x_test, data_y_test)
experimental_data_rej5.append(batch_data)
    
net_rej = Linear_net_rej(d,2)
alpha = 1
run_classifier_rej(net_rej, net_exp, data_x, data_y, alpha)
print(test_classifier_sig(net_exp,data_x,data_y))
batch_data = test_classifier_rej(net_rej, net_exp, data_x_test, data_y_test)
experimental_data_rej1.append(batch_data)

Accuracy of the network on the  test examples: 50 %
None
coverage 500 out of1000
system accuracy 95.4
expert accuracy 95.39996184001527
classifier accuracy 95.39998092000383

Accuracy of the network on the  test examples: 50 %
None
coverage 295 out of1000
system accuracy 79.3
expert accuracy 70.78012176450447
classifier accuracy 99.66098316576843

Accuracy of the network on the  test examples: 50 %
None
coverage 1000 out of1000
system accuracy 73.3
expert accuracy 0.0
classifier accuracy 73.29999267000073



  m = torch.tensor(m)
