# Incremental learning part 2


***In this notebook we continue training a previously trained model, which is expected to come from the `Incremental learning part 1` notebook.***

In [None]:
import os
import cv2
import shutil
import numpy as np
import tensorflow as tf
import plotly.graph_objs as go
from tensorflow.keras.layers import Dense

from keras.models import load_model
from tensorflow.keras.layers import Input
from tensorflow.keras.applications import MobileNetV2, MobileNet, MobileNetV3Small, EfficientNetB0, EfficientNetB1, EfficientNetB3, EfficientNetB4, EfficientNetB7
from tensorflow.keras.layers import Dense, GlobalAveragePooling2D
from tensorflow.keras.models import Model
from keras.callbacks import ModelCheckpoint
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, precision_score, recall_score, roc_curve, auc

import plotly.io as pio

Set up pipeline variables

In [None]:
# Seed handed to the data generators so their shuffling/augmentation is repeatable.
SEED = 20
DRIVE_PATH = "/kaggle"
DATA_DIRECTORY = f"{DRIVE_PATH}/input/sneakers-recognition-dataset-v22/Sneakers_Recognition_DataSet"

# Switches for the optional exploratory cells further down.
SHOW_DATASET_STRUCTURE = False
SHOW_IMAGE_SHAPE_MEANS = False
ShOW_IMAGE_SHAPE_PLOT = False
DRAW_CLASS_DISTRIBUTION = False

# Input resolution used by the generators.
# Alternatives kept for reference: (300, 300), (244, 244), (224, 224).
IMAGE_SIZE = (240, 240)
BATCH_SIZE = 32
EPOCHS = 15
NUMBER_LAYERS_TO_FREEZE = 0
MODEL_CLASS_CALLBACK = EfficientNetB1
SHOW_NUMBER_OF_LAYERS_IN_BASE_MODEL = True
SHOW_MODEL_SUMMARY = False

NUMBER_OF_CLASSES_OF_FIRST_MODEL = 30
NUMBER_OF_CLASSES_TO_TRAIN_MODEL_ON = 33

# LOAD_MODEL_NAME = f"IncrementalLearningPart1_{NUMBER_OF_CLASSES_OF_FIRST_MODEL}Classes_DATA_V2_{MODEL_CLASS_CALLBACK}_{NUMBER_LAYERS_TO_FREEZE}Frozen_{BATCH_SIZE}Batch_{IMAGE_SIZE[0]}x{IMAGE_SIZE[1]}Size_AugV2"
# LOAD_MODEL_DIR_PATH = f"/kaggle/input/incremental_models_efficientnetb1/tensorflow2/incremental_learning_part1_{NUMBER_OF_CLASSES_OF_FIRST_MODEL}classes/1/"
# LOAD_MODEL_PATH = f"{LOAD_MODEL_DIR_PATH}{LOAD_MODEL_NAME}.keras"

# Model file from which training is resumed.
LOAD_MODEL_PATH = "/kaggle/input/efficientnetb1_30classes/tensorflow2/incremental_learning_part1/1/model.h5"

# Define the destination path where you want to copy the model
LOAD_MODEL_PATH_TO_USE = "/kaggle/working/copied_model.h5"

# Formatting the callable itself would embed its repr ("<function ... at 0x...>",
# different on every run) in the folder name, so use its plain name instead.
MODEL_CLASS_NAME = getattr(MODEL_CLASS_CALLBACK, "__name__", str(MODEL_CLASS_CALLBACK))

MODEL_SAVING_PATH = f"{DRIVE_PATH}/working/models/IncrementalLearningPart2_{NUMBER_OF_CLASSES_TO_TRAIN_MODEL_ON}Classes_DATA_V2_{MODEL_CLASS_NAME}_{NUMBER_LAYERS_TO_FREEZE}Frozen_{BATCH_SIZE}Batch_{IMAGE_SIZE[0]}x{IMAGE_SIZE[1]}Size_AugV2"

In [None]:
def list_directories(root_dir):
    """
    Recursively lists all directories and subdirectories within the specified directory.

    Each directory appears exactly once. The walk is top-down, so ``root_dir``
    itself is the first entry and every directory precedes its own children.

    Args:
    - root_dir (str): The directory to list.

    Returns:
    - directories (list): A list of directory paths; empty if ``root_dir``
      cannot be walked (e.g. it does not exist).
    """
    # os.walk already yields every directory once as `dirpath`; also appending
    # the entries of `dirnames` would list each sub-directory a second time.
    return [dirpath for dirpath, _dirnames, _filenames in os.walk(root_dir)]


# Optional debugging aid: print every directory path found under the dataset root.
if SHOW_DATASET_STRUCTURE:
    directories_structure = list_directories(DATA_DIRECTORY)
    for dataset_directory in directories_structure:
        print(dataset_directory)


In [None]:
# The first entry returned by list_directories is the dataset root itself;
# every directory after it is treated as a class folder.
dirs_arr = list_directories(DATA_DIRECTORY)

# Take the last path component with os.path.basename rather than splitting on a
# hard-coded '/', so the class name is extracted with the platform's separator.
all_classes = [os.path.basename(path) for path in dirs_arr[1:]]
all_classes_sorted = sorted(set(all_classes))

# Keep only the first N class names (alphabetical order) for this training round.
classes_for_datagen = all_classes_sorted[:NUMBER_OF_CLASSES_TO_TRAIN_MODEL_ON]
classes_for_datagen[:3], len(classes_for_datagen)

In [None]:
# Average width/height of the images stored directly inside one directory
def calculate_mean_dimensions(directory):
    """
    Compute the mean width and mean height of the images in ``directory``.

    Only files located directly in ``directory`` whose names end with ".jpg"
    or ".png" are considered; files that cv2 cannot read are skipped.

    Args:
    - directory (str): Directory whose images are measured.

    Returns:
    - (mean_width, mean_height): The two means, or (0, 0) when no image could be read.
    """
    image_suffixes = (".jpg", ".png")  # Add other formats if needed

    shapes = []
    for entry in os.listdir(directory):
        if not entry.endswith(image_suffixes):
            continue
        image = cv2.imread(os.path.join(directory, entry))
        if image is None:
            continue
        # The array shape starts with (height, width).
        shapes.append(image.shape[:2])

    if not shapes:
        return 0, 0

    image_count = len(shapes)
    mean_width = sum(width for _, width in shapes) / image_count
    mean_height = sum(height for height, _ in shapes) / image_count
    return mean_width, mean_height

if SHOW_IMAGE_SHAPE_MEANS:
    # Report the mean image size of every folder located directly under the dataset root.
    for subdir in os.listdir(DATA_DIRECTORY):
        subdir_path = os.path.join(DATA_DIRECTORY, subdir)
        if not os.path.isdir(subdir_path):
            continue
        mean_width, mean_height = calculate_mean_dimensions(subdir_path)
        print(f"Directory: {subdir}, Mean Width: {mean_width}, Mean Height: {mean_height}")


In [None]:
def collect_dimensions(directory):
    """
    Gather the width and height of every readable image below ``directory``.

    The tree is walked recursively and a progress line is printed for each
    visited folder. Only file names ending with ".jpg" or ".png" are loaded;
    files that cv2 cannot read are skipped.

    Args:
    - directory (str): Root of the tree to scan.

    Returns:
    - (widths, heights): Two lists of equal length, one entry per image.
    """
    image_suffixes = (".jpg", ".png")  # Add other formats if needed
    widths, heights = [], []

    for root, _, files in os.walk(directory):
        print(f"Gathering data from {root} directory")
        image_paths = (
            os.path.join(root, name) for name in files if name.endswith(image_suffixes)
        )
        for image_path in image_paths:
            image = cv2.imread(image_path)
            if image is None:
                continue
            widths.append(image.shape[1])
            heights.append(image.shape[0])

    return widths, heights

def plot_dimension_distribution(widths, heights):
    """
    Show overlaid histograms of image widths and heights.

    Args:
    - widths (list): Image widths.
    - heights (list): Image heights.
    """
    histograms = [
        go.Histogram(x=widths, name='Width', marker_color='blue'),
        go.Histogram(x=heights, name='Height', marker_color='green'),
    ]

    fig = go.Figure(data=histograms)
    fig.update_layout(
        title='Image Dimension Distribution',
        xaxis={'title': 'Dimension'},
        yaxis={'title': 'Frequency'},
        barmode='overlay',
        bargap=0.1,
    )
    fig.show()

def visualize_distribution(directory):
    """
    Collect the image dimensions found below ``directory`` and plot their distribution.

    Args:
    - directory (str): Root of the image tree to analyse.
    """
    plot_dimension_distribution(*collect_dimensions(directory))


# Optional: histogram of image widths/heights across the whole dataset.
if ShOW_IMAGE_SHAPE_PLOT:
    visualize_distribution(DATA_DIRECTORY)


In [None]:
# Fraction of each class held out for validation; both generators below must use
# the same value so that the 'training' and 'validation' subsets stay disjoint.
VALIDATION_SPLIT = 0.2

# Training images are rescaled and randomly augmented.
# NOTE(review): Keras' EfficientNet application models document [0, 255] inputs
# with rescaling built in. The 1/255 rescale is kept here because the loaded
# model was presumably trained with it; confirm against the part 1 notebook.
train_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    validation_split=VALIDATION_SPLIT,
    rotation_range = 30,
    shear_range = 0.2,
    height_shift_range = 0.2,
    width_shift_range = 0.2,
    brightness_range = (0.2, 1.0),
    zoom_range = 0.2,
    horizontal_flip = True,
    fill_mode = 'nearest'
)

# Validation images only get the rescaling. Drawing the validation subset from
# the augmenting generator would randomly rotate/shear/darken the evaluation
# images and distort the validation metrics.
validation_datagen = tf.keras.preprocessing.image.ImageDataGenerator(
    rescale=1./255,
    validation_split=VALIDATION_SPLIT
)

# Create train data generator
train_generator = train_datagen.flow_from_directory(
    DATA_DIRECTORY,
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    subset='training',  # specify this as training data
    seed=SEED,
    classes=classes_for_datagen  # specify the desired classes
)

# Create validation data generator (same directory, split fraction and classes)
validation_generator = validation_datagen.flow_from_directory(
    DATA_DIRECTORY,
    target_size=IMAGE_SIZE,
    batch_size=BATCH_SIZE,
    class_mode='categorical',
    subset='validation',  # specify this as validation data
    seed=SEED,
    classes=classes_for_datagen  # specify the desired classes
)

# The number of labels will be the length of the desired classes list
number_of_labels = len(classes_for_datagen)

In [None]:
# Report how many classes each generator exposes (both were built from the same class list).
train_class_indices = train_generator.class_indices
train_num_classes = len(train_class_indices)
validation_class_indices = validation_generator.class_indices
validation_num_classes = len(validation_class_indices)

print("Number of classes in train generator:", train_num_classes)
print("Number of classes in validation generator:", validation_num_classes)


In [None]:
# Display the class-name -> index mappings of both generators side by side.
validation_generator.class_indices, train_generator.class_indices

In [None]:
# Number of classes known to the validation generator.
len(validation_generator.class_indices)

In [None]:
def draw_class_distribution_bar_plot(class_distribution, save_path):
    """
    Draw a bar chart of per-class sample counts, save it as HTML and show it.

    Args:
    - class_distribution (dict): Maps each class to its number of samples.
    - save_path (str): Destination of the HTML file.
    """
    # Bars are ordered by class key.
    ordered_items = sorted(class_distribution.items())
    classes = [key for key, _ in ordered_items]
    counts = [count for _, count in ordered_items]

    fig = go.Figure(data=[go.Bar(x=classes, y=counts)])
    fig.update_layout(
        title='Class Distribution',
        xaxis_title='Class',
        yaxis_title='Count',
        width=1000,
        height=600,
    )
    fig.write_html(save_path)  # Save the plot to the specified path as HTML
    fig.show()

# Count the samples per class of a generator and plot the distribution
def draw_class_distribution(generator, save_path):
    """
    Count how many samples each class has in ``generator`` and plot the result.

    Args:
    - generator: Object exposing ``class_indices`` (class name -> label index)
      and ``labels`` (one label index per sample).
    - save_path (str): Destination of the HTML file with the bar chart.
    """
    # Reverse the mapping to get class names from indices
    class_names = {index: name for name, index in generator.class_indices.items()}

    class_distribution = {}
    for label in generator.labels:
        # Key the counts by class name so the chart axis shows names instead of
        # bare indices; a label without a known name falls back to its string form.
        class_name = class_names.get(label, str(label))
        class_distribution[class_name] = class_distribution.get(class_name, 0) + 1

    draw_class_distribution_bar_plot(class_distribution, save_path)
    

# Optional: save and show the class distribution of both data splits.
if DRAW_CLASS_DISTRIBUTION:
    for split_generator, html_name in (
        (train_generator, "class_distribution_train.html"),
        (validation_generator, "class_distribution_validation.html"),
    ):
        draw_class_distribution(split_generator, os.path.join(f"{DRIVE_PATH}/working", html_name))

In [None]:
# Load the previously trained model from a copy placed at the working path.
shutil.copy(LOAD_MODEL_PATH, LOAD_MODEL_PATH_TO_USE)
model = tf.keras.models.load_model(LOAD_MODEL_PATH_TO_USE)

# Build a new softmax head sized for the extended class list and attach it to
# the output of the second-to-last layer, so the loaded model's final layer is
# left out of the new graph.
new_output_layer = Dense(NUMBER_OF_CLASSES_TO_TRAIN_MODEL_ON, activation='softmax')
penultimate_output = model.layers[-2].output
model = Model(inputs=model.input, outputs=new_output_layer(penultimate_output))

# Compile the rebuilt model for multi-class training.
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)


In [None]:
# Optionally print the layer-by-layer summary of the rebuilt model.
if SHOW_MODEL_SUMMARY:
    model.summary()

In [None]:
from keras.callbacks import LearningRateScheduler

# Define the learning rate schedule function
def lr_schedule(epoch):
    """
    Step-decay learning rate schedule.

    The rate starts at 0.001 and is multiplied by 0.1 once for every milestone
    epoch (15 and 25) that has been reached, and it stays reduced afterwards.

    Args:
    - epoch (int): Index of the epoch about to start.

    Returns:
    - (float): Learning rate to use for that epoch.
    """
    initial_lr = 0.001
    decay_epochs = (15, 25)
    decay_factor = 0.1

    # The previous test `epoch is set([15, 25])` compared object identity with a
    # freshly built set, which is never true, so the rate was never reduced.
    milestones_reached = sum(epoch >= milestone for milestone in decay_epochs)
    return initial_lr * decay_factor ** milestones_reached

# Wrap lr_schedule in a Keras callback so it can be passed to model.fit.
lr_scheduler = LearningRateScheduler(lr_schedule)

In [None]:
# Create the output folder up front so the checkpoint files have somewhere to go.
os.makedirs(MODEL_SAVING_PATH, exist_ok=True)

# One weights file per epoch; the {epoch:02d} placeholder is left for the callback to fill in.
checkpoint_filepath = MODEL_SAVING_PATH + "/model_weights_epoch_{epoch:02d}.weights.h5"

# save_best_only is off, so weights are written after every epoch.
checkpoint_options = dict(
    save_weights_only=True,
    save_best_only=False,
    monitor='val_accuracy',
    mode='max',
    verbose=1,
)
model_checkpoint_callback = ModelCheckpoint(filepath=checkpoint_filepath, **checkpoint_options)

Train our model

In [None]:
# Train the model for EPOCHS epochs, validating on validation_generator after each one.
# The callbacks write a weights checkpoint and apply the learning rate schedule.
history = model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=EPOCHS,
    callbacks=[model_checkpoint_callback, lr_scheduler]
)

In [None]:
# # Save the trained model
# model.save(f"{MODEL_SAVING_PATH}/model.h5")

In [None]:
def _epoch_trace(values, name):
    """Build a line trace of a per-epoch metric, with epochs numbered from 1."""
    return go.Scatter(x=list(range(1, len(values) + 1)), y=values, mode='lines', name=name)


# Per-epoch metrics recorded by model.fit
train_loss = history.history['loss']
val_loss = history.history['val_loss']
train_acc = history.history['accuracy']
val_acc = history.history['val_accuracy']

# One trace per metric
loss_trace = _epoch_trace(train_loss, 'Training Loss')
val_loss_trace = _epoch_trace(val_loss, 'Validation Loss')
acc_trace = _epoch_trace(train_acc, 'Training Accuracy')
val_acc_trace = _epoch_trace(val_acc, 'Validation Accuracy')

# Loss curves
loss_fig = go.Figure(data=[loss_trace, val_loss_trace])
loss_fig.update_layout(
    title='Training and Validation Loss',
    xaxis_title='Epoch',
    yaxis_title='Loss',
)

# Accuracy curves
acc_fig = go.Figure(data=[acc_trace, val_acc_trace])
acc_fig.update_layout(
    title='Training and Validation Accuracy',
    xaxis_title='Epoch',
    yaxis_title='Accuracy',
)

# Both figures are stored as HTML files next to the checkpoints
pio.write_html(loss_fig, f"{MODEL_SAVING_PATH}/loss_figure.html")
pio.write_html(acc_fig, f"{MODEL_SAVING_PATH}/accuracy_figure.html")


In [None]:
def get_true_and_predicted(generator, model):
    """
    Run ``model`` over every batch of ``generator`` and collect labels and predictions.

    Each batch is read as ``batch[0]`` (model inputs) and ``batch[1]``
    (one-hot targets). Labels are taken from the batch that is predicted on, so
    they stay aligned with the predictions whatever order the batches come in.

    Args:
    - generator: Indexable sequence of batches supporting ``len()``.
    - model: Object with a ``predict`` method returning per-class scores.

    Returns:
    - (true_labels, predicted_labels, predicted_probabilities): Three lists
      with one entry per sample.
    """
    true_labels, predicted_labels, predicted_probabilities = [], [], []

    for batch_index in range(len(generator)):
        batch = generator[batch_index]
        inputs, one_hot_targets = batch[0], batch[1]

        true_labels += list(np.argmax(one_hot_targets, axis=1))

        batch_probabilities = model.predict(inputs)
        predicted_labels += list(np.argmax(batch_probabilities, axis=1))
        predicted_probabilities += list(batch_probabilities)

    return true_labels, predicted_labels, predicted_probabilities

In [None]:
# Render a confusion matrix as a Plotly heatmap
def plot_confusion_matrix(conf_matrix, class_names, title, save_path=None):
    """
    Show a confusion matrix as a heatmap and optionally save it as HTML.

    Args:
    - conf_matrix: 2-D matrix of counts (rows: true labels, columns: predicted labels).
    - class_names (list): Class names used as tick labels on both axes.
    - title (str): Figure title.
    - save_path (str, optional): If given, the figure is also written there as HTML.
    """
    # Rows and y labels are reversed together, so the first class ends up at the top.
    heatmap = go.Heatmap(
        z=np.flipud(conf_matrix),
        x=class_names,
        y=class_names[::-1],
        colorscale='Blues',
        colorbar={'title': 'Count'},
    )

    fig = go.Figure(data=[heatmap])
    fig.update_layout(
        title=title,
        xaxis={'title': 'Predicted labels'},
        yaxis={'title': 'True labels'},
        width=1000,
        height=1000,
    )

    if save_path:
        fig.write_html(save_path)  # Save the plot to the specified path as HTML
    fig.show()

# Print classification metrics and draw one-vs-rest ROC curves
def calculate_metrics_and_plot_roc(true_labels, predicted_probabilities, class_names, title, save_path=None):
    """
    Print accuracy, top-5 accuracy, macro F1, precision and recall, and plot per-class ROC curves.

    Args:
    - true_labels (np.ndarray): True class index of each sample.
    - predicted_probabilities (np.ndarray): Per-class scores, one row per sample.
    - class_names (list): Class names; position ``i`` names class index ``i``.
    - title (str): Accepted for interface symmetry; not used by this function.
    - save_path (str, optional): If given, the ROC figure is also written there as HTML.
    """
    # Hard predictions are shared by all label-based metrics.
    predicted_classes = predicted_probabilities.argmax(axis=1)

    accuracy = accuracy_score(true_labels, predicted_classes)
    top5_accuracy = top_k_accuracy(true_labels, predicted_probabilities, k=5)
    f1 = f1_score(true_labels, predicted_classes, average='macro')
    precision = precision_score(true_labels, predicted_classes, average='macro')
    recall = recall_score(true_labels, predicted_classes, average='macro')

    # One ROC curve per class: the class against all the others.
    fig_roc = go.Figure()
    for class_index, class_name in enumerate(class_names):
        fpr, tpr, _ = roc_curve(true_labels == class_index, predicted_probabilities[:, class_index])
        class_auc = auc(fpr, tpr)
        fig_roc.add_trace(go.Scatter(x=fpr, y=tpr, mode='lines', name=f'{class_name} (AUC = {class_auc:0.2f})'))

    fig_roc.update_layout(
        title='ROC Curve',
        xaxis_title='False Positive Rate',
        yaxis_title='True Positive Rate',
        width=1000,
        height=1000,
    )
    if save_path:
        fig_roc.write_html(save_path)  # Save the plot to the specified path as HTML
    fig_roc.show()

    print(f'Accuracy: {accuracy}')
    print(f'Top-5 Accuracy: {top5_accuracy}')
    print(f'F1 Score: {f1}')
    print(f'Precision: {precision}')
    print(f'Recall: {recall}')

# Predict labels and generate confusion matrix
def plot_confusion_matrix_and_metrics(generator, model, class_names, title, save_path=None):
    """
    Evaluate ``model`` on ``generator``: plot the confusion matrix, print metrics and plot ROC curves.

    Args:
    - generator: Batch source passed to ``get_true_and_predicted``.
    - model: Model used to predict each batch.
    - class_names (list): Class names; position ``i`` names class index ``i``.
    - title (str): Title of the confusion-matrix figure.
    - save_path (str, optional): Where the confusion-matrix figure is saved as HTML.
    """
    # Get true labels and predicted labels
    true_labels, predicted_labels, predicted_probabilities = get_true_and_predicted(generator, model)

    # Pin the label set so the matrix always has one row/column per entry of
    # class_names. Without it, a class missing from both the true and predicted
    # labels is dropped and the remaining rows no longer line up with the names.
    conf_matrix = confusion_matrix(true_labels, predicted_labels, labels=list(range(len(class_names))))
    plot_confusion_matrix(conf_matrix, class_names, title, save_path)

    # Calculate metrics and plot ROC-AUC curve. save_path is not forwarded:
    # passing the same path would overwrite the confusion-matrix HTML written above.
    calculate_metrics_and_plot_roc(np.array(true_labels), np.array(predicted_probabilities), class_names, title, save_path=None)

# Function to calculate top-k accuracy
def top_k_accuracy(true_labels, predicted_probabilities, k=5):
    """
    Fraction of samples whose true label is among the ``k`` highest-scored classes.

    Args:
    - true_labels (array-like): True class index of each sample.
    - predicted_probabilities (array-like): 2-D per-class scores, one row per sample.
    - k (int): Number of top-scored classes to consider. Values larger than the
      number of classes are treated as "all classes"; values <= 0 give 0.0.

    Returns:
    - (float): Top-k accuracy in [0, 1].
    """
    true_labels = np.asarray(true_labels)
    predicted_probabilities = np.asarray(predicted_probabilities)

    # Clamp k to the number of available columns: indexing k columns of a
    # narrower argsort result used to raise IndexError.
    n_classes = predicted_probabilities.shape[1]
    k = max(0, min(k, n_classes))

    # The last k columns of the ascending argsort are the k best-scored classes.
    top_k_predictions = np.argsort(predicted_probabilities, axis=1)[:, n_classes - k:]
    hits = (top_k_predictions == true_labels[:, None]).any(axis=1)
    return np.mean(hits)


# Class names ordered by the index the generator assigned to them.
_class_indices = train_generator.class_indices
class_names = sorted(_class_indices, key=_class_indices.get)

# Confusion matrix, metrics and ROC curves for both data splits.
for _split_generator, _split_title, _html_name in (
    (train_generator, 'Confusion Matrix - Training Set', "training_set_plotly.html"),
    (validation_generator, 'Confusion Matrix - Validation Set', "validation_set_plotly.html"),
):
    plot_confusion_matrix_and_metrics(
        _split_generator,
        model,
        class_names,
        _split_title,
        save_path=f"{MODEL_SAVING_PATH}/{_html_name}",
    )

## CPU configuration

In [None]:
# import platform
# import psutil
# import os

# # Get CPU information using psutil and platform
# cpu_info = {
#     "processor_name": platform.processor(),  # Retrieve the processor name
#     "cpu_count_physical": psutil.cpu_count(logical=False),
#     "cpu_count_logical": psutil.cpu_count(logical=True),
#     "cpu_frequency_min": psutil.cpu_freq().min,
#     "cpu_frequency_max": psutil.cpu_freq().max,
#     "cpu_frequency_current": psutil.cpu_freq().current
# }

# # Get RAM information using psutil
# ram_info = {
#     "total_ram": psutil.virtual_memory().total / (1024 ** 3)  # Convert bytes to GB
# }

# # Print the information
# print("CPU Information:")
# print(f"Processor name: {cpu_info['processor_name']}")
# print(f"Physical cores: {cpu_info['cpu_count_physical']}")
# print(f"Logical cores: {cpu_info['cpu_count_logical']}")
# print(f"Minimum frequency: {cpu_info['cpu_frequency_min']} MHz")
# print(f"Maximum frequency: {cpu_info['cpu_frequency_max']} MHz")
# print(f"Current frequency: {cpu_info['cpu_frequency_current']} MHz")

# print("\nRAM Information:")
# print(f"Total RAM: {ram_info['total_ram']:.2f} GB")
