In [None]:
%matplotlib inline
import os
import random

import cv2
import numpy as np
from matplotlib import pyplot as plt


def load_images(dir):
    """Recursively load every ``.png`` file under ``dir`` as an RGB image.

    Sub-directories and files are visited in sorted order so the returned list
    is the same on every run. Images found in a directory whose path ends with
    ``GTI_Left`` are mirrored horizontally (see the comment in the body).

    Args:
        dir: root directory to walk. (The name shadows the builtin; it is kept
            so existing keyword callers continue to work.)

    Returns:
        A list with one RGB array per ``.png`` file; empty when nothing is found.

    Raises:
        IOError: if ``cv2.imread`` cannot decode one of the ``.png`` files.
    """
    rv = []
    for dirpath, dirnames, filenames in os.walk(dir):
        # os.walk (top-down) descends into `dirnames` in list order, so sorting
        # it in place makes the traversal order deterministic.
        dirnames.sort()
        for fn in sorted(filenames):
            if not fn.endswith('.png'):
                continue

            path = os.path.join(dirpath, fn)
            imdata = cv2.imread(path)
            if imdata is None:
                # cv2.imread signals failure by returning None rather than raising
                raise IOError(f'could not read image: {path}')
            imdata = cv2.cvtColor(imdata, cv2.COLOR_BGR2RGB)

            # hack: flip GTI_Left images, since the car is driving in the left-hand-side lane
            # and will only see cars to its right
            if dirpath.endswith('GTI_Left'):
                imdata = np.flip(imdata, axis=1)

            rv.append(imdata)

    return rv


# Load both classes of the training data set.
vehicles = load_images('vehicles')
non_vehicles = load_images('non-vehicles')

# load_images returns an empty list when a directory is missing or holds no .png files;
# fail here with a clear message instead of an IndexError from random.choice below.
assert vehicles, "no .png images found under 'vehicles'"
assert non_vehicles, "no .png images found under 'non-vehicles'"

print(len(vehicles))
print(len(non_vehicles))

# Show one random example of each class as a quick sanity check.
_, (ax1, ax2) = plt.subplots(2)
ax1.set_title('vehicle')
ax2.set_title('non-vehicle')
ax1.imshow(random.choice(vehicles))
ax2.imshow(random.choice(non_vehicles))

In [None]:
import sklearn.model_selection

# Use a fixed seed so the split is identical on every run. A later cell reloads a pickled
# classifier instead of retraining; with an unseeded split, images that classifier was
# trained on could end up in this run's test set and inflate the reported accuracy.
SPLIT_SEED = 0

vehicles_train, vehicles_test = sklearn.model_selection.train_test_split(
    vehicles, random_state=SPLIT_SEED)
non_vehicles_train, non_vehicles_test = sklearn.model_selection.train_test_split(
    non_vehicles, random_state=SPLIT_SEED)

In [None]:
def write_img(path, img):
    """Write an RGB or single-channel image to ``path``.

    The image is converted to BGR before being handed to ``cv2.imwrite``.
    The destination directory is created when it does not exist yet.

    Args:
        path: output file path; the extension selects the file format.
        img: 2-D array (treated as grayscale) or 3-D array (treated as RGB).

    Raises:
        IOError: if ``cv2.imwrite`` reports that nothing was written.
    """
    if len(img.shape) == 2:
        img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
    else:
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # cv2.imwrite reports failure through its return value instead of raising
    if not cv2.imwrite(path, img):
        raise IOError(f'could not write image: {path}')

In [None]:
import skimage.feature
from cached_property import cached_property
import numpy as np
from sklearn.preprocessing import StandardScaler


def rgb2(color_space):
    """Return the ``cv2`` attribute named ``COLOR_RGB2<color_space>``.

    For example ``rgb2('HSV')`` returns ``cv2.COLOR_RGB2HSV``.
    """
    constant_name = f'COLOR_RGB2{color_space}'
    return getattr(cv2, constant_name)


class HogSettings:
    """HOG geometry constants shared by the classes that inherit from this one."""

    # side length of one HOG cell, in pixels
    PIXELS_PER_CELL = 8
    # side length of one HOG block, in cells
    CELLS_PER_BLOCK = 2
    # side length of one HOG block, in pixels
    PIXELS_PER_BLOCK = CELLS_PER_BLOCK * PIXELS_PER_CELL


class FeatureExtractorUnnormalized(HogSettings):
    """Computes raw (un-normalised) features for a single RGB image.

    Every derived quantity is a ``cached_property``, so it is computed at most
    once per instance.

    Args:
        img: RGB image array.
        use_ycrcb: when true, HOG is computed separately on each YCrCb channel;
            otherwise on the grayscale image only.
    """

    # settings of the colour features
    COLOR_SPACE = 'HSV'
    HIST_BINS = 32
    SPATIAL_SIZE = (16, 16)

    def __init__(self, img, use_ycrcb=True):
        self.img = img
        self.use_ycrcb = use_ycrcb

    @cached_property
    def gray(self):
        """Grayscale version of the image."""
        return cv2.cvtColor(self.img, cv2.COLOR_RGB2GRAY)

    @cached_property
    def ycrcb(self):
        """YCrCb version of the image, channels last."""
        return cv2.cvtColor(self.img, cv2.COLOR_RGB2YCrCb)

    @cached_property
    def ycrcb_by_chan(self):
        """YCrCb image with the channel axis first, i.e. indexable per channel."""
        # moveaxis only creates a view; ascontiguousarray copies it so that each
        # channel is a contiguous 2-D array
        return np.ascontiguousarray(np.moveaxis(self.ycrcb, -1, 0))

    def do_hog(self, gray_img, visualise=False, feature_vector=True):
        """Run ``skimage.feature.hog`` on one single-channel image.

        Args:
            gray_img: 2-D image.
            visualise: forwarded to ``hog``; when true the call returns a
                ``(features, hog_image)`` pair instead of just the features.
            feature_vector: forwarded to ``hog``; when false the features keep
                their per-block array layout instead of being flattened.
        """
        # NOTE(review): newer scikit-image releases spell the keyword `visualize`;
        # confirm against the installed version before upgrading.
        return skimage.feature.hog(
            gray_img,
            # passed explicitly so the HogSettings constant and the HOG layout cannot drift apart
            pixels_per_cell=(self.PIXELS_PER_CELL, self.PIXELS_PER_CELL),
            cells_per_block=(self.CELLS_PER_BLOCK, self.CELLS_PER_BLOCK),
            block_norm='L2-Hys',
            visualise=visualise,
            feature_vector=feature_vector)

    @cached_property
    def hog(self):
        """Flattened HOG features of the whole image."""
        return self.hog_array.ravel()

    @cached_property
    def color_hist(self):
        """Concatenated per-channel histograms of the image in ``COLOR_SPACE``."""
        converted = cv2.cvtColor(self.img, rgb2(self.COLOR_SPACE))

        # A fixed range keeps the bin edges identical for every image. Without it
        # np.histogram derives the edges from each image's own min/max, which makes
        # histograms of different images incomparable.
        chan_hists = [
            np.histogram(converted[:, :, i], bins=self.HIST_BINS, range=(0, 256))[0]
            for i in range(3)
        ]

        return np.concatenate(chan_hists)

    @cached_property
    def color_spatial_bin(self):
        """Image shrunk to ``SPATIAL_SIZE``, converted to ``COLOR_SPACE`` and flattened."""
        scaled = cv2.resize(self.img, self.SPATIAL_SIZE)
        converted = cv2.cvtColor(scaled, rgb2(self.COLOR_SPACE))

        return converted.ravel()

    @cached_property
    def feats(self):
        """Feature vector: HOG features followed by the spatially binned colours."""
        return np.concatenate([self.hog, self.color_spatial_bin])

    @cached_property
    def hog_viz(self):
        """2x2 mosaic: the image plus the HOG visualisation of each YCrCb channel."""
        chans_viz = []
        for chan in self.ycrcb_by_chan:
            hog_img = self.do_hog(chan, visualise=True)[1]
            peak = np.max(hog_img)
            if peak > 0:
                # stretch to the full 0..255 range; skipped when the visualisation
                # is all zeros, which would otherwise divide by zero
                hog_img = 255 * hog_img / peak
            chans_viz.append(cv2.cvtColor(hog_img.astype(np.uint8), cv2.COLOR_GRAY2RGB))

        height, width = self.img.shape[:2]
        rv = np.zeros((height * 2, width * 2, 3), dtype=np.uint8)
        rv[:height, :width, :] = self.img
        rv[:height, width:, :] = chans_viz[0]
        rv[height:, :width, :] = chans_viz[1]
        rv[height:, width:, :] = chans_viz[2]
        return rv

    @cached_property
    def hog_array(self):
        """Un-flattened HOG features; one leading entry per channel when ``use_ycrcb`` is set."""
        if self.use_ycrcb:
            return np.array([self.do_hog(c, feature_vector=False) for c in self.ycrcb_by_chan])
        else:
            return self.do_hog(self.gray, feature_vector=False)

    def hog_window(self, min_block_x, min_block_y, max_block_x, max_block_y):
        """Flattened HOG features of a rectangular range of blocks.

        The block indices are *inclusive* on both ends.
        """
        if self.use_ycrcb:
            return self.hog_array[:, min_block_y:max_block_y + 1, min_block_x:max_block_x + 1].ravel()
        else:
            return self.hog_array[min_block_y:max_block_y + 1, min_block_x:max_block_x + 1].ravel()

    def hog_window_viz(self, min_block_x, min_block_y, max_block_x, max_block_y):
        """HOG visualisation of the grayscale pixels covered by a range of blocks.

        The block indices are inclusive; the last block extends
        ``CELLS_PER_BLOCK`` cells beyond its own index.
        """
        top = min_block_y * self.PIXELS_PER_CELL
        bottom = (max_block_y + self.CELLS_PER_BLOCK) * self.PIXELS_PER_CELL
        left = min_block_x * self.PIXELS_PER_CELL
        right = (max_block_x + self.CELLS_PER_BLOCK) * self.PIXELS_PER_CELL

        return self.do_hog(self.gray[top:bottom, left:right], visualise=True)[1]


# HOG demo: visualise the first training image of each class and save both mosaics.
_, axes = plt.subplots(2, figsize=(10, 10))
feu_vt = FeatureExtractorUnnormalized(vehicles_train[0])
feu_nvt = FeatureExtractorUnnormalized(non_vehicles_train[0])

for ax, extractor in zip(axes, (feu_vt, feu_nvt)):
    ax.imshow(extractor.hog_viz)

for out_path, extractor in (
    ('output_imgs/hog_demo_vehicle.png', feu_vt),
    ('output_imgs/hog_demo_non_vehicle.png', feu_nvt),
):
    write_img(out_path, extractor.hog_viz)

In [None]:
%%time
# Raw feature vectors of all training images (vehicles first); used to fit the scaler.
x_unn = []
for im in vehicles_train + non_vehicles_train:
    x_unn.append(FeatureExtractorUnnormalized(im, use_ycrcb=True).feats)

In [None]:
%%time
from sklearn.preprocessing import StandardScaler

# Fit the feature scaler on the un-normalised training features. `normalizer` is a
# module-level name that FeatureExtractor reads when it normalises feature vectors.
normalizer = StandardScaler()
normalizer.fit(x_unn)

In [None]:
class FeatureExtractor(FeatureExtractorUnnormalized):
    """Feature extractor whose outputs are scaled with the module-level ``normalizer``."""

    @cached_property
    def feats(self):
        """The parent's feature vector after ``normalizer.transform``."""
        unnormed = super().feats
        return normalizer.transform([unnormed])[0]

    def hog_window(self, min_block_x, min_block_y, max_block_x, max_block_y):
        """Normalised HOG features of a range of blocks (indices inclusive).

        ``normalizer`` is fitted on full feature vectors (HOG followed by the
        spatial colour bins), so ``normalizer.transform`` rejects a HOG-only
        vector because the feature count differs. The window is therefore
        scaled with the leading entries of the fitted mean/scale instead.

        NOTE(review): this assumes the window spans as many blocks as a training
        image, so that its values line up with the HOG columns the scaler was
        fitted on - confirm against callers.

        Raises:
            ValueError: if the window has more values than the scaler has features.
        """
        unnormed = super().hog_window(
            min_block_x, min_block_y, max_block_x, max_block_y)

        n_hog = unnormed.shape[0]
        n_fitted = normalizer.mean_.shape[0]
        if n_hog > n_fitted:
            raise ValueError(
                f'HOG window has {n_hog} values but the normalizer was fitted on {n_fitted} features')

        return (unnormed - normalizer.mean_[:n_hog]) / normalizer.scale_[:n_hog]

In [None]:
# Normalised training features, vehicles first and non-vehicles after them.
x = [
    FeatureExtractor(train_im, use_ycrcb=True).feats
    for train_im in vehicles_train + non_vehicles_train
]

In [None]:
import pickle

def try_load_obj(name, globals_):
    """Load ``pickled/<name>`` into a namespace under the key ``name``.

    Args:
        name: file name inside the ``pickled`` directory; also the key that
            receives the loaded object.
        globals_: either a callable returning the target namespace (for example
            the builtin ``globals``) or the namespace dict itself.

    Returns:
        True if the file existed and was loaded, False if it does not exist.
    """
    path = f'pickled/{name}'
    if not os.path.exists(path):
        return False

    namespace = globals_() if callable(globals_) else globals_

    # SECURITY: unpickling can execute arbitrary code. Only load files that were
    # written by save_obj in this project, never files from an untrusted source.
    with open(path, 'rb') as f:
        namespace[name] = pickle.load(f)
    return True

def save_obj(name, obj):
    """Pickle ``obj`` to ``pickled/<name>``, creating the directory if needed.

    Args:
        name: file name inside the ``pickled`` directory.
        obj: any picklable object.
    """
    # without this, a fresh checkout fails with FileNotFoundError after the
    # object has already been computed
    os.makedirs('pickled', exist_ok=True)
    with open(f'pickled/{name}', 'wb') as f:
        pickle.dump(obj, f)

In [None]:
%%time
import sklearn.svm

# Reuse the pickled classifier when one exists; otherwise train a linear SVM and cache it.
# NOTE(review): a cached classifier was fitted against whatever split/scaler existed when
# it was saved - confirm it still matches the current run before trusting the accuracy.
if try_load_obj('clf', globals) is False:
    # label 1 = vehicle, label 0 = non-vehicle, in the same order as `x`
    y = [1 for _ in vehicles_train] + [0 for _ in non_vehicles_train]

    clf = sklearn.svm.SVC(kernel='linear').fit(x, y)

    save_obj('clf', clf)

In [None]:
%%time
import numpy as np

# Normalised features and labels (1 = vehicle, 0 = non-vehicle) of the held-out images.
x_test = [
    FeatureExtractor(test_im, use_ycrcb=True).feats
    for test_im in vehicles_test + non_vehicles_test
]
y_test = np.array([1 for _ in vehicles_test] + [0 for _ in non_vehicles_test])

In [None]:
%%time
# Predicted labels for the held-out feature vectors.
yhat_test = clf.predict(x_test)

In [None]:
# Fraction of held-out samples whose predicted label equals the true label.
acc = np.mean(yhat_test == y_test)
acc_pct = 100 * acc
print(f'Test accuracy: {acc_pct:.2f}%')

In [None]:
import math
from typing import NamedTuple

def calc_scales(min, max, num, exp=False):
    """Return ``num`` scale factors running from ``min`` to ``max`` inclusive.

    Args:
        min: first scale. (Shadows the builtin; kept for keyword callers.)
        max: last scale. (Shadows the builtin; kept for keyword callers.)
        num: number of scales; ``0`` gives an empty list and ``1`` gives ``[min]``.
        exp: if true the scales are spaced exponentially (evenly in log space),
            otherwise linearly.

    Returns:
        A list of Python floats.

    Raises:
        ValueError: if ``exp`` is true and ``min`` or ``max`` is not positive.
    """
    if exp:
        if min <= 0 or max <= 0:
            # the logarithm is undefined here and would silently yield nan/inf scales
            raise ValueError('exponential scales need positive min and max')
        rv = np.exp(np.linspace(np.log(min), np.log(max), num))
    else:
        rv = np.linspace(min, max, num)
    return [float(v) for v in rv]


class Window(NamedTuple):
    """Square window given by its top-left corner and its side length."""

    min_x: int
    min_y: int
    size: int

    @property
    def max_x(self):
        """Right edge; exclusive, like the end of a ``range``."""
        right_edge = self.min_x + self.size
        return right_edge

    @property
    def max_y(self):
        """Bottom edge; exclusive, like the end of a ``range``."""
        bottom_edge = self.min_y + self.size
        return bottom_edge

    @property
    def cent_x(self):
        """Horizontal centre, floor-divided."""
        return (self.min_x + (self.min_x + self.size)) // 2

    @property
    def cent_y(self):
        """Vertical centre, floor-divided."""
        return (self.min_y + (self.min_y + self.size)) // 2


class BBox(NamedTuple):
    """Axis-aligned rectangle described by its minimum and maximum coordinates."""

    min_x: int
    min_y: int
    max_x: int
    max_y: int

    def contains_window(self, w):
        """Return True when ``w`` lies entirely inside this box (touching edges count)."""
        fits_horizontally = self.min_x <= w.min_x and w.max_x <= self.max_x
        fits_vertically = self.min_y <= w.min_y and w.max_y <= self.max_y
        return fits_horizontally and fits_vertically


class FrameProcessor(HogSettings):
    """Generates the square search windows for one video frame.

    Args:
        img_or_path: an RGB image array, or the path of an image file to load.

    Raises:
        IOError: if a path is given and ``cv2.imread`` cannot read it.
    """

    # vertical band of the frame that is searched, and the frame width used by the bounding boxes
    MIN_Y = 380
    MAX_Y = 670
    MAX_X = 1280

    MIN_SIZE = 64
    MAX_SIZE = MAX_Y - MIN_Y
    MIN_SCALE = 1
    NUM_SCALES = 5
    EXP = True

    WINDOW_SIZES = [64, 96, 128, 192]
    # stride between neighbouring windows, as a fraction of the window size
    WINDOW_OVERLAP = 0.25

    # windows of a given size are kept only if they fit inside this box
    BBOX_BY_SIZE = {
        64: BBox(600, MIN_Y, 1000, 500),
        96: BBox(530, MIN_Y, 1080, 530),
        128: BBox(0, MIN_Y, MAX_X, MAX_Y),
        192: BBox(0, MIN_Y, MAX_X, MAX_Y),
    }

    # Two (x, y) points defining a line; windows whose centre lies above it are dropped.
    UPPER_LEFT_CUTOFF = ((284, 681), (582, 476))

    # hack: this needs to be a property now that TweakableFrameProcessor is a thing
    @cached_property
    def SCALES(self):
        return calc_scales(self.MIN_SCALE, self.MAX_SCALE, self.NUM_SCALES, exp=self.EXP)

    # hack: this needs to be a property now that TweakableFrameProcessor is a thing
    @cached_property
    def MAX_SCALE(self):
        return self.MAX_SIZE / self.MIN_SIZE

    PIXELS_PER_WINDOW = 64
    WINDOW_OVERLAP_CELLS = 2
    CELLS_PER_WINDOW = PIXELS_PER_WINDOW // HogSettings.PIXELS_PER_CELL

    def __init__(self, img_or_path):
        if isinstance(img_or_path, str):
            bgr = cv2.imread(img_or_path)
            if bgr is None:
                # cv2.imread returns None instead of raising for a missing/unreadable file
                raise IOError(f'could not read image: {img_or_path}')
            self.img = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        else:
            self.img = img_or_path

    @cached_property
    def windows_old(self):
        """Every window on the regular grid, before any filtering."""
        rv = []
        width = self.img.shape[1]
        for size in self.WINDOW_SIZES:
            # at least 1 so that tweaked settings can never produce a zero range() step
            step = max(1, int(round(size * self.WINDOW_OVERLAP)))
            for x in range(0, width - size + 1, step):
                for y in range(self.MIN_Y, self.MAX_Y - size + 1, step):
                    rv.append(Window(x, y, size))

        return rv

    @cached_property
    def windows(self):
        """Grid windows that survive the cutoff line and the per-size bounding box."""
        # some really hacky optimizations
        (x0, y0), (x1, y1) = self.UPPER_LEFT_CUTOFF
        rv = []
        for w in self.windows_old:
            # remove windows in the upper left of the image
            if w.cent_y < y0 + (y1 - y0) / (x1 - x0) * (w.cent_x - x0):
                continue

            if not self.BBOX_BY_SIZE[w.size].contains_window(w):
                continue

            rv.append(w)

        return rv

    @cached_property
    def windows_viz(self):
        """Copy of the image with a fixed-seed random ~10% sample of the windows drawn on it."""
        r = random.Random(44)
        rv = np.copy(self.img)
        for w in self.windows:
            # skip about 90% of the windows so the picture stays readable
            if r.random() < 0.9:
                continue

            cv2.rectangle(
                rv,
                (w.min_x, w.min_y),
                (w.max_x, w.max_y),
                (0, 0, 255),
                thickness=5,
            )

        return rv


# Draw a sample of the search windows on one test image and save the picture.
plt.figure(figsize=(20, 10))
fp = FrameProcessor('test_images/test1.jpg')
plt.imshow(fp.windows_viz)

print(f'there are {len(fp.windows)} windows')

write_img('output_imgs/windows_viz.jpg', fp.windows_viz)

In [None]:
class FrameProcessor2(FrameProcessor):  # bad name, 🤷 ¯\_(ツ)_/¯ 🤷
    """Adds per-window classification with the module-level ``clf``."""

    def window_has_car(self, w):
        """Return True if ``clf`` predicts a non-zero label for the pixels under window ``w``."""
        patch = self.img[w.min_y:w.max_y, w.min_x:w.max_x]
        # features are extracted from the patch resized to 64x64
        patch_64 = cv2.resize(patch, (64, 64))
        prediction = clf.predict([FeatureExtractor(patch_64).feats])
        return bool(prediction[0])

    @cached_property
    def windows_with_cars(self):
        """The windows for which ``window_has_car`` is true, in search order."""
        return [w for w in self.windows if self.window_has_car(w)]

    @cached_property
    def windows_with_cars_viz(self):
        """Copy of the image with every positive window drawn as a rectangle."""
        canvas = np.copy(self.img)
        for w in self.windows_with_cars:
            top_left = (w.min_x, w.min_y)
            bottom_right = (w.max_x, w.max_y)
            cv2.rectangle(canvas, top_left, bottom_right, (0, 0, 255), thickness=5)

        return canvas

# Show every window that the classifier marks as containing a car.
plt.figure(figsize=(10, 20))
fp = FrameProcessor2('test_images/test1.jpg')
plt.imshow(fp.windows_with_cars_viz)

In [None]:
class FrameProcessor3(FrameProcessor2):
    """Turns the positive windows of a frame into a heatmap and a thresholded mask."""

    # minimum number of overlapping positive windows for a pixel to count as "car"
    THRESHOLD = 5

    @cached_property
    def heatmap(self):
        """Per-pixel count of the positive windows covering that pixel."""
        rv = np.zeros(self.img.shape[:-1], dtype=np.uint32)
        for w in self.windows_with_cars:
            rv[w.min_y:w.max_y, w.min_x:w.max_x] += 1

        return rv

    @cached_property
    def heatmap_threshold(self):
        """uint8 mask: 1 where the heatmap reaches ``THRESHOLD``, 0 elsewhere."""
        return (self.heatmap >= self.THRESHOLD).astype(np.uint8)

    @cached_property
    def heatmap_viz(self):
        """Heatmap rescaled to 0..255 as uint8."""
        peak = np.max(self.heatmap)
        if peak == 0:
            # no detections at all: avoid dividing by zero, the picture is simply black
            return self.heatmap.astype(np.uint8)
        return (self.heatmap * 255 // peak).astype(np.uint8)

    @cached_property
    def heatmap_threshold_viz(self):
        """Thresholded mask with values 0 and 255."""
        return self.heatmap_threshold * 255
    


# Heatmap demo on one test image.
fp = FrameProcessor3('test_images/test1.jpg')
print(fp.heatmap.max())
plt.imshow(fp.heatmap_threshold)

# NOTE(review): earlier cells write to 'output_imgs/'; confirm that the different
# directory name used here is intentional.
for out_path, viz_img in (
    ('output_images/heatmap_viz.jpg', fp.heatmap_viz),
    ('output_images/heatmap_threshold_viz.jpg', fp.heatmap_threshold_viz),
):
    write_img(out_path, viz_img)

In [None]:
import functools


class TweakableFrameProcessor(FrameProcessor3):
    """FrameProcessor3 whose settings can be overridden per instance.

    Every key/value pair in ``params`` is set as an attribute on the instance
    before the parent constructor runs.
    """

    def __init__(self, *args, params=None, **kwargs):
        overrides = params if params else {}
        for attr_name, attr_value in overrides.items():
            setattr(self, attr_name, attr_value)
        super().__init__(*args, **kwargs)


@functools.lru_cache(maxsize=None)
def get_heatmap_threshold_for_params(path, **kwargs):
    """Return the thresholded-heatmap picture of the image at ``path``.

    ``kwargs`` are passed to ``TweakableFrameProcessor`` as setting overrides.
    Results are memoised without a size limit, keyed on ``path`` and ``kwargs``.
    """
    return TweakableFrameProcessor(path, params=kwargs).heatmap_threshold_viz

In [None]:
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
# see https://github.com/jupyter-widgets/ipywidgets/issues/1181
from ipykernel.pylab.backend_inline import flush_figures
import time


# Frames shown by show_processed_frame, one subplot row per entry.
test_imgs = [
    'project_video_frames/frame_010.jpg',
    'project_video_frames/frame_028.jpg',
]
def show_processed_frame(**kwargs):
    """Plot every image in ``test_imgs`` next to its thresholded heatmap.

    Args:
        **kwargs: setting overrides forwarded to ``get_heatmap_threshold_for_params``.

    Prints the elapsed wall-clock time when done.
    """
    tstart = time.time()
    # squeeze=False keeps `axes` two-dimensional even when test_imgs has a single
    # entry; otherwise axes[i][0] would fail for a one-row figure
    _, axes = plt.subplots(
        len(test_imgs), 2, figsize=(20, 6 * len(test_imgs)), squeeze=False)
    for (ax_frame, ax_heatmap), path in zip(axes, test_imgs):
        heatmap = get_heatmap_threshold_for_params(path, **kwargs)
        ax_frame.imshow(FrameProcessor3(path).img)
        ax_heatmap.imshow(heatmap)
    flush_figures()
    print(f'took {time.time() - tstart:.2f} seconds')


# Interactive widget: calls show_processed_frame with the selected THRESHOLD (1 to 7).
interact(show_processed_frame, THRESHOLD=(1, 7))

# Additional interact() keyword arguments, currently disabled:
#     SCALE_SLOP=(0.5, 1.5, 0.25), NUM_SCALES=(3, 7), MIN_SIZE=(56, 72, 8), MAX_SIZE=128, EXP=False

In [None]:
%%time
import glob
from moviepy.editor import VideoFileClip
from scipy.ndimage.measurements import label


class MultiFrameHeatmapProcessor:
    """Combines the heatmaps of several frames into car bounding boxes.

    Args:
        frame_heatmaps: sequence of equally shaped 2-D heatmaps, one per frame.
    """

    def __init__(self, frame_heatmaps):
        self.frame_heatmaps = frame_heatmaps

    @cached_property
    def sum_heatmaps(self):
        """Element-wise sum of all frame heatmaps."""
        return np.sum(self.frame_heatmaps, axis=0)

    @cached_property
    def sum_heatmaps_viz(self):
        """Summed heatmap rescaled to 0..255, as an RGB image."""
        peak = np.max(self.sum_heatmaps)
        if peak == 0:
            # nothing detected: skip the rescaling, which would divide by zero
            rv = self.sum_heatmaps.astype(np.uint8)
        else:
            rv = (self.sum_heatmaps * 255 // peak).astype(np.uint8)
        return cv2.cvtColor(rv, cv2.COLOR_GRAY2RGB)

    @cached_property
    def thresh(self):
        """uint8 mask of pixels whose summed heat reaches the per-frame threshold times the frame count."""
        min_heat = FrameProcessor3.THRESHOLD * len(self.frame_heatmaps)
        return (self.sum_heatmaps >= min_heat).astype(np.uint8)

    @cached_property
    def thresh_viz(self):
        """``thresh`` as a 0/255 RGB image."""
        return cv2.cvtColor(self.thresh.astype(np.uint8) * 255, cv2.COLOR_GRAY2RGB)

    @cached_property
    def closed(self):
        """``thresh`` after a morphological close with a 9x9 kernel."""
        # this helps eliminate small islands near main bounding boxes that would be identified as distinct cars
        return cv2.morphologyEx(self.thresh, cv2.MORPH_CLOSE, np.ones((9, 9), dtype=np.uint8))

    @cached_property
    def closed_viz(self):
        """``closed`` as a 0/255 RGB image."""
        return cv2.cvtColor(self.closed.astype(np.uint8) * 255, cv2.COLOR_GRAY2RGB)

    @cached_property
    def car_bboxes(self):
        """One ``((min_x, min_y), (max_x, max_y))`` box per connected region of ``closed``.

        The coordinates are inclusive and are plain Python ints, so they can be
        passed directly to drawing functions that expect integer points.
        """
        labeled, n_cars = label(self.closed)

        rv = []
        for n in range(1, n_cars + 1):
            rows, cols = np.nonzero(labeled == n)
            # int() converts the NumPy integer scalars into plain Python ints
            rv.append((
                (int(cols.min()), int(rows.min())),
                (int(cols.max()), int(rows.max())),
            ))

        return rv

    @cached_property
    def car_bboxes_viz(self):
        """Black RGB image with every bounding box drawn on it."""
        rv = np.zeros(self.sum_heatmaps.shape + (3,), dtype=np.uint8)

        for bbox in self.car_bboxes:
            cv2.rectangle(
                rv,
                *bbox,
                (0, 0, 255),
                thickness=5,
            )

        return rv

    def overlay_car_bboxes_viz_on(self, img):
        """Return a copy of ``img`` with the bounding boxes drawn on top."""
        # same operation as the module-level overlay_viz helper, so reuse it
        return overlay_viz(self.car_bboxes_viz, img)


def overlay_viz(viz, img):
    """Return a copy of ``img`` with the non-black pixels of ``viz`` painted over it.

    A pixel of ``viz`` counts as non-black when any of its channels is non-zero.
    ``img`` itself is left unmodified.
    """
    combined = img.copy()
    painted = viz.any(axis=-1)
    # clear the pixels that viz covers, then add viz so those pixels take its colour
    combined[painted] = 0
    combined += viz
    return combined

    
class CumulativeFrameProcessor:
    """Processes video frames one by one, combining the heatmaps of recent frames.

    Until ``FRAME_WINDOW`` heatmaps have been collected no detection is
    produced. Every processed frame is also written to ``frames/<number>.jpg``.
    """

    # number of consecutive frame heatmaps that are combined
    FRAME_WINDOW = 10

    def __init__(self):
        self.heatmaps_buffer = []
        self.frame_num = -1

    def _advance(self, frame):
        """Record ``frame`` and return a processor over the current window, or None while warming up."""
        self.frame_num += 1
        self.heatmaps_buffer.append(FrameProcessor3(frame).heatmap)

        if len(self.heatmaps_buffer) < self.FRAME_WINDOW:
            return None

        # Pass a copy: the processor computes its results lazily, and the oldest
        # heatmap is dropped from the buffer right below.
        multi = MultiFrameHeatmapProcessor(list(self.heatmaps_buffer))
        del self.heatmaps_buffer[0]
        return multi

    def _save_frame(self, rgb):
        """Write an RGB frame to ``frames/<frame_num>.jpg``."""
        cv2.imwrite(f'frames/{self.frame_num:05}.jpg', cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR))

    def process_frame(self, frame, viz='car_bboxes_viz'):
        """Return ``frame`` with a visualisation layer drawn on top.

        Args:
            frame: RGB frame.
            viz: name of the ``MultiFrameHeatmapProcessor`` attribute to overlay;
                the default draws the car bounding boxes.

        Returns:
            A copy of ``frame`` with the layer overlaid; during warm-up the layer
            is empty, so the result equals the input frame.
        """
        multi = self._advance(frame)
        if multi is None:
            layer = np.zeros_like(frame)
        else:
            layer = getattr(multi, viz)

        rv = overlay_viz(layer, frame)
        self._save_frame(rv)
        return rv

    def process_frame_heatmap_viz(self, frame):
        """Return the summed heatmap of the current window as an RGB image (black during warm-up)."""
        multi = self._advance(frame)
        rv = np.zeros_like(frame) if multi is None else multi.sum_heatmaps_viz
        self._save_frame(rv)
        return rv

    def process_frame_threshold_viz(self, frame):
        """Return the thresholded heatmap of the current window as an RGB image (black during warm-up)."""
        multi = self._advance(frame)
        rv = np.zeros_like(frame) if multi is None else multi.thresh_viz
        self._save_frame(rv)
        return rv
    

def process_video(inpath, outpath):
    """Run the detection pipeline on a video and write the annotated result.

    The ``frames`` directory is created if necessary and emptied of files first;
    the frame processor dumps one image per frame into it.

    Args:
        inpath: path of the input video.
        outpath: path of the video to write (without audio).
    """
    # cv2.imwrite fails silently when the target directory is missing
    os.makedirs('frames', exist_ok=True)
    for path in glob.glob('frames/*'):
        # only remove files; os.unlink would fail on a sub-directory
        if os.path.isfile(path):
            os.unlink(path)

    inclip = VideoFileClip(inpath)
    proc = CumulativeFrameProcessor()

    outclip = inclip.fl_image(proc.process_frame)

    outclip.write_videofile(outpath, audio=False)


# Run the pipeline on the test clip and write the result to outvid.mp4.
process_video('test_video.mp4', 'outvid.mp4')