In [None]:
import tensorflow as tf
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tqdm

In [None]:

# Read the CSV file
csv_file_path = 'cleaned_impact.csv'
data = pd.read_csv(csv_file_path)

In [None]:
def load_frame(video_path, frame_index, image_size=(224, 224)):
    cap = cv2.VideoCapture(video_path)
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_index)
    ret, frame = cap.read()
    cap.release()
    if not ret:
        raise ValueError(f"Frame {frame_index} not found in video {video_path}")
    frame = cv2.resize(frame, image_size)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return frame

def load_data(data, image_size=(224, 224)):
    frames_n_list = []
    frames_m_list = []
    perceived_changes = []
    for index, row in data.iterrows():
        video_path = row['Video']
        frame_n = int(row['Frame n'])
        frame_m = int(row['Frame m'])
        perceived_change = row['Perceived Change']

        frame_n = load_frame(video_path, frame_n, image_size)
        frame_m = load_frame(video_path, frame_m, image_size)

        frames_n_list.append(frame_n)
        frames_m_list.append(frame_m)
        perceived_changes.append(perceived_change)

    return frames_n_list, frames_m_list, perceived_changes

In [None]:
frames_n, frames_m, perceived_changes = load_data(data)



In [None]:
# normalize the images
frames_n = np.array(frames_n) / 255.0
frames_m = np.array(frames_m) / 255.0
# Calculate the minimum and maximum values
min_value = np.min(perceived_changes)
max_value = np.max(perceived_changes)
# Apply Min-Max normalization
perceived_changes = (perceived_changes - min_value) / (max_value - min_value)

In [None]:
# Shuffle the data
indices = np.arange(len(perceived_changes))
np.random.shuffle(indices)

frames_n = tf.gather(frames_n, indices)
frames_m = tf.gather(frames_m, indices)
perceived_changes = tf.gather(perceived_changes, indices)

In [None]:
# Split the data into training, validation, and test sets
train_ratio = 0.6
val_ratio = 0.2
test_ratio = 0.2

# Calculate the split indices
num_samples = len(perceived_changes)
train_end = int(train_ratio * num_samples)
val_end = train_end + int(val_ratio * num_samples)

# Split the data
frames_n_train = frames_n[:train_end]
frames_m_train = frames_m[:train_end]
perceived_changes_train = perceived_changes[:train_end]

frames_n_val = frames_n[train_end:val_end]
frames_m_val = frames_m[train_end:val_end]
perceived_changes_val = perceived_changes[train_end:val_end]

frames_n_test = frames_n[val_end:]
frames_m_test = frames_m[val_end:]
perceived_changes_test = perceived_changes[val_end:]




In [None]:
frames_m_test.shape

In [None]:
# Create a TensorFlow dataset
train_dataset = tf.data.Dataset.from_tensor_slices(((frames_n_train, frames_m_train), perceived_changes_train))
val_dataset = tf.data.Dataset.from_tensor_slices(((frames_n_val, frames_m_val), perceived_changes_val))
test_dataset = tf.data.Dataset.from_tensor_slices(((frames_n_test, frames_m_test), perceived_changes_test))

# Shuffle and batch the dataset
batch_size = 32
train_dataset = train_dataset.shuffle(buffer_size=len(perceived_changes)).batch(batch_size).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

In [None]:
# Define the neural network model
input_n = tf.keras.layers.Input(shape=(224, 224, 3))
input_m = tf.keras.layers.Input(shape=(224, 224, 3))

# Shared convolutional base
base_model = tf.keras.applications.MobileNetV3Small(weights='imagenet', include_top=False, input_shape=(224, 224, 3))
base_model.trainable = False
x_n = base_model(input_n)
x_m = base_model(input_m)

# Global average pooling
x_n = tf.keras.layers.GlobalAveragePooling2D()(x_n)
x_m = tf.keras.layers.GlobalAveragePooling2D()(x_m)

# Concatenate the outputs
x = tf.keras.layers.Concatenate()([x_n, x_m])

# Fully connected layer
x = tf.keras.layers.Dense(128, activation='relu', kernel_regularizer=tf.keras.regularizers.l2(0.01))(x)
x = tf.keras.layers.BatchNormalization()(x)
output = tf.keras.layers.Dense(1, activation='sigmoid')(x)

model = tf.keras.models.Model(inputs=[input_n, input_m], outputs=output)



In [None]:
model.trainable_variables


In [None]:
model.summary()

In [None]:
# Train the new layers for a few epochs
# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
epochs = 10
model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs
)

# Save the trained model
model.save(f'{drivePath}/trained_model.h5')

In [None]:
# Unfreeze the last 50% of the layers of the base model
total_layers = len(base_model.layers)
for layer in base_model.layers[int(total_layers * 0.5):]:
    layer.trainable = True

In [None]:
def lr_schedule(epoch, lr):
    decay_rate = 0.1
    decay_step = 5
    if epoch % decay_step == 0 and epoch:
        return lr * decay_rate
    return lr

early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

# Define the model checkpoint callback
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    f'{drivePath}/model_checkpoint.keras',  # File path to save the model
    monitor='val_loss',     # Monitor the validation loss
    save_best_only=True,    # Save only the best model
    save_weights_only=False, # Save the entire model (architecture + weights)
    mode='min',             # Save the model with the minimum validation loss
    verbose=1               # Print a message when saving the model
)

# Compile the model
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# Train the model
epochs = 100
model.fit(
    train_dataset,
    validation_data=val_dataset,
    epochs=epochs,
    callbacks=[tf.keras.callbacks.LearningRateScheduler(lr_schedule), early_stopping, checkpoint]
)

# Save the trained model again after unfreezing and further training
model.save(f'{drivePath}/fine_tuned_model.h5')


In [None]:
test_loss, test_accuracy = model.evaluate(test_dataset)
print(f'Test loss: {test_loss}')
print(f'Test Accuracy: {test_accuracy}')