In [1]:
import copy
import json
import os
import pickle
import random
import time

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
import seaborn as sns
import tensorflow as tf
import torch
from matplotlib.colors import LinearSegmentedColormap
from pandas import json_normalize
from PIL import Image
from scipy.sparse import csr_matrix
from scipy.stats import skew
from tqdm import tqdm

from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, precision_score, recall_score, f1_score,classification_report
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.naive_bayes import BernoulliNB
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import LabelBinarizer
from sklearn.preprocessing import normalize,StandardScaler,RobustScaler,MinMaxScaler
from sklearn.svm import SVC,LinearSVC

from keras.callbacks import EarlyStopping
from tensorflow import keras
from tensorflow.keras.datasets import mnist, cifar10
from tensorflow.keras.layers import Dense, Dropout, BatchNormalization, Flatten, Conv2D, MaxPooling2D,LeakyReLU
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam, SGD, RMSprop
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.regularizers import l1, l2, l1_l2


In [2]:
# Directory holding the VMMRdb images. The path is relative, so it resolves
# against the kernel's current working directory.
image_dir = "Datasets/VMMRdb/"

In [11]:
!wget https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names

--2023-05-05 14:34:08--  https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 625 [text/plain]
Saving to: ‘coco.names’


2023-05-05 14:34:08 (6.85 MB/s) - ‘coco.names’ saved [625/625]



In [None]:
### PyTorch Model

In [None]:
# Map each training phase name to the generator that feeds it; the keys are
# the phase names looked up by the training loop.
dataloaders = dict(
    train=train_data_generator,
    valid=test_data_generator,
)

In [None]:
# Probe for Apple's Metal (MPS) backend: when present, allocate a one-element
# tensor on it and show it as a smoke test; otherwise report its absence.
if not torch.backends.mps.is_available():
    print("MPS device not found.")
else:
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print(x)

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    """Small two-convolution CNN producing scores for two classes.

    Input size: the flatten width ``64 * 53 * 53`` fixes the expected input at
    3 x 110 x 110. Two unpadded 3x3 convolutions shrink 110 -> 108 -> 106 and
    the 2x2 max-pool halves that to 53.

    Output: ``forward`` returns raw, unnormalised logits of shape
    ``(batch, 2)``. ``nn.CrossEntropyLoss`` applies log-softmax itself, so
    feeding it softmax probabilities would squash the gradients. Call
    ``F.softmax(logits, dim=1)`` on the result when probabilities are needed;
    ``argmax`` / ``torch.max`` predictions are unaffected.
    """

    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3)
        self.conv2 = nn.Conv2d(32, 64, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 53 * 53, 32)
        self.fc2 = nn.Linear(32, 2)
        # Element-wise dropout: the layer is applied to the flattened (batch, 32)
        # activations, where channel-wise Dropout2d is not meaningful.
        self.dropout = nn.Dropout(p=0.2)

    def forward(self, x):
        """Return class logits of shape (batch, 2) for a batch of images."""
        x = F.leaky_relu(self.conv1(x))
        x = self.pool(F.leaky_relu(self.conv2(x)))
        # Flatten per sample; unlike view(-1, ...), a wrongly sized input now
        # fails loudly in fc1 instead of being folded into the batch dimension.
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

In [None]:
def train_model(model, criterion, optimizer, num_epochs=5, device='cuda',
                loaders=None, sizes=None,
                save_path="/content/drive/My Drive/resnetCars.pt"):
    """Train and validate a PyTorch model, keeping the best-validation weights.

    Each epoch runs a 'train' phase (gradients enabled, optimizer stepped) and
    a 'valid' phase (gradients disabled). Whenever validation accuracy beats
    the best seen so far, the weights are snapshotted in memory and, unless
    ``save_path`` is None, written to ``save_path``.

    Args:
        model: ``torch.nn.Module`` to train; it is moved to ``device``.
        criterion: loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: optimizer built over ``model.parameters()``.
        num_epochs: number of train/valid epochs to run.
        device: device that the model and every batch are moved to.
        loaders: mapping with 'train' and 'valid' iterables yielding
            ``(inputs, labels)`` tensor pairs. Defaults to the notebook-level
            ``dataloaders`` global.
        sizes: mapping with the number of samples per phase, used to average
            loss and accuracy. Defaults to the notebook-level ``dataset_sizes``.
        save_path: checkpoint file for the best weights; None disables saving.

    Returns:
        ``(model, train_results, valid_results)``. ``model`` carries the best
        validation weights; each results list holds one
        ``[epoch_loss, epoch_acc]`` pair per epoch (``epoch_acc`` is a 0-dim
        tensor).
    """
    # Fall back to the notebook globals only when nothing was passed in.
    loaders = dataloaders if loaders is None else loaders
    sizes = dataset_sizes if sizes is None else sizes

    start = time.time()
    # Batches are moved to `device` below, so the model has to live there too.
    model = model.to(device)
    train_results = []
    valid_results = []
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)
        # Each epoch has a training and validation phase
        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in loaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward pass; autograd is only tracked while training
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics: weight the batch-mean loss by the batch size so
                # the per-sample average below is exact
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)

            epoch_loss = running_loss / sizes[phase]
            # as_tensor keeps this working when the loader yielded no batches
            # and running_corrects is still the plain int 0.
            epoch_acc = torch.as_tensor(running_corrects).double() / sizes[phase]

            if phase == 'train':
                train_results.append([epoch_loss, epoch_acc])
            if phase == 'valid':
                valid_results.append([epoch_loss, epoch_acc])

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # Snapshot (and optionally checkpoint) the weights on a new best
            # validation accuracy.
            if phase == 'valid' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                if save_path is not None:
                    torch.save(model.state_dict(), save_path)

        print()

    # Calculating time it took for model to train
    time_elapsed = time.time() - start
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)

    return model, train_results, valid_results

In [None]:
# Pick the best available accelerator instead of relying on train_model's
# 'cuda' default, which fails on machines without an NVIDIA GPU (for example
# the Apple-silicon/MPS setup probed earlier in this notebook).
device = (
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)

# Move the model first so the optimizer is built over the on-device parameters.
model = Net().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

model, train_results, valid_results = train_model(
    model, criterion, optimizer, num_epochs=5, device=device
)

In [None]:
### Gradient Tape

In [None]:
def _with_complement_column(labels):
    """Return labels as an (n, 2) array: column 0 = label, column 1 = 1 - label.

    Idempotent: an array that already has more than one column is returned
    unchanged, so re-running the cell does not keep appending columns.
    Note the column order: argmax over axis 1 yields 0 for samples labelled 1.
    """
    labels = np.asarray(labels)
    if labels.ndim == 1:
        # A flat vector would be concatenated end-to-end by hstack; make it a column.
        labels = labels.reshape(-1, 1)
    if labels.shape[1] != 1:
        return labels
    return np.hstack((labels, 1 - labels))


y_train = _with_complement_column(y_train)
y_test = _with_complement_column(y_test)

In [None]:
def train(model, optimizer, loss_fn, train_data_generator, X_train, y_train, batch_size, epochs):
    """Custom GradientTape training loop that prints per-epoch loss and accuracy.

    Args:
        model: callable invoked as ``model(X_batch, training=True)`` and
            exposing ``trainable_variables``.
        optimizer: object exposing ``apply_gradients``.
        loss_fn: callable invoked as ``loss_fn(y_true, y_pred)``.
        train_data_generator: callable ``(X, y, batch_size) -> iterable`` of
            ``(X_batch, y_batch)`` pairs.
        X_train, y_train: training data; labels must be 2-D because accuracy
            takes ``argmax`` over axis 1.
        batch_size: number of samples per batch.
        epochs: number of passes over the data.

    Notes:
        * At most ``ceil(len(X_train) / batch_size)`` batches are consumed per
          epoch, so a generator that loops forever (Keras style) cannot hang
          the loop.
        * Metrics are averaged over the batches actually processed.
        * The reported loss is the mean of each batch's loss values; for a
          scalar loss this equals the loss itself.
    """
    max_batches = -(-len(X_train) // batch_size)  # ceil division without math import
    for epoch in range(epochs):
        loss_total = 0.0
        accuracy_total = 0.0
        batches_seen = 0
        batches = train_data_generator(X_train, y_train, batch_size)
        with tqdm(total=max_batches) as pbar:
            # zip with range stops after max_batches without pulling an extra batch.
            for _, (X_batch_train, y_batch_train) in zip(range(max_batches), batches):
                with tf.GradientTape() as tape:
                    y_pred = model(X_batch_train, training=True)
                    batch_loss = loss_fn(y_batch_train, y_pred)
                gradients = tape.gradient(batch_loss, model.trainable_variables)
                optimizer.apply_gradients(zip(gradients, model.trainable_variables))

                # Fraction of samples whose predicted class matches the label.
                predicted = np.argmax(np.asarray(y_pred), axis=1)
                expected = np.argmax(np.asarray(y_batch_train), axis=1)
                accuracy_total += float(np.mean(predicted == expected))
                loss_total += float(tf.reduce_mean(batch_loss))
                batches_seen += 1
                pbar.update(1)
        divisor = max(batches_seen, 1)  # avoid 0/0 when the generator yields nothing
        epoch_loss = loss_total / divisor
        epoch_accuracy = accuracy_total / divisor
        print(f"Epoch {epoch + 1}: loss={epoch_loss}, accuracy={epoch_accuracy}")

In [None]:
def test(model, loss_fn, test_data_generator, X_test, y_test, batch_size):

    # Initialize the loss and accuracy
    test_loss = 0.0
    test_accuracy = 0.0

    # Iterate over the batches in the test data generator
    for X_batch_test, y_batch_test in test_data_generator(X_test, y_test, batch_size):

        # Compute the predictions and loss for the batch
        y_pred = model(X_batch_test, training=False)
        batch_loss = loss_fn(y_batch_test, y_pred)

        # Compute the batch accuracy
        batch_accuracy = accuracy_score(np.argmax(y_batch_train.numpy(), axis=1), np.argmax(y_pred.numpy(), axis=1))

        # Update the test loss and accuracy
        test_loss += batch_loss.numpy()
        test_accuracy += batch_accuracy

    # Compute the average test loss and accuracy
    test_loss /= (len(X_test) / batch_size)
    test_accuracy /= (len(X_test) / batch_size)

    # Print the test loss and accuracy
    print("Test loss: {:.4f} - Test accuracy: {:.4f}".format(test_loss, test_accuracy))

    return test_loss, test_accuracy

In [None]:
# Run the custom GradientTape loop implemented by `train`.
# NOTE(review): `binary_crossentropy`, `batch_size` and `epochs` are not
# defined in any visible cell — confirm they are created earlier.
# NOTE(review): the only visible assignments of `model` / `optimizer` are the
# PyTorch `Net()` and SGD optimizer; `train` calls `model(..., training=True)`
# and `model.trainable_variables`, so a Keras model is presumably expected
# here — TODO confirm where it is built.
train(model, optimizer, binary_crossentropy, train_data_generator, X_train, y_train, batch_size, epochs)

In [None]:
# Reset Keras' global state before setting up a fresh training run.
tf.keras.backend.clear_session()

# Callbacks come from tf.keras so they match the tf.keras model API and do
# not depend on names (`datetime`, `TensorBoard`) that were never imported.
# Stop once validation loss has not improved for 3 epochs.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

# One timestamped log directory per run (local time, e.g. 20230505-143408).
log_dir = "logs/fit/" + time.strftime("%Y%m%d-%H%M%S")
tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

In [None]:
# Fit with generator-fed batches. The explicit step counts tell fit how many
# batches to draw per epoch for training and for validation.
history = model.fit(
    x=train_data_generator(X_train, y_train, batch_size),
    validation_data=test_data_generator(X_test, y_test, batch_size),
    steps_per_epoch=train_steps,
    validation_steps=val_steps,
    epochs=epochs,
    callbacks=[early_stopping, tensorboard_callback],
)

In [None]:
# Evaluate on the held-out data with the custom `test` loop.
# NOTE(review): `loss_fn` is only a parameter name inside `train` / `test`;
# no visible cell defines it at notebook level — confirm where it is set.
test_loss, test_acc = test(model, loss_fn, test_data_generator, X_test, y_test, batch_size)

In [None]:
# Training vs. validation loss per epoch, drawn in the same order as the
# legend entries below.
for curve_key in ('loss', 'val_loss'):
    plt.plot(history.history[curve_key])
plt.title('Model loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()