<a href="https://colab.research.google.com/github/udupa-varun/pyimagesearch_uni/blob/main/augmented_reality/101/video_augmented_reality.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Download the tutorial's zip bundle, extract it quietly (-qq), and switch the
# working directory into the extracted folder (presumably so the relative
# paths used by later cells resolve -- TODO confirm against the bundle layout).
# NOTE(review): this cell is not idempotent. The recorded output shows a
# doubled ".../video-augmented-reality/video-augmented-reality" path, which
# suggests it was run a second time from inside the extracted folder.
!wget https://pyimagesearch-code-downloads.s3-us-west-2.amazonaws.com/video-augmented-reality/video-augmented-reality.zip
!unzip -qq video-augmented-reality.zip
%cd video-augmented-reality
# list the directory contents (long format, human-readable sizes, dotfiles)
%ls -hal

--2022-06-03 12:44:02--  https://pyimagesearch-code-downloads.s3-us-west-2.amazonaws.com/video-augmented-reality/video-augmented-reality.zip
Resolving pyimagesearch-code-downloads.s3-us-west-2.amazonaws.com (pyimagesearch-code-downloads.s3-us-west-2.amazonaws.com)... 52.218.128.113
Connecting to pyimagesearch-code-downloads.s3-us-west-2.amazonaws.com (pyimagesearch-code-downloads.s3-us-west-2.amazonaws.com)|52.218.128.113|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9716273 (9.3M) [application/zip]
Saving to: ‘video-augmented-reality.zip’


2022-06-03 12:44:04 (11.2 MB/s) - ‘video-augmented-reality.zip’ saved [9716273/9716273]

/content/video-augmented-reality/video-augmented-reality
total 1.2M
drwxr-xr-x 4 root root 4.0K Dec  9  2020 [0m[01;34m.[0m/
drwxr-xr-x 5 root root 4.0K Jun  3 12:44 [01;34m..[0m/
-rw-r--r-- 1 root root 1.2M Dec  7  2020 markers.pdf
-rw-r--r-- 1 root root 2.9K Dec  9  2020 opencv_ar_video.py
drwxr-xr-x 2 root root 4.0K Dec  9  2

In [3]:
# import packages
import argparse
import time
from collections import deque

import cv2
import imutils
import matplotlib.pyplot as plt
import numpy as np

In [4]:
# Module-level cache of the reference points most recently used by
# find_and_warp(). It stays None until a call with use_cache=True succeeds,
# and is read back when a later frame does not yield all four markers.
CACHED_REF_PTS = None

In [10]:
def find_and_warp(frame, source, corner_ids, aruco_dict, aruco_params, use_cache=False):
    """Composite `source` onto the region of `frame` bounded by four ArUco markers.

    Args:
        frame: 3-channel image in which the markers are searched.
        source: 3-channel image that is warped into the marker-bounded region.
        corner_ids: marker IDs in (top-left, top-right, bottom-right,
            bottom-left) order.
        aruco_dict: ArUco dictionary forwarded to `cv2.aruco.detectMarkers`.
        aruco_params: detector parameters forwarded to
            `cv2.aruco.detectMarkers`.
        use_cache: when True, reuse the points stored in the global
            `CACHED_REF_PTS` if the four markers are not all found, and store
            the points used for this call back into that global.

    Returns:
        A uint8 image the size of `frame` with the warped `source` blended in,
        or None when four reference points are unavailable (and no cache can
        be used) or when no homography could be computed.
    """
    # the cache lives at module level so it survives across calls
    global CACHED_REF_PTS

    # grab width and height of frame and source image
    (img_h, img_w) = frame.shape[:2]
    (src_h, src_w) = source.shape[:2]

    # detect ArUCo markers in input frame (rejected candidates are not needed)
    (corners, ids, _) = cv2.aruco.detectMarkers(
        frame,
        aruco_dict,
        parameters=aruco_params
        )

    # if we didn't find exactly 4 markers, use an empty ID list so every
    # lookup below misses; otherwise flatten the ID list
    ids = np.array([]) if len(corners) != 4 else ids.flatten()

    # collect the corner array of each requested marker (tl, tr, br, bl)
    ref_pts = []
    for corner_id in corner_ids:
        # indices of detected markers carrying the current ID
        matches = np.flatnonzero(ids == corner_id)

        # no match: this marker was not detected
        if matches.size == 0:
            continue

        # take the first match; if the same ID was detected more than once,
        # indexing with all matches would raise, and another ID is necessarily
        # missing, so the len(ref_pts) check below handles the frame
        corner = np.squeeze(corners[int(matches[0])])
        ref_pts.append(corner)

    # check to see if we failed to find the 4 markers
    if len(ref_pts) != 4:
        # fall back on cached points if allowed and available
        if use_cache and CACHED_REF_PTS is not None:
            ref_pts = CACHED_REF_PTS
        # otherwise there is nothing to warp onto
        else:
            return None

    # if allowed to use cache, update cache with current set
    if use_cache:
        CACHED_REF_PTS = ref_pts

    # unpack reference points
    (ref_tl, ref_tr, ref_br, ref_bl) = ref_pts

    # destination quadrilateral: corner 0 of the top-left marker, corner 1 of
    # the top-right marker, corner 2 of the bottom-right marker and corner 3
    # of the bottom-left marker
    dst_mat = np.array([ref_tl[0], ref_tr[1], ref_br[2], ref_bl[3]])

    # source quadrilateral: the full extent of the source image
    src_mat = np.array([[0, 0], [src_w, 0], [src_w, src_h], [0, src_h]])

    # compute homography matrix; bail out if none could be estimated
    (H, _) = cv2.findHomography(src_mat, dst_mat)
    if H is None:
        return None

    # warp source image to destination
    warped = cv2.warpPerspective(source, H, (img_w, img_h))

    # construct a mask covering the destination quadrilateral
    mask = np.zeros((img_h, img_w), dtype="uint8")
    cv2.fillConvexPoly(mask, dst_mat.astype("int32"), (255, 255, 255), cv2.LINE_AA)

    # black border for source image via dilation
    rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    mask = cv2.dilate(mask, rect, iterations=2)

    # create a 3-channel version of the mask, scaled to [0, 1], by stacking
    # depth-wise to allow us to copy the warped source image onto the input
    mask_scaled = mask / 255.0
    mask_scaled = np.dstack([mask_scaled] * 3)

    # weight the warped source by the mask
    warped_multiplied = cv2.multiply(warped.astype("float"), mask_scaled)
    # weight the original frame by the inverse mask (areas outside the quad)
    image_multiplied = cv2.multiply(frame.astype("float"), 1.0 - mask_scaled)
    # add results together
    output = cv2.add(warped_multiplied, image_multiplied)
    output = output.astype("uint8")

    return output


In [6]:
# Download the sample footage in which the ArUco markers will be detected;
# it is referenced later as args["video"] ("aruco_sample.mp4").
!wget https://colab-notebook-videos.s3-us-west-2.amazonaws.com/aruco_sample.mp4

--2022-06-03 13:04:13--  https://colab-notebook-videos.s3-us-west-2.amazonaws.com/aruco_sample.mp4
Resolving colab-notebook-videos.s3-us-west-2.amazonaws.com (colab-notebook-videos.s3-us-west-2.amazonaws.com)... 52.92.210.98
Connecting to colab-notebook-videos.s3-us-west-2.amazonaws.com (colab-notebook-videos.s3-us-west-2.amazonaws.com)|52.92.210.98|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 10835110 (10M) [video/mp4]
Saving to: ‘aruco_sample.mp4’


2022-06-03 13:04:15 (12.4 MB/s) - ‘aruco_sample.mp4’ saved [10835110/10835110]



In [14]:
# Hard-coded stand-in for command-line arguments: a notebook has no command
# line to parse, so the values are fixed here instead.
#   input  -- path to the video file whose frames are overlaid on the markers
#   cache  -- values greater than 0 enable the reference-point cache
#   video  -- path to the footage in which the ArUco markers are detected
#   output -- path of the video file written by the processing loop
args = dict(
    input="videos/jp_trailer_short.mp4",
    cache=1,
    video="aruco_sample.mp4",
    output="output.avi",
)

In [15]:
# load ArUCo dict and grab parameters
print("[INFO] detecting markers...")
# OpenCV 4.7 removed Dictionary_get / DetectorParameters_create. Keep using
# them when they exist so older installs behave exactly as before, and fall
# back on the replacement API otherwise.
if hasattr(cv2.aruco, "Dictionary_get"):
    aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_ARUCO_ORIGINAL)
else:
    aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_ARUCO_ORIGINAL)
if hasattr(cv2.aruco, "DetectorParameters_create"):
    aruco_params = cv2.aruco.DetectorParameters_create()
else:
    aruco_params = cv2.aruco.DetectorParameters()

# init video file stream (the content that gets overlaid on the markers)
print("[INFO] accessing video stream...")
vf = cv2.VideoCapture(args["input"])

# init a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)

# we need to have a frame in our queue to start AR pipeline,
# so read the next frame from source and add it to queue
(grabbed, source) = vf.read()
if not grabbed:
    # fail here with a clear message instead of queueing None and crashing
    # later when the frame's shape is accessed
    vf.release()
    raise RuntimeError(
        "could not read a frame from input video {!r}".format(args["input"])
    )
Q.appendleft(source)

# init video stream and init pointer to output video file
print("[INFO] starting video stream...")
vs = cv2.VideoCapture(args["video"])
writer = None


[INFO] detecting markers...
INFO] accessing video stream...
[INFO] starting video stream...


In [16]:
# Loop over frames from the marker video, overlay the current source frame
# whenever the markers are located, and write every frame to the output file.
# The try/finally guarantees the capture and writer handles are released (and
# the output file finalised) even if the loop is interrupted or raises.
try:
    while len(Q) > 0:
        # grab frame from video stream
        frame = vs.read()[1]

        # no more frames in the marker video
        if frame is None:
            break

        # resize frame
        frame = imutils.resize(frame, width=600)

        # attempt to find the ArUCo markers in frame
        # if they are found, take current source image and warp onto input frame
        warped = find_and_warp(
            frame,
            source,
            corner_ids=(923, 1001, 241, 1007),
            aruco_dict=aruco_dict,
            aruco_params=aruco_params,
            use_cache=args["cache"] > 0)

        # if warped image is not None, the warp was successful
        if warped is not None:
            # set frame to output AR frame
            frame = warped
            # advance the overlay: grab next video file frame from queue
            source = Q.popleft()

        # for speed, we use a queue to keep the next video frame ready;
        # top it up whenever it is not full
        if len(Q) != Q.maxlen:
            # read next frame from video file stream
            (grabbed, next_frame) = vf.read()

            if grabbed:
                Q.append(next_frame)

        # lazily create the writer once the output frame size is known,
        # provided an output path was configured
        if writer is None and args["output"] is not None:
            fourcc = cv2.VideoWriter_fourcc(*"MJPG")
            writer = cv2.VideoWriter(args["output"], fourcc, 20, (frame.shape[1], frame.shape[0]), True)

        # if writer is not None, write frame to disk
        if writer is not None:
            writer.write(frame)
finally:
    # cleanup
    vs.release()
    vf.release()

    # check if writer pointer needs to be released
    if writer is not None:
        writer.release()



In [17]:
!ffmpeg -i "output.avi" output.mp4

ffmpeg version 3.4.8-0ubuntu0.2 Copyright (c) 2000-2020 the FFmpeg developers
  built with gcc 7 (Ubuntu 7.5.0-3ubuntu1~18.04)
  configuration: --prefix=/usr --extra-version=0ubuntu0.2 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --enable-gpl --disable-stripping --enable-avresample --enable-avisynth --enable-gnutls --enable-ladspa --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librubberband --enable-librsvg --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvorbis --enable-libvpx --enable-libwavpack --enable-libwebp --enable-libx265 --enable-libxml2 --enable-libxvid --enable-lib

In [18]:
#@title Display video inline
from IPython.display import HTML
from base64 import b64encode

# read the converted video inside a context manager so the file handle is
# closed as soon as the bytes are in memory
with open("output.mp4", "rb") as video_file:
    mp4 = video_file.read()

# embed the video as a base64 data URL so the player needs no external file
dataURL = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=700 controls>
      <source src="%s" type="video/mp4">
</video>
""" % dataURL)