<a href="https://colab.research.google.com/github/swadhwa5/MLFinalProject/blob/main/ML_FinalProject_Submission.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Machine Learning Final Project
By: Shreya Wadhwa, Alan Zhang, Aidan Aug, Trisha Karani
JHED: swadhwa, azhang, tkarani1, aaug1

*Due: April 28th, 2022*

**Description:**

This is the IPython/Jupyter Notebook for our Machine Learning Final Project. For this project, we developed a majority-vote classifier over three different CNNs to recognize sign language letters. The project is divided into the following sections:

1. Required Packages for Running the Notebook

2. Data Augmentation

3. Model Implementation

4. Model Training and Testing

5. Conclusions and Future Works

## Part 1: Python Packages

This section is simply a compilation of all the required packages for every section in the notebook. Please make sure to run this prior to any of the other code sections.

In [None]:
## Data Processing, Augmentation, and Feature Engineering:
import numpy as np
import random
from PIL import Image, ImageEnhance
from os import listdir
import imghdr  # NOTE: deprecated since Python 3.11 and removed in Python 3.13
import cv2  # used by Scale() in Part 2
import skimage
import skimage.filters  # used by blur() in Part 2
from skimage.transform import rotate, AffineTransform, warp

## Model Implementation
import sys
import csv
import os
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F

## Part 2: Data Augmentation and Feature Engineering

For Data Augmentation, we decided to increase our dataset via the following processes:
1. Blur
2. Brighten
3. Rotate
4. Translate
5. Zoom
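
Brightening is listed above but has no function in the cell below. A minimal sketch of how it might look is given here, assuming images are float arrays shaped like the other augmentation inputs with pixel values in [0, 255]. The name `Brighten` and its `factor` and `max_value` parameters are hypothetical and are not part of the original notebook.

```python
import numpy as np

def Brighten(imgs, factor=1.5, max_value=255.0):
    """Scale pixel intensities by `factor` and clip to [0, max_value].

    Hypothetical helper: assumes `imgs` is an array of shape
    (N, height, width, 3) with values in [0, max_value].
    """
    imgs = np.asarray(imgs, dtype=np.float64)
    return np.clip(imgs * factor, 0.0, max_value)

# Tiny example: two 4x4 RGB images filled with 100, one pixel set to 200
batch = np.full((2, 4, 4, 3), 100.0)
batch[0, 0, 0, 0] = 200.0
bright = Brighten(batch)
print(bright.shape, bright.min(), bright.max())
```

Clipping keeps bright regions from overflowing the valid range, at the cost of saturating them.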

In [None]:
def loadImages(path):
    imagesList = listdir(path)
    imgs = []
    labels = []
    for image in imagesList:
      if imghdr.what(path + image) == 'png':
        if (image[6].isalpha()): # only keep files whose label character (index 6) is a letter
          img = Image.open(path + image)
          imgs.append(img)
          labels.append(ord(image[6]) - ord('a')) # assumes that filename structure is 'handx_[label]_....'
    return imgs, labels

# Convert a list of PIL images to a list of numpy arrays
# Input: list of PIL (PNG) images
# Output: a list of numpy images
# Works?: Yes
def ImagesToArray(imgs):
  imgs_array = []
  for img in imgs:
    img_array = np.array(img)
    imgs_array.append(img_array)
  return imgs_array

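# Pad all images to 600x600, keeping the original image centered.
# The padding value is the first channel of the top-left pixel (not necessarily zero).
# Input: list of numpy images, each at most 600x600 with 3 channels
# Output: numpy array of N 600x600 images with 3 channels
# Works?: Yes, but might not be necessary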
def shape600(x):
  reshaped_array = np.zeros((len(x), 600, 600, 3))
  for i, img in enumerate(x):
    x_pad_width = (600 - img.shape[0])//2
    y_pad_width = (600 - img.shape[1])//2
    reshaped_array[i,:,:,:] = np.pad(img, ((x_pad_width, x_pad_width + (img.shape[0])%2), (y_pad_width, y_pad_width+(img.shape[1]%2)), (0,0)), constant_values=img[0][0][0])
  return reshaped_array

# Standardizes each image independently to zero mean and unit standard deviation,
# computed over all of that image's pixels and channels
# Input: image array
# Output: a list of numpy arrays
# Works?: Unverified; we are not sure this matches the normalization used in online AlexNet implementations
def Normalize(imgs):
  new_imgs = []
  for img in imgs:
      # flat_img = img.flatten()
      m = np.mean(img)
      std = np.std(img)
      img = (img-m)/std
      new_imgs.append(img)
  return new_imgs

################### DATA AUGMENTATION ######################
# imgs is an array of N 600x600x3 images, sd is the Gaussian standard deviation (how much to blur)
def blur(imgs, sd=1):
  filtered_img = np.zeros((imgs.shape[0], 600, 600, 3))
  for i in range(imgs.shape[0]):
    # channel_axis=-1 keeps the blur from mixing the RGB channels
    # (scikit-image >= 0.19; older releases use multichannel=True instead)
    filtered_img[i,:,:,:] = skimage.filters.gaussian(imgs[i,:,:,:], sigma=sd, channel_axis=-1)
  return filtered_img

# Zooms in by cropping a random 10-40% off the image and resizing the crop back to 600x600
# Requires OpenCV (import cv2)
def Scale(imgs):
  scaled_images = np.zeros((len(imgs), 600, 600, 3))
  for i, img in enumerate(imgs):
    ratio = random.choice([0.1, 0.2, 0.3, 0.4])
    x = int(ratio * 600 / 2)  # margin removed from each side
    scaled = img[x:600-x, x:600-x]
    res = cv2.resize(scaled, dsize=(600, 600), interpolation=cv2.INTER_CUBIC)
    scaled_images[i] = res
  return scaled_images

def Rotate30(imgs): 
  rot30_imgs = np.empty([imgs.shape[0], imgs.shape[1], imgs.shape[2], imgs.shape[3]])
  for i, img in enumerate(imgs): 
    rand_dir = random.choice([-1, 1])
    new_img = rotate(img, rand_dir * 30) 
    rot30_imgs[i, :] = new_img
  return rot30_imgs

# Mirrors each image left-to-right (np.fliplr), i.e. a flip about the vertical axis
def VerticalFlip(imgs): 
  flip_imgs = np.empty([imgs.shape[0], imgs.shape[1], imgs.shape[2], imgs.shape[3]])
  for i, img in enumerate(imgs): 
    new_img = np.fliplr(img)
    flip_imgs[i, :, :, :] = new_img
  return flip_imgs

def Translation(imgs): 
  trans_imgs = np.empty([imgs.shape[0], imgs.shape[1], imgs.shape[2], imgs.shape[3]])
  for i, img in enumerate(imgs): 
    # randrange excludes the stop value, so each offset is one of -150, -100, -50, 0, 50, 100
    rand_x = random.randrange(-150, 150, 50)
    rand_y = random.randrange(-150, 150, 50)
    transform = AffineTransform(translation=(rand_x,rand_y))
    new_img = warp(img,transform, mode="constant")  
    trans_imgs[i, :] = new_img
  return trans_imgs

## Part 3: Model Implementation

For our model, we implemented a majority vote classifier over three Convolutional Neural Networks with differing structures. Each architecture is based on an established model.
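
The voting step itself can be sketched as follows. This is only an illustration, assuming each model has already produced one predicted class index per sample. The function name `majority_vote` and the tie-breaking rule (prefer the earliest model whose prediction is among the tied classes) are assumptions, not something this notebook specifies.

```python
import numpy as np

def majority_vote(predictions, num_classes=26):
    """Combine per-model class predictions by majority vote.

    predictions: integer array of shape (n_models, n_samples).
    Ties are broken in favour of the earliest model (assumed rule).
    """
    predictions = np.asarray(predictions)
    n_models, n_samples = predictions.shape
    voted = np.empty(n_samples, dtype=np.int64)
    for j in range(n_samples):
        counts = np.bincount(predictions[:, j], minlength=num_classes)
        winners = np.flatnonzero(counts == counts.max())
        if len(winners) == 1:
            voted[j] = winners[0]
        else:
            # Tie: take the first model whose prediction is a tied class
            for m in range(n_models):
                if predictions[m, j] in winners:
                    voted[j] = predictions[m, j]
                    break
    return voted

# Three models, four samples; the last sample is a three-way tie
preds = np.array([[0, 1, 2, 5],
                  [0, 1, 3, 6],
                  [4, 1, 2, 7]])
print(majority_vote(preds))
```

With three voters, a tie only occurs when all three models disagree, so the fallback amounts to trusting whichever model is listed first.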

### Model 1: LeNet with 3 Channels

LeNet was one of the first convolutional neural network (CNN) models, used on 28x28 black-and-white images. While it is simple, it was one of the first practical applications of the backpropagation algorithm: specifically, reading handwritten numbers. In 1990, it was reported to reach an error rate of 1% at a rejection rate of about 9%. The model structure is as follows:
1. 2 convolutional layers
2. 2 pooling layers
3. 3 fully-connected layers

In this implementation, we make slight modifications to this network and apply it to RGB images.

LeCun, Y.; Boser, B.; Denker, J. S.; Henderson, D.; Howard, R. E.; Hubbard, W. & Jackel, L. D. (1989). Backpropagation applied to handwritten zip code recognition. Neural Computation, 1(4):541-551.[1]

In [None]:
import numpy as np
import datetime
import torch
import torch.nn as nn
import torch.nn.functional as F
import skimage
from skimage.transform import rotate
import matplotlib.pyplot as plt

In [None]:
class LeNet(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.conv_layer = nn.Sequential(
            nn.Conv2d(3, 6, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(6, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
        )

        self.linear = nn.Sequential(
            nn.Flatten(),
            nn.Linear(16 * 5 * 5, 120),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(84, num_classes)
        )

    def forward(self, x):
        x = self.conv_layer(x)
        x = self.linear(x)
        return x
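
The `16 * 5 * 5` input size of the first linear layer fixes the image size this network accepts. The sketch below traces the spatial size through the layers defined above with the standard output-size formula for convolution and pooling without dilation. The helper names `conv_out` and `lenet_flat_features` are hypothetical.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Output size of a convolution or pooling layer along one dimension."""
    return (size + 2 * padding - kernel) // stride + 1

def lenet_flat_features(input_size=32):
    """Number of features entering the first Linear layer of LeNet above."""
    s = conv_out(input_size, kernel=5)       # nn.Conv2d(3, 6, 5)
    s = conv_out(s, kernel=2, stride=2)      # nn.MaxPool2d(2, 2)
    s = conv_out(s, kernel=5)                # nn.Conv2d(6, 16, 5)
    s = conv_out(s, kernel=2, stride=2)      # nn.MaxPool2d(2, 2)
    return 16 * s * s                        # 16 output channels

print(lenet_flat_features(32))  # 400 == 16 * 5 * 5
```

So the model as written expects 32x32 inputs; any other input size requires a different `nn.Linear` input size.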

### Model 2: AlexNet

AlexNet was one of the breakthrough CNN models: it won the ImageNet Large Scale Visual Recognition Challenge in 2012. The model achieved a top-5 error of 15.3%, more than 10 percentage points lower than the runner-up. The following is an implementation of this CNN.

In [None]:
# AlexNet Implementation
# Expects 227x227 inputs: forward() reshapes to (input_height, input_width) and the dense layers assume a 6x6x256 feature map

class AlexNet(torch.nn.Module):
    def __init__(self, input_height=227, input_width=227, n_classes=26, channels=3):
        super().__init__()

        # Initialize the parameters of the model
        self.input_height = input_height
        self.input_width = input_width
        self.n_classes = n_classes
        self.channels = channels

        # AlexNet Implementation; same structure with different outputs due to input (note: uses average pooling where the original AlexNet uses max pooling)
        self.model_convolution = nn.Sequential(
            nn.Conv2d(in_channels=channels,out_channels=96, kernel_size=11, stride=4),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=3, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=96,out_channels=256,kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=3,stride=2),
            nn.Conv2d(in_channels=256,out_channels=384,kernel_size=3, stride=1, padding=1), 
            nn.ReLU(),
            nn.Conv2d(in_channels=384,out_channels=384,kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(in_channels=384,out_channels=256,kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.AvgPool2d(kernel_size=3, stride=2)
        )

        # The dense network architecture. Assumes the flattened input has 9216 nodes, or 6x6x256 (227 -> 55 -> 27 -> 13 -> 6 spatially)
        self.model_dense = nn.Sequential(
            nn.Flatten(),
            nn.Linear(9216, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.2), # Regularization
            nn.Linear(4096, 4096),
            nn.ReLU(),
            nn.Dropout(p=0.2),
            nn.Linear(4096, n_classes)
        )

    def forward(self, x):
        x = x.reshape(x.shape[0], self.channels, self.input_height, self.input_width)
        x = self.model_convolution(x)
        x = self.model_dense(x)
        return x
    
        

## Part 4: Training, Testing

Here, we actually train and test the model on the provided datasets.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms

import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import string

import imghdr
from PIL import Image
from os import listdir

In [None]:
# Input: Path to a folder of .png files. Let #=dataset num, l = letter represented, v=variation, a=augmentation
#     Must be structured s.t. hand#_l_v_a.png
# Output: List of .png images and their respective labels
def loadImages(path):
    imagesList = listdir(path)
    imgs = []
    labels = []
    for image in imagesList: # iterate over all images in the folder
      if imghdr.what(path + image) == 'png':
        if (image[6].isalpha()): # index 6 (the 7th character) is the letter
          img = Image.open(path + image)
          imgs.append(img)
          labels.append(ord(image[6]) - ord('a')) # assumes that filename structure is 'handx_[label]_....'
    return imgs, labels

# Input: the images list (3 channels), crop size, and resize hyperparameters
# Output: a numpy array [num_imgs, 3, crop_size, crop_size] of all the resized + cropped + normalized images
def applyTransforms(imgs, crop_size, resize):
  # Define the necessary preprocessing transforms
  num_imgs = len(imgs)
  preprocess = transforms.Compose([
    transforms.Resize(resize), # Hyperparameter
    transforms.CenterCrop(crop_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
  ])

  # Create tensor array
  transforms_array = np.zeros((num_imgs, 3, crop_size, crop_size))
  for i in range(num_imgs):
    temp = preprocess(imgs[i])
    transforms_array[i,:,:,:] = temp
  
  return transforms_array

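# Input: an image array [num_imgs, channels, x_dim, y_dim], its labels, the held-out fraction, and a batch size
#     The held-out fraction (test_split) is divided evenly into dev and test sets, e.g. 0.2 -> 80/10/10
# Output: a list [train_data, train_labels, trainloader, dev_data, dev_labels, devloader, test_data, test_labels, testloader]
def train_dev_test_loaders(transforms_array, labels, test_split=0.2, batch_size=4):
  ## Create the training data and trainloader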
  train_data, dev_data, train_labels, dev_labels = train_test_split(transforms_array, labels, test_size=test_split, random_state=42)

  dev_data, test_data, dev_labels, test_labels = train_test_split(dev_data, dev_labels, test_size=0.5, random_state=42)

  train_data_and_labels = []
  for i in range(len(train_data)):
    sample = (torch.Tensor(train_data[i,:,:,:]), train_labels[i])
    train_data_and_labels.append(sample)

  trainloader = torch.utils.data.DataLoader(train_data_and_labels, batch_size=batch_size,
                                            shuffle=True, num_workers=2)

  ## Create the developmental data and devloader
  dev_data_and_labels = []
  for i in range(len(dev_data)):
    sample = (torch.Tensor(dev_data[i,:,:,:]), dev_labels[i])
    dev_data_and_labels.append(sample)

  devloader = torch.utils.data.DataLoader(dev_data_and_labels, batch_size=batch_size,
                                            shuffle=False, num_workers=2)
  
  ## Create the Test data and testloader
  test_data_and_labels = []
  for i in range(len(test_data)):
    sample = (torch.Tensor(test_data[i,:,:,:]), test_labels[i])
    test_data_and_labels.append(sample)

  testloader = torch.utils.data.DataLoader(test_data_and_labels, batch_size=batch_size,
                                            shuffle=False, num_workers=2)
  
  return [train_data, train_labels, trainloader, dev_data, dev_labels, devloader, test_data, test_labels, testloader]

# Input: an image array [num_imgs, channels, x_dim, y_dim], its labels, and a batch size
# Output: a DataLoader over the full (unshuffled) dataset, for evaluation only
def test_loader(transforms_array, labels, batch_size=4):
  ## Create the testing data and testloader    
  test_data_and_labels = []
  for i in range(len(transforms_array)):
    sample = (torch.Tensor(transforms_array[i,:,:,:]), labels[i])
    test_data_and_labels.append(sample)

  testloader = torch.utils.data.DataLoader(test_data_and_labels, batch_size=batch_size,
                                            shuffle=False, num_workers=2)
  return testloader


We also add a couple of functions for visualizing images.

In [None]:
# Input: One normalized image
# Output: None
# Plots the image, assumed to be [3, x_dim, y_dim]
def imshow(img):
    # Unnormalize the image before showing!
    invTrans = transforms.Compose([ transforms.Normalize(mean = [ 0., 0., 0. ],
                                                        std = [ 1/0.229, 1/0.224, 1/0.225 ]),
                                    transforms.Normalize(mean = [ -0.485, -0.456, -0.406 ],
                                                        std = [ 1., 1., 1. ]),
                                  ])
    img = invTrans(img)
    plt.figure(figsize=(10,10))
    plt.imshow(np.transpose(img, (1, 2, 0)))
    plt.show()

# Show the ground-truth and predicted labels for up to num_batches * batch_size images of the first batch
def show_true_vs_predicted(testloader, classes, model, num_batches, batch_size):
    # print images
    dataiter = iter(testloader)
    images, labels = next(dataiter)  # the .next() method is not available on DataLoader iterators in newer PyTorch

    # Only one batch is fetched, so never index past its length
    num_shown = min(num_batches * batch_size, len(labels))

    imshow(torchvision.utils.make_grid(images))
    print('GroundTruth: ', ' '.join(f'{classes[labels[j]]:5s}' for j in range(num_shown)))

    outputs = model(images)
    _, predicted = torch.max(outputs, 1)

    print('Predicted: ', ' '.join(f'{classes[predicted[j]]:5s}'
                                for j in range(num_shown)))
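
The two chained `Normalize` calls in `imshow` undo the preprocessing normalization: the first computes `x / (1/std) = x * std`, and the second computes `x - (-mean) = x + mean`. A small numpy check of that round trip is sketched below. It uses the same mean and standard deviation constants as above; the `normalize` and `unnormalize` helpers are hypothetical stand-ins for the torchvision transforms.

```python
import numpy as np

# Per-channel constants used in applyTransforms and imshow, shaped for [3, H, W] images
MEAN = np.array([0.485, 0.456, 0.406]).reshape(3, 1, 1)
STD = np.array([0.229, 0.224, 0.225]).reshape(3, 1, 1)

def normalize(img):
    """(x - mean) / std, applied per channel."""
    return (img - MEAN) / STD

def unnormalize(img):
    """Inverse of normalize: x * std + mean, applied per channel."""
    return img * STD + MEAN

rng = np.random.default_rng(0)
img = rng.random((3, 8, 8))            # fake image with values in [0, 1)
restored = unnormalize(normalize(img))
print(np.allclose(restored, img))
```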


### Model 3: ResNet Basis

In 2015, ResNet was proposed as the next field-shaking model for image classification. One major issue when training models with many hidden layers is the vanishing gradient problem. ResNet mitigates this problem through a skip-layer structure, also called an "identity shortcut connection."
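
As a short sketch of why the shortcut helps, write a residual block (ignoring the final activation) as the sum of a learned mapping $\mathcal{F}$ and its input. The symbols $\mathcal{F}$, $\mathcal{L}$ (the loss), and $I$ (the identity) are notation introduced here for illustration:

$$
y = \mathcal{F}(x) + x
\qquad\Longrightarrow\qquad
\frac{\partial \mathcal{L}}{\partial x}
= \frac{\partial \mathcal{L}}{\partial y}\left(\frac{\partial \mathcal{F}}{\partial x} + I\right)
$$

Even when $\partial \mathcal{F} / \partial x$ becomes very small, the identity term passes the gradient from $y$ back to $x$ unchanged, so stacking many such blocks does not by itself shrink the gradient toward zero.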

In [None]:
# We stack CNN blocks (bottleneck blocks of three convolutions each) multiple times.
class CNNblock(nn.Module):
    def __init__(self, in_chan, interm_chan, identity_downsample=None, stride=1):
        super(CNNblock, self).__init__()
        self.expansion = 4 # Hyperparameter for tuning

        self.model_convolution = nn.Sequential(
            nn.Conv2d(in_chan, interm_chan, kernel_size=1),
            nn.BatchNorm2d(interm_chan),
            nn.Conv2d(interm_chan, interm_chan, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(interm_chan),
            nn.Conv2d(interm_chan, interm_chan * self.expansion, kernel_size=1),
            nn.BatchNorm2d(interm_chan * self.expansion),
            nn.ReLU()
        )
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        identity = x.clone()
        x = self.model_convolution(x)

        # Skip Connection
        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x


class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.model_convolution = torch.nn.Sequential(
            nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.layer1 = self._make_layer(
          block, layers[0], intermediate_channels=64, stride=1
        )
        self.layer2 = self._make_layer(
          block, layers[1], intermediate_channels=128, stride=2
        )
        self.layer3 = self._make_layer(
          block, layers[2], intermediate_channels=256, stride=2
        )
        self.layer4 = self._make_layer(
          block, layers[3], intermediate_channels=512, stride=2
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)


    def forward(self, x):
      x = self.model_convolution(x)
      x = self.layer1(x)
      x = self.layer2(x)
      x = self.layer3(x)
      x = self.layer4(x)

      x = self.avgpool(x)
      x = x.reshape(x.shape[0], -1)
      x = self.fc(x)

      return x

    def _make_layer(self, block, num_residual_blocks, intermediate_channels, stride):
      identity_downsample = None
      layers = []

      # If we either halve the input space, e.g. 56x56 -> 28x28 (stride=2), or the number of channels changes,
      # we need to adapt the identity (skip connection) so it can be added
      # to the layer that's ahead

      if stride != 1 or self.in_channels != intermediate_channels * 4:
        identity_downsample = nn.Sequential(
          nn.Conv2d(self.in_channels, intermediate_channels * 4, kernel_size=1, stride=stride),
          nn.BatchNorm2d(intermediate_channels * 4),
        )

      # The first block of the layer is always added, with or without a downsample
      layers.append(
        block(self.in_channels, intermediate_channels, identity_downsample, stride)
      )

      # The expansion size is always 4 for ResNet 50,101,152
      self.in_channels = intermediate_channels * 4

      # For example for first resnet layer: 256 will be mapped to 64 as intermediate layer,
      # then finally back to 256. Hence no identity downsample is needed, since stride = 1,
      # and also same amount of channels.
      
      for i in range(num_residual_blocks - 1):
        layers.append(block(self.in_channels, intermediate_channels))
      return nn.Sequential(*layers)

def ResNet50(img_channel=3, num_classes=1000):
  return ResNet(CNNblock, [3, 4, 6, 3], img_channel, num_classes)

def test():
    net = ResNet50(img_channel=3, num_classes=1000)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    net = net.to(device)  # move the model and the input to the same device
    y = net(torch.randn(4, 3, 224, 224, device=device))
    print(y.size())

test()


### Model 3: Inception



Citations:

ResNet:
https://www.analyticsvidhya.com/blog/2021/06/build-resnet-from-scratch-with-python/

