In [3]:
import tensorflow as tf
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Dropout
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers
import pandas as pd
import os

In [5]:
from struct import unpack
from tqdm import tqdm
import os

img_dir = '/content/Omdena-Quantum-Self-Driving/Images/driving_dataset1/'
root_img = '/content/Omdena-Quantum-Self-Driving/Images/driving_dataset1/'
marker_mapping = {
    0xffd8: "Start of Image",
    0xffe0: "Application Default Header",
    0xffdb: "Quantization Table",
    0xffc0: "Start of Frame",
    0xffc4: "Define Huffman Table",
    0xffda: "Start of Scan",
    0xffd9: "End of Image"
}


class JPEG:
    def __init__(self, image_file):
        with open(image_file, 'rb') as f:
            self.img_data = f.read()

    def decode(self):
        data = self.img_data
        while(True):
            marker, = unpack(">H", data[0:2])
            # print(marker_mapping.get(marker))
            if marker == 0xffd8:
                data = data[2:]
            elif marker == 0xffd9:
                return
            elif marker == 0xffda:
                data = data[-2:]
            else:
                lenchunk, = unpack(">H", data[2:4])
                data = data[2+lenchunk:]
            if len(data)==0:
               raise TypeError("issue reading jpeg file")


bads = []

for dirName, subdirList, fileList in os.walk(img_dir):
    imagesList = fileList
    for img in tqdm(imagesList):
      image = os.path.join(root_img,img)
      image = JPEG(image)
      try:
        image.decode()
      except:
        bads.append(img)


for name in bads:
  os.remove(os.path.join(root_img,name))

In [6]:
# Load steering angles from text file
df = pd.read_csv('../Images/driving_dataset1/data.txt', names=['filename', 'steering_angle'], delimiter=' ')
image_dir = '../Images/driving_dataset1/'
data_file = "../Images/driving_dataset1/data.txt"
df['filename'] = df['filename'].apply(lambda x: os.path.join(image_dir, x))
df.head()

Unnamed: 0,filename,steering_angle
0,../Images/driving_dataset1/0.jpg,0.0
1,../Images/driving_dataset1/1.jpg,0.0
2,../Images/driving_dataset1/2.jpg,0.0
3,../Images/driving_dataset1/3.jpg,0.0
4,../Images/driving_dataset1/4.jpg,0.0


In [7]:
def load_image(image_path):
    try:
        image = tf.io.read_file(image_path)
        image = tf.image.decode_jpeg(image, channels=3)
        image = tf.image.resize(image, (224, 224))
        image = tf.keras.applications.resnet.preprocess_input(image)
    except:
        print(f"Invalid image format, skipping: {image_path}")
        return None
    return image

def create_dataset(df):
    image_dataset = tf.data.Dataset.from_tensor_slices(df['filename'])
    angle_dataset = tf.data.Dataset.from_tensor_slices(df['steering_angle'])
    image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.AUTOTUNE)
    image_dataset = image_dataset.apply(tf.data.experimental.ignore_errors())
    dataset = tf.data.Dataset.zip((image_dataset, angle_dataset))

    return dataset

In [10]:
""" 
def resnet_model(input_shape):
    base_model = tf.keras.applications.ResNet50(include_top=False, weights='imagenet', input_shape=input_shape)
    base_model.trainable = False
    x = base_model.output
    x = tf.keras.layers.GlobalAveragePooling2D()(x)
    x = tf.keras.layers.Dense(1024, activation='relu')(x)
    x = tf.keras.layers.Dropout(0.5)(x)
    x = tf.keras.layers.Dense(1024, activation='relu')(x)
    output = tf.keras.layers.Dense(1)(x)
    model = tf.keras.Model(inputs=base_model.input, outputs=output)
    return model
"""



def alexnet_model(input_shape):
    model = tf.keras.Sequential()

    # Layer 1
    model.add(tf.keras.layers.Conv2D(96, kernel_size=(11, 11), strides=(4, 4), activation='relu', input_shape=input_shape))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2)))

    # Layer 2
    model.add(tf.keras.layers.Conv2D(256, kernel_size=(5, 5), padding='same', activation='relu'))
    model.add(tf.keras.layers.BatchNormalization())
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2)))

    # Layer 3
    model.add(tf.keras.layers.Conv2D(384, kernel_size=(3, 3), padding='same', activation='relu'))

    # Layer 4
    model.add(tf.keras.layers.Conv2D(384, kernel_size=(3, 3), padding='same', activation='relu'))

    # Layer 5
    model.add(tf.keras.layers.Conv2D(256, kernel_size=(3, 3), padding='same', activation='relu'))
    model.add(tf.keras.layers.MaxPooling2D(pool_size=(3, 3), strides=(2, 2)))

    # Flatten the output from previous layers
    model.add(tf.keras.layers.Flatten())

    # Layer 6
    model.add(tf.keras.layers.Dense(4096, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))

    # Layer 7
    model.add(tf.keras.layers.Dense(4096, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.5))

    # Output layer
    model.add(tf.keras.layers.Dense(1))

    return model


In [11]:
validation_split = 0.1
df = df.sample(frac=1).reset_index(drop=True)
val_df = df[:int(validation_split*len(df))]
train_df = df[int(validation_split*len(df)):]

train_dataset = create_dataset(train_df)
val_dataset = create_dataset(val_df)

batch_size = 32
train_dataset = train_dataset.shuffle(buffer_size=1000).batch(batch_size)
val_dataset = val_dataset.batch(batch_size)

model = alexnet_model((224, 224, 3))

# Compile the model
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5), loss='mean_absolute_error')

Instructions for updating:
Use `tf.data.Dataset.ignore_errors` instead.




In [12]:

# Define early stopping
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3)

In [13]:
# Fit the model to the data
model.fit(train_dataset, validation_data=val_dataset, epochs=25, callbacks=[early_stopping])

Epoch 1/25
Epoch 2/25
Epoch 3/25
Epoch 4/25
Epoch 5/25
Epoch 6/25
Epoch 7/25
Epoch 8/25
Epoch 9/25
Epoch 10/25
Epoch 11/25
Epoch 12/25
Epoch 13/25
Epoch 14/25
Epoch 15/25
Epoch 16/25


<keras.src.callbacks.History at 0x13b8ac130>

In [14]:
#Testing

In [15]:
import random
from tensorflow.keras.preprocessing import image as keras_image
import numpy as np

def load_image(image_path, target_size=(224, 224)):
    img = keras_image.load_img(image_path, target_size=target_size)
    img_tensor = keras_image.img_to_array(img)
    img_tensor = np.expand_dims(img_tensor, axis=0)
    img_tensor = tf.keras.applications.resnet.preprocess_input(img_tensor)
    return img_tensor

# Randomly select an image from validation set
random_image_path = random.choice(val_df['filename'].tolist())

# Load the image
test_image = load_image(random_image_path)

# Use the model to predict the steering angle for the test image
predicted_angle = model.predict(test_image)

# Print out the predicted steering angle
print("Predicted steering angle: ", predicted_angle[0][0])

# If you want to compare this prediction to the actual angle, you could find that as follows:
actual_angle = val_df[val_df['filename'] == random_image_path]['steering_angle'].values[0]
print("Actual steering angle: ", actual_angle)


ImportError: Could not import PIL.Image. The use of `load_img` requires PIL.