In [None]:
"""
Here is the code used to carry out the experiments on the two coiling spirals: error_rate(epsilon). We average the value over 1000 runs.
The reader can also find the code of our network architecture here.
We used a dataset (x_train, y_train, x_test, y_test) generated by the notebook 'Generate_Spiral_dataset'.
Other datasets (different noise level or number of samples) can be generated with the notebook 'Generate_Spiral_dataset'.
"""

import random
import os
import scipy.stats as st
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, precision_score, recall_score

import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
from matplotlib.colors import ListedColormap
import torch.nn as nn
import torch.nn.functional as F


In [None]:
### Dataset 
class bis_spiral(Dataset):
  """Map-style dataset exposing either the train or the test split of the spiral data.

  Only the split selected by `train` is kept on the instance (as `X` and `Y`).
  """

  def __init__(self, x_train, x_test, y_train, y_test , train = True):
    # Keep a reference to the requested split only.
    self.X, self.Y = (x_train, y_train) if train else (x_test, y_test)

  def __len__(self):
    """Return how many samples the selected split holds."""
    return len(self.X)

  def __getitem__(self, index):
    """Return `(features, label, index)`; features and label are float tensors."""
    features = torch.tensor(self.X[index]).float()
    label = torch.tensor(self.Y[index]).float()
    return features, label, index

def create_data(x_train, x_test, y_train, y_test,batch_size ):
  """Build the shuffled train and test `DataLoader`s for the spiral dataset.

  Args:
    x_train, x_test: feature arrays of the train / test split.
    y_train, y_test: label arrays of the train / test split.
    batch_size: mini-batch size used for the training loader.

  Returns:
    `(train_loader, test_loader)`. The test loader yields the whole test
    split as a single batch.
  """
  train_dataset = bis_spiral(x_train, x_test, y_train, y_test, train = True)
  test_dataset = bis_spiral(x_train, x_test, y_train, y_test, train = False)

  # The test batch size used to be `int(n_samples*0.2)`, but `n_samples` is
  # never defined in this notebook (NameError on a fresh kernel). Size the
  # batch from the test split itself; `max` keeps DataLoader's requirement
  # of a positive batch size.
  test_batch_size = max(1, len(test_dataset))

  train_loader = DataLoader(train_dataset, batch_size, shuffle=True)
  test_loader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=True)
  return train_loader, test_loader



In [None]:
### Load and plot the spiral dataset
# NOTE(review): `path` is defined but the loads below read from the current
# working directory — confirm where the .npy files are expected to live.
path = './data/spirals_dataset'

# Two-colour map: red for one spiral, blue for the other.
cm_bright = ListedColormap(['#FF0000', '#0000FF'])

# Load the four arrays in the same order as they are unpacked.
x_train, x_test, y_train, y_test = (
    np.load(file_name)
    for file_name in ("x_train.npy", "x_test.npy", "y_train.npy", "y_test.npy")
)

# Scatter the training points, coloured by the first label column.
fig = plt.figure()
plt.scatter(
    x_train[:, 0],
    x_train[:, 1],
    c=y_train[:, 0],
    cmap=cm_bright,
    edgecolors='k',
)
plt.show()

In [None]:
###Network
class Net(nn.Module):
    """Fully connected classifier for 2-D points: three hidden ReLU layers and a 2-way softmax.

    Args:
        Ncouche: width of each hidden layer.

    NOTE(review): `forward` returns softmax probabilities, while the notebook
    feeds them to `nn.CrossEntropyLoss`, which applies log-softmax itself.
    Left unchanged here because returning logits would alter the output
    contract of this class.
    """

    def __init__(self,Ncouche = 50):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(2, Ncouche)
        self.fc2 = nn.Linear(Ncouche, Ncouche)
        self.fc3 = nn.Linear(Ncouche, Ncouche)
        self.fc4 = nn.Linear(Ncouche, 2)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, X, y = None,mixup = False,eps  =1, alpha =1):
        """Optionally mix the batch, then classify it.

        Args:
            X: input batch; the first layer expects 2 features per sample.
            y: targets, only needed when `mixup` is true.
            mixup: when true, the batch is replaced by `mixup_data(X, y, ...)`.
            eps: distance threshold forwarded to `mixup_data`.
            alpha: Beta-distribution parameter forwarded to `mixup_data`.

        Returns:
            `(probabilities, y_a, y_b, lam, dist)`. Without mixup the three
            middle values are None and `dist` is a vector of ones with one
            entry per sample.
        """
        if mixup:
            # `alpha` used to be accepted but silently dropped; forward it.
            X, y_a, y_b, lam, dist = mixup_data(X, y, eps=eps, alpha=alpha)
        else:
            y_a, y_b, lam = None, None, None
            # Neutral weights, created on the same device as the input.
            dist = torch.ones(X.size(0), device=X.device)

        X = F.relu(self.fc1(X))
        X = F.relu(self.fc2(X))
        X = F.relu(self.fc3(X))
        X = self.fc4(X)
        X = self.softmax(X)

        return X, y_a, y_b, lam, dist


def mixup_data(x, y, eps = 0, alpha=1.0, use_cuda=False,):
    '''Compute the mixup data with locality weights.

    Each sample is interpolated with a randomly chosen partner from the same
    batch. A pair gets weight 1 when the Euclidean distance between the two
    (flattened) samples is at most `eps`, and weight 0 otherwise.

    Args:
        x: input batch, first dimension is the batch dimension.
        y: targets, indexed along their first dimension like `x`.
        eps: distance threshold above which a pair's weight is 0.
        alpha: Beta(alpha, alpha) parameter for the mixing coefficient;
            a non-positive value disables mixing (lam = 1).
        use_cuda: when true, the permutation index is moved to CUDA.

    Returns:
        `(mixed_x, y_a, y_b, lam, dist)`: mixed inputs, the two target sets,
        the mixing coefficient, and the per-pair 0/1 weights.
    '''
    lam = np.random.beta(alpha, alpha) if alpha > 0. else 1.

    batch_size = x.size(0)
    index = torch.randperm(batch_size)
    if use_cuda:
        index = index.cuda()

    partner = x[index]
    mixed_x = lam * x + (1 - lam) * partner

    # Euclidean distance between each sample and its mixing partner.
    pair_dist = torch.norm((x - partner).flatten(start_dim=1), p=2, dim=1)
    # Weight is 1 when d <= eps and 0 when d > eps. A plain comparison
    # replaces the former Threshold + heaviside(…, torch.tensor(1.)) pair,
    # which required float32 inputs because heaviside rejects mixed dtypes.
    dist = (pair_dist <= eps).to(pair_dist.dtype)

    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam, dist
 
def mix_criterion(criterion, pred, y_a, y_b,lam):
    """Blend the criterion evaluated against both mixup targets.

    The loss against `y_a` is weighted by `lam`, the loss against `y_b` by
    `1 - lam`, and the two are summed.
    """
    loss_a = criterion(pred, y_a)
    loss_b = criterion(pred, y_b)
    return lam * loss_a + (1 - lam) * loss_b

In [None]:
criterion = nn.CrossEntropyLoss()# cross entropy loss (mean-reduced); experiment() builds its own per-sample criterion


def experiment(net,train_loader,cost_index= None,mixup = False, eps = 0,lr = 0.01,epochs = 10 ,alpha =1,verbose=False, eval_loader=None):
  """Train `net` on `train_loader` and return its mean accuracy on the evaluation batches.

  Args:
    net: model called as `net(x, y, mixup=..., eps=..., alpha=...)` and
      returning `(output, y_a, y_b, lam, dist)`.
    train_loader: iterable of `(x, y, index)` training batches.
    cost_index: unused; kept so existing calls stay valid.
    mixup: enable mixup training. It is disabled whenever `eps == 0`.
    eps: distance threshold forwarded to the network's mixup step.
    lr: Adam learning rate (previously ignored in favour of a hard-coded 0.01).
    epochs: number of passes over `train_loader`.
    alpha: Beta-distribution parameter forwarded to the network's mixup step.
    verbose: print accuracy / precision / recall for every evaluation batch.
    eval_loader: iterable of `(x, y, index)` evaluation batches. When None,
      the notebook-level `test_loader` is used, as before.

  Returns:
    Mean of the per-batch accuracies over the evaluation loader.
  """
  # Per-sample losses are needed so that `dist` can weight each mixed pair
  # individually. With a mean-reduced loss, `(loss * dist).mean()` only
  # rescaled the batch mean by the fraction of kept pairs.
  # NOTE(review): Net outputs softmax probabilities, and CrossEntropyLoss
  # applies log-softmax again; left as is.
  per_sample_criterion = nn.CrossEntropyLoss(reduction='none')

  optimizer = torch.optim.Adam(net.parameters(), lr=lr)
  scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=300, gamma=0.1)

  # eps == 0 means "no mixup", whatever the caller passed for `mixup`.
  use_mixup = mixup and eps != 0

  for epoch in range(epochs):
    for x, y, _ in train_loader:
      optimizer.zero_grad()
      if use_mixup:
        out, y_a, y_b, lam, dist = net(x, y, mixup = True, eps = eps, alpha = alpha)
        loss = mix_criterion(per_sample_criterion, out, y_a.flatten().long(), y_b.flatten().long(), lam)
      else:
        out, _, _, _, dist = net(x, y)
        loss = per_sample_criterion(out, y.flatten().long())
      loss = (loss * dist).mean() #The loss is weighted by dist as described in our paper.
      loss.backward()
      optimizer.step()

    scheduler.step()

  # Fall back to the notebook-level loader only when none was passed in.
  loader = test_loader if eval_loader is None else eval_loader

  batch_accuracies = []
  with torch.no_grad():  # evaluation does not need gradients
    for x_eval, y_eval, _ in loader:
      predict_out = net(x_eval)[0]
      _, predict_y = torch.max(predict_out, 1)

      # Flat integer label vectors for the sklearn metrics.
      y_true = y_eval.flatten().long().numpy()
      y_pred = predict_y.numpy()

      if verbose:
        print ('prediction accuracy', accuracy_score(y_true, y_pred))

        print( 'macro precision', precision_score(y_true, y_pred, average='macro'))
        print ('micro precision', precision_score(y_true, y_pred, average='micro'))
        print ('macro recall', recall_score(y_true, y_pred, average='macro'))
        print ('micro recall', recall_score(y_true, y_pred, average='micro'))
      batch_accuracies.append(accuracy_score(y_true, y_pred))
  return np.mean(batch_accuracies)


In [None]:
n_runs = 1000

Epsilon = [0,0.6,1,1.5,2,2.5,3,3.5,4,10]
Batch = [20,241,103,50,33,25,22,20,20,20] ###Determined by Experiments 
assert len(Epsilon) == len(Batch), "each epsilon needs its own batch size"

mean_accuracy =[]  # one averaged test accuracy per epsilon, in order

for eps, batch_size in zip(Epsilon, Batch):
  # The loaders only depend on the batch size, so build them once per epsilon.
  # `test_loader` stays a notebook-level name because experiment() reads it.
  train_loader, test_loader = create_data(x_train, x_test, y_train, y_test, batch_size)

  mean_accuracy_eps = []  # test accuracy of every run for this epsilon
  for i in range(n_runs):
    network_mixup_graph = Net(100)  # fresh network for every run
    test_accuracy = experiment(network_mixup_graph,train_loader,epochs= 200, eps = eps, mixup = True,verbose=False,)
    mean_accuracy_eps.append(test_accuracy)
  mean_accuracy.append(np.mean(mean_accuracy_eps))
  print(mean_accuracy)
  # 95% normal confidence interval of this epsilon's mean accuracy.
  # Previously the scale used an undefined name `A` (NameError) and the
  # interval was centred on the mean over all epsilons processed so far.
  print(st.norm.interval(0.95, loc = np.mean(mean_accuracy_eps), scale = st.sem(mean_accuracy_eps)))
