In [None]:
import base64
from typing import List

import cv2
import imageio
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import robosuite as suite

from task_decomposition.paths import DATA_VIDEOS_PATH
from task_decomposition.data.gpt_outputs import GPT_OUTPUTS

# OpenCV font face constant; presumably intended for the text overlays drawn
# with cv2.putText when annotating video frames.
FONT = cv2.FONT_HERSHEY_SIMPLEX


In [None]:

def get_subtask_description(idx: int, filename: str):
    """
    Return the description of the subtask whose range contains ``idx``.

    The decomposition is read from ``GPT_OUTPUTS[filename]``; each entry is
    unpacked as ``(start, end, description, <ignored>)`` and both bounds are
    inclusive. The first matching entry wins. When no range contains the
    index, a fallback message string is returned instead.
    """
    matching_descriptions = (
        text
        for first, last, text, _ in GPT_OUTPUTS[filename]
        if first <= idx <= last
    )
    return next(matching_descriptions, "Index not within any subtask range.")

def annotate_video_fn(frames: List, filename: str, fps: int = 20):
    """
    Write ``frames`` to an MP4 with the subtask description drawn on each frame.

    The video is saved to ``DATA_VIDEOS_PATH/<filename>_gpt_annotated.mp4``.
    For frame ``i`` the overlay text is ``get_subtask_description(i, filename)``.

    Args:
        frames: Iterable of image arrays, one per video frame. Each frame is
            copied and cast to uint8 before drawing, so the inputs are left
            untouched.
        filename: Key into ``GPT_OUTPUTS`` selecting the subtask decomposition;
            also used as the stem of the output file name.
        fps: Frame rate of the written video (defaults to 20).

    Raises:
        KeyError: If ``filename`` has no entry in ``GPT_OUTPUTS``.
    """
    # Fail fast, before any output file is created, if there is no
    # decomposition to annotate with.
    if filename not in GPT_OUTPUTS:
        raise KeyError(filename)

    full_filename = DATA_VIDEOS_PATH + "/" + filename + "_gpt_annotated.mp4"

    # Overlay appearance: bottom-left corner of the text in pixels (x, y),
    # font scale, colour and stroke thickness.
    text_origin = (10, 30)
    font_scale = 0.5
    text_color = (255, 255, 255)
    text_thickness = 1

    video_writer = imageio.get_writer(full_filename, fps=fps)
    try:
        for idx, frame in enumerate(frames):
            # cv2.putText draws in place and needs a contiguous uint8 image;
            # np.array makes a fresh copy so the caller's frame is not modified.
            frame = np.array(frame, dtype=np.uint8, order="C")
            subtask = get_subtask_description(idx, filename)
            cv2.putText(
                frame,
                subtask,
                text_origin,
                FONT,
                font_scale,
                text_color,
                text_thickness,
                cv2.LINE_AA,
            )
            video_writer.append_data(frame)
    finally:
        # Always finalise the file, even if annotating a frame raises.
        video_writer.close()
    return

In [None]:
video_name = "stack.mp4"
video = cv2.VideoCapture(DATA_VIDEOS_PATH + "/" + video_name)

# Read the video frame by frame, storing each frame as a base64-encoded JPEG
# string (requires `import base64` in the notebook's import cell).
base64Frames = []
try:
    while video.isOpened():
        success, frame = video.read()
        if not success:
            # Stop once no further frame can be read.
            break
        _, buffer = cv2.imencode(".jpg", frame)
        base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
finally:
    # Release the capture handle even if encoding fails part-way through.
    video.release()

print(len(base64Frames), "frames read.")