In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import datetime, os
from pathlib import Path
import cv2
from typing import List
import pickle
import math

# Shared Functions

In [None]:
# Capture frames from the video for training and testing purpose
# From this tutorial:
# https://www.tutorialexample.com/python-capture-images-from-video-by-frames-using-opencv-a-complete-guide/

VIDEO_PATH = Path('video/original.mp4')
FRAME_FOLDER_PATH = Path('data/video_frames')


def capture_frames(
    num_captures: int = 10,
    frame_frequency: int = 100,
    video_path=VIDEO_PATH,
    frame_folder_path=FRAME_FOLDER_PATH,
):
    """Capture frames from the given video at a given frame frequency.

    Captured frames will be stored in the folder specified by `frame_folder_path`
    (created if it does not exist). Each image is named {index}.png, where index
    is the zero-padded capture counter.

    :param num_captures: Maximum number of frames to save. Capturing stops earlier
        if the video runs out of frames.
    :param frame_frequency: Every `frame_frequency`-th frame read is saved as an image.
    :param video_path: Path (or string) to the video.
    :param frame_folder_path: Path (or string) to the folder where the captured
        frames will be stored.
    :raises ValueError: If `frame_frequency` is not positive.
    :raises IOError: If the video cannot be opened.
    """
    if frame_frequency <= 0:
        raise ValueError(f'frame_frequency must be positive, got {frame_frequency}')
    cap = cv2.VideoCapture(str(video_path))
    try:
        # Fail loudly instead of calling exit(), which would take down the
        # notebook kernel without telling the user what went wrong.
        if not cap.isOpened():
            raise IOError(f'Cannot open video: {video_path}')
        frame_folder_path = Path(frame_folder_path)
        # cv2.imwrite does not create missing directories.
        frame_folder_path.mkdir(parents=True, exist_ok=True)
        total_frame = 0
        idx = 0
        while idx < num_captures:
            ret, frame = cap.read()
            if not ret:  # no more frames to read
                break
            total_frame += 1
            if total_frame % frame_frequency == 0:
                image_path = frame_folder_path.joinpath(f'{idx:02}.png')
                cv2.imwrite(str(image_path), frame)
                print(image_path)
                idx += 1
    finally:
        # Always free the capture handle, even when reading or writing fails.
        cap.release()


def show_images(images, number_of_images, cmap=None, fig_size_base=(20, 10), file_name=''):
    """Show images in a column.

    Color images are expected in OpenCV's BGR channel order and are converted to
    RGB before plotting. Single-channel (2-D) images are plotted as-is, because
    the BGR-to-RGB conversion only accepts multi-channel input.

    :param images: A numpy array (or sequence) of images when `number_of_images`
        is greater than 1; a single image array otherwise.
    :param number_of_images: number of images to show. All images will be shown in a column.
    :param cmap: Color map used for single-channel images. Defaults to 'gray'
        when left as None. Ignored by matplotlib for color images.
    :param fig_size_base: The base scale of figure size. Each figure is considered to be 20
        in width and 10 in height. Only used when more than one image is shown.
    :param file_name: If given, the figure is saved to this path (parent folder is
        created when missing) instead of being shown.
    """
    def _draw(ax, image):
        # Plot one image on one axis, converting the color order only when needed.
        image = np.asarray(image)
        if image.ndim == 2:
            # Gray-scale image: no channel axis to reorder.
            ax.imshow(image, cmap=cmap or 'gray')
        else:
            # Since the channels in cv2 are in the order of BGR, in order to fit
            # the more conventional RGB scheme for matplotlib, we convert the color
            # before plotting. See this SO answer
            # https://stackoverflow.com/a/39316695/9723036
            ax.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB), cmap=cmap)
        ax.set_axis_off()

    if number_of_images > 1:
        fig, axes = plt.subplots(
            number_of_images,
            1,
            figsize=(fig_size_base[0], number_of_images * fig_size_base[1]),
        )
        for image, ax in zip(images, axes.flatten()):
            _draw(ax, image)
    else:
        fig, ax = plt.subplots(1, 1, figsize=(30, 30))
        _draw(ax, images)
    plt.tight_layout()
    if file_name:
        # savefig fails if the destination folder is missing.
        Path(file_name).parent.mkdir(parents=True, exist_ok=True)
        plt.savefig(file_name)
    else:
        plt.show()


def select_color_then_gray_scale(image, color_bgr: List, plusminus: int = 40):
    """Select the parts of the image that are close to a specific color.

    The image is first masked for the target color, and then returned as
    the gray scaled version.

    :param image: The image object obtained from cv2.imread().
    :param color_bgr: A sample BGR value, as a list, for the color to be
        selected.
    :param plusminus: A numeric value to add to or subtract from the color_bgr
        to create a range when selecting color from the image. The resulting
        bounds are clipped to the valid [0, 255] range. There are also
        non-hard-coded methods, as explained here:
        https://www.learnopencv.com/color-spaces-in-opencv-cpp-python/
    :return: A new image with the given color selected and then gray-scaled.
    """
    # Compute the bounds in a wide integer type: doing this in uint8 wraps
    # around (e.g. 230 + 40 -> 14) and silently produces an empty selection.
    color = np.asarray(color_bgr, dtype=np.int32)
    lower = np.clip(color - plusminus, 0, 255).astype(np.uint8)
    upper = np.clip(color + plusminus, 0, 255).astype(np.uint8)
    mask = cv2.inRange(image, lower, upper)
    return cv2.cvtColor(
        cv2.bitwise_and(image, image, mask=mask),
        cv2.COLOR_BGR2GRAY,
    )


def detect_edges(image, h_low: int, h_high: int):
    """Run the Canny edge detector on an image.

    :param image: The image whose edges are to be detected.
    :param h_low: Lower hysteresis threshold passed to cv2.Canny.
    :param h_high: Upper hysteresis threshold passed to cv2.Canny.
    :return: The edge image produced by cv2.Canny.
    """
    edges = cv2.Canny(image, h_low, h_high)
    return edges


def blackout(image, vertices):
    """Keep only the polygon given by `vertices`; black out everything else.

    A mask of the same shape as the image is filled with 255 inside the polygon
    and AND-ed with the image, so pixels outside the polygon become zero. This
    removes unnecessary clutter and leaves only the area of interest. The
    vertices need to be manually created.

    :param image: The image to perform the blackout on (single- or multi-channel).
    :param vertices: Polygon vertices in the form accepted by cv2.fillPoly.
    :return: A new image in which only the polygon area keeps its pixel values.
    """
    keep_mask = np.zeros_like(image)
    is_single_channel = keep_mask.ndim == 2
    # One 255 per channel so that every channel survives the bitwise AND.
    fill_value = 255 if is_single_channel else (255,) * keep_mask.shape[2]
    cv2.fillPoly(keep_mask, vertices, fill_value)
    return cv2.bitwise_and(image, keep_mask)


def focus(image, percent_vertices):
    """Keep only the polygon described by `percent_vertices`.

    Everything outside the polygon is blacked out via blackout().

    :param image: The image to be focused.
    :param percent_vertices: Polygon vertices expressed as (row fraction,
        column fraction) pairs. e.g. [0.1, 0.8] is the point at 10% of the
        image height and 80% of the image width, i.e. towards the upper-right
        corner.
    :return: The focused image after black out.
    """
    height, width = image.shape[:2]
    # Convert the fractional (row, col) pairs into the pixel (x, y) points
    # that OpenCV expects; int() truncates towards zero.
    polygon = []
    for row_fraction, col_fraction in percent_vertices:
        polygon.append([int(width * col_fraction), int(height * row_fraction)])
    # Wrapped in an outer list: a single polygon in a list of polygons.
    vertices = np.array([polygon], dtype=np.int32)
    return blackout(image, vertices)


def line_detection(
    edge_image,
    original_image,
    vote_threshold,
    min_line_length,
    max_line_gap,
):
    """Detect vertical lines that are the parking demarcations.

    From the canny edge output, we run cv2.HoughLinesP to probabilistically
    detect lines. We use vote_threshold, min_line_length, and max_line_gap
    to control how strict the criteria are for line detection. Tuning these
    variables is manual labor. The detected lines are drawn on copies of the
    original image for inspection, with coordinates printed next to them.
    We create two images, one labeled with the x coordinate and the other
    with the two y coordinates. The purpose is to manually pick out the key
    markers of each parking row such that we can compute the total parking
    lot map from these key markers (e.g. edge of a parking row, height and
    width of a parking spot, etc.)

    Only (near-)vertical lines are kept, i.e. lines whose two end points
    differ by at most one pixel in x.

    :param edge_image: The image after the process of Canny edge detector.
    :param original_image: The original image where the lines will be drawn.
        It is not modified; the lines are drawn on copies.
    :param vote_threshold: Min number of votes for a line to be considered a
        line by HoughLinesP.
    :param min_line_length: Lines shorter than this value are discarded.
    :param max_line_gap: Passed to HoughLinesP as maxLineGap, the maximum gap
        allowed between points for them to be linked into the same line.
    :return: An array of two copies of the original image with lines drawn on
        them. The first one is labeled by x coordinate and the second one by
        y coordinates. If no line is detected both copies are unchanged.
    """
    lines = cv2.HoughLinesP(
        edge_image,
        1,  # rho resolution, in pixels
        np.pi / 180,  # theta resolution of one degree, expressed in radians
        vote_threshold,
        minLineLength=min_line_length,
        maxLineGap=max_line_gap,
    )
    if lines is None:
        # HoughLinesP yields None rather than an empty array when it finds nothing.
        vert_lines = []
    else:
        vert_lines = [
            seg for seg in lines.reshape(-1, 4).tolist() if abs(seg[0] - seg[2]) <= 1
        ]

    def _label(canvas, text, origin, color):
        # Write a small text label whose lower-left corner is at `origin`.
        cv2.putText(canvas, text, origin, cv2.FONT_HERSHEY_SIMPLEX, 0.3, color)

    x_image, y_image = np.copy(original_image), np.copy(original_image)
    for x1, y1, x2, y2 in vert_lines:
        top, bottom = min(y1, y2), max(y1, y2)
        # First image: the line plus its x coordinate.
        cv2.line(x_image, (x1, y1), (x2, y2), [0, 0, 255], 2)
        _label(x_image, str((x1 + x2) // 2), (x1, top), (0, 255, 255))
        # Second image: the line plus its upper and lower y coordinates.
        cv2.line(y_image, (x1, y1), (x2, y2), [0, 0, 255], 2)
        _label(y_image, str(top), (x1, top), (0, 255, 0))
        _label(y_image, str(bottom), (x1, bottom), (255, 0, 0))
    return np.array([x_image, y_image])


def draw_bounding_box(original_images, spot_dict_pickle_name: str):
    """Draw every spot's bounding box on copies of the given images.

    Used to visually check how well the spot coordinates match the frames.

    :param original_images: The original unaltered images. They are not
        modified; the boxes are drawn on copies.
    :param spot_dict_pickle_name: Name of the pickle file containing a dict
        whose keys are the (x1, y1, x2, y2) coordinates of all identified spots.
    :return: A numpy array of the images with the bounding boxes drawn.
    """
    # NOTE: pickle.load can execute arbitrary code; only load pickle files
    # that were produced by this notebook.
    with open(spot_dict_pickle_name, 'rb') as handle:
        boxes = list(pickle.load(handle).keys())

    annotated = []
    for original in original_images:
        canvas = np.copy(original)
        for left, top, right, bottom in boxes:
            cv2.rectangle(canvas, (left, top), (right, bottom), (0, 0, 255), 2)
        annotated.append(canvas)
    return np.array(annotated)


def save_spot_images(original_images_ids, spot_dict_pickle_name: str, save_path: str = 'temp') -> None:
    """Save the spot image from each spot coordinate specified in spot_dict.

    This is how we create all the training samples. Each crop is written as
    f{image id}_s{spot index}.png, where the spot index is the position of
    the spot among the keys of the pickled dict.

    :param original_images_ids: An iterable of (image, image id) pairs, where
        each image is an original unaltered frame.
    :param spot_dict_pickle_name: Name of the pickle file containing a dict
        whose keys are the (x1, y1, x2, y2) coordinates of all identified spots.
    :param save_path: Folder to save the spot images to. It is created if it
        does not exist.
    :raises IOError: If a spot image cannot be written.
    """
    # NOTE: pickle.load can execute arbitrary code; only load pickle files
    # that were produced by this notebook.
    with open(spot_dict_pickle_name, 'rb') as f_obj:
        spot_dict = pickle.load(f_obj)
    # cv2.imwrite does not create missing folders; it just reports failure.
    save_folder = Path(save_path)
    save_folder.mkdir(parents=True, exist_ok=True)
    for img, img_id in original_images_ids:
        for spot_id, (x1, y1, x2, y2) in enumerate(spot_dict.keys()):
            spot_img = img[y1:y2, x1:x2]
            file_name = f'f{img_id:02}_s{spot_id:03}.png'
            # Only claim success when the file has really been written.
            if not cv2.imwrite(str(save_folder / file_name), spot_img):
                raise IOError(f'Failed to save {save_folder / file_name}')
            print(f'{file_name} saved!')

# Capture Frames

In [None]:
# Grab 10 frames, one every 100 frames, from the original video. The values
# are spelled out here so the sampling is visible without looking up defaults.
capture_frames(
    num_captures=10,
    frame_frequency=100,
    video_path=VIDEO_PATH,
    frame_folder_path=FRAME_FOLDER_PATH,
)

# Obtain All Images

In [None]:
all_image_paths = sorted(FRAME_FOLDER_PATH.glob('*.png'))
# Use cv2 to read images. All images shall be read via cv2.
# Each row holds [image, image index]. The object array is pre-allocated and
# filled element by element: building it from a nested list mixes image arrays
# with ints, which newer NumPy versions reject as an inhomogeneous ("ragged")
# array.
all_images = np.empty((len(all_image_paths), 2), dtype=object)
for i, p in enumerate(all_image_paths):
    frame_image = cv2.imread(str(p))
    if frame_image is None:  # cv2.imread signals failure by returning None
        raise IOError(f'Cannot read image: {p}')
    all_images[i, 0] = frame_image
    all_images[i, 1] = i

# Show Sample Images

In [None]:
# Two sample frames used for the manual tuning in the cells below.
IMAGE_INDICES = np.array([3, 8])
images = all_images[IMAGE_INDICES, 0]  # column 0 holds the image arrays
show_images(images, len(images), file_name='images/original_images.png')
shown_ids = all_images[IMAGE_INDICES, 1]  # column 1 holds the image indices
print(f'Images shown are indices: {shown_ids}')

# Select White Color

In [None]:
# Sample BGR value of the white parking lines; the selection tolerance is the
# default of select_color_then_gray_scale().
white_bgr = [200, 200, 200]
white_selections = [select_color_then_gray_scale(frame, white_bgr) for frame in images]
whites = np.array(white_selections)
show_images(whites, len(whites), file_name='images/white_selected.png')

# Edge Detection

In [None]:
# Canny edge detection on the white-selected images, with hysteresis
# thresholds of 60 (low) and 150 (high).
edge_results = [detect_edges(white, 60, 150) for white in whites]
white_edges = np.array(edge_results)
show_images(white_edges, len(white_edges), file_name='images/edge_detection.png')

# Focus on Area of Interest

In [None]:
# Corners of the area of interest as (row fraction, column fraction) pairs,
# manually acquired.
percent_vertices = np.array([
    [0.01, 0.20],
    [0.01, 0.77],
    [0.94, 0.77],
    [0.94, 0.20],
])

focused_results = [focus(edges, percent_vertices) for edges in white_edges]
focused_edges = np.array(focused_results)
show_images(focused_edges, len(focused_edges), file_name='images/area_of_interest.png')

# Line Detection

In [None]:
# Use only the second image for line detection.
# Arguments after the images: vote threshold, min line length, max line gap.
lines_images = line_detection(focused_edges[1], images[1], 50, 10, 5)
show_images(lines_images, len(lines_images), file_name='images/line_detection.png')

# Manual Identification of All Potential Spots

In [None]:
# The row_xs and row_ys are acquired by inspecting the line_detection.png
# One entry per parking lot row: [upper y, lower y].
row_ys = np.array([
    [35, 111],
    [150, 237],
    [287, 365],
    [415, 492],
    [544, 621],
    [673, 751],
    [803, 883],
    [935, 1010],
])

# One entry per parking lot row: [left x, right x].
row_xs = np.array([
    [456, 1447],
    [540, 1465],
    [540, 1465],
    [445, 1465],
    [540, 1465],
    [540, 1465],
    [438, 1465],
    [433, 1465],
])

correct_xs = 20
correct_ys = 10
spot_width = 20  # Width of a spot in terms of pixel

# Apply the manual corrections: move the upper edge of every row up, and
# widen every row on both the left and the right side.
row_ys[:, 0] = row_ys[:, 0] - correct_ys
row_xs += np.array([-correct_xs, correct_xs])

# Output a dict with coordinates of each box. Every row is split at its
# vertical middle into an upper and a lower line of spots; each key is
# (x1, y1, x2, y2) and each value starts out as -1.
spot_dict = {}
for (x_left, x_right), (y_top, y_bottom) in zip(row_xs, row_ys):
    y_mid = (y_top + y_bottom) // 2
    for x_start in np.arange(x_left, x_right, spot_width):
        x_end = x_start + spot_width
        spot_dict[(x_start, y_top, x_end, y_mid)] = -1
        spot_dict[(x_start, y_mid, x_end, y_bottom)] = -1

with open('spot_dict.pickle', 'wb') as pickle_file:
    pickle.dump(spot_dict, pickle_file)


# Display Bounding Boxes for Each Spot

It is apparent that the bounding boxes are not perfectly aligned with the parking spots. This is because the footage is not completely stable (it was shot from a drone), so a coordinate system developed on one frame will not match another frame exactly.

In [None]:
# Draw the spot boxes on frames 0 and 5 to check how well they line up.
frames_to_check = all_images[[0, 5], 0]
bounding_box_images = draw_bounding_box(frames_to_check, 'spot_dict.pickle')
show_images(bounding_box_images, len(bounding_box_images), file_name='images/bounding_box.png')

# Save The Spot Image for CNN

In [None]:
# We use the very first frame to generate training data.
# Indexing with a list keeps the result two-dimensional, i.e. a sequence
# holding a single [image, image index] row.
first_frame_with_id = all_images[[0], :]
save_spot_images(first_frame_with_id, 'spot_dict.pickle')

# Load The Trained CNN Model

The pre-trained model is ResNet50

In [None]:
# Default values
INPUT_SHAPE = (72, 72, 3)
OUTPUT_FRAMES_FOLDER = Path('data/output_frames')
OUTPUT_VIDEO = Path('video/predicted.avi')

# MODEL is the classifier loaded from disk. PRE_MODEL is ResNet50 without its
# top layers, and PRE_PROCESS is the matching input pre-processing function.
MODEL = keras.models.load_model('spot_parked_detection_model')
PRE_MODEL = keras.applications.ResNet50(include_top=False, input_shape=INPUT_SHAPE)
PRE_PROCESS = keras.applications.resnet.preprocess_input

# Functions Used to Make Prediction And Draw Empty Spots

In [None]:
def predict_on_image(original_image, spot_dict, input_shape):
    """Predict the labels of each spot on the given image.

    The function first crops all the spot images and resizes them to the
    model input size. Then it runs the prediction, which involves four steps:
    1. Use the pre-trained model's pre-process function to process the input.
    2. Use the pre-trained model to extract features (i.e. use the predict() method)
    3. Use the trained model to predict the probabilities of each label.
    4. Choose the index of the largest probability as the predicted label.

    :param original_image: The unaltered image. Prediction will be made
         on this image.
    :param spot_dict: A dictionary containing the coordinates of all the spot
        bounding boxes in its keys, as (x1, y1, x2, y2).
    :param input_shape: Shape of the input image. Must be of shape
        (height, width, channel).
    :return: The predicted label of all the spots on the image, in the order
        of the keys of `spot_dict`. An empty array if `spot_dict` is empty.
    """
    if not spot_dict:
        # Nothing to crop, so there is nothing to feed to the models.
        return np.array([], dtype=np.int64)
    height, width = input_shape[0], input_shape[1]
    # cv2.resize takes the target size as (width, height), which is the
    # reverse of the (height, width) order used by `input_shape`.
    all_spots = np.array(
        [
            cv2.resize(original_image[y1:y2, x1:x2], (width, height))
            for x1, y1, x2, y2 in spot_dict.keys()
        ],
    )
    all_pred = np.argmax(
        MODEL.predict(PRE_MODEL.predict(PRE_PROCESS(all_spots))),
        axis=1,
    )
    return all_pred


def draw_prediction(original_image, spot_dict, all_pred, alpha):
    """Draw the predicted empty parking spot rectangles on the unaltered image.

    Empty spots (label 0) are shown as green rectangle overlays. Obstacles
    that cannot be parked upon (label 1), such as trees, landscapes, and cart
    return stations, are shown as blue rectangle overlays. Any other label is
    left unmarked.

    Also included in the drawing is a message of the current number of empty
    spots. Its accuracy depends on the model.

    :param original_image: The unaltered image to have its empty parking spots
        highlighted. It is not modified; the drawing is done on a copy.
    :param spot_dict: A dictionary containing the coordinates of all the spot
        bounding boxes in its keys.
    :param all_pred: Return value from predict_on_image(). Must be of shape (n, )
        where n is the number of total spots.
    :param alpha: The level of transparency in the overlay. The higher the alpha,
        the more opaque.
    :return: The newly drawn image.
    """
    new_image = np.copy(original_image)
    overlay = np.copy(original_image)
    # Count the empty spots directly: looking label 0 up in a dict of label
    # counts fails when no spot at all is predicted to be empty.
    empty_count = int(np.count_nonzero(np.asarray(all_pred) == 0))
    for pred, (x1, y1, x2, y2) in zip(all_pred, spot_dict.keys()):
        if pred == 0:  # green for empty space
            cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 255, 0), -1)
        elif pred == 1:  # blue for obstacle space
            cv2.rectangle(overlay, (x1, y1), (x2, y2), (255, 0, 0), -1)
    # Blend the filled rectangles into the image; new_image is passed as the
    # destination argument.
    cv2.addWeighted(overlay, alpha, new_image, 1 - alpha, 0, new_image)
    cv2.putText(
        new_image,
        f'Available: {empty_count} spots',
        (30, 95),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.7,
        (0, 0, 0),
        2,
    )
    return new_image


def predict_and_mark_empty_spot(
    original_images,
    spot_dict_pickle_name,
    input_shape=INPUT_SHAPE,
    alpha=0.3,
):
    """Combination of predict_on_image() and draw_prediction().

    A simple wrapper: for every image, predict the label of each spot and
    then draw the prediction on a copy of the image.

    :param original_images: A np array of unaltered original images.
    :param spot_dict_pickle_name: The name of pickle file corresponding
         to the `spot_dict`.
    :param input_shape: Shape of the input image. Must be of shape
        (height, width, channel). Default to the value INPUT_SHAPE
    :param alpha: Transparency level on an image. The lower the alpha, the
        more transparent. Allowed range is 0 to 1.
    :return: A numpy array of the images with the predictions drawn on them,
        in the same order as `original_images`.
    """
    # Load the file the caller asked for rather than a hard-coded file name.
    # NOTE: pickle.load can execute arbitrary code; only load pickle files
    # that were produced by this notebook.
    with open(spot_dict_pickle_name, 'rb') as f_obj:
        spot_dict = pickle.load(f_obj)
    marked_images = []
    for img in original_images:
        pred = predict_on_image(img, spot_dict, input_shape=input_shape)
        marked_images.append(draw_prediction(img, spot_dict, pred, alpha=alpha))
    return np.array(marked_images)


def get_predicted_video_frames(
    pred_frequency: int,
    input_video_path: Path,
    output_frames_folder: Path,
    spot_dict_pickle_name: str,
    max_frame_count: int = 1000,
):
    """Get raw frames from the original video, draw a new frame after making empty spot prediction.

    After the new frame is drawn, it is saved as a PNG file named after the
    zero-padded index of the frame in the original video.

    :param pred_frequency: The frequency of making predictions and drawing a new frame. For
        instance, if `pred_frequency` is 5, that means we make a prediction and draw a new frame
        every 5 frames.
    :param input_video_path: A Path object containing the path to the original video.
    :param output_frames_folder: A Path object containing the path to the destination of the
        new frame images. The folder is created if it does not exist.
    :param spot_dict_pickle_name: Name of the pickle file for `spot_dict`.
    :param max_frame_count: Maximum number of predicted frames to produce. Default
        to 1000. For a simple demo purpose, there is no need to grab all the frames in the original
        video.
    :raises ValueError: If `pred_frequency` is not positive.
    :raises IOError: If the video cannot be opened.
    """
    if pred_frequency <= 0:
        raise ValueError(f'pred_frequency must be positive, got {pred_frequency}')
    cap = cv2.VideoCapture(str(input_video_path))
    try:
        if not cap.isOpened():
            raise IOError(f'Cannot open video: {input_video_path}')
        # cv2.imwrite does not create missing folders.
        Path(output_frames_folder).mkdir(parents=True, exist_ok=True)
        frame_count, predicted_count = 0, 0
        while predicted_count < max_frame_count:
            ret, frame = cap.read()
            if not ret:
                # End of video: there is no frame left to predict on, so stop
                # before passing an empty frame to the model.
                break
            frame_count += 1
            if frame_count % pred_frequency == 0:
                new_images = predict_and_mark_empty_spot(np.array([frame]), spot_dict_pickle_name)
                file_name = f'{output_frames_folder}/{frame_count:04}.png'
                cv2.imwrite(file_name, new_images[0])
                predicted_count += 1
                print(file_name)
    finally:
        # Always free the capture handle, even when prediction fails.
        cap.release()


def create_predicted_video(
    output_frames_folder: Path,
    output_video_path: Path,
    video_length: int,
    fps: int,
):
    """Create a video by combining the newly drawn frames together.

    At most `video_length * fps` frames are used, taken in sorted file name
    order. The video size is taken from the first frame.

    :param output_frames_folder: A Path object containing the path to the folder
        holding the new frame images (PNG files).
    :param output_video_path: A Path object containing the path to the destination of the
        new video.
    :param video_length: The duration of the video, in seconds.
    :param fps: Frame per second. Video appears more speeded up if a higher FPS is given.
    :raises ValueError: If there is no frame image to put in the video.
    :raises IOError: If a frame image cannot be read.
    """
    # Read from the folder passed in by the caller, not from a global constant.
    # must sort the paths because .glob does not guarantee order
    frame_paths = sorted(Path(output_frames_folder).glob('*.png'))[:video_length * fps]
    if not frame_paths:
        raise ValueError(f'No PNG frames to use in {output_frames_folder}')

    def _read(frame_path):
        # cv2.imread signals failure by returning None; turn that into an error.
        frame = cv2.imread(str(frame_path))
        if frame is None:
            raise IOError(f'Cannot read frame: {frame_path}')
        return frame

    first_frame = _read(frame_paths[0])
    h, w = first_frame.shape[:2]
    Path(output_video_path).parent.mkdir(parents=True, exist_ok=True)
    out = cv2.VideoWriter(
        str(output_video_path),
        cv2.VideoWriter_fourcc(*'DIVX'),
        fps,
        (w, h),  # the writer takes the frame size as (width, height)
    )
    try:
        # Frames are read one at a time so that only a single frame has to be
        # kept in memory.
        for i, frame_path in enumerate(frame_paths):
            frame = first_frame if i == 0 else _read(frame_path)
            out.write(frame)
            print(f'Frame {i:04} written.')
    finally:
        # Always finalize the video file, even when a frame fails.
        out.release()

# Predict on Static Frame

In [None]:
# Predict and mark the spots on frames 0 and 5.
frames_to_predict = all_images[[0, 5], 0]
predicted_images = predict_and_mark_empty_spot(frames_to_predict, 'spot_dict.pickle')
show_images(predicted_images, len(predicted_images), file_name='images/empty_spot_predicted.png')

# Predict on Video (Not Live)

The prediction is too slow for a live view. Thus, we have to save the predictions into a new video.

In [None]:
# Get all the predicted frames: predict on every 2nd frame of the original
# video and stop after 500 predicted frames.
get_predicted_video_frames(
    2,
    VIDEO_PATH,
    OUTPUT_FRAMES_FOLDER,
    'spot_dict.pickle',
    max_frame_count=500,
)
print('Frame prediction DONE!')

In [None]:
# Stitch the predicted frames into a new video (30 seconds at 12 frames per second).
create_predicted_video(
    OUTPUT_FRAMES_FOLDER,
    OUTPUT_VIDEO,
    video_length=30,
    fps=12,
)
print('Video CREATED!')