<a href="https://colab.research.google.com/github/khalida1wwin/CMPUT-328/blob/main/Logistic_Regression_on_MNIST_with_evolutionary_optimization_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
from torchvision import transforms, datasets
import numpy as np
import timeit
from collections import OrderedDict
from pprint import pformat
from tqdm import tqdm

# Share tensors between processes through the file system instead of file
# descriptors (presumably to avoid "too many open files" errors with the
# multi-worker DataLoaders created later in this notebook; confirm).
torch.multiprocessing.set_sharing_strategy('file_system')

def compute_score(acc, min_thres, max_thres):
    """Convert an accuracy into a score between 0 and 100.

    Args:
        acc: measured accuracy.
        min_thres: accuracy at or below which the score is 0.0.
        max_thres: accuracy at or above which the score is 100.0.

    Returns:
        0.0, 100.0, or the linear interpolation between the two thresholds
        expressed as a percentage.
    """
    # Guard clauses for the two saturated ends of the scale.
    if acc <= min_thres:
        return 0.0
    if acc >= max_thres:
        return 100.0

    # Strictly between the thresholds: scale linearly.
    threshold_span = max_thres - min_thres
    return float(acc - min_thres) / threshold_span * 100


def run(algorithm, dataset_name, filename):
    """Run `algorithm` on a dataset, save its predictions and score them.

    Args:
        algorithm: callable taking `dataset_name` and returning
            ``(predicted_test_labels, gt_labels)``. The two results are
            iterated in lockstep, and every item must expose ``size(0)`` and
            ``cpu()`` (for example the rows of a 2-D tensor).
        dataset_name: forwarded unchanged to `algorithm`.
        filename: path the predictions are written to with ``np.savetxt``.

    Returns:
        ``(correct, accuracy, run_time)``. ``(0, 0, 0)`` is returned when the
        algorithm yields ``None`` for either result, and the accuracy is 0.0
        when there is nothing to score.
    """
    start = timeit.default_timer()
    predicted_test_labels, gt_labels = algorithm(dataset_name)
    if predicted_test_labels is None or gt_labels is None:
        return (0, 0, 0)
    run_time = timeit.default_timer() - start

    np.savetxt(filename, np.asarray(predicted_test_labels))

    correct = 0
    total = 0
    for label, prediction in zip(gt_labels, predicted_test_labels):
        total += label.size(0)
        # .cpu() so the comparison also works when the model ran on a GPU.
        correct += (prediction.cpu().numpy() == label.cpu().numpy()).sum().item()

    if total == 0:
        # Nothing was predicted: avoid dividing by zero below.
        print('No predictions to score.')
        return (0, 0.0, run_time)

    accuracy = float(correct) / total

    # Report the number of images actually scored rather than assuming 10000.
    print('Accuracy of the network on the %d test images: %d %%' % (total, 100 * correct / total))
    return (correct, accuracy, run_time)

In [None]:
# Source: https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html
# from CIFAR10_Multiple_Linear_Regression.ipynb on eclass

# Batch sizes used by the loaders below.
CIFAR10_batch_size_train = 200
MNIST_batch_size_train = 270
batch_size_test = 1000
from torch.utils.data import random_split

# CIFAR-10 preprocessing: tensor conversion, then per-channel normalisation
# with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

# Seed torch before the random train/validation splits so they are repeatable.
random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

CIFAR10_training = datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)

# create a training and a validation set
CIFAR10_training_set, CIFAR10_validation_set = random_split(CIFAR10_training, [38000, 12000])

# CIFAR-10 test set
CIFAR10_test_set = datasets.CIFAR10(root='./data', train=False,
                                       download=True, transform=transform)

# Create data loaders
CIFAR10_train_loader = torch.utils.data.DataLoader(CIFAR10_training_set,
                                           batch_size=CIFAR10_batch_size_train,
                                           shuffle=True, num_workers=2)

CIFAR10_validation_loader = torch.utils.data.DataLoader(CIFAR10_validation_set,
                                                batch_size=CIFAR10_batch_size_train,
                                                shuffle=True, num_workers=2)


CIFAR10_test_loader = torch.utils.data.DataLoader(CIFAR10_test_set,
                                          batch_size=batch_size_test,
                                          shuffle=False, num_workers=2)
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

# MNIST dataset is part of torchvision
# NOTE(review): '/MNIST_dataset/' is an absolute path at the filesystem root;
# it needs write permission there (confirm this is intended outside Colab).
MNIST_training = datasets.MNIST('/MNIST_dataset/', train=True, download=True,
                             transform=transforms.Compose([
                               transforms.ToTensor(),
                               transforms.Normalize((0.1307,), (0.3081,))]))

MNIST_test_set = datasets.MNIST('/MNIST_dataset/', train=False, download=True,
                             transform=transforms.Compose([
                               transforms.ToTensor(),
                               transforms.Normalize((0.1307,), (0.3081,))]))

# create a training and a validation set
MNIST_training_set, MNIST_validation_set = random_split(MNIST_training, [48000, 12000])


MNIST_train_loader = torch.utils.data.DataLoader(MNIST_training_set,batch_size=MNIST_batch_size_train, shuffle=True)

MNIST_validation_loader = torch.utils.data.DataLoader(MNIST_validation_set,batch_size=MNIST_batch_size_train, shuffle=True)

# The test loader must not shuffle: the predictions written to disk are in
# loader order, so shuffling would make the saved file impossible to line up
# with the dataset (the CIFAR-10 test loader above already uses shuffle=False).
MNIST_test_loader = torch.utils.data.DataLoader(MNIST_test_set,batch_size=batch_size_test, shuffle=False)


Files already downloaded and verified
Files already downloaded and verified


In [None]:
# Show the MNIST test-set label tensor as a quick sanity check.
print(MNIST_test_set.targets)

tensor([7, 2, 1,  ..., 4, 5, 6])


In [None]:
# from CIFAR10_Multiple_Linear_Regression.ipynb on eclass

# Print the training loss every `log_interval` batches.
log_interval = 100

# Epoch counts: a step budget divided by the number of batches per epoch
# (training-set size / batch size). Each dataset must use its own batch size;
# the MNIST line previously reused the CIFAR-10 batch size.
# NOTE(review): 3000 and 3800 look like iteration budgets; confirm.
CIFAR10_n_epochs = int(3000 /(38000/CIFAR10_batch_size_train))
MNIST_n_epochs = int(3800 /(48000/MNIST_batch_size_train))

# Re-seed so that model initialisation is repeatable.
random_seed = 1
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

# Checking GPU availability
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)
print(CIFAR10_n_epochs)
print(MNIST_n_epochs)

cuda:0
15
15


In [None]:
# Pull the first batch from each test loader (CIFAR-10 first, then MNIST) and
# print the shapes of its images and labels.
for _loader in (CIFAR10_test_loader, MNIST_test_loader):
    examples = enumerate(_loader)
    batch_idx, (example_data, example_targets) = next(examples)
    print(example_data.shape)
    print(example_targets.shape)

torch.Size([1000, 3, 32, 32])
torch.Size([1000])
torch.Size([1000, 1, 28, 28])
torch.Size([1000])


In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable

In [None]:
# Multiple Linear regression
class CIFAR10LogisticRegression(nn.Module):
    def __init__(self):
        super(CIFAR10LogisticRegression, self).__init__()
        self.fc = nn.Linear(3*32*32, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        # Softmax = nn.Softmax(dim=1)
        # x = Softmax(x)
        x = self.fc(x)
        return x
class MNISTLogisticRegression(nn.Module):
    """One linear layer mapping a flattened 1x28x28 input to 10 class scores."""

    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(1*28*28, 10)

    def forward(self, x):
        """Flatten every sample and return the raw scores (no softmax applied)."""
        n_samples = x.size(0)
        flattened = x.view(n_samples, -1)
        return self.fc(flattened)


In [None]:
class One_Hot(nn.Module):
    """Turn integer class indices into one-hot rows of width `depth`.

    Args:
        depth: number of classes, i.e. the length of each one-hot row.
    """
    def __init__(self, depth):
        super(One_Hot,self).__init__()
        self.depth = depth
        # Row i of the identity matrix is the one-hot vector for class i.
        # Registered as a buffer so that .to(...) / .cuda() on the module moves
        # it, instead of relying on a module-level `device` variable.
        self.register_buffer("ones", torch.eye(depth))

    def forward(self, X_in):
        """Return a (len(X_in), depth) tensor with one row per index in X_in."""
        X_in = X_in.long()
        # Follow the indices' device; this is a no-op when both already match.
        return self.ones.to(X_in.device).index_select(0, X_in)

    def __repr__(self):
        return self.__class__.__name__ + "({})".format(self.depth)

In [None]:
def validation(multi_linear_model,validation_loader):
    """Evaluate the model on a validation loader and print loss and accuracy.

    Args:
        multi_linear_model: module returning one score per class for a batch.
        validation_loader: loader yielding ``(data, target)`` batches; its
            ``dataset`` length is used as the sample count.

    Returns:
        None. The summary line is printed.
    """
    # Remember the current mode: this function is called in the middle of
    # training and must not leave the model stuck in eval mode.
    was_training = multi_linear_model.training
    multi_linear_model.eval()

    one_hot = One_Hot(10).to(device)
    # Sum the per-sample losses so that dividing by the sample count below
    # gives a true per-sample average. Dividing a sum of per-batch means by
    # the sample count under-reports the loss by roughly the batch size.
    criterion = nn.CrossEntropyLoss(reduction='sum')
    n_samples = len(validation_loader.dataset)
    validation_loss = 0.0
    correct = 0
    with torch.no_grad():  # no gradients are needed for evaluation
        for data, target in validation_loader:
            data = data.to(device)
            target = target.to(device)
            output = multi_linear_model(data)
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
            validation_loss += criterion(output, one_hot(target)).item()

    denominator = max(n_samples, 1)  # avoid dividing by zero on an empty dataset
    validation_loss /= denominator
    print('\nValidation set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(validation_loss, correct, n_samples, 100. * correct / denominator))

    multi_linear_model.train(was_training)

In [None]:
def test(multi_linear_model,test_loader):
    """Evaluate the model on a test loader and collect its predictions.

    Args:
        multi_linear_model: module returning one score per class for a batch.
        test_loader: loader yielding ``(data, target)`` batches; its
            ``dataset`` length is used as the sample count.

    Returns:
        ``(final_pred, final_target)``: two 1-D float tensors on `device`
        holding the predicted and true class of every sample, in loader order.
    """
    multi_linear_model.eval()
    one_hot = One_Hot(10).to(device)
    # Sum the per-sample losses so that dividing by the sample count below
    # gives a true per-sample average rather than a sum of batch means.
    criterion = nn.CrossEntropyLoss(reduction='sum')
    n_samples = len(test_loader.dataset)
    test_loss = 0.0
    correct = 0
    # Collect per-batch results and concatenate once at the end.
    pred_batches = []
    target_batches = []
    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)

            output = multi_linear_model(data)
            test_loss += criterion(output, one_hot(target)).item()

            pred = output.max(1, keepdim=True)[1].view_as(target)
            correct += pred.eq(target).sum().item()
            pred_batches.append(pred)
            target_batches.append(target.view(-1))

    denominator = max(n_samples, 1)  # avoid dividing by zero on an empty dataset
    test_loss /= denominator
    print('\nTest set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, n_samples, 100. * correct / denominator))

    # Callers receive float tensors, as before.
    if pred_batches:
        final_pred = torch.cat(pred_batches).float()
        final_target = torch.cat(target_batches).float()
    else:
        final_pred = torch.empty(0, dtype=torch.float32, device=device)
        final_target = torch.empty(0, dtype=torch.float32, device=device)
    return final_pred, final_target

In [None]:
def _train_then_test(model, optimizer_name, learning_rate, Lambda, n_epochs,
                     train_loader, validation_loader, test_loader, device,
                     log_interval=100):
    """Train `model` with L2-regularised cross-entropy, then return test(model, test_loader)."""
    if optimizer_name == "SGD":
        optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.95)
    elif optimizer_name == "Adam":
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    else:
        raise ValueError("Unknown optimizer name: {!r}".format(optimizer_name))

    one_hot = One_Hot(10).to(device)
    criterion = nn.CrossEntropyLoss()
    model.train()
    for epoch in range(n_epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            # Inputs need no gradient: only the model parameters are optimised.
            data = data.to(device)
            target = target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, one_hot(target))

            # L2 penalty over every parameter (weights and bias).
            l2_penalty = sum((p ** 2).sum() for p in model.parameters())
            loss = loss + Lambda * l2_penalty
            loss.backward()
            optimizer.step()
            if batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))
        if epoch % 5 == 0:
            validation(model, validation_loader)
            # validation() switches the model to eval mode; resume training.
            model.train()

    # test after training
    return test(model, test_loader)


def logistic_regression(dataset_name):
    """Train and test logistic regression on "CIFAR10" or "MNIST".

    Hyper-parameters come from module-level globals assigned by ``main()``:
    ``Lambda_<NAME>``, ``learning_rate_<NAME>`` and ``optimizerName_<NAME>``
    ("SGD" or "Adam"). The epoch count is ``<NAME>_n_epochs``.

    Args:
        dataset_name: "CIFAR10" or "MNIST".

    Returns:
        ``(predicted_test_labels, gt_labels)``: CPU tensors reshaped to rows
        of 10 labels, so a 10000-image test set gives shape (1000, 10).

    Raises:
        ValueError: for an unknown dataset or optimizer name.
    """
    if dataset_name not in ("CIFAR10", "MNIST"):
        raise ValueError("Unknown dataset name: {!r}".format(dataset_name))

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Globals are looked up inside the matching branch, so only the selected
    # dataset's configuration has to exist.
    if dataset_name == "CIFAR10":
        predicted_test_labels, gt_labels_tensor = _train_then_test(
            CIFAR10LogisticRegression().to(device),
            optimizerName_CIFAR10, learning_rate_CIFAR10, Lambda_CIFAR10,
            CIFAR10_n_epochs, CIFAR10_train_loader, CIFAR10_validation_loader,
            CIFAR10_test_loader, device)
    else:
        predicted_test_labels, gt_labels_tensor = _train_then_test(
            MNISTLogisticRegression().to(device),
            optimizerName_MNIST, learning_rate_MNIST, Lambda_MNIST,
            MNIST_n_epochs, MNIST_train_loader, MNIST_validation_loader,
            MNIST_test_loader, device)

    # Rows of 10 labels; -1 lets the row count follow the test-set size
    # instead of assuming exactly 10000 samples.
    return predicted_test_labels.view(-1, 10).cpu(), gt_labels_tensor.view(-1, 10).cpu()

In [None]:
def tune_hyper_parameter():
    """Grid search over (learning rate, lambda) for both optimizers and datasets.

    Each candidate is published through the module globals ``learning_rate``,
    ``Lambda`` and ``optimizerName`` and scored with
    ``run_on_dataset_for_tuning``.

    A later cell of this notebook defines another ``tune_hyper_parameter``,
    which replaces this one once that cell has run.

    NOTE(review): the global ``epochs`` read by
    ``logistic_regression_for_tuning`` is never assigned here.
    NOTE(review): one running best accuracy is shared by both datasets and
    both optimizers, so a per-optimizer entry is only updated when the
    overall best improves.

    Returns:
        ``(best_by_optimizer, best_accuracy, run_time)`` where
        ``best_by_optimizer`` maps "Adam"/"SGD" to ``{"lr", "lambda"}``.
    """
    global learning_rate
    global Lambda
    global optimizerName

    tuning_start = timeit.default_timer()

    # (learning rate, lambda) pairs to evaluate, in evaluation order.
    grid_values = [(0.01, 0.001), (0.01, 0.0001),
                   (0.001, 0.001), (0.001, 0.0001),
                   (0.01, 0.01),
                   (0.0005, 0.01), (0.0005, 0.001)]
    params_to_tune = [{"lr": lr_value, "lambda": lambda_value}
                      for lr_value, lambda_value in grid_values]

    prediction_files = {"MNIST": "predictions_mnist_KhalidAlmahrezi_1580848.txt",
                        "CIFAR10": "predictions_cifar10_KhalidAlmahrezi_1580848.txt"}
    best_by_optimizer = {name: {"lr": 0.0, "lambda": 0.0} for name in ("Adam", "SGD")}
    best_accuracy = 0.0
    best_params = None

    for dataset_name in ("CIFAR10", "MNIST"):
        for optimizerName in ("Adam", "SGD"):
            for candidate in params_to_tune:
                learning_rate = candidate["lr"]
                Lambda = candidate["lambda"]
                print("optimizer Name: ", optimizerName, "| learning rate = ", learning_rate, "| Lambda = ", Lambda)
                result, _score = run_on_dataset_for_tuning(dataset_name, prediction_files[dataset_name])
                candidate_accuracy = result["accuracy"]
                if candidate_accuracy > best_accuracy:
                    best_accuracy = candidate_accuracy
                    best_params = candidate
                    best_by_optimizer[optimizerName].update(
                        {"lr": learning_rate, "lambda": Lambda})
                print(best_params, best_accuracy)

    run_time = timeit.default_timer() - tuning_start
    print(best_params, best_accuracy, run_time)
    print(best_by_optimizer, best_accuracy, run_time)

    return best_by_optimizer, best_accuracy, run_time

In [None]:
# import numpy as np
# import heapq
# rng_seed = np.random.default_rng(143341)
# LR = (rng_seed.integers(low=0, high=1000, size=3)/10000) 
# Lammda =  (rng_seed.integers(low=0, high=1000, size=3)/100000) 

# LR_2d = []
# Lammda_2d = []
# for i in range(len(LR)):
#   heapq.heappush(LR_2d,([i,LR[i]]))
#   heapq.heappush(Lammda_2d,([i,Lammda[i]]))
# # for i in [0.91, 0.82, 0.89]:


# print("lammda",Lammda_2d,"LR",LR_2d)
# print(Lammda_2d,LR_2d)
# print(heapq.heappop(Lammda_2d))
# print(heapq.heappop(LR_2d))
# print("lammda",Lammda_2d,"LR",LR_2d)
# print((rng_seed.integers(low=0, high=1000, size=1)/10000)[0])
# print(LR_2d)
# print(Lammda_2d)
# sorted(Lammda_2d)
# # heapq.heapify([el * -1 for el in LR_2d ])

# print(LR_2d)
# print(Lammda_2d)
# print(LR_2d.pop())
# print(Lammda_2d.pop())
# print(LR_2d)
# print(Lammda_2d)

In [None]:
def tune_hyper_parameter():
    """Evolutionary search over learning rate and L2 weight (Lambda).

    For every dataset ("MNIST", "CIFAR10") and optimizer ("Adam", "SGD") a
    population of `GenSize` learning rates and lambdas is evaluated for
    `genarations` generations. Candidates are handed to
    ``logistic_regression_for_tuning`` through the module globals
    ``learning_rate``, ``Lambda``, ``optimizerName`` and ``epochs``, and are
    scored with ``run_on_dataset_for_tuning``.

    Returns:
        ``(final_best_best_params, best_accuracy, run_time)``.
        ``final_best_best_params[dataset][optimizer]`` holds the best "lr",
        "lambda" and "best_accuracy" seen for that pair, and
        ``final_best_best_params[dataset]["best_optimizer"]`` is the optimizer
        of the most recent improvement recorded for that dataset.
        ``best_accuracy`` is the accuracy stored by the most recent
        improvement of any pair, not necessarily the overall maximum.
    """
    # TODO: implement logistic regression hyper-parameter tuning here
    # learning rate and lambda
    # with Evolutionary optimization from
    # https://en.wikipedia.org/wiki/Hyperparameter_optimization#Evolutionary_optimization
    startTime = timeit.default_timer()
    import numpy as np
    import heapq
    
    div = 1000000      # random integers are divided by this to obtain small floats
    GenSize = 3        # candidates evaluated per generation
    remove = 2         # number of passes of the replacement loop per generation
    best_accuracy = 0.0
    genarations = 4    # generations run per (dataset, optimizer)
    

    
    final_best_best_params = {
                      "MNIST":{"Adam":{"lr": 0.0, "lambda": 0.0,"best_accuracy": 0.0}, 
                              "SGD" :{"lr": 0.0, "lambda": 0.0,"best_accuracy": 0.0},
                              "best_optimizer":None},
                              
                    "CIFAR10":{"Adam":{"lr": 0.0, "lambda": 0.0,"best_accuracy": 0.0}, 
                              "SGD" :{"lr": 0.0, "lambda": 0.0,"best_accuracy": 0.0},
                              "best_optimizer":None}}
    best_params = None
    filenames = { "MNIST": "predictions_mnist_KhalidAlmahrezi_1580848.txt", "CIFAR10": "predictions_cifar10_KhalidAlmahrezi_1580848.txt"}
    # These globals are the channel to logistic_regression_for_tuning.
    global learning_rate 
    global Lambda 
    global optimizerName
    global epochs  
    # for dataset_name in ["CIFAR10", "MNIST"]:
    for dataset_name in [ "MNIST", "CIFAR10"]:
      for optimizerName in ["Adam","SGD"]:
        # 1 Create an initial population of random solutions
        # The generator is rebuilt from the same seed for every
        # (dataset, optimizer) pair, so each pair starts from the same
        # initial population. The first of the two assignments is redundant.
        rng_seed = np.random.default_rng(143341)
        
        rng_seed = np.random.default_rng(143341)
        # Integers drawn with low=0, high=10000, then divided by div.
        learning_rateGen1 = rng_seed.integers(low=0, high=10000, size=GenSize)/div
        LammdaGen1 =  rng_seed.integers(low=0, high=10000, size=GenSize)/div
        # Heaps of [fitness, value]; the initial fitness is 0 for every entry.
        LR_2d = []
        Lammda_2d = []
        print("learning_rateGen1")
        print(learning_rateGen1)
        print("LammdaGen1")
        print(LammdaGen1)
        for i in range(GenSize):
          heapq.heappush(LR_2d,([0,learning_rateGen1[i]]))
          heapq.heappush(Lammda_2d,([0,LammdaGen1[i]]))

        for gen in range(genarations):
          print("gen:", gen)
          newLR_2d = []
          newLammda_2d = []
          # start of the generations
          for i in range(GenSize):
            # 2 Evaluate the hyperparameters tuples and acquire their fitness function
            # NOTE(review): the i-th entries of two independently ordered heap
            # lists are paired here; with equal fitness keys each heap orders
            # by value, so a pair need not be one that was generated together.
            learning_rate = LR_2d[i][1]
            Lambda = Lammda_2d[i][1]
            print("optimizer Name: ", optimizerName, "| learning rate = ", learning_rate, "| Lambda = ", Lambda)
            # Training length: 1 epoch from generation 0, 3 epochs in
            # generation 3; generations 1 and 2 keep the previous value.
            if gen == 0: 
              epochs = 1
            elif gen == 3: 
              epochs = 3
            # elif gen == 5:
            #   epochs = 4
            # elif gen == 6:
            #   epochs = 6
          #  global params    # Specify params to search as a global variable, to be used for logistic_regression, also feel free to add more arguments to all existing functions
            result, score = run_on_dataset_for_tuning(dataset_name, filenames[dataset_name])
            if result["accuracy"] > final_best_best_params[dataset_name][optimizerName]["best_accuracy"]:
              # New best for this (dataset, optimizer): record it.
              best_accuracy = result["accuracy"]
              final_best_best_params[dataset_name][optimizerName]["best_accuracy"] =  result["accuracy"]
              final_best_best_params[dataset_name]["best_optimizer"] =  optimizerName
              # NOTE(review): this builds a set, not a dict, and is not read
              # again in this function.
              best_params = {learning_rate,Lambda}
              # 3 Rank the hyperparameter tuples by their relative fitness
              heapq.heappush(newLR_2d,[result["accuracy"],learning_rate])
              heapq.heappush(newLammda_2d,[result["accuracy"],Lambda])
              final_best_best_params[dataset_name][optimizerName]["lr"] = learning_rate
              final_best_best_params[dataset_name][optimizerName]["lambda"] = Lambda 
              # print(best_params, best_accuracy)
            else:
              # Same two pushes as in the branch above.
              heapq.heappush(newLR_2d,[result["accuracy"],learning_rate])
              heapq.heappush(newLammda_2d,[result["accuracy"],Lambda])


            # 4 Replace the worst-performing hyperparameter tuples with new hyperparameter tuples generated
            print("newLR_2d", newLR_2d,"newLammda_2d",newLammda_2d,)

          # select the best two acc to replace the worst acc
          # NOTE(review): each pass pops the heap minimum and pushes a child
          # with fitness key 0. On the second pass the minimum is normally the
          # child pushed by the first pass (0 is below any positive accuracy),
          # so that child is replaced rather than a second low-accuracy
          # tuple; confirm this is intended.
          for n in range(remove):
            # Values of the two highest-fitness entries, ordered low/high.
            sorted_LR_2d = sorted(newLR_2d.copy())
            sortedLimts_LR_2d = []
            heapq.heappush(sortedLimts_LR_2d, sorted_LR_2d.pop()[1])
            heapq.heappush(sortedLimts_LR_2d, sorted_LR_2d.pop()[1])
            High_LR_2d = sortedLimts_LR_2d[1]
            Low_LR_2d = sortedLimts_LR_2d[0]



            sorted_Lammda_2d = sorted(newLammda_2d.copy())
            sortedLimts_Lammda_2d = []
            heapq.heappush(sortedLimts_Lammda_2d, sorted_Lammda_2d.pop()[1])
            heapq.heappush(sortedLimts_Lammda_2d, sorted_Lammda_2d.pop()[1])
            High_Lammda_2d = sortedLimts_Lammda_2d[1]
            Low_Lammda_2d = sortedLimts_Lammda_2d[0]
            # print(sortedlimts_LR_2d)
            print("low= ",Low_LR_2d, "high=",High_LR_2d)
            # print(sortedlimts_Lammda_2d)
            print("low= ",Low_Lammda_2d, "high=",High_Lammda_2d)
            # Drop the lowest-keyed entry of each heap.
            heapq.heappop(newLR_2d)
            heapq.heappop(newLammda_2d)
            # New child: a random value between the two parents' values, or
            # half the value when both parents are equal.
            # NOTE(review): low/high passed to integers() are floats
            # (value * div); confirm numpy handles them as intended.
            if High_LR_2d != Low_LR_2d:
              heapq.heappush(newLR_2d, [0, (rng_seed.integers(low=Low_LR_2d* div, high=High_LR_2d* div, size=1)/div)[0]])
            else:
              heapq.heappush(newLR_2d, [0, High_LR_2d/2])
            
            if High_Lammda_2d != Low_Lammda_2d:  
              heapq.heappush(newLammda_2d, [0, (rng_seed.integers(low=Low_Lammda_2d * div, high=High_Lammda_2d* div, size=1)/div)[0]])
            else:
              heapq.heappush(newLammda_2d, [0, High_Lammda_2d/2])
            # update the new gen list
            # print("befor heapify",newLR_2d,newLammda_2d)
            # heapq.heapify(newLR_2d)
            # heapq.heapify(newLammda_2d)
            # print("after heapify",newLR_2d,newLammda_2d)
            # The next generation evaluates these (same list objects).
            LR_2d = newLR_2d
            Lammda_2d = newLammda_2d
          print("LR_2d", LR_2d,"Lammda_2d",Lammda_2d,)

    stop = timeit.default_timer()
    run_time = stop - startTime
    print(final_best_best_params, best_accuracy, run_time)
    return final_best_best_params, best_accuracy, run_time


In [None]:
def run_on_dataset_for_tuning(dataset_name, filename):
    """Run one tuning trial on a dataset and score its accuracy.

    Args:
        dataset_name: "MNIST" or "CIFAR10".
        filename: path the predictions are saved to by ``run``.

    Returns:
        ``(result, score)`` where ``result`` is an OrderedDict with the keys
        ``correct_predict``, ``accuracy``, ``score`` and ``run_time``.

    Raises:
        ValueError: if `dataset_name` has no score thresholds. This is raised
            before any training starts.
    """
    # (min_thres, max_thres) accuracy bounds used by compute_score.
    score_thresholds = {"MNIST": (0.82, 0.92), "CIFAR10": (0.28, 0.38)}
    if dataset_name not in score_thresholds:
        raise ValueError("Unknown dataset name: {!r}".format(dataset_name))
    min_thres, max_thres = score_thresholds[dataset_name]

    correct_predict, accuracy, run_time = run(logistic_regression_for_tuning, dataset_name, filename)

    score = compute_score(accuracy, min_thres, max_thres)
    result = OrderedDict(correct_predict=correct_predict,
                         accuracy=accuracy, score=score,
                         run_time=run_time)
    return result, score

In [None]:
def validation_for_tuning(multi_linear_model,validation_loader):
    """Evaluate the model on a loader, print a summary and return predictions.

    Args:
        multi_linear_model: module returning one score per class for a batch.
        validation_loader: loader yielding ``(data, target)`` batches; its
            ``dataset`` length is used as the sample count.

    Returns:
        ``(final_pred, final_target)``: two 1-D float tensors on `device`
        holding the predicted and true class of every sample, in loader order.
    """
    # Called once per training epoch: restore the caller's mode on exit so
    # the model is not left in eval mode.
    was_training = multi_linear_model.training
    multi_linear_model.eval()

    one_hot = One_Hot(10).to(device)
    # Sum the per-sample losses so that dividing by the sample count below
    # gives a true per-sample average rather than a sum of batch means.
    criterion = nn.CrossEntropyLoss(reduction='sum')
    n_samples = len(validation_loader.dataset)
    validation_loss = 0.0
    correct = 0
    # Collect per-batch results and concatenate once at the end.
    pred_batches = []
    target_batches = []
    with torch.no_grad():
        for data, target in validation_loader:
            data = data.to(device)
            target = target.to(device)
            output = multi_linear_model(data)
            pred = output.max(1, keepdim=True)[1].view_as(target)
            correct += pred.eq(target).sum().item()
            validation_loss += criterion(output, one_hot(target)).item()
            pred_batches.append(pred)
            target_batches.append(target.view(-1))

    denominator = max(n_samples, 1)  # avoid dividing by zero on an empty dataset
    validation_loss /= denominator
    print('\nValidation set: Avg. loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(validation_loss, correct, n_samples, 100. * correct / denominator))

    multi_linear_model.train(was_training)

    # Callers receive float tensors, as before.
    if pred_batches:
        final_pred = torch.cat(pred_batches).float()
        final_target = torch.cat(target_batches).float()
    else:
        final_pred = torch.empty(0, dtype=torch.float32, device=device)
        final_target = torch.empty(0, dtype=torch.float32, device=device)
    return final_pred, final_target

In [None]:
def _fit_and_validate_for_tuning(model, optimizer_name, lr, l2_weight, n_epochs,
                                 train_loader, validation_loader, device):
    """Train `model` for `n_epochs`, validating after every epoch; return the last validation result."""
    if optimizer_name == "SGD":
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.95)
    elif optimizer_name == "Adam":
        optimizer = optim.Adam(model.parameters(), lr=lr)
    else:
        raise ValueError("Unknown optimizer name: {!r}".format(optimizer_name))

    one_hot = One_Hot(10).to(device)
    criterion = nn.CrossEntropyLoss()
    predicted_labels = None
    gt_labels = None
    for epoch in range(n_epochs):
        # Re-enter training mode every epoch, since validation may have left
        # the model in eval mode.
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            # Inputs need no gradient: only the model parameters are optimised.
            data = data.to(device)
            target = target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, one_hot(target))

            # L2 penalty over every parameter (weights and bias).
            l2_penalty = sum((p ** 2).sum() for p in model.parameters())
            loss = loss + l2_weight * l2_penalty
            loss.backward()
            optimizer.step()
            if batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))
        predicted_labels, gt_labels = validation_for_tuning(model, validation_loader)

    if predicted_labels is None or gt_labels is None:
        # No epoch ran, so there is nothing to report.
        return None, None
    # Rows of 10 labels; -1 lets the row count follow the validation-set size.
    return predicted_labels.view(-1, 10).cpu(), gt_labels.view(-1, 10).cpu()


def logistic_regression_for_tuning(dataset_name):
    """Train logistic regression with the current tuning globals and validate it.

    Reads the module globals ``optimizerName`` ("SGD" or "Adam"),
    ``learning_rate``, ``Lambda`` (L2 weight), ``epochs`` and ``log_interval``.

    Both datasets are scored on their validation split. The CIFAR10 branch
    previously used the test loader, which leaks the test set into
    hyper-parameter selection.

    Args:
        dataset_name: "CIFAR10" or "MNIST".

    Returns:
        ``(predicted_labels, gt_labels)`` from the validation run after the
        last epoch, as CPU tensors reshaped to rows of 10 labels, or
        ``(None, None)`` when ``epochs`` is 0.

    Raises:
        ValueError: for an unknown dataset or optimizer name.
    """
    if dataset_name not in ("CIFAR10", "MNIST"):
        raise ValueError("Unknown dataset name: {!r}".format(dataset_name))

    print("In the logistic_regression_for_tuning", "optimizerName: ", optimizerName, "| learning rate = ", learning_rate, "| Lambda = ", Lambda)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Globals are looked up inside the matching branch, so only the selected
    # dataset's loaders have to exist.
    if dataset_name == "CIFAR10":
        model = CIFAR10LogisticRegression().to(device)
        train_loader = CIFAR10_train_loader
        validation_loader = CIFAR10_validation_loader
    else:
        model = MNISTLogisticRegression().to(device)
        train_loader = MNIST_train_loader
        validation_loader = MNIST_validation_loader

    return _fit_and_validate_for_tuning(model, optimizerName, learning_rate, Lambda,
                                        epochs, train_loader, validation_loader, device)

In [None]:
# Run the hyper-parameter search. A fresh model is trained for every candidate
# it evaluates; main() below uses hard-coded values and does not call this.
tune_hyper_parameter()

Main loop. Run time and total score will be shown below.

In [None]:
def run_on_dataset(dataset_name, filename):
    """Train and test logistic regression on a dataset and score its accuracy.

    Args:
        dataset_name: "MNIST" or "CIFAR10".
        filename: path the predictions are saved to by ``run``.

    Returns:
        ``(result, score)`` where ``result`` is an OrderedDict with the keys
        ``correct_predict``, ``accuracy``, ``score`` and ``run_time``.

    Raises:
        ValueError: if `dataset_name` has no score thresholds. This is raised
            before any training starts.
    """
    # (min_thres, max_thres) accuracy bounds used by compute_score.
    score_thresholds = {"MNIST": (0.82, 0.92), "CIFAR10": (0.28, 0.38)}
    if dataset_name not in score_thresholds:
        raise ValueError("Unknown dataset name: {!r}".format(dataset_name))
    min_thres, max_thres = score_thresholds[dataset_name]

    correct_predict, accuracy, run_time = run(logistic_regression, dataset_name, filename)

    score = compute_score(accuracy, min_thres, max_thres)
    result = OrderedDict(correct_predict=correct_predict,
                         accuracy=accuracy, score=score,
                         run_time=run_time)
    return result, score


def main():
    """Train and test on MNIST and CIFAR10 with fixed hyper-parameters and report scores.

    Publishes the chosen optimizer, learning rate and lambda for each dataset
    as module globals (read by ``logistic_regression``), runs both datasets
    through ``run_on_dataset``, then writes the pretty-printed results to
    ``result.txt`` and prints them. The total score weights each dataset's
    score by 0.5.
    """
    global learning_rate_MNIST
    global Lambda_MNIST
    global learning_rate_CIFAR10
    global Lambda_CIFAR10
    global optimizerName_CIFAR10
    global optimizerName_MNIST

    filenames = { "MNIST": "predictions_mnist_KhalidAlmahrezi_1580848.txt", "CIFAR10": "predictions_cifar10_KhalidAlmahrezi_1580848.txt"}
    score_weights = [0.5, 0.5]

    # To re-tune instead of using the fixed values below:
    # final_best_best_params, best_accuracy, run_time = tune_hyper_parameter()
    # Values recorded from an earlier tuning run:
    # final_best_best_params = {'MNIST': {'Adam': {'lr': 0.00149, 'lambda': 0.0051, 'best_accuracy': 0.9205833333333333}, 'SGD': {'lr': 0.00243, 'lambda': 0.00605, 'best_accuracy': 0.91575}}, 'CIFAR10': {'Adam': {'lr': 0.0009, 'lambda': 0.00343, 'best_accuracy': 0.3978}, 'SGD': {'lr': 0.0009, 'lambda': 0.00343, 'best_accuracy': 0.3972}}}
    final_best_best_params = {
                      "MNIST":{"Adam":{"lr": 0.0012, "lambda": 0.001,"best_accuracy": 0.0},
                              "SGD" :{"lr": 0.0001, "lambda": 0.001,"best_accuracy": 0.0},
                              "best_optimizer":"Adam"},

                    "CIFAR10":{"Adam":{"lr": 0.0001, "lambda": 0.0001,"best_accuracy": 0.0},
                              "SGD" :{"lr": 0.0001, "lambda": 0.0001,"best_accuracy": 0.0},
                              "best_optimizer":"Adam"}}

    # Publish the selected configuration for logistic_regression().
    optimizerName_CIFAR10 = final_best_best_params["CIFAR10"]["best_optimizer"]
    optimizerName_MNIST = final_best_best_params["MNIST"]["best_optimizer"]

    learning_rate_CIFAR10 = final_best_best_params['CIFAR10'][optimizerName_CIFAR10]['lr']
    Lambda_CIFAR10 = final_best_best_params['CIFAR10'][optimizerName_CIFAR10]['lambda']

    learning_rate_MNIST = final_best_best_params['MNIST'][optimizerName_MNIST]['lr']
    Lambda_MNIST = final_best_best_params['MNIST'][optimizerName_MNIST]['lambda']

    result_all = OrderedDict()
    scores = []
    for dataset_name in ["MNIST","CIFAR10"]:
        result_all[dataset_name], this_score = run_on_dataset(dataset_name, filenames[dataset_name])
        scores.append(this_score)

    # Weighted sum of the per-dataset scores.
    total_score = sum(score * weight for score, weight in zip(scores, score_weights))
    result_all['total_score'] = total_score

    report = pformat(result_all, indent=4)
    with open('result.txt', 'w') as f:
        # write(), not writelines(): the report is one string, not a list of lines.
        f.write(report)
    print("\nResult:\n", report)


# Train and evaluate on both datasets with the hyper-parameters fixed in main().
main()


Validation set: Avg. loss: 0.0013, Accuracy: 10854/12000 (90%)


Validation set: Avg. loss: 0.0011, Accuracy: 11066/12000 (92%)


Validation set: Avg. loss: 0.0011, Accuracy: 11070/12000 (92%)


Test set: Avg. loss: 0.0003, Accuracy: 9247/10000 (92%)

Accuracy of the network on the 10000 test images: 92 %

Validation set: Avg. loss: 0.0093, Accuracy: 4351/12000 (36%)


Validation set: Avg. loss: 0.0088, Accuracy: 4801/12000 (40%)


Validation set: Avg. loss: 0.0087, Accuracy: 4896/12000 (41%)


Test set: Avg. loss: 0.0017, Accuracy: 4071/10000 (41%)

Accuracy of the network on the 10000 test images: 40 %

Result:
 OrderedDict([   (   'MNIST',
                    OrderedDict([   ('correct_predict', 9247),
                                    ('accuracy', 0.9247),
                                    ('score', 100.0),
                                    ('run_time', 118.33627447500021)])),
                (   'CIFAR10',
                    OrderedDict([   ('correct_predict', 4071),
      

In [None]:
# Second invocation of main(): retrains both models and rewrites result.txt
# and the prediction files.
main()

In [None]:
# Mount Google Drive at /content/drive.
# NOTE(review): google.colab is presumably only importable inside the Colab
# runtime, so this cell would fail elsewhere; confirm.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive
