## Label Generation

The Label Generation notebook is responsible for creating the classifier predicted labels and saving them to a specified directory in *.npy form. It is assumed you ALREADY HAVE THE TRUE LABELS of the dataset in *.npy form.

The notebook can also convert the classifier and predicted labels to text files, where the first column indicates the index of the instance of data that the label is associated with,  and the second column is the actual label (0, 1, or 2). The instance data is assumed to be in *.npy form as well, although all notebooks autoconvert from the saved numpy form to a PyTorch tensor. The index can then be used in the raw tensor or a PyTorch dataset to obtain the actual instance.

## Inputs



In [None]:
import torch
import torch.optim as optim
from torch.optim import lr_scheduler

import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

import torchvision
from torchvision import datasets, models, transforms
import copy

import numpy as np
import pandas as pd

%matplotlib inline 
import matplotlib.pyplot as plt
import time
import os
import copy
import random
import math
import string

import tqdm
from tqdm.auto import tqdm

import scipy.stats as stats

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix
from sklearn.mixture import GaussianMixture

from skimage.filters import sobel
from skimage.color import rgb2gray

# TPU support
## Insufficient testing has been done to see what methods support using the TPU but the ability to enable the TPU has been provided here.
## Ensure that the Runtime for the Colab notebook has "TPU" selected prior to use!
#!pip install -q cloud-tpu-client==0.10 torch==1.11.0 https://storage.googleapis.com/tpu-pytorch/wheels/colab/torch_xla-1.11-cp37-cp37m-linux_x86_64.whl
#import torch_xla
#import torch_xla.core.xla_model as xm

In [None]:
from enum import Enum

class Backend(Enum):
  CPU = "cpu"
  GPU = "cuda:0"
  TPU = "tpu"

In [None]:
## Control Flow Variables
BACKEND = Backend.GPU
BATCH_SIZE = 4
TEXT_LABELS_NEEDED = False

MODEL_PATH = "/content/drive/MyDrive/Fish Attribution/model1e-050.5.2022-05-22 12:13:10.pt Work/model1e-050.5.2022-05-22 12_13_10.pt"
X_PATH = "/content/drive/Shareddrives/Exploding Gradients/X_cropped_b.npy"
TRUE_LABELS_PATH = "/content/drive/Shareddrives/Exploding Gradients/y_b.npy"
CLASSIFIER_GENERATED_LABELS_PATH = "/content/drive/MyDrive/Fish Attribution/model1e-050.5.2022-05-22 12:13:10.pt Work/predicted_labels.npy"
TRUE_LABELS_TEXT_PATH = "/content/drive/MyDrive/Fish Attribution/model1e-050.5.2022-05-22 12:13:10.pt Work/true-labels.txt"
CLASSIFIER_GENERATED_LABELS_TEXT_PATH = "/content/drive/MyDrive/Fish Attribution/model1e-050.5.2022-05-22 12:13:10.pt Work/predicted-labels.txt"

# Check that GPU is available, if it isn't then BACKEND will automatically toggle to CPU
if(BACKEND == Backend.GPU):
  if(torch.cuda.is_available()):
    print("GPU Selected, and confirmed available!")
    device = torch.device("cuda:0")
  else:
    print("GPU Selected, but not found! Switching to CPU for backend")
    BACKEND = Backend.CPU
    device = torch.device("cpu")
elif(BACKEND == Backend.TPU):
  assert os.environ['COLAB_TPU_ADDR'], 'Make sure to select TPU from Edit > Notebook settings > Hardware accelerator'
  print("TPU Selected")
  device = xm.xla_device()
elif(BACKEND == Backend.CPU):
  print("CPU Selected")
  device = torch.device("cpu")

GPU Selected, and confirmed available!


In [None]:
#This function takes in a model and replaces inplace relu layers to an independent relu layer
def reluToInplaceFalse(model):
  for name, child in model.named_children():
    if isinstance(child, nn.ReLU):
      setattr(child, 'inplace', False)
    else:
      reluToInplaceFalse(child)

In [None]:
# Load the Model Class
TARGET_WIDTH = 750
TARGET_HEIGHT = 130

from torchvision.transforms.transforms import RandomRotation, RandomAdjustSharpness, RandomGrayscale
import torchvision.transforms.functional as tf

def init_weights(m):
  if isinstance(m, nn.Linear):
    nn.init.kaiming_normal_(m.weight, nonlinearity='relu')


class Classifier(torch.nn.Module):

  def __init__(self, backbone='resnet', multi_backbone = False, device ="cuda:0",dropout_rate = 0.2, do_augmentation = False, target_height=TARGET_HEIGHT, target_width=TARGET_WIDTH):
    super().__init__()
    self.multi_backbone = multi_backbone # Bool: Indicates if we use multibackbone

    #In the following section we download the appropriate prettrained model
    if backbone == "vgg19":
      backbone = torchvision.models.vgg19(pretrained=True)
      self.out_channels = 25088
      
    elif backbone == "resnet18":
      backbone = torchvision.models.resnet18(pretrained=True)
      self.out_channels = 512

    elif backbone == "resnet50":
      backbone = torchvision.models.resnet50(pretrained=True)
      self.out_channels = 2048

    elif backbone == "Efficientnet b1":
      backbone = torchvision.models.efficientnet_b1(pretrained=True)
      self.out_channels = 1280

    elif backbone == "Efficientnet b3":
      backbone = torchvision.models.efficientnet_b3(pretrained=True)
      self.out_channels = 1536

    elif backbone == "Efficientnet b5":
      backbone = torchvision.models.efficientnet_b5(pretrained=True)
      self.out_channels = 2048

    elif backbone == "Efficientnet b7":
      backbone = torchvision.models.efficientnet_b7(pretrained=True)
      self.out_channels = 2560
    else:
      raise ValueError(f'Invalid backbone "{backbone}"')
      
    # Disabling inplace ReLu becasuse GradCam doesn't work it enabled
    reluToInplaceFalse(backbone)
     
    modules = list(backbone.children())[:-1]

    if self.multi_backbone: #We create the backbones and put them on the device
      self.backbone1 = nn.Sequential(*copy.deepcopy(modules)).to(device)
      self.backbone2 = nn.Sequential(*copy.deepcopy(modules)).to(device)
      self.backbone3 = nn.Sequential(*copy.deepcopy(modules)).to(device)
      self.backbone4 = nn.Sequential(*copy.deepcopy(modules)).to(device)

    else:
      self.backbone =  nn.Sequential(*modules).to(device)

    self.do_augmentation = do_augmentation

    # Note: These are not all of the augmnetations performed, see custom_augmentation()
    self.unlabeled_augmentation = nn.Sequential(transforms.RandomVerticalFlip(0.5),
                                      transforms.RandomCrop(size=(target_height,target_width)),
                                      transforms.RandomRotation(10, interpolation=transforms.InterpolationMode.BILINEAR, fill=1),
                                      transforms.Normalize(0, 1)
    )

    self.bottleneck_dim = 256

    # This is the linear layer to compress each backbone
    self.fc_bb = nn.Sequential(nn.BatchNorm1d(self.out_channels),
                               nn.Dropout(dropout_rate),
                               nn.Linear(self.out_channels, self.bottleneck_dim),
                               nn.BatchNorm1d(self.bottleneck_dim),
                               nn.ReLU())
    self.fc_bb.apply(init_weights)

    self.fc_hflip1 = nn.Sequential(nn.Dropout(dropout_rate),
                                   nn.Linear(self.bottleneck_dim, 1))
    self.fc_hflip1.apply(init_weights)

    self.fc_hflip2 = nn.Sequential(nn.Dropout(dropout_rate),
                                   nn.Linear(self.bottleneck_dim, 1))
    self.fc_hflip2.apply(init_weights)

    self.fc_hflip3 = nn.Sequential(nn.Dropout(dropout_rate),
                                   nn.Linear(self.bottleneck_dim, 1))
    self.fc_hflip3.apply(init_weights)

    self.fc_hflip4 = nn.Sequential(nn.Dropout(dropout_rate),
                                   nn.Linear(self.bottleneck_dim, 1))
    self.fc_hflip4.apply(init_weights)

    #This is the final classification layer
    self.fc = nn.Sequential(nn.Dropout(dropout_rate),
                            nn.Linear(self.bottleneck_dim * 4, 3))
    self.fc.apply(init_weights)

    # A softmax is applied in eval mode
    self.softmax = nn.Softmax(dim=1)              
     
  def forward(self, x):
    if self.do_augmentation and self.training:
      imgs, hflip_labels = map(list, zip(*[self.custom_augmentation(x[:,i]) for i in range(4)])) #list of 4 images
      hflip_labels = [torch.Tensor(hflip_label).float().unsqueeze(1) for hflip_label in hflip_labels]
    else:
      imgs = [x[:,i] for i in range(4)] #list of 4 images
      hflip_labels = None
    
    if self.multi_backbone:
      encodings = [self.fc_bb(self.backbone1(imgs[0]).flatten(1)), 
                   self.fc_bb(self.backbone2(imgs[1]).flatten(1)),
                   self.fc_bb(self.backbone3(imgs[2]).flatten(1)),
                   self.fc_bb(self.backbone4(imgs[3]).flatten(1))]
    else:
      encodings = [self.fc_bb(self.backbone(img).flatten(1)) for img in imgs]

    logits = self.fc(torch.cat(encodings,1))
    if self.training:
      # get hflip predictions
      hflip_preds = [self.fc_hflip1(encodings[0]),
                     self.fc_hflip2(encodings[1]),
                     self.fc_hflip3(encodings[2]),
                     self.fc_hflip4(encodings[3])]
      hflip_preds = torch.cat(hflip_preds, 1)
      hflip_labels = torch.cat(hflip_labels, 1).to(device)
      return logits, hflip_preds, hflip_labels
    else:
      return self.softmax(logits)

  def custom_augmentation(self, images):
    hflip_labels = np.random.choice([0, 1], size = images.size(0))
    for i, hflip_label in enumerate(hflip_labels):
      if hflip_label == 1:
        images[i] = tf.hflip(images[i])
    images = self.unlabeled_augmentation(images)
    return images, hflip_labels

In [None]:
# Load the model
from google.colab import drive
drive.mount('/content/drive')

model = torch.load(MODEL_PATH, map_location="cpu")
model.eval()
model.zero_grad()

# Put the model onto the device
model.to(device);

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
 # Prepare data
x = np.load(X_PATH)
y = np.load(TRUE_LABELS_PATH)

print(x.shape)
print(y.shape)

tensor_x = torch.Tensor(x) 
tensor_y = torch.Tensor(y).long()

tensor_x = torch.swapaxes(tensor_x,2,4)
tensor_x = torch.swapaxes(tensor_x,3,4)

from torch.utils.data import TensorDataset

attribution_ds = TensorDataset(tensor_x) 
attribution_dl = DataLoader(attribution_ds, BATCH_SIZE ,shuffle = False, pin_memory=True)
del x, y

(285, 4, 130, 750, 3)
(285, 1)


In [None]:
classifier_generated_labels = []

for (images,) in tqdm(attribution_dl):
  # send images to the device
  images = images.to(device)
  # perform attributions
  for label in model(images):
    classifier_generated_labels.append(torch.argmax(label))
  # append attributions
  #classifier_generated_labels.append(label.cpu())
  # delete images, labels, and batch attributions from memory
  del images, label
  # If things were run on the GPU, empty the cache to prevent memory overloading
  if(BACKEND == Backend.GPU):
    torch.cuda.empty_cache()

  0%|          | 0/72 [00:00<?, ?it/s]

In [None]:
np.save(CLASSIFIER_GENERATED_LABELS_PATH,
        torch.unsqueeze(torch.Tensor(classifier_generated_labels), 1).numpy())

In [None]:
if(TEXT_LABELS_NEEDED):
  true_labels = np.squeeze(np.load(TRUE_LABELS_PATH)).astype(int)
  predicted_labels = np.squeeze(np.load(CLASSIFIER_GENERATED_LABELS_PATH)).astype(int)
  with open(TRUE_LABELS_TEXT_PATH,'a') as file:
    for idx, label in enumerate(true_labels):
      file.write(str(idx) + " " + str(label) + "\n")
  with open(PREDICTED_LABELS_TEXT_PATH, 'a') as file:
    for idx, label in enumerate(predicted_labels):
      file.write(str(idx) + " " + str(label) + "\n")