<a href="https://colab.research.google.com/github/vindruid/opencv-intro/blob/master/heatmap_town_centre.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Download Video

In [0]:
import requests

def download_file_from_google_drive(id, destination):
    URL = "https://docs.google.com/uc?export=download"

    session = requests.Session()

    response = session.get(URL, params = { 'id' : id }, stream = True)
    token = get_confirm_token(response)

    if token:
        params = { 'id' : id, 'confirm' : token }
        response = session.get(URL, params = params, stream = True)

    save_response_content(response, destination)    

def get_confirm_token(response):
    for key, value in response.cookies.items():
        if key.startswith('download_warning'):
            return value

    return None

def save_response_content(response, destination):
    CHUNK_SIZE = 32768

    with open(destination, "wb") as f:
        for chunk in response.iter_content(CHUNK_SIZE):
            if chunk: # filter out keep-alive new chunks
                f.write(chunk)

In [0]:
# https://drive.google.com/file/d/1sf85EueuGf5VDeHt7UAv2UHPIhEco_4f/view?usp=sharing
file_id = '1sf85EueuGf5VDeHt7UAv2UHPIhEco_4f'
path_video = 'video_town_centre.mp4'
download_file_from_google_drive(file_id, destination)

source: http://www.robots.ox.ac.uk/ActiveVision/Research/Projects/2009bbenfold_headpose/Datasets/TownCentreXVID.avi

we only use a part of whole video

In [0]:
# # wait until the video completely downloaded
# import time
# time.sleep(5)

# Show Video

In [0]:
import os
from IPython.display import HTML
from base64 import b64encode

In [0]:
path_video = os.path.join("video_town_centre.mp4")
mp4 = open(path_video,'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=400 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)

# Inspect Video

In [0]:
import cv2

In [0]:
path_video = os.path.join("video_town_centre.mp4")
cap  = cv2.VideoCapture(path_video)

_, img = cap.read()

fps = cap.get(cv2.CAP_PROP_FPS)
w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
count_frame = 1
while img is not None: 
  count_frame += 1 
  _, img = cap.read()

print("TOTAL FRAME:", count_frame)
print("FPS:", fps)
print("Width:", w)
print("Height:", h)

# Heat Map Static (Write to Image)

source: https://github.com/intel-iot-devkit/python-cv-samples/tree/master/examples/motion-heatmap

In [0]:
import numpy as np
import copy

In [0]:
cap = cv2.VideoCapture(path_video)
# pip install opencv-contrib-python
fgbg = cv2.bgsegm.createBackgroundSubtractorMOG()

# number of frames is a variable for development purposes, you can change the for loop to a while(cap.isOpened()) instead to go through the whole video
num_frames = 350

first_iteration_indicator = 1
for i in range(0, num_frames):
    '''
    There are some important reasons this if statement exists:
        -in the first run there is no previous frame, so this accounts for that
        -the first frame is saved to be used for the overlay after the accumulation has occurred
        -the height and width of the video are used to create an empty image for accumulation (accum_image)
    '''
    if (first_iteration_indicator == 1):
        ret, frame = cap.read()
        first_frame = copy.deepcopy(frame)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        height, width = gray.shape[:2]
        accum_image = np.zeros((height, width), np.uint8)
        first_iteration_indicator = 0
    else:
        ret, frame = cap.read()  # read a frame
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # convert to grayscale

        fgmask = fgbg.apply(gray)  # remove the background

        # for testing purposes, show the result of the background subtraction
        # cv2.imshow('diff-bkgnd-frame', fgmask)

        # apply a binary threshold only keeping pixels above thresh and setting the result to maxValue.  If you want
        # motion to be picked up more, increase the value of maxValue.  To pick up the least amount of motion over time, set maxValue = 1
        thresh = 2
        maxValue = 2
        ret, th1 = cv2.threshold(fgmask, thresh, maxValue, cv2.THRESH_BINARY)
        # for testing purposes, show the threshold image
        # cv2.imwrite('diff-th1.jpg', th1)

        # add to the accumulated image
        accum_image = cv2.add(accum_image, th1)
        # for testing purposes, show the accumulated image
        # cv2.imwrite('diff-accum.jpg', accum_image)

        # for testing purposes, control frame by frame
        # raw_input("press any key to continue")

# apply a color map
# COLORMAP_PINK also works well, COLORMAP_BONE is acceptable if the background is dark
color_image = im_color = cv2.applyColorMap(accum_image, cv2.COLORMAP_HOT)
# for testing purposes, show the colorMap image
# cv2.imwrite('diff-color.jpg', color_image)

# overlay the color mapped image to the first frame
result_overlay = cv2.addWeighted(first_frame, 0.7, color_image, 0.7, 0)

# save the final overlay image
cv2.imwrite('diff-overlay.jpg', result_overlay)

# cleanup
cap.release()
cv2.destroyAllWindows()

# Heat Map Dinamic (Write to Video)

In [0]:
%%time 
path_video = os.path.join("video_town_centre.mp4")
cap  = cv2.VideoCapture(path_video)

_, img = cap.read()


fps = cap.get(cv2.CAP_PROP_FPS)
W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

path_heatmap = os.path.join('video_heatmap.mp4') 
vid_writer = cv2.VideoWriter(path_heatmap, cv2.VideoWriter_fourcc(*'MP4V'), fps, (W, H))

fgbg = cv2.bgsegm.createBackgroundSubtractorMOG()
accum_image = np.zeros((H, W), np.uint8)
# apply a binary threshold only keeping pixels above thresh and setting the result to maxValue.  If you want
# motion to be picked up more, increase the value of maxValue.  To pick up the least amount of motion over time, set maxValue = 1
thresh = 2
maxValue = 2

count_frame = 1
while img is not None: 


    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    fgmask = fgbg.apply(gray)  # remove the background

    _, th1 = cv2.threshold(fgmask, thresh, maxValue, cv2.THRESH_BINARY)

    accum_image = cv2.add(accum_image, th1)

    color_image = cv2.applyColorMap(accum_image, cv2.COLORMAP_HOT)
    result_overlay = cv2.addWeighted(img, 0.7, color_image, 0.7, 0)

    vid_writer.write(result_overlay)

    count_frame += 1 
    _, img = cap.read()

    if count_frame % 100 == 0:
        print("FRAME PROCESSED",count_frame)

vid_writer.release()




# Compress Result Video 

In [0]:
path_compressed_heatmap = 'video_heatmap_c.mp4'
os.system(f"ffmpeg -i {path_heatmap} -vcodec libx264 {path_compressed_heatmap}")


# Show Heatmap

In [0]:
# Show video
mp4 = open(path_compressed_heatmap,'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=400 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)

# Resize Image before calculating

In [0]:
%%time 
path_video = os.path.join("video_town_centre.mp4")
cap  = cv2.VideoCapture(path_video)

_, img = cap.read()

H,W = 300, 400
img = cv2.resize(img, (W,H), interpolation = cv2.INTER_AREA)

fgbg = cv2.bgsegm.createBackgroundSubtractorMOG()
accum_image = np.zeros((H, W), np.uint8)

fps = cap.get(cv2.CAP_PROP_FPS)

path_heatmap = os.path.join('video_heatmap_smaller.mp4') 
vid_writer = cv2.VideoWriter(path_heatmap, cv2.VideoWriter_fourcc(*'MP4V'), fps, (W, H))

# apply a binary threshold only keeping pixels above thresh and setting the result to maxValue.  If you want
# motion to be picked up more, increase the value of maxValue.  To pick up the least amount of motion over time, set maxValue = 1
thresh = 2
maxValue = 2

count_frame = 1
while img is not None: 
    img = cv2.resize(img, (W,H), interpolation = cv2.INTER_AREA)

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    fgmask = fgbg.apply(gray)  # remove the background

    _, th1 = cv2.threshold(fgmask, thresh, maxValue, cv2.THRESH_BINARY)

    accum_image = cv2.add(accum_image, th1)

    color_image = cv2.applyColorMap(accum_image, cv2.COLORMAP_HOT)
    result_overlay = cv2.addWeighted(img, 0.7, color_image, 0.7, 0)

    vid_writer.write(result_overlay)

    count_frame += 1 
    _, img = cap.read()

    if count_frame % 100 == 0:
        print("FRAME PROCESSED",count_frame)
        break

vid_writer.release()


# Include Planar Map

In [0]:
# # https://drive.google.com/file/d/1jb1DKrr52LrCp5RNgmq0UnIlVTDoYPro/view?usp=sharing
# file_id = '1jb1DKrr52LrCp5RNgmq0UnIlVTDoYPro'
# destination = 'road_icon.jpg'
# download_file_from_google_drive(file_id, destination)

In [0]:
path_my_map = 'road_icon.jpg'
my_map = cv2.imread(path_my_map)

H,W = 300, 400
my_map = cv2.resize(my_map, (W,H), interpolation = cv2.INTER_AREA)

In [0]:
my_map.shape, my_map.max(), my_map.min(), my_map.mean()

In [0]:
plt.imshow(cv2.cvtColor(my_map, cv2.COLOR_BGR2RGB) )

In [0]:
my_img = img.copy()
H,W = 300, 400
my_img = cv2.resize(my_img, (W,H), interpolation = cv2.INTER_AREA)
plt.imshow(cv2.cvtColor(my_img, cv2.COLOR_BGR2RGB) )

## Draw Points for testing 

In [0]:
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(my_img)
ax2.imshow(my_map)

In [0]:
list_pts_img = [(0, 250), (175, 300), (230, 50), (325, 75)] # (W,H)
list_pts_map = [(25, 75), (25, 220), (375,75), (375, 220)]
list_colors = [(0,0,255), (0,255,0), (255,0,0), (255,0,255)]

In [0]:
my_img_test = my_img.copy()
my_map_test = my_map.copy()
radius = 10
thickness = -1 #full

for idx in range(len(list_pts_img)):
    pts_img = list_pts_img[idx]
    pts_map = list_pts_map[idx]
    color = list_colors[idx]

    cv2.circle(my_img_test, pts_img, radius, color, thickness)  
    cv2.circle(my_map_test, pts_map, radius, color, thickness)  


In [0]:
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(my_img_test)
ax2.imshow(my_map_test)


# Perspective

In [0]:
# provide points from image 1
pts_src = np.array(list_pts_img)
# corresponding points from image 2 (i.e. (154, 174) matches (212, 80))
pts_dst = np.array(list_pts_map)

# calculate matrix H
h, status = cv2.findHomography(pts_src, pts_dst)

# provide a point you wish to map from image 1 to image 2
a = np.array(list_pts_img, dtype='float32')
a = np.array([a])

# finally, get the mapping
pointsOut = cv2.perspectiveTransform(a, h)

In [0]:
a, a.shape

In [0]:
pointsOut, pointsOut.shape

## Test New Points Perspective

In [0]:
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(my_img_test)
ax2.imshow(my_map_test)


In [0]:
list_pts_img_new = [(150, 150), (325, 200)] # (W,H)
list_colors_new = [(0,255,255), (255,255,0)]

In [0]:
a = np.array(list_pts_img_new, dtype='float32')
a = np.array([a])
list_pts_map_new_pred = cv2.perspectiveTransform(a, h)[0]
list_pts_map_new_pred = np.array(list_pts_map_new_pred, dtype='int')

In [0]:
list_pts_map_new_pred

In [0]:
radius = 10
thickness = -1 #full

for idx in range(len(list_pts_img_new)):
    pts_img = tuple(list_pts_img_new[idx])
    pts_map = tuple(list_pts_map_new_pred[idx])
    color = list_colors_new[idx]

    cv2.circle(my_img_test, pts_img, radius, color, thickness)  
    cv2.circle(my_map_test, pts_map, radius, color, thickness)  


In [0]:
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(my_img_test)
ax2.imshow(my_map_test)


# Project Mask

In [0]:
fgmask_test = fgmask.copy()
fgmask_test = cv2.cvtColor(fgmask_test, cv2.COLOR_GRAY2BGR)  # convert to grayscale
plt.imshow(fgmask_test, cmap = 'gray');

In [0]:
mask_adjusted = cv2.warpPerspective(fgmask_test, h, (400,300))

In [0]:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15,15))
ax1.imshow(fgmask_test)
ax2.imshow(mask_adjusted)


# Coretan 

In [0]:
from PIL import Image
import matplotlib.pyplot as plt

In [0]:
fgmask.shape, fgmask.max(), fgmask.min(), fgmask.mean(), 

In [0]:
plt.imshow(fgmask, cmap = 'gray')

In [0]:
th1.shape, th1.max(), th1.min(), th1.mean(), 

In [0]:
plt.imshow(th1, cmap = 'gray')

In [0]:
accum_image.shape, accum_image.max(), accum_image.min(), accum_image.mean(), 

In [0]:
plt.imshow(accum_image, cmap = 'gray')