In [None]:
# Mount Google Drive at /content/drive; every dataset/model path used in this
# notebook lives under that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.initializers import GlorotNormal

In [None]:
# Step 1: Load the pre-trained ResNet50 backbone (a complete Keras model stored as .h5).
base_model = tf.keras.models.load_model('/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/1-Backbone/ResNet50/resnet50_adamax_v1_kaggle.h5')

# Weight initialization: Glorot (Xavier) normal. It draws the initial weights from a
# truncated normal distribution scaled by each layer's fan-in/fan-out, which helps
# convergence. It is an initializer, unrelated to the Adam optimizer used at compile time.
initializer = tf.keras.initializers.GlorotNormal()

# Step 2: Truncate the backbone so it outputs features instead of class scores.
# The Dense classification layer is dropped, and global average pooling is usually
# dropped too because it averages away the spatial information of the last
# convolutional feature map.
# NOTE(review): the original note spoke of removing 2 layers, but layers[-4] is taken
# as the new output — confirm against the saved backbone which layer that is.
base_model = models.Model(inputs=base_model.input, outputs=base_model.layers[-4].output)

# Step 3: Add ConvLSTM layers for temporal feature extraction
# Each frame is assumed to be 224x224 RGB; the time axis is None so clips of any
# length are accepted. Adjust the shape if the actual video frames differ.
input_shape = (None, 224, 224, 3)  # (time_steps, height, width, channels)
video_input = layers.Input(shape=input_shape)

# Apply the backbone to every frame of the clip independently.
x = layers.TimeDistributed(base_model)(video_input)
# ConvLSTM2D needs (time, rows, cols, channels) per sample, so each time step gets a
# 1x1 spatial grid with 2048 channels.
# NOTE(review): assumes the truncated backbone yields exactly 2048 values per frame;
# if it yields more, the -1 axis silently absorbs the surplus into the time
# dimension — confirm the backbone's output shape.
# NOTE(review): on a 1x1 grid with 'same' padding the 3x3 kernels below only ever
# see real data at their centre position — confirm this is intended.
x = layers.Reshape((-1, 1, 1, 2048))(x)  # Adding spatial dimensions

# Two ConvLSTM layers with 64 filters each, a 3x3 kernel, dropout 0.2 and recurrent dropout 0.1.
#x = layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), padding='same', return_sequences=True)(x)
# First layer returns the full sequence so the second ConvLSTM can consume it.
x = layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), padding='same', dropout=0.2, recurrent_dropout=0.1, kernel_initializer=initializer, return_sequences=True)(x)
x = layers.BatchNormalization()(x)
#x = layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), padding='same')(x)
# Second layer keeps only the last time step (return_sequences is left at its default).
x = layers.ConvLSTM2D(filters=64, kernel_size=(3, 3), dropout=0.2, recurrent_dropout=0.1, kernel_initializer=initializer, padding='same')(x)
x = layers.BatchNormalization()(x)

# Step 4: Dense head for binary classification (single sigmoid output unit).
x = layers.Flatten()(x)
x = layers.Dense(400, activation='tanh', kernel_initializer=initializer)(x)
x = layers.Dropout(0.3)(x)
x = layers.Dense(100, activation='tanh', kernel_initializer=initializer)(x)
x = layers.Dropout(0.3)(x)
output = layers.Dense(1, activation='sigmoid', kernel_initializer=initializer)(x)

# Step 5: Build the end-to-end model (clip in, probability out) and compile it.
model = models.Model(inputs=video_input, outputs=output)
model.compile(optimizer=Adam(learning_rate=1e-4), loss='binary_crossentropy', metrics=['accuracy'])

# Print the model summary to verify the architecture
# model.summary()



In [None]:
# Checking how many videos do we have in Dataset

import os
from tabulate import tabulate

def count_folders(dataset_path, set_names=None, class_names=None):
    """Count the sub-folders found in every ``<class>/<set>`` directory of a dataset.

    The dataset is expected to be laid out as
    ``dataset_path/<class_name>/<set_name>/<one folder per item>``.

    Args:
        dataset_path: Root directory of the dataset.
        set_names: Split names to scan. Defaults to ``["train", "val", "test"]``.
        class_names: Class names to scan. Defaults to
            ``["Siniestro", "No Siniestro"]``.

    Returns:
        A nested dict ``{set_name: {class_name: folder_count}}`` that keeps the
        order of ``set_names`` and ``class_names``.

    Raises:
        FileNotFoundError: If one of the ``<class>/<set>`` directories is missing.
    """
    if set_names is None:
        set_names = ["train", "val", "test"]
    if class_names is None:
        class_names = ["Siniestro", "No Siniestro"]

    counts = {}
    for set_name in set_names:
        per_class = {}
        for class_name in class_names:
            class_set_path = os.path.join(dataset_path, class_name, set_name)
            # The context manager releases the directory handle deterministically.
            with os.scandir(class_set_path) as entries:
                # Only directories count; loose files in the split folder are ignored.
                per_class[class_name] = sum(1 for entry in entries if entry.is_dir())
        counts[set_name] = per_class

    return counts

def print_table(counts):
    """Print the per-set / per-class folder counts as a grid table.

    Args:
        counts: Nested dict ``{set_name: {class_name: count}}``, e.g. the value
            returned by ``count_folders``. Column headers are taken from the
            first set's class names, so the dict does not need a ``"train"``
            entry; an empty dict prints a table with only the header column.
    """
    # Derive the class columns from whichever set comes first instead of
    # hard-coding "train", which raised KeyError for other split names.
    first_set_counts = next(iter(counts.values()), {})
    headers = ["Set Name"] + list(first_set_counts.keys())
    table_data = [[set_name] + list(per_class.values()) for set_name, per_class in counts.items()]

    # Print the table
    print(tabulate(table_data, headers, tablefmt="grid"))

# Usage: count the folders per split and class in the Drive dataset and
# display the result as a table.
dataset_path = "/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Dataset"
counts = count_folders(dataset_path)
print_table(counts)


+------------+-------------+----------------+
| Set Name   |   Siniestro |   No Siniestro |
| train      |         338 |           1351 |
+------------+-------------+----------------+
| val        |          17 |             67 |
+------------+-------------+----------------+
| test       |          68 |            272 |
+------------+-------------+----------------+


In [None]:
# Size of the Dataset

import os

def get_size(path):
    """Return the combined size, in GiB, of every file found below ``path``.

    The tree is walked recursively with ``os.walk``. A path that yields no
    files (including one that does not exist) gives ``0.0``.
    """
    byte_total = sum(
        os.path.getsize(os.path.join(folder, file_name))
        for folder, _subfolders, file_names in os.walk(path)
        for file_name in file_names
    )
    # 1024 ** 3 bytes per gigabyte (binary).
    return byte_total / (1024 ** 3)

def main():
    """Print the on-disk size, in GB, of each class folder of the dataset."""
    dataset_path = "/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Dataset"

    # One report line per class directory directly under the dataset root.
    for class_name in ("Siniestro", "No Siniestro"):
        size_gb = get_size(os.path.join(dataset_path, class_name))
        print(f"Size of {class_name} folder: {size_gb:.2f} GB")

# Run the size report when this cell is executed as the main program.
if __name__ == "__main__":
    main()


Size of Siniestro folder: 4.62 GB
Size of No Siniestro folder: 0.00 GB


## Load preprocessed file

In [None]:
import numpy as np
import sys

In [None]:
# Open the preprocessed clips and labels; both archives expose the keys
# 'train', 'test' and 'val'.
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects, which can
# execute code from an untrusted file. It is only needed for object-dtype arrays —
# confirm whether it can be dropped for these archives.
data = np.load("/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Preprocess ResNet/data/preprocessed_data_Resnet_FINAL.npz", allow_pickle=True)
labels = np.load("/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Preprocess ResNet/labels/preprocessed_labels_Resnet_FINAL.npz", allow_pickle=True)

In [None]:
# List the arrays stored in the archive, then display the shape of each split.
# print() is kept out of the displayed tuple: it returns None, which otherwise
# shows up as a stray first element in the cell output.
print(data.files)
data['train'].shape, data['test'].shape, data['val'].shape

['train', 'test', 'val']


(None, (480, 45, 224, 224, 3), (96, 45, 224, 224, 3), (24, 45, 224, 224, 3))

In [None]:
# Display the shape of the label array of each split.
labels['train'].shape, labels['test'].shape, labels['val'].shape

((480,), (96,), (24,))

## Final Model

In [None]:
# Pull the training and validation splits out of the .npz archives.
# Indexing the archive already returns a NumPy array, so np.asarray hands that
# array through as-is; np.array would allocate a second full copy of every
# split, which is costly for the multi-gigabyte video tensors.
X_train = np.asarray(data["train"])
y_train = np.asarray(labels["train"])  # labels are expected to be numeric already (0/1)

X_val = np.asarray(data["val"])
y_val = np.asarray(labels["val"])

# Train the model and keep the History object for the plots and the CSV export.
# (An earlier configuration used epochs=30, batch_size=5.)
history = model.fit(X_train, y_train, epochs=10, batch_size=4, validation_data=(X_val, y_val))

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10

In [None]:
# Save the whole model to Drive as an .h5 file (not only the weights); a later
# cell reloads it from this path with load_model().
model.save('/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/modelo_final_resnet50.h5')

In [None]:
# Backup: also save the model on the Colab VM's local disk and download it through
# the browser, in case the Drive save failed for lack of Drive storage.
model.save('/content/modelo_final_resnet50.h5')
from google.colab import files
files.download('/content/modelo_final_resnet50.h5')

In [None]:
import matplotlib.pyplot as plt

# Training vs. validation curves, one panel per metric: accuracy on the left,
# loss on the right.
plt.figure(figsize=(12, 4))

curve_panels = (
    ('accuracy', 'val_accuracy', 'Model accuracy', 'Accuracy'),
    ('loss', 'val_loss', 'Model loss', 'Loss'),
)
for panel_position, (train_key, val_key, panel_title, y_label) in enumerate(curve_panels, start=1):
    plt.subplot(1, 2, panel_position)
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(panel_title)
    plt.ylabel(y_label)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Validation'], loc='upper left')

plt.tight_layout()
plt.show()

In [None]:
# Test split. Indexing the archive already returns NumPy arrays, so np.asarray
# avoids the extra full copy that np.array would make.
X_test = np.asarray(data["test"])
y_test = np.asarray(labels["test"])

In [None]:
# Evaluate on the test split; evaluate() returns the loss followed by the
# compiled metric (accuracy).
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

Test Loss: 0.1477
Test Accuracy: 0.9583


In [None]:
import csv

def save_history_to_csv(history, filename):
    """Write the per-epoch metrics of a training history to a CSV file.

    Args:
        history: Object exposing a ``history`` dict that maps metric names to
            per-epoch value lists (it must contain ``'loss'``). If ``None``,
            a notice is printed and no file is written.
        filename: Path of the CSV file to create or overwrite.

    The file has one header row (``epoch`` followed by the metric names) and
    one row per epoch, with epochs numbered from 1.
    """
    if history is None:
        print("No history data to save.")
        return

    metrics = history.history
    with open(filename, 'w', newline='') as csv_file:
        out = csv.writer(csv_file)
        out.writerow(['epoch', *metrics.keys()])
        # The length of the 'loss' series defines how many epochs are exported;
        # history lists are 0-based, the CSV epoch column is 1-based.
        out.writerows(
            [idx + 1, *(series[idx] for series in metrics.values())]
            for idx in range(len(metrics['loss']))
        )

    print(f"History saved to {filename}")

# Export the training history of the run above to the Metrics folder on Drive.
save_history_to_csv(history, '/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Metrics/training_history_resnet50.csv')

## Reload final model for metrics

In [None]:
from tensorflow.keras.models import load_model

# Load the trained model back from Drive. This rebinds `model`, so the metrics
# below are computed on the saved file rather than the in-memory training object.
model_path = '/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/modelo_final_resnet50.h5'
model = load_model(model_path)

In [None]:
# Test split. Indexing the archive already returns NumPy arrays, so np.asarray
# avoids the extra full copy that np.array would make.
X_test = np.asarray(data["test"])
y_test = np.asarray(labels["test"])

In [None]:
# Evaluate the reloaded model on the test split (loss first, then accuracy).
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.4f}")

Test Loss: 0.1477
Test Accuracy: 0.9583


### Other metrics

In [None]:
from sklearn.metrics import classification_report

In [None]:
# Make predictions: one sigmoid probability per test clip.
y_pred = model.predict(X_test)
# The output has a single column, so argmax over axis 1 would return 0 for every
# sample. Threshold the probability at 0.5 instead to get the predicted class
# (1 = positive class) as a flat integer array.
y_pred_classes = (y_pred.ravel() > 0.5).astype(int)

y_true = y_test



In [None]:
# Inspect the raw predicted probabilities.
print("y_pred: ", y_pred)

In [None]:
# Round each probability to the nearest integer value (0. or 1.) to obtain hard
# class predictions, then display them.
y_pred_round = np.round(y_pred , decimals=0)
y_pred_round

In [None]:
# Inspect the ground-truth labels of the test split.
print("y_true: ", y_true)

y_true:  [1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]


In [None]:
# Calculate precision, recall and F1 score per class from the rounded predictions;
# output_dict=True returns a nested dict instead of a formatted string.
report = classification_report(y_true, y_pred_round , output_dict=True)
print(report)

{'0': {'precision': 0.9545454545454546, 'recall': 0.984375, 'f1-score': 0.9692307692307692, 'support': 64}, '1': {'precision': 0.9666666666666667, 'recall': 0.90625, 'f1-score': 0.9354838709677419, 'support': 32}, 'accuracy': 0.9583333333333334, 'macro avg': {'precision': 0.9606060606060607, 'recall': 0.9453125, 'f1-score': 0.9523573200992556, 'support': 96}, 'weighted avg': {'precision': 0.9585858585858587, 'recall': 0.9583333333333334, 'f1-score': 0.9579818031430934, 'support': 96}}


In [None]:
import pandas as pd

# Turn the report dict into a table; transposing puts each class/average on its
# own row.
report_df = pd.DataFrame(report).transpose()

# Save the DataFrame to a CSV file (the index holds the row labels, so keep it).
csv_file_path = '/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Metrics/other_metrics_resnet50.csv'
report_df.to_csv(csv_file_path, index=True)

print(f"Classification report saved to {csv_file_path}")

Classification report saved to /content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Metrics/other_metrics_resnet50.csv


### ROC curve

In [None]:
from tensorflow.keras.models import load_model

# Load the trained model from Drive again so this section does not depend on a
# model object left over from earlier cells.
model_path = '/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/modelo_final_resnet50.h5'
model = load_model(model_path)

In [None]:
import numpy as np
import sys

In [None]:
# Re-open the preprocessed clips and labels (same archives as in the loading section).
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary objects, which can
# execute code from an untrusted file. It is only needed for object-dtype arrays —
# confirm whether it can be dropped for these archives.
data = np.load("/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Preprocess ResNet/data/preprocessed_data_Resnet_FINAL.npz", allow_pickle=True)
labels = np.load("/content/drive/MyDrive/Tesis_maestria_DS/Tesis/Proceso/2-Entrenamiento/2-Modelo completo/Preprocess ResNet/labels/preprocessed_labels_Resnet_FINAL.npz", allow_pickle=True)

In [None]:
# Test split. Indexing the archive already returns NumPy arrays, so np.asarray
# avoids the extra full copy that np.array would make.
X_test = np.asarray(data["test"])
y_test = np.asarray(labels["test"])

In [None]:
# Make predictions: y_pred holds the model's raw outputs for the test split.
y_pred = model.predict(X_test)

# Ground-truth labels under the name the metric cells use.
y_true = y_test

In [None]:
# Round each predicted probability to the nearest integer value (0. or 1.).
y_pred_round = np.round(y_pred , decimals=0)

In [None]:
from sklearn.metrics import accuracy_score, classification_report, roc_auc_score, confusion_matrix, roc_curve, auc

In [None]:
# This section reloads everything it needs, so import pyplot here as well
# instead of relying on the import made in the training-curves cell.
import matplotlib.pyplot as plt

# A ROC curve sweeps the decision threshold, so roc_curve must receive the
# predicted probabilities. Passing the rounded 0/1 predictions collapses the
# curve to a single operating point and misstates the AUC.
fpr, tpr, _ = roc_curve(y_true, np.ravel(y_pred))

roc_auc = auc(fpr, tpr)

# Plot the ROC curve together with the chance diagonal for reference.
plt.figure(figsize=(7, 4))
plt.plot(fpr, tpr, color='darkorange', lw=2, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
# plt.title('Receiver Operating Characteristic Curve')
plt.legend(loc="lower right")
plt.show()