In [None]:
!pip install opencv-python
!pip install imageio
!pip install moviepy

In [None]:
# @title Prerequisites
import colorsys
import json
import os
import random
from typing import Tuple, List, Dict
import zipfile

import cv2
import imageio
import matplotlib.pyplot as plt
import moviepy.editor as mvp
import numpy as np
import requests
from scipy.io import wavfile

In [None]:
# @title Utility Functions
def download_and_unzip(url: str, destination: str):
  """Downloads a .zip file into a directory and extracts it there.

  The archive is streamed to `destination`, extracted into `destination`,
  and the downloaded archive is then deleted. If a file with the archive's
  name already exists in `destination`, a message is printed and the function
  returns without downloading or extracting anything.

  If the server reports a size above 1GB, the file is fetched in roughly 100
  chunks and progress is printed per chunk. Smaller files, and responses that
  do not report a size, are streamed in fixed 1MB chunks so the whole payload
  is never held in memory at once.

  Args:
    url (str): The URL of the file to download.
    destination (str): The destination directory to save the file and
      extract its contents.

  Raises:
    requests.HTTPError: If the server answers with an error status code.
  """
  os.makedirs(destination, exist_ok=True)

  filename = url.split('/')[-1]
  file_path = os.path.join(destination, filename)

  if os.path.exists(file_path):
    print(f'{filename} already exists. Skipping download.')
    return

  gb = 1024*1024*1024
  default_chunk_size = 1024*1024

  # The context manager returns the connection to the pool even on errors.
  with requests.get(url, stream=True) as response:
    # Fail early instead of saving an HTML error page as a .zip file.
    response.raise_for_status()
    total_size = int(response.headers.get('content-length', 0))

    chunk_flag = total_size / gb > 1
    if chunk_flag:
      print(f'{filename} is larger than 1GB, downloading in chunks')
      chunk_size = int(total_size/100)
    else:
      # Also covers a missing content-length header (total_size == 0), which
      # would otherwise request zero-byte chunks.
      chunk_size = default_chunk_size

    try:
      with open(file_path, 'wb') as file:
        for chunk_idx, chunk in enumerate(
            response.iter_content(chunk_size=chunk_size)):
          if chunk:
            if chunk_flag:
              print(f"""{chunk_idx}% downloading
          {round((chunk_idx*chunk_size)/gb, 1)}GB
          / {round(total_size/gb, 1)}GB""")
            file.write(chunk)
    except BaseException:
      # A partial archive would make the next call skip the download.
      if os.path.exists(file_path):
        os.remove(file_path)
      raise
  print(f"'{filename}' downloaded successfully.")

  with zipfile.ZipFile(file_path, 'r') as zip_ref:
    zip_ref.extractall(destination)
  print(f"'{filename}' extracted successfully.")

  os.remove(file_path)


def load_db_json(db_file: str) -> Dict:
  """Reads a JSON file whose top-level value must be an object.

  Args:
    db_file (str): Path to the JSON file.

  Returns:
    Dict: The parsed content of the file.

  Raises:
    FileNotFoundError: If `db_file` is not an existing regular file.
    TypeError: If the parsed top-level value is not a dictionary.
  """
  if os.path.isfile(db_file):
    with open(db_file, 'r') as handle:
      parsed = json.load(handle)
    if isinstance(parsed, dict):
      return parsed
    raise TypeError('JSON file is not formatted as a dictionary.')

  raise FileNotFoundError(f'No such file: {db_file}')


def load_mp4_to_frames(filename: str) -> np.array:
  """Loads an MP4 video file and returns its frames as a NumPy array.

  The frame count reported by the container is only used to pre-allocate
  the output. The returned array always holds exactly the frames that were
  decoded: it is trimmed if fewer frames were read than reported, and
  extended if more were read.

  Args:
    filename (str): Path to the MP4 video file.

  Returns:
    np.array: uint8 array of shape (num_decoded_frames, height, width, 3),
      with frames stored as returned by `cv2.VideoCapture.read`.

  Raises:
    FileNotFoundError: If `filename` does not exist.
  """
  # An explicit raise, unlike `assert`, is not stripped by `python -O`.
  if not os.path.exists(filename):
    raise FileNotFoundError(f'File {filename} does not exist.')

  cap = cv2.VideoCapture(filename)
  try:
    reported_frames = max(int(cap.get(cv2.CAP_PROP_FRAME_COUNT)), 0)
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))

    vid_frames = np.empty((reported_frames, height, width, 3), dtype=np.uint8)
    overflow_frames = []

    idx = 0
    while True:
      ret, vid_frame = cap.read()
      if not ret:
        break

      if idx < reported_frames:
        vid_frames[idx] = vid_frame
      else:
        # The container reported fewer frames than were actually decoded.
        overflow_frames.append(vid_frame)
      idx += 1
  finally:
    # Release the capture even if decoding raised.
    cap.release()

  if idx < reported_frames:
    # Drop the rows np.empty left uninitialised.
    vid_frames = vid_frames[:idx]
  if overflow_frames:
    vid_frames = np.concatenate(
        [vid_frames, np.stack(overflow_frames).astype(np.uint8)], axis=0)
  return vid_frames


def get_video_frames(data_item: Dict, vid_path: str) -> np.array:
  """Loads the frames of the video described by a dataset item.

  The file is looked up as `<vid_path>/<video_id>.mp4`, where `video_id` is
  read from `data_item['metadata']`. After loading, the number of frames is
  checked against `data_item['metadata']['num_frames']`.

  Args:
    data_item (Dict): Item from dataset containing metadata.
    vid_path (str): Path to the directory containing videos.

  Returns:
    np.array: Frames of the video as a NumPy array.
  """
  clip_name = data_item['metadata']['video_id']
  clip_location = os.path.join(vid_path, clip_name) + '.mp4'
  loaded = load_mp4_to_frames(clip_location)
  # The annotation metadata and the decoded video must agree on length.
  assert data_item['metadata']['num_frames'] == loaded.shape[0]
  return loaded


In [None]:
# @title Download Dataset Sample
data_path = '/teamspace/s3_connections/perception_test/'
video_path = '/teamspace/s3_connections/perception_test/videos/'

# Sample annotations and videos, used to visualise the annotations later.
sample_annot_url = 'https://storage.googleapis.com/dm-perception-test/zip_data/sample_annotations.zip'
sample_videos_url = 'https://storage.googleapis.com/dm-perception-test/zip_data/sample_videos.zip'

# Both archives are extracted into the same directory.
for sample_url in (sample_annot_url, sample_videos_url):
  download_and_unzip(sample_url, data_path)

# Annotation database, read from data_path once the archives are unpacked.
db_json_path = os.path.join(data_path, 'sample.json')
db_dict = load_db_json(db_json_path)

In [None]:
# @title Visualisation functions
def get_colors(num_colors: int) -> List[Tuple[int, int, int]]:
  """Generate a shuffled palette for visualizing different objects and points.

  Hues are spread evenly around the colour wheel. Lightness is drawn from
  [0.5, 0.6) and saturation from [0.9, 1.0) using NumPy's global random
  state, so exact shades vary between calls unless that state is seeded by
  the caller. The order of the palette is shuffled with a fixed seed (0)
  using a private generator, so the global `random` state is left untouched.

  Args:
    num_colors (int): The number of colors to generate.

  Returns:
    List[Tuple[int, int, int]]: `num_colors` tuples of integer channel
      values in the range 0-255. Empty if `num_colors` is not positive.
  """
  if num_colors <= 0:
    return []

  colors = []
  # Integer stepping yields exactly num_colors hues; a float-step np.arange
  # does not guarantee the element count.
  for color_idx in range(num_colors):
    hue = color_idx / num_colors
    lightness = (50 + np.random.rand() * 10) / 100.
    saturation = (90 + np.random.rand() * 10) / 100.
    red, green, blue = colorsys.hls_to_rgb(hue, lightness, saturation)
    colors.append((int(red * 255), int(green * 255), int(blue * 255)))

  # Same permutation as `random.seed(0); random.shuffle(colors)` but without
  # reseeding the module-level generator.
  random.Random(0).shuffle(colors)
  return colors


def display_video(vid_frames: np.array, fps: int = 30):
  """Writes frames to a temporary MP4 file and shows it inline.

  The last axis of `vid_frames` is reversed before writing, i.e. the channel
  order is flipped (frames decoded with cv2 are BGR, so this yields RGB).
  The video is written to 'tmp_video_display.mp4' in the current working
  directory and is overwritten on every call.

  Args:
    vid_frames: (np.array): The frames of the video as a
      numpy array. Format of frames should be:
      (num_frames, height, width, channels)
    fps (int): Frames per second for the video playback. Default is 30.
  """
  tmp_video_file = 'tmp_video_display.mp4'
  channel_flipped = vid_frames[:, :, :, ::-1]
  # macro_block_size=None is forwarded to the imageio writer unchanged.
  imageio.mimwrite(tmp_video_file, channel_flipped,
                   fps=fps, macro_block_size=None)
  display(mvp.ipython_display(tmp_video_file))


def display_frame(tmp_frame: np.array):
  """Display a single frame inline using matplotlib.

  The frame is expected in OpenCV channel order. Three-channel frames are
  converted from BGR to RGB and four-channel frames from BGRA to RGBA before
  being shown; two-dimensional frames are shown as grayscale.

  Args:
    tmp_frame (np.array): The frame to be displayed.
  """
  # `cv2_imshow` is never imported in this notebook, so matplotlib (already
  # imported as plt) is used instead.
  if tmp_frame.ndim == 3 and tmp_frame.shape[2] == 3:
    shown = tmp_frame[:, :, ::-1]
  elif tmp_frame.ndim == 3 and tmp_frame.shape[2] == 4:
    shown = tmp_frame[:, :, [2, 1, 0, 3]]
  else:
    shown = tmp_frame

  plt.imshow(shown, cmap='gray' if shown.ndim == 2 else None)
  plt.axis('off')
  plt.show()


def paint_box(video: np.array, track: Dict,
              color: Tuple[int, int, int] = (255, 0, 0),
              addn_label: str = '') -> np.array:
  """Draws one track's bounding boxes and caption onto the video frames.

  Box coordinates are read as fractions of the frame size, ordered
  (x1, y1, x2, y2), and scaled by the frame width and height. The caption
  "<id> : <label><addn_label>" is drawn 20 pixels below the top-left corner
  of each box. `video` is modified in place.

  Args:
    video (np.array): The video frames as a numpy array of shape
      (num_frames, height, width, channels).
    track (Dict): The track information with keys 'id', 'label',
      'bounding_boxes' and 'frame_ids'; assumes Perception Test Dataset
      format.
    color (Tuple[int, int, int]): Colour passed to cv2 for the box and the
      caption. Default is (255, 0, 0).
    addn_label (str): Additional label to be added to the track label.
      Default is an empty string.

  Returns:
    np.array: The same `video` array with the boxes and captions painted.
  """
  _, frame_h, frame_w, _ = video.shape
  caption = str(track['id']) + ' : ' + track['label'] + addn_label
  boxes = np.array(track['bounding_boxes'])

  for frame_id, box in zip(track['frame_ids'], boxes):
    # Draw on a copy of the frame, then write it back into the video.
    canvas = np.array(video[frame_id])
    top_left = (int(round(box[0] * frame_w)), int(round(box[1] * frame_h)))
    bottom_right = (int(round(box[2] * frame_w)),
                    int(round(box[3] * frame_h)))
    canvas = cv2.rectangle(canvas, top_left, bottom_right,
                           color=color, thickness=2)
    caption_origin = (top_left[0], top_left[1] + 20)
    canvas = cv2.putText(canvas, caption, caption_origin,
                         cv2.FONT_HERSHEY_SIMPLEX, 0.75, color, 2)
    video[frame_id] = canvas

  return video


def paint_boxes(video: np.array, tracks: List[Dict]) -> np.array:
  """Paint bounding boxes and labels on a video for multiple tracks.

  Each track is drawn with `paint_box`, using a colour taken from the
  module-level `COLORS` palette, which must be defined before this function
  is called. If there are more tracks than colours, the palette is reused
  from its start.

  Args:
    video (np.array): The video frames as a numpy array.
    tracks (List): A list of track information,
      where each track contains bounding box and frame information.

  Returns:
    np.array: The modified video frames with painted bounding boxes
      and labels.
  """
  for track_idx, track in enumerate(tracks):
    # Wrap around so extra tracks do not raise IndexError.
    video = paint_box(video, track, COLORS[track_idx % len(COLORS)])
  return video


def paint_point(video: np.array,
                track: dict,
                color: tuple[int, int, int] = (255, 0, 0)) -> np.array:
  """Draws one tracked point as a filled circle on each annotated frame.

  `track['points'][0]` holds the vertical and `track['points'][1]` the
  horizontal coordinate of the point per annotated frame; both are scaled
  by the frame height and width respectively. `video` is modified in place.

  Args:
    video (np.array): The video frames as a numpy array.
    track (dict): The track containing frame IDs and corresponding points.
    color (tuple, optional): The color of the painted point.
      Defaults to (255, 0, 0).

  Returns:
    np.array: The same `video` array with the points painted.
  """
  _, frame_h, frame_w, _ = video.shape
  for position, frame_id in enumerate(track['frame_ids']):
    target = video[frame_id]
    row = int(round(track['points'][0][position] * frame_h))
    col = int(round(track['points'][1][position] * frame_w))
    # thickness=-1 asks cv2 for a filled circle.
    video[frame_id] = cv2.circle(target, (col, row),
                                 radius=10, color=color, thickness=-1)
  return video


def paint_points(video: np.array, tracks: List[dict]) -> np.array:
  """Paints multiple tracked points on each frame of a video.

  Each track is drawn with `paint_point`, using a colour taken from the
  module-level `COLORS` palette, which must be defined before this function
  is called. If there are more tracks than colours, the palette is reused
  from its start.

  Args:
    video (np.array): The video frames as a numpy array.
    tracks (List[dict]): The list of tracks containing
      frame IDs and corresponding points.

  Returns:
    np.array: The video frames with painted points.
  """
  for idx, track in enumerate(tracks):
    # Wrap around so extra tracks do not raise IndexError.
    video = paint_point(video, track, COLORS[idx % len(COLORS)])
  return video


def paint_sound(video: np.array,
                vid_sound: dict, vid_frames: np.array,
                color: tuple[int, int, int] = (0, 0, 255)) -> np.array:
  """Paints a sound label on each frame of a video.

  The label is drawn on every frame in the half-open range
  [frame_ids[0], frame_ids[1]). Its vertical position starts at 90% of the
  frame height and moves up by 40 pixels for every label already recorded
  for that frame in `vid_frames`, so labels painted by successive calls do
  not overlap. Both `video` and `vid_frames` are modified in place.

  Args:
    video (np.array): The video frames as a numpy array.
    vid_sound (dict): The sound containing the label,
      frame IDs, and visibility.
    vid_frames (np.array): The array to keep track of
      the number of labels on each frame.
    color (tuple, optional): The color of the painted label.
      Defaults to (0, 0, 255).

  Returns:
    np.array: The video frames with painted labels.
  """
  _, height, width, _ = video.shape
  # cv2.putText draws one line of text and cannot render a line break, so
  # the label is built without embedded newlines or indentation.
  name = (f'Sound: {vid_sound["label"]}, '
          f'is_visible: {bool(vid_sound["is_visible"])}')
  [start_frame, end_frame] = vid_sound['frame_ids']
  for frame_id in range(start_frame, end_frame):
    vid_frame = np.array(video[frame_id])
    y1 = int(round(0.9 * height) - (40 * vid_frames[frame_id]))
    x1 = int(round(0.05 * width))

    vid_frame = cv2.putText(vid_frame, name, (x1, y1),
                            cv2.FONT_HERSHEY_SIMPLEX, 1.5, color, 2)
    video[frame_id] = vid_frame
    vid_frames[frame_id] += 1

  return video


def paint_action(video: np.array, vid_action: dict,
                 vid_frames: np.array,
                 color: tuple[int, int, int] = (0, 255, 0),
                 ) -> np.array:
  """Writes an "Action: <label>" caption on every frame of an action segment.

  The caption is drawn on each frame in the half-open range
  [frame_ids[0], frame_ids[1]). It starts at 90% of the frame height and is
  raised by 40 pixels for every label already counted for that frame in
  `vid_frames`. Both `video` and `vid_frames` are modified in place.

  Args:
    video (np.array): The video frames as a numpy array.
    vid_action (dict): The action containing the label and frame IDs.
    vid_frames (np.array): The array to keep track
      of the number of labels on each frame.
    color (tuple, optional): The color of the painted label.
      Defaults to (0, 255, 0).

  Returns:
    np.array: The same `video` array with the captions painted.
  """
  _, frame_h, frame_w, _ = video.shape
  caption = f'Action: {vid_action["label"]}'
  first_frame, stop_frame = vid_action['frame_ids']

  # These depend only on the frame size, so compute them once.
  baseline = round(0.9 * frame_h)
  text_x = int(round(0.05 * frame_w))

  for frame_id in range(first_frame, stop_frame):
    canvas = np.array(video[frame_id])
    text_y = int(baseline - (40 * vid_frames[frame_id]))
    video[frame_id] = cv2.putText(canvas, caption, (text_x, text_y),
                                  cv2.FONT_HERSHEY_SIMPLEX, 1.5, color, 2)
    vid_frames[frame_id] += 1

  return video


def paint_actions(video: np.array,
                  vid_actions: List[dict],
                  vid_frames: np.array) -> np.array:
  """Applies `paint_action` to a video once per action, in list order.

  Args:
    video (np.array): The video frames as a numpy array.
    vid_actions (List[dict]): The list of actions containing
      the labels and frame IDs.
    vid_frames (np.array): The array to keep track
      of the number of labels on each frame; shared across all actions.

  Returns:
    np.array: The video frames returned by the last `paint_action` call, or
      `video` itself when `vid_actions` is empty.
  """
  painted = video
  for action_entry in vid_actions:
    painted = paint_action(painted, action_entry, vid_frames)
  return painted


def paint_sounds(video: np.array,
                 vid_sounds: List[dict],
                 vid_frames: np.array) -> np.array:
  """Applies `paint_sound` to a video once per sound, in list order.

  Args:
    video (np.array): The video frames as a numpy array.
    vid_sounds (List[dict]): The list of sounds containing the labels,
      frame IDs, and visibility.
    vid_frames (np.array): The array to keep track of the
      number of labels on each frame; shared across all sounds.

  Returns:
    np.array: The video frames returned by the last `paint_sound` call, or
      `video` itself when `vid_sounds` is empty.
  """
  painted = video
  for sound_entry in vid_sounds:
    painted = paint_sound(painted, sound_entry, vid_frames)
  return painted


def get_answer_tracks(ex_data: dict, goq_ids: List) -> List[dict]:
  """Selects the object tracks whose 'id' appears in `goq_ids`.

  Tracks are returned in the order they appear in
  `ex_data['object_tracking']`, not in the order of `goq_ids`.

  Args:
    ex_data (dict): The data containing object tracking information.
    goq_ids (List): The list of IDs to filter tracks.

  Returns:
    List[dict]: The filtered tracks matching the goq_ids.
  """
  return [candidate for candidate in ex_data['object_tracking']
          if candidate['id'] in goq_ids]

In [None]:
# @title Show Example Annotations
video_id = list(db_dict)[6]
example_data = db_dict[video_id]

rule = '---------------------------------------------------------------------'

# Overview: which annotation types are present for this video.
print(rule)
print('Tasks annotated for this video: ')
for k, v in example_data.items():
  print(f'{k} - available: yes - annotations: {len(v)}' if v
        else f'{k} - available: no')
print(rule)

# Each section prints its heading first and only then looks up its record,
# so a missing annotation fails after the heading has been shown.
sections = (
    ('Video Metadata', lambda: example_data['metadata']),
    ('Object Tracking data', lambda: example_data['object_tracking'][0]),
    ('Multiple-Choice VQA', lambda: example_data['mc_question'][0]),
)
for section_title, fetch_record in sections:
  print(section_title)
  print(rule)
  for k, v in fetch_record().items():
    print(f'{k} : {v}')
  print(rule)

In [None]:
# @title Visualising Object Tracks
if example_data['object_tracking']:
  frames = get_video_frames(example_data, video_path)

  COLORS = get_colors(num_colors=100)
  show_all_tracks = True  # @param {type: "boolean"}
  show_track = 2  # @param {type: "integer"}

  if show_all_tracks:
    frames = paint_boxes(frames, example_data['object_tracking'])
    # With all tracks painted, preview the frames annotated for the first
    # track.
    shown_frame_ids = example_data['object_tracking'][0]['frame_ids']
  else:
    selected_track = example_data['object_tracking'][show_track]
    frames = paint_box(frames, selected_track)
    # Preview the frames annotated for the selected track, not for track 0,
    # so every displayed frame carries its box.
    shown_frame_ids = selected_track['frame_ids']

  # Only annotated frames are kept; they are played back at 1 fps.
  annotated_frames = np.array(
      [frames[frame_idx] for frame_idx in shown_frame_ids])
  display_video(annotated_frames, 1)
  del frames

In [None]:
# @title Visualising Point Tracks
if example_data['point_tracking']:
  frames = get_video_frames(example_data, video_path)
  # Fresh palette; paint_points reads it from the module-level COLORS.
  COLORS = get_colors(num_colors=100)
  # paint_points draws into `frames` in place and returns it.
  display_video(paint_points(frames, example_data['point_tracking']),
                example_data['metadata']['frame_rate'])
  # Free the decoded video before the next cell loads it again.
  del frames

In [None]:
# @title Visualising Action Segments
if example_data['action_localisation']:
  frames = get_video_frames(example_data, video_path)
  # One counter per frame: how many labels are already painted on it.
  labelled_frames = np.zeros(len(frames))
  frames = paint_actions(
      frames, example_data['action_localisation'], labelled_frames)
  display_video(frames, example_data['metadata']['frame_rate'])
  # Free the decoded video before the next cell loads it again.
  del frames

In [None]:
# @title Plotting Action Segments
if example_data['action_localisation']:
  # Reverse the channel axis so matplotlib gets RGB instead of cv2's BGR.
  frames = get_video_frames(example_data, video_path)[:,:,:,::-1]

  action_labels = []
  action_start_times = []
  action_end_times = []

  for action in example_data['action_localisation']:
    action_labels.append(action['label'])
    # Timestamps are divided by 1e6 (presumably microseconds to seconds;
    # confirm against the dataset documentation).
    action_start_times.append(action['timestamps'][0]/1e6)
    action_end_times.append(action['timestamps'][1]/1e6)

  action_start_times = np.array(action_start_times)
  action_end_times = np.array(action_end_times)

  plt.figure(figsize=(20, 15))
  # Strip of frames
  plt.subplot(4, 1, 2)
  plt.title('Video Frames')
  f_size = frames[0].shape
  # cv2.resize takes (width, height), hence the reversal; a quarter of the
  # original size.
  small = tuple(reversed((np.array(f_size[:2]) / 4).astype(int)))
  num_frames = example_data['metadata']['num_frames']
  # At least 1, so videos with fewer than 4 frames do not produce a zero
  # range step.
  frame_stride = max(1, num_frames // 4)
  thumbnails = [cv2.resize(frames[i], small)
                for i in range(0, num_frames, frame_stride)]
  # Build the strip and draw it once, instead of once per thumbnail.
  strip = np.concatenate(thumbnails, axis=1)
  plt.imshow(strip)

  del frames

  plt.subplot(4, 1, 3)
  plt.title('Action Events')
  # One horizontal bar per action, from its start time to its end time.
  plt.barh(range(len(action_start_times)),
           action_end_times-action_start_times,
           left=action_start_times)
  plt.yticks(range(len(action_start_times)), action_labels)

  plt.show()

In [None]:
# @title Visualising Multiple-Choice Video Question-Answering Annotations
mc_rule = '---------------------------------'
# `or []` skips the loop when the annotation is missing or empty.
for question in example_data['mc_question'] or []:
  print(mc_rule)
  for caption, field in (('Question: ', 'question'),
                         ('Options: ', 'options'),
                         ('Answer ID: ', 'answer_id')):
    print(caption, question[field])
  # answer_id indexes into the options list.
  print('Answer: ', question['options'][question['answer_id']])
  print('Question info: ')
  for caption, field in (('Reasoning: ', 'reasoning'),
                         ('Tag: ', 'tag'),
                         ('area: ', 'area')):
    print(caption, question[field])
  print(mc_rule)

In [None]:
# @title Visualising Grounded Video Question-Answering

# Load an example that has grounded question annotations.
video_id = list(db_dict.keys())[7]
example_data = db_dict[video_id]

if example_data['grounded_question']:
  question = example_data['grounded_question'][0]
  print('---------------------------------')
  print('Question: ', question['question'])
  print('Answer IDs: ', question['answers'])
  print('Question info: ')
  print('Reasoning: ', question['reasoning'])
  print('area: ', question['area'])
  print('---------------------------------')

  # The answers are object-track IDs; collect the matching tracks.
  answer_tracks = get_answer_tracks(example_data, question['answers'])

  if answer_tracks:
    # Build the palette here so this cell does not rely on an earlier
    # visualisation cell having defined COLORS.
    COLORS = get_colors(num_colors=100)

    frames = get_video_frames(example_data, video_path)
    frames = paint_boxes(frames, answer_tracks)

    # Preview only the frames annotated for the first answer track, at 1 fps.
    annotated_frames = np.array(
        [frames[frame_idx] for frame_idx in answer_tracks[0]['frame_ids']])
    display_video(annotated_frames, 1)
    del frames
  else:
    # Without this guard answer_tracks[0] would raise IndexError.
    print('No object tracks match the answer IDs of this question.')