In [None]:

# -*- coding: utf-8 -*-
"""
Name: Shane Quinn
Student Number: R00144107
Email: shane.quinn1@mycit.ie
Course: MSc Artificial Intelligence
Module: Deep Learning
Date: 01/05/2021
"""

# from google.colab import drive
# drive.mount('/content/gdrive')

# !unzip "/content/gdrive/My Drive/datasets/earth_data.zip" -d "./"

# !ls


import numpy as np
import h5py
import matplotlib.pyplot as plt
from tensorflow.python.client import device_lib
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.models import load_model
from sklearn.metrics import accuracy_score
import functools
import time
import os
from sklearn.ensemble import RandomForestClassifier
from sklearn import metrics
from sklearn.metrics import f1_score

#Transfer Learning ML Model Imports
from sklearn.tree import DecisionTreeClassifier 
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.linear_model import SGDClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import AdaBoostClassifier
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import GridSearchCV



def exec_time(func):
    """
    Decorator that reports how long the wrapped function took to run.

    The wrapped callable is invoked with whatever positional and keyword
    arguments the wrapper receives. Once it returns, the elapsed time measured
    with time.perf_counter() is printed and the original return value is
    handed back unchanged. Nothing is printed if the wrapped call raises.

    Parameters
    ----------
    func : FUNCTION
        Callable whose execution time is recorded and printed.

    Returns
    -------
    FUNCTION
        Wrapper around func; functools.wraps copies func's metadata onto it.
    """

    def timed_call(*args, **kwargs):
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        print("Execution Time: ", elapsed)
        return outcome

    # Equivalent to decorating timed_call with @functools.wraps(func)
    return functools.wraps(func)(timed_call)


def grid_search(X, X_val, y, y_val, model, param_grid):
    """
    Grid search with stratified 5-fold cross validation.

    Every combination in param_grid is scored with micro-averaged F1 on the
    training data. The best combination is refit on all of X (refit=True),
    evaluated once on the validation split, and the result is printed.

    ################################### Taken from PML Assignment 2 ###################################

    Parameters
    ----------
    X : NUMPY ARRAY
        Training data.
    X_val : NUMPY ARRAY
        Validation data used to score the refit best estimator.
    y : NUMPY ARRAY
        Training target class values.
    y_val : NUMPY ARRAY
        Validation target class values.
    model : SKLearn Classifier Model
        Unfitted estimator whose hyper parameters are searched.
    param_grid : DICT
        Mapping of parameter name to the list of values to try
        (passed straight to GridSearchCV).

    Returns
    -------
    best_model : SKLearn Classifier Model
        Estimator refit on X, y with the best parameter combination.

    """

    # No random_state is given, so the shuffled folds differ from run to run.
    cv = StratifiedKFold(n_splits=5, shuffle=True)                     #5 Splits for cross fold
    # Named `searcher` so the local does not shadow this function's own name.
    searcher = GridSearchCV(model, param_grid, scoring='f1_micro', cv=cv, refit=True, n_jobs=-1)
    result = searcher.fit(X, y)
    best_model = result.best_estimator_

    # Score the refit estimator on data the search never saw.
    predictions = best_model.predict(X_val)
    f1 = f1_score(y_val, predictions, average='micro')
    print("Best f1 Results: ", f1, "with parameters: ", result.best_params_)

    return best_model
      
         


def compare_ml_models(X_train, X_test, y_train, y_test):
    """
    Fit a set of default-configured SKLearn classifiers and return the one
    with the highest micro-averaged F1 score on the test split.

    ################################### Taken from PML Assignment 2 ###################################

    Parameters
    ----------
    X_train : NUMPY ARRAY
        Training data.
    X_test : NUMPY ARRAY
        Test data.
    y_train : NUMPY ARRAY
        Training target class values.
    y_test : NUMPY ARRAY
        Test target class values.

    Returns
    -------
    best_model : SKLearn model object
        Top performing fitted model (the earliest in the list on a tie).
    """

    #Models to be tested
    models = [DecisionTreeClassifier(), KNeighborsClassifier(), RandomForestClassifier(), SVC(),
              SGDClassifier(), LogisticRegression(), GaussianNB(), AdaBoostClassifier()]

    # Both dicts are keyed by class name; insertion order follows `models`.
    scores = {}
    fit_models = {}

    #iterate through models, save f1 scores
    for model in models:
        m_name = type(model).__name__
        print(m_name)
        fitted = model.fit(X_train, y_train)
        predictions = fitted.predict(X_test)
        scores[m_name] = f1_score(y_test, predictions, average='micro')
        fit_models[m_name] = fitted

    print("\n\t\tML Model : F1 Score")
    for m_name, score in scores.items():
        print("{:<25} : {}".format(m_name, score))

    # max() returns the first key with the highest score, so a model is always
    # selected, even when every score is 0.
    best_name = max(scores, key=scores.get)
    best_score = scores[best_name]
    best_model = fit_models[best_name]

    print("\nUsing default settings, {} had the highest F1 score of {}".format(type(best_model).__name__, best_score))

    return best_model


@exec_time
def main():
    """
    Transfer learning using VGG as feature extractor
    Test various ML models to find best performing model
    Use Grid search to further fine tune top performing model hyper parameters

    The convolutional base of VGG16 (ImageNet weights, no top) turns each
    image into a flattened feature vector. An SVC is fitted on the training
    features and its accuracy on the training and validation splits is printed.

    Returns
    -------
    None.

    """

    X, y, X_val, y_val = loadDataH5()

    vggModel = tf.keras.applications.VGG16(weights='imagenet', include_top=False, input_shape=(64, 64, 3))
    # vggModel = tf.keras.applications.VGG19(weights='imagenet', include_top=False, input_shape=(64, 64, 3))
    # summary() prints the table itself and returns None, so it is not wrapped in print().
    vggModel.summary()

    # Feature extraction
    # NOTE(review): images are fed to VGG16 exactly as loaded; confirm whether
    # keras.applications.vgg16.preprocess_input should be applied first.
    X_n = vggModel.predict(X)
    X_n = X_n.reshape(X_n.shape[0], -1)            # flatten feature maps to one vector per image
    X_val_n = vggModel.predict(X_val)
    X_val_n = X_val_n.reshape(X_val_n.shape[0], -1)

    # Uncomment to compare ML models
    # model = compare_ml_models(X_n, X_val_n, y, y_val)

    # Best performing model was VGG16 and SVC
    # (base estimator and grid are only used by the optional grid search below)
    model = SVC()
    pg = {"kernel": ["linear","rbf"],"C":[0.1,1,10]}

    # Uncomment to run Grid Search
    # grid_search(X_n,X_val_n, y, y_val, model, pg)

    # Fixed hyper parameters for the final model
    model = SVC(kernel = 'rbf', C = 10)

    model.fit(X_n, y)
    # Accuracy on the data the model was fitted on
    results = model.predict(X_n)
    print ('Training accuracy: ', accuracy_score(y, results))
    # Accuracy on the held-out validation split
    results = model.predict(X_val_n)
    print ('Validation accuracy: ', accuracy_score(y_val, results))

    
    
def loadDataH5(path='earth_data.h5'):
    """
    Extract dataset (supplied in assignment)

    Reads the four datasets 'trainX', 'trainY', 'valX' and 'valY' from an
    HDF5 file and copies each into memory as a NumPy array.

    Parameters
    ----------
    path : STRING, optional
        Location of the HDF5 file. Defaults to 'earth_data.h5' in the
        current working directory.

    Returns
    -------
    trainX : NUMPY ARRAY
        Training data.
    trainY : NUMPY ARRAY
        Training target class values.
    valX : NUMPY ARRAY
        Validation data.
    valY : NUMPY ARRAY
        Validation target class values.

    Raises
    ------
    KeyError
        If any of the four datasets is missing from the file.
    """

    names = ('trainX', 'trainY', 'valX', 'valY')
    with h5py.File(path, 'r') as hf:
        # Index with hf[name] rather than hf.get(name): get() returns None for
        # a missing dataset, which np.array would silently wrap in an object array.
        # The list is built inside the `with` so the data is copied before the file closes.
        trainX, trainY, valX, valY = [np.array(hf[name]) for name in names]
    return trainX, trainY, valX, valY


# Run the pipeline only when this file is executed as a script, not when it is imported.
if __name__=="__main__":
    main()

Model: "vgg16"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
input_3 (InputLayer)         [(None, 64, 64, 3)]       0         
_________________________________________________________________
block1_conv1 (Conv2D)        (None, 64, 64, 64)        1792      
_________________________________________________________________
block1_conv2 (Conv2D)        (None, 64, 64, 64)        36928     
_________________________________________________________________
block1_pool (MaxPooling2D)   (None, 32, 32, 64)        0         
_________________________________________________________________
block2_conv1 (Conv2D)        (None, 32, 32, 128)       73856     
_________________________________________________________________
block2_conv2 (Conv2D)        (None, 32, 32, 128)       147584    
_________________________________________________________________
block2_pool (MaxPooling2D)   (None, 16, 16, 128)       0     