DATA SET COLLECTOR

In [18]:
import gym
import numpy as np
import gym_donkeycar
import cv2
import os
import matplotlib.pyplot as plt
from functions import *
import warnings
import csv
import time
import datetime
warnings.filterwarnings("ignore")

In [35]:
def is_port_in_use(port):
    import socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        return s.connect_ex(('localhost', port)) == 0

exe_path = "C:/Users/Acer/Desktop/Research/DonkeySimWin1/donkey_sim.exe"
port = 9091 

cam = (512, 512, 3) 
conf = {"exe_path": exe_path, "port": port, "cam_resolution" :cam,'cam_config':{'img_w': cam[0], 'img_h': cam[1], 'img_d': cam[2]} }
 
if not is_port_in_use(port):
    env = gym.make("donkey-generated-roads-v0", conf=conf)
    obs = env.reset() 
else:
    print(f"Simulator is already running on port {port}.")
    env = gym.make("donkey-generated-roads-v0", conf=conf)
    obs = env.reset() 

starting DonkeyGym env
Setting default: start_delay 5.0
Setting default: max_cte 8.0
Setting default: frame_skip 1
Setting default: log_level 20
Setting default: host localhost
Setting default: steer_limit 1.0
Setting default: throttle_min 0.0
Setting default: throttle_max 1.0
donkey subprocess started


INFO:gym_donkeycar.core.client:connecting to localhost:9091 


loading scene generated_road


INFO:gym_donkeycar.envs.donkey_sim:on need car config
INFO:gym_donkeycar.envs.donkey_sim:sending car config.
INFO:gym_donkeycar.envs.donkey_sim:done sending cam config. {'img_w': 512, 'img_h': 512, 'img_d': 3}
INFO:gym_donkeycar.envs.donkey_sim:sim started!


CALIBRATE HSV VALUES

The code below returns the values for HSV masking and points for warping

In [5]:
action = np.array([0, 0]) 
obs, reward, done, info = env.step(action)
img = cv2.cvtColor(obs,cv2.COLOR_RGB2BGR) 
parameters = calibrate(img)
print("HSV Min Range: {}".format(parameters[0]))
print("HSV Max Range: {}".format(parameters[1]))
print("Warp Points: {}".format(parameters[2]))
obs = env.reset()

HSV Min Range: [0, 0, 0]
HSV Max Range: [255, 255, 255]
Warp Points: [[0, 256], [512, 256], [512, 0], [0, 0]]


In [12]:
# Now once the calibrated values are obtained , the values are kept in below function
def process(image):
    hsv_min_value = [5, 0, 166]
    hsv_max_value = [121, 180, 255]
    warp_points =  [[26, 327], [486, 327], [339, 238], [173, 238]]
    
    height, width = image.shape[:2]
    destination_points = [[0, height], [width, height], [width, 0], [0, 0]]
    target_points = np.float32(warp_points)
    destination = np.float32(destination_points)
    matrix= cv2.getPerspectiveTransform(target_points, destination)
    inv_matrix = cv2.getPerspectiveTransform(destination,target_points)
    warped_result = cv2.warpPerspective(image, matrix, (width, height))  
    hsv = cv2.cvtColor(warped_result, cv2.COLOR_BGR2HSV)
    # Create a mask based on the HSV thresholds

    hsv_min = np.array(hsv_min_value)
    hsv_max = np.array(hsv_max_value)
    mask = cv2.inRange(hsv, hsv_min, hsv_max)
    kernel = np.ones((5, 5), np.uint8)
    erosion = cv2.erode(mask, kernel, iterations=1)
    dilation_mask = cv2.dilate(erosion, kernel, iterations=1)
            # Display the images
    return warped_result,dilation_mask

In [9]:
# TESTING LANE AND CURVATURE

In [30]:
# Create Dataset Folder 
data_folder_path = 'Data'
img_folder_path = os.path.join(data_folder_path, 'IMG')

# Check if 'Data' folder already exists, if not, create it
if not os.path.exists(data_folder_path):
    os.makedirs(data_folder_path)
    print(f'Folder "{data_folder_path}" created.')

# Check if 'IMG' folder inside 'Data' already exists, if not, create it
if not os.path.exists(img_folder_path):
    os.makedirs(img_folder_path)
    print(f'Folder "{img_folder_path}" created.')
else:
    print(f'Folder "{img_folder_path}" already exists.')


Folder "Data\IMG" already exists.


In [36]:
steering_angle = 0
throttle_speed = 0.1
now = datetime.datetime.now()
file_path = now.strftime("%Y_%m_%d_") + now.strftime("%H_%M_%S")
file_counter = 0

with open(data_folder_path + "/driving_log.csv", "a", newline="") as csvfile:
    log_writer = csv.writer(csvfile)
    while True:
        action = np.array([steering_angle, throttle_speed]) 
        obs, reward, done, info = env.step(action)
        img = cv2.cvtColor(obs,cv2.COLOR_RGB2BGR)
        data_img = cv2.resize(img, (480, 320))
        warped_result,mask = process(img)
        img = draw_steering_indicator(img, steering_angle, radius = 100)
        left_points,right_points,msk = sliding_window(mask)
        steering_angle = find_steering_angle(left_points,right_points, msk.shape[1]) * 0.5
        combined = create_combined_frame([img,warped_result,cv2.merge([mask, mask, mask]),msk])
        cv2.imshow("Combined Image",combined)
        file_name = str(file_path)+ "_" + str(file_counter) + ".jpg"
        img_path = img_folder_path + "/" + str(file_name)
        log_writer.writerow([file_name, steering_angle])
        file_counter = file_counter + 1
        cv2.imwrite(img_path, data_img)
        k = cv2.waitKey(10)
        if k == ord('e') :
            action = np.array([0, 0])
            cv2.destroyAllWindows()
            break
 
obs = env.reset()    
cv2.destroyAllWindows()

In [20]:
# Run this code if code hangs up
obs = env.reset()
cv2.destroyAllWindows()

In [37]:
env.close()

closing donkey sim subprocess
