# Image Processing with Neural Network
## Lecture 16: Simple RNN

##  Weather Data

<img src='../../../images/prasami_color_tutorials_small.png' style = 'width:400px;' alt="By Pramod Sharma : pramod.sharma@prasami.com" align="left"/>

In [2]:
###-----------------

# Import Libraries

###-----------------

import logging
import os
import sys
from datetime import datetime

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

import tensorflow as tf

from sklearn.metrics import (accuracy_score,
                             classification_report,
                             confusion_matrix,
                             ConfusionMatrixDisplay)

#from utils.helper import fn_plot_tf_hist

In [11]:
###----------------------
### Some basic parameters
###----------------------
# Global configuration shared by the cells below: folder locations,
# hyper-parameters and matplotlib defaults.


# NOTE(review): inpDir is an absolute, machine-specific path whereas outDir and
# modelDir are relative -- confirm it exists before running elsewhere.
inpDir = 'C:/Users/Rohit/OneDrive/Desktop/DNN_Assign/input' # location where input data is stored
outDir = '../output' # location to store outputs
modelDir = '../models' # presumably where trained models are saved; not referenced in the visible cells
subDir = 'flower_photos' # name of the image folder; joined with inpDir further below
valDir = 'valid_flowers' # presumably a hold-out image folder; not referenced in the visible cells
altName = 'cnn_base' # not referenced in the visible cells

RANDOM_STATE = 24 # for initialization ----- REMEMBER: to remove at the time of promotion to production
tf.random.set_seed(RANDOM_STATE) # setting for Tensorflow as well

TEST_SIZE = 0.2 # passed as validation_split when the datasets are created

ALPHA = 0.001 # presumably the learning rate -- TODO confirm it reaches the optimizer
EPOCHS = 1 # number of cycles to run
FLIP_MODE = 'horizontal_and_vertical' # only mentioned in commented-out augmentation code
ROTATION=(-0.1,0.1) # not referenced in the visible cells
BATCH_SIZE = 8 # images per batch; also used to size the preview grids
IMG_HEIGHT = 200 # images are resized to IMG_HEIGHT x IMG_WIDTH on load
IMG_WIDTH = 200 


# Set parameters for decoration of plots
params = {'legend.fontsize' : 'medium',
          'figure.figsize'  : (15,10),
          'axes.labelsize'  : 'medium',
          'axes.titlesize'  :'large',
          'xtick.labelsize' :'medium',
          'ytick.labelsize' :'medium',
         }

CMAP = plt.cm.coolwarm # default colormap constant for plots

plt.rcParams.update(params) # update rcParams

# NOTE(review): the style sheet is applied after rcParams.update; any key the
# style also defines would override the value set above -- verify if sizes look off.
plt.style.use('seaborn-v0_8-darkgrid') # plt.style.use('ggplot')

In [12]:


# verify if the directory exists
def fn_dir_exists(_dir):
    '''
    Verify that a directory exists; stop the program if it does not.

    Args:
        _dir: path of the directory to check
    Returns:
        True when `_dir` is an existing directory
    Raises:
        SystemExit: when `_dir` is missing or is not a directory
    '''
    # the helper `fn_log_event` is not defined in this file, so the standard
    # logging module is used for the debug messages instead
    logger = logging.getLogger(__name__)

    # isdir rather than exists: a regular file at `_dir` must be rejected too
    res = os.path.isdir(_dir)

    logger.debug('-- Directory "{}" exist : {}'.format(_dir, res))

    if not res:

        msg = '-- Directory "{}" does not exists.'.format(_dir)

        logger.debug(msg)

        sys.exit(msg)

    return res



def fn_file_exists(_file):
    '''
    Verify that a file exists; stop the program if it does not.

    Args:
        _file: path of the file to check
    Returns:
        True when `_file` is an existing regular file
    Raises:
        SystemExit: when `_file` is missing or is not a regular file
    '''
    # the helper `fn_log_event` is not defined in this file, so the standard
    # logging module is used for the debug messages instead
    logger = logging.getLogger(__name__)

    # isfile rather than exists: a directory at `_file` must be rejected too
    res = os.path.isfile(_file)

    logger.debug('-- File "{}" exist : {}'.format(_file, res))

    if not res:

        msg = '-- File "{}" does not exists.'.format(_file)

        logger.debug(msg)

        sys.exit(msg)

    return res


###-----------------------------
### Verify or create a directory
###-----------------------------
def fn_verify_dir(_path : str):
    '''
    Make sure `_path` is present on disk, creating it when it is missing.

    Arg:
        _path: location to look for
    returns:
        None; prints whether the path already existed or was created
    '''
    # guard clause: whatever already sits at this location (file or folder)
    # is reported and left untouched
    if os.path.exists(_path):
        print(_path, ' exists')
        return

    # nothing there yet: create the folder, including missing parents
    os.makedirs(_path)
    print("Created folder : ", _path)

###------------------------------------------------
### Plot Loss Curve using Tensorflow history object
###------------------------------------------------
def fn_plot_tf_hist(hist_df):

    '''
    Plot loss and accuracy curves of a training history side by side and
    mark the epoch with the lowest validation loss on both plots.

    Args:
        hist_df: a dataframe with following columns
            column 0: loss
            column 1: accuracy
            column 2: val_loss
            column 3: val_accuracy
            Columns are accessed by position, so differently named columns
            work as long as the order is the same. The dataframe index is
            used as the x axis (epochs).
    Returns:
        None; draws on a new figure (the caller decides when to show it).
    '''

    fig, axes = plt.subplots(1, 2, figsize=(15, 6))

    # properties  matplotlib.patch.Patch
    props = dict(boxstyle='round', facecolor='aqua', alpha=0.4)
    facecolor = 'cyan'
    fontsize = 12
    cmap = plt.cm.coolwarm  # local name, so the module-level CMAP is not shadowed

    # Get columns by position to eliminate any column naming error
    y1, y2, y3, y4 = hist_df.columns[:4]

    # Row with the lowest validation loss. Taking the first match keeps the
    # coordinates scalar even when several epochs tie for the minimum.
    best = hist_df[hist_df[y3] == hist_df[y3].min()].iloc[0]
    best_epoch = best.name  # index label of that row = x position on the plots
    last = hist_df.iloc[-1]  # values reported in the text boxes

    ###--------
    ### Losses
    ###--------
    ax = axes[0]

    hist_df.plot(y=[y1, y3], ax=ax, colormap=cmap)

    txtFmt = "Loss: \n  train: {:6.4f}\n   test: {:6.4f}"
    txtstr = txtFmt.format(last[y1], last[y3])  # text to plot

    # place a text box in upper middle in axes coords
    ax.text(0.3, 0.95, txtstr, transform=ax.transAxes, fontsize=fontsize,
            verticalalignment='top', bbox=props)

    # offset of the annotation text: a tenth of the plotted value range
    y_min = min(hist_df[y1].min(), hist_df[y3].min())
    y_max = max(hist_df[y1].max(), hist_df[y3].max())
    offset = (y_max - y_min) / 10.0

    # Mark arrow at lowest validation loss
    ax.annotate(f'Min: {best[y3]:6.4f}',                   # text to print
                xy=(best_epoch, best[y3]),                 # arrow tip
                xytext=(best_epoch, best[y3] + offset),    # location of text
                fontsize=fontsize, va='bottom', ha='right', bbox=props,
                arrowprops=dict(facecolor=facecolor, shrink=0.05))

    # Draw vertical line at best value
    ax.axvline(x=best_epoch, color='green', linestyle='-.', lw=3)

    ax.set_xlabel("Epochs")
    ax.set_ylabel(y1.capitalize())
    ax.set_title('Errors')
    ax.grid()
    ax.legend(loc='upper left')

    ###------------
    ### Accuracies
    ###------------
    ax = axes[1]

    hist_df.plot(y=[y2, y4], ax=ax, colormap=cmap)

    txtFmt = "Accuracy: \n  train: {:6.4f}\n  test:  {:6.4f}"
    txtstr = txtFmt.format(last[y2], last[y4])  # text to plot

    # place a text box in lower middle in axes coords
    ax.text(0.3, 0.2, txtstr, transform=ax.transAxes, fontsize=fontsize,
            verticalalignment='top', bbox=props)

    y_min = min(hist_df[y2].min(), hist_df[y4].min())
    y_max = max(hist_df[y2].max(), hist_df[y4].max())
    offset = (y_max - y_min) / 10.0

    # Mark the validation accuracy at the epoch of lowest validation loss
    ax.annotate(f'Best: {best[y4]:6.4f}',                  # text to print
                xy=(best_epoch, best[y4]),                 # arrow tip
                xytext=(best_epoch, best[y4] - offset),    # location of text
                fontsize=fontsize, va='bottom', ha='right', bbox=props,
                arrowprops=dict(facecolor=facecolor, shrink=0.05))

    # Draw vertical line at best value
    ax.axvline(x=best_epoch, color='green', linestyle='-.', lw=3)

    ax.set_xlabel("Epochs")
    ax.set_ylabel(y2.capitalize())
    ax.set_title('Accuracies')  # same title as fn_plot_torch_hist uses
    ax.grid()
    ax.legend(loc='lower left')

    plt.tight_layout()



###------------------------------
### Plot Loss Curve using pytorch
###------------------------------
def fn_plot_torch_hist(hist_df):
    '''
    Plot loss and accuracy curves of a training history side by side and
    mark the row with the lowest value of the second loss column.

    Args:
        hist_df: a dataframe whose columns are read by position
            column 0: x axis values (e.g. epoch)
            column 1: first loss curve (labelled "train" in the text box)
            column 2: second loss curve (labelled "test"); its minimum
                      defines the "best" row
            column 3: first accuracy curve ("train")
            column 4: second accuracy curve ("test")
    Returns:
        None; draws on a new figure (the caller decides when to show it).
    '''

    # instantiate figure
    fig, axes = plt.subplots(1, 2, figsize=(15, 6))

    # properties  matplotlib.patch.Patch
    props = dict(boxstyle='round', facecolor='cyan', alpha=0.5)

    # columns
    x, y1, y2, y3, y4 = hist_df.columns[:5]

    # First row holding the minimum loss. Using one row (same row the
    # original kept via drop_duplicates) gives scalar plot coordinates
    # instead of 1-element arrays.
    best = hist_df[hist_df[y2] == hist_df[y2].min()].iloc[0]
    best_x = best[x]
    last = hist_df.iloc[-1]  # values reported in the text boxes

    ###--------
    ### Losses
    ###--------
    ax = axes[0]

    # Plot all losses
    hist_df.plot(x=x, y=[y1, y2], ax=ax)

    # offset of the annotation text: a tenth of the plotted value range
    y_min = min(hist_df[y1].min(), hist_df[y2].min())
    y_max = max(hist_df[y1].max(), hist_df[y2].max())
    offset = (y_max - y_min) / 10.0

    txtFmt = "Loss: \n  train: {:6.4f}\n   test: {:6.4f}"
    txtstr = txtFmt.format(last[y1], last[y2])  # text to plot

    # place a text box in upper middle in axes coords
    ax.text(0.3, 0.95, txtstr, transform=ax.transAxes, fontsize=14,
            verticalalignment='top', bbox=props)

    # Mark arrow at lowest loss
    ax.annotate(f'Min: {best[y2]:6.4f}',                        # text to print
                xy=(best_x, best[y2]),                          # arrow tip
                xytext=(best_x + offset, best[y2] + offset),    # location of text
                fontsize=14, va='bottom', ha='right', bbox=props,
                arrowprops=dict(facecolor='cyan', shrink=0.05))

    # Draw vertical line at best value
    ax.axvline(x=best_x, color='green', linestyle='-.', lw=3)

    ax.set_xlabel(x.title())
    ax.set_ylabel(y1.title())
    ax.set_title('Errors')
    ax.grid()
    ax.legend(loc='upper left')

    ###------------
    ### Accuracies
    ###------------
    ax = axes[1]

    # Plot accuracies
    hist_df.plot(x=x, y=[y3, y4], ax=ax)

    txtFmt = "Accuracy: \n  train: {:6.4f}\n  test:  {:6.4f}"
    txtstr = txtFmt.format(last[y3], last[y4])  # text to plot

    y_min = min(hist_df[y3].min(), hist_df[y4].min())
    y_max = max(hist_df[y3].max(), hist_df[y4].max())
    offset = (y_max - y_min) / 10.0

    # place a text box in lower middle in axes coords
    ax.text(0.3, 0.2, txtstr, transform=ax.transAxes, fontsize=12,
            verticalalignment='top', bbox=props)

    # Mark the accuracy at the row of lowest loss
    ax.annotate(f'Best: {best[y4]:6.4f}',                       # text to print
                xy=(best_x, best[y4]),                          # arrow tip
                xytext=(best_x - offset, best[y4] - offset),    # location of text
                fontsize=14, va='bottom', ha='right', bbox=props,
                arrowprops=dict(facecolor='cyan', shrink=0.05))

    # Draw a vertical line at best value
    ax.axvline(x=best_x, color='green', linestyle='-.', lw=3)

    # Labels
    ax.set_xlabel(x.title())
    ax.set_ylabel(y3.title())
    ax.set_title('Accuracies')
    ax.grid()
    ax.legend(loc='lower left')

    plt.tight_layout()



###---------------------------------------------------
### Plot count of classes in training and testing sets
###---------------------------------------------------
def fn_plot_labels(tr_ds, ts_ds, class_names = None):

    '''
        Bar-plot the number of samples per class for two datasets.

        Args:
            tr_ds : Training Dataset, iterable of (images, labels) batches
            ts_ds : Testing dataset, iterable of (images, labels) batches
            class_names : optional list of class names, position = label id.
                          When omitted the names are read from the datasets'
                          `class_names` attribute.
        Returns : none (the figure is shown)
    '''

    # create figure and axes
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # get names of the classes
    if class_names is None or len(class_names) == 0:
        tr_class_names = tr_ds.class_names
        # the test set may not carry the attribute; reuse the training names then
        ts_class_names = getattr(ts_ds, 'class_names', tr_class_names)
    else:
        # explicit names apply to both datasets
        tr_class_names = ts_class_names = list(class_names)

    plot_specs = ((axes[0], tr_ds, tr_class_names, 'Training Set', 'DarkBlue'),
                  (axes[1], ts_ds, ts_class_names, 'Test Set', 'orange'))

    for ax, ds, names, title, color in plot_specs:

        class_counts = _fn_count_labels(ds)

        # one bar per class name; classes without samples get a zero bar
        ax.bar(names, [class_counts.get(i, 0) for i in range(len(names))],
               align='center', color=color, alpha=0.7)

        # add title
        ax.set_title(title)

        # grids make it look good
        ax.grid()

    # fit the subplot(s) in to the figure area
    plt.tight_layout()

    # display all open figures
    plt.show()


def _fn_count_labels(ds):
    '''Return {label id: number of samples} for a dataset of (images, labels) batches.'''
    counts = {}
    for _, lbls in ds:
        for lbl in lbls.numpy():
            counts[lbl] = counts.get(lbl, 0) + 1
    return counts

def fn_convert_timestamp(tstr):
    '''
        Convert a timestamp string to a datetime object.

        Accepts both "2015-11-12_1444" (underscore separator, the format the
        code has always parsed) and "2015-11-12 1444" (space separator, the
        format the original docstring described).

        Args:
            tstr: timestamp string, date and HHMM time
        Returns:
            datetime.datetime
        Raises:
            ValueError: when the string matches neither format
    '''

    try:
        return datetime.strptime(tstr, "%Y-%m-%d_%H%M")
    except ValueError:
        # fall back to the space-separated variant
        return datetime.strptime(tstr, "%Y-%m-%d %H%M")

###----------------------
### Plot confusion matrix
###----------------------
def fn_plot_confusion_matrix(y_true, y_pred, labels):
    '''
    Plot the confusion matrix of predictions against the ground truth.

    Args:
        y_true: Ground Truth
        y_pred : Predictions
        labels : dictionary {class id: display name}, e.g.
                  {0: 'Goal Keeper',
                  1: 'Defender',
                  2: 'Mid-Fielder',
                  3: 'Forward'}
    Returns:
        None (the figure is shown)
    '''

    # Fix the rows/columns to the ids in `labels`. Otherwise a class missing
    # from both y_true and y_pred shrinks the matrix and it no longer lines
    # up with the display labels.
    class_ids = list(labels.keys())

    cm  = confusion_matrix(y_true, y_pred, labels=class_ids)

    disp = ConfusionMatrixDisplay(confusion_matrix=cm,
                                  display_labels=list(labels.values()))

    fig, ax = plt.subplots(figsize = (6,6))

    disp.plot(ax = ax, cmap = 'Blues', xticks_rotation = 'vertical', colorbar=False)
    # Disable the grid
    ax.grid(False)

    plt.show()

## Basic Hygiene

In [13]:
# list the GPUs visible to TensorFlow (empty list on a CPU-only machine)
physical_devices = tf.config.list_physical_devices('GPU')

# enable on-demand memory allocation on every GPU, not only the first one,
# so that all devices are configured the same way
for gpu in physical_devices:
    tf.config.experimental.set_memory_growth(gpu, True)

In [14]:
# show which GPUs (if any) were detected
print(physical_devices)

[]


In [15]:


'''

labels = []

for count, data in enumerate (train_ds):
    imgs, lbls = data
    #print(count, ":", imgs.shape, lbls.shape)
    labels.append(lbls)
    
tf.concat(labels, axis = 0).numpy().shape

unique, A, counts = tf.unique_with_counts(tf.concat(labels, axis = 0).numpy())
unique, A, counts


'''

'\n\nlabels = []\n\nfor count, data in enumerate (train_ds):\n    imgs, lbls = data\n    #print(count, ":", imgs.shape, lbls.shape)\n    labels.append(lbls)\n    \ntf.concat(labels, axis = 0).numpy().shape\n\nunique, A, counts = tf.unique_with_counts(tf.concat(labels, axis = 0).numpy())\nunique, A, counts\n\n\n'

In [16]:
def fn_plot_label(tr_ds, ts_ds):
    '''
    Bar-plot the number of samples per class in the training and test sets.

    Args:
        tr_ds: training dataset, iterable of (images, integer labels) batches
        ts_ds: test dataset, iterable of (images, integer labels) batches
    Returns:
        None; draws on a new figure.

    Relies on the module-level `class_names` list (position = label id).
    '''

    n_classes = len(class_names)

    plt.figure(figsize = (15,5)) # instantiate the figure

    plot_specs = ((tr_ds, 'Training Set', 'DarkBlue'),
                  (ts_ds, 'Test Set', 'Orange'))

    for pos, (ds, title, color) in enumerate(plot_specs, start=1):

        plt.subplot(1, 2, pos)

        labels = tf.concat([lbl for _, lbl in ds], axis = 0).numpy() # get the labels

        # Count per label id: bar i always belongs to class_names[i], and a
        # class with no samples gets a zero bar. tf.unique_with_counts would
        # return the classes in first-seen order and mislabel the bars.
        counts = np.bincount(labels, minlength=n_classes)

        plt.bar(range(len(counts)), counts, align='center', color = color)

        plt.xticks(range(n_classes), class_names)

        plt.title(title)


In [19]:
# comment part is for colab

'''import pathlib
dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"

data_dir = tf.keras.utils.get_file(origin=dataset_url,
                                   fname='flower_photos',
                                   untar=True)
data_dir = pathlib.Path(data_dir)'''


# data_dir = os.path.join(inpDir, subDir)
# data_dir

'import pathlib\ndataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"\n\ndata_dir = tf.keras.utils.get_file(origin=dataset_url,\n                                   fname=\'flower_photos\',\n                                   untar=True)\ndata_dir = pathlib.Path(data_dir)'

In [21]:
#os.listdir(data_dir)

In [22]:
# create training and test data from the same folder

# folder that holds one sub-folder per class
data_dir = os.path.join(inpDir, subDir)

# Arguments shared by both subsets. The split ratio and the seed must be
# identical in both calls so the two subsets do not overlap.
ds_kwargs = dict(
    validation_split=TEST_SIZE,            # ratio held out for testing
    seed=RANDOM_STATE,
    image_size=[IMG_HEIGHT, IMG_WIDTH],    # images are resized to this size
    batch_size=BATCH_SIZE,
)

# (1 - TEST_SIZE) of the images go to the training set
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir,
    subset='training',
    **ds_kwargs
)

# the remaining TEST_SIZE share is the test data
test_ds = tf.keras.preprocessing.image_dataset_from_directory(
    data_dir,
    subset='validation',
    **ds_kwargs
)

NotFoundError: Could not find directory C:/Users/Rohit/OneDrive/Desktop/DNN_Assign/input\flower_photos

In [23]:
# is it picking class names
# (read from the attribute of the dataset object created above)

class_names = train_ds.class_names
class_names

NameError: name 'train_ds' is not defined

In [None]:
# lookup table {label id: class name}, ids follow the order of class_names
class_dict = dict(enumerate(class_names))
class_dict

In [None]:
# number of target classes = number of entries in class_names
num_classes = len(class_names)
num_classes

In [None]:
# preview one batch of training images with their class names
plt.figure(figsize=(15,8))

n_cols = 8  # images per row

for images, labels in train_ds.take(1):

    # Size the grid from the batch actually received: works for any
    # BATCH_SIZE (not only multiples of 8) and for a short batch.
    n_imgs = int(images.shape[0])
    n_rows = -(-n_imgs // n_cols)  # ceiling division

    for i, (image, label) in enumerate(zip(images.numpy(), labels.numpy())):
        plt.subplot(n_rows, n_cols, i + 1)
        plt.grid(False)
        plt.imshow(image.astype('uint8'))
        plt.title(class_names[label])
        plt.axis('off')
    plt.tight_layout()
plt.show()

In [None]:
# preview one batch of test images with their class names
plt.figure(figsize=(15,8))

n_cols = 8  # images per row

for images, labels in test_ds.take(1): # get me one batch

    # Size the grid from the batch actually received: works for any
    # BATCH_SIZE (not only multiples of 8) and for a short batch.
    n_imgs = int(images.shape[0])
    n_rows = -(-n_imgs // n_cols)  # ceiling division

    for i, (image, label) in enumerate(zip(images.numpy(), labels.numpy())): # loop over batch

        plt.subplot(n_rows, n_cols, i + 1) # access the axis

        plt.grid(False) # no to grid

        plt.imshow(image.astype('uint8')) # show image as unsigned 8-bit integers

        plt.title(class_names[label])

        plt.axis('off')

    plt.tight_layout()

plt.show()

In [None]:
# compare per-class sample counts of the training and test subsets
fn_plot_label(train_ds, test_ds)

In [None]:
# Input-pipeline tuning: cache() keeps the elements after the first full pass,
# prefetch() prepares upcoming batches while the current one is consumed;
# AUTOTUNE lets tf.data choose the buffer size.
# NOTE(review): the datasets are re-bound to the wrapped pipeline here; class_names
# was read from train_ds before this cell -- confirm the attribute is not needed afterwards.
train_ds=train_ds.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
test_ds=test_ds.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

In [None]:
# (height, width, channels); 3 channels assumes RGB images -- TODO confirm
input_shape = (IMG_HEIGHT, IMG_WIDTH, 3)
input_shape

### DenseNet model: pre-trained DenseNet169 base (ImageNet weights, layers frozen)

In [None]:
# DenseNet169 base with ImageNet weights and without its classification top,
# built for the image size used in this notebook
densenet_model=tf.keras.applications.DenseNet169(weights='imagenet',
                                                 include_top=False,
                                                input_shape=input_shape)

In [None]:
# mark every layer of the base model as non-trainable
for layer in densenet_model.layers:
    layer.trainable=False

In [None]:
# print the layer-by-layer summary of the base model
densenet_model.summary()

In [None]:

def build_model(input_shape, num_classes):
    '''
    Assemble the classifier: rescaling -> DenseNet base -> two dense blocks -> output.

    Args:
        input_shape: kept in the signature; not used inside the function
        num_classes: number of units of the output layer
    Returns:
        an un-compiled Sequential model. The output layer has no activation,
        i.e. the model emits logits.

    Reads the module-level `densenet_model` (base network) and `RANDOM_STATE`.
    '''
    layers = tf.keras.layers

    # one seeded initializer object, shared by all Dense layers
    glorot = tf.keras.initializers.GlorotUniform(seed=RANDOM_STATE)

    net = tf.keras.models.Sequential()

    # scale pixel values by 1/255
    net.add(layers.Rescaling(1./255.))

    # (augmentation layers would be inserted here)

    # pre-built base followed by flattening of its feature maps
    net.add(densenet_model)
    net.add(layers.Flatten())

    # two identical blocks: Dense -> BatchNorm -> ReLU -> Dropout
    for width in (1664, 46):
        net.add(layers.Dense(width, kernel_initializer=glorot))
        net.add(layers.BatchNormalization())
        net.add(layers.Activation(activation='relu'))
        net.add(layers.Dropout(0.5))

    # output layer, one unit per class
    net.add(layers.Dense(num_classes, kernel_initializer=glorot))

    return net

### Head part: build the full model (classifier layers on top of the DenseNet base)

In [None]:
# build_model reads the module-level densenet_model as its base network
model = build_model(input_shape, num_classes)
model

In [None]:
# define loss:
# labels are integer class ids (sparse); from_logits=True because the output
# layer added in build_model has no activation

loss_fn=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)


In [None]:
# Adam with the learning rate taken from ALPHA, so the constant defined in the
# parameter cell actually controls training (ALPHA = 0.001 equals Adam's default)
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=ALPHA),
              loss=loss_fn,
              metrics=['accuracy'])

In [None]:
# train_ds is already batched (batch_size was set when the dataset was
# created), so batch_size must not be passed to fit() again
history=model.fit(train_ds,
                  epochs=EPOCHS,
                  validation_data=test_ds)

In [None]:
# summary of the full model (printed after fit, once the model has been called on data)
model.summary()

In [None]:
'''tf.keras.utils.plot_model(model,"model.png",
                          show_shapes=True,
                          show_dtype=True,
                          dpi=96,
                          show_layer_activations=True )'''

In [None]:
# history.history is a dict of per-epoch metric lists; plot it as a dataframe
fn_plot_tf_hist(pd.DataFrame(history.history))

In [None]:
# Demo of the RandomRotation layer: show one image of a training batch
# before and after the layer is applied.
layer = tf.keras.layers.RandomRotation(
    (-0.5, 0.5),
    fill_mode="nearest",
    seed=RANDOM_STATE,
)

plt.figure()
img_num = 0  # which image of the batch to display

for images, labels in train_ds.take(1):
    out_images = layer(images)

    # left panel: untouched batch, right panel: output of the layer
    for pos, (title, batch) in enumerate((("original", images),
                                          ("rotated", out_images)), start=1):
        plt.subplot(1, 2, pos)
        plt.title(title)
        plt.imshow(batch[img_num].numpy().astype("uint16"))
        plt.grid(False)

In [None]:
# loss and accuracy (the metric given to compile) on the test subset
test_loss,test_acc = model.evaluate(test_ds, verbose = 1)

In [None]:
# Collect predictions and labels in ONE pass over the dataset. Two separate
# passes (predict, then a loop for the labels) only line up when the dataset
# yields its elements in the same order every time.
yhat, y_test = [], []

for X, y in test_ds:
    yhat.append(model.predict_on_batch(X))  # raw model outputs of this batch
    y_test.append(y.numpy())

yhat = np.concatenate(yhat, axis = 0)

y_test = np.concatenate(y_test, axis = 0).squeeze()

# predicted class = position of the largest output
y_pred = yhat.argmax(axis = 1)

print('Accuracy score on test data: , {:.4f}'. format(accuracy_score(y_test, y_pred)))

In [None]:
# Running on validation data:

# Prefer the dedicated hold-out folder (valDir). Fall back to the folder used
# so far when it is not present, so the cell still runs.
val_dir = os.path.join(inpDir, valDir)

if not os.path.isdir(val_dir):
    print('Validation folder "{}" not found; using "{}" instead'.format(val_dir, subDir))
    val_dir = os.path.join(inpDir, subDir)

val_ds = tf.keras.preprocessing.image_dataset_from_directory(
        val_dir,                 # path the the data directory
        class_names=class_names, # same label ids as during training
        shuffle=False,           # fixed order: labels and predictions stay aligned
        seed=RANDOM_STATE,
        image_size=[IMG_HEIGHT, IMG_WIDTH],
        batch_size=BATCH_SIZE)


In [None]:
# Collect predictions and labels in ONE pass over the dataset. val_ds may
# yield its elements in a different order on each iteration, so a separate
# predict() pass and label pass would not line up.
yhat, y_test = [], []

for X, y in val_ds:
    yhat.append(model.predict_on_batch(X))  # raw model outputs of this batch
    y_test.append(y.numpy())

yhat = np.concatenate(yhat, axis = 0)

y_test = np.concatenate(y_test, axis = 0).squeeze()

# predicted class = position of the largest output
y_pred = yhat.argmax(axis = 1)

print('Accuracy score on validation data: , {:.4f}'. format(accuracy_score(y_test, y_pred)))