In [121]:
import pandas as pd
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from keras import layers, Model

from sklearn.utils.class_weight import compute_class_weight

from matplotlib import pyplot as plt

from datetime import datetime

import pickle

from PIL import Image

print(f"pd.__version__: {pd.__version__}")
print(f"tf.__version__: {tf.__version__}")

pd.__version__: 2.2.3
tf.__version__: 2.15.1


In [122]:
speed_heads = []
arrow_heads = []
arrow_turn_heads = []
angle_heads = []

In [123]:
weight_path = "/mnt/c/Users/psamt/OneDrive - The University of Nottingham/_Spring/PHYS4036_Machine Learning in Science Part II/Project/dummy_models/150x200.weights.h5"

size_int_list = np.array(weight_path.split('/')[-1].split('.')[0].split('x')).astype(int)
image_shape = (size_int_list[0], size_int_list[1], 3)

image_shape

(150, 200, 3)

In [124]:
num_of_models = 8

In [125]:
mobilenetv3 = keras.applications.MobileNetV3Small(
    include_top=False,
    weights="imagenet",
    input_shape=image_shape,
    pooling=None,
    include_preprocessing=False, 
)

backbone = Model(inputs=mobilenetv3.input, outputs=mobilenetv3.layers[-6].output, name="MobileNetV3Small_backbone")
neck = Model(inputs=mobilenetv3.layers[-6].output, outputs=mobilenetv3.output, name="MobileNetV3Small_neck")
neck_out = neck(backbone.output)





### Model

#### Arrow model 0

In [126]:
base_model = keras.applications.MobileNetV3Small(
    include_top=False,
    weights="imagenet",
    input_shape=image_shape,
    pooling=None,
    include_preprocessing=False, 
)
base_model.trainable = False

feature_extractor = base_model.output

print(feature_extractor.shape)

a_model = []

output_activation_1 = 'sigmoid'


# model 0
i = 0

a = feature_extractor
a = layers.Conv2D(384, (3, 3), padding='same', 
                  kernel_initializer='he_uniform', name=f'a{i}_conv_2')(feature_extractor)
a = layers.BatchNormalization(name=f'a{i}_bn_2')(a)
a = layers.Activation('relu', name=f'a{i}_relu_2')(a)
a = layers.Conv2D(128, (3, 3), padding='same', 
                  kernel_initializer='he_uniform', name=f'a{i}_conv_1')(feature_extractor)
a = layers.BatchNormalization(name=f'a{i}_bn_1')(a)
a = layers.Activation('relu', name=f'a{i}_relu_1')(a)
a = layers.GlobalAveragePooling2D(name=f'a{i}_GAP')(a)
a = layers.Dense(96, activation='relu', kernel_initializer='he_uniform', name=f'a{i}_dense_2')(a)
a = layers.Dropout(0.5, name=f'a{i}_dropout_2')(a)
a = layers.Dense(64, activation='relu', kernel_initializer='he_uniform', name=f'a{i}_dense_1')(a)
a = layers.Dropout(0.5, name=f'a{i}_dropout')(a)

# Branch 1: No arrow (regression, 0 to 1)
arrow_output = layers.Dense(3, activation="softmax", name='no_arrow_output')(a)


arrow_model = Model(inputs=base_model.input, outputs={
  "arrow": arrow_output,
}, name="arrow_model")


arrow_model.load_weights(weight_path)

# model.summary()





(None, 5, 7, 576)


In [127]:
for i in range(num_of_models):
    arrow_heads.append(Model(inputs=base_model.layers[-6].output, outputs=arrow_model.output, name=f"arrow_head_{len(arrow_heads)}"))

#### Merge heads

In [128]:
arrow_heads_out = [arrow_head(backbone.output) for arrow_head in arrow_heads]
arrow_heads_out

[{'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_0')>},
 {'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_1')>},
 {'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_2')>},
 {'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_3')>},
 {'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_4')>},
 {'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_5')>},
 {'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_6')>},
 {'arrow': <KerasTensor: shape=(None, 3) dtype=float32 (created by layer 'arrow_head_7')>}]

### Merge model

In [129]:
merged_model = Model(inputs=backbone.input, outputs= arrow_heads_out, name="merged_model")
# merged_model = Model(inputs=backbone.input, outputs=[angle_head(backbone.output)] + speed_heads_out, name="merged_model")

In [130]:
# tf.keras.utils.plot_model(
#     merged_model,
#     show_shapes=True,  # Show input/output shapes
#     show_layer_names=True,  # Show layer names (e.g., "angle_hidden_2")
#     expand_nested=False,  # Keep it flat (no base_model nesting)
#     show_layer_activations=True,
#     dpi=96,  # Image resolution
# )

### Convert model into TFLite

In [131]:
converter = tf.lite.TFLiteConverter.from_keras_model(merged_model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]  # Enable optimization
tflite_model = converter.convert()

tf_lite_file_name = f"merged_model_{image_shape[0]}x{image_shape[1]}.tflite"

with open(tf_lite_file_name, "wb") as f:
    f.write(tflite_model)

print(f"TFLite model: merged_model_{image_shape[0]}x{image_shape[1]}.tflite saved successfully!")

INFO:tensorflow:Assets written to: /tmp/tmpjeqf0ulc/assets


INFO:tensorflow:Assets written to: /tmp/tmpjeqf0ulc/assets


TFLite model: merged_model_150x200.tflite saved successfully!


2025-03-21 16:57:51.990400: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:378] Ignored output_format.
2025-03-21 16:57:51.990458: W tensorflow/compiler/mlir/lite/python/tf_tfl_flatbuffer_helpers.cc:381] Ignored drop_control_dependency.
2025-03-21 16:57:51.990661: I tensorflow/cc/saved_model/reader.cc:83] Reading SavedModel from: /tmp/tmpjeqf0ulc
2025-03-21 16:57:52.024716: I tensorflow/cc/saved_model/reader.cc:51] Reading meta graph with tags { serve }
2025-03-21 16:57:52.024767: I tensorflow/cc/saved_model/reader.cc:146] Reading SavedModel debug info (if present) from: /tmp/tmpjeqf0ulc
2025-03-21 16:57:52.114455: I tensorflow/cc/saved_model/loader.cc:233] Restoring SavedModel bundle.
2025-03-21 16:57:52.482061: I tensorflow/cc/saved_model/loader.cc:217] Running initialization op on SavedModel bundle at path: /tmp/tmpjeqf0ulc
2025-03-21 16:57:52.700923: I tensorflow/cc/saved_model/loader.cc:316] SavedModel load for tags { serve }; Status: success: OK. Took 710262 

In [132]:
import numpy as np
import tensorflow.lite as tflite  # Use this if full TensorFlow is installed
# If using tflite-runtime, import this instead:
# import tflite_runtime.interpreter as tflite

# Load TFLite model
interpreter = tflite.Interpreter(model_path=tf_lite_file_name)
interpreter.allocate_tensors()

# Get input and output details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

# Create a dummy input (modify based on model input shape)
input_data = "/mnt/c/Users/psamt/OneDrive - The University of Nottingham/_Spring/PHYS4036_Machine Learning in Science Part II/Project/Dataset/machine-learning-in-science-ii-2025/test_data/test_data/1.png"
input_data = np.array(Image.open(input_data).convert('RGB').resize((image_shape[1], image_shape[0])))

# convert to range(-1, 1)
input_data = input_data / 255 * 2 - 1

input_data = input_data.astype(np.float32)

input_data = np.expand_dims(input_data, axis=0)

# Set input tensor
interpreter.set_tensor(input_details[0]['index'], input_data)

# Run inference
interpreter.invoke()

In [133]:
# output_details = interpreter.get_output_details()
# output_dict = {o['name']: o['index'] for o in output_details}

# angle_outputs = [interpreter.get_tensor(output_dict[f'StatefulPartitionedCall:{i}']) for i in range(10)]
# speed_outputs = [interpreter.get_tensor(output_dict[f'StatefulPartitionedCall:{i}']) for i in range(10, 20)]

In [134]:
output_details_sorted = sorted(output_details, key=lambda x: int(x['name'].split(':')[-1]))

all_outputs = [interpreter.get_tensor(out['index']) for out in output_details_sorted]

all_outputs

[array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32),
 array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32),
 array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32),
 array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32),
 array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32),
 array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32),
 array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32),
 array([[5.285398e-04, 5.722773e-01, 4.271941e-01]], dtype=float32)]

In [135]:
arrow_left = int(np.mean(np.squeeze(all_outputs[len(angle_heads):len(angle_heads)+len(arrow_heads)])[:, 1]) > 0.5)
arrow_right = int(np.mean(np.squeeze(all_outputs[len(angle_heads):len(angle_heads)+len(arrow_heads)])[:, 2]) > 0.5)

In [136]:
arrow_turn_left = int(np.mean(np.squeeze(all_outputs[len(angle_heads)+len(arrow_heads) : len(angle_heads)+len(arrow_heads)+len(arrow_turn_heads)])[:, 1]) > 0.5)
arrow_turn_right = int(np.mean(np.squeeze(all_outputs[len(angle_heads)+len(arrow_heads) : len(angle_heads)+len(arrow_heads)+len(arrow_turn_heads)])[:, 2]) > 0.5)

IndexError: too many indices for array: array is 1-dimensional, but 2 were indexed

In [None]:
angle_output = float(np.mean(np.squeeze(all_outputs[0:len(angle_heads)])))

# arrow_left = float(np.squeeze(all_outputs[1]))
# arrow_right = float(np.squeeze(all_outputs[2]))

# arrow_left_turn = float(np.squeeze(all_outputs[3]))
# arrow_right_turn = float(np.squeeze(all_outputs[4]))

# speed_output = int(np.squeeze(all_outputs[-1])[0] > 0.5)

speed_output = int(np.mean(np.squeeze(all_outputs[-5:])[:,0]) > 0.5)

# # Split into angle_outputs (0-9) and speed_outputs (10-19)
# angle_outputs = np.array(all_outputs[:10])
# speed_outputs = np.array(all_outputs[10:])

# angle_outputs.mean(), speed_outputs.mean()
angle_output

0.7329090237617493

In [None]:
print(angle_output, arrow_left, arrow_right, arrow_left_turn, arrow_right_turn, speed_output)

NameError: name 'arrow_left_turn' is not defined