In [298]:
import tensorflow as tf
import numpy as np
from tensorflow import keras
from keras import layers
import os
import pandas as pd
import cv2 
from tensorflow.keras.optimizers.legacy import Adam


In [325]:
dataset_folder = "/Users/karmasmac/Downloads/SteeringModelKarma/Dataset"
final_images_folder_path = os.path.join(dataset_folder, 'final_images')
final_data_csv_path = os.path.join(dataset_folder, 'final_data.csv')

final_data = pd.read_csv(final_data_csv_path)

In [326]:
final_data

Unnamed: 0,timestamp,angle,throttle,brake
0,147942442726,-0.178024,0.232136,0.147372
1,147942436580,-0.144862,0.249958,0.149142
2,147942441606,-0.097738,0.149950,0.161227
3,147942437990,-0.118682,0.233478,0.148547
4,147942440456,-0.101229,0.150072,0.148211
...,...,...,...,...
1977,147942438365,-0.017453,0.240925,0.148272
1978,147942431129,0.013963,0.275486,0.147829
1979,147942441696,-0.095993,0.150072,0.148226
1980,147942438445,-0.008727,0.168750,0.148714


In [327]:
final_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1982 entries, 0 to 1981
Data columns (total 4 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   timestamp  1982 non-null   int64  
 1   angle      1982 non-null   float64
 2   throttle   1982 non-null   float64
 3   brake      1982 non-null   float64
dtypes: float64(3), int64(1)
memory usage: 62.1 KB


In [328]:
final_data[final_data['timestamp'] == 147942422692]

Unnamed: 0,timestamp,angle,throttle,brake
792,147942422692,0.109956,0.319326,0.148241
1715,147942422692,0.109956,0.319326,0.148241


In [329]:
class Data():
    def __init__(self) -> None:
        self.path = final_images_folder_path
        self.data = []
        self.data_train = []
        self.data_val = []
        self.data_test = []
    def make_training_data(self):
        for filename in os.listdir(self.path):
            if filename.endswith(".jpg") or filename.endswith(".png"):
                image = cv2.imread(os.path.join(self.path,filename))
                image = cv2.resize(image, (160, 120))
                image = image / 255.0  # Normalize image data
                image_name_as_ts = os.path.splitext(filename)[0]

                # print(f"img is {image}")

                data_row = final_data[final_data['timestamp'] == int(image_name_as_ts)]
                if not data_row.empty:
                    angle = data_row['angle'].iloc[0]
                    throttle = data_row['throttle'].iloc[0]
                    brake = data_row['brake'].iloc[0]
                    self.data.append((image, angle, throttle, brake))
        
    def training_data(self):
        self.data_train = self.data[:int((len(self.data)*3)/4)]
        return self.data_train
    
         # return last 1/4 of total data
    def validation_data(self):
        self.data_val = self.data[int((len(self.data)*3)/4)+1:-1]
        return self.data_val
    
    def testing_data(self):
        self.data_test = self.data[int(len(self.data)-1):]
        return self.data_test

In [330]:
data = Data()
data.make_training_data()

data_train = data.training_data()
print(type(data_train[0][0]))

<class 'numpy.ndarray'>


In [331]:
inputs = keras.Input(name='input_shape', shape=(120,160,3))

# convolutional feature maps
x = layers.Conv2D(filters=24, kernel_size=(5,5), strides=(2,2), activation='relu')(inputs)
x = layers.Conv2D(filters=36, kernel_size=(5,5), strides=(2,2), activation='relu')(x)
x = layers.Conv2D(filters=48, kernel_size=(5,5), strides=(2,2), activation='relu')(x)
x = layers.Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), activation='relu')(x)
x = layers.Conv2D(filters=64, kernel_size=(3,3), strides=(1,1), activation='relu')(x)

# flatten layer
x = layers.Flatten()(x)

# fully connected layers with dropouts for overfit protection
x = layers.Dense(units=1152, activation='relu')(x)
x = layers.Dropout(rate=0.1)(x)
x = layers.Dense(units=100, activation='relu')(x)
x = layers.Dropout(rate=0.1)(x)
x = layers.Dense(units=50, activation='relu')(x)
x = layers.Dropout(rate=0.1)(x)
x = layers.Dense(units=10, activation='relu')(x)
x = layers.Dropout(rate=0.1)(x)

In [332]:
x

<KerasTensor: shape=(None, 10) dtype=float32 (created by layer 'dropout_75')>

In [333]:
# derive steering angle value from single output layer by point multiplication
steering_angle = layers.Dense(units=1, activation='linear')(x)
steering_angle = layers.Lambda(lambda X: tf.multiply(tf.atan(X), 2), name='steering_angle')(steering_angle)

# derive throttle pressure value from single output layer by point multiplication
throttle_press = layers.Dense(units=1, activation='linear')(x)
throttle_press = layers.Lambda(lambda X: tf.multiply(tf.atan(X), 2), name='throttle_press')(throttle_press)

# derive brake pressure value from single output by point multiplication
brake_pressure = layers.Dense(units=1, activation='linear')(x)
brake_pressure = layers.Lambda(lambda X: tf.multiply(tf.atan(X), 2), name='brake_pressure')(brake_pressure)


In [334]:

# build and compile model
model = keras.Model(inputs = [inputs], outputs = [steering_angle, throttle_press, brake_pressure])
model.compile(
    optimizer = Adam(),
    loss = {'steering_angle': 'mse', 'throttle_press': 'mse', 'brake_pressure': 'mse'}
)
model.summary()

Model: "model_22"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_shape (InputLayer)    [(None, 120, 160, 3)]        0         []                            
                                                                                                  
 conv2d_90 (Conv2D)          (None, 58, 78, 24)           1824      ['input_shape[0][0]']         
                                                                                                  
 conv2d_91 (Conv2D)          (None, 27, 37, 36)           21636     ['conv2d_90[0][0]']           
                                                                                                  
 conv2d_92 (Conv2D)          (None, 12, 17, 48)           43248     ['conv2d_91[0][0]']           
                                                                                           

In [335]:
data = Data()
data.make_training_data()
data_train = data.training_data()
data_val = data.validation_data()
data_test = data.testing_data()
print(np.array([data[0] for data in data_train]).shape, np.array([(data[1], data[2], data[3]) for data in data_train]).shape)
print(np.array([data[0] for data in data_val]).shape, np.array([(data[1], data[2], data[3]) for data in data_val]).shape)
print(np.array([data[0] for data in data_test]).shape, np.array([(data[1], data[2], data[3]) for data in data_test]).shape)

(890, 120, 160, 3) (890, 3)
(295, 120, 160, 3) (295, 3)
(1, 120, 160, 3) (1, 3)


In [336]:
data = Data()
def train(model, name, data, epochs: 30, steps: 10, steps_val: 10, batch_size: 64):
    # x_train & y_train are np.array() objects with data extracted directly from the PilotData object instances
    # fit data to model for training
    data.make_training_data()
    data_train = data.training_data()
    data_test = data.validation_data()
    print(np.array([data[0] for data in data_train]).shape)
    model.fit(np.array([data[0] for data in data_train]), np.array([(data[1], data[2], data[3]) for data in data_train]), batch_size=batch_size, epochs=epochs, steps_per_epoch=steps, validation_split=0.2, validation_steps=steps_val)
    # test the model by fitting the test data
    stats = model.evaluate(np.array([data[0] for data in data_test]), np.array([(data[1], data[2], data[3]) for data in data_test]), verbose=0)
    # print the stats
    print(f'Model accuracy: {stats}')
    # input('\nPress [ENTER] to continue...')
    # save the trained model
    # model.save(f"models/{name}.h5")
    model.save(f'{name}.h5')

In [337]:
train(model, 'jaishreeram', data, 10, 5, 10, 16)

(890, 120, 160, 3)
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Model accuracy: [0.05963308736681938, 0.01943758875131607, 0.020252743735909462, 0.019942747429013252]


  saving_api.save_model(


In [342]:
def predict(data, given_model):
    if given_model != 'default':
        try:
            # Load the model with safe_mode=False and use the legacy optimizer
            model = keras.models.load_model(given_model, safe_mode=False)
        except Exception as e:
            print("An unexpected error occurred:", e)
            print("Please check if the model file path is correct and the model file is not corrupted.")
            return None
    else:
        model = []
    # Predict using the model
    if model:
        predictions = model.predict(data)
        return predictions
    else:
        print("No model loaded.")
        return None

test = data.testing_data()

# image = cv2.imread("/Users/karmasmac/Downloads/SteeringModelKarma/Dataset/right/1479424435766211782.png")
# image = cv2.resize(image, (160, 120))
# print(test)

# Extract images from the test data
images = [sample[0] for sample in test]
print(images)
model_pred = '/Users/karmasmac/Downloads/SteeringModelKarma/src/jaishreeram.h5'
# Call the predict function with the extracted images
predictions = predict(images, model_pred)
if predictions is not None:
    print("Predictions:", predictions)

[array([[[0.19215686, 0.15294118, 0.22352941],
        [0.17647059, 0.1372549 , 0.20392157],
        [0.20392157, 0.15686275, 0.22745098],
        ...,
        [0.99215686, 0.8745098 , 0.83921569],
        [1.        , 1.        , 0.97254902],
        [1.        , 0.96078431, 0.98039216]],

       [[0.2       , 0.19607843, 0.17254902],
        [0.20784314, 0.19607843, 0.17254902],
        [0.20784314, 0.19215686, 0.17254902],
        ...,
        [0.98823529, 0.7254902 , 0.69411765],
        [0.92941176, 0.74117647, 0.71372549],
        [0.95686275, 0.71764706, 0.72941176]],

       [[0.21960784, 0.20784314, 0.20784314],
        [0.22352941, 0.20784314, 0.20784314],
        [0.14509804, 0.12156863, 0.1254902 ],
        ...,
        [0.92156863, 0.69803922, 0.66666667],
        [0.90588235, 0.69019608, 0.6627451 ],
        [0.94509804, 0.70588235, 0.69411765]],

       ...,

       [[0.37254902, 0.33333333, 0.33333333],
        [0.37254902, 0.33333333, 0.33333333],
        [0.36862745, 

ValueError: in user code:

    File "/Users/karmasmac/miniconda3/envs/uav-karma/lib/python3.8/site-packages/keras/src/engine/training.py", line 2341, in predict_function  *
        return step_function(self, iterator)
    File "/Users/karmasmac/miniconda3/envs/uav-karma/lib/python3.8/site-packages/keras/src/engine/training.py", line 2327, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "/Users/karmasmac/miniconda3/envs/uav-karma/lib/python3.8/site-packages/keras/src/engine/training.py", line 2315, in run_step  **
        outputs = model.predict_step(data)
    File "/Users/karmasmac/miniconda3/envs/uav-karma/lib/python3.8/site-packages/keras/src/engine/training.py", line 2283, in predict_step
        return self(x, training=False)
    File "/Users/karmasmac/miniconda3/envs/uav-karma/lib/python3.8/site-packages/keras/src/utils/traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "/Users/karmasmac/miniconda3/envs/uav-karma/lib/python3.8/site-packages/keras/src/engine/input_spec.py", line 298, in assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer "model_22" is incompatible with the layer: expected shape=(None, 120, 160, 3), found shape=(None, 160, 3)


In [347]:
import cv2
import numpy as np
from keras.models import load_model

def preprocess_image(image_path):
    # Read the image
    image = cv2.imread(image_path)
    # Resize the image to match the expected input shape of the model
    image = cv2.resize(image, (160, 120))
    # Normalize the pixel values to be in the range [0, 1]
    image = image.astype(np.float32) / 255.0
    # Expand dimensions to create a batch of one image
    image = np.expand_dims(image, axis=0)
    # print(image)
    return image

def predict(image_paths, model_path):
    try:
        # Load the model
        model = load_model(model_path)
    except Exception as e:
        print("An unexpected error occurred:", e)
        print("Please check if the model file path is correct and the model file is not corrupted.")
        return None

    predictions = []
    for image_path in image_paths:
        # Preprocess the image
        preprocessed_image = preprocess_image(image_path)
        # Predict using the model
        prediction = model.predict(preprocessed_image)
        predictions.append(prediction)
    
    return predictions

# Example usage
test_image_paths = ["/Users/karmasmac/Downloads/SteeringModelKarma/Dataset/final_images/147942421588.png"]
model_path = '/Users/karmasmac/Downloads/SteeringModelKarma/src/jaishreeram.h5'
predictions = predict(test_image_paths, model_path)
if predictions is not None:
    print("Predictions:", predictions)






Predictions: [[array([[0.12741645]], dtype=float32), array([[0.07883751]], dtype=float32), array([[0.10564547]], dtype=float32)]]


In [345]:
final_data[final_data['timestamp']==147942421588]

Unnamed: 0,timestamp,angle,throttle,brake
269,147942421588,0.001745,0.395651,0.148135
