### Welcome!
To ensure outputs are not truncated, press [Shift-O].

### First-Time Setup

In [None]:
# Clone FYP inference base model
# One-time setup: downloads the FollowYourPose_v1 repository from Hugging Face
# into FollowYourPose/checkpoints. Requires git and git-lfs to be installed.
# NOTE(review): the backslash paths are Windows-style; on Linux/macOS these
# `%cd` lines will not resolve as written — confirm the target platform.

%cd ..\FollowYourPose
!mkdir checkpoints

%cd checkpoints
!git lfs install
# The trailing "." clones into the current folder (checkpoints) rather than a new subfolder.
!git clone https://huggingface.co/YueMafighting/FollowYourPose_v1 .
# Move from FollowYourPose/checkpoints to the sibling "demos" folder
# (presumably where this notebook lives — TODO confirm).
%cd ..\..\demos

### Subsequent Setup

In [None]:
# return to root folder
# The scaffolding cell reads os.getcwd() as the project root, so run this once
# per kernel session; every extra run moves the working directory up another level.
%cd ..

In [None]:
#@title Folder Scaffolding & Path Loading
from pathlib import Path
import os

'''
how 2 Path
Try to surround file paths with Path() to reduce confusion
Because Path(str / str) does not work, try to use fstrings as inputs to Path(), instead of Path(var / var)
This is to reduce the impact of inevitable missed Path()s
If need to write filepath as a string, use .as_posix()
'''

# Root Directory
# NOTE: the project root is whatever the kernel's working directory is when this
# cell runs, so the working directory must already be the repository root.
ROOT_DIR_PATH = os.getcwd()

# Video
VIDEO_DIR_PATH = Path(f"{ROOT_DIR_PATH}/video")
VIDEO_SKELETON_DIR_PATH = Path(f"{ROOT_DIR_PATH}/video/Skeleton")

# I/O Files
TRAINING_CONTENT_DIR_PATH = Path(f"{ROOT_DIR_PATH}/training_content")
CUSTOM_MODEL_DIR_PATH = Path(f"{ROOT_DIR_PATH}/custom_model")
INFERENCE_OUTPUT_DIR_PATH = Path(f"{ROOT_DIR_PATH}/inference_output")
TEST_OUTPUT_DIR_PATH = Path(f"{ROOT_DIR_PATH}/test_output")

# Create the working folders with pathlib instead of `!mkdir`:
#  - parents=True builds video/ before video/Skeleton on every platform
#    (a plain POSIX `mkdir` fails when the parent is missing);
#  - exist_ok=True makes the cell safe to re-run without shell errors;
#  - no shell is involved, so paths containing spaces work.
for _scaffold_dir in (
    VIDEO_SKELETON_DIR_PATH,
    TRAINING_CONTENT_DIR_PATH,
    CUSTOM_MODEL_DIR_PATH,
    INFERENCE_OUTPUT_DIR_PATH,
    TEST_OUTPUT_DIR_PATH,
):
  _scaffold_dir.mkdir(parents=True, exist_ok=True)

# External tool checkouts (expected to exist already; not created here)
MMPOSE_DIR_PATH = Path(f"{ROOT_DIR_PATH}/MMPose")
FYP_DIR_PATH = Path(f"{ROOT_DIR_PATH}/FollowYourPose")

# FYP Config
CONFIG_DIR_PATH = Path(f"{FYP_DIR_PATH}/configs")

# Dataset Paths
CHARADES_LOOKUP_PATH = Path(f"{ROOT_DIR_PATH}/other_files/charades_lookup")

import sys

import csv
import datetime
import time
import subprocess

from omegaconf import OmegaConf
import yaml

from ipywidgets import Dropdown, Output, Layout, widgets, Button, VBox, HBox
from IPython.display import display, Markdown, HTML, Video, Image, clear_output

from moviepy.editor import VideoFileClip, clips_array, TextClip, CompositeVideoClip, ColorClip
from moviepy.config import change_settings

import numpy as np
import cv2
import imageio
import fnmatch

In [None]:
#@title Configuration Presets

# Schema of an inference yaml: each required key mapped to the Python type its
# value must have. A nested dict describes a nested yaml section.
expected_inference_config = dict(
  pretrained_model_path=str,
  output_dir=str,
  validation_data=dict(
    prompts=list,
    video_length=int,
    width=int,
    height=int,
    num_inference_steps=int,
    guidance_scale=float,
    use_inv_latent=bool,
    num_inv_steps=int,
    dataset_set=str,
  ),
  train_batch_size=int,
  validation_steps=int,
  resume_from_checkpoint=str,
  seed=int,
  mixed_precision=str,
  gradient_checkpointing=bool,
  enable_xformers_memory_efficient_attention=bool,
)

# Schema of a training yaml: each required key mapped to the Python type its
# value must have. A nested dict describes a nested yaml section.
expected_training_config = dict(
  pretrained_model_path=str,
  output_dir=str,
  train_data=dict(
    video_path=str,
    n_sample_frames=int,
    width=int,
    sample_frame_rate=int,
  ),
  learning_rate=float,
  train_batch_size=int,
  max_train_steps=int,
  trainable_modules=list,
  seed=int,
  mixed_precision=str,
  use_8bit_adam=bool,
  gradient_checkpointing=bool,
  enable_xformers_memory_efficient_attention=bool,
)

# Define the options for boolean dropdown
# Shared option list for every True/False dropdown in the configuration forms.
boolean_dropdown = [True, False]

# Create layout
# Style and layout objects reused by the configuration form widgets.
configs_config_style = {'description_width': '150px'}
configs_config_layout = widgets.Layout(width="500px")
# Layout applied to the Save button of the configuration form.
configs_config_button_layout = widgets.Layout(margin='0px 0px 20px 354px')

In [None]:
#@title Configuration Functions

# Build the (label, path) choices offered by the pretrained-model dropdown
def create_model_list():
  """Return dropdown options for the available models.

  The first option is always ("Default", <FYP>/checkpoints/stable-diffusion-v1-4).
  It is followed by one (folder name, folder path) pair per sub-directory of
  CUSTOM_MODEL_DIR_PATH, in the order os.listdir reports them.
  """
  default_option = ("Default", Path(f'{FYP_DIR_PATH}/checkpoints/stable-diffusion-v1-4'))
  custom_options = [
    (entry, Path(f'{CUSTOM_MODEL_DIR_PATH}/{entry}'))
    for entry in os.listdir(CUSTOM_MODEL_DIR_PATH)
    if os.path.isdir(os.path.join(CUSTOM_MODEL_DIR_PATH, entry))
  ]
  return [default_option] + custom_options

def create_video_path_list():
  """Return dropdown options for the training video folders.

  The first option is ("None", TRAINING_CONTENT_DIR_PATH). It is followed by one
  (folder name, folder path) pair per sub-directory of TRAINING_CONTENT_DIR_PATH,
  in the order os.listdir reports them.
  """
  no_selection = ("None", TRAINING_CONTENT_DIR_PATH)
  folder_options = [
    (entry, Path(f'{TRAINING_CONTENT_DIR_PATH}/{entry}'))
    for entry in os.listdir(TRAINING_CONTENT_DIR_PATH)
    if os.path.isdir(os.path.join(TRAINING_CONTENT_DIR_PATH, entry))
  ]
  return [no_selection] + folder_options

# Check if the yaml is in the correct structure (keys present, value types match)
def compare_dict_structure(expected, yaml_config):
  """Return True when ``yaml_config`` contains every key of ``expected`` with a matching type.

  Args:
    expected: schema dict mapping each key to a Python type, or to a nested
      schema dict for a nested section.
    yaml_config: the loaded configuration (a plain dict, or any mapping such as
      an OmegaConf DictConfig).

  Returns:
    True only if every expected key exists at every nesting level and its value
    has the expected type; False otherwise. Extra keys in ``yaml_config`` are ignored.

  Type rules:
    * ``list`` accepts any non-string sequence (so OmegaConf's ListConfig passes).
    * ``float`` also accepts ints, because yaml loads ``12`` as an int.
    * everything else uses a plain ``isinstance`` check.
  """
  from collections.abc import Mapping, Sequence

  # An empty yaml file loads as None; anything that is not a mapping cannot match.
  if not isinstance(yaml_config, Mapping):
    return False

  for key, expected_type in expected.items():
    if key not in yaml_config:
      return False
    actual = yaml_config[key]

    if isinstance(expected_type, dict):
      # The nested result must be propagated, otherwise nested mismatches are ignored.
      if not compare_dict_structure(expected_type, actual):
        return False
    elif expected_type is list:
      # A string is a Sequence too, but it is never a valid list of items here.
      if isinstance(actual, (str, bytes)) or not isinstance(actual, Sequence):
        return False
    elif expected_type is float:
      # bool is a subclass of int, so exclude it explicitly.
      if isinstance(actual, bool) or not isinstance(actual, (int, float)):
        return False
    elif not isinstance(actual, expected_type):
      return False

  return True

In [None]:
#@title Post-Process Inference Functions

# Maps a skeleton video path to the path of its "-bg" companion video
def usePoseWithBG(skeleton_path):
  """Return the sibling path ``.<name without last 4 chars>-bg.mp4`` for ``skeleton_path``.

  The last four characters of the file name are dropped (i.e. a ``.mp4`` suffix),
  a leading dot is added, and the result lives in the same folder as the input.
  """
  name_without_suffix = skeleton_path.name[:-4]
  return Path(f'{skeleton_path.parents[0]}/.{name_without_suffix}-bg.mp4')

# Find the first occurrence of a file name in a folder and its subfolders
def findHumanFilePath(file_to_find, folder_path):
  """Depth-first search for ``file_to_find`` below ``folder_path``.

  Entries named "Skeleton" are skipped at every level. File names are compared
  with fnmatch, so ``file_to_find`` is treated as a pattern. ``folder_path``
  must be a Path, because sub-folders are built with the ``/`` operator.

  Returns the matching path as a string, or None when nothing matches.
  """
  for entry in os.listdir(folder_path):
    if entry == "Skeleton":
      continue

    candidate = os.path.join(folder_path, entry)
    if not os.path.isfile(candidate):
      # Anything that is not a regular file is searched as a folder
      nested_match = findHumanFilePath(file_to_find, (folder_path / entry))
      if nested_match:
        return nested_match
    elif fnmatch.fnmatch(entry, file_to_find):
      return candidate

  return None

# Places the pose gif and every prompt's gif side by side, optionally captioned
def postprocess_gif(inf_path, prompts, size, pose_type, show_caption, is_gif_superimposed):
  """Combine the pose gif and the generated gifs of an inference folder into one gif.

  Args:
    inf_path: inference output folder (contains the pose gifs plus raw/ and superimposed/).
    prompts: a list of prompts, or a single prompt string.
    size: width and height, in pixels, every clip is resized to.
    pose_type: "human" or "combination" select the matching pose gif; any other
      value selects pose.gif.
    show_caption: when true, each prompt is written on a black bar under its clip.
    is_gif_superimposed: read the prompt gifs from superimposed/ instead of raw/.

  The result is written to <inf_path>/processed/all_combined.gif, or to
  <inf_path>/processed/<prompt>.gif when a single prompt string was given.
  """
  # Pose gif shown in the left-most column
  if pose_type == "human":
    pose_file_name = 'pose-human.gif'
  elif pose_type == "combination":
    pose_file_name = 'pose-superimposed.gif'
  else:
    pose_file_name = 'pose.gif'
  pose_path = Path(f'{inf_path}/{pose_file_name}')

  # A bare string means one prompt; the output is then named after that prompt
  if isinstance(prompts, str):
    prompts = [prompts]
    gif_output_path = Path(f'{inf_path}/processed/{prompts[0]}.gif')
  else:
    gif_output_path = Path(f'{inf_path}/processed/all_combined.gif')

  # Folder holding one gif per prompt
  source_folder = 'superimposed' if is_gif_superimposed else 'raw'
  gif_video_paths = [Path(f'{inf_path}/{source_folder}/{prompt}.gif') for prompt in prompts]

  # Load the pose clip first, then every prompt clip, and bring them to one size
  skeleton_video = VideoFileClip(pose_path.as_posix())
  prompt_videos = [VideoFileClip(gif_path.as_posix()) for gif_path in gif_video_paths]
  clips = [skeleton_video.resize((size, size))]
  clips.extend(video.resize((size, size)) for video in prompt_videos)

  if show_caption:
    # Black backdrop, 40px taller than a clip, leaving room for the caption
    black_clip = ColorClip(size=(clips[0].w, clips[0].h + 40), color=(0, 0, 0), duration=clips[0].duration)
    clips[0] = CompositeVideoClip([black_clip, clips[0]])

    # clips[0] is the pose clip, so prompt number k belongs to clips[k] (k starts at 1)
    for position, prompt in enumerate(prompts, start=1):
      caption = TextClip(prompt, font="Amiri-bold", fontsize=30, color='white')
      caption = caption.set_duration(clips[position].duration)
      caption = caption.set_position(("center", "bottom"))
      clips[position] = CompositeVideoClip([black_clip, caption, clips[position]])

  # One row containing every clip, written as a gif
  clips_array([clips]).write_gif(gif_output_path, fps=10)

# View the duration, number of frames and fps of a given gif
def gif_stats(gif_path):
  """Print the total duration (seconds), frame count and average fps of a gif.

  Each frame's ``meta['duration']`` is divided by 1000, i.e. it is treated as
  milliseconds. (NOTE(review): confirm the unit for the installed imageio version.)
  The reader is opened in a ``with`` block so the file handle is always released.
  """
  with imageio.get_reader(gif_path) as gif:
    frame_count = len(gif)
    duration = sum((frame.meta['duration'] / 1000.0 for frame in gif), 0.0)

  # Gifs without recorded frame durations would otherwise divide by zero
  if duration <= 0:
    print(f"Duration: {duration}s, Frames: {frame_count}, FPS: n/a")
    return

  fps = frame_count / duration
  print(f"Duration: {duration}s, Frames: {frame_count}, FPS: {fps}")

# Matches the skeleton video duration and fps to be the same as gif
def postprocess_mmpose(skeleton_path, video_length, output_dir, pose_type):
  """Convert a pose video into a gif whose timing matches the generated gifs.

  The video is re-timed with ffmpeg so that it lasts ``video_length * 130`` ms
  (130 ms per frame) and is sampled at ``video_length / desired_duration`` fps.

  Args:
    skeleton_path: input video (str or Path).
    video_length: number of frames of the generated gifs.
    output_dir: folder the gif is written to.
    pose_type: "human" writes pose-human.gif, "combination" writes
      pose-superimposed.gif, anything else writes pose.gif.

  Raises:
    FileNotFoundError: when ``skeleton_path`` is None (e.g. a failed file lookup).
    ValueError: when ffprobe reports a non-positive duration.

  ffmpeg/ffprobe are called with argument lists (no shell), so paths that
  contain spaces are passed through intact.
  """
  if skeleton_path is None:
    raise FileNotFoundError("postprocess_mmpose received no input video (skeleton_path is None)")

  # Get the video duration using ffprobe
  duration = float(subprocess.check_output([
      'ffprobe', '-v', 'error',
      '-show_entries', 'format=duration',
      '-of', 'default=noprint_wrappers=1:nokey=1',
      str(skeleton_path),
  ]))
  if duration <= 0:
    raise ValueError(f"ffprobe reported a non-positive duration ({duration}) for {skeleton_path}")

  # Target duration: 130 ms per frame
  desired_duration = video_length * 130 / 1000

  # Factor applied to the presentation timestamps to reach the target duration
  speedup_factor = desired_duration / duration

  fps = video_length / desired_duration

  # Output file name depends on the pose type
  output_path = Path(f'{output_dir}/pose.gif')
  if pose_type == "human":
    output_path = Path(f'{output_dir}/pose-human.gif')
  elif pose_type == "combination":
    output_path = Path(f'{output_dir}/pose-superimposed.gif')

  # Run the FFmpeg command to adjust the video's duration (-y overwrites an existing gif)
  ffmpeg_command = [
      'ffmpeg', '-i', str(skeleton_path),
      '-vf', f'setpts={speedup_factor}*PTS,fps={fps}',
      '-y', str(output_path),
  ]
  completed = subprocess.run(ffmpeg_command, capture_output=True, text=True)
  if completed.returncode != 0:
    # Keep going like the shell magic did, but make the failure visible
    print(f"ffmpeg could not create {output_path} (exit code {completed.returncode})")
    print(completed.stderr)

## Video Browsing 👀

1. Play the Video Selection cell.
2. Pick a folder.
3. Pick a video.
4. Click **Display** to view the video.
5. The other buttons, **MMPose**, **FYP** and **Refresh** will be covered in the other sections.

In [None]:
# @title Video Selection { display-mode: "form" }

# Helper Functions
def getFolderContent(folder_name):
  """Return the names of the visible sub-folders directly inside VIDEO_DIR_PATH.

  Folders whose name starts with "." are left out. NOTE: ``folder_name`` is not
  used; the listing always comes from VIDEO_DIR_PATH.
  """
  return [
    entry
    for entry in os.listdir(VIDEO_DIR_PATH)
    if os.path.isdir(os.path.join(VIDEO_DIR_PATH, entry)) and not entry.startswith(".")
  ]

# Init
# Initial list of folders offered in the "Folder" dropdown.
# NOTE: getFolderContent ignores its argument and always lists VIDEO_DIR_PATH.
video_dir_folders = os.listdir(VIDEO_DIR_PATH)
video_dir_folder_content = getFolderContent(video_dir_folders)

# Create layout
layout_single_button = widgets.Layout(width='212px',margin='0px 0px 20px 90px')
layout_double_button = widgets.Layout(width='104px')
layout_hbox = widgets.Layout(margin='0px 0px 0px 88px')
layout_output = widgets.Layout(margin='0px 0px 20px 0px', display='flex', align_items='flex-start')

# Create widgets
# The video dropdown and the action buttons start disabled; the listeners below
# enable them once a folder / video has been chosen.
video_output_placeholder = widgets.Output(layout=layout_output)
video_subdir_dropdown = widgets.Dropdown(options=video_dir_folder_content, description='Folder:', value=None, disabled=False)
video_dropdown = widgets.Dropdown(options=[], description='Video:', disabled=True)
video_display_button = widgets.Button(description="Display", disabled=True, layout=layout_double_button)
video_refresh_button = widgets.Button(description="Refresh", disabled=False, layout=layout_double_button)
video_mmpose_button = widgets.Button(description="MMPose", disabled=True, layout=layout_double_button)
video_fyp_button = widgets.Button(description="FYP", disabled=True, layout=layout_double_button)

# Two rows of two buttons each
video_hbox_1 = widgets.HBox([video_display_button, video_refresh_button], layout=layout_hbox)
video_hbox_2 = widgets.HBox([video_mmpose_button, video_fyp_button], layout=layout_hbox)

# Black 512x512 frame shown in the output area until a video is displayed
video_output_placeholder_content = HTML("""
  <div style="width: 512px; height: 512px; border-radius: 5%; background-color: black; margin: 0 auto; display: flex; justify-content: center; align-items: center;">
      <div style="width: 500px; height: 500px; border-radius: 5%; border: 2px solid white;" />
  </div>
""")

# Create listeners
## Update video dropdown options based on the selected folder
def video_subdir_select(change):
    """Fill the video dropdown with the .mp4 files of the selected folder.

    Observer for ``video_subdir_dropdown``'s ``value`` trait; ``change`` is unused.
    When no folder is selected, or the folder holds no .mp4 file, the video
    dropdown is emptied and disabled; otherwise it is enabled.
    """
    selected_video_folder = video_subdir_dropdown.value

    if selected_video_folder is not None:
      selected_VIDEO_DIR_PATH = Path(f'{VIDEO_DIR_PATH}/{selected_video_folder}')
      selected_video_dir_content = [file for file in os.listdir(selected_VIDEO_DIR_PATH) if file.endswith('.mp4')]
    else:
      selected_video_dir_content = []

    video_dropdown.options = selected_video_dir_content
    if not selected_video_dir_content:
        # Nothing to pick: disable the dropdown (it also starts out disabled)
        video_dropdown.disabled = True
        video_dropdown.value = None
    else:
        video_dropdown.disabled = False

## Show the chosen video inside the output placeholder
def display_selected_video(change):
    """Click handler of the Display button; does nothing when no video is selected."""
    selected_video = video_dropdown.value
    selected_video_folder = video_subdir_dropdown.value
    if not selected_video:
        return

    video_path = Path(f'{VIDEO_DIR_PATH}/{selected_video_folder}/{selected_video}')

    # Replace whatever the placeholder currently shows with the embedded video
    with video_output_placeholder:
        clear_output()
        display(Video(video_path, width=512, height=512, embed=True))

## Re-read the video directory and reset both dropdowns
def refresh_folder_and_directory(change):
    """Click handler of the Refresh button.

    Empties the video dropdown, reloads the folder dropdown from VIDEO_DIR_PATH
    and clears both selections.
    """
    refreshed_folders = getFolderContent(os.listdir(VIDEO_DIR_PATH))

    video_dropdown.options = []
    video_dropdown.value = None

    video_subdir_dropdown.options = refreshed_folders
    video_subdir_dropdown.value = None



## Action buttons are clickable only while a video is selected
def enable_button(change):
    """Observer of the video dropdown: toggle the Display, MMPose and FYP buttons together."""
    nothing_selected = not video_dropdown.value
    for action_button in (video_display_button, video_mmpose_button, video_fyp_button):
        action_button.disabled = nothing_selected

# Attach Listeners
# Dropdowns use trait observers (fire on every value change); buttons use click handlers.
video_subdir_dropdown.observe(video_subdir_select, 'value')
video_display_button.on_click(display_selected_video)
video_refresh_button.on_click(refresh_folder_and_directory)
video_dropdown.observe(enable_button, 'value')


# Display fields
# Render the black placeholder into the output widget first, then show the
# widgets top to bottom: output area, folder dropdown, video dropdown, button rows.
with video_output_placeholder:
  display(video_output_placeholder_content)
display(video_output_placeholder)
display(video_subdir_dropdown)
display(video_dropdown)

display(video_hbox_1)
display(video_hbox_2)


## Inference: MMPOSE 웃

1. Play the Run MMPose Inference cell.
2. Go back to the Video Selection cell, select a folder and a video in it, then click **MMPose**. (Ensure the video shows a human performing an action.)
3. Wait for the **Video** box to be populated with the selected video.
4. Click **Start Inference**.
5. Done.

In [None]:
# @title Run MMPose Inference { display-mode: "form" }

# Helper Functions
## Re-encode video due to H.264 video encoding error
def reencode_video(input_file):
  """Re-encode ``input_file`` in place as H.264/AAC using ffmpeg.

  The video is first encoded to a temporary ``output.mp4`` inside
  VIDEO_SKELETON_DIR_PATH and only swapped over the original once ffmpeg has
  succeeded, so a failed encode never destroys the input.

  Raises:
    RuntimeError: when ffmpeg exits with an error or produces no output file.
  """
  temp_output_file = Path(f'{VIDEO_SKELETON_DIR_PATH}/output.mp4')

  # Argument list (no shell) keeps paths with spaces intact; -y overwrites a
  # stale temp file instead of waiting for an interactive confirmation.
  completed = subprocess.run(
      [
          'ffmpeg', '-y', '-i', str(input_file),
          '-c:v', 'libx264', '-crf', '23',
          '-c:a', 'aac', '-strict', 'experimental',
          str(temp_output_file),
      ],
      capture_output=True,
      text=True,
  )

  if completed.returncode != 0 or not temp_output_file.exists():
    # Drop any partial output and leave the original video untouched
    if temp_output_file.exists():
      os.remove(temp_output_file)
    raise RuntimeError(f"ffmpeg failed to re-encode {input_file}:\n{completed.stderr}")

  # os.replace overwrites the destination on every platform
  os.replace(temp_output_file, input_file)

# Create layout
layout_single_long_button = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets
# Read-only text box showing the chosen video, and the inference button, which
# starts disabled until a video has been chosen.
selected_mmpose_video_input = widgets.Text(placeholder='Select a video above', description="Video:", disabled=True)
inf_mmpose_button = widgets.Button(description="Start Inference", disabled=True, layout=layout_single_long_button)

# Create listeners
## Copy the selected video into the MMPose form
def update_mmpose_input_video(change):
    """Click handler of the MMPose button: show the chosen video name and enable
    "Start Inference" only when a video is selected."""
    chosen_video = video_dropdown.value
    selected_mmpose_video_input.value = chosen_video
    inf_mmpose_button.disabled = not chosen_video

## Run mmpose
def run_mmpose_inference(button):
    """Click handler of "Start Inference": run the MMPose demo twice on the selected video.

    The first run keeps the original background; its output is renamed to the
    hidden file ``.<name>-bg.mp4``. The second run uses ``--black-background``
    and keeps the original file name. Both outputs land in
    VIDEO_SKELETON_DIR_PATH and are re-encoded afterwards.

    NOTE: the ``%cd`` below changes the kernel's working directory to
    MMPOSE_DIR_PATH and does not change it back.
    """
    selected_video = video_dropdown.value
    video_path = Path(f'{VIDEO_DIR_PATH}/{video_subdir_dropdown.value}/{selected_video}')

    %cd -q {MMPOSE_DIR_PATH}

    if not os.path.exists(VIDEO_SKELETON_DIR_PATH):
        os.mkdir(VIDEO_SKELETON_DIR_PATH)

    # Start inference with human background
    !python demo/inferencer_demo.py \
        {video_path}  \
        --pose2d human \
        --vis-out-dir {VIDEO_SKELETON_DIR_PATH} \
        --thickness 4 \
        --radius 0

    # Change name for human background
    # The [:-4] slice drops the last four characters of the name (the ".mp4" suffix).
    input_file = Path(f'{VIDEO_SKELETON_DIR_PATH}/{selected_video}')
    bg_file = Path(f'{VIDEO_SKELETON_DIR_PATH}/.{selected_video[:-4]}-bg.mp4')

    # Must happen before the second run, which writes to the same output name.
    os.rename(input_file, bg_file)

    # Start inference with black background
    !python demo/inferencer_demo.py \
        {video_path}  \
        --pose2d human \
        --vis-out-dir {VIDEO_SKELETON_DIR_PATH} \
        --black-background \
        --thickness 4 \
        --radius 0


    reencode_video(bg_file)
    reencode_video(input_file)

    # Wipe the tool logs from the cell output and show the form again
    clear_output()
    mmpose_inf_display()
    print("Done, Outputs:")
    print(f"With Background: {bg_file}")
    print(f"Black Background: {input_file}")

# Attach Listeners
# The MMPose button lives in the Video Selection cell, so that cell must have been run first.
video_mmpose_button.on_click(update_mmpose_input_video)
inf_mmpose_button.on_click(run_mmpose_inference)

# Display fields
def mmpose_inf_display():
  """Show the MMPose form: the selected-video text box and the Start Inference button."""
  display(selected_mmpose_video_input)
  display(inf_mmpose_button)
mmpose_inf_display()

## Inference: FYP 💃

1. Play the Inference Configuration cell.
2. Update the configurations and **Save**. (Details for each field will be stated above the cell).
3. Play the Run FYP Inference cell.
4. Go back to the Video Selection cell and click **Refresh**.
5. Select the **Skeleton** folder and a video in that folder then click **FYP**.
6. Once the Video box has been loaded, click **Start Inference** to begin the inference.
7. Once inference is completed, play the Post-Process Inference MMPose cell.
8. Play the Superimpose cell.
9. Play the Combine Gif cell.
10. Input all the fields and click **Create**. (Details for each field will be stated above the cell).
11. Click the Gif Display cell.
12. Choose a folder and a gif and click **Display**.
13. Done.

**Inference Configuration** ⚙️

**pretrained_model_path**: The path that contains the model to be used for inference.

**output_dir**: The path where the inferred gifs are saved to. The box is for users to write the name of the inference folder.

**Validation Data**:
> **prompts**: A list of texts that the gifs will be generated based on.
>
> **video_length**: Number of frames referenced from the pose video.
>
> **width** and **height**: Resolution of the video.
>
> **num_inference_steps**: The higher the value, the more realistic the video, in exchange for higher memory usage, more computational resources and a longer inference time.
>
> **guidance_scale**: A scale used to control and predict noise.
>
> **use_inv_latent**: Whether to reverse engineer the process to determine the latent variables used to make up the real image. Unused in our current state.
>
> **num_inv_steps**: Adjust to optimize the inverse latent process.
>
> **dataset_set**: No need to be changed by the user.

**train_batch_size**: How many samples are processed together at once. (A larger batch means faster training at the cost of higher memory usage.)

**resume_from_checkpoint**: The path that contains the checkpoint used for the model.

**seed**: A set inference seed to limit and control randomness and ensure reproducibility in case of error and/or for debugging.

**mixed_precision**: This is to set the type of precision for text encoding and VAE autoencoding weights. By default, this is set to single precision which is fp32. (High precision in exchange for more memory usage and computational resources used)

**gradient_checkpointing**: Reduces memory usage by doing some checkpoints for gradients, which increases the computational load. Decreases memory usage for increased time taken for inference to complete.

**enable_xformers_memory_efficient_attention**: Reduce memory usage in exchange for slight dip in inference performance.



In [None]:
# @title Inference Configuration { display-mode: "form" }

# Init
# Changes the kernel's working directory to the FollowYourPose checkout.
%cd -q {FYP_DIR_PATH}

# Initialize load_config with a default value
# Holds the yaml content of the currently selected file (set by update_load_config).
load_config = None

# Define a container for displayed widgets
# Every widget shown for the current config is tracked here so it can be closed
# when another file is selected.
displayed_widgets = []

# Get a list of all files in the directory
config_files = [f for f in os.listdir(CONFIG_DIR_PATH) if os.path.isfile(os.path.join(CONFIG_DIR_PATH, f))]

# Create a dropdown widget with the list of config files
# The first entry is a placeholder that update_load_config ignores.
config_files_dropdown = Dropdown(
  options=["- Select an Item -"] + config_files,
  description='Select a Config File:',
  layout=Layout(width="500px"),
  style={'description_width': '150px'}
)

# Close every tracked config widget and show the file dropdown again
def clear_displayed_widgets():
    """Close all widgets tracked in ``displayed_widgets``, empty that list in
    place, then display the config file dropdown again."""
    for shown_widget in tuple(displayed_widgets):
        shown_widget.close()
    del displayed_widgets[:]
    display(config_files_dropdown)

# Function to update the load_config variable based on the selected filename
def update_load_config(change):
    """Observer of the config file dropdown: load the chosen yaml and build an edit form.

    ``change.new`` is the selected file name; the placeholder entry is ignored.
    The yaml is loaded into the global ``load_config``. If it matches
    ``expected_inference_config``, one widget per setting is created, pre-filled
    from the file, and shown together with a Save button. Otherwise a message
    is printed and no form is shown.
    """
    global load_config
    selected_filename = change.new
    if selected_filename and selected_filename != "- Select an Item -":
        clear_output(wait=True)  # Clear the output area
        clear_displayed_widgets()  # Clear previously displayed widgets

        # Load yaml file
        # NOTE(review): yaml.FullLoader is used on a local config file; prefer
        # yaml.safe_load if these files can ever come from an untrusted source.
        sample_yaml_path = Path(f'{CONFIG_DIR_PATH}/{selected_filename}')
        with open(sample_yaml_path, 'r') as yaml_file:
          load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)
        print(f"Editing config from: {sample_yaml_path}")

        # Check if the yaml configuration matches the expected inference config
        if compare_dict_structure(expected_inference_config, load_config):

          # Create a list of model names and paths in the directory
          model_options = create_model_list()

          # =====================================
          ## Basic Data
          # NOTE(review): the Dropdown's initial value is the path stored in the yaml;
          # presumably it has to be one of model_options — confirm behaviour when it is not.
          config_subheader1 = widgets.HTML(value="<h3>Basic Data</h3>")
          config_pretrained_model_path = Dropdown(options=model_options, description="pretrained_model_path:", value=Path(load_config['pretrained_model_path']), style=configs_config_style, layout=configs_config_layout)
          config_output_dir_name = widgets.Text(description="output_dir_name:", value="", style=configs_config_style, layout=configs_config_layout)
          config_train_batch_size = widgets.IntText(description="train_batch_size:", value=load_config['train_batch_size'], style=configs_config_style, layout=configs_config_layout)
          config_validation_steps = widgets.IntText(description="validation_steps:", value=load_config['validation_steps'], style=configs_config_style, layout=configs_config_layout)
          config_seed = widgets.IntText(description="seed:", value=load_config['seed'], style=configs_config_style, layout=configs_config_layout)
          config_mixed_precision = widgets.Text(description="mixed_precision:", value=load_config['mixed_precision'], style=configs_config_style, layout=configs_config_layout)
          config_gradient_checkpointing = Dropdown(options=boolean_dropdown, value=load_config['gradient_checkpointing'], description="gradient_checkpointing:", style=configs_config_style, layout=configs_config_layout)
          config_enable_xformers_memory_efficient_attention = Dropdown(options=boolean_dropdown, value=load_config['enable_xformers_memory_efficient_attention'], description="enable_xformers_memory_efficient_attention:", style=configs_config_style, layout=configs_config_layout)
          # =====================================
          ## Validation_data
          # Prompts are edited one per line in the text area.
          config_subheader2 = widgets.HTML(value="<h3>Validation Data</h3>")
          config_prompts = widgets.Textarea(description="prompts:", value="\n".join(load_config['validation_data']['prompts']), style=configs_config_style, layout=widgets.Layout(width="500px", height="100px"))
          config_video_length = widgets.IntText(description="video_length:", value=load_config['validation_data']['video_length'], style=configs_config_style, layout=configs_config_layout)
          config_width = widgets.IntText(description="width:", value=load_config['validation_data']['width'], style=configs_config_style, layout=configs_config_layout)
          config_height = widgets.IntText(description="height:", value=load_config['validation_data']['height'], style=configs_config_style, layout=configs_config_layout)
          config_num_inference_steps = widgets.IntText(description="num_inference_steps:", value=load_config['validation_data']['num_inference_steps'], style=configs_config_style, layout=configs_config_layout)
          config_guidance_scale = widgets.FloatText(description="guidance_scale:", value=load_config['validation_data']['guidance_scale'], style=configs_config_style, layout=configs_config_layout)
          config_use_inv_latent = Dropdown(options=boolean_dropdown, value=load_config['validation_data']['use_inv_latent'], description="use_inv_latent:", style=configs_config_style, layout=configs_config_layout)
          config_num_inv_steps = widgets.IntText(description="num_inv_steps:", value=load_config['validation_data']['num_inv_steps'], style=configs_config_style, layout=configs_config_layout)
          # =====================================
          ## Group widgets
          config_vbox = widgets.VBox([
              config_pretrained_model_path,
              config_output_dir_name,
              config_train_batch_size,
              config_validation_steps,
              config_seed,
              config_mixed_precision,
              config_gradient_checkpointing,
              config_enable_xformers_memory_efficient_attention,
          ])
          config_vbox_validation_data = widgets.VBox([
              config_prompts,
              config_video_length,
              config_width,
              config_height,
              config_num_inference_steps,
              config_guidance_scale,
              config_use_inv_latent,
              config_num_inv_steps
          ])

          # Create listeners
          def save_config(change):
            """Click handler of Save: write the form values back to the selected yaml file.

            Nothing is written when a folder with the chosen output name already
            exists under INFERENCE_OUTPUT_DIR_PATH. ``dataset_set`` and
            ``resume_from_checkpoint`` are not editable and are copied from the
            loaded file.
            """
            config = {
                "pretrained_model_path": config_pretrained_model_path.value.as_posix(),
                "output_dir": Path(f'{INFERENCE_OUTPUT_DIR_PATH}/{config_output_dir_name.value}').as_posix(),
                "validation_data": {
                    "prompts": [prompt.strip() for prompt in config_prompts.value.splitlines() if prompt.strip()],
                    "video_length": config_video_length.value,
                    "width": config_width.value,
                    "height": config_height.value,
                    "num_inference_steps": config_num_inference_steps.value,
                    "guidance_scale": config_guidance_scale.value,
                    "use_inv_latent": config_use_inv_latent.value,
                    "num_inv_steps": config_num_inv_steps.value,
                    "dataset_set": load_config['validation_data']['dataset_set']
                },
                "train_batch_size": config_train_batch_size.value,
                "validation_steps": config_validation_steps.value,
                "resume_from_checkpoint": load_config['resume_from_checkpoint'],
                "seed": config_seed.value,
                "mixed_precision": config_mixed_precision.value,
                "gradient_checkpointing": config_gradient_checkpointing.value,
                "enable_xformers_memory_efficient_attention": config_enable_xformers_memory_efficient_attention.value
            }

            # Specify the folder path you want to check
            # NOTE: with an empty output name this is INFERENCE_OUTPUT_DIR_PATH itself.
            folder_path = Path(f'{INFERENCE_OUTPUT_DIR_PATH}/{config_output_dir_name.value}')

            # Check if the folder exists
            if os.path.exists(folder_path) and os.path.isdir(folder_path):
              print("\r", f'There is already a folder with the name {config_output_dir_name.value}! Please rename your folder!', end="")

            else:
              # Save updated config back into the yaml file
              with open(sample_yaml_path, "w") as file:
                yaml.dump(config, file, default_style='"', default_flow_style=False, sort_keys=False)

              # "\r" plus end="" rewrites the same output line with each status message
              print("\r", "Saving...", end="")
              time.sleep(2)
              print("\r", "Successfully saved!", end="")

          ## Button Widget and Attach Listener
          config_save_btn = widgets.Button(description="Save", layout=configs_config_button_layout)
          config_save_btn.on_click(save_config)

          # Display fields (same as before)
          display(
            config_subheader1,
            config_vbox,
            config_subheader2,
            config_vbox_validation_data,
            config_save_btn
          )

          # Update the displayed_widgets list
          # Tracked so clear_displayed_widgets can close them on the next selection.
          displayed_widgets.extend([
              config_subheader1,
              config_vbox,
              config_subheader2,
              config_vbox_validation_data,
              config_save_btn
          ])

        else:
          print("The configuration for the yaml is not structured correctly for inference")

# Attach the event handler to the dropdown's 'value' trait
# update_load_config runs every time a different config file is selected.
config_files_dropdown.observe(update_load_config, names='value')

# Display the dropdown widget
display(config_files_dropdown)

In [None]:
# @title Run FYP Inference { display-mode: "form" }

# Create layout
layout_single_long_button = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets
# Read-only text box with the chosen skeleton video, an editable config path
# (pre-filled with configs/pose_sample_windows.yaml), and the inference button,
# which starts disabled until a video has been chosen.
selected_fyp_video_input = widgets.Text(placeholder='Select a video above', description="Video:", disabled=True)
config_file_path = widgets.Text(value=Path(f'{CONFIG_DIR_PATH}/pose_sample_windows.yaml').as_posix(),description="Config File:")
inf_fyp_button = widgets.Button(description="Start Inference", disabled=True, layout=layout_single_long_button)

# Create listeners
## Copy the selected video's full path into the FYP form
def update_fyp_input_video(change):
    """Click handler of the FYP button: show the posix path of the chosen video
    and enable "Start Inference" only when a video is selected."""
    chosen_video = video_dropdown.value
    chosen_video_path = Path(f'{VIDEO_DIR_PATH}/{video_subdir_dropdown.value}/{chosen_video}')
    selected_fyp_video_input.value = chosen_video_path.as_posix()
    inf_fyp_button.disabled = not chosen_video

## Run FYP
def run_fyp_inference(button):
    %cd -q {FYP_DIR_PATH}

    config_file_path_text = config_file_path.value
    video_file_path_text = selected_fyp_video_input.value

    # Specify the folder path you want to check
    load_config = OmegaConf.load(config_file_path_text)
    folder_path = load_config.output_dir

    # Check if the folder exists
    if os.path.exists(folder_path) and os.path.isdir(folder_path):
      print("\r", f'The folder path {load_config.output_dir}! Please rename your folder!', end="")

    else:
      print("\r", "", end="")
      # Start inference
      !accelerate launch txt2video.py \
          --config={config_file_path_text}  \
          --skeleton_path={video_file_path_text}

## Skeleton video currently shown in the FYP form
def get_skeleton_path():
    """Return the value of the FYP "Video" text box as a Path."""
    return Path(selected_fyp_video_input.value)

## Config file currently entered in the FYP form
def get_config_path():
    """Return the value of the FYP "Config File" text box, unchanged."""
    return config_file_path.value

# Attach Listeners
# The FYP button lives in the Video Selection cell, so that cell must have been run first.
inf_fyp_button.on_click(run_fyp_inference)
video_fyp_button.on_click(update_fyp_input_video)

# Display fields
# Shown top to bottom: selected video, config file path, Start Inference button.
display(selected_fyp_video_input)
display(config_file_path)
display(inf_fyp_button)

In [None]:
#@title Post-Inference MMPose

# Load the inferred config file
# NOTE(review): yaml.FullLoader is used on a local config file; prefer
# yaml.safe_load if this file can ever come from an untrusted source.
with open(f"{CONFIG_DIR_PATH}/pose_sample_windows.yaml", 'r') as yaml_file: # Use this if you copied the inference folder from gdrive and bypassed running inference
# with open(get_config_path(), 'r') as yaml_file:
  load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

# Retrieve info needed from the config file
video_length = load_config['validation_data']['video_length']
output_dir = Path(load_config['output_dir'])

# Proper getting skeleton
# skeleton_path = get_skeleton_path()
# Built from VIDEO_SKELETON_DIR_PATH instead of an absolute developer path so
# the cell works on any checkout of the project.
skeleton_path = Path(f'{VIDEO_SKELETON_DIR_PATH}/0TM53.mp4') # Use this if you copied the inference folder from gdrive and bypassed running inference
if not skeleton_path.is_file():
  raise FileNotFoundError(f"Skeleton video not found: {skeleton_path}")

# Original (human) video with the same file name, searched outside the Skeleton folder
human_path = findHumanFilePath(skeleton_path.name, VIDEO_DIR_PATH)
if human_path is None:
  raise FileNotFoundError(f"No source video named {skeleton_path.name} was found under {VIDEO_DIR_PATH}")

# Skeleton-over-background video written next to the skeleton video by the MMPose cell
superimposed_path = usePoseWithBG(skeleton_path)

print(skeleton_path)

# Create a pose gif of all 3 types of pose (Human, Skeleton, Superimposed)
postprocess_mmpose(skeleton_path, video_length, output_dir, "pose")
postprocess_mmpose(human_path, video_length, output_dir, "human")
postprocess_mmpose(superimposed_path, video_length, output_dir, "combination")

In [None]:
#@title Superimpose

def superimposeSkeleton(inf_folder_path):
  """Overlay the skeleton pose GIF onto every raw inference GIF of one run.

  Reads ``<inf_folder_path>/pose.gif`` as the foreground and each ``*.gif`` file in
  ``<inf_folder_path>/raw`` as a background. The blended result is written under the
  same file name to ``<inf_folder_path>/superimposed`` (created when missing).
  Skeleton pixels whose grey value is <= 50 are replaced by the matching pixel of
  the raw GIF; every other pixel keeps the skeleton colour. 'Done' is printed after
  each GIF that was written.

  Args:
    inf_folder_path: Folder of a single inference run.
  """
  raw_path = Path(f'{inf_folder_path}/raw')
  skeleton = Path(f'{inf_folder_path}/pose.gif')
  output_folder_path = Path(f'{inf_folder_path}/superimposed')

  # Create the superimposed folder if needed (no separate exists() check, so no race)
  os.makedirs(output_folder_path, exist_ok=True)

  frame_size = (512, 512)  # every frame is resized to this before blending
  dark_threshold = 50      # grey level (0 to 255) at or below which a skeleton pixel counts as background

  # Get a list of all files in the directory
  humans = [f for f in os.listdir(raw_path) if os.path.isfile(os.path.join(raw_path, f))]

  for human_gif in humans:
    if not human_gif.endswith(".gif"):
      continue

    human_path = Path(f'{raw_path}/{human_gif}')
    output_file_path = Path(f'{output_folder_path}/{human_gif}')

    # Readers and writer are context-managed so their file handles are released
    # even when a frame fails to decode or blend.
    with imageio.get_reader(skeleton) as fg, imageio.get_reader(human_path) as bg:
      with imageio.get_writer(output_file_path, fps=7.692, loop=0) as output_gif:  # Adjust the desired frame rate
        for i in range(min(len(fg), len(bg))):  # Process frames until one of the GIFs ends
          foreground = cv2.resize(fg.get_data(i), frame_size)
          background = cv2.resize(bg.get_data(i), frame_size)

          # Alpha mask from the foreground image: dark pixels are treated as transparent.
          # NOTE(review): imageio normally yields RGB(A) frames while the constant is
          # BGR2GRAY; this only changes the channel weights of the grey value - confirm.
          gray = cv2.cvtColor(foreground, cv2.COLOR_BGR2GRAY)
          black_mask = (gray <= dark_threshold)

          # Combine the images based on the mask
          outImage = np.where(black_mask[:, :, np.newaxis], background.astype(float), foreground.astype(float))

          # Convert the frame to uint8 and add it to the output GIF
          output_gif.append_data(outImage.astype(np.uint8))

    print('Done')


# Read the config file whose path is currently entered in the inference form;
# its 'output_dir' entry is used as the run folder to superimpose.
# with open(f"{CONFIG_DIR_PATH}/pose_sample.yaml", 'r') as yaml_file: # Use this if you copied the inference folder from gdrive and bypassed running inference
with open(get_config_path(), 'r') as yaml_file:
  load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

# superimposeSkeleton reads raw/ and pose.gif inside this folder and writes superimposed/ next to them.
inf_folder_path = Path(load_config['output_dir'])
superimposeSkeleton(inf_folder_path)

#### **Combine Gifs**

**Inference Directory Name**: Shows a dropdown of folders in the inference output folder that the user can select.

**Pose Type**: A dropdown with three options for the user to select from:
> **pose**: Skeleton pose video generated by MMPose with black background <br>
> **human**: Original video <br>
> **combination**: Skeleton pose superimposed onto the original video

**Superimpose on Gif**: Boolean dropdown for user to indicate if they would like to see only the inferred gifs generated or the inferred gifs with the skeleton pose superimposed onto them.

**Show Captions**: Boolean dropdown for user to indicate if they would like the prompt to be indicated for the inferred gif generated.

**Combine all**: Boolean dropdown for the user to indicate whether all the generated inferred gifs and the pose gif should be merged into a single gif for display, or kept as separate gifs, each pairing one inferred gif with the skeleton pose gif.

In [None]:
#@title Combine Gifs



def getInferenceRunFolderContent(folder_name):
  """List the visible sub-directories of the inference output folder.

  Args:
    folder_name: Not used by the function; kept so existing calls stay valid.

  Returns:
    Names (not full paths) of the entries of ``INFERENCE_OUTPUT_DIR_PATH`` that are
    directories and do not start with a dot, in ``os.listdir`` order.
  """
  return [
      entry
      for entry in os.listdir(INFERENCE_OUTPUT_DIR_PATH)
      if not entry.startswith(".")
      and os.path.isdir(os.path.join(INFERENCE_OUTPUT_DIR_PATH, entry))
  ]

# Collect the run folders offered in the "Inference Directory Name" dropdown.
# getInferenceRunFolderContent ignores its argument and lists INFERENCE_OUTPUT_DIR_PATH itself.
inference_run_folders = os.listdir(INFERENCE_OUTPUT_DIR_PATH)
inference_run_folder_content = getInferenceRunFolderContent(inference_run_folders)

# Configure layouts
combine_gif_button_layout = widgets.Layout(margin='0px 0px 20px 210px', width="143px")
combine_gif_style = {'description_width': '200px'}
combine_gif_layout = widgets.Layout(width="350px")

# Create widgets
# The option dropdowns start with value=None; enable_button keeps the Create
# button disabled until each of them has a selection.
gif_inference_folder_name = Dropdown(options=inference_run_folder_content, description="Inference Directory Name:", style=combine_gif_style, layout=combine_gif_layout)
gif_pose_type = widgets.Dropdown(options=["pose", "human", "combination"], description='Pose Type:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_inferred_type = widgets.Dropdown(options=boolean_dropdown, description='Superimpose on Gif:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_show_captions = widgets.Dropdown(options=boolean_dropdown, description='Show Captions:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_combine_all = widgets.Dropdown(options=boolean_dropdown, description='Combine all:', value=None, style=combine_gif_style, layout=combine_gif_layout)
gif_create_button = widgets.Button(description="Create", disabled=True, layout=combine_gif_button_layout)

# Skeleton video currently selected in the inference form (evaluated once, when this cell runs)
skeleton_path = get_skeleton_path()
# skeleton_path = "/content/video/Skeleton/0Tm53.mp4" # Use this if you copied the inference folder from gdrive and bypassed running inference

def clearTextOutput():
  """Wipe the cell output and redraw the Combine Gifs form widgets in their original order."""
  clear_output()
  display(
      gif_inference_folder_name,
      gif_pose_type,
      gif_inferred_type,
      gif_show_captions,
      gif_combine_all,
      gif_create_button,
  )

# Create listeners
## Enable button when all dropdowns are populated
def enable_button(change):
    """Enable the Create button only once every option dropdown has a selection.

    ``change`` is the traitlets notification; it is not inspected because the
    current widget values are read directly.
    """
    all_selected = (
        bool(gif_pose_type.value)
        and gif_inferred_type.value is not None
        and gif_show_captions.value is not None
        and gif_combine_all.value is not None
    )
    gif_create_button.disabled = not all_selected

## Display the selected gif
def combine_gifs(change):
  """Post-process the gifs of the selected inference run according to the form options.

  Reads the prompts and frame width from the config file named in the inference
  form, then calls ``postprocess_gif`` either once with all prompts ("Combine all")
  or once per prompt. Prints a message and does nothing else when the selected run
  folder does not exist.
  """
  gif_inf_folder = Path(f'{INFERENCE_OUTPUT_DIR_PATH}/{gif_inference_folder_name.value}')
  pose_type = gif_pose_type.value
  is_superimposed = gif_inferred_type.value
  show_captions = gif_show_captions.value
  combine_all = gif_combine_all.value

  # Bail out early when the run folder is missing
  if not (os.path.exists(gif_inf_folder) and os.path.isdir(gif_inf_folder)):
    print("\r", 'No such folder exists!', end="")
    return

  clearTextOutput()
  # with open(f"{CONFIG_DIR_PATH}/pose_sample.yaml", 'r') as yaml_file: # Use this if you copied the inference folder from gdrive and bypassed running inference
  with open(get_config_path(), 'r') as yaml_file:
    load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

  # Retrieve info needed from the config file
  validation_data = load_config['validation_data']
  prompts = validation_data['prompts']
  size = validation_data['width']

  if combine_all:
    # One merged gif covering every prompt
    postprocess_gif(gif_inf_folder, prompts, size, pose_type, show_captions, is_superimposed)
  else:
    # One gif per prompt
    for prompt in prompts:
      postprocess_gif(gif_inf_folder, prompt, size, pose_type, show_captions, is_superimposed)
  print("Done")

# Re-evaluate the Create button whenever one of the option dropdowns changes
for option_dropdown in (gif_pose_type, gif_inferred_type, gif_show_captions, gif_combine_all):
  option_dropdown.observe(enable_button, names='value')
gif_create_button.on_click(combine_gifs)

# Render the form
display(
    gif_inference_folder_name,
    gif_pose_type,
    gif_inferred_type,
    gif_show_captions,
    gif_combine_all,
    gif_create_button,
)

In [None]:
# @title Gif Display
def getInferenceRunFolderContent(folder_name):
  """Return the names of the non-hidden directories inside ``INFERENCE_OUTPUT_DIR_PATH``.

  ``folder_name`` is accepted but not used. Entries starting with "." and entries
  that are not directories are left out; the order follows ``os.listdir``.
  """
  visible_dirs = []
  for entry in os.listdir(INFERENCE_OUTPUT_DIR_PATH):
    if entry.startswith("."):
      continue
    if os.path.isdir(os.path.join(INFERENCE_OUTPUT_DIR_PATH, entry)):
      visible_dirs.append(entry)
  return visible_dirs

# Init
# Run folders offered in the "Folder" dropdown (getInferenceRunFolderContent ignores its argument).
inference_run_folders = os.listdir(INFERENCE_OUTPUT_DIR_PATH)
inference_run_folder_content = getInferenceRunFolderContent(inference_run_folders)

# Create widgets
# The gif dropdown and the Display button start disabled; they are driven by the
# listeners defined below once a folder / gif is selected.
gif_output_placeholder = widgets.Output(layout=layout_output)
gif_subdir_dropdown = widgets.Dropdown(options=inference_run_folder_content, description='Folder:', value=None, disabled=False)
gif_dropdown = widgets.Dropdown(options=[], description='Gif:', disabled=True)
gif_display_button = widgets.Button(description="Display", disabled=True, layout=layout_double_button)
gif_refresh_button = widgets.Button(description="Refresh", disabled=False, layout=layout_double_button)

# Display and Refresh buttons share one row
gif_hbox_1 = widgets.HBox([gif_display_button, gif_refresh_button], layout=layout_hbox)

# Create listeners
## Update gif dropdown options based on the selected folder
def gif_subdir_select(change):
    """Repopulate the gif dropdown with the gifs of the newly selected run folder.

    Lists the ``*.gif`` files in ``<INFERENCE_OUTPUT_DIR_PATH>/<folder>/processed``.
    The gif dropdown is left empty and disabled when no folder is selected, when the
    run has no ``processed`` folder, or when that folder holds no gifs.

    Args:
        change: Traitlets change notification (not inspected; the widget value is read).
    """
    selected_gif_folder = gif_subdir_dropdown.value

    selected_gif_dir_content = []
    if selected_gif_folder is not None:
        selected_GIF_DIR_PATH = Path(f'{INFERENCE_OUTPUT_DIR_PATH}/{selected_gif_folder}/processed')
        # A run without a 'processed' folder simply has nothing to offer yet
        if os.path.isdir(selected_GIF_DIR_PATH):
            selected_gif_dir_content = [file for file in os.listdir(selected_GIF_DIR_PATH) if file.endswith('.gif')]

    gif_dropdown.options = selected_gif_dir_content
    if not selected_gif_dir_content:
        gif_dropdown.value = None
    # Only allow picking a gif when there is at least one to pick
    gif_dropdown.disabled = not selected_gif_dir_content

## Display the selected gif
def display_selected_gif(change):
    """Show the gif picked in the dropdown inside ``gif_output_placeholder``.

    Does nothing when no gif is selected.
    """
    selected_gif = gif_dropdown.value
    selected_gif_folder = gif_subdir_dropdown.value
    if not selected_gif:
        return

    gif_path = Path(f'{INFERENCE_OUTPUT_DIR_PATH}/{selected_gif_folder}/processed/{selected_gif}')
    gif_display = Image(filename=gif_path, embed=True)

    # Replace whatever the placeholder currently shows with the new gif
    with gif_output_placeholder:
        clear_output()
        display(gif_display)

## Refresh folder and directory
def refresh_folder_and_directory(change):
    """Re-scan the inference output folder and reset both dropdowns to an empty selection."""
    run_folder_names = getInferenceRunFolderContent(os.listdir(INFERENCE_OUTPUT_DIR_PATH))

    # Empty the gif list first, then offer the refreshed run folders
    gif_dropdown.options = []
    gif_dropdown.value = None

    gif_subdir_dropdown.options = run_folder_names
    gif_subdir_dropdown.value = None


## Enable button when a valid gif is picked
def enable_button(change):
    """Enable the Display button only while a gif is selected in ``gif_dropdown``."""
    gif_display_button.disabled = not gif_dropdown.value

# Wire the widgets to their handlers
gif_subdir_dropdown.observe(gif_subdir_select, 'value')
gif_dropdown.observe(enable_button, 'value')
gif_display_button.on_click(display_selected_gif)
gif_refresh_button.on_click(refresh_folder_and_directory)

# Render the viewer: output area, both dropdowns, then the button row
display(gif_output_placeholder, gif_subdir_dropdown, gif_dropdown, gif_hbox_1)

## Training 🏋

1. Play the Init cell.
2. Play the Dataset Preload (Video) cell.
3. Select a dataset folder and click **Start Cutting**.
4. Play Dataset Preload (Metadata) cell.
5. Play the Training Configuration cell.
6. Update the configurations and click **Save**. (Details for each field will be shown above the cell).
7. Play Run Training cell.
8. Done.

In [None]:
# @title Init { display-mode: "form" }

# Charades Data Class from csv
class CharadesData:
  """One row of a Charades annotation CSV.

  The eleven CSV columns are stored as attributes of the same name. ``objects`` is
  split on ';' into a list. ``actions`` is parsed from
  ``"class_id time_start time_end;..."`` into ``{class_id: [start_ms, end_ms]}``;
  when a class id occurs more than once in a row, the last occurrence wins.
  """

  def __init__(self, row):
    """Build the record from one CSV row.

    Args:
      row: Sequence of exactly 11 strings in the order id, subject, scene, quality,
        relevance, verified, script, objects, descriptions, actions, length.

    Raises:
      ValueError: If ``row`` does not have 11 items or a timestamp is not a number.
      IndexError: If an action entry has fewer than three space-separated parts.
    """
    (video_id, subject, scene, quality, relevance, verified,
     script, objects, descriptions, actions, length) = row
    self.id = video_id
    self.subject = subject
    self.scene = scene
    self.quality = quality
    self.relevance = relevance
    self.verified = verified
    self.script = script
    self.objects = objects.split(";")
    self.descriptions = descriptions
    self.length = length
    self.actions = {}

    # Convert actions in proper data structure ("class_id time_start time_end" -> class_id: [time_start, time_end])
    for substring in actions.split(';'):
      parts = substring.split()
      if not parts:
        # Empty actions column, or a stray/trailing ';'
        continue
      key = parts[0]
      self.actions[key] = [self.convert_to_ms(parts[1]), self.convert_to_ms(parts[2])]

  # For printing
  def __str__(self):
        return f"ID: {self.id}, Subject: {self.subject}, Scene: {self.scene}, Quality: {self.quality}, Relevance: {self.relevance}, Verified: {self.verified}, Script: {self.script}, Objects: {self.objects}, Descriptions: {self.descriptions}, Actions: {self.actions}, Length: {self.length}"

  # Helper function to convert time into ms
  def convert_to_ms(self, seconds):
    """Convert a decimal seconds string such as "11.90" into whole milliseconds (11900).

    The fractional part is interpreted as a fraction of a second regardless of how
    many digits it has, and values without a decimal point ("12") are accepted.

    Raises:
      ValueError: If ``seconds`` is not a valid number.
    """
    return round(float(seconds) * 1000)

  # Caption getter with template
  def getCaption(self, index):
    """Return the templated caption for the ``index``-th action (insertion order of ``actions``).

    Raises:
      IndexError: If ``index`` is not a valid position in ``actions``.
      KeyError: If the action's class id is missing from ``action_descriptions``.
    """
    return f"In a {self.scene} setting, within the context of '{self.script}', the action '{action_descriptions[list(self.actions.keys())[index]]}' is taking place."


action_descriptions = {}
charades_all = []

# Load classes lookup table: each line is "<class code> <description>"
with open(Path(f'{CHARADES_LOOKUP_PATH}/Charades_v1_classes.txt'), 'r') as file:
    for line in file:
        code, description = line.strip().split(' ', 1)
        action_descriptions[code] = description

# Load charades data (A: train split, then B: test split), skipping each header row
for csv_name in ('Charades_v1_train.csv', 'Charades_v1_test.csv'):
    with open(Path(f'{CHARADES_LOOKUP_PATH}/{csv_name}'), mode='r') as file:
        csv_reader = csv.reader(file)
        next(csv_reader, None)
        charades_all.extend(CharadesData(row) for row in csv_reader)

clear_output()
print("Data load successful!")

In [None]:
# @title Dataset Preload (Video) { display-mode: "form" }

# Init: everything found directly inside the video folder
dataset_dir_folders = os.listdir(VIDEO_DIR_PATH)

## Training env: each run of this cell gets its own timestamped branch folder
training_dataset = None
training_idx = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
training_branch = Path(f'{TRAINING_CONTENT_DIR_PATH}/{training_idx}')

# Create layout
config_style = {'description_width': '100px'}
config_layout = widgets.Layout(width="300px")
config_button_layout = widgets.Layout(margin='0px 0px 20px 154px')

# Create widgets (hidden entries are left out of the dropdown)
dataset_folder_dropdown = widgets.Dropdown(
    options=[entry for entry in dataset_dir_folders if not entry.startswith(".")],
    description='Dataset Folder:',
    value=None,
    layout=config_layout,
    style=config_style,
)
dataset_cutting_button = widgets.Button(
    description="Start Cutting",
    disabled=True,
    layout=config_button_layout,
)

# Create listeners
def dataset_dir_select(change):
  """Refresh the "Start Cutting" button when a dataset folder is picked.

  Counts the ``.mp4`` files directly inside the selected folder and shows the count
  on the button. When nothing is selected, or the selection is not a directory, the
  button is disabled and its label reset instead of raising from ``os.listdir``.

  Args:
    change: Traitlets change notification (not inspected; the widget value is read).
  """
  selected_folder = dataset_folder_dropdown.value
  dataset_folder_path = Path(f'{VIDEO_DIR_PATH}/{selected_folder}')

  # Test the dropdown's value, not the widget object (which is always truthy)
  if not selected_folder or not os.path.isdir(dataset_folder_path):
    dataset_cutting_button.disabled = True
    dataset_cutting_button.description = "Start Cutting"
    return

  dataset_folder_content = os.listdir(dataset_folder_path)
  total_dataset_videos = len([file for file in dataset_folder_content if file.endswith('.mp4')])

  dataset_cutting_button.disabled = False
  dataset_cutting_button.description = f"Start Cutting ({total_dataset_videos})"

def video_cutting_select(change):
  os.makedirs(training_branch, exist_ok=True)
  training_dataset = Path(f'{VIDEO_DIR_PATH}/{dataset_folder_dropdown.value}')
  
  # Loop video files from selected dataset folder
  for video_file in os.listdir(training_dataset):
    
    video, ext = os.path.splitext(video_file)

    # Ignore non video files (Eg: .ipynb_checkpoint and csv)
    if ext != ".mp4":
      continue

    video_folder = Path(f'{training_branch}/{video}')
    if not os.path.exists(video_folder):
      os.mkdir(video_folder)

      # Retrieve charade object by ID
      charade_data = None
      
      for charade in charades_all:
        if charade.id == video:
            charade_data = charade
            break
        
      # If no clipping required, keep whole video
      if not charade_data.actions:
        print(f"No clipping needed")
      else:
        print(f"Clipping {video}")
        charade_actions = charade_data.actions.items()
        total_charade_actions = len(charade_actions)
        for i, (class_id, timings) in enumerate(charade_actions):

          input_video = Path(f'{training_dataset}/{video_file}')
          output_video = Path(f'{video_folder}/{video}{i+1:02}{ext}')

          print(f"#{i+1}/{total_charade_actions}: {timings[0]}ms to {timings[0]+timings[1]}ms [I:{input_video}] [O:{output_video}]")
          !ffmpeg -i {input_video} -ss {timings[0]}ms -t {timings[1]}ms -c:v libx264 -c:a aac {output_video} -loglevel quiet
    else:
      print(f"Folder already exist for video_id: {video}. Skipping ...")
  print("Finished Clipping")

# Attach Listeners
dataset_folder_dropdown.observe(dataset_dir_select, 'value')
dataset_cutting_button.on_click(video_cutting_select)

# Display fields
display(dataset_folder_dropdown)
display(dataset_cutting_button)

In [None]:
# @title Dataset Preload (Metadata) { display-mode: "form" }

# Write one TSV row (part_id, clip_id, caption) per clip found under training_branch.
training_metadata_file = Path(f'{training_branch}/metadata.tsv')
with open(training_metadata_file, 'w', newline='', encoding='utf-8') as tsvfile:
  fieldnames = ['part_id', 'clip_id', 'caption']
  writer = csv.DictWriter(tsvfile, fieldnames=fieldnames, delimiter='\t')
  writer.writeheader()

  for part_id in os.listdir(training_branch):
    folder_path = os.path.join(training_branch, part_id)

    # Ignore non video files (Eg: .ipynb_checkpoint and csv)
    if not os.path.isdir(folder_path) or part_id.startswith("."):
      continue

    charade_data = next((charade for charade in charades_all if charade.id == part_id), None)
    if not charade_data:
      print("Missing charades data, skipping ...")
      continue

    # Clips are named "<video_id><NN>.mp4" with NN the 1-based action number.
    # Anything else in the folder (stray files, checkpoints) is ignored instead of
    # crashing the numeric sort.
    numbered_clips = []
    for clip in os.listdir(folder_path):
      stem, ext = os.path.splitext(clip)
      if ext != ".mp4" or not stem[-2:].isdecimal():
        continue
      numbered_clips.append((int(stem[-2:]), clip))

    # Sort by video sub-id to maintain order. The caption index comes from the
    # clip number, so a missing clip does not shift the captions of later clips.
    for clip_number, clip in sorted(numbered_clips):
      action_index = clip_number - 1
      if not 0 <= action_index < len(charade_data.actions):
        print(f"No action #{clip_number} for {part_id}, skipping {clip} ...")
        continue

      caption = charade_data.getCaption(action_index)
      writer.writerow({
          'part_id': part_id,
          'clip_id': clip,
          'caption': caption
      })
print(f"TSV created: {training_metadata_file}")

#### **Training Configuration** ⚙️

**pretrained_model_path**: The path that contains the model to fine-tune. This will be a dropdown for the user to select from.

**output_dir**: The path where the newly fine tuned model is pushed to. The folder name itself is written by the user, whereas the path to the folder is currently fixed.

**Train Data**:
> **video_path**: The path that contains the training dataset.
>
> **n_sample_frames**: Determines how many frames are referenced for training.
>
> **width**: Resolution of the video.
>
> **sample_frame_rate**: The rate at which the frames are sampled from. If the sample frames are set to 10 and the frame rate is set to 2, then every second 2 frames are referenced.

**learning_rate**: The rate at which each step of the training is conducted.

**train_batch_size**: How much training can be done together at once. (Larger batch means faster training at the cost of higher memory usage)

**max_train_steps**: The number of training iterations (steps) that are run over the dataset to optimize the model.

**trainable_modules**: The modules that are being trained. (No change needs to be made by the user, as the training state stays the same unless requirements change.)

**seed**: A set training seed to limit and control randomness and ensure reproducibility in case of error and/or for debugging.

**mixed_precision**: This is to set the type of precision for text encoding and VAE autoencoding weights. By default, this is set to single precision which is fp32. (High precision in exchange for more memory usage and computational resources used)

**use_8bit_adam**: Can be toggled true to reduce memory usage and computational resources used by using 8 bit precision for some part of ADAM optimization computations.

**gradient_checkpointing**: Reduces memory usage by doing some checkpoints for gradients, which increases the computational load. Decreases memory usage for increased time taken for training completion.

**enable_xformers_memory_efficient_attention**: Reduce memory usage in exchange for slight dip in training performance.



In [None]:
# @title Training Configuration { display-mode: "form" }

# Init
# Switch the working directory to the FollowYourPose folder (-q suppresses the echo).
%cd -q {FYP_DIR_PATH}

# Initialize load_config with a default value
# (update_load_config rebinds this global each time a config file is selected)
load_config = None

# Get a list of all files in the directory
config_files = [f for f in os.listdir(CONFIG_DIR_PATH) if os.path.isfile(os.path.join(CONFIG_DIR_PATH, f))]

# Define a container for displayed widgets
# (filled by update_load_config, emptied by clear_displayed_widgets)
displayed_widgets = []

# Create a dropdown widget with the list of config files
# The first entry is a placeholder that update_load_config treats as "nothing selected".
config_files_dropdown = Dropdown(
  options=["- Select an Item -"] + config_files,
  description='Select a Config File:',
  layout=Layout(width="500px"),
  style={'description_width': '150px'}
)

# Function to clear displayed widgets (excluding the dropdown)
def clear_displayed_widgets():
    """Close every widget tracked in ``displayed_widgets``, empty the list and show the file dropdown again."""
    for tracked_widget in displayed_widgets:
        tracked_widget.close()
    del displayed_widgets[:]
    display(config_files_dropdown)  # Display the dropdown again

# Function to update the load_config variable based on the selected filename
def update_load_config(change):
    """Rebuild the training-config editor for the file picked in ``config_files_dropdown``.

    Loads the selected YAML file into the global ``load_config``. If its structure
    matches ``expected_training_config`` (checked by ``compare_dict_structure``), one
    input widget per editable setting is displayed, pre-filled from the file, plus a
    Save button that writes the edited values back to the same file. Otherwise an
    error message is printed. The placeholder entry "- Select an Item -" and empty
    selections are ignored.

    Args:
        change: Traitlets change notification; ``change.new`` is the selected file name.
    """
    global load_config
    selected_filename = change.new
    if selected_filename and selected_filename != "- Select an Item -":
        clear_output(wait=True)  # Clear the output area
        clear_displayed_widgets()  # Clear previously displayed widgets

        # Load yaml file
        sample_yaml_path = Path(f'{CONFIG_DIR_PATH}/{selected_filename}')
        # load_config = OmegaConf.load(sample_yaml_path)
        with open(sample_yaml_path, 'r') as yaml_file:
          load_config = yaml.load(yaml_file, Loader=yaml.FullLoader)
        print(f"Editing config from: {sample_yaml_path}")

        # Check if the yaml configuration matches expected training config
        if compare_dict_structure(expected_training_config, load_config):

          # Create a list of model names and paths in the directory
          model_options = create_model_list()
          video_options = create_video_path_list()
          video_path_default = load_config["train_data"]["video_path"]

          # Fall back to the training-content root when the configured video path is
          # not an existing directory; save_config rejects that root as "not chosen".
          if not (os.path.exists(video_path_default) and os.path.isdir(video_path_default)):
            video_path_default = Path(TRAINING_CONTENT_DIR_PATH)
          # =====================================
          ## Basic Data
          config_subheader1 = widgets.HTML(value="<h3>Basic Data</h3>")
          # NOTE(review): a Dropdown's initial value must be one of its options - confirm that
          # create_model_list() / create_video_path_list() yield values equal to the ones passed below.
          config_pretrained_model_path = Dropdown(options=model_options, description="pretrained_model_path:", value=Path(load_config["pretrained_model_path"]), style=configs_config_style, layout=configs_config_layout)
          config_output_dir_name = widgets.Text(description="output_dir_name:", value="", style=configs_config_style, layout=configs_config_layout)
          config_learning_rate = widgets.FloatText(description="learning_rate:", value=load_config["learning_rate"], style=configs_config_style, layout=configs_config_layout)
          # =====================================
          # train_data
          config_subheader2 = widgets.HTML(value="<h3>Train Data</h3>")
          config_video_path = Dropdown(options=video_options, description="video_path:", value=video_path_default, style=configs_config_style, layout=configs_config_layout)
          config_n_sample_frames = widgets.IntText(description="n_sample_frames:", value=load_config["train_data"]["n_sample_frames"], style=configs_config_style, layout=configs_config_layout)
          config_train_data_width = widgets.IntText(description="width:", value=load_config["train_data"]["width"], style=configs_config_style, layout=configs_config_layout)
          config_sample_frame_rate = widgets.IntText(description="sample_frame_rate:", value=load_config["train_data"]["sample_frame_rate"], style=configs_config_style, layout=configs_config_layout)
          # =====================================
          config_train_batch_size = widgets.IntText(description="train_batch_size:", value=load_config["train_batch_size"], style=configs_config_style, layout=configs_config_layout)
          config_max_train_steps = widgets.IntText(description="max_train_steps:", value=load_config["max_train_steps"], style=configs_config_style, layout=configs_config_layout)
          config_seed = widgets.IntText(description="seed:", value=load_config["seed"], style=configs_config_style, layout=configs_config_layout)
          config_mixed_precision = widgets.Text(description="mixed_precision:", value=load_config["mixed_precision"], style=configs_config_style, layout=configs_config_layout)
          config_use_8bit_adam = Dropdown(options=boolean_dropdown, value=load_config["use_8bit_adam"], description="config_use_8bit_adam:", style=configs_config_style, layout=configs_config_layout)
          config_gradient_checkpointing = Dropdown(options=boolean_dropdown, value=load_config["gradient_checkpointing"], description="gradient_checkpointing:", style=configs_config_style, layout=configs_config_layout)
          config_enable_xformers_memory_efficient_attention = Dropdown(options=boolean_dropdown, value=load_config["enable_xformers_memory_efficient_attention"], description="enable_xformers_memory_efficient_attention:", style=configs_config_style, layout=configs_config_layout)
          # =====================================
          ## Button Widget
          config_save_btn = widgets.Button(description="Save", layout=configs_config_button_layout)
          ## Group widgets
          config_vbox = widgets.VBox([
              config_pretrained_model_path,
              config_output_dir_name,
              config_learning_rate,
              config_train_batch_size,
              config_max_train_steps,
              config_seed,
              config_mixed_precision,
              config_use_8bit_adam,
              config_gradient_checkpointing,
              config_enable_xformers_memory_efficient_attention
          ])
          config_vbox_train_data = widgets.VBox([
              config_video_path,
              config_n_sample_frames,
              config_train_data_width,
              config_sample_frame_rate,
          ])

          # Display fields (same as before)
          display(
            config_subheader1,
            config_vbox,
            config_subheader2,
            config_vbox_train_data,
            config_save_btn
          )

          # Create listeners
          def save_config(change):
            """Write the current widget values back to the selected YAML file.

            Nothing is written while the video path is still the training-content
            root (i.e. no training folder was chosen).
            """
            # 'trainable_modules' has no widget and is carried over from the loaded file.
            config = {
                "pretrained_model_path": Path(config_pretrained_model_path.value).as_posix(),
                "output_dir": Path(f"{CUSTOM_MODEL_DIR_PATH}/{config_output_dir_name.value}").as_posix(),
                "train_data": {
                    "video_path": Path(config_video_path.value).as_posix(),
                    "n_sample_frames": config_n_sample_frames.value,
                    "width": config_train_data_width.value,
                    "sample_frame_rate": config_sample_frame_rate.value
                },
                "learning_rate": config_learning_rate.value,
                "train_batch_size": config_train_batch_size.value,
                "max_train_steps": config_max_train_steps.value,
                "trainable_modules": load_config["trainable_modules"],
                "seed": config_seed.value,
                "mixed_precision": config_mixed_precision.value,
                "use_8bit_adam": config_use_8bit_adam.value,
                "gradient_checkpointing": config_gradient_checkpointing.value,
                "enable_xformers_memory_efficient_attention": config_enable_xformers_memory_efficient_attention.value
            }

            if Path(config_video_path.value) == Path(f'{TRAINING_CONTENT_DIR_PATH}/'):
              print("\r", "Please choose a training folder!", end="")

            else:
              #Save updated config back into yaml file
              with open(sample_yaml_path, "w") as file:
                yaml.dump(config, file, default_style='"', default_flow_style=False, sort_keys=False)

              # The file is already written at this point; presumably the pause just
              # keeps the "Saving..." message visible before it is overwritten.
              print("\r", "Saving...", end="")
              time.sleep(2)
              print("\r", "Successfully saved!", end="")

          # Attach Listeners
          config_save_btn.on_click(save_config)

          # Update the displayed_widgets list
          # (so clear_displayed_widgets can close them when another file is selected)
          displayed_widgets.extend([
              config_subheader1,
              config_vbox,
              config_subheader2,
              config_vbox_train_data,
              config_save_btn,
          ])

        else:
          print("The configuration for the yaml is not structured correctly for training")

# Attach the event handler to the dropdown's 'value' trait, so the editor is
# rebuilt whenever a different config file is selected
config_files_dropdown.observe(update_load_config, names='value')

# Display the dropdown widgets (the editor fields appear once a file is picked)
display(config_files_dropdown)


#### **Perform Training** ▶️

1. Running the code would immediately start the training process.

2. Once the training process starts, there will be a 2 minute buffer to load the necessary data for training.

3. After the buffer, a progress bar would show displaying the progress of the training together with the percentage of completion.

4. Finally when the training is completed, the newly generated model will be saved in the output directory path set by the user in the Training Configuration section.

In [None]:
# @title Run Training { display-mode: "form" }

# Run from the FollowYourPose folder: the training command further down uses paths relative to it.
%cd -q {FYP_DIR_PATH}

# Label that run_command_and_display_output overwrites with the latest output line
output_label = widgets.Label(value="Output will appear here:")
display(output_label)

def run_command_and_display_output(command):
    """Run a shell command and mirror its latest output line into ``output_label``.

    stderr is merged into stdout. The call blocks until the command exits. Blank
    output lines are skipped so the label keeps showing the last meaningful line,
    and a non-zero exit code is surfaced in the label instead of being ignored.

    Args:
        command: Command line string, executed through the shell.
    """
    # The context manager closes the stdout pipe and waits for the process on exit
    with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, shell=True) as process:
        for line in process.stdout:
            text = line.strip()
            if text:
                output_label.value = text  # Update the label with the live output
        return_code = process.wait()

    if return_code != 0:
        output_label.value = f"Command failed with exit code {return_code}: {output_label.value}"

# Start training right away when the cell runs.
# NOTE(review): the config path is fixed to configs/pose_train_windows.yaml, while the Training
# Configuration cell saves to whichever file was selected there - confirm they are the same file.
run_command_and_display_output('accelerate launch train_followyourpose.py --config="configs/pose_train_windows.yaml"')

