<a href="https://colab.research.google.com/github/tylaar1/PICAR-autopilot/blob/main/GridsearchHyperperameter_regression_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SWITCH TO **`T4 GPU`** OR THE **`HPC`**

# Imports

In [None]:
import os
import pandas as pd
import numpy as np
import tensorflow as tf
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from keras.preprocessing.image import load_img, img_to_array
from sklearn.model_selection import train_test_split
from sklearn.metrics import balanced_accuracy_score
import matplotlib.pyplot as plt

In [None]:
# Lift pandas' display limits (cell width, row count, column count) so that
# DataFrames are rendered in full instead of being truncated.
for _display_option in ('display.max_colwidth', 'display.max_rows', 'display.max_columns'):
    pd.set_option(_display_option, None)

In [None]:
# Mount Google Drive when running on Colab. The notebook is also run on the
# HPC (see the alternative file paths below), where `google.colab` does not
# exist, so a missing module is treated as "not on Colab" rather than an error.
try:
    from google.colab import drive
except ImportError:
    IN_COLAB = False  # e.g. running on the HPC: nothing to mount
else:
    drive.mount('/content/drive')
    IN_COLAB = True

Mounted at /content/drive


In [None]:
# Location of the labels CSV. Swap which of the two lines is commented out
# when moving between Colab (Drive path) and the HPC.
labels_file_path = '/content/drive/MyDrive/machine-learning-in-science-ii-2025/training_norm.csv' # tylers file path
#labels_file_path = '/home/apyba3/KAGGLEDATAmachine-learning-in-science-ii-2025/training_norm.csv' # ben hpc file path (mlis2 cluster)

# Read the labels and key every row by its image_id.
labels_df = pd.read_csv(labels_file_path).set_index('image_id')

In [None]:
#image_folder_path = '/home/apyba3/KAGGLEDATAmachine-learning-in-science-ii-2025/training_data/training_data' # bens hpc file path
image_folder_path = '/content/drive/MyDrive/machine-learning-in-science-ii-2025/training_data/training_data' # tylers file path


def _image_id_from_path(path):
    """Return the integer id encoded in an image file name such as '.../123.png'."""
    stem, _extension = os.path.splitext(os.path.basename(path))
    return int(stem)


# Collect every image file in the folder, ordered numerically by id
# (1.png, 2.png, 3.png, ...) rather than lexically.
image_file_paths = sorted(
    (
        os.path.join(image_folder_path, file_name)
        for file_name in os.listdir(image_folder_path)
        if file_name.lower().endswith(('.png', '.jpg', '.jpeg'))
    ),
    key=_image_id_from_path,
)

# One row per image, indexed by image_id so it can be joined to the labels.
imagefilepaths_df = pd.DataFrame(
    image_file_paths,
    columns=['image_file_paths'],
    index=[_image_id_from_path(path) for path in image_file_paths],
).rename_axis('image_id')

In [None]:
# Keep only image_ids that have both a label row and an image file, then round
# speed to 6 decimals to get rid of floating point errors.
merged_df = (
    labels_df
    .merge(imagefilepaths_df, on='image_id', how='inner')
    .assign(speed=lambda frame: frame['speed'].round(6))
)

In [None]:
# Inspect the rows whose rounded speed equals 1.428571; the next cell removes them.
merged_df.loc[merged_df['speed'] == 1.428571]

Unnamed: 0_level_0,angle,speed,image_file_paths
image_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1


In [None]:
# Drop every row whose rounded speed equals 1.428571, then eyeball the
# image_id range 3882-3886 of the result.
speed_is_excluded = merged_df['speed'] == 1.428571
cleaned_df = merged_df[~speed_is_excluded]
cleaned_df.loc[3882:3886]

Unnamed: 0_level_0,angle,speed,image_file_paths
image_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1


In [None]:
def process_image(image_path, label, resized_shape=(224, 224)):
    """Load one image from disk and pair it with its label.

    Args:
        image_path: path of the image file to read.
        label: value passed through unchanged as the second output.
        resized_shape: (height, width) the decoded image is resized to.

    Returns:
        (image, label): the 3-channel image resized to `resized_shape` with
        pixel values divided by 255, and the untouched label.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, resized_shape)
    return resized / 255.0, label  # scale pixel values to [0,1]

# Build the input pipeline: (file path, angle) pairs -> decoded images -> batches.
#
# The shuffle is seeded and uses reshuffle_each_iteration=False on purpose:
# the train/validation split further down is done with take()/skip() on this
# dataset, which only yields a stable, disjoint split if the element order is
# identical on every pass. With the default (reshuffle on every iteration)
# samples migrate between the training and validation sets from epoch to epoch.
SHUFFLE_SEED = 42

dataset = (
    tf.data.Dataset.from_tensor_slices((cleaned_df["image_file_paths"], cleaned_df["angle"]))  # pd df -> tf ds
    .map(process_image, num_parallel_calls=tf.data.AUTOTUNE)
    .cache()  # keep decoded images in memory after the first pass
    .shuffle(len(cleaned_df), seed=SHUFFLE_SEED, reshuffle_each_iteration=False)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Sanity check: print the shapes of one batch and the labels it contains.
for batch_images, batch_labels in dataset.take(1):
    print(batch_images.shape, batch_labels.shape)
    print(batch_labels)

In [None]:
# 80-20 split

dataset_size = tf.data.experimental.cardinality(dataset).numpy()
train_size = int(0.8 * dataset_size)

train_dataset = dataset.take(train_size)
validation_dataset = dataset.skip(train_size)

In [None]:
# Both numbers are batch counts, not image counts.
validation_size = dataset_size - train_size
print(f"Train size: {train_size}, validation size: {validation_size}")

In [None]:
# Display the dataset object's repr as a quick check of the validation split.
validation_dataset

In [None]:
# #only performing augmentation on training data as want validation/test data to reflect kaggle test set
# seed = (6,9)
# train_dataset = train_dataset.map(
#       lambda image, label: (tf.image.stateless_random_brightness(image, 0.2,seed),label)
# ).map(
#       lambda image, label: (tf.image.stateless_random_contrast(image,0.8,1.2,seed),label)
# ).map(
#       lambda image, label: (tf.image.stateless_random_hue(image,0.2,seed),label)
# ).map(
#       lambda image, label: (tf.image.stateless_random_saturation(image,0.8,1.2,seed),label)
# ).take(5
# )

In [None]:
# Walk the training split once and add up the number of images in each batch.
total_images = sum(image_batch.shape[0] for image_batch, _ in train_dataset)

print(f"Total number of images in train_dataset: {total_images}")

In [None]:
f, axarr = plt.subplots(1,5)

# Preview: show the first (up to) five images of one batch with their labels.
# zip() stops after the five axes, so the rest of the batch is ignored.
for image_batch, label_batch in dataset.take(1):
    for i, (axis, image) in enumerate(zip(axarr, image_batch)):
        print('image shape: ', np.shape(image))
        tf.print('label:', label_batch[i])  # label for the corresponding image
        axis.imshow(image)
        axis.axis('off')
plt.show()

In [None]:
dropoutrate = 0.2
num_classes = 1 # one linear output unit: the model regresses a single continuous value
input_shape = (224,224,3)

# ImageNet-pretrained MobileNetV3 backbone without its classification head.
mbnet = tf.keras.applications.MobileNetV3Large(
    input_shape=input_shape,
    include_top=False,
    weights='imagenet',
    minimalistic=False
)
mbnet.trainable = False # freeze the backbone to the imagenet weights

model = tf.keras.Sequential([
  tf.keras.Input(shape=input_shape),
  # MobileNetV3 ships with its own preprocessing layer and expects raw pixel
  # values in [0, 255]. The input pipeline already divides by 255, so scale
  # back up here; otherwise the backbone sees inputs ~255x too small.
  tf.keras.layers.Rescaling(255.0),
  mbnet,
  tf.keras.layers.GlobalAveragePooling2D(),
  tf.keras.layers.Dropout(dropoutrate),
  tf.keras.layers.Dense(256, activation='relu'),
  tf.keras.layers.Dropout(dropoutrate),
  tf.keras.layers.Dense(128, activation='relu'),
  tf.keras.layers.Dropout(dropoutrate),
  tf.keras.layers.Dense(64, activation='relu'),
  tf.keras.layers.Dropout(dropoutrate),
  tf.keras.layers.Dense(32, activation='relu'),
  tf.keras.layers.Dense(num_classes, activation='linear')  # regression output
])

# The explicit Input layer means the model is already built; no model.build() needed.
model.summary() # print the model

In [None]:
LR = 0.001 #learning rate
optimizer = tf.optimizers.Adam(LR) #adam optimiser

@tf.function
def train_step( model, X , Y):
    """Run one optimisation step on a single batch.

    Args:
        model: the Keras model being trained.
        X: batch of input images.
        Y: batch of target values, one per image.

    Returns:
        (current_loss, current_MSE): the mean-squared-error loss used for the
        gradient step and the batch MSE computed directly from the residuals.
    """
    with tf.GradientTape() as tape:
        # training=True so the Dropout layers are active; a bare model(X)
        # call runs in inference mode.
        pred = model( X, training=True )
        # Give the targets the same shape as the predictions. Subtracting a
        # (batch,) tensor from a (batch, 1) tensor would broadcast to
        # (batch, batch) and produce a meaningless MSE.
        Y = tf.reshape(tf.cast(Y, tf.float32), tf.shape(pred))
        current_loss = tf.reduce_mean(tf.losses.MeanSquaredError()( Y,  pred))

    grads = tape.gradient(current_loss, model.trainable_variables)
    optimizer.apply_gradients( zip( grads , model.trainable_variables) )
    current_MSE = tf.reduce_mean(tf.square(Y-pred))
    return(current_loss, current_MSE)

In [None]:
niter = 200

# Per-epoch history of the averaged training / validation metrics.
tloss = []
tMSE = []
vloss = []
vMSE = []


@tf.function
def eval_step(model, X, Y):
    """Score one batch without updating any weights.

    Validation must not go through train_step: that function applies
    gradients, so it would train the model on the validation data.

    Returns:
        (loss, MSE): both are the batch mean squared error, returned as a
        pair to mirror what train_step returns.
    """
    pred = model(X, training=False)
    # Match the target shape to the predictions to avoid (batch, batch) broadcasting.
    Y = tf.reshape(tf.cast(Y, tf.float32), tf.shape(pred))
    mse = tf.reduce_mean(tf.square(Y - pred))
    return mse, mse


for it in range(niter):
    # Training
    batch_losses = []
    batch_MSEs = []
    for image_batch, label_batch in train_dataset:
        loss, MSE = train_step(model, image_batch, label_batch)
        batch_losses.append(loss)
        batch_MSEs.append(MSE)

    # Calculate average metrics for this epoch
    avg_loss = tf.reduce_mean(batch_losses)
    avg_MSE = tf.reduce_mean(batch_MSEs)
    tloss.append(avg_loss)
    tMSE.append(avg_MSE)

    # Validation (forward pass only, no weight updates)
    val_batch_losses = []
    val_batch_MSEs = []
    for image_batch, label_batch in validation_dataset:
        val_loss, val_MSE = eval_step(model, image_batch, label_batch)
        val_batch_losses.append(val_loss)
        val_batch_MSEs.append(val_MSE)

    # Calculate average validation metrics
    avg_val_loss = tf.reduce_mean(val_batch_losses)
    avg_val_MSE = tf.reduce_mean(val_batch_MSEs)
    vloss.append(avg_val_loss)
    vMSE.append(avg_val_MSE)

    # Print metrics every 10 iterations
    if it % 10 == 0:  # true for it = 0, 10, 20, ...
        tf.print('iter: {}, train_loss: {:.3f}, train_MSE: {:.3f}, val_loss: {:.3f}, val_MSE: {:.3f}'.format(
            it, avg_loss, avg_MSE, avg_val_loss, avg_val_MSE))

In [None]:
# Save the weights obtained from training with the frozen backbone.
# NOTE(review): this is an absolute HPC home-directory path, while the data
# cells above currently have the Colab Drive paths active — confirm the
# target directory exists in the environment this is run in.
model.save_weights('/home/apyba3/car_frozen_regression.weights.h5')

In [None]:
# Reset Keras' global state.
# NOTE(review): the existing `model` object is still used afterwards
# (weights are loaded into it in the next cell) — confirm this call is needed.
tf.keras.backend.clear_session() #Clear keras session

### 2d) fine-tuning

In [None]:
# Start fine-tuning from the weights saved after the frozen-backbone training stage.
model.load_weights('/home/apyba3/car_frozen_regression.weights.h5')

Set up fine-tuning training

In [None]:
# Fine-tuning means training the backbone as well, so it has to be unfrozen;
# otherwise this stage only repeats the head training done above.
mbnet.trainable = True
# Keep BatchNormalization layers frozen so they stay in inference mode and
# their ImageNet statistics are not overwritten by small batches.
for layer in mbnet.layers:
    if isinstance(layer, tf.keras.layers.BatchNormalization):
        layer.trainable = False

# Much lower learning rate than for the head: large updates would destroy the
# pretrained features.
LR = 1e-5
optimizer = tf.optimizers.Adam(LR) #adam optimiser

# train_step is re-defined on purpose: the new optimizer and the changed set
# of trainable variables need a freshly traced tf.function.
@tf.function
def train_step( model, X , Y):
    """Run one optimisation step on a single batch.

    Args:
        model: the Keras model being fine-tuned.
        X: batch of input images.
        Y: batch of target values, one per image.

    Returns:
        (current_loss, current_MSE): the mean-squared-error loss used for the
        gradient step and the batch MSE computed directly from the residuals.
    """
    with tf.GradientTape() as tape:
        # training=True so the Dropout layers are active; a bare model(X)
        # call runs in inference mode.
        pred = model( X, training=True )
        # Give the targets the same shape as the predictions. Subtracting a
        # (batch,) tensor from a (batch, 1) tensor would broadcast to
        # (batch, batch) and produce a meaningless MSE.
        Y = tf.reshape(tf.cast(Y, tf.float32), tf.shape(pred))
        current_loss = tf.reduce_mean(tf.losses.MeanSquaredError()( Y,  pred))

    grads = tape.gradient(current_loss, model.trainable_variables)
    optimizer.apply_gradients( zip( grads , model.trainable_variables) )
    current_MSE = tf.reduce_mean(tf.square(Y-pred))
    return(current_loss, current_MSE)

In [None]:
niter = 50

# Per-epoch history of the averaged training / validation metrics.
tloss = []
tMSE = []
vloss = []
vMSE = []


@tf.function
def eval_step(model, X, Y):
    """Score one batch without updating any weights.

    Validation must not go through train_step: that function applies
    gradients, so it would train the model on the validation data.

    Returns:
        (loss, MSE): both are the batch mean squared error, returned as a
        pair to mirror what train_step returns.
    """
    pred = model(X, training=False)
    # Match the target shape to the predictions to avoid (batch, batch) broadcasting.
    Y = tf.reshape(tf.cast(Y, tf.float32), tf.shape(pred))
    mse = tf.reduce_mean(tf.square(Y - pred))
    return mse, mse


for it in range(niter):
    # Training
    batch_losses = []
    batch_MSEs = []
    for image_batch, label_batch in train_dataset:
        loss, MSE = train_step(model, image_batch, label_batch)
        batch_losses.append(loss)
        batch_MSEs.append(MSE)

    # Calculate average metrics for this epoch
    avg_loss = tf.reduce_mean(batch_losses)
    avg_MSE = tf.reduce_mean(batch_MSEs)
    tloss.append(avg_loss)
    tMSE.append(avg_MSE)

    # Validation (forward pass only, no weight updates)
    val_batch_losses = []
    val_batch_MSEs = []
    for image_batch, label_batch in validation_dataset:
        val_loss, val_MSE = eval_step(model, image_batch, label_batch)
        val_batch_losses.append(val_loss)
        val_batch_MSEs.append(val_MSE)

    # Calculate average validation metrics
    avg_val_loss = tf.reduce_mean(val_batch_losses)
    avg_val_MSE = tf.reduce_mean(val_batch_MSEs)
    vloss.append(avg_val_loss)
    vMSE.append(avg_val_MSE)

    # Print metrics every 10 iterations
    if it % 10 == 0:  # true for it = 0, 10, 20, ...
        tf.print('iter: {}, train_loss: {:.3f}, train_MSE: {:.3f}, val_loss: {:.3f}, val_MSE: {:.3f}'.format(
            it, avg_loss, avg_MSE, avg_val_loss, avg_val_MSE))

In [None]:
# Save the weights produced by the fine-tuning stage.
# NOTE(review): absolute HPC home-directory path — confirm it exists in the
# environment this is run in.
model.save_weights('/home/apyba3/car_unfrozen_regression.weights.h5')

In [None]:
image_folder_path = '/home/apyba3/KAGGLEDATAmachine-learning-in-science-ii-2025/test_data/test_data'


def _test_image_id(path):
    """Return the integer id encoded in an image file name such as '.../123.png'."""
    stem, _extension = os.path.splitext(os.path.basename(path))
    return int(stem)


# Collect the test image files, ordered numerically by id
# (1.png, 2.png, 3.png, ...) rather than lexically.
image_file_paths = sorted(
    (
        os.path.join(image_folder_path, file_name)
        for file_name in os.listdir(image_folder_path)
        if file_name.lower().endswith(('.png', '.jpg', '.jpeg'))
    ),
    key=_test_image_id,
)

# One row per test image, indexed by image_id. This overwrites the
# training-set table of the same name.
imagefilepaths_df = pd.DataFrame(
    image_file_paths,
    columns=['image_file_paths'],
    index=[_test_image_id(path) for path in image_file_paths],
).rename_axis('image_id')
imagefilepaths_df.head()

In [None]:
def process_image_no_label(image_path, resized_shape=(224, 224)):
    """Load one unlabeled image from disk.

    Args:
        image_path: path of the image file to read.
        resized_shape: (height, width) the decoded image is resized to.

    Returns:
        The 3-channel image resized to `resized_shape`, with pixel values
        divided by 255.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, resized_shape)  # uniform shape for batching
    return resized / 255.0  # scale pixel values to [0,1]

# Inference pipeline: file paths -> decoded images -> batches of 32.
# There is no shuffling, so the outputs stay in imagefilepaths_df row order.
test_dataset = (
    tf.data.Dataset.from_tensor_slices(imagefilepaths_df["image_file_paths"])
    .map(process_image_no_label, num_parallel_calls=tf.data.AUTOTUNE)
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)
)

In [None]:
# Run inference over the unlabeled test images.
predictions = model.predict(test_dataset)

In [None]:
# Build the predictions table before writing it: `predictions_df` was never
# defined, so the bare to_csv call raised NameError on a fresh kernel.
# The model outputs one value per image; flatten them to a single column and
# key each row by the image_id index of the test file table.
predictions_df = pd.DataFrame(
    {'angle': np.asarray(predictions).reshape(-1)},
    index=imagefilepaths_df.index,
)
predictions_df.to_csv('/home/apyba3/mbnetv3_angleregression_predictions.csv')

# Hyperparameter Tuning for Regression

In [None]:
!pip install keras-tuner


In [None]:
import keras_tuner as kt


In [None]:
def build_model(hp):
    """Build and compile one candidate regression model for keras-tuner.

    Args:
        hp: the keras-tuner hyperparameter object used to sample the dropout
            rate, the three dense-layer widths and the learning rate.

    Returns:
        A compiled Keras model: frozen MobileNetV3Large backbone followed by
        a dense regression head with a single linear output, trained with MSE
        loss and reporting MAE.
    """
    dropoutrate = hp.Float('dropout_rate', min_value=0.1, max_value=0.5, step=0.1)
    num_units_1 = hp.Int('dense_1_units', min_value=128, max_value=512, step=64)
    num_units_2 = hp.Int('dense_2_units', min_value=64, max_value=256, step=32)
    num_units_3 = hp.Int('dense_3_units', min_value=32, max_value=128, step=16)
    learning_rate = hp.Choice('learning_rate', values=[1e-4, 5e-4, 1e-3, 5e-3])

    input_shape = (224,224,3)
    mbnet = tf.keras.applications.MobileNetV3Large(
        input_shape=input_shape,
        include_top=False,
        weights='imagenet',
        minimalistic=False
    )
    mbnet.trainable = False  # freeze MobileNet layers

    model = tf.keras.Sequential([
        tf.keras.Input(shape=input_shape),
        # MobileNetV3 ships with its own preprocessing layer and expects raw
        # pixel values in [0, 255]. The input pipeline already divides by 255,
        # so scale back up here; otherwise the backbone sees inputs ~255x too small.
        tf.keras.layers.Rescaling(255.0),
        mbnet,
        tf.keras.layers.GlobalAveragePooling2D(),
        tf.keras.layers.Dropout(dropoutrate),
        tf.keras.layers.Dense(num_units_1, activation='relu'),
        tf.keras.layers.Dropout(dropoutrate),
        tf.keras.layers.Dense(num_units_2, activation='relu'),
        tf.keras.layers.Dropout(dropoutrate),
        tf.keras.layers.Dense(num_units_3, activation='relu'),
        tf.keras.layers.Dense(1, activation='linear')  # regression
    ])

    model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate),
                  loss='mse',  # mean squared error for regression
                  metrics=['mae'])  # mean absolute error

    return model


In [None]:
# Random search over the space defined in build_model: 10 sampled
# configurations, each trained once, ranked by validation mean absolute error.
# NOTE(review): project_name says "speed", but the datasets built earlier use
# the 'angle' column as the label — confirm the intended name.
tuner = kt.RandomSearch(
    hypermodel=build_model,
    objective='val_mae',  # minimize mean absolute error
    max_trials=10,  # number of models to try
    executions_per_trial=1,
    directory='tuning_results',
    project_name='speed_regression_tuning',
)


In [None]:
# Launch the search: every trial trains a model from build_model for 10 epochs
# on train_dataset, with validation_dataset as validation data.
tuner.search(train_dataset, validation_data=validation_dataset, epochs=10)


In [None]:
# Fetch the best hyperparameter set found by the search and report each value.
best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
for description, hp_name in (
    ("Best Dropout Rate", 'dropout_rate'),
    ("Best Dense Layer 1 Units", 'dense_1_units'),
    ("Best Dense Layer 2 Units", 'dense_2_units'),
    ("Best Dense Layer 3 Units", 'dense_3_units'),
    ("Best Learning Rate", 'learning_rate'),
):
    print(f"{description}: {best_hps.get(hp_name)}")
