Script for collecting dataset. Records images from the screen to the "image" folder and data from the steering wheel or joystick in the csv. For matching data from the joystick and images, the name of the images is the time they are received. The script works only on Linux.

In [None]:
from Xlib import display, X
from PIL import Image
import cv2
import numpy as np
import pandas as pd
from multiprocessing import Queue, Process
import time

from js_linux import Get_joy # Function for data collecting from joystick

dsp = display.Display()
root = dsp.screen().root

queue_road = Queue()
queue_joy = Queue()

roi = [2220,325,400,300]

In [None]:
class Get_Screen(Process):
    
    def get_screen(self, roi):
        '''Get screen with Xlib from roi (startX, startY, width, heigh)'''
        raw = root.get_image(roi[0], roi[1], roi[2],roi[3], X.ZPixmap, 0xffffffff)
        image = Image.frombytes("RGB", (roi[2], roi[3]), raw.data, "raw", "RGBX")
        image = np.array(image.getdata(),dtype='uint8')\
        .reshape((image.size[1],image.size[0],3))
        return image
    
    
    def __init__(self, queue_road, roi):
        Process.__init__(self)
        self.queue_road = queue_road
        self.roi = roi
        
        
    def run(self):
        
        while 1:
            self.screen = self.get_screen(self.roi)
            self.queue_road.put(self.screen)

In [None]:
import pickle

with open('km.pickle', 'rb') as f:
    """Loading pretrained k-means"""
    km = pickle.load(f)

In [None]:
class Main(Process):

    def MeanShift(self, img):
        res = cv2.pyrMeanShiftFiltering(img, 1, 50)
        return res

    def use_km(self, img, orig_img):
            #get indeces of non-ziro color
            indices = np.argwhere(img>1)
            """
            Cluster prediction
            Clusters 2 and 3 are marking lines, others are noise
            """
            try:
                """
                Gets the pixels belonging to the line clusters
                """
                lines = km.predict(indices)
                left_indx = np.argwhere(lines==1)
                right_indx = np.argwhere((lines==0) | (lines==2))
                """
                Get pixel coordinates of lines
                """
                left = np.take(indices, left_indx, axis=0).reshape(left_indx.shape[0], 2)
                right = np.take(indices, right_indx, axis=0).reshape(right_indx.shape[0], 2)
                """
                and fill them with color
                other pixels are set to zero
                """
                orig_img[:, :] = 0
                orig_img[right[:,0], right[:,1]] = 255
                orig_img[left[:,0], left[:,1]] = 255

            except:
                orig_img[:, :] =0

            return orig_img

    def img_proceed(self):
        '''
        processing image: 
        resize->mean shift->gray->blur->trashhold->dilate->k-means
        The image is taken from Get_screen process
        '''
        orig_img = self.queue.get()

        orig_img = cv2.resize(orig_img, (256,256))

        orig_img = self.MeanShift(orig_img)

        gray = cv2.cvtColor(orig_img, cv2.COLOR_RGB2GRAY)

        processed_img = cv2.GaussianBlur(gray, (5, 5), 0)
        processed_img = cv2.adaptiveThreshold(processed_img, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, 25, -22)
        processed_img = cv2.dilate(processed_img, None, iterations = 1)

        #removing noise by using k-means
        processed_img = self.use_km(processed_img, processed_img)

        return processed_img

    
    def __init__(self, queue):
        
        Process.__init__(self)
        self.queue = queue

        
    def run(self):
        
        while 1:
            result_img = self.img_proceed()

            cv2.imwrite("./images/" + str(time.time()) + ".jpg", result_img)
            cv2.imshow('frame', result_img)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
                
        cv2.destroyAllWindows()

In [None]:
# process for collecting data from the joystick
joy = Get_joy(queue_joy)
Get_joy.start(joy)

#process for grabbing the screen
get_screen = Get_Screen(queue_road, roi)
Get_Screen.start(get_screen)

#main process processing the image
main = Main(queue_road)
Main.start(main)