<a href="https://colab.research.google.com/github/rifathaqueamit/poc_development/blob/develop/poc_v_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<h1>Demonstration of the POC for searching content inside video</h1>

Prerequisites


1.  Upload the required model, named "model.tflite", to the notebook's working directory
2.  Upload the input videos (MP4 only) into the "input_videos" folder

Update : V5 (15th Aug, 2023)




In [1]:
# @title 1. Install dependencies
# Install (or upgrade) the tf-models-official package quietly.
!pip install -U -q "tf-models-official"
# Install ffmpeg through apt only when no ffmpeg executable is found on PATH.
!command -v ffmpeg >/dev/null || (apt update && apt install -y ffmpeg)
# mediapy is imported below as `media` and used to display videos inline.
!pip install -q mediapy

In [2]:
# @title 2. Import dependencies
import os
import matplotlib as mpl
import matplotlib.pyplot as plt
import mediapy as media
import numpy as np
import PIL
import pandas as pd
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_hub as hub
import tqdm
import absl.logging
import random
import re
import tempfile
import ssl
import cv2
import imageio
from IPython import display
from urllib import request

# Only let ERROR-level messages through from the absl and TensorFlow loggers.
absl.logging.set_verbosity(absl.logging.ERROR)
tf.get_logger().setLevel('ERROR')

# Default font size for matplotlib figures.
mpl.rcParams['font.size'] = 10

In [11]:
# @title 3. Define constants (if required)
#Constants
# Folder that holds the input .mp4 files.
input_videos_path = "input_videos"
# Folder that receives the converted .avi files.
output_videos_path = "output_videos"
# Default (width, height) that load_video() resizes every frame to.
model_frame_width = 224
model_frame_height = 224
# Frame rate used for the ffmpeg conversion and for inline playback.
video_fps = 5
# Segment length in seconds; frames per segment = seconds_per_segments * video_fps.
seconds_per_segments = 5
# Minimum probability, in percent, for a label to be reported as a detected action.
threshold = 50

In [4]:
# @title 4. Helper functions

# Download Kinetics 600 label map
!wget https://raw.githubusercontent.com/tensorflow/models/f8af2291cced43fc9f1d9b41ddbf772ae7b0d7d2/official/projects/movinet/files/kinetics_600_labels.txt -O labels.txt -q

with tf.io.gfile.GFile('labels.txt') as f:
  lines = f.readlines()
  KINETICS_600_LABELS_LIST = [line.strip() for line in lines]
  KINETICS_600_LABELS = tf.constant(KINETICS_600_LABELS_LIST)

def get_top_k(probs, k=5, label_map=KINETICS_600_LABELS):
  """Pairs the k most probable labels with their probabilities.

  Args:
    probs: class probabilities; they are ranked along the last axis in
      descending order.
    k: number of leading entries to keep.
    label_map: tensor of byte-string labels, indexed by class id.

  Returns:
    A tuple of (label, probability) pairs, most probable first.
  """
  best_idx = tf.argsort(probs, axis=-1, direction='DESCENDING')[:k]
  best_probs = tf.gather(probs, best_idx, axis=-1).numpy()
  raw_labels = tf.gather(label_map, best_idx, axis=-1).numpy()
  decoded_labels = [raw.decode('utf8') for raw in raw_labels]
  return tuple(zip(decoded_labels, best_probs))

def predict_top_k(model, video, k=5, label_map=KINETICS_600_LABELS):
  """Runs `model` on a single video and returns its k most probable labels.

  A leading axis is added to `video` before `model.predict` is called; the
  first prediction is passed through a softmax and ranked with `get_top_k`.

  Args:
    model: object exposing a `predict` method.
    video: the video to classify (without the leading axis).
    k: number of predictions to return.
    label_map: labels forwarded to `get_top_k`.

  Returns:
    The (label, probability) pairs produced by `get_top_k`.
  """
  batched_video = video[tf.newaxis]
  first_output = model.predict(batched_video)[0]
  return get_top_k(tf.nn.softmax(first_output), k=k, label_map=label_map)

def get_top_k_streaming_labels(probs, k=5, label_map=KINETICS_600_LABELS_LIST):
  """Selects the labels that dominate a whole sequence of per-frame predictions.

  Args:
    probs: probability tensor of shape (num_frames, num_classes) holding the
      probability of each class on each frame.
    k: the number of top predictions to consider per frame.
    label_map: a list of labels to map class indices to label strings.

  Returns:
    A tuple (top_probs, top_labels, top_probs_idx). `top_probs_idx` holds at
    most k + 1 class indices: the best class of the last frame first, followed
    by the classes that appear most often in the per-frame top-k. `top_probs`
    has one row per selected class and `top_labels` the decoded label strings.
  """
  # Rank the classes of every frame once and reuse the ranking below.
  ranked = tf.argsort(probs, -1, 'DESCENDING')
  last_frame_best = ranked[-1, :1]
  flat_top_k = tf.reshape(ranked[:, :k], [-1])

  # Count how often each class shows up in any frame's top-k.
  occurrences = []
  for category in tf.unique(flat_top_k)[0]:
    hits = tf.reduce_sum(tf.cast(flat_top_k == category, tf.int32))
    occurrences.append((category.numpy(), hits.numpy()))
  occurrences.sort(key=lambda pair: pair[1], reverse=True)

  # The last frame's best class goes first; duplicates are dropped afterwards.
  frequent_idx = tf.constant([category for category, _ in occurrences[:k]])
  top_probs_idx = tf.concat([last_frame_best, frequent_idx], 0)
  top_probs_idx = tf.unique(top_probs_idx)[0][:k+1]

  top_probs = tf.transpose(
      tf.gather(probs, top_probs_idx, axis=-1), perm=(1, 0))
  top_labels = [
      raw.decode('utf8')
      for raw in tf.gather(label_map, top_probs_idx, axis=0).numpy()
  ]

  return top_probs, top_labels, top_probs_idx

# Utilities to open video files using CV2
def crop_center_square(frame):
  """Returns the largest centred square region of `frame`.

  The first two axes of `frame` are treated as (rows, columns); any trailing
  axes are kept untouched.
  """
  height, width = frame.shape[:2]
  side = min(height, width)
  top = height // 2 - side // 2
  left = width // 2 - side // 2
  return frame[top:top + side, left:left + side]

def load_video(path, max_frames=0, resize=(model_frame_width, model_frame_height)):
  """Reads a video with OpenCV into a single array of square, resized frames.

  Args:
    path: location of the video, handed to `cv2.VideoCapture`.
    max_frames: stop after this many frames; with the default of 0 the limit
      never triggers and the whole video is read.
    resize: target size handed to `cv2.resize` for every cropped frame.

  Returns:
    `np.array` of the collected frames divided by 255.0.
  """
  capture = cv2.VideoCapture(path)
  collected = []
  try:
    ok, image = capture.read()
    while ok:
      image = cv2.resize(crop_center_square(image), resize)
      # Reverse the order of the three colour channels.
      collected.append(image[:, :, [2, 1, 0]])
      if len(collected) == max_frames:
        break
      ok, image = capture.read()
  finally:
    # Always free the capture handle, even when decoding fails midway.
    capture.release()
  return np.array(collected) / 255.0

# detect and print 5 probabilities
def detect(name, video):
  """Streams `video` through the model and prints the top predictions.

  The model is called once per clip and the state it returns is fed back into
  the next call. The probabilities printed are those obtained after the last
  clip.

  Args:
    name: display name used in the progress message.
    video: iterable of clips. A clip with fewer axes than the model declares
      for its 'image' input gets leading singleton axes added first.
  """
  print("...")
  print("Detecting " + name + "...")

  input_details = runner.get_input_details()
  # Every signature input except the image is a state tensor; start at zero.
  states = {
    key: tf.zeros(spec['shape'], dtype=spec['dtype'])
    for key, spec in input_details.items()
    if key != 'image'
  }
  image_rank = len(input_details['image']['shape'])

  logits = None
  for clip in video:
    # Pad e.g. an (H, W, C) frame up to the rank the model expects.
    for _ in range(image_rank - len(clip.shape)):
      clip = clip[tf.newaxis]
    outputs = runner(**states, image=clip)
    logits = outputs.pop('logits')[0]
    states = outputs

  print()
  if logits is None:
    # Nothing was fed to the model, so there is nothing to rank.
    print("No clips to analyse")
    return

  probs = tf.nn.softmax(logits)
  for label, prob in get_top_k(probs):
    print(label, prob)

In [5]:
# @title 5. Load pre-trained model
# Create the interpreter and signature runner
# The model is read from "model.tflite", a path relative to the working directory.
interpreter = tf.lite.Interpreter(model_path="model.tflite")
# `runner` is the callable the detection helpers use to run the model.
runner = interpreter.get_signature_runner()

In [12]:
# @title 6. Make necessary folders
# exist_ok=True keeps the cell safe to re-run, and makedirs also copes with
# nested paths should the constants be changed to e.g. "data/input_videos".
for folder in (input_videos_path, output_videos_path):
  os.makedirs(folder, exist_ok=True)

In [20]:
# @title 7. Collect input videos (Mp4 only)
# List the configured input folder rather than a hard-coded name, so changing
# `input_videos_path` in the constants cell is honoured here too. os.listdir()
# returns entries in arbitrary order; sorting keeps the processing order stable.
input_videos = sorted(
    file for file in os.listdir(input_videos_path) if file.endswith(".mp4")
)

if len(input_videos) == 0:
  print("There is no input videos")
else:
  print(input_videos)

['received_256168027307762.mp4', 'received_824546655998330.mp4']


In [None]:
# @title 8. Convert the input mp4 video into AVI format
output_videos = []
for input_video in input_videos:
  input_path = input_videos_path + "/" + input_video
  path, filename_ext = os.path.split(input_path)
  filename = os.path.splitext(filename_ext)
  output_path = output_videos_path + "/" + filename[0] + ".avi"
  if os.path.exists(output_path):
    os.remove(output_path)
  print("Input : ", input_path)
  print("Output : ", output_path)
  !ffmpeg -i $input_path -filter:v fps=$video_fps $output_path
  output_videos.append(output_path)

In [27]:
# @title 9. Load the videos and show
# Each entry appended below is a (video path, frame array) pair.
loaded_videos = []
for output_video in output_videos:
  # load_video() runs with its default frame limit and resize target.
  loaded_video = load_video(output_video)
  media.show_video(loaded_video, fps=video_fps)
  # NOTE(review): an array of image frames is normally ordered
  # (frames, height, width, channels), i.e. height before width, unlike the
  # label printed below. Confirm before relying on the wording.
  print("video frames count, width, height, color : ", loaded_video.shape)
  loaded_videos.append((output_video, loaded_video))

0
This browser does not support the video tag.


video frames count, width, height, color :  (77, 224, 224, 3)


0
This browser does not support the video tag.


video frames count, width, height, color :  (251, 224, 224, 3)


In [74]:
# @title 10. Separate the videos in small segments
def separateVideoInSegments(name, video):
  """Splits a video into consecutive fixed-length segments.

  Every segment holds `seconds_per_segments * video_fps` frames, except the
  last one, which keeps whatever frames remain.

  Args:
    name: identifier of the video; printed and returned unchanged.
    video: sequence of frames to split.

  Returns:
    A tuple (name, video_segments, video_segments_playable). Both lists hold
    one list of frames per segment: `video_segments` with every frame cast to
    tf.float32, `video_segments_playable` with the frames as they were passed.

  Raises:
    ValueError: if the configured segment length is shorter than one frame.
  """
  print("separateVideoInSegments() : ", name)
  video_segments = []
  video_segments_playable = []

  frames_per_video = seconds_per_segments * video_fps
  if frames_per_video < 1:
    raise ValueError("seconds_per_segments * video_fps must be at least 1")

  # Iterate the `video` argument, not a same-named global, so the function
  # splits exactly what the caller passed in.
  for idx, frame in enumerate(video):
    video_id = int(idx // frames_per_video)
    if video_id == len(video_segments):
      # First frame of a new segment: open a new bucket in both lists.
      video_segments.append([])
      video_segments_playable.append([])
    video_segments[video_id].append(tf.cast(frame, tf.float32))
    video_segments_playable[video_id].append(frame)

  for idx, segment in enumerate(video_segments):
    print(">> Segment ", idx+1, ", total frames : ", len(segment))

  return (name, video_segments, video_segments_playable)

# Split every loaded video; each entry is the (name, segments, playable
# segments) tuple returned by separateVideoInSegments().
videos_information = []
for output_video, loaded_video in loaded_videos:
  videos_information.append(separateVideoInSegments(output_video, loaded_video))

separateVideoInSegments() :  output_videos/received_256168027307762.avi
>> Segment  1 , total frames :  25
>> Segment  2 , total frames :  25
>> Segment  3 , total frames :  25
>> Segment  4 , total frames :  2
separateVideoInSegments() :  output_videos/received_824546655998330.avi
>> Segment  1 , total frames :  25
>> Segment  2 , total frames :  25
>> Segment  3 , total frames :  25
>> Segment  4 , total frames :  25
>> Segment  5 , total frames :  25
>> Segment  6 , total frames :  25
>> Segment  7 , total frames :  25
>> Segment  8 , total frames :  25
>> Segment  9 , total frames :  25
>> Segment  10 , total frames :  25
>> Segment  11 , total frames :  1


In [75]:
# @title 11. Define the detect function
def detectFirstPrint(video):
  """Runs the model over one segment and prints the first confident action.

  The model is called once per frame and the state it returns is fed back
  into the next call. Of the top predictions obtained after the last frame,
  the first one whose probability reaches `threshold` percent is printed;
  otherwise "nothing detected" is reported.

  Args:
    video: iterable of frames. A frame with fewer axes than the model
      declares for its 'image' input gets leading singleton axes added first.
  """
  input_details = runner.get_input_details()
  # Every signature input except the image is a state tensor; start at zero.
  states = {
    key: tf.zeros(spec['shape'], dtype=spec['dtype'])
    for key, spec in input_details.items()
    if key != 'image'
  }
  image_rank = len(input_details['image']['shape'])

  logits = None
  for clip in video:
    # Input shape: [1, 1, 224, 224, 3]. Pad lower-rank frames up to the rank
    # the model declares.
    for _ in range(image_rank - len(clip.shape)):
      clip = clip[tf.newaxis]
    outputs = runner(**states, image=clip)
    logits = outputs.pop('logits')[0]
    states = outputs

  # `logits` stays None for an empty segment; treat that as "nothing detected".
  if logits is not None:
    probs = tf.nn.softmax(logits)
    for label, prob in get_top_k(probs):
      if prob * 100 >= threshold:
        print("Action : " + label + ", with probability : " + str(prob * 100) + "%")
        return

  print("Action : nothing detected")

In [None]:
# @title 12. Run inference

# Walk through every video, then through its segments: report the detected
# action for a segment and show that segment right below the message.
for video_name, video_segments, video_segments_playable in videos_information:
  print()
  print("Video : ", video_name)
  print(".............................")

  paired_segments = zip(video_segments, video_segments_playable)
  for segment_id, (segment, playable) in enumerate(paired_segments):
    print("Detecting ", segment_id + 1, " ...")
    detectFirstPrint(segment)
    media.show_video(playable, fps=video_fps)