# American Sign Language - Computer Vision Project

- Dataset: https://public.roboflow.com/object-detection/american-sign-language-letters
- Example Task: https://towardsdatascience.com/sign-language-recognition-with-advanced-computer-vision-7b74f20f3442

In [None]:
# OVERRIDE_TESTING = False # Set to True to run tests even if not in testing mode

In [None]:
import numpy as np
import tensorflow as tf

# Set the seed for NumPy
np.random.seed(42)

# Set the seed for TensorFlow
tf.random.set_seed(42)

import pandas as pd
import os, glob
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import PIL

import tensorflow as tf
from tensorflow.keras.utils import load_img, img_to_array, array_to_img
from tensorflow.keras import layers, models, optimizers
from tensorflow.keras.utils import to_categorical


tf.__version__

In [None]:
# Custom functions:
%load_ext autoreload
%autoreload 2
# sys.path.append(os.path.abspath("../../"))
import custom_functions as cf
help(cf)

In [None]:
import json, os
from pprint import pprint

# Define filename for project config filepaths json file
FPATHS_FILE = "config/filepaths.json"
os.makedirs(os.path.dirname(FPATHS_FILE), exist_ok=True)

# Define Filepaths
FPATHS = dict(
    data={
        # Image directories
        'data_dir': "./American Sign Language Letters.v1-v1.multiclass/",
        "train-images_dir": "./American Sign Language Letters.v1-v1.multiclass/train/",
        "test-images_dir": "./American Sign Language Letters.v1-v1.multiclass/test/",
        
        # Image classes as csv files
        "train-labels_csv": "./American Sign Language Letters.v1-v1.multiclass/train/_classes.csv",
        "test-labels_csv": "./American Sign Language Letters.v1-v1.multiclass/test/_classes.csv",
        
        # Processed versions of the above csv files
        "train-labels_processed_csv": "./American Sign Language Letters.v1-v1.multiclass/train/_classes_processed.csv",
        "test-labels_processed_csv": "./American Sign Language Letters.v1-v1.multiclass/test/_classes_processed.csv",

        },

    images={
        "banner": "images/American_Sign_Language_ASL.svg",
    },
    # Any images to be displayed in the app
    eda={
        "label-distrubtion-countplot_png": "images/label-distribution-countplot.png",
        "test-labels-distrubtion_png": "images/test-label-distribution-countplot.png",
        "example-images_png": "images/ed_example_letters.png",
    },
    modeling={
            "train-dataset_dir": "modeling/data/training-data-tf/",  # train_ds
            "val-dataset_dir": "modeling/data/validation-data-tf/",  # train_ds
            "test-dataset_dir": "modeling/data/testing-data-tf/",  # test_ds
            "params":"modeling/params.json",
            "best_cnn_fpaths": {'model_dir': "modeling/models/best_cnn/",
                                      "results_dir": "modeling/models/best_cnn/results/",
                                      'model_history_png': "modeling/models/best_cnn/results/model_history.png",
                                        "model_confusion_matrix": "modeling/models/best_cnn/results/model_confusion_matrix.png",
                                        "model_classification_report": "modeling/models/best_cnn/results/model_classification_report.png",
                                  },
            # 'best_model_cnn_dir': "modeling/models/best_cnn/",
            "best_transfer_fpaths": {'model_dir': "modeling/models/transfer_learning/",
                                                    "results_dir": "modeling/models/transfer_learning/results/",
                                                    "model_history_png": "modeling/models/transfer_learning/results/model_history.png",
                                                    "model_confusion_matrix": "modeling/models/transfer_learning/results/model_confusion_matrix.png",
                                                    "model_classification_report": "modeling/models/transfer_learning/results/model_classification_report.png",
            },
                                                    
            # 'transfer_learning_dir': "modeling/models/transfer_learning/",
            "label-lookup_json": "modeling/label_lookup.json",
},
    config = {'log_fpath': "logs/nn_training.log",}
    # results={"results_dir": "results/",
    #          'best_model_cnn_dir': "results/best_cnn/",
    #          "best_model_cnn_history": "results/best_cnn/history.png",
    #          "best_model_cnn_confusion_matrix": "results/best_cnn/confusion_matrix.png",
    #          "best_model_cnn_classification_report": "results/best_cnn/classification_report.png",
             
    #          "transfer_learning_dir": "results/transfer_learning/",
    #          "transfer_learning_history": "results/transfer_learning/history.png",
    #          "transfer_learning_confusion_matrix": "results/transfer_learning/confusion_matrix.png",
    #             "transfer_learning_classification_report": "results/transfer_learning/classification_report.png",
    #             },
)
FPATHS

In [None]:

# Use fn for local package, ds for pip version
cf.utils.create_directories_from_paths(FPATHS)
# ds.utils.create_directories_from_paths(FPATHS)

print('[i] FPATHS Dictionary:\n')
pprint(FPATHS.keys())#, indent=4)

## Save the filepaths
with open(FPATHS_FILE, "w") as f:
    json.dump(FPATHS, f)
    print(f"\n[i] Saved FPATHS to {FPATHS_FILE}")

### 🎛️ Project Params

In [None]:
## Set project-wide parameters
OVERWRITE_LOGS = True

# # Saving image params as vars for reuse
BATCH_SIZE = 32
IMG_HEIGHT = 128
IMG_WIDTH = 128

## Set data split proportions
TRAIN_SPLIT = 0.7  # Proportion of data for training
VAL_SPLIT = 0.15  # Proportion of data for validation (remaining will be for test)


# Save model params
PATIENCE = 10  # For early stopping
EPOCHS = 2#0  # Max number of epochs to run
print(f"EPOCHS TEMPORARILY SET TO {EPOCHS}")

import json
## Save model params from above to json
params = {"BATCH_SIZE":BATCH_SIZE,
          "IMG_HEIGHT":IMG_HEIGHT,
          "IMG_WIDTH":IMG_WIDTH,
          "TRAIN_SPLIT":TRAIN_SPLIT,
          "VAL_SPLIT":VAL_SPLIT,
          "PATIENCE":PATIENCE,
          "EPOCHS":EPOCHS}

with open(FPATHS['modeling']['params'], "w") as f:
    json.dump(params, f)
    print(f"\n[i] Saved params to {FPATHS['modeling']['params']}")

# Load Data

In [None]:
# Checking the contents of data folder
data_dir = FPATHS['data']['data_dir'] #"./American Sign Language Letters.v1-v1.multiclass/"
data_dir

In [None]:
# Getting list of img file paths (ONLY; did not make the glob recursive, so no folders)
img_files = glob.glob(data_dir+"**/*")#, recursive=True)
len(img_files)

In [None]:
# Preview an example image (at full size)
img_loaded = load_img(img_files[0])
img_data = img_to_array(img_loaded)
print(img_data.shape)
array_to_img(img_data)

### Prepare CSV of Filenames + Labels (1 per train/test)

In [None]:

# Load the CSV file
# csv_path = os.path.join(data_dir,"train","_classes.csv")
train_csv = FPATHS['data']['train-labels_csv']
df_train = pd.read_csv(train_csv)
df_train = df_train.convert_dtypes()
df_train = df_train.set_index('filename')
df_train = df_train.astype(float)
df_train

In [None]:
# Saving list of one-hot-encoded labels
label_cols = classes = sorted(df_train.drop(columns=['filename','filepath','label'], errors='ignore').columns)
label_cols

In [None]:


# Combine label columns into single column
df_train.loc[:,'label'] = df_train[label_cols].apply(lambda x: x.idxmax(), axis=1)
display(df_train.head(2))

df_train['label'].value_counts(1).sort_index()

In [None]:
# Save the folder path to prepend to image filenames
train_img_dir = FPATHS['data']['train-images_dir']

# Save label lookup dictionary
label_lookup = {i:label for i,label in enumerate(classes)}
label_lookup

In [None]:
## Get the filepaths and labels
df_train = df_train.reset_index(drop=False)
df_train['filepath'] = df_train.loc[:,'filename'].astype(str).map(lambda x: os.path.join(train_img_dir, x)).values
filepaths = df_train['filepath']

labels = df_train[label_cols].astype(float).values
filepaths[0], labels[0]

In [None]:
def prepare_labels_df(csv_fpath, img_dir, return_label_lookup=False, 
                      save_processed_csv=False, processed_csv_fpath=None,
                      save_label_lookup=False, label_lookup_fpath=None):
    """Load a one-hot-encoded labels csv and add `label` and `filepath` columns.

    Every column other than 'filename', 'filepath' and 'label' is treated as a
    one-hot label column. The label columns are cast to float, the name of the
    highest-valued label column is stored in a new 'label' column, and
    `img_dir` is joined onto each 'filename' to build a 'filepath' column.

    Parameters:
    - csv_fpath: Path of the csv to read (must contain a 'filename' column).
    - img_dir: Directory that is prepended to every filename.
    - return_label_lookup (bool): Also return the {index: label} dictionary.
    - save_processed_csv (bool): Write the processed DataFrame to `processed_csv_fpath`.
    - processed_csv_fpath: Destination csv; required when `save_processed_csv` is True.
    - save_label_lookup (bool): Write the label lookup as json to `label_lookup_fpath`.
    - label_lookup_fpath: Destination json; required when `save_label_lookup` is True.

    Returns:
    - df, or (df, label_lookup) when `return_label_lookup` is True.

    Raises:
    - ValueError: if a save flag is set without its matching filepath.
    """
    # Fail fast: DataFrame.to_csv(None) returns a string instead of writing a
    # file, so a missing path would otherwise be reported as a successful save.
    if save_processed_csv and processed_csv_fpath is None:
        raise ValueError("processed_csv_fpath is required when save_processed_csv=True")
    if save_label_lookup and label_lookup_fpath is None:
        raise ValueError("label_lookup_fpath is required when save_label_lookup=True")

    df = pd.read_csv(csv_fpath).convert_dtypes()

    # Everything that is not bookkeeping is a one-hot label column
    label_cols = sorted(df.drop(columns=['filename', 'filepath', 'label'], errors='ignore').columns)
    df[label_cols] = df[label_cols].astype(float)

    # Combine labels into a single column for EDA (name of the max-valued column per row)
    df['label'] = df[label_cols].idxmax(axis=1)

    # Prepend folder path to image filenames
    df['filepath'] = df['filename'].astype(str).map(lambda fname: os.path.join(img_dir, fname))

    # Integer position -> label name, following the sorted label column order
    label_lookup = dict(enumerate(label_cols))

    if save_label_lookup:
        with open(label_lookup_fpath, "w") as f:
            json.dump(label_lookup, f)
        print(f"\n[i] Saved label lookup to {label_lookup_fpath}")

    if save_processed_csv:
        df.to_csv(processed_csv_fpath, index=False)
        print(f"\n[i] Saved processed csv to {processed_csv_fpath}")

    if return_label_lookup:
        print("- DataFrame and label lookup dictionary returned.")
        return df, label_lookup

    print("- DataFrame only returned.")
    return df

In [None]:
# Load training data and labels
df_train, label_lookup = prepare_labels_df(FPATHS['data']['train-labels_csv'], 
                             FPATHS['data']['train-images_dir'], 
                             save_label_lookup=True, label_lookup_fpath=FPATHS['modeling']['label-lookup_json'],
                             save_processed_csv=True, processed_csv_fpath=FPATHS['data']['train-labels_processed_csv'],
                             return_label_lookup=True)
df_train.head(2)
label_lookup

In [None]:
# Load testing data and labels
df_test = prepare_labels_df(FPATHS['data']['test-labels_csv'], 
                             FPATHS['data']['test-images_dir'], 
                             save_label_lookup=False, 
                             save_processed_csv=True, processed_csv_fpath=FPATHS['data']['test-labels_processed_csv'],
                             return_label_lookup=False)
display(df_test.head(2))

In [None]:
# Combined dataframe of filepaths and labels
# df_

In [None]:
# files_exist = np.array([os.path.exists(f) for f in filepaths])
# files_exist.all()

In [None]:
# # Load an image and its label with keras utils
# filepaths = df_train['filepath']
# print(f"Letter: {label_lookup[np.argmax(labels[0])]}")
# display(load_img(filepaths[0]))

In [None]:
# Function to load and preprocess images for dataset
def load_image(filename, label, img_height=128, img_width=128):
    """Read one image file, decode it to 3 channels and resize it.

    Parameters:
    - filename: Path of the image file to read.
    - label: Label belonging to the image; returned untouched.
    - img_height (int): Target height passed to tf.image.resize. Default is 128.
    - img_width (int): Target width passed to tf.image.resize. Default is 128.

    Returns:
    - (image, label): the resized image tensor and the original label.

    Note: pixel values are not rescaled in this function.
    """
    raw_bytes = tf.io.read_file(filename)
    decoded = tf.image.decode_image(raw_bytes, channels=3, expand_animations=False)
    # decode_image leaves the static shape unknown; declare rank/channels
    # explicitly before resizing.
    decoded.set_shape([None, None, 3])
    resized = tf.image.resize(decoded, [img_height, img_width])
    return resized, label

In [None]:
# Display an example image
i = 0
# filepaths = df_train['filepath']
# labels = df_train[label_cols].astype(float).values

def preview_image(filepaths, labels, i, label_lookup, title=""):
    """Load the i-th image with `load_image` and show it with its label.

    Parameters:
    - filepaths: Indexable collection of image file paths.
    - labels: Indexable collection of one-hot label vectors aligned with `filepaths`.
    - i (int): Position of the example to preview.
    - label_lookup (dict): Maps the argmax position of a label vector to its name.
    - title (str): Text appended to the "Preview Image" header line.
    """
    image, ohe_label = load_image(filepaths[i], labels[i])

    header = "\n[i] Preview Image"+ title
    print(header)
    # argmax of the one-hot vector is the key into the lookup dictionary
    print(f"- Label (Category): {label_lookup[np.argmax(ohe_label)]}")
    print(f"- Label (OHE): {ohe_label}")

    display(array_to_img(image))
    print(f"- Image Shape: {image.shape}")
    
    
    
# Preview an example from training and testing data
preview_image(filepaths=df_train['filepath'].values, labels=df_train[label_cols].values, i=0, label_lookup=label_lookup,
              title=" (Training Data)")

preview_image(filepaths=df_test['filepath'].values, labels=df_test[label_cols].values, i=0,label_lookup=label_lookup,
              title=" (Testing Data)")

### EDA

In [None]:
# eda_df = df_train[['filepath', 'label']]
# eda_df

In [None]:
def plot_label_distribution(eda_df, x='label', title="Distribution of Labels (Training Data)", xlabel="Letter", ylabel="Count",
                            label_lookup=None, save_path=None):
    """Draw a count plot of the values in column `x` and optionally save it.

    Parameters:
    - eda_df (DataFrame): Data containing the column to count.
    - x (str): Column to count. Default is 'label'.
    - title, xlabel, ylabel (str): Text applied to the axes.
    - label_lookup (dict): If given, its sorted values define the bar order;
      otherwise the sorted unique values of `eda_df[x]` are used.
    - save_path: If not None, the figure is written there (dpi=300).

    Returns:
    - (fig, ax): the matplotlib figure and axes.
    """
    label_source = eda_df[x].unique() if label_lookup is None else label_lookup.values()
    classes = sorted(label_source)
    palette = sns.color_palette("icefire", n_colors=len(classes))

    # Plot distribution of labels
    fig, ax = plt.subplots(figsize=(10, 6))
    ax = sns.countplot(data=eda_df, x=x, hue=x, order=classes, dodge=False,
                       palette=palette, ax=ax)
    ax.set(title=title, xlabel=xlabel, ylabel=ylabel)

    # Hide the top/right frame lines
    for side in ('top', 'right'):
        ax.spines[side].set_visible(False)

    if save_path is None:
        return fig, ax

    fig.savefig(save_path, dpi=300, bbox_inches='tight', transparent=False)
    return fig, ax
        
fig, ax = plot_label_distribution(df_train, title="Distribution of Labels (Training Data)", save_path=FPATHS['eda']['label-distrubtion-countplot_png'],
                                  label_lookup=label_lookup)
fig, ax = plot_label_distribution(df_test, title="Distribution of Labels (Test Data)", save_path=FPATHS['eda']['test-labels-distrubtion_png'],
                                  
                                  label_lookup=label_lookup)
# FPATHS['eda']['label-distrubtion-countplot_png']

#### Display Example of Each

In [None]:
### Plot example of each letter
import os
# os.makedirs("images", exist_ok=True)

def plot_example_images(eda_df,label_col='label',fpath_col = "filepath", ncols = 6,figsize=(15,15),
                        save_path=None, suptitle=None, suptitle_y=1.02, suptitle_fontsize=16):
    """Plot one randomly sampled example image per unique label in a grid.

    Parameters:
    - eda_df (DataFrame): Data with a label column and an image filepath column.
    - label_col (str): Column holding the label. Default is 'label'.
    - fpath_col (str): Column holding the image path. Default is 'filepath'.
    - ncols (int): Number of grid columns. Default is 6.
    - figsize (tuple): Figure size passed to plt.subplots.
    - save_path: If not None, the figure is written there (dpi=300).
    - suptitle (str): Optional figure title.
    - suptitle_y (float), suptitle_fontsize (int): Placement/size of the title.

    Returns:
    - (fig, axes): the figure and the flattened array of all axes that were
      created (axes without a label are removed from the figure).
    """
    unique_labels = sorted(eda_df[label_col].unique())
    n_labels = len(unique_labels)

    # Ceiling division: `n // ncols + 1` added a spare empty row whenever the
    # label count was an exact multiple of ncols. Always keep at least one row.
    nrows = max(1, -(-n_labels // ncols))

    # squeeze=False guarantees a 2-D array of axes, so flatten() also works
    # for a 1x1 grid (plt.subplots would otherwise return a bare Axes).
    fig, axes = plt.subplots(ncols=ncols, nrows=nrows, figsize=figsize, squeeze=False)
    axes = axes.flatten()

    # Plot a random example of each label
    for ax, label in zip(axes, unique_labels):
        fpath = eda_df.loc[eda_df[label_col] == label, fpath_col].sample(1).values[0]
        ax.imshow(plt.imread(fpath))
        ax.set_title(label)
        ax.axis('off')

    # Remove the axes that did not receive an image
    for ax in axes[n_labels:]:
        fig.delaxes(ax=ax)

    # Tweak layout
    fig.tight_layout()
    if suptitle is not None:
        fig.suptitle(suptitle, y=suptitle_y, fontsize=suptitle_fontsize)

    if save_path is not None:
        fig.savefig(save_path, dpi=300, bbox_inches='tight', transparent=False)

    return fig, axes


fig, axes = plot_example_images(df_train, save_path=FPATHS['eda']['example-images_png'],
                                suptitle="Example Images of Each Letter in Training Data")

### Construct Train/Test/Val Tensorflow Datasets

In [None]:
len(df_train), len(df_test)

In [None]:
# load_image(image_paths[0], labels[0])

filepaths = df_train['filepath'].values
labels = df_train[label_cols].values
dataset = tf.data.Dataset.from_tensor_slices((filepaths, labels))

# Shuffle and batch the dataset
dataset = dataset.shuffle(buffer_size=len(dataset), reshuffle_each_iteration=False)

dataset.take(1).get_single_element()

In [None]:
tf.data.experimental.AUTOTUNE

In [None]:
## Map the load_image function to the dataset
dataset = dataset.map(lambda x,y: load_image(x,y),
                      num_parallel_calls=tf.data.experimental.AUTOTUNE)
dataset.take(1).get_single_element()

In [None]:
# Determine split sizes
total_size = len(dataset)
train_size = int(TRAIN_SPLIT * total_size)
val_size = int(VAL_SPLIT * total_size)
test_size = total_size - train_size - val_size
print(f"{train_size=}, {test_size=}, {val_size=}")


In [None]:
# Split the dataset
train_dataset = dataset.take(train_size)
val_dataset = dataset.skip(train_size).take(val_size)
test_dataset = dataset.skip(train_size + val_size)

# Cache the dataset for faster access
train_dataset = train_dataset.cache()
val_dataset = val_dataset.cache()
test_dataset = test_dataset.cache() 


In [None]:
# Shuffle the training data BEFORE batching so individual examples (not whole
# batches) are re-mixed each epoch; the buffer covers the full training split.
train_dataset = train_dataset.shuffle(buffer_size=train_dataset.cardinality(),
                                      reshuffle_each_iteration=True)

# Batch and prefetch the datasets (prefetch is kept as the final stage)
train_dataset = train_dataset.batch(BATCH_SIZE).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
val_dataset = val_dataset.batch(BATCH_SIZE).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
test_dataset = test_dataset.batch(BATCH_SIZE).prefetch(buffer_size=tf.data.experimental.AUTOTUNE)


In [None]:

# Use the datasets
for images, labels in train_dataset.take(1):
    print(f"Train batch - images: {images.shape}, labels: {labels.shape}")
    
for images, labels in val_dataset.take(1):
    print(f"Val batch - images: {images.shape}, labels: {labels.shape}")
    
    
for images, labels in test_dataset.take(1):
    print(f"Test batch - images: {images.shape}, labels: {labels.shape}")
    

### Baseline Model (From towardsdatascience blog)
- from https://towardsdatascience.com/sign-language-recognition-with-advanced-computer-vision-7b74f20f3442


In [None]:
# Model from https://towardsdatascience.com/sign-language-recognition-with-advanced-computer-vision-7b74f20f3442
# from tensorflow
def make_model(name='towards-data-science',show_summary=False, use_schedule=False):
    """Build and compile the baseline CNN.

    The network rescales pixels by 1/255, applies three Conv2D stages
    (75, 50 and 25 filters) each followed by batch normalization and max
    pooling, then a 512-unit dense layer and a softmax output with one unit
    per entry of the global `label_lookup`. The input size comes from the
    globals IMG_HEIGHT and IMG_WIDTH.

    Parameters:
    - name (str): Name given to the Sequential model.
    - show_summary (bool): Print model.summary() after compiling.
    - use_schedule (bool): Use an ExponentialDecay learning-rate schedule
      (initial rate 0.01) instead of the optimizer's default learning rate.

    Returns:
    - model: The compiled Sequential model.
    """
    conv_kws = dict(strides=1, padding='same', activation='relu')
    pool_kws = dict(strides=2, padding='same')

    # Layers are created in the same order in which they are added
    layer_stack = [
        layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
        # Stage 1
        layers.Conv2D(75, (3, 3), **conv_kws),
        layers.BatchNormalization(),
        layers.MaxPool2D((2, 2), **pool_kws),
        # Stage 2 (with dropout before normalization)
        layers.Conv2D(50, (3, 3), **conv_kws),
        layers.Dropout(0.2),
        layers.BatchNormalization(),
        layers.MaxPool2D((2, 2), **pool_kws),
        # Stage 3
        layers.Conv2D(25, (3, 3), **conv_kws),
        layers.BatchNormalization(),
        layers.MaxPool2D((2, 2), **pool_kws),
        # Final layers
        layers.Flatten(),
        layers.Dense(units=512, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(units=len(label_lookup), activation='softmax'),
    ]

    model = models.Sequential(name=name)
    for layer in layer_stack:
        model.add(layer)

    if not use_schedule:
        optimizer = optimizers.legacy.Adam()
    else:
        lr_schedule = optimizers.schedules.ExponentialDecay(
            initial_learning_rate=0.01, decay_steps=10000, decay_rate=0.95
        )
        optimizer = optimizers.legacy.Adam(learning_rate=lr_schedule)

    model.compile(optimizer=optimizer,
                  loss=tf.keras.losses.CategoricalCrossentropy(),
                  metrics=['accuracy'])

    if show_summary:
        model.summary()
    return model


# Demonstrate model architecture
model = make_model(show_summary=True)

#### `def get_callbacks`

In [None]:


def get_callbacks(monitor='val_accuracy', patience=PATIENCE,
                  start_from_epoch=3, restore_best_weights=False):
    """
    Build the list of callbacks used when fitting a model.

    Parameters:
    - monitor (str): Metric watched by early stopping. Default is 'val_accuracy'.
    - patience (int): Forwarded to EarlyStopping as `patience`. Default is the
      value of the global PATIENCE when this function is defined.
    - start_from_epoch (int): Forwarded to EarlyStopping as `start_from_epoch`. Default is 3.
    - restore_best_weights (bool): Forwarded to EarlyStopping. Default is False.

    Returns:
    - callbacks (list): A one-element list holding the EarlyStopping callback.
    """
    early_stopping_kws = dict(
        monitor=monitor,
        patience=patience,
        start_from_epoch=start_from_epoch,
        restore_best_weights=restore_best_weights,
        verbose=1,
    )
    return [tf.keras.callbacks.EarlyStopping(**early_stopping_kws)]




### Define updated evaluation functions

> With 26 classes, it is difficult to scan the performance for each class visually. Adding code to convert the results to a DataFrame and use pandas styling to visualize them.

- added new `get_results_df` to custom_functions

###  New Custom Eval Function: 
- `custom_evaluate_classification_network` (for Notebook use only)

In [None]:
def custom_evaluate_classification_network(model, X_test, history=None, figsize=(15,15), target_names=None,
                                             #  as_frame=True, 
                                             frame_include_macro_avg=True, frame_include_support=False,
                                             display_bar=False, bar_subset_cols = ['recall','precision','f1-score'], 
                                             conf_matrix_text_kws={'fontsize': 'x-small'},
                                             return_figs= True, return_str_report=True):
    """
    Evaluate a classification model on a test dataset (for notebook use only).

    Thin wrapper around `cf.evaluate_classification_network` that always asks
    for DataFrame output (`output_dict=True, as_frame=True`) and can display
    the per-class results as a styled bar table.

    Parameters:
    - model: The trained classification model.
    - X_test: The test dataset.
    - history: The training history of the model (optional).
    - figsize: Figure size forwarded to the evaluation helper (default: (15, 15)).
    - target_names: Names of the target classes. When None, the values of the
      global `label_lookup` are used.
    - frame_include_macro_avg: Forwarded to the evaluation helper (default: True).
    - frame_include_support: Forwarded to the evaluation helper (default: False).
    - display_bar: Display results_dict['test']['results-classes'] as a styled
      bar table (default: False).
    - bar_subset_cols: Columns that receive bars in the styled table.
    - conf_matrix_text_kws: Text keyword arguments forwarded to the helper.
    - return_figs: Ask the helper for figures and return them (default: True).
    - return_str_report: Forwarded to the evaluation helper (default: True).

    Returns:
    - (results_dict, fig_dict) when `return_figs` is True, otherwise results_dict.

    Raises:
    - Exception: if figures were requested but the helper did not return a
      (results, figures) tuple.
    """
    if target_names is None:
        # label_lookup is in the global scope
        target_names = label_lookup.values()

    # Hand copies of the default containers downstream so the shared default
    # objects in the signature can never be mutated by the helper.
    text_kws = None if conf_matrix_text_kws is None else dict(conf_matrix_text_kws)
    bar_cols = list(bar_subset_cols)

    evaluation = cf.evaluate_classification_network(model,
                                                    X_test=X_test, history=history, figsize=figsize,
                                                    # Set output to produce a dataframe (no option)
                                                    output_dict=True, as_frame=True,
                                                    target_names=target_names,
                                                    return_fig_conf_matrix=return_figs,
                                                    return_fig_history=return_figs,
                                                    frame_include_macro_avg=frame_include_macro_avg,
                                                    frame_include_support=frame_include_support,
                                                    values_format=".2f",
                                                    conf_matrix_text_kws=text_kws,
                                                    return_str_report=return_str_report)

    if isinstance(evaluation, tuple):
        results_dict, fig_dict = evaluation
    elif return_figs:
        # Figures were requested, so a bare result cannot satisfy the return contract
        raise Exception("Results dict not a tuple")
    else:
        # NOTE(review): presumably the helper returns only the results when no
        # figures are requested — confirm against custom_functions.
        results_dict, fig_dict = evaluation, None

    if display_bar:
        # Best-effort display: fall back to the raw results if styling fails
        try:
            plot_data = results_dict['test']['results-classes']
            display(plot_data.style.bar(subset=bar_cols, color='#5fba7d').format(formatter= lambda x: f"{x:.2f}").set_caption("Test Data"))
        except Exception:
            display(results_dict)

    if return_figs:
        return results_dict, fig_dict
    return results_dict

> 📌BOOKMARK: Controlling Text Size on Confusion Matrix 

In [None]:
# # # TEST CODE (Must run modeling below first)
# results = custom_evaluate_classification_network(model,X_test=test_dataset, history=history, figsize=(15,15),
#                                                  target_names=label_lookup.values(),display_bar=True,);

## Model 1 (with New Function)

In [None]:
## Show model architecture
model = make_model(show_summary=True, use_schedule=False)

### Logging

In [None]:
# Set up logging
import logging
import time
import datetime as dt

def initialize_logs(log_file = FPATHS['config']['log_fpath'], overwrite_logs=True,
                    log_header = "start_time,name,fit_time,metrics"):
    """Configure the root logger to write to `log_file` and log a header line.

    Parameters:
    - log_file: Path of the log file. Default is FPATHS['config']['log_fpath'].
    - overwrite_logs (bool): When truthy the file is opened in "w" mode and the
      logging configuration is forced; otherwise the file is opened in "a"
      mode without forcing.
    - log_header (str): First message written to the log at INFO level.
    """
    if overwrite_logs:
        filemode, force = "w", True
    else:
        # Without force, logging.basicConfig does nothing if the root logger
        # already has handlers configured.
        filemode, force = "a", False

    logging.basicConfig(filename=log_file, level=logging.INFO, filemode=filemode, force=force)
    # Fixed: was `loggin.info`, which raised NameError on every call
    logging.info(log_header)

initialize_logs(log_file= FPATHS['config']['log_fpath'], overwrite_logs=OVERWRITE_LOGS)
# Function to log neural network details
def log_nn_details(start_time, name, fit_time, results_overall):
    """Log one comma-separated record describing a fitted network.

    Parameters:
    - start_time (datetime): Formatted as month/day/year and time.
    - name: Model name.
    - fit_time: Duration of the fit.
    - results_overall (DataFrame): Must have a 'macro avg' row; that row is
      converted to a dictionary and written as the metrics field.

    The record is "start_time,name,fit_time,metrics" followed by a newline and
    is logged at INFO level.
    """
    macro_avg = results_overall.loc['macro avg'].to_dict()
    timestamp = start_time.strftime('%m/%d/%Y %T')

    fields = (timestamp, name, fit_time, macro_avg)
    record = ",".join(str(field) for field in fields) + "\n"
    logging.info(record)
    

def fit_log_model(model, train_dataset, val_dataset, test_dataset, epochs=EPOCHS, patience=PATIENCE,fit_kws=None, callback_kws=None):
    """Fit a model, evaluate it on the test dataset and log the overall metrics.

    Parameters:
    - model: Compiled model to fit.
    - train_dataset, val_dataset: Datasets passed to model.fit.
    - test_dataset: Dataset passed to custom_evaluate_classification_network.
    - epochs (int): Maximum number of epochs. Default is the global EPOCHS.
    - patience (int): Early-stopping patience passed to get_callbacks.
      Default is the global PATIENCE.
    - fit_kws (dict): Extra keyword arguments for model.fit (optional).
    - callback_kws (dict): Extra keyword arguments for get_callbacks (optional).

    Returns:
    - dict with keys 'model', 'history', 'results_classes', 'results_overall'
      and 'result_figs'.
    """
    # None instead of {} in the signature: avoids sharing one dict across calls
    fit_kws = {} if fit_kws is None else fit_kws
    callback_kws = {} if callback_kws is None else callback_kws

    # Save start time
    start_time = dt.datetime.now()

    callbacks = get_callbacks(patience=patience, **callback_kws)
    # Fixed: previously passed the global EPOCHS, silently ignoring `epochs`
    history = model.fit(train_dataset, validation_data=val_dataset, epochs=epochs, verbose=1,
                        callbacks=callbacks, **fit_kws)
    fit_time = dt.datetime.now() - start_time

    results_frames, results_figs = custom_evaluate_classification_network(
        model, X_test=test_dataset, history=history, figsize=(15,15),
        target_names=label_lookup.values(), display_bar=True,
        frame_include_macro_avg=True, frame_include_support=False)
    print(f"{type(results_frames)=}, {type(results_figs)=}")

    if isinstance(results_frames, dict):
        class_results = results_frames['test']['results-classes']
        overall_results = results_frames['test']['results-overall']
    else:
        # Only per-class results are available; nothing to log
        class_results = results_frames
        overall_results = None

    if overall_results is not None:
        print("\nOverall Results:")
        display(overall_results)
        log_nn_details(start_time, model.name, fit_time, overall_results)

    return dict(model=model, history=history, results_classes=class_results,
                results_overall=overall_results, result_figs=results_figs)
    

In [None]:
%%time
# Baseline model
model = make_model(show_summary=False, use_schedule=False)
model_results = fit_log_model(model, train_dataset, val_dataset, test_dataset, epochs=EPOCHS, patience=PATIENCE)

# history = model.fit(train_dataset, epochs = EPOCHS,
#                     validation_data = val_dataset, callbacks=get_callbacks())
# # results_dict = cf.evaluate_classification_network(model,X_test=test_dataset,history=history, figsize=(15,15),
# #                                                   output_dict=True, target_names=label_lookup.values(),
# #                                                   as_frame=True,
# #                                                   frame_include_macro_avg=False,frame_include_support=False)
# results, result_figs = custom_evaluate_classification_network(model,X_test=test_dataset, history=history, figsize=(15,15),
#                                                  target_names=label_lookup.values(),display_bar=True,);

## Bookmark: 06/01 - Parsing and Displaying/Saving DataFrame Logs

In [None]:
# # Step 3: Parse the log file
# def parse_log_file(log_file):
#     data = []
#     with open(log_file, 'r') as file:
#         for line in file:
#             name, fit_time, metrics = line.strip().split(',', 2)
#             metrics = eval(metrics)  # Convert string representation of dictionary to dictionary
#             data.append([
#                 name, 
#                 float(fit_time), 
#                 metrics['accuracy'], 
#                 metrics['precision'], 
#                 metrics['recall'], 
#                 metrics['f1_score']
#             ])
            

#     return data


In [None]:

# Testing log file parsing
with open(log_file, 'r') as file:
    log_lines = file.readlines()
    # file.seek(0)
    # log = file.read()


    # for line in file:
    #     print(line.strip())
# print(log)
log_lines

In [None]:
def parse_log_file(log_file):
    """Parse the training log into a list of records.

    Each record line is expected to look like
    "start_time,name,fit_time,{metrics dict}", optionally prefixed with the
    default logging "LEVEL:logger:" prefix. Blank lines, the header line and
    any other line whose fourth field is not a dict literal are skipped.

    Parameters:
    - log_file: Path of the log file to read.

    Returns:
    - data (list): One [start_time, name, fit_time, metrics] list per record,
      where metrics is a dict and the other fields are strings.

    Raises:
    - ValueError: if a record's metrics field cannot be parsed as a literal.
    """
    import ast
    import re

    # logging's default format writes "LEVEL:logger_name:message"
    log_prefix = re.compile(r"^[A-Z]+:[^:,]*:")

    data = []
    with open(log_file, 'r') as file:
        for line_no, raw_line in enumerate(file, start=1):
            line = log_prefix.sub("", raw_line.strip(), count=1)
            if not line:
                continue

            # Split on the first three commas only: the metrics dict itself
            # contains commas. (A model name containing a comma is not supported.)
            fields = line.split(',', 3)
            if len(fields) != 4 or not fields[3].lstrip().startswith('{'):
                continue  # header or unrelated log output

            start_time, name, fit_time, metrics_text = fields
            try:
                # literal_eval only accepts Python literals, unlike eval
                metrics = ast.literal_eval(metrics_text.strip())
            except (ValueError, SyntaxError) as err:
                raise ValueError(
                    f"Could not parse metrics on line {line_no} of {log_file}: {metrics_text!r}"
                ) from err
            data.append([start_time, name, fit_time, metrics])
    return data
        


In [None]:
log_data =parse_log_file(log_file)
log_data

In [None]:
raise Exception("Stop here and verify")

In [None]:

# Step 4: Display the information as a table
def display_as_table(data, columns=None):
    """Print `data` as a pandas DataFrame.

    Parameters:
    - data: Row-oriented data accepted by pd.DataFrame.
    - columns (list): Column names, one per field in each row. Default is
      ["Name", "Fit Time (s)", "Accuracy"]; pass a different list for rows
      with another layout (e.g. four-field log records).
    """
    if columns is None:
        columns = ["Name", "Fit Time (s)", "Accuracy"]
    df = pd.DataFrame(data, columns=columns)
    print(df)

In [None]:
# NOTE(review): `log` is not assigned by any active code in the visible
# notebook, so this cell presumably raises NameError — confirm or remove.
log.split(',')

In [None]:
log_data = parse_log_file(FPATHS['config']['log_fpath'])
log_data

In [None]:
raise Exception("inspect logs above")

In [None]:
# results_frames, results_figs = custom_evaluate_classification_network(model,X_test=test_dataset, history=history, figsize=(15,15),
#                                                  target_names=label_lookup.values(),display_bar=True,);
# if len(results_frames) > 1:
#     class_results, overall_results = results_frames
# results_figs

### ✅ To Do:
- Define a save_results function to use the filepaths in FPATHS to save the results dict

In [None]:
print(f"{type(results_frames)=}, {len(results_frames)=}")

In [None]:
results_frames['test'].keys()

In [None]:
results_frames['test']['results-classes']

In [None]:
results_frames['test']['results-overall']

In [None]:
# # save datasets
# train_dataset.save(FPATHS['modeling']['train-dataset_dir'])
# test_dataset.save(FPATHS['modeling']['test-dataset_dir'])
# val_dataset.save(FPATHS['modeling']['val-dataset_dir'])

In [None]:
# model.save?

In [None]:
FPATHS['modeling']['best_cnn_fpaths']

In [None]:
fig

### Writing Save Function:

- Notes:
    - Need to save:
        - `figs_dict`:
            - 

In [None]:

# try:
    
#     # Save the model
#     model.save(FPATHS['modeling']['best_model_cnn_fpaths']['model_dir'], save_format='tf')

# except Exception as e:
#     display(f"[i] Error saving model: {e}")


In [None]:
FPATHS['modeling']['best_cnn_fpaths']

In [None]:
# Inspect the structure of the results and figures dictionaries.
# NOTE(review): `results` and `figs` are not assigned at top level by any
# visible code (fit_log_model keeps them local and returns them inside its
# result dict) — confirm where these names are expected to come from.
print('Results dict:')
print(results.keys())
print(results['test'].keys())


print('\nFigs Dict')
print(figs.keys())
print(figs['test'].keys())

def save_results(figs, model_key=None, FPATHS=None, results=None):
    """Save a model's evaluation figures to the filepaths configured in FPATHS.

    Args:
        figs: Dict of figures. May contain a ``'history'`` figure and, under
            ``'train'`` and/or ``'test'``, a dict holding a
            ``'confusion_matrix'`` figure. Each figure must provide
            ``savefig``.
        model_key: Key of the model's filepath dict inside
            ``FPATHS['modeling']``.
        FPATHS: Project filepaths dict. ``FPATHS['modeling'][model_key]`` is
            read for ``'model_history_png'`` and ``'model_confusion_matrix'``
            (optionally ``'model_confusion_matrix_train'`` /
            ``'model_confusion_matrix_test'``).
        results: Accepted for interface compatibility; not saved yet.

    Returns:
        Dict mapping ``'history'`` / ``'<split>_confusion_matrix'`` to the
        filepath each figure was written to.

    Raises:
        ValueError: If ``FPATHS`` or ``model_key`` is not provided.
    """
    if FPATHS is None or model_key is None:
        raise ValueError("save_results requires both `FPATHS` and `model_key`.")

    model_fpaths = FPATHS['modeling'][model_key]
    savefig_kws = dict(dpi=300, bbox_inches='tight', transparent=False)
    saved = {}

    if 'history' in figs:
        history_fpath = model_fpaths.get('model_history_png')
        if history_fpath is None:
            # Fall back to the project-level results path.
            history_fpath = FPATHS['results']['best_model_cnn_history']
        figs['history'].savefig(history_fpath, **savefig_kws)
        print(f"\n[i] Saved history plot to {history_fpath}")
        saved['history'] = history_fpath

    # Only splits that actually carry a confusion matrix figure are saved.
    splits = [
        split for split in ('train', 'test')
        if 'confusion_matrix' in (figs.get(split) or {})
    ]
    shared_fpath = model_fpaths.get('model_confusion_matrix')

    for split in splits:
        # Prefer a split-specific path when the config provides one.
        fpath = model_fpaths.get(f'model_confusion_matrix_{split}')
        if fpath is None and shared_fpath is not None:
            if len(splits) > 1:
                # Both splits would otherwise write to the same file and the
                # second would overwrite the first; add the split name.
                base, ext = os.path.splitext(shared_fpath)
                fpath = f"{base}_{split}{ext}"
            else:
                fpath = shared_fpath
        if fpath is None:
            print(f"\n[!] No confusion matrix filepath configured for "
                  f"{model_key!r}; skipping {split} confusion matrix.")
            continue

        # Index the per-split dict (the top-level `figs` has no
        # 'confusion_matrix' key) and report the path actually written.
        figs[split]['confusion_matrix'].savefig(fpath, **savefig_kws)
        print(f"\n[i] Saved {split} confusion matrix plot to {fpath}")
        saved[f'{split}_confusion_matrix'] = fpath

    return saved
                
                   

In [None]:
# save_results(figs, history_fpath=FPATHS['results']['best_model_cnn_classification_report'], conf_matrix_fpath_train=None, 
#                  conf_matrix_fpath_test=None,
#                  classification_report_fpath=None # not used yet
#                  )
    

In [None]:
# raise Exception("finish writing save_results function above.")
# if OVERRIDE_TESTING:
#     raise Exception("finish testing custom_evaluate_classification_network in eval function above.")
# # raise Exception("finish testing conf_matrix_text_kws in eval function above.")

### Model 1-LR: Adding LR Scheduling to Model 1

In [None]:
%%time
# Model 1-LR: build the model with make_model, this time with
# learning-rate scheduling enabled.
model = make_model(show_summary=False, 
                   use_schedule=True # Adding learning rate scheduling
                   )
# Fit on train_dataset for EPOCHS epochs, using val_dataset as validation data.
history = model.fit(train_dataset,epochs = EPOCHS ,validation_data = val_dataset, callbacks=get_callbacks())

# Evaluate on test_dataset; label_lookup values are passed as the class names.
# NOTE: `model`, `history` and `results` are reused names that later cells overwrite.
results = custom_evaluate_classification_network(model,X_test=test_dataset,history=history, figsize=(15,15),
                                                 target_names=label_lookup.values(),display_bar=True);


## Model2 (Custom)

In [None]:

def make_model2(name='CNN1',show_summary=False,use_schedule=False):
    """Build and compile a small two-block CNN classifier.

    Architecture: Rescaling(1/255) -> [Conv2D -> MaxPooling2D] x 2 (16 then 32
    filters) -> Flatten -> Dense softmax with one unit per entry in
    `label_lookup`.

    Args:
        name: Name given to the Sequential model.
        show_summary: If True, print the model summary after compiling.
        use_schedule: If True, Adam uses an ExponentialDecay learning-rate
            schedule; otherwise Adam is created with its default settings.

    Returns:
        The compiled Sequential model.
    """
    net = models.Sequential(name=name)

    # Scale pixel values inside the model.
    net.add(layers.Rescaling(1./255 , input_shape = (IMG_HEIGHT,IMG_WIDTH,3)))

    # Two convolution + pooling blocks that differ only in filter count.
    # NOTE(review): no `activation` is passed to Conv2D, so the layer default
    # applies -- confirm this is intended.
    for n_filters in (16, 32):
        net.add(layers.Conv2D(filters=n_filters, kernel_size=3, padding='same'))
        net.add(layers.MaxPooling2D(pool_size=2))

    # Classification head.
    net.add(layers.Flatten())
    net.add(layers.Dense(len(label_lookup), activation="softmax"))

    # Optimizer: plain Adam, or Adam driven by an exponential decay schedule.
    if not use_schedule:
        optimizer = optimizers.legacy.Adam()
    else:
        decay = optimizers.schedules.ExponentialDecay(
            initial_learning_rate=0.01, decay_steps=10000, decay_rate=0.95
        )
        optimizer = optimizers.legacy.Adam(learning_rate=decay)

    net.compile(optimizer=optimizer,
                loss=tf.keras.losses.CategoricalCrossentropy(),
                metrics=['accuracy'])

    if show_summary:
        net.summary()
    return net

In [None]:
## Show model architecture
model2 = make_model2(name="cnn1-fixed-lr", show_summary=True, use_schedule=False)


In [None]:
%%time
# Model 2 with a fixed learning rate: build, fit, then evaluate on the test set.
model2 = make_model2(name="cnn1-fixed-lr", show_summary=False, use_schedule=False)
# Fit on train_dataset for EPOCHS epochs, using val_dataset as validation data.
history2 = model2.fit(train_dataset, epochs = EPOCHS ,validation_data = val_dataset, callbacks=get_callbacks())
# Earlier evaluation call through the `cf` module, kept for reference:
# results_dict = cf.evaluate_classification_network(model2,X_test=test_dataset,history=history2, figsize=(15,15), 
# output_dict=True, target_names=label_lookup.values())
# results_dict.keys()
# Evaluate with the notebook's custom evaluation function; the macro-avg and
# support options for the results frame are switched off here.
results_dict = custom_evaluate_classification_network(model2,X_test=test_dataset,history=history2, figsize=(20,20), 
                                                      target_names=label_lookup.values(),
                                                      frame_include_macro_avg=False, frame_include_support=False,
                                                      display_bar=True)

### Adding LR Scheduling with Exponential Decay

In [None]:
%%time
# Model 2 with the exponential-decay learning-rate schedule enabled.
model2_lr = make_model2(use_schedule=True, show_summary=False, name="cnn1-scheduled-lr")
# Fit on train_dataset for EPOCHS epochs, using val_dataset as validation data.
history_lr = model2_lr.fit(train_dataset,epochs = EPOCHS ,validation_data = val_dataset, callbacks=get_callbacks())
# Earlier evaluation call kept for reference (note it still refers to model2/history2):
# results_dict = cf.evaluate_classification_network(model2,X_test=test_dataset,history=history2, figsize=(15,15), output_dict=True, target_names=label_lookup.values())
# results_dict.keys()
# Evaluate the scheduled-LR model; this overwrites `results_dict` from the fixed-LR run.
results_dict = custom_evaluate_classification_network(model2_lr,X_test=test_dataset,history=history_lr, figsize=(20,20), 
                                                      target_names=label_lookup.values(),
                                                      frame_include_macro_avg=False, frame_include_support=False,
                                                      display_bar=True)

### Saving Best Non-Transfer Learning Model

## Transfer Learning



| Model             |   Size (MB) | Top-1 Accuracy   | Top-5 Accuracy   | Parameters   | Depth   | Time (ms) per inference step (CPU)   | Time (ms) per inference step (GPU)   |
|:------------------|------------:|:-----------------|:-----------------|:-------------|:--------|:-------------------------------------|:-------------------------------------|
| **VGG16**             |      528    | 71.3%            | 90.1%            | 138.4M       | 16      | 69.5                                 | 4.2                                  |
| **EfficientNetB0**    |       29    | 77.1%            | 93.3%            | 5.3M         | 132     | 46.0                                 | 4.9                                  |
| **InceptionV3**       |       92    | 77.9%            | 93.7%            | 23.9M        | 189     | 42.2                                 | 6.9                                  |

*Excerpt from Source: "https://keras.io/api/applications/"*

In [None]:
input_shape = (IMG_HEIGHT,IMG_WIDTH,3)
input_shape

### VGG16

In [None]:
# Download only the VGG16 convolutional base (include_top=False) with ImageNet weights
vgg16_base = tf.keras.applications.VGG16(
    include_top=False, weights="imagenet", input_shape=input_shape
)
# Freeze the base so its layers are not updated during training
vgg16_base.trainable = False

# Create a Lambda layer wrapping VGG16's preprocess_input function so the
# preprocessing is applied inside the model.
lambda_layer_vgg16 = tf.keras.layers.Lambda(
    tf.keras.applications.vgg16.preprocess_input, name="preprocess_input"
)



def make_vgg16_model(show_summary=False):
    """Build and compile a transfer-learning classifier on top of `vgg16_base`.

    The model stacks: Input -> VGG16 preprocessing Lambda layer -> the
    module-level `vgg16_base` -> Flatten -> Dense(256, relu) -> Dropout(0.5)
    -> Dense softmax with one unit per value in `label_lookup`.

    Args:
        show_summary: If True, print the model summary after compiling.

    Returns:
        The compiled Sequential model named "VGG16".
    """
    n_classes = len(label_lookup.values())

    stack = [
        # Input layer only; the Lambda layer below handles pixel preprocessing.
        tf.keras.layers.Input(shape=input_shape),
        # Preprocessing Lambda layer
        lambda_layer_vgg16,
        # Pretrained convolutional base
        vgg16_base,
        layers.Flatten(),
        # Hidden dense layer with dropout
        layers.Dense(256, activation="relu"),
        layers.Dropout(0.5),
        # Output layer
        layers.Dense(n_classes, activation="softmax"),
    ]

    net = models.Sequential(name="VGG16")
    for layer in stack:
        net.add(layer)

    net.compile(
        optimizer=tf.keras.optimizers.legacy.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=["accuracy"],
    )

    if show_summary:
        net.summary()

    return net


In [None]:
%%time
# VGG16 transfer-learning model: build, fit, then evaluate on the test set.
model_vgg = make_vgg16_model(show_summary=False, 
                
                   )
# Fit on train_dataset for EPOCHS epochs, using val_dataset as validation data.
history = model_vgg.fit(train_dataset,epochs = EPOCHS ,validation_data = val_dataset, callbacks=get_callbacks())

# NOTE: `history` and `results` are reused names; this overwrites earlier runs.
results = custom_evaluate_classification_network(model_vgg,X_test=test_dataset,history=history, figsize=(15,15),
                                                 target_names=label_lookup.values(),display_bar=True);


In [None]:
# raise Exception('not ready for below')

### EfficientNet

In [None]:
# Download the EfficientNetB0 base without its classification top (include_top=False).
# NOTE(review): `weights` is not passed, so the Keras default is used
# (documented as ImageNet weights -- confirm for the installed version).
efficientnet_base =tf.keras.applications.EfficientNetB0(include_top=False, 
                                                       input_shape=input_shape)
# Print the layer-by-layer summary of the base.
efficientnet_base.summary()

In [None]:

# Freeze the EfficientNet base so its layers are not updated during training
efficientnet_base.trainable=False

# Create a Lambda layer wrapping EfficientNet's preprocess_input function so
# the preprocessing is applied inside the model.
lambda_layer_efficient = tf.keras.layers.Lambda(tf.keras.applications.efficientnet.preprocess_input, 
                                      name='preprocess_input_enet')

def make_efficientnet_model(show_summary=True):
    """Build and compile a transfer-learning classifier on top of `efficientnet_base`.

    The model stacks: Input -> EfficientNet preprocessing Lambda layer -> the
    module-level `efficientnet_base` -> Flatten -> Dense(256, relu) ->
    Dropout(0.5) -> Dense softmax with one unit per value in `label_lookup`.

    Args:
        show_summary: If True (the default), print the model summary after
            compiling.

    Returns:
        The compiled Sequential model named "EfficientNetB0".
    """
    n_classes = len(label_lookup.values())

    stack = [
        # Input layer only; the Lambda layer below handles pixel preprocessing.
        tf.keras.layers.Input(shape=input_shape),
        # Preprocessing Lambda layer
        lambda_layer_efficient,
        # Pretrained base
        efficientnet_base,
        layers.Flatten(),
        # Hidden dense layer with dropout
        layers.Dense(256, activation="relu"),
        layers.Dropout(0.5),
        # Output layer
        layers.Dense(n_classes, activation="softmax"),
    ]

    net = models.Sequential(name="EfficientNetB0")
    for layer in stack:
        net.add(layer)

    net.compile(
        optimizer=tf.keras.optimizers.legacy.Adam(),
        loss=tf.keras.losses.CategoricalCrossentropy(),
        metrics=["accuracy"],
    )

    if show_summary:
        net.summary()
    return net

# vk.layered_view(efficientnet_base, legend=True)

In [None]:
# model_eff = make_efficientnet_model(show_summary=True)

In [None]:
%%time
# EfficientNetB0 transfer-learning model: build, fit, then evaluate on the test set.
model_eff = make_efficientnet_model(show_summary=False)
# Fit on train_dataset for EPOCHS epochs, using val_dataset as validation data.
history = model_eff.fit(train_dataset,epochs = EPOCHS ,validation_data = val_dataset, callbacks=get_callbacks())

# NOTE: `history` and `results` are reused names; this overwrites earlier runs.
results = custom_evaluate_classification_network(model_eff,X_test=test_dataset,history=history, figsize=(15,15),
                                                 target_names=label_lookup.values(),display_bar=True);


In [None]:
# model_eff.save(filepath=FPATHS['modeling']['best_transfer_fpaths']['model_dir'], save_format='tf')

### Full EfficientNet

> 🚨 IMG_HEIGHT and IMG_WIDTH must both be changed to 224 in order to use EfficientNetB0 with its classification top (`include_top=True`).

In [None]:
# # Download EfficientNet base
# efficientnet_full =tf.keras.applications.EfficientNetB0(include_top=True, 
#                                                        input_shape=input_shape)
# efficientnet_full.summary()

## To Do: Keras Tuner

In [None]:
# ## Fit and evaluate model with custom function
# model2 = make_model2()
# history2 = model2.fit(train_dataset,epochs = 100 ,validation_data = val_dataset, callbacks=get_callbacks())
# results_dict = custom_evaluate_classification_network(model2,X_test=test_dataset,history=history2, figsize=(15,15), 
#                                                       target_names=label_lookup.values(),
#                                                       as_frame=True, frame_include_macro_avg=False, frame_include_support=False,
#                                                       display_bar=True)

## To Do: Add LIME Explanations

In [None]:
BEST_MODEL = model_eff   #None

#### Convert test data to numpy arrays

In [None]:
%%time
# timing WITH converting classes
y_test, y_hat_test, X_test = cf.get_true_pred_labels_images(BEST_MODEL,test_dataset,
                                                         convert_y_for_sklearn=True)
y_test[0], y_hat_test[0]

In [None]:
label_lookup

In [None]:
i = 10
y_test[i]

In [None]:
# select an image index to use/view
i = 10

# Show actual-sized image with keras
display(array_to_img(X_test[i]))
print(f"True Label: {label_lookup[y_test[i]]}")
print(f"Predicted: {label_lookup[y_hat_test[i]]}")

## LimeExplainer

### To Do:
- Fix the comparison images below 

In [None]:
raise Exception("Do not run below yet.")

In [None]:
from skimage.segmentation import mark_boundaries
from lime import lime_image

In [None]:
explainer = lime_image.LimeImageExplainer(verbose=False)#,random_state=321)
explainer

In [None]:
label= label_lookup[y_test[i]]
label

In [None]:
# Get the LIME explanation object for the chosen test image (index i).
# The prediction function must come from BEST_MODEL: X_test / y_hat_test were
# produced with BEST_MODEL, whereas the bare name `model` still refers to an
# earlier network, so explaining `model.predict` would describe a different
# model than the one whose prediction is displayed above.
explanation = explainer.explain_instance(X_test[i], # Image to explain
                                         BEST_MODEL.predict, # Prediction method/function
                                         top_labels=1, # How many of the top predicted labels to explain
                                         hide_color=0, # Passed through to LIME as the fill value for hidden segments
                                         num_samples=1000, # Number of perturbed samples requested from LIME
                                        )

In [None]:
# Stored original image
plt.imshow(explanation.image)#.astype(int));
plt.axis('off');

In [None]:
# Explanation split image into "segments"
plt.imshow(explanation.segments); 

In [None]:
# Unique Segments
np.unique(explanation.segments)

In [None]:
# Positive evidence only: request up to 5 segments for the top predicted label
# with positive_only=True and hide_rest=True.
# NOTE(review): hide_rest presumably blanks everything outside the selected
# segments -- confirm against the LIME documentation.
temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], 
                                            positive_only=True, 
                                            num_features=5, 
                                            hide_rest=True)
# Outline the mask on the returned image and show it without axes.
plt.imshow(mark_boundaries(temp, mask))
plt.axis('off')
plt.title('Segments that Positively Pushed Prediction');

In [None]:
# Negative evidence only: request up to 5 segments for the top predicted label
# with negative_only=True (positive_only must be switched off explicitly).
temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], 
                                            negative_only=True, 
                                            positive_only=False,
                                            num_features=5, 
                                            hide_rest=True)
# Outline the mask on the returned image and show it without axes.
plt.imshow(mark_boundaries(temp, mask))
plt.axis('off')
# Title added so this plot can be told apart from the positive-only and
# combined plots; the trailing `;` suppresses the echoed return value.
plt.title('Segments that Negatively Pushed Prediction');

In [None]:
# Positive and negative evidence together: neither *_only flag is set, and
# hide_rest=False, for up to 5 segments of the top predicted label.
temp, mask = explanation.get_image_and_mask(explanation.top_labels[0], 
                                            negative_only=False, 
                                            positive_only=False,
                                            num_features=5, 
                                            hide_rest=False)
# Outline the mask on the returned image and show it without axes.
plt.imshow(mark_boundaries(temp, mask))
plt.axis('off')
# `label` is the true label looked up earlier for the same image index.
plt.title(f'Segments that Pushed Prediction Towards (Green) or Away (Red) from {label}');

In [None]:
import matplotlib.pyplot as plt
from skimage.segmentation import mark_boundaries

from IPython.display import clear_output

def plot_comparison(main_image, img, mask):
    """Plot four side-by-side panels comparing an image with a mask.

    Panels, left to right: `main_image`, `img`, `mask`, and `img` with the
    boundaries of `mask` drawn in green.

    Adapted from Source:
    https://coderzcolumn.com/tutorials/artificial-intelligence/lime-explain-keras-image-classification-network-predictions

    Args:
        main_image: Image shown in the "Original Image" panel.
        img: Image shown in the "Image" panel and used as the base of the
            combined panel.
        mask: Array shown in the "Mask" panel and used for the boundaries.

    Returns:
        None. The figure is drawn with matplotlib.
    """
    fig, panels = plt.subplots(ncols=4, figsize=(15, 5))

    def _draw(panel, picture, heading):
        # Show one picture on one axis: image, title, no axis ticks.
        panel.imshow(picture)
        panel.set_title(heading)
        panel.axis('off')

    _draw(panels[0], main_image, "Original Image")
    _draw(panels[1], img, "Image")
    _draw(panels[2], mask, "Mask")
    # Boundaries are computed only when the last panel is drawn.
    _draw(panels[3], mark_boundaries(img, mask, color=(0, 1, 0)), "Image+Mask Combined")

    fig.tight_layout()
    

In [None]:
plot_comparison(X_test[i], temp, mask)

### Explaining an Incorrect Prediction