# Self-Driving Car
This Jupiter notebook details my approach to developing a program for a self-driving car. Using dashcam footage the program is able to determine how fast the car is moving in m/s and identify street signs on the sides of the road.

In [None]:
# Import libraries
import cv2
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt

In [None]:
# Mounts my google drive for access to important files for training
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


# Speed Determination
To begin I will create a model that takes two images and determines the velocity of the car using the change between them. This implementation utilizes cv2 and optical flow to create an input image that shows the color-coded apparent movement of objects between the two frames. These images are then paired with a label that is the average velocity of the two frames used for the optical flow. The paired images and labels are then used to train a convolutional neural network. This trained network is able to take in optical flows created from two frames and estimate the velocity of the car.

Data From: https://github.com/commaai/speedchallenge

In [None]:
# Assign files to variable names
speed_training_data = '/content/train.mp4'
speed_labels = '/content/train.txt'
speed_testing_data = '/content/test.mp4'
final_vid = '/content/final_input.mp4'

In [None]:
# Hyperparameters
epochs = 5
batch_size = 10
WIDTH  = 128
HEIGHT = 96

In [None]:
# Take the difference between two frames and assign the average velocity between these two
# frames as the label velocity for the given difference. Takes in the video file and velocity
# labels for each frame as parameters
def labeled_frame(speed_training_data, speed_labels):
  v_tot = 0
  cap = cv2.VideoCapture(speed_training_data)
  f = open(speed_labels, "r")
  frame_change = []
  label = []
  while True:
    ret, frame1 = cap.read() # Get the first frame using cv2
    ret, frame2 = cap.read() # Get the second frame
    if frame1 is None: # Exit loop once out of frames
      break

    # Retrieve the velocity labels for each frame and average them
    for i in range(2):
      v_tot += float(f.readline())
    ave_v = v_tot / 2 # Average the two velocity labels for the frames
    label.append(ave_v)
    v_tot = 0

    color_coded = frames_to_optical_flow(frame1, frame2)

    frame_change.append(color_coded)

  # Close the cv2 capture and file
  f.close()
  cap.release()
  cv2.destroyAllWindows()

  return (frame_change, label)


In [None]:
def frames_to_optical_flow(frame1, frame2):
  # Utilize cv2's optical flow, which will the change between frames and give a vector
  # value, which then can be converted to colors using HSV
  gray1 = cv2.cvtColor(frame1,cv2.COLOR_BGR2GRAY) # Grayscale the frames for optical flow
  gray2 = cv2.cvtColor(frame2,cv2.COLOR_BGR2GRAY)
  optical_flow = cv2.calcOpticalFlowFarneback(gray1,gray2, None, 0.5, 1, 10, 3, 5, 1.1, 0) # optical flow parametes set according to cv2 documentation

  # Utilizing HSV we can color code the direction and magnitude of the change. H as the angle will determine the color, S will always be maxed
  # since we want to see the colors, and V will represent the magnitude, brighter color correlates to a greater change
  hsv = np.zeros_like(frame1) # Create empty array with the same chape as the frame
  hsv[...,1] = 255 # Set the s column to 100%
  magnitude, angle = cv2.cartToPolar(optical_flow[...,0], optical_flow[...,1], angleInDegrees=True) # cartToPolar takes x and y coordinate and
                                                                                                        # returns them as polar coordinates
  hsv[...,0] = angle / 2 # Angle correlates to which color. Divide by 2 so value does not exceed 255
  hsv[...,2] = cv2.normalize(magnitude,None, 0, 255, cv2.NORM_MINMAX) # Normalize magnitude to values between 0 and 255 for proper mapping to RGB

  # Need to convert the output to RGB for color coding
  color_coded = cv2.cvtColor(hsv,cv2.COLOR_HSV2BGR) # Color code

  # Shrink output image for NN according to documentation
  color_coded = cv2.resize(color_coded, (WIDTH, HEIGHT), interpolation = cv2.INTER_AREA)

  return color_coded

In [None]:
def create_speed_model():
  input_shape = (HEIGHT, WIDTH, 3)

  # Network structure motivated by NVIDIA model designed for self-driving cars, adapted for smaller scale
  model = tf.keras.models.Sequential()
  model.add(tf.keras.layers.BatchNormalization(input_shape=input_shape)) # normalize inputs

  # Small Conv2D CNN layer to pick up on basic features
  model.add(tf.keras.layers.Conv2D(24, (5, 5), activation='elu', strides=(2, 2)))
  model.add(tf.keras.layers.Conv2D(36, (5, 5), activation='elu', strides=(2, 2)))
  model.add(tf.keras.layers.Conv2D(48, (5, 5), activation='elu', strides=(2, 2)))

  # Dropout layer to avoid local minima
  model.add(tf.keras.layers.Dropout(0.5))

  # Larger Conv2D CNN layer with smaller stride to pick up on more advanced features
  model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='elu', strides=(1, 1)))
  model.add(tf.keras.layers.Conv2D(64, (3, 3), activation='elu', strides=(1, 1)))

  # Flatten for fully connected NN
  model.add(tf.keras.layers.Flatten())

  # Fully connected NN, to last layer with one output
  model.add(tf.keras.layers.Dense(100, activation='elu'))
  model.add(tf.keras.layers.Dense(50, activation='elu'))
  model.add(tf.keras.layers.Dense(10, activation='elu'))
  model.add(tf.keras.layers.Dense(1, name='output'))

  # Compile the model with ADAM
  model.compile(optimizer='adam', loss="mse")

  return model

In [None]:
# Retrieve a list of optical flow frames and their associated speed labels
images, speed_label = labeled_frame(speed_training_data, speed_labels)

In [None]:
# Split the frames into train and test sets, with 80% of the data for training, then create the model
x_train, x_test, y_train, y_test = train_test_split(np.array(images), np.array(speed_label), test_size=0.2, shuffle=True)
speed_model = create_speed_model()

In [None]:
# Train the model on the training data, save metrics to a varible called history
history = speed_model.fit(x = x_train, y = y_train, batch_size=batch_size, epochs=epochs)

In [None]:
# Save the model to the SpeedModel folder of my google drive for future access
speed_model.save('/content/gdrive/MyDrive/SpeedModel/speed_model.keras')

In [None]:
# Evaluate and graph the MSE of the speed training model
speed_model.evaluate(x = x_test, y = y_test, batch_size=batch_size)
plt.plot(history.history['loss'])
plt.title('model loss')
plt.ylabel('mse loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

# Identifying Street Signs

Next, I will be training the YOLOv3 object detection model on a custom dataset of German street signs. The model will take in any frame of the video and identify where in the image a street sign is and identify what kind of sign it is. This section only needs to be run for training the YOLO model, can be skipped if you are using an existing .weights file.

Data From: https://sid.erda.dk/public/archives/ff17dc924eba88d5d01a807357d6614c/published-archive.html

In [None]:
# Converts the formatting of gt.txt to the proper format for training  the YOLOv3 model,
# returns a list with [x_cen, y_cen, width, height] which are normalized to the dimensions of the input frame
def yolo_coordinates(split_label):
  width = int(split_label[3]) - int(split_label[1])
  height = int(split_label[4]) - int(split_label[2])
  x_cen = int(split_label[1]) + (width / 2)
  y_cen = int(split_label[2]) + (height / 2)

  # Normalize the pixel values
  width = width / 1360
  height = height / 800
  x_cen = x_cen / 1360
  y_cen = y_cen / 800

  # Overwrite the values with new normalized ones in the YOLO order
  split_label[1] = str(x_cen)
  split_label[2] = str(y_cen)
  split_label[3] = str(width)
  split_label[4] = str(height)
  return(split_label)

In [None]:
# Creates a dictionary where the key is the image name, and the values is a list
# of the labels in a YOLO readable format
sign_labels = "gt.txt"
image_to_labels = {}

f = open(sign_labels, "r")
labels = f.readlines()
for label in labels:
  split_label = label.rstrip().split(";")
  split_label = yolo_coordinates(split_label)
  if split_label[0] in image_to_labels.keys():
    image_to_labels[split_label[0]].append(split_label[5] + " " + split_label[1] + " " + split_label[2] + " " + split_label[3] + " " + split_label[4] + '\n')
  else:
    image_to_labels[split_label[0]] = [(split_label[5] + " " + split_label[1] + " " + split_label[2] + " " + split_label[3] + " " + split_label[4] + '\n')]
f.close()

In [None]:
# Reads through each image and creates a txt file of the same name where each
# line is a YOLO readable label of an object in the image. Saves these txt
# files to my google drive in the same directory as the images
#
# Only run once to put the txt files into google drive
for image in image_to_labels.keys():
  image_num = image[:-4]
  path = '/content/gdrive/MyDrive/MLBoot/SignFinding/' + image_num + '.txt'
  text = ''
  for label in image_to_labels[image]:
    text += label
  with open(path, 'w') as f:
    f.write(text)

In [None]:
# Creates a list of all images with label txt files and splits them into a train
# and test set, with 80% of the images going to training
data_sign = []
for image_keys in image_to_labels.keys():
  data_sign.append(image_keys)
x_train, x_test = train_test_split(np.array(data_sign), test_size=0.2, shuffle=True)

In [None]:
# Takes each value and re-writes it as the absolute path to the file
def split_to_txt(x):
  text = ''
  for val in x:
    text += '/content/gdrive/MyDrive/MLBoot/SignFinding/' + (val + '\n')
  return text

In [None]:
# Takes the train and test sets and writes them to txt files where each line is the path to a file
train_text = split_to_txt(x_train)
test_text = split_to_txt(x_test)
with open('/content/gdrive/MyDrive/MLBoot/train.txt', 'w') as f:
  f.write(train_text)
with open('/content/gdrive/MyDrive/MLBoot/test.txt', 'w') as f:
  f.write(test_text)

In [None]:
# Check to ensure the T4 GPU is connected, will only run if connected to collabs T4 GPU
!nvidia-smi

Thu Feb 15 02:17:25 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   38C    P8               9W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [None]:
# Clone the darknet repository which will be used to train and assess the YOLO model
!git clone https://github.com/AlexeyAB/darknet

In [None]:
# Change the current working directory to the darknet folder
%cd darknet

Before constructing the darknet, edit the makefile to enable GPU, CUDNN, CUDNN_HALF, and OPENCV by changing parameters to 1 to improve performance.

In [None]:
# Construct the darknet for training the model
!make

Download and edit the yolo3 cfg file as specified in the ReadMe file in the darknet github repository:



*   Comment out testing parameters and uncomment training ones
*   Set max batches to (# of classes) * 2000, I have 43 classes
*   Change steps to 80% and 90% of max batches
*   Change the classes parameter to the number of classes at lines 610, 698, and 783
*   Change filters to (# of classes + 5) * 3 at lines 603, 689, and 776


Next you will need to construct a custon.names file and a detector.data file. Make sure that the formatting matches the ones I created. It is important to list the classes in the same order they are categorized within your file.



In [None]:
# After downloading and editing the config file, retrieve initial pretrained weights for a YOLOv3 model
!wget https://pjreddie.com/media/files/darknet53.conv.74

In [None]:
# Train the YOLO model starting with the pretrained weights
!./darknet detector train /content/gdrive/MyDrive/MLBoot/detector.data /content/gdrive/MyDrive/MLBoot/cfg/yolov3-custom.cfg darknet53.conv.74 -dont_show

In [None]:
# Resume earlier training by using weights saved from initial training by changing the initial
# weights from the darknet53.conv.74 to the saved .weights file from your initial training
!./darknet detector train /content/gdrive/MyDrive/MLBoot/detector.data /content/gdrive/MyDrive/MLBoot/cfg/yolov3-custom.cfg /content/gdrive/MyDrive/MLBoot/Backup/yolov3-custom_last.weights -dont_show

In [None]:
# View average precision of the model on all of the classes
!./darknet detector map /content/gdrive/MyDrive/MLBoot/detector.data /content/gdrive/MyDrive/MLBoot/cfg/yolov3-custom.cfg /content/gdrive/MyDrive/MLBoot/Backup/yolov3-custom_last.weights -dont_show

 CUDA-version: 12020 (12020), cuDNN: 8.9.6, CUDNN_HALF=1, GPU count: 1  
 CUDNN_HALF=1 
 OpenCV version: 4.5.4
 0 : compute_capability = 750, cudnn_half = 1, GPU: Tesla T4 
net.optimized_memory = 0 
mini_batch = 1, batch = 16, time_steps = 1, train = 0 
   layer   filters  size/strd(dil)      input                output
   0 Create CUDA-stream - 0 
 Create cudnn-handle 0 
conv     32       3 x 3/ 1    416 x 416 x   3 ->  416 x 416 x  32 0.299 BF
   1 conv     64       3 x 3/ 2    416 x 416 x  32 ->  208 x 208 x  64 1.595 BF
   2 conv     32       1 x 1/ 1    208 x 208 x  64 ->  208 x 208 x  32 0.177 BF
   3 conv     64       3 x 3/ 1    208 x 208 x  32 ->  208 x 208 x  64 1.595 BF
   4 Shortcut Layer: 1,  wt = 0, wn = 0, outputs: 208 x 208 x  64 0.003 BF
   5 conv    128       3 x 3/ 2    208 x 208 x  64 ->  104 x 104 x 128 1.595 BF
   6 conv     64       1 x 1/ 1    104 x 104 x 128 ->  104 x 104 x  64 0.177 BF
   7 conv    128       3 x 3/ 1    104 x 104 x  64 ->  104 x 104 x 128 1.59

My model was trained on 3000 iterations over four hours to achieve an average loss of 0.266

# Creating the final video
To create the final video, I need to extract the frames of the test video and transform them into forms that can be interpreted by my two models. First all frames are compiled into optical flows and sent to the speed model for a batch prediction. This output is then converted to a list, where the index of the list is correlated to the frame. Then, frames are individually fed into the YOLO model, where boxes are drawn around recognized signs and labeled. The YOLO model is only trained to recognize German street signs, so the footage takes place on German roads.


Final Video is a Clip From: https://www.youtube.com/watch?v=sWiq7fPnRbE

In [None]:
# Retrieve trained speed model weights from earlier training and create
# the speed model
speed_model = tf.keras.models.load_model('/content/gdrive/MyDrive/SpeedModel/speed_model.keras')

In [None]:
# The test video has overlay text at the bottom that is constantly updating,
# this may interfere with the optical flow so grey boxes are placed over the
# text. This method only applies to the specific video used for the final output.
def cover_overlay(frame):
  cv2.rectangle(frame, (25, 705), (315, 690), (100, 100, 100), cv2.FILLED)
  cv2.rectangle(frame, (400, 705), (780, 690), (100, 100, 100), cv2.FILLED)

In [None]:
# Takes input file and converts its frames to optical flow. Optical flows are then
# sent to the speed_model to get velocity predictions which are then exported.
def get_final_video_labels(testing_data):
  cap = cv2.VideoCapture(testing_data)
  speed_model_input = []
  while True:
    ret, frame1 = cap.read() # Get the first frame using cv2
    ret, frame2 = cap.read() # Get the second frame
    if frame1 is None or frame2 is None: # Exit loop once out of frames
      break
    cover_overlay(frame1)
    cover_overlay(frame2)
    speed_model_input.append(frames_to_optical_flow(frame1, frame2))
  speed_predictions = speed_model.predict(np.array(speed_model_input))
  cap.release()
  cv2.destroyAllWindows()

  return speed_predictions

In [None]:
# Converts the speed predictions to a list of predicted speeds for each frame
speed_p = get_final_video_labels(final_vid)
frame_speeds = [elem[0] for elem in speed_p for i in range(2)]

In [None]:
# Rounds the output speed predictions to two decimal places and saves them to a new list
speed_f = []
for i in frame_speeds:
 speed_f.append(int(i * 100) / 100)

In [None]:
# Hyperparameters and values used for writing text onto the image frames
vid_di = 416
confThre = 0.5
nmsThre = 0.3

# List of all class names, index of list correlates to class ID
classNames = ['speed limit 20', 'speed limit 30', 'speed limit 50',
'speed limit 60', 'speed limit 70', 'speed limit 80',
'restriction ends 80', 'speed limit 100', 'speed limit 120',
'no overtaking', 'no overtaking (trucks)', 'priority at next intersection',
'priority road', 'give way', 'stop', 'no traffic both ways',
'no trucks', 'no entry', 'danger', 'bend left',
'bend right', 'bend', 'uneven road', 'slippery road',
'road narrows', 'construction', 'traffic signal', 'pedestrian crossing',
'school crossing', 'cycles crossing', 'snow', 'animals',
'restriction ends', 'go right', 'go left', 'go straight',
'go right or straight', 'go left or straight', 'keep right',
'keep left', 'roundabout', 'restriction ends (overtaking)',
'restriction ends (overtaking trucks)']

# Config and weights file from the YOLO training phase
modelConfig = '/content/gdrive/MyDrive/MLBoot/cfg/yolov3-custom.cfg'
modelWeights = '/content/gdrive/MyDrive/MLBoot/Backup/yolov3-custom_last.weights'

In [None]:
# Initialize the model using cv2 and the config and weight created earlier
net = cv2.dnn.readNetFromDarknet(modelConfig, modelWeights)
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_OPENCV)
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CPU)

In [None]:
# Takes the output layer of the model and checks for highest class score
# and if the confidence in that class exceeds the threshold converts the
# dimensions of the box to cv2 usable format and puts the box and the label
# on the frame
def find_objects(outputs, frame):
  height, width, channels = frame.shape
  boxes = []
  classes = []
  confidence = []

  # Iterate through output layers prediction scores for each class
  for output in outputs:
    for objs in outputs:
      for obj in objs:
        scores = obj[5:]
        classIndex = np.argmax(scores)
        class_conf = scores[classIndex]
        if class_conf >= confThre:
          box_w = int(obj[2] * width)
          box_h = int(obj[3] * height)
          box_x = int((obj[0] * width) - box_w/2)
          box_y = int((obj[1] * height) - box_h/2)
          boxes.append([box_x, box_y, box_w, box_h])
          classes.append(classIndex)
          confidence.append(float(class_conf))

  # Index boxes and reduce to one box per sign recognition
  indicies = cv2.dnn.NMSBoxes(boxes, confidence, confThre, nmsThre)

  # Iterate through and draw boxes with labels and confidence levels
  for i in indicies:
    box = boxes[i]
    x, y, w, h = box[0], box[1], box[2], box[3]
    cv2.rectangle(frame, (x, y), (x + w, y + h), (36, 255, 12), 2)
    cv2.putText(frame, f'{classNames[classes[i]].upper()} {int(confidence[i] * 100)}%', (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (36, 255, 12), 2)

In [None]:
# Initializes video file for output
result = cv2.VideoWriter('/content/MLBootfinalvid.avi',
                         cv2.VideoWriter_fourcc(*'MJPG'),
                         30, (1280, 720))

In [None]:
# Reads the video frame by frame and runs object detection on each one
# and adds boxes for detected objects. Prints the associated estimated velocity
# on the frame as well and then writes it to the output file
def object_detection(video_data, speed_labels):
  frame_count = 0
  cap = cv2.VideoCapture(video_data)
  while True:
    ret, frame = cap.read()
    if frame is None or frame_count == len(speed_labels):
      break
    blob = cv2.dnn.blobFromImage(frame, 1/255, (vid_di, vid_di), [0, 0, 0], swapRB=True, crop=False)
    net.setInput(blob)
    outputLayers = net.getUnconnectedOutLayersNames()
    outputs = net.forward(outputLayers)
    find_objects(outputs, frame)
    cv2.rectangle(frame, (0, 0), (225, 80), (200, 200, 200), cv2.FILLED)
    cv2.putText(frame, (str(speed_labels[frame_count]) + ' m/s'), (28, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
    result.write(frame)
    frame_count += 1
    if frame_count % 100 == 0:
      print(frame_count)
  cap.release()
  result.release()
  cv2.destroyAllWindows()

In [None]:
# This kernel runs the object detection model and draws the model output on
# each frame, then the frame is written to a video file
object_detection(final_vid, speed_f)

# Frame Testing
This section is used for testing how the models interact with input frames by outputting predictions and printing them onto a single frame.

In [None]:
# final_input has 8140 frames
test_frame = 8140
testcap = cv2.VideoCapture('/content/final_input.mp4')
total_frames = testcap.get(cv2.CAP_PROP_FRAME_COUNT)
testcap.set(cv2.CAP_PROP_POS_FRAMES, test_frame)
ret, testimg = testcap.read()
blob = cv2.dnn.blobFromImage(testimg, 1/255, (vid_di, vid_di), [0, 0, 0], swapRB=True, crop=False)
net.setInput(blob)
outputLayers = net.getUnconnectedOutLayersNames()
outputs = net.forward(outputLayers)
find_objects(outputs, testimg)
cv2.rectangle(testimg, (0, 0), (225, 80), (200, 200, 200), cv2.FILLED)
cv2.putText(testimg, (str(test_frame) + ' m/s'), (28, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
cv2.rectangle(testimg, (25, 705), (315, 690), (100, 100, 100), cv2.FILLED)
cv2.rectangle(testimg, (400, 705), (780, 690), (100, 100, 100), cv2.FILLED)
testimg = cv2.cvtColor(testimg,cv2.COLOR_BGR2RGB)
testcap.release()
plt.imshow(testimg)

# Final Reflection
The object detection model became surprisingly effective after giving it ample time to train. While it still missed some signs that were driven by, I believe that this recognition issue would be solved if the model was trained for more iterations. The speed detection model seemed to be a bit more sensitive to the quality of video used. Covering up the overlay text seemed to help but it still suffers from occasional spikes in inaccuracy. This is most likely due to the constant passing traffic in the video, making it difficult for the model to tell if the car is moving or if its surroundings are. It seems to be much more accurate during long drives down roads without incoming traffic. Overall, I am very happy with how effective these two models are.

Video Output: https://youtu.be/fLxkel230tQ