### Image Segmentation on a video using Matterport's MaskRCNN implementation

### Install Matterport MaskRCNN

In [0]:
import os
from os.path import exists, join, basename

# Name of the directory that `git clone` creates for the Mask_RCNN repository.
project_name = "Mask_RCNN"
# Only clone/install when the checkout is missing, so re-running the cell is cheap.
if not exists(project_name):
  # clone and install
  !git clone -q https://github.com/matterport/Mask_RCNN.git
  # Install the repository's Python requirements from inside the checkout.
  !cd $project_name && pip install -q -r requirements.txt
  
import sys
# Make the cloned repository importable (later cells import `mrcnn` from it).
sys.path.append(project_name)

### Select TensorFlow 1.x and download a pre-trained model

In [0]:
# Notebook magic that selects the TensorFlow 1.x line; it has to run before
# TensorFlow is imported below.
%tensorflow_version 1.x

import tensorflow as tf
# Print the version that was actually loaded as a sanity check.
print(tf.__version__)

In [0]:
import random
import math
import time
import numpy as np
import skimage.io
import matplotlib
import matplotlib.pyplot as plt
# Turn off axis grid lines for every figure drawn in this notebook.
plt.rcParams["axes.grid"] = False

# Root directory of the project
ROOT_DIR = os.path.abspath(project_name)

# Import Mask RCNN
# NOTE: the sys.path additions must come before the imports that rely on them.
sys.path.append(ROOT_DIR)  # To find local version of the library
from mrcnn import utils
import mrcnn.model as modellib
from mrcnn import visualize
# Import COCO config
sys.path.append(os.path.join(ROOT_DIR, "samples/coco/"))  # To find local version
import coco

%matplotlib inline 

# Directory to save logs and trained model
MODEL_DIR = os.path.join(ROOT_DIR, "logs")

# Local path to trained weights file
COCO_MODEL_PATH = os.path.join(ROOT_DIR, "mask_rcnn_coco.h5")
# Download COCO trained weights from Releases if needed
# (skipped when the weights file is already present, so re-runs do not re-download)
if not os.path.exists(COCO_MODEL_PATH):
    utils.download_trained_weights(COCO_MODEL_PATH)
    
    
class InferenceConfig(coco.CocoConfig):
    """COCO configuration overridden for single-image inference."""
    # Set batch size to 1 since we'll be running inference on
    # one image at a time. Batch size = GPU_COUNT * IMAGES_PER_GPU
    GPU_COUNT = 1
    IMAGES_PER_GPU = 1

# Instantiate the configuration used to build the inference model below.
config = InferenceConfig()

# Create model object in inference mode.
model = modellib.MaskRCNN(mode="inference", model_dir=MODEL_DIR, config=config)

# Load weights trained on MS-COCO
# (by_name=True presumably matches weights to layers by layer name — confirm
# against the mrcnn `load_weights` implementation)
model.load_weights(COCO_MODEL_PATH, by_name=True)

# COCO Class names
# Index of the class in the list is its ID. For example, to get ID of
# the teddy bear class, use: class_names.index('teddy bear')
# Index 0 is 'BG' (presumably the background class — confirm against the
# COCO sample), so the first object class, 'person', has ID 1.
# The order is significant: do not sort or reorder this list.
class_names = ['BG', 'person', 'bicycle', 'car', 'motorcycle', 'airplane',
               'bus', 'train', 'truck', 'boat', 'traffic light',
               'fire hydrant', 'stop sign', 'parking meter', 'bench', 'bird',
               'cat', 'dog', 'horse', 'sheep', 'cow', 'elephant', 'bear',
               'zebra', 'giraffe', 'backpack', 'umbrella', 'handbag', 'tie',
               'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
               'kite', 'baseball bat', 'baseball glove', 'skateboard',
               'surfboard', 'tennis racket', 'bottle', 'wine glass', 'cup',
               'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple',
               'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
               'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed',
               'dining table', 'toilet', 'tv', 'laptop', 'mouse', 'remote',
               'keyboard', 'cell phone', 'microwave', 'oven', 'toaster',
               'sink', 'refrigerator', 'book', 'clock', 'vase', 'scissors',
               'teddy bear', 'hair drier', 'toothbrush']

### Run with a test image

In [0]:
# Sample image hosted in the Mask_RCNN repository.
IMAGE_URL = 'https://raw.githubusercontent.com/matterport/Mask_RCNN/master/images/12283150_12d37e6389_z.jpg'

# Save the download under the URL's final path component, in the working directory.
image_file = basename(IMAGE_URL)
!wget -q -O $image_file $IMAGE_URL
# Read the downloaded file into an array and preview it before running detection.
image = skimage.io.imread(image_file)
plt.figure(figsize=(15, 10))
plt.imshow(image)

In [0]:
# Wall-clock start time, used to report how long detection takes.
t = time.time()
# Run detection
# (detect takes a list of images; a single-element list is passed here)
results = model.detect([image], verbose=1)
print("executed in %.3fs" % (time.time() - t))

# Visualize results
# results[0] is the entry for the only image passed in; its 'rois', 'masks',
# 'class_ids' and 'scores' entries are handed to the visualizer.
r = results[0]
visualize.display_instances(image, r['rois'], r['masks'], r['class_ids'], 
                            class_names, r['scores'])

### Video processing utilities

In [0]:
import cv2

#----------------------------------------------------
# Video Download and Display utility
#----------------------------------------------------
class ShowVid():
  def __init__(self, video_file=None):
    self.youtube_id = None
    self.video_file = video_file

  #----------------------------------------------------
  # Get stats on a Youtube video
  #----------------------------------------------------
  def _youtube_info(self, youtube):
    # get video information
    print (f'ID: {youtube.video_id}, Title: {youtube.title}, Length: {youtube.length}\n')

    # Get video stream formats
    stream = youtube.streams.all()
    print ('Streams:\n')
    for i in stream:
      print(i)

    # Thumbnail image URL
    return youtube.thumbnail_url

  #----------------------------------------------------
  # Download a video from Youtube to a local file using Pytube library
  #----------------------------------------------------
  def youtube_download(self, video_id, download_dir, video_stem):
    # Install pytube
    !pip install pytube3
    import pytube

    # Get Pytube's youtube object for the video
    video_url = f'https://www.youtube.com/watch?v={video_id}'
    youtube = pytube.YouTube(video_url)
    
    # Show some stats about the video
    self._youtube_info(youtube)

    # Choose the first stream format (format contains mime_type, resolution, fps, vcodec, acodec)
    video = youtube.streams.first()

    # Download the video
    self.video_file = video.download(download_dir, video_stem)
    self.youtube_id = video_id

    return self.video_file

  #----------------------------------------------------
  # Extract just the first few 'duration' seconds of a video
  #----------------------------------------------------
  def extract_short(self, short_file, duration):
    assert (self.video_file is not None)
    !ffmpeg -y -loglevel info -i {self.video_file} -t {duration} {short_file}
    self.video_file = short_file

  #----------------------------------------------------
  # Display a locally downloaded video file
  #----------------------------------------------------
  def show_mp4(self, width=640, height=480):
    import io
    import base64
    from IPython.display import HTML

    assert (self.video_file is not None)
    video_file = self.video_file

    video_encoded = base64.b64encode(io.open(video_file, 'rb').read())
    return HTML(data='''<video width="{0}" height="{1}" alt="test" controls>
                          <source src="data:video/mp4;base64,{2}" type="video/mp4" />
                        </video>'''.format(width, height, video_encoded.decode('ascii')))
    
  #----------------------------------------------------
  # Display a video on Youtube
  #----------------------------------------------------
  def show_youtube_video(self, youtube_id):
    from IPython.display import YouTubeVideo
    YouTubeVideo(youtube_id)

  #----------------------------------------------------
  # Read a video frame-by-frame and call a callback function to process
  # each frame. Stop when we have completed the given number of frames.
  # Write an output video file if required.
  #----------------------------------------------------
  def process_video(self, process_cb, num_frames, vid_out_name=None):
    assert (self.video_file is not None)
    video_file = self.video_file

    i_frames = 0
    vid = cv2.VideoCapture(str(video_file))

    # Get video width, height and frames per second
    width, height = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH)), int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = vid.get(cv2.CAP_PROP_FPS)
    print (f'Video size {width}x{height} at {fps} fps')

    if (vid_out_name is not None):
      # Write an AVI file and convert to MP4 later
      vid_out_avi = f'{vid_out_name}.avi'
      vid_out = cv2.VideoWriter(vid_out_avi, cv2.VideoWriter_fourcc(*'MJPG'), fps, (width, height))

    while(vid.isOpened()):
      ret, frame = vid.read()

      # Bail out when the video file ends
      if not ret:
        break

      frame_out = process_cb(frame)

      if (vid_out_name is not None):
        vid_out.write(frame_out)

      i_frames += 1
      if (i_frames > num_frames):
        vid.release()
        if (vid_out_name is not None):
          vid_out.release()
          # convert AVI to MP4
          vid_out_mp4 = f'{vid_out_name}.mp4'
          !ffmpeg -y -loglevel info -i {vid_out_avi} {vid_out_mp4}
        break

### Download Video

In [0]:
from pathlib import Path
# All video paths are built relative to the notebook's current working directory.
root_path = Path.cwd()
video_dir = root_path/'videos'
# exist_ok=True keeps this cell re-runnable when the directory already exists.
video_dir.mkdir(exist_ok=True)

# Full-length source video and the shortened clip derived from it.
orig_file = video_dir/'trailer1.mp4'
short_file = video_dir/'short.mp4'

In [0]:
!mkdir videos
!wget https://github.com/Tony607/blog_statics/releases/download/v1.0/trailer1.mp4 -P ./videos

# Show a local file (not downloaded from youtube)
sv_tr = ShowVid(orig_file)
sv_tr.extract_short(short_file, duration=5)

In [0]:
# Embed the shortened clip inline; the HTML object returned by show_mp4 is the
# cell's last expression, so the notebook renders it.
sv_tr.show_mp4(width=480, height=360)

### Image Segmentation on Video

In [0]:
import numpy as np


def random_colors(N):
    """Return N reproducible colors as 3-tuples of floats in [0, 255).

    A private generator seeded with 1 is used, so the same N always gives the
    same colors and the global NumPy random state is left untouched.

    Args:
        N: Number of colors to generate.

    Returns:
        A list of N tuples, each holding three float components.
    """
    rng = np.random.RandomState(1)
    return [tuple(255 * rng.rand(3)) for _ in range(N)]


def apply_mask(image, mask, color, alpha=0.5):
    """Blend `color` into `image` wherever `mask` equals 1.

    Each channel of the image is updated in place: masked pixels become
    `pixel * (1 - alpha) + alpha * component`, all others are left unchanged.

    Args:
        image: Array indexed as image[:, :, channel]; modified in place.
        mask: Array compared against 1 to select the pixels to tint.
        color: Sequence with one component per channel to tint.
        alpha: Weight of the color in the blend.

    Returns:
        The same `image` object that was passed in.
    """
    selected = mask == 1
    for channel_idx, component in enumerate(color):
        channel = image[:, :, channel_idx]
        tinted = channel * (1 - alpha) + alpha * component
        image[:, :, channel_idx] = np.where(selected, tinted, channel)
    return image


def display_instances(image, boxes, masks, ids, names, scores):
    """Draw instance masks, bounding boxes and captions onto an image.

    The image is modified in place (by `apply_mask` and the OpenCV drawing
    calls) and is also returned.

    Args:
        image: Array to draw on, indexed as image[:, :, channel].
        boxes: Array with one row per instance, unpacked as (y1, x1, y2, x2).
        masks: Array indexed as masks[:, :, i] for instance i.
        ids: Per-instance indices into `names`.
        names: Sequence mapping a class index to its label text.
        scores: Per-instance scores, or None to caption with the label only.

    Returns:
        The annotated image.
    """
    n_instances = boxes.shape[0]
    colors = random_colors(n_instances)

    if not n_instances:
        print('NO INSTANCES TO DISPLAY')
    else:
        assert boxes.shape[0] == masks.shape[-1] == ids.shape[0]

    for i, color in enumerate(colors):
        # Skip rows whose coordinates are all zero.
        if not np.any(boxes[i]):
            continue

        # Plain Python ints: OpenCV drawing calls are given native integers
        # rather than NumPy scalar types.
        y1, x1, y2, x2 = (int(v) for v in boxes[i])
        label = names[ids[i]]
        score = scores[i] if scores is not None else None
        # Compare against None so that a legitimate score of 0.0 is still shown.
        caption = label if score is None else '{} {:.2f}'.format(label, score)
        mask = masks[:, :, i]

        image = apply_mask(image, mask, color)
        image = cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
        image = cv2.putText(
            image, caption, (x1, y1), cv2.FONT_HERSHEY_COMPLEX, 0.7, color, 2
        )

    return image

In [0]:
import IPython.core.debugger as db

def myfunc(frame):
  """Segment one video frame and return it with the detections drawn on.

  Args:
    frame: A BGR frame as produced by `cv2.VideoCapture.read`.

  Returns:
    The same frame (still BGR, modified in place) annotated with masks,
    boxes and captions, ready to be written by `cv2.VideoWriter`.
  """
  # OpenCV decodes frames in BGR channel order, whereas the test image fed to
  # the model earlier was loaded with skimage (RGB). Convert a copy for
  # detection so the model sees the channel order it was exercised with.
  rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
  results = model.detect([rgb_frame], verbose=0)

  # Draw on the original BGR frame so the output video keeps correct colors.
  r = results[0]
  res = display_instances(frame, r['rois'], r['masks'], r['class_ids'], 
                            class_names, r['scores'])

  return res

# Run myfunc over the shortened clip with a frame limit of 140, writing the
# annotated frames under the output stem 'outvid' (the next cell plays outvid.mp4).
svf = ShowVid(short_file)
svf.process_video(myfunc, 140, 'outvid')

In [0]:
# outvid.mp4 is the MP4 that process_video converts from outvid.avi; embed it inline.
svo = ShowVid('outvid.mp4')
svo.show_mp4(width=720, height=720)