In [1]:
# Mount Google Drive inside the Colab runtime; the mount point is /drive.
from google.colab import drive
drive.mount('/drive')

Mounted at /drive


In [3]:
import glob
import json
import os
import os.path as ops
import shutil
import tqdm

import cv2
import numpy as np

In [None]:
# Input and output roots handed to process_tusimple_dataset(src_dir, tar_dir).
# src_dir is expected to contain train_set/ and test_set/; tar_dir receives training/ and testing/.
# NOTE(review): the src_dir literal ends with a trailing space -- confirm the folder is really named that way.
# NOTE(review): both paths are relative to the current working directory -- confirm the cwd they are meant to resolve against.
src_dir = './Auto-Piolot/tusimple_lanemark_detection_Challenge '
tar_dir = './Auto-Piolot/lanenet-lane-detection/data'

In [None]:
def _collect_lane_points(lane, h_samples):
    """
    Build the polyline vertices for one annotated lane.
    :param lane: x coordinates, one per entry of h_samples; entries equal to -2 are skipped
    :param h_samples: y coordinates paired position-by-position with lane
    :return: int64 array of shape (1, N, 2) holding (x, y) pairs, or None when every entry was skipped
    """
    points = [(ptx, pty) for ptx, pty in zip(lane, h_samples) if ptx != -2]
    if not points:
        return None
    # cv2.polylines takes a list of polylines, hence the extra leading axis.
    return np.array([points], np.int64)


def process_json_file(json_file_path, src_dir, ori_dst_dir, binary_dst_dir, instance_dst_dir):
    """
    Convert one label file (one JSON record per line) into training images.

    For every record three PNG files sharing the same zero-padded numeric name are
    written: the source image, a binary lane mask (lane pixels = 255) and an instance
    lane mask (lane k drawn with value k * 50 + 20).

    :param json_file_path: label file; each non-blank line is a JSON object with the
        keys 'raw_file', 'h_samples' and 'lanes'
    :param src_dir: dataset root; images are read from <src_dir>/train_set/<raw_file>
    :param ori_dst_dir: output directory for the copied source images (must exist)
    :param binary_dst_dir: output directory for the binary masks
    :param instance_dst_dir: output directory for the instance masks
    :return: None
    :raises AssertionError: if the label file or a referenced image does not exist, or
        a lane does not have one x value per h_sample
    :raises IOError: if a referenced image exists but cannot be decoded
    """
    assert ops.exists(json_file_path), '{:s} not exist'.format(json_file_path)

    # Continue numbering after whatever is already in the output directory so that
    # several label files can be processed into the same folders without clashes.
    image_nums = len(os.listdir(ori_dst_dir))

    with open(json_file_path, 'r') as file:
        # Blank lines (e.g. a trailing newline) are ignored and do not consume an index.
        records = (line for line in file if line.strip())
        for line_index, line in enumerate(records):
            info_dict = json.loads(line)
            raw_file = info_dict['raw_file']

            # Human-readable name used only for the progress message:
            # the first path component is dropped and the rest is joined with '_'.
            image_dir, image_file = ops.split(raw_file)
            image_name = '_'.join(image_dir.split('/')[1:] + [image_file])
            image_path = ops.join(src_dir + '/train_set/', raw_file)
            assert ops.exists(image_path), '{:s} not exist'.format(image_path)

            h_samples = info_dict['h_samples']
            lanes = info_dict['lanes']

            image_name_new = '{:04d}.png'.format(line_index + image_nums)

            src_image = cv2.imread(image_path, cv2.IMREAD_COLOR)
            if src_image is None:
                # imread signals an unreadable/corrupt file by returning None.
                raise IOError('{:s} could not be decoded as an image'.format(image_path))

            mask_shape = src_image.shape[:2]
            dst_binary_image = np.zeros(mask_shape, np.uint8)
            dst_instance_image = np.zeros(mask_shape, np.uint8)

            for lane_index, lane in enumerate(lanes):
                assert len(h_samples) == len(lane)
                lane_pts = _collect_lane_points(lane, h_samples)
                if lane_pts is None:
                    continue

                cv2.polylines(dst_binary_image, lane_pts, isClosed=False,
                              color=255, thickness=5)
                # NOTE(review): for lane_index >= 5 this value exceeds 255, the uint8
                # maximum -- confirm no record carries more than five lanes.
                cv2.polylines(dst_instance_image, lane_pts, isClosed=False,
                              color=lane_index * 50 + 20, thickness=5)

            cv2.imwrite(ops.join(binary_dst_dir, image_name_new), dst_binary_image)
            cv2.imwrite(ops.join(instance_dst_dir, image_name_new), dst_instance_image)
            cv2.imwrite(ops.join(ori_dst_dir, image_name_new), src_image)

            print('Process {:s} success'.format(image_name))


def _write_sample_index(index_path, image_names, b_gt_image_dir, i_gt_image_dir, image_dir):
    """
    Write one "<image> <binary mask> <instance mask>" line per readable sample.
    :param index_path: text file to (over)write
    :param image_names: file names shared by the three directories
    :param b_gt_image_dir: directory holding the binary masks
    :param i_gt_image_dir: directory holding the instance masks
    :param image_dir: directory holding the source images
    :return: None
    """
    with open(index_path, 'w') as file:
        for image_name in tqdm.tqdm(image_names):
            binary_gt_image_path = ops.join(b_gt_image_dir, image_name)
            instance_gt_image_path = ops.join(i_gt_image_dir, image_name)
            image_path = ops.join(image_dir, image_name)

            assert ops.exists(image_path), '{:s} not exist'.format(image_path)
            assert ops.exists(instance_gt_image_path), '{:s} not exist'.format(instance_gt_image_path)

            # Decode all three files so that unreadable samples are left out of the index.
            b_gt_image = cv2.imread(binary_gt_image_path, cv2.IMREAD_COLOR)
            i_gt_image = cv2.imread(instance_gt_image_path, cv2.IMREAD_COLOR)
            image = cv2.imread(image_path, cv2.IMREAD_COLOR)

            if b_gt_image is None or image is None or i_gt_image is None:
                continue

            info = '{:s} {:s} {:s}'.format(image_path, binary_gt_image_path, instance_gt_image_path)
            file.write(info + '\n')


def gen_train_sample(src_dir, b_gt_image_dir, i_gt_image_dir, image_dir, val_size):
    """
    generate sample index files

    The '.png' files of b_gt_image_dir are sorted by name; the last val_size of them
    go to <src_dir>/training/val.txt and the rest to <src_dir>/training/train.txt.
    Only '.png' files take part in the split, and val_size == 0 yields an empty
    validation index (not an empty training index).

    :param src_dir: root directory; <src_dir>/training must already exist
    :param b_gt_image_dir: directory holding the binary masks
    :param i_gt_image_dir: directory holding the instance masks
    :param image_dir: directory holding the source images
    :param val_size: number of samples reserved for validation (>= 0); if it exceeds
        the number of samples, every sample goes to validation
    :return: None
    :raises ValueError: if val_size is negative
    :raises AssertionError: if a source image or instance mask is missing
    """
    if val_size < 0:
        raise ValueError('val_size must be >= 0, got {}'.format(val_size))

    # Filter before splitting so stray non-image entries cannot eat into either split.
    image_names = sorted(name for name in os.listdir(b_gt_image_dir) if name.endswith('.png'))

    # An explicit split index avoids the [:-0] / [-0:] slicing pitfall.
    split_at = max(len(image_names) - val_size, 0)

    _write_sample_index('{:s}/training/train.txt'.format(src_dir), image_names[:split_at],
                        b_gt_image_dir, i_gt_image_dir, image_dir)
    _write_sample_index('{:s}/training/val.txt'.format(src_dir), image_names[split_at:],
                        b_gt_image_dir, i_gt_image_dir, image_dir)
    return


def _copy_label_files(pattern, dst_dir):
    """
    Copy every file matching the glob pattern into dst_dir, keeping the file name.
    :param pattern: glob pattern selecting the label files
    :param dst_dir: existing destination directory
    :return: None
    """
    for json_label_path in sorted(glob.glob(pattern)):
        shutil.copyfile(json_label_path, ops.join(dst_dir, ops.split(json_label_path)[1]))


def process_tusimple_dataset(src_dir, tar_dir, val_size=326):
    """
    Convert the dataset under src_dir into the training layout under tar_dir.

    Steps: create <tar_dir>/training and <tar_dir>/testing, copy the label files
    (train_set/label*.json and test_set/test*.json) into them, render every training
    label file into gt_image / gt_binary_image / gt_instance_image, then write the
    train/val index files.

    NOTE(review): process_json_file numbers its output after the files already in
    gt_image, so running this twice against the same tar_dir appears to append a
    second copy of every sample rather than overwrite -- confirm before re-running.

    :param src_dir: dataset root containing train_set/ and test_set/
    :param tar_dir: output root
    :param val_size: number of samples reserved for the validation index
    :return: None
    """
    traing_folder_path = ops.join(tar_dir, 'training')
    testing_folder_path = ops.join(tar_dir, 'testing')

    os.makedirs(traing_folder_path, exist_ok=True)
    os.makedirs(testing_folder_path, exist_ok=True)

    _copy_label_files('{:s}/train_set/label*.json'.format(src_dir), traing_folder_path)
    _copy_label_files('{:s}/test_set/test*.json'.format(src_dir), testing_folder_path)

    gt_image_dir = ops.join(traing_folder_path, 'gt_image')
    gt_binary_dir = ops.join(traing_folder_path, 'gt_binary_image')
    gt_instance_dir = ops.join(traing_folder_path, 'gt_instance_image')

    for folder in (gt_image_dir, gt_binary_dir, gt_instance_dir):
        os.makedirs(folder, exist_ok=True)

    # glob order is filesystem dependent; sorting keeps the output image numbering
    # (which depends on processing order) reproducible between runs.
    for json_label_path in sorted(glob.glob('{:s}/*.json'.format(traing_folder_path))):
        process_json_file(json_label_path, src_dir, gt_image_dir, gt_binary_dir, gt_instance_dir)

    gen_train_sample(tar_dir, gt_binary_dir, gt_instance_dir, gt_image_dir, val_size)

    return

In [None]:
# Run the full conversion: copy label files, render the ground-truth images and write train.txt / val.txt.
process_tusimple_dataset(src_dir, tar_dir)

100%|██████████| 3300/3300 [30:22<00:00,  1.81it/s]
100%|██████████| 326/326 [02:59<00:00,  1.81it/s]


In [None]:
# Change the working directory to the lanenet-lane-detection folder on the mounted drive.
# Presumably required so the data_provider / local_utils imports in a later cell resolve -- TODO confirm.
%cd /drive/MyDrive/new_projects/lane_detection/lanenet-lane-detection

/drive/MyDrive/new_projects/lane_detection/lanenet-lane-detection


In [None]:
!pip install loguru

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting loguru
  Downloading loguru-0.6.0-py3-none-any.whl (58 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/58.3 KB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m58.3/58.3 KB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: loguru
Successfully installed loguru-0.6.0


In [None]:
from data_provider import lanenet_data_feed_pipline
from local_utils.log_util import init_logger

In [None]:
# Logger obtained from the project's init_logger helper.
# Judging by the keyword name, the string is used as the log file name prefix -- TODO confirm in local_utils.log_util.
LOG = init_logger.get_logger(log_file_name_prefix='generate_tusimple_tfrecords')

In [None]:
# Build the tfrecords producer with its default arguments.
# In the recorded run the constructor printed the gt_binary_image / gt_instance_image / gt_image
# directories under data/training, each followed by True -- presumably an existence check; TODO confirm.
producer = lanenet_data_feed_pipline.LaneNetDataProducer()

B: /drive/MyDrive/new_projects/lane_detection/lanenet-lane-detection/data/training/gt_binary_image
I: /drive/MyDrive/new_projects/lane_detection/lanenet-lane-detection/data/training/gt_instance_image
M: /drive/MyDrive/new_projects/lane_detection/lanenet-lane-detection/data/training/gt_image
B: True
I: True
M: True


In [None]:
# Write the tfrecords files; in the recorded run the training records took about half an hour.
# NOTE(review): the saved output of this cell ends with "IndexError" right after the validation
# tfrecords step started -- the cause is not visible in this notebook and should be investigated.
producer.generate_tfrecords()

2023-01-29 17:01:32.783 | INFO     | data_provider.lanenet_data_feed_pipline:generate_tfrecords:91 - Start generating training example tfrecords
2023-01-29 17:01:33.236 | INFO     | data_provider.tf_io_pipline_tools:write_example_tfrecords:59 - Writing /drive/MyDrive/new_projects/lane_detection/lanenet-lane-detection/data/training/tfrecords/tusimple_train.tfrecords....
2023-01-29 17:33:36.087 | INFO     | data_provider.tf_io_pipline_tools:write_example_tfrecords:104 - Writing /drive/MyDrive/new_projects/lane_detection/lanenet-lane-detection/data/training/tfrecords/tusimple_train.tfrecords complete
2023-01-29 17:33:36.105 | INFO     | data_provider.lanenet_data_feed_pipline:generate_tfrecords:105 - Generating training example tfrecords complete
2023-01-29 17:33:36.112 | INFO     | data_provider.lanenet_data_feed_pipline:generate_tfrecords:108 - Start generating validation example tfrecords
2023-01-29 17:33:36.268 | INFO     | data_provider.tf_io_pipline_tools:write_example_tfrecords:59 

IndexError: ignored