In [57]:
import numpy as np
import pandas as pd
import cv2
import matplotlib.pyplot as plt
from pathlib import Path
import os
from tqdm import tqdm
import pickle
from copy import deepcopy
import uuid
from urllib.request import urlretrieve
from zipfile import ZipFile

In [92]:
ZIP_TRAIN_PATH = "../raw_data/train.pickle.zip"
ZIP_VALID_PATH = "../raw_data/valid.pickle.zip"
ZIP_TEST_PATH = "../raw_data/test.pickle.zip"

In [93]:
def unzip_file(src, dest):
    with ZipFile(src, "r") as f:
        f.extractall(dest)

In [95]:
unzip_file(ZIP_TRAIN_PATH, "../raw_data/")
unzip_file(ZIP_VALID_PATH, "../raw_data/")
unzip_file(ZIP_TEST_PATH, "../raw_data/")

In [58]:
TRAIN_PATH = "../raw_data/train.pickle"
VALID_PATH = "../raw_data/valid.pickle"
TEST_PATH = "../raw_data/test.pickle"

In [59]:
def load_file(path):
    with open(path, 'rb') as f:
        d = pickle.load(f, encoding='latin1')  
    x = d['features'].astype(np.uint8)   # 4D numpy.ndarray type, for train = (34799, 32, 32, 3)
    y = d['labels']                        # 1D numpy.ndarray type, for train = (34799,)
    s = d['sizes']                         # 2D numpy.ndarray type, for train = (34799, 2)
    c = d['coords']  
    return x, y, s, c


In [60]:
train_data, valid_data, test_data = {}, {}, {}
train_data["x"], train_data["y"], _, train_data["c"] = load_file(TRAIN_PATH)
valid_data["x"], valid_data["y"], _, valid_data["c"] = load_file(VALID_PATH)
test_data["x"], test_data["y"], _, test_data["c"] = load_file(TEST_PATH)

In [61]:
def display(img, coordinates):
    new_img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    cv2.rectangle(new_img, 
                  (coordinates[0], coordinates[1]),
                  (coordinates[0] + coordinates[2], coordinates[1] + coordinates[3]),
                  (255,0,0),
                  1
                 )
    while True:
        cv2.imshow("img", new_img)
        if cv2.waitKey(0) & 0xFF == 27:
            cv2.destroyAllWindows()
            break

In [62]:
# display(train_data["x"][4000], train_data["c"][4000])

In [63]:
labels = pd.read_csv("../raw_data/label_names.csv")

In [64]:
labels

Unnamed: 0,ClassId,SignName
0,0,Speed limit (20km/h)
1,1,Speed limit (30km/h)
2,2,Speed limit (50km/h)
3,3,Speed limit (60km/h)
4,4,Speed limit (70km/h)
5,5,Speed limit (80km/h)
6,6,End of speed limit (80km/h)
7,7,Speed limit (100km/h)
8,8,Speed limit (120km/h)
9,9,No passing


# Augmentation 1 - Local Histogram Equalization

In [65]:
def local_histogram_equalization(img):
    new_img = np.zeros(img.shape, dtype=np.uint8)
    new_img[:,:,0] = cv2.equalizeHist(img[:,:,0])
    new_img[:,:,1] = cv2.equalizeHist(img[:,:,1])
    new_img[:,:,2] = cv2.equalizeHist(img[:,:,2])
    return new_img

In [66]:
new_img = local_histogram_equalization(train_data['x'][4000])

In [67]:
# display(new_img, train_data['c'][4000])

# Augmentation 2 - Changing Brightness

In [68]:
def change_brightness(img):
    img_hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
    img_hsv[:,:,2] = img_hsv[:,:,2] * (0.5 + np.random.uniform(size=(img_hsv.shape[:-1])))
    img_rgb = cv2.cvtColor(img_hsv, cv2.COLOR_HSV2RGB)
    return img_rgb

In [69]:
new_img = change_brightness(train_data['x'][4000])

In [70]:
# display(new_img, train_data['c'][4000])

# Augmentation 3 - Rotating Image

In [71]:
def rotation_changing(image):
    # Defining angle range
    angle_range = 30
    # Defining angle rotation
    angle_rotation = np.random.uniform(angle_range) - angle_range / 2
    # Getting shape of image
    rows, columns, channels = image.shape
    # Implementing rotation
    # Calculating Affine Matrix
    affine_matrix = cv2.getRotationMatrix2D((columns / 2, rows / 2), angle_rotation, 1)
    # Warping original image with Affine Matrix
    rotated_image = cv2.warpAffine(image, affine_matrix, (columns, rows))
    # Returning rotated image
    return rotated_image

In [72]:
new_img = rotation_changing(train_data['x'][4000])

In [73]:
# display(new_img, train_data['c'][4000])

# Preprocessing Data

In [74]:
def shuffle(data, seed=0):
    new_data = deepcopy(data)
    np.random.seed(seed)
    np.random.shuffle(new_data["x"])
    np.random.seed(seed)
    np.random.shuffle(new_data["y"])
    np.random.seed(seed)
    np.random.shuffle(new_data["c"])
    return new_data

In [75]:
def preprocess(data, shuffle=False, lhe=False, rotate=False, brightness=False):
    if shuffle:
        data = shuffle(data)
    if lhe:
        data["x"] = list(map(local_histogram_equalization, tqdm(data["x"])))
    if rotate:
        data["x"] = list(map(rotation_changing, tqdm(data["x"])))
    if brightness:
        data["x"] = list(map(rotation_changing, tqdm(data["x"])))
    return data

In [76]:
def join_data(data, augmented_data):
    data["x"] = np.r_[data["x"], augmented_data["x"]]
    data["y"] = np.r_[data["y"], augmented_data["y"]]
    data["c"] = np.r_[data["c"], augmented_data["c"]]
    return data

In [77]:
def generate_augmented_data(data):
    augmented_data = None
    list_kwargs = [
        {"shuffle": False, "lhe": True, "rotate": False, "brightness": False},
        {"shuffle": False, "lhe": False, "rotate": True, "brightness": False},
        {"shuffle": False, "lhe": False, "rotate": False, "brightness": True},
        {"shuffle": False, "lhe": True, "rotate": True, "brightness": False},
        {"shuffle": False, "lhe": True, "rotate": False, "brightness": True},
        {"shuffle": False, "lhe": False, "rotate": True, "brightness": True},
        {"shuffle": False, "lhe": True, "rotate": True, "brightness": True}
    ]
    for kwargs in list_kwargs:
        data = shuffle(data, np.random.randint(0,100))
        data_top = {}
        data_top["x"] = data["x"][:int(0.2*len(data["x"]))]
        data_top["y"] = data["y"][:int(0.2*len(data["x"]))]
        data_top["c"] = data["c"][:int(0.2*len(data["x"]))]
        new_data = preprocess(data_top, **kwargs)
        if augmented_data is None:
            augmented_data = new_data
        else:
            augmented_data = join_data(new_data, augmented_data)
    return join_data(new_data, augmented_data)

In [78]:
augmented_train_data = generate_augmented_data(train_data)
augmented_valid_data = generate_augmented_data(valid_data)

100%|███████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 12541.80it/s]
100%|████████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 9129.31it/s]
100%|████████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 7024.75it/s]
100%|███████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 11315.46it/s]
100%|████████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 8155.66it/s]
100%|███████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 13346.77it/s]
100%|████████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 9701.07it/s]
100%|████████████████████████████████████████████████████████████████████████████| 6959/6959 [00:00<00:00, 9652.43it/s]
100%|███████████████████████████████████

# Labelling Of Data

In [79]:
def convert_coordinates_to_yolo_annotation(coordinates, label, shape):
    dh, dw, _ = shape
    x,y,w,h = coordinates
    x_final = x + w if x + w < dw else dw
    y_final = y + h if y + h < dh else dh
    x_mean = (x + x_final) / (dw * 2)
    y_mean = (y + y_final) / (dh * 2)
    w_norm = (x_final - x) / dw
    h_norm = (y_final - y) / dh
    return [" ".join([str(label), str(x_mean), str(y_mean), str(w_norm), str(h_norm)])]

def convert_yolo_annotations_to_coordinates(annotation, shape):
    dh, dw, _ = shape
    _, x, y, w, h = [float(i) for i in annotation[0].strip().split()]
    x_start = (x - w / 2) * dw
    y_start = (y - h / 2) * dh
    return np.array([x_start, y_start, w * dw, h * dh], dtype=np.uint8)

In [80]:
annotation = convert_coordinates_to_yolo_annotation(train_data['c'][4000],
                                       train_data['y'][4000],
                                       train_data['x'][4000].shape
                                      )

In [81]:
new_coords = convert_yolo_annotations_to_coordinates(annotation, train_data['x'][4000].shape)

In [82]:
# display(train_data['x'][4000] ,new_coords)

# Saving new data to file 

In [83]:
def create_directory_structure(root):
    try:
        os.makedirs(root, exist_ok=True)
        folders = ["train", "valid", "test"]
        for folder in folders:
            path_to_folder = root / folder
            os.makedirs(path_to_folder, exist_ok=True)
            os.makedirs(path_to_folder / "images", exist_ok=True)
            os.makedirs(path_to_folder / "labels", exist_ok=True)
        return True
    except:
        return False

In [84]:
ROOT_DIR = Path("../data/")
create_directory_structure(ROOT_DIR)

True

In [85]:
def save_images_and_labels(path, img, label, coordinates):
    try:
        name = str(uuid.uuid1())
        img_name = name + ".jpg"
        label_name = name + ".txt"
        img_path = path / "images" / img_name
        label_path = path / "labels" / label_name
        cv2.imwrite(str(img_path), img)
        with open(label_path, "w") as f:
            f.writelines(convert_coordinates_to_yolo_annotation(coordinates, labels, img.shape))
        return True
    except Exception as e:
        return str(e)

In [86]:
TRAIN_FOLDER = ROOT_DIR / "train"
VALID_FOLDER = ROOT_DIR / "valid"
TEST_FOLDER = ROOT_DIR / "test"
train_result = list(map(lambda img, label, coordinates:save_images_and_labels(TRAIN_FOLDER, img, label, coordinates), tqdm(augmented_train_data["x"]), augmented_train_data["y"], augmented_train_data["c"]))   
valid_result = list(map(lambda img, label, coordinates:save_images_and_labels(VALID_FOLDER, img, label, coordinates), tqdm(augmented_valid_data["x"]), augmented_valid_data["y"], augmented_valid_data["c"]))    
test_result = list(map(lambda img, label, coordinates:save_images_and_labels(TEST_FOLDER, img, label, coordinates), tqdm(test_data["x"]), test_data["y"], test_data["c"]))                           

100%|███████████████████████████████████████████████████████████████████████████| 97426/97426 [12:49<00:00, 126.61it/s]
100%|███████████████████████████████████████████████████████████████████████████| 12348/12348 [01:31<00:00, 134.58it/s]
  x_final = x + w if x + w < dw else dw
  w_norm = (x_final - x) / dw
  x_final = x + w if x + w < dw else dw
  w_norm = (x_final - x) / dw
  x_final = x + w if x + w < dw else dw
  w_norm = (x_final - x) / dw
100%|███████████████████████████████████████████████████████████████████████████| 12630/12630 [01:23<00:00, 152.16it/s]


In [87]:
count = 0
for i in test_result:
    if i == False:
        count += 1
print(count)

0


In [88]:
nc = len(labels)

In [89]:
nc

43

In [90]:
classes = str(list(labels["SignName"]))

In [91]:
desc = f"""train: ../train/images
val: ../valid/images
test: ../test/images

nc: {nc}
names: {classes}
"""

with open("../data/data.yaml", "w") as f:
    f.write(desc)