In [None]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Deep Neural Networks 
## Session 24a: Lecture

##  Convolutional Neural Network (CNN)
- Dropout
- Flowers
- Dataset From directory

<img src='../../images/prasami_color_tutorials_small.png' style = 'width:400px;' alt="By Pramod Sharma : pramod.sharma@prasami.com" align="left"/>

In [None]:
###-----------------
### Import Libraries
###-----------------

import os
#import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix, ConfusionMatrixDisplay

import tensorflow as tf
  
%matplotlib inline

In [None]:
###----------------------
### Some basic parameters
###----------------------


# --- Locations ---------------------------------------------------------------
inpDir = '../../input'    # root folder holding the input data
outDir = '../output'      # folder for generated outputs
modelDir = '../models'    # folder for saved model weights
subDir = 'flower_photos'  # dataset sub-folder (joined under inpDir and modelDir)
altName = 'cnn_base'      # base name used for the checkpoint file

# --- Reproducibility ---------------------------------------------------------
# REMEMBER: remove the fixed seed at the time of promotion to production
RANDOM_STATE = 24
tf.random.set_seed(RANDOM_STATE)  # seed TensorFlow as well

# --- Data --------------------------------------------------------------------
TEST_SIZE = 0.2               # fraction passed as validation_split
BATCH_SIZE = 32               # images per batch
IMG_HEIGHT = IMG_WIDTH = 190  # images are resized to this height and width

# --- Training ----------------------------------------------------------------
ALPHA = 0.001     # learning rate handed to the optimizer
EPOCHS = 50       # number of cycles to run
PATIENCE = 20     # early-stopping patience (epochs)
LR_PATIENCE = 10  # patience before the learning rate is reduced (epochs)
FACTOR_LR = 0.5   # multiplier applied when the learning rate is reduced

# --- Plot decoration ---------------------------------------------------------
CMAP = plt.cm.coolwarm  # default colormap for the helper plots

params = {
    'legend.fontsize': 'large',
    'figure.figsize': (15,10),
    'axes.labelsize': 'x-large',
    'axes.titlesize': 'x-large',
    'xtick.labelsize': 'large',
    'ytick.labelsize': 'large',
}

plt.rcParams.update(params)  # apply the decoration to every later figure

## Basic Hygiene

In [None]:
# Ask TensorFlow to grow GPU memory on demand instead of reserving it all up front.
# The memory-growth setting has to be the same on every visible GPU, so it is
# applied to all of them (configuring only the first one fails on multi-GPU hosts).
physical_devices = tf.config.list_physical_devices('GPU')

for gpu in physical_devices:
    tf.config.experimental.set_memory_growth(gpu, True)

In [None]:
# Show the GPUs TensorFlow detected (an empty list means no GPU was found)
print (physical_devices)

## Helper Function

In [None]:
def fn_verify_dir(_path : str):
    '''
    Make sure something exists at the given location, creating a folder if needed.
    Args:
        _path: location to check; an existing file or folder both count as "exists"
    Returns:
        None. Creates the folder (including missing parents) when nothing is found
        at `_path`, and prints which of the two cases applied.
    '''
    if not os.path.exists(_path):

        # nothing there yet: build the whole folder chain and tell the user
        os.makedirs(_path)
        print("Created folder : ", _path)
        return

    # something (file or folder) is already present: just report it
    print(_path, ' exists')

In [None]:
###-----------------------------------
### Function to plot Loss Curve
###-----------------------------------

def fn_plot_tf_hist(hist_df : pd.DataFrame):
    '''
    Plot a training history on two panels: left for columns 0 and 2, right for columns 1 and 3.
    The row where column 2 is lowest is marked on both panels with an arrow and a vertical line.

    Args:
      hist_df : pandas Dataframe with at least four columns, read by position
                (any further columns are ignored).
                For 'x' values, we will use index
                NOTE(review): assumes the order loss, accuracy, val_loss, val_accuracy
                as produced from a Keras history - confirm for your Keras version.
    Returns:
      None. Draws on a new matplotlib figure.
    '''
    _, axes = plt.subplots(1,2 , figsize = (15,6))

    # properties  matplotlib.patch.Patch
    props = dict(boxstyle='round', facecolor='aqua', alpha=0.4)
    facecolor = 'cyan'
    fontsize=12

    # Get columns by position to eliminate any column naming error
    y1, y2, y3, y4 = hist_df.columns[:4]

    # Where was min loss. Several rows can tie for the minimum; keep only the first
    # so that a single scalar position is handed to matplotlib.
    best = hist_df[hist_df[y3] == hist_df[y3].min()]
    best_epoch = best.index[0]
    best_row = best.iloc[0]

    # values of the last recorded row, shown in the text boxes
    last_row = hist_df.iloc[-1]

    def _mark_best(ax, label, value):
        '''Annotate `value` at the best row with an arrow and draw a vertical line there.'''
        ax.annotate(f'{label}: {value:6.4f}', # text to print
                    xy=(best_epoch, value), # Arrow start
                    xytext=(best_epoch-1, value), # location of text
                    fontsize=fontsize, va='bottom', ha='right',bbox=props, # beautification of text
                    arrowprops=dict(facecolor=facecolor, shrink=0.05)) # arrow

        ax.axvline(x = best_epoch, color = 'green', linestyle='-.', lw = 3)

    ###----------
    ### Left panel
    ###----------
    ax = axes[0]

    hist_df.plot(y = [y1,y3], ax = ax, colormap=CMAP)

    txtFmt = "Loss: \n  train: {:6.4f}\n   test: {:6.4f}"
    txtstr = txtFmt.format(last_row[y1], last_row[y3]) #text to plot

    # place a text box in upper middle in axes coords
    ax.text(0.3, 0.95, txtstr, transform=ax.transAxes, fontsize=fontsize,
            verticalalignment='top', bbox=props)

    _mark_best(ax, 'Min', best_row[y3])

    ax.set_xlabel("Epochs")
    ax.set_ylabel(y1.capitalize())
    ax.set_title('Errors')
    ax.grid()
    ax.legend(loc = 'upper left') # move legend to upper left

    ###-----------
    ### Right panel
    ###-----------
    ax = axes[1]

    hist_df.plot( y = [y2, y4], ax = ax, colormap=CMAP)

    txtFmt = "Accuracy: \n  train: {:6.4f}\n  test:  {:6.4f}"
    txtstr = txtFmt.format(last_row[y2], last_row[y4]) #text to plot

    # place a text box in lower middle in axes coords
    ax.text(0.3, 0.2, txtstr, transform=ax.transAxes, fontsize=fontsize,
            verticalalignment='top', bbox=props)

    # value of column 3 at the row where column 2 was lowest
    _mark_best(ax, 'Best', best_row[y4])

    ax.set_xlabel("Epochs")
    ax.set_ylabel(y2.capitalize())
    ax.grid()
    ax.legend(loc = 'lower left')

    plt.tight_layout()

In [None]:
def fn_plot_label(tr_ds, ts_ds):
    '''
    Bar-plot the number of samples per class for the training and the test set, side by side.

    Args:
        tr_ds : training dataset, iterated as (images, labels) batches
        ts_ds : test dataset, iterated as (images, labels) batches
    Note:
        Reads the notebook-level `class_names` list for the tick labels, so that
        list must be defined before this function is called.
        Labels are assumed to be integer class ids indexing `class_names` - TODO confirm.
    '''
    n_classes = len(class_names)

    plt.figure(figsize = (15,5)) # instantiate the figure

    panels = [(tr_ds, 'Training Set', 'DarkBlue'),
              (ts_ds, 'Test Set', 'Orange')]

    for pos, (ds, title, color) in enumerate(panels, start = 1):

        plt.subplot(1, 2, pos)

        labels = tf.concat([lbl for _, lbl in ds], axis = 0) # get the labels

        # Count by class id so that bar k always belongs to class_names[k].
        # (tf.unique_with_counts reports classes in order of first appearance, which
        # does not line up with class_names, and it drops classes that are absent.)
        counts = tf.math.bincount(tf.cast(labels, tf.int32), minlength = n_classes).numpy()

        plt.bar(range(len(counts)), counts, align='center', color = color) # barplot the counts

        plt.xticks(range(n_classes), class_names)

        plt.title(title)


In [None]:
# NOTE(review): these CSV names are resolved inside the image folder (subDir is
# 'flower_photos'); this looks like a leftover from a Fashion-MNIST notebook -
# confirm the files exist there or drop this cell.
train_filename, test_filename = [
    os.path.join(inpDir, subDir, csv_name)
    for csv_name in ('fashion-mnist_train.csv', 'fashion-mnist_test.csv')
]

# first row of each file is used as the header
train_df = pd.read_csv(train_filename, header = 0)
test_df = pd.read_csv(test_filename, header = 0)

In [None]:
# Peek at the first rows of the training frame
train_df.head()

In [None]:
# Peek at the first rows of the test frame
test_df.head()

In [None]:
# NOTE(review): fn_plot_label unpacks each element of its arguments as an
# (image, label) pair and reads `class_names`, which is only assigned in a later
# cell. DataFrames presumably do not fit that contract - confirm this cell is still wanted.
fn_plot_label(train_df, test_df)

In [None]:
# The string below is disabled code (a bare string has no effect): it would
# download the flower photos archive instead of using a local copy.
'''import pathlib
dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"

data_dir = tf.keras.utils.get_file(origin=dataset_url,
                                   fname='flower_photos',
                                   untar=True)
data_dir = pathlib.Path(data_dir)'''


# Use the copy stored under the input folder; the last line displays the path
data_dir = os.path.join(inpDir, subDir)
data_dir

In [None]:
# List the entries found directly under the data directory
os.listdir(data_dir)

In [None]:
# Settings shared by both subsets: both calls receive the same split ratio and seed
ds_kwargs = dict(
    validation_split=TEST_SIZE,          # what ratio of validation data
    seed=RANDOM_STATE,
    image_size=[IMG_HEIGHT, IMG_WIDTH],  # every image is resized to this size
    batch_size=BATCH_SIZE,
)

# training data
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir,  # path to the data directory
    subset='training',
    **ds_kwargs
)

# test data
test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir,  # path to the data directory
    subset='validation',
    **ds_kwargs
)

In [None]:
# Check which class names the dataset picked up.
# Keep them in a variable: the plotting helpers and later cells read `class_names`.
class_names = train_ds.class_names
class_names

In [None]:
# Preview one training batch as a grid with 8 images per row
plt.figure(figsize=(15,6))

n_rows = int(BATCH_SIZE/8)

for images, labels in train_ds.take(1): # one batch only
    pixels = images.numpy().astype('uint8') # convert the whole batch once
    for idx in range(BATCH_SIZE):
        plt.subplot(n_rows, 8, idx + 1)
        plt.grid(False)
        plt.imshow(pixels[idx])
        plt.title(class_names[int(labels[idx])]) # class id -> class name
        plt.axis('off')
    plt.tight_layout()
plt.show()

In [None]:
# Preview one test batch as a grid with 8 images per row
fig = plt.figure(figsize=(15,6))

grid_rows, grid_cols = int(BATCH_SIZE/8), 8

for images, labels in test_ds.take(1): # get me one batch

    for i in range(BATCH_SIZE): # loop over batch

        ax = fig.add_subplot(grid_rows, grid_cols, i + 1) # access the axis

        ax.grid(False) # no to grid

        # show image: convert to numpy and unsigned int
        ax.imshow(images[i].numpy().astype('uint8'))

        ax.set_title(class_names[labels[i]])

        ax.axis('off')

    plt.tight_layout()

plt.show()

In [None]:
# Compare how many samples each class has in the training and the test split
fn_plot_label(train_ds, test_ds)

In [None]:
# Shape of one image: height, width and 3 channels
input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)

## Random Zoom

In [None]:
# Same lower and upper bound for both factors.
# NOTE(review): per the Keras docs a negative factor zooms in; with equal bounds
# this is presumably a fixed 20% zoom - confirm.
zoom_range = (-0.2, -0.2)

layer = tf.keras.layers.RandomZoom(height_factor=zoom_range,
                                   width_factor=zoom_range)

plt.figure()

img_num = 3 # which image of the batch to display

for images, labels in test_ds.take(1): # get me one batch

    out_images = layer(images)

    # original on the left, layer output on the right
    panels = (('Original', images), ('Zoomed', out_images))

    for pos, (title, batch) in enumerate(panels, start=1):
        plt.subplot(1, 2, pos)
        plt.title(title)
        plt.imshow(batch[img_num].numpy().astype('uint8'))


In [None]:
# Cache both datasets after their first pass and prefetch upcoming batches.
# NOTE(review): cache() replays batches in the order of the first complete pass,
# so the training order presumably stays the same in every epoch - confirm this is intended.
train_ds, test_ds = [
    ds.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
    for ds in (train_ds, test_ds)
]

In [None]:
# Build model: six convolution sets followed by a dense head.
# Shape comments assume 190 x 190 x 3 inputs; each 3x3 convolution (no padding)
# trims 2 pixels per side pair and each 2x2 pooling halves the size.

model =  tf.keras.Sequential()

# Declare the input shape so the model is built right away
# (model.summary() then works even before training).
model.add(tf.keras.Input(shape=input_shape)) # 190 x 190 x 3

model.add(tf.keras.layers.Rescaling(1/255.)) # convert between 0 and 1


###-------
### Set I
###--------
model.add(tf.keras.layers.Conv2D(8, (3,3), activation='relu')) # 188 x 188 x 8
model.add(tf.keras.layers.MaxPool2D(2,2)) # 94 x 94 x 8
###-------
### Set II
###--------
model.add(tf.keras.layers.Conv2D(16, (3,3), activation='relu')) # 92 x 92 x 16
model.add(tf.keras.layers.MaxPool2D(2,2)) # 46 x 46 x 16
###-------
### Set III
###--------
model.add(tf.keras.layers.Conv2D(32, (3,3), activation='relu')) # 44 x 44 x 32
model.add(tf.keras.layers.MaxPool2D(2,2)) # 22 x 22 x 32
###-------
### Set IV
###--------
model.add(tf.keras.layers.Conv2D(64, (3,3), activation='relu')) # 20 x 20 x 64
model.add(tf.keras.layers.MaxPool2D(2,2)) # 10 x 10 x 64


###-------
### Set V
###--------
model.add(tf.keras.layers.Conv2D(128, (3,3), activation='relu')) # 8 x 8 x 128
model.add(tf.keras.layers.MaxPool2D(2,2)) # 4 x 4 x 128

###-------
### Set VI
###--------
model.add(tf.keras.layers.Conv2D(256, (3,3), activation='relu')) # 2 x 2 x 256

###-------
### Head Starts
###--------
model.add(tf.keras.layers.Flatten()) # Flatten : 1024

model.add(tf.keras.layers.Dense(256, activation='relu')) # Dense 1

# One raw score (logit) per class; no activation here because the loss is
# configured with from_logits=True. Sized from the data instead of a fixed 5.
model.add(tf.keras.layers.Dense(len(class_names))) # output

In [None]:
# Labels are integer class ids and the last Dense layer has no activation,
# hence the sparse loss working on raw logits.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Adam with the learning rate from the parameter cell
optim = tf.keras.optimizers.Adam(learning_rate = ALPHA)

model.compile(
    loss = loss_fn,
    optimizer = optim,
    metrics = ['accuracy'],
)

In [None]:
# Stop when val_loss has not improved for PATIENCE epochs and restore the best weights
early_stopping_callback = tf.keras.callbacks.EarlyStopping(
    monitor = 'val_loss',
    patience=PATIENCE,
    mode='auto',
    baseline =None,
    restore_best_weights=True,
    verbose = 1)

# Multiply the learning rate by FACTOR_LR when val_loss stalls for LR_PATIENCE epochs
lr_decay = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=FACTOR_LR,
    patience=LR_PATIENCE,
    verbose=1,
    mode='auto'
)

# define model file path
# Keras 3 only accepts a weights-only checkpoint whose name ends in '.weights.h5';
# older tf.keras versions write the same name as an HDF5 weights file.
modelFile = os.path.join(modelDir, subDir, f'{altName}.weights.h5')

# define checkpoint callback: keep only the weights of the best epoch by val_loss
model_callback = tf.keras.callbacks.ModelCheckpoint(
    modelFile,
    monitor = 'val_loss',
    verbose = 1,
    save_best_only = True,
    save_weights_only = True,
    mode = 'auto',
    )

In [None]:
# Train on the batched datasets. `batch_size` is intentionally not passed:
# the datasets already yield batches of BATCH_SIZE images, and the Keras docs
# say not to specify it for dataset inputs.
history = model.fit(train_ds, validation_data=test_ds,
                    epochs=EPOCHS,
                    verbose=1,
                    callbacks=[early_stopping_callback, lr_decay, model_callback]
                   )

In [None]:
# Print the layers with their output shapes and parameter counts
model.summary()

In [None]:
# Turn the recorded training history into a DataFrame (one row per epoch) and peek at it
res_df = pd.DataFrame(history.history)
res_df.head()

In [None]:
# Plot the history curves and mark the row with the lowest value in the third column
fn_plot_tf_hist(res_df)

### Evaluate the model

In [None]:
# Loss and accuracy (the compiled metric) on the test dataset
test_loss, test_acc = model.evaluate(test_ds, verbose=2)

In [None]:
# Gather the true labels of every test batch into one array.
# NOTE(review): this only lines up with predictions made in a separate pass if
# test_ds yields the same order every time; presumably guaranteed by the earlier
# cache() once one full pass has been made - confirm.
y_test = tf.concat([y for x, y in test_ds], axis=0).numpy()

In [None]:
# Raw model outputs (one score per class) for the whole test dataset
yhat = model.predict(test_ds)

# Predicted class = index of the highest score
y_pred = yhat.argmax(axis = 1)

# NOTE(review): y_test was collected in a separate pass over test_ds; the comparison
# assumes both passes saw the samples in the same order - confirm.
print(f'Accuracy score on Test Data : {accuracy_score(y_test, y_pred) : .4f}')


|| |Predicted| class
|:-|:-|:-|:-|
| | |P|N|
Actual|P|TP|FN
class|N|FP|TN

In [None]:
# Colormap for this plot only. The notebook-wide CMAP is left untouched so that
# re-running the history plot keeps its original colours.
cm_cmap = plt.cm.Blues

# Pin the label set so the matrix always has one row/column per class name,
# even if some class never shows up in y_test or y_pred.
cm = confusion_matrix(y_test, y_pred, labels=list(range(len(class_names))))

disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                              display_labels=class_names)

fig, ax = plt.subplots(figsize = (6,6))

disp.plot(ax = ax, cmap=cm_cmap, colorbar=False, xticks_rotation=90);

In [None]:
# Show the predicted class for the images of the first five test batches.
# Batch-local names are used on purpose: `yhat` / `y_pred` hold the predictions for
# the whole test set and must stay intact for the confusion-matrix cell.
for images, labels in test_ds.take(5):

    batch_scores = model.predict(images, verbose= 0)

    batch_pred = batch_scores.argmax( axis = 1 ) # class id with the highest score

    fig, axes = plt.subplots(BATCH_SIZE//8 , 8)
    axes = axes.ravel()

    # hide every axis first, so cells left empty by a short batch stay blank
    for ax in axes:
        ax.set_axis_off()

    # zip stops at the shorter sequence, so a batch with fewer than BATCH_SIZE images is fine
    for ax, image, pred in zip(axes, images, batch_pred):

        ax.imshow(image.numpy().astype('uint8'))
        ax.set_title(f'{class_names[pred]}')