Copyright 2025 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


# FeedFlix


## Environment Setup




In [None]:
# @title Install dependencies
!pip install --upgrade --quiet google-genai
!pip install --quiet mediapy
!pip install --quiet moviepy==2.1.2
!pip install --quiet 'git+https://github.com/google-marketing-solutions/gen-v.git@main#egg=gtech-gen-v&subdirectory=backend'


In [None]:
# @title Imports
import base64
import concurrent.futures
import contextlib
import google.auth
import ipywidgets as widgets
import mediapy as media
import moviepy as mp
import os
import pydantic
import requests
import sys
import time

from datetime import datetime, date
from IPython.display import clear_output, Markdown
from gen_v import models
from gen_v import storage as gcs
from gen_v import utils
from gen_v import video
from google import genai
from google.cloud import storage
from google.genai.types import (
    GenerateContentConfig,
    MediaResolution,
    Part
)
from PIL import Image as PIL_Image
from PIL import ImageDraw, ImageFont

In [None]:
# @title Authenticate User

# Only run the interactive Colab auth flow when the notebook is actually
# executing inside Colab (the google.colab module is then already loaded).
if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()


## Parameters Setup

In [None]:
# @title GCP Parameters

# One project is used for every model by default; the per-model IDs below are
# plain aliases that can be re-pointed individually.
GCP_PROJECT_ID = 'your-project-id'  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
GEMINI_PROJECT_ID = GCP_PROJECT_ID
IMAGEN_PROJECT_ID = GCP_PROJECT_ID
VEO_PROJECT_ID = GCP_PROJECT_ID

# Input locations: a single bucket with one sub-folder per asset type.
GCP_BUCKET_NAME = 'your-bucket-name' # @param {type: "string", placeholder: "bucket-name-without-path"}
INPUT_IMAGE_BUCKET_NAME = GCP_BUCKET_NAME
FOLDER_NAME = 'your-folder-name' #@param {type: "string"}
INPUT_IMAGE_BUCKET_PATH = f'{FOLDER_NAME}/input-images/'
INPUT_VIDEOS_BUCKET_PATH = f'{FOLDER_NAME}/input-videos/'
INPUT_AUDIO_BUCKET_PATH = f'{FOLDER_NAME}/audio/'

# Output locations. Only OUTPUT_VIDEOS_URI carries the "gs://" scheme.
OUTPUT_IMAGES_BUCKET_NAME = GCP_BUCKET_NAME
OUTPUT_IMAGES_BUCKET_PATH = f'{FOLDER_NAME}/output-images/'
OUTPUT_VIDEOS_BUCKET_NAME = GCP_BUCKET_NAME
OUTPUT_VIDEOS_BUCKET_PATH = f'{FOLDER_NAME}/output-videos/'
OUTPUT_VIDEOS_URI = f"gs://{OUTPUT_VIDEOS_BUCKET_NAME}/{OUTPUT_VIDEOS_BUCKET_PATH}"


# Local scratch directory for intermediate files.
TMP_STRING = '/content'

# Set current date variables, used in GCS URI paths
# isocalendar() yields (ISO year, ISO week number, weekday); the weekday is
# discarded.
CURRENT_YEAR, CURRENT_WEEK, _ = date.today().isocalendar()
WEEK_AND_YEAR = f"week{CURRENT_WEEK}-{CURRENT_YEAR}"

# NOTE(review): the paths below are "<bucket>/<path>" strings WITHOUT the
# "gs://" scheme, unlike OUTPUT_VIDEOS_URI above - confirm each consumer
# expects this form.
IMAGE_OVERLAYS_PATH = f"{OUTPUT_VIDEOS_BUCKET_NAME}/{OUTPUT_VIDEOS_BUCKET_PATH}image_overlays/{WEEK_AND_YEAR}"
FINAL_OVERLAYS_PATH = f"{OUTPUT_VIDEOS_BUCKET_NAME}/{OUTPUT_VIDEOS_BUCKET_PATH}final_overlays/{WEEK_AND_YEAR}"
IMAGES_URI = f"{INPUT_IMAGE_BUCKET_NAME}/{INPUT_IMAGE_BUCKET_PATH}{WEEK_AND_YEAR}"
OUTPUT_URI_PATH = f"{OUTPUT_IMAGES_BUCKET_NAME}/{OUTPUT_IMAGES_BUCKET_PATH}{WEEK_AND_YEAR}"

VEO_OVERLAYS_FOLDER = f'{OUTPUT_VIDEOS_BUCKET_NAME}/{OUTPUT_VIDEOS_BUCKET_PATH}final_overlays/'
VEO_CLIPS_URI = f"{VEO_OVERLAYS_FOLDER}{WEEK_AND_YEAR}/"

# Both of these are built from OUTPUT_VIDEOS_BUCKET_NAME combined with an
# INPUT_* path; this works because all bucket names alias GCP_BUCKET_NAME.
INTROS_OUTROS_URI = f'{OUTPUT_VIDEOS_BUCKET_NAME}/{INPUT_VIDEOS_BUCKET_PATH}'
AUDIO_URI = f'{OUTPUT_VIDEOS_BUCKET_NAME}/{INPUT_AUDIO_BUCKET_PATH}'

STITCHING_OUTPUT_URI = f'{OUTPUT_VIDEOS_URI}concatenated/{WEEK_AND_YEAR}/'

In [None]:
#@title GenAI models parameters
# Fall back to the GOOGLE_CLOUD_PROJECT environment variable for every
# per-model project ID that is empty or still set to the form placeholder.
# (Previously the VEO check was duplicated verbatim, so the Imagen and Gemini
# project IDs never received the fallback.)
# NOTE(review): str() turns a missing environment variable into the literal
# string "None", which is kept from the original - confirm whether failing
# fast would be preferable.
if not VEO_PROJECT_ID or VEO_PROJECT_ID == "[your-project-id]":
    VEO_PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

if not IMAGEN_PROJECT_ID or IMAGEN_PROJECT_ID == "[your-project-id]":
    IMAGEN_PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

if not GEMINI_PROJECT_ID or GEMINI_PROJECT_ID == "[your-project-id]":
    GEMINI_PROJECT_ID = str(os.environ.get("GOOGLE_CLOUD_PROJECT"))

LOCATION = os.environ.get("GOOGLE_CLOUD_REGION", "us-central1")

imagen_client = genai.Client(vertexai=True, project=IMAGEN_PROJECT_ID, location=LOCATION)
veo_client = genai.Client(vertexai=True, project=VEO_PROJECT_ID, location=LOCATION) # Not used since using REST

# Set VEO Model versions
video_model = "veo-2.0-generate-001"
gemini_model = "gemini-2.0-flash"

# Setup REST API video model and endpoints
# NOTE(review): the REST endpoint hard-codes us-central1 and ignores LOCATION -
# confirm this is intentional.
video_model_uri = f"https://us-central1-aiplatform.googleapis.com/v1beta1/projects/{VEO_PROJECT_ID}/locations/us-central1/publishers/google/models/{video_model}"
prediction_endpoint = f"{video_model_uri}:predictLongRunning"
fetch_endpoint = f"{video_model_uri}:fetchPredictOperation"

In [None]:
# @title Image Editing Parameters

# Target size of resized images; also reused as the overlay canvas size.
RESIZED_IMAGE_WIDTH = 1280 # @param
RESIZED_IMAGE_HEIGHT = 720 # @param
# @markdown ----

# @markdown Enable this to show images during processing
# NOTE(review): the description above is identical to the one used for
# ENABLE_SHOW_IMAGES_INLINE further down; this flag presumably toggles
# background color replacement instead - confirm and reword.
COLOR_BACKGROUND_REPLACEMENT = True # @param {type: 'boolean'}

# RGB components of the replacement background color.
BACKGROUND_RED = 255 # @param
BACKGROUND_GREEN = 224 # @param
BACKGROUND_BLUE = 77 # @param
BACKGROUND_TRANSPARENCY = 255

# Color expected in the source images (white) and the color to replace it with.
ORIGINAL_BACKGROUND_COLOR = models.RGBColor.from_tuple((255,255,255))
BACKGROUND_COLOR = models.RGBColor.from_tuple((
    BACKGROUND_RED,
    BACKGROUND_GREEN,
    BACKGROUND_BLUE
))


# Use this to convert HEX color to RGB
# hex_color = "#fcdc4c"
# rgb_color = utils.hex_to_rgb(hex_color)
# print(rgb_color)  # Output: (R, G, B)

# @markdown ----

# @markdown Enable this to show images during processing
ENABLE_SHOW_IMAGES_INLINE = False # @param {type: "boolean"}
SHOW_IMAGE_HEIGHT = 500 # @param [250,500]

In [None]:
# @title Video Generation Parameters

# @markdown Set VEO parameters
# These values are passed to image_to_video() for every generated clip.
DURATION = 5  # @param {type:"slider", min:5, max:8, step:1}
SAMPLE_COUNT = 2  # @param {type:"slider", min:1, max:4, step:1}
NEGATIVE_PROMPT = "copyrighted content"  # @param {type: 'string'}
PROMPT_ENHANCE = True  # @param {type: 'boolean'}
PERSON_GENERATION = "allow_adult"  #@param ["allow_adult", "dont_allow"]

# @markdown ----

# @markdown Decide which prompt to use
# "CUSTOM" sends CUSTOM_VIDEO_PROMPT as-is; "GEMINI" asks Gemini to write the
# video prompt using GENERATE_VIDEO_PROMPT as its instruction.
PROMPT_TYPE = "CUSTOM" # @param ["CUSTOM", "GEMINI"]
# @markdown Your Veo prompt aka CUSTOM
CUSTOM_VIDEO_PROMPT = "Animate this image in a way that is most appropriate for the content in the image" # @param ["Animate this image in a way that is most appropriate for the content in the image"] {"allow-input":true}
# @markdown Let Gemini generate a video prompt aka GEMINI
GENERATE_VIDEO_PROMPT = "Analyse the image and write a prompt for a generative video AI to animate the video in the most appropriate way for the content to be displayed in an online ad.  Consider the function of the main object in the image when deciding how to animate it.  If there is a background, focus on animating the primary object only. Output the prompt only. Don't show any of the anlaysis or headings in your response, only provide the prompt you created." # @param ["Analyse the image and write a prompt for a generative video AI to animate the video in the most appropriate way for the content to be displayed in an online ad.  Consider the function of the main object in the image when deciding how to animate it.  If there is a background, focus on animating the primary object only. Output the prompt only. Don't show any of the anlaysis or headings in your response, only provide the prompt you created."] {"allow-input":true}

# @markdown ----

# "LANDSCAPE" maps to a 16:9 aspect ratio and "PORTRAIT" to 9:16 when videos
# are generated.
VIDEO_ORIENTATION = "LANDSCAPE" # @param ["LANDSCAPE", "PORTRAIT"]



In [None]:
# @title Overlay Parameters

# GCS locations of the overlay assets (logo, sticker, font).
LOGO_FILE_NAME = "logo.png" #@param {type: 'string'}
LOGO_URI = f'gs://{GCP_BUCKET_NAME}/{FOLDER_NAME}/logos/{LOGO_FILE_NAME}'
STICKER_FILE_NAME = "sticker.png" #@param {type: 'string'}
STICKER_URI = f'gs://{GCP_BUCKET_NAME}/{FOLDER_NAME}/input-overlays/{STICKER_FILE_NAME}'
FONT_FILE_NAME = "font.ttf" #@param {type: 'string'}
FONT_URI = f'gs://{GCP_BUCKET_NAME}/{FOLDER_NAME}/fonts/{FONT_FILE_NAME}'

# Canvas size used by the layout preview.
# NOTE: "OVERLAY_HEIGTH" is misspelled, but the name is referenced by the
# preview function, so it must not be renamed in isolation.
OVERLAY_WIDTH = RESIZED_IMAGE_WIDTH
OVERLAY_HEIGTH = RESIZED_IMAGE_HEIGHT

# Logo placement and timing.
LOGO_POSITION = (50, 520) #@param
LOGO_DESIRED_HEIGHT = 150 #@param Set as 0 to avoid auto-scaling
LOGO_START = 0 #@param
LOGO_DURATION = 5 #@param


# Sticker placement and timing.
STICKER_POSITION = (50, 50) #@param
STICKER_START = 0 #@param
STICKER_DURATION = 5 #@param
STICKER_DESIRED_HEIGHT = 100 #@param

# Promo text styling, placement and timing.
TEXT_FONT_SIZE = 30 #@param
TEXT_START = 0 #@param
TEXT_DURATION = 5 #@param
TEXT_COLOR = 'blue'   #@param
TEXT_POSITION = (850, 50)  #@param

# Image overlays (logo first, then sticker) built from the settings above.
GCS_IMAGES_TEST: list[models.ImageInput] = [
    models.ImageInput(
        path=LOGO_URI,
        start=LOGO_START,
        position=LOGO_POSITION,
        duration=LOGO_DURATION,
        height=LOGO_DESIRED_HEIGHT
      ),
    models.ImageInput(
        path=STICKER_URI,
        start=STICKER_START,
        position=STICKER_POSITION,
        duration=STICKER_DURATION,
        height=STICKER_DESIRED_HEIGHT
      )
    ]

# Text overlay template; the "text" value here is only a placeholder string.
TEXT_TEST = models.TextInput(
    text="text_to_display",
    font=FONT_URI,
    font_size=TEXT_FONT_SIZE,
    start_time=TEXT_START,
    duration=TEXT_DURATION,
    color=TEXT_COLOR,
    position=TEXT_POSITION
    )


In [None]:
# @title Stitching Parameters
# File names of the stitched video without and with the audio track.
OUTPUT_WITHOUT_AUDIO = 'video_transition_demo.mp4'
OUTPUT_WITH_AUDIO = 'video_transition_demo_audio.mp4'

VERBOSE = False # @param {type:"boolean"}
TRANSITION = "SWIPE" # @param ["CROSS_FADE", "FADE_IN", "SWIPE", "SLIDE_IN"]
# NOTE: "TRANSITON_DURATION" is misspelled but is referenced just below; it is
# passed as the transition's "padding" value.
TRANSITON_DURATION = 0.5 # @param {type:"number"}
TRANSITION_SIDE = "left" # @param ["left", "right", "top", "bottom"]

TRANSITION_TEST = models.VideoTransition(
    name=TRANSITION,
    padding=TRANSITON_DURATION,
    side=TRANSITION_SIDE
)
# NOTE: "OUTPUT_LENGHT" is misspelled (sic); rename only together with all of
# its users.
OUTPUT_LENGHT = 23 # @param {type:"slider", min:10, max:30, step:1}
TRIM_FROM = "end" # @param ["start", "end"]
# Read by trim_clips(): when False, clips are never trimmed.
TRIM_ENABLED = True # @param {type:"boolean"}

UPSCALE_FACTOR = 2 #@param

## FeedFlix Functions

In [None]:
# @title Video Generation Functions

def fetch_operation(lro_name: str) -> dict | None:
    """Polls a long-running operation until it reports completion.

    The fetch endpoint is queried up to 30 times, 10 seconds apart (about five
    minutes in total). Errors raised by a single poll are printed and the
    polling continues.

    Args:
        lro_name: The name of the long-running operation.

    Returns:
        The API response for the finished operation, or None if the operation
        did not report "done" within the allowed number of attempts.
    """
    request = {"operationName": lro_name}
    # The generation usually takes 2 minutes. Loop 30 times, around 5 minutes.
    max_attempts = 30  # Maximum number of attempts
    attempt_interval = 10  # Seconds between attempts

    for attempt in range(max_attempts):
        try:
            resp = video.send_request_to_google_api(fetch_endpoint, request)
            if "done" in resp and resp["done"]:
                return resp
        except Exception as e:
            print(f"Error raised while fetching operation: {e}")

        # No point in waiting after the final attempt.
        if attempt < max_attempts - 1:
            time.sleep(attempt_interval)

    print(
        f"Operation {lro_name} did not complete after "
        f"{max_attempts} attempts."
    )
    return None

def image_to_video(
    prompt: str,
    image_uri: str,
    gcs_uri: str,
    duration: int,
    sample_count: int,
    aspect_ratio: str,
    negative_prompt: str = "",
    prompt_enhance: bool = True,
    person_generation: str = "allow_adult",
) -> dict | None:
    """Generates a video from an image using the Video Generation API.

    Submits the request to the prediction endpoint, then polls the resulting
    long-running operation via fetch_operation().

    Args:
        prompt: The text prompt for video generation.
        image_uri: The GCS URI of the input image.
        gcs_uri: The GCS URI where the generated video will be stored.
        duration: The desired duration of the video in seconds.
        sample_count: The number of video samples to generate.
        aspect_ratio: The aspect ratio of the video (e.g., "16:9").
        negative_prompt: Text describing what shouldn't be included in the video.
        prompt_enhance: Whether to enhance the prompt (default: True).
        person_generation: Settings for person generation in the video.

    Returns:
        The response of the finished operation as returned by
        fetch_operation(), or None if the request failed or the operation did
        not finish in time.
    """
    request_model = models.VeoApiRequest(
        prompt=prompt,
        image_uri=image_uri,
        gcs_uri=gcs_uri,
        duration=duration,
        sample_count=sample_count,
        aspect_ratio=aspect_ratio,
        negative_prompt=negative_prompt,
        prompt_enhance=prompt_enhance,
        person_generation=person_generation
    )
    req = request_model.to_api_payload()
    try:
        resp = video.send_request_to_google_api(prediction_endpoint, req)
        return fetch_operation(resp["name"])
    except Exception as e:
        print(f"Error sending image_to_video request: {e}")
        return None

def generate_videos_and_download(
    prompt: str,
    input_image_uri: str,  # GCS URI of the input image
    input_image_path: str,  # Local path of the input image
    aspect_ratio: str,
    output_file_prefix: str,
    product: dict[str, object]
) -> list[dict[str, object]]:
    """Generates videos, downloads them, and returns their information.

    Args:
        prompt: The text prompt for video generation.
        input_image_uri: The GCS URI of the input image.
        input_image_path: The local path of the input image (currently unused).
        aspect_ratio: The aspect ratio of the video (e.g., "16:9").
        output_file_prefix: The prefix for output video file names.
        product: A dictionary containing product information; its "title" is
            copied into every returned entry.

    Returns:
        A list of dictionaries, one per generated video, with the keys
        "gcs_uri", "local_file", "local_file_name", "product_title" and
        "promo_text". The list is empty when generation or download failed.
    """
    try:
        operation = image_to_video(
            prompt,
            input_image_uri,
            f"{OUTPUT_VIDEOS_URI}veo/{WEEK_AND_YEAR}",
            DURATION,
            SAMPLE_COUNT,
            aspect_ratio,
            NEGATIVE_PROMPT,
            PROMPT_ENHANCE,
            PERSON_GENERATION,
        )
        file_name = gcs.get_file_name_from_gcs_url(input_image_uri)

        # image_to_video() returns None on failure or timeout, and a finished
        # operation may carry no videos; treat both as "nothing generated".
        generated = ((operation or {}).get("response") or {}).get("videos") or []
        print(
            f'Generated videos {len(generated)}'
            f' for {file_name}'
        )

        output_video_files = []
        for generated_video in generated:
            veo_name = gcs.get_file_name_from_gcs_url(generated_video["gcsUri"])
            output_video_local_path = (
                f"{file_name}-{output_file_prefix}-{veo_name}"
            )

            gcs.download_file_locally(
                generated_video["gcsUri"], output_video_local_path
            )

            output_video_files.append({
                "gcs_uri": generated_video["gcsUri"],
                "local_file": output_video_local_path,
                "local_file_name": output_video_local_path.split("/")[-1],
                "product_title": product['title'],
                "promo_text": ""
            })

        return output_video_files

    except Exception as e:
        print(f"Error in generate_videos_and_download: {e}")
        # Callers extend() their result list with the return value, so always
        # hand back a list.
        return []

def generate_videos_concurrently(selected_products: list[dict]) -> list[dict]:
    """Generates videos concurrently for multiple products and returns info.

    Each product is handled in its own worker thread. Results are gathered
    from the workers' return values, so the output follows the order of
    `selected_products`.

    Args:
        selected_products: A list of product dictionaries; each must contain
            "recolored_image_uri".

    Returns:
        A list of dictionaries of generated videos. Products whose generation
        failed contribute no entries.
    """

    def generate_video(product: dict) -> list[dict]:
        """Generates the videos for one product.

        Args:
            product: A dictionary containing product information

        Returns:
            A list of dictionaries with generated videos info; empty on error.
        """
        try:
            recolored_image_uri = product['recolored_image_uri']
            recolored_image_local_path = gcs.download_file_locally(
                recolored_image_uri
            )

            # Determine the prompt based on PROMPT_TYPE
            if PROMPT_TYPE == "GEMINI":
                gemini_prompt_request = models.GeminiPromptRequest(
                    prompt_text=GENERATE_VIDEO_PROMPT,
                    image_file_path=recolored_image_local_path,
                    model_name=gemini_model
                )
                prompt = video.get_gemini_generated_video_prompt(
                    gemini_prompt_request,
                    project_id=GEMINI_PROJECT_ID,
                    location=LOCATION
                )
            else:
                prompt = CUSTOM_VIDEO_PROMPT
                if PROMPT_TYPE != "CUSTOM":
                    # Default to custom prompt if invalid PROMPT_TYPE
                    print(
                        f"Invalid PROMPT_TYPE: {PROMPT_TYPE}. "
                        f"Using CUSTOM_VIDEO_PROMPT."
                    )

            # Map the orientation to an aspect ratio and file-name prefix.
            if VIDEO_ORIENTATION == "PORTRAIT":
                aspect_ratio = "9:16"
                output_file_prefix = "video-portrait"
            else:
                aspect_ratio = "16:9"
                output_file_prefix = "video-landscape"
                if VIDEO_ORIENTATION != "LANDSCAPE":
                    print(
                        f"Invalid VIDEO_ORIENTATION: {VIDEO_ORIENTATION}. "
                        f"Using LANDSCAPE."
                    )

            # Generate videos in the selected orientation with the chosen
            # prompt. "or []" covers a helper that returns None on failure.
            return generate_videos_and_download(
                prompt,
                recolored_image_uri,
                recolored_image_local_path,
                aspect_ratio,
                output_file_prefix,
                product) or []
        except Exception as e:
            print(
                f"Error generating video for product "
                f"{product.get('title', '<unknown>')}: {e}"
            )
            return []

    output_video_files = []  # Array to hold the output video files
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # map() yields results in input order; consuming them here avoids
        # mutating a shared list from several threads.
        for product_videos in executor.map(generate_video, selected_products):
            output_video_files.extend(product_videos)

    return output_video_files

In [None]:
# @title Get User Video Selection Functions
def get_user_choice_with_videos(videos: list[dict]) -> list[dict]:
    """
    Allows the user to choose multiple options from a list with video thumbnails.

    A checkbox, an inline video player and a promo-text field are displayed
    for every video, followed by a Submit button. The function returns
    immediately; the returned list is filled in when Submit is clicked. For
    backward compatibility the same list is also published as the
    module-level name `selected_videos` on every click.

    Args:
        videos: A list of dictionaries, where each dictionary represents a video
                and contains keys "local_file" for the local video path,
                "product_title" for the video title, and "promo_text"
                for any promotional text.

    Returns:
        The list that will hold the selected video dictionaries (each updated
        with the entered "promo_text") once the user submits.
    """
    chosen_videos = []
    # Create checkboxes and video widgets
    checkboxes = [
        widgets.Checkbox(
            value=False,
            description=f"{video_info['product_title']}")
        for video_info in videos
    ]

    # Create video elements using local file paths
    video_elements = []
    for video_info in videos:
        # Encode the video file as base64 so it can be embedded in the HTML.
        with open(video_info['local_file'], "rb") as f:
            encoded_video = base64.b64encode(f.read()).decode()

        # Create video element using base64 encoded data
        video_element = f"""
        <video width="320" height="240" controls>
            <source src="data:video/mp4;base64,{encoded_video}" type="video/mp4">
            Your browser does not support the video tag.
        </video>
        """
        video_elements.append(widgets.HTML(value=video_element))

    # Create a text field for promo_title
    promo_title_inputs = [
        widgets.Text(
            value="",
            placeholder="Enter promo text:",
            description=f"{video_info['promo_text']}")
        for video_info in videos
    ]

    # Create a container to arrange checkboxes and videos
    items = []
    for checkbox, video_widget, promo_title_input in zip(
        checkboxes, video_elements, promo_title_inputs
    ):
        items.extend([
            video_widget,
            checkbox,
            widgets.HTML(value="<br>"),
            promo_title_input
        ])
    container = widgets.VBox(items)

    display(container)

    def on_button_clicked(button):
        """Collects the checked videos and their promo texts on submit."""
        # Publish the result under the module-level name that the callback
        # used to append to, so cells reading that global keep working even
        # if it was never defined beforehand.
        global selected_videos
        try:
            clear_output(wait=True)  # Clear previous output
            # Start from scratch so a second click does not duplicate entries.
            chosen_videos.clear()
            for checkbox, promo_input, video_info in zip(
                checkboxes, promo_title_inputs, videos
            ):
                if checkbox.value:
                    video_info['promo_text'] = promo_input.value
                    chosen_videos.append(video_info)
            selected_videos = chosen_videos
            print(chosen_videos)
        except Exception as e:
            print(f'Error on_button_click {e} in get_user_choice_with_videos()')

    submit_button = widgets.Button(description="Submit")
    submit_button.on_click(on_button_clicked)
    display(submit_button)

    return chosen_videos



In [None]:
#@title Overlay Functions

def add_text_clips_to_video(
    input_video: models.VideoInput,
    text_inputs: list[models.TextInput],
    output_video_path: models.VideoInput
) -> None:
  """Adds text to a video clip and uploads the result to GCS.

  The input video and each text's font are downloaded locally, the text clips
  are composited over the video, the result is written under TMP_STRING and
  then uploaded to `output_video_path.path`.

  Note: each `TextInput.font` in `text_inputs` is overwritten in place with
  the local path of the downloaded font.

  Args:
    input_video: VideoInput whose `path` locates the source video.
    text_inputs: List of TextInput objects, each representing the text to add.
    output_video_path: VideoInput whose `path` is the upload destination.
  """
  local_video_path = gcs.download_file_locally(input_video.path)
  input_file_name = gcs.get_file_name_from_gcs_url(input_video.path)
  local_output_video_path = f"{TMP_STRING}/output_{input_file_name}"

  for text in text_inputs:
    text.font = gcs.download_file_locally(text.font)

  # Both context managers release their clips on exit, so no explicit close()
  # call is needed.
  with video.load_text_clips(local_video_path, text_inputs) as clips:
    with mp.CompositeVideoClip(clips) as final_clip:
      final_clip.write_videofile(local_output_video_path, codec='libx264')
      gcs.upload_file_to_gcs(local_output_video_path, output_video_path.path)


def process_videos_with_overlays_and_text(
    videos: list[dict],
    images: list[models.ImageInput],
    overlay_text: models.TextInput,
    overlays_uri: str,
    final_uri: str
) -> None:
    """Processes videos by adding image and text overlays and uploading to gcs.

    Every video is processed in its own worker thread. A failure on one video
    is printed and does not stop the others.

    Args:
        videos: A list of video dictionaries with the keys "gcs_uri",
            "local_file_name" and "promo_text".
        images: A list of `ImageInput` image overlays to be added to the videos.
        overlay_text: A `TextInput` object, defining the text overlay style;
            its text is replaced by each video's "promo_text".
        overlays_uri: The GCS location ("<bucket>/<path>", without "gs://")
            where intermediate overlays will be stored.
        final_uri: The GCS location where final videos with overlays will be
            stored.

    Returns:
        None.
    """
    print(f'process_videos_with_overlays_and_text...')

    def process_video(video_info: dict) -> None:
        """Processes a single video by adding overlays and text.

        Args:
          video_info: A dictionary representing a video
        """
        # The parameter must not be called "video": that would shadow the
        # gen_v `video` module used below.
        try:
            print(f'process_video: {video_info}')
            gcs_file_name = video_info['local_file_name']
            local_video_file_path = gcs.download_file_locally(
                video_info['gcs_uri'], gcs_file_name)
            local_video_file = models.VideoInput(path=local_video_file_path)

            # Step 1: image overlays, stored at the intermediate location.
            image_overlay_video = models.VideoInput(
                path=f'gs://{overlays_uri}/{gcs_file_name}')
            video.overlay_image_on_video(
                local_video_file, images, image_overlay_video)

            # Step 2: text overlay using this video's promo text.
            promo_text = models.TextInput(
                text=video_info['promo_text'],
                font=overlay_text.font,
                font_size=overlay_text.font_size,
                start_time=overlay_text.start_time,
                duration=overlay_text.duration,
                color=overlay_text.color,
                position=overlay_text.position
            )

            # Define the GCS path for the final video with text overlay.
            # NOTE(review): unlike the intermediate path, no "gs://" prefix is
            # added here - confirm which form the upload helper expects.
            final_video = models.VideoInput(
                path=f"{final_uri}/{gcs_file_name}")

            add_text_clips_to_video(
                image_overlay_video,
                [promo_text],
                final_video
            )
        except Exception as e:
            # Exceptions raised inside executor.map() workers are otherwise
            # lost because the results are never read.
            print(f'Error processing video {video_info}: {e}')

    # Leaving the "with" block waits for all workers to finish.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(process_video, videos)

In [None]:
### (Optional) Visual Overlay Layout
def draw_elements_on_image(
    image_path: str,
    logo_path: str,
    sticker_path: str,
    font_path: str
) -> PIL_Image.Image | None:
    """Draws the overlay layout preview and saves it to `image_path`.

    A yellow canvas of OVERLAY_WIDTH x OVERLAY_HEIGTH is created; the logo and
    sticker are marked as outlined rectangles and a sample text is rendered.

    Args:
        image_path: The path where the preview image is written.
        logo_path: The path to the logo image file.
        sticker_path: The path to the sticker image file.
        font_path: The path to the font file.

    Returns:
        The PIL Image with the elements drawn on it, or None if a file could
        not be read or written (the error is printed).
    """
    try:
        img = PIL_Image.new(
            'RGB', (OVERLAY_WIDTH, OVERLAY_HEIGTH), color='yellow')
        draw = ImageDraw.Draw(img)

        draw_logo(draw, logo_path)
        draw_sticker(draw, sticker_path)
        draw_text(draw, font_path)

        img.save(image_path)
        return img
    except (FileNotFoundError, PIL_Image.UnidentifiedImageError, OSError) as e:
        print(f"Error drawing elements on image: {e}")
        return None

def draw_logo(draw: ImageDraw.ImageDraw, logo_path: str) -> None:
  """Outlines in red the area the logo will occupy.

  Only the logo's dimensions are read from the file; the logo itself is not
  pasted. When LOGO_DESIRED_HEIGHT is positive the size is scaled to that
  height, keeping the aspect ratio.

    Args:
        draw: The ImageDraw object to draw on.
        logo_path: The path to the logo image file.
  """
  # The context manager closes the file handle once the size has been read.
  with PIL_Image.open(logo_path) as logo:
    logo_width, logo_height = logo.size
  if LOGO_DESIRED_HEIGHT > 0:
    logo_width = int(logo_width * LOGO_DESIRED_HEIGHT / logo_height)
    logo_height = LOGO_DESIRED_HEIGHT
  left, top = LOGO_POSITION
  draw.rectangle(
    [LOGO_POSITION, (left + logo_width, top + logo_height)],
    outline="red"
  )

def draw_sticker(draw: ImageDraw.ImageDraw, sticker_path: str) -> None:
  """Outlines in green the area the sticker will occupy.

  Only the sticker's dimensions are read from the file; the sticker itself is
  not pasted. When STICKER_DESIRED_HEIGHT is positive the size is scaled to
  that height, keeping the aspect ratio.

  Args:
      draw: The ImageDraw object to draw on.
      sticker_path: The path to the sticker image file.
  """
  # The context manager closes the file handle once the size has been read.
  with PIL_Image.open(sticker_path) as sticker:
    sticker_width, sticker_height = sticker.size
  if STICKER_DESIRED_HEIGHT > 0:
    sticker_width = int(sticker_width * STICKER_DESIRED_HEIGHT / sticker_height)
    sticker_height = STICKER_DESIRED_HEIGHT
  left, top = STICKER_POSITION
  draw.rectangle(
    [STICKER_POSITION, (left + sticker_width, top + sticker_height)],
    outline="green"
  )

def draw_text(draw: ImageDraw, font_path: str) -> None:
  """Renders the sample text at TEXT_POSITION.

  The font is loaded from `font_path` at TEXT_FONT_SIZE and the text is
  filled with TEXT_COLOR.

    Args:
        draw: The ImageDraw object to draw on.
        font_path: The path to the font file.
  """
  draw.text(
      TEXT_POSITION,
      "Sample Text",
      fill=TEXT_COLOR,
      font=ImageFont.truetype(font_path, TEXT_FONT_SIZE),
  )


def overlay_elements_on_image_and_display(image_path: str) -> None:
    """Builds and shows a preview of the overlay layout.

    The logo, sticker and font are downloaded from their GCS URIs, the layout
    is drawn into `image_path`, and the resulting image is displayed at
    SHOW_IMAGE_HEIGHT.

    Args:
        image_path: The path of the image file the layout is written to.

    """
    # Fetch the assets in the order logo, sticker, font.
    logo_path, sticker_path, font_path = (
        gcs.download_file_locally(asset_uri)
        for asset_uri in (LOGO_URI, STICKER_URI, FONT_URI)
    )

    # Render the layout, then show the saved file.
    draw_elements_on_image(image_path, logo_path, sticker_path, font_path)
    video.display_image(image_path, height=SHOW_IMAGE_HEIGHT)

overlay_elements_on_image_and_display("output_image.png")


In [None]:
#@title Video Stitching Functions
def select_videos_for_concatenation(
    local_video_paths: list[str],
    transition: models.VideoTransition,
    output_length: int,
    trim_location: str,
    audio_inputs: list[models.AudioInput],
    gcs_uri: str,
    output_without_audio: str,
    output_with_audio: str
) -> list:
  """Lets the user pick videos, then stitches and uploads them on submit.

  One checkbox per path and a Submit button are displayed. The function
  returns immediately; on Submit the checked videos are concatenated, audio is
  added, and both the silent and the audio version are uploaded.

  Args:
    local_video_paths: A list of local paths to the video files.
    transition: A VideoTransition object with the transition type and duration.
    output_length: The desired length of the output video in seconds.
    trim_location: Where to trim the videos ("start" or "end").
    audio_inputs: A list of AudioInput objects for audio overlay.
    gcs_uri: The GCS URI prefix for uploading the final videos.
    output_without_audio: File name appended to `gcs_uri` for the silent video.
    output_with_audio: File name appended to `gcs_uri` for the video with audio.

  Returns:
    The list that holds the selected video paths once the user submits, or
    None if the widgets could not be created.
  """
  try:
    checkboxes = [
        widgets.Checkbox(value=False, description=path)
        for path in local_video_paths
    ]
    display(widgets.VBox(checkboxes))

    submit_button = widgets.Button(description="Submit")
    display(submit_button)

    selected_videos = []

    def on_button_clicked(button):
      """Stitches and uploads the checked videos."""
      clear_output(wait=True)
      # Rebuild the selection so repeated clicks do not accumulate entries.
      selected_videos.clear()
      selected_videos.extend(
          path
          for path, checkbox in zip(local_video_paths, checkboxes)
          if checkbox.value
      )
      try:
        final_video = concatenate_video_clips(
            selected_videos,
            transition,
            output_length,
            trim_location
        )

        gcs_uri_video = f'{gcs_uri}{output_without_audio}'
        gcs_uri_audio = f'{gcs_uri}{output_with_audio}'

        add_audio_clips_to_video(final_video, audio_inputs, gcs_uri_audio)
        gcs.upload_file_to_gcs(final_video, gcs_uri_video)
      except Exception as e:
        # The outer try only covers widget creation; report callback failures
        # the same way.
        print(f'Error select_videos_for_concatenation {e}')

    submit_button.on_click(on_button_clicked)

    return selected_videos
  except Exception as e:
    print(f'Error select_videos_for_concatenation {e}')



def concatenate_video_clips(
    videos: list[str],
    transition: models.VideoTransition,
    output_length: int,
    trim_location: str
) -> str:
    """Concatenates video clips with transitions and optional trimming.

    Args:
        videos: A list of paths to the video clips to concatenate.
        transition: A VideoTransition object with transition type and duration.
        output_length: The desired length of the output video in seconds.
        trim_location: Where to trim the videos ("start" or "end").

    Returns:
        The path to the concatenated video file, or None if an error occurred.
    """

    try:
        print(f'''Start concatenation of video files {videos}
                 with transition {transition}...''')
        if not videos:
          raise ValueError('No video inputs provided.')

        target_path = f'{TMP_STRING}/concat_target.mp4'
        video_clips = []
        target_resolution = set_target_resolution(videos[0])
        for video in videos:
          video_clips.append(
              mp.VideoFileClip(
                  video, target_resolution=target_resolution
              )
            )

        video_clips = trim_clips(
            video_clips,
            transition.padding,
            output_length,
            trim_location
        )
        composed_clip = None
        match transition.name:
          case 'CROSS_FADE':
            composed_clip = cross_fade(
                video_clips,
                transition.padding,
                target_path
            )
          case 'FADE_IN':
            composed_clip = fade_in(
                video_clips,
                transition.padding,
                target_path
            )
          case 'SWIPE':
            composed_clip = swipe(
                video_clips,
                transition.padding,
                transition.side,
                target_path
            )
          case 'SLIDE_IN':
            composed_clip = slide_in(
                video_clips,
                transition.padding,
                transition.side,
                target_path
            )
          case _:
            raise ValueError(f'Transition {transition.name} not supported.')

        composed_clip.write_videofile(target_path, codec='libx264', logger=None)
        composed_clip.close()

    finally:
        # Clean up temporary files
        try:
          for video in videos:
            print(f'Removing temporary file {video}...')
            os.remove(video)
        except FileNotFoundError:
          pass

    print(f'Concatenation finished, video available: {target_path}')
    return target_path

def trim_clips(
    video_clips: list[mp.VideoFileClip],
    padding: float,
    output_length: int,
    trim_location: str
) -> list[mp.VideoFileClip]:
  """Trims the middle video clips so the sequence fits the output length.

  The first and last clips (treated as intro and outro) are never trimmed;
  the excess duration is split evenly across the clips in between. The input
  list is modified in place and also returned.

  Trimming is skipped when the sequence is already shorter than
  `output_length`, when the module-level `TRIM_ENABLED` flag is falsy, or
  when there are no clips between the first and last one.

  Args:
    video_clips: A list of moviepy VideoFileClip video clips.
    padding: The duration of the padding (for transitions) in seconds.
    output_length: The desired length of the output video in seconds.
    trim_location: Where to trim the videos ("start" or "end").

  Returns:
    The same list, with the middle clips replaced by their trimmed versions.

  Raises:
    ValueError: If trimming is required and trim_location is invalid.
  """
  # Each transition overlaps two neighbouring clips by `padding` seconds, so
  # the effective length is the sum of durations minus the overlaps.
  overlap = padding * (len(video_clips) - 1)
  total_length = sum(clip.duration for clip in video_clips) - overlap

  total_trim_length = total_length - output_length
  print(f'Total length: {total_length} Output length: {output_length}')
  if total_trim_length < 0 or not TRIM_ENABLED:
    print('Trimming not required.')
    return video_clips

  # Only the clips between the intro (first) and outro (last) are trimmed.
  middle_clips = video_clips[1:-1]
  if not middle_clips:
    # Previously this case divided by zero when only two clips were given.
    print('No clips between intro and outro to trim.')
    return video_clips

  if trim_location not in ('start', 'end'):
    raise ValueError(f'Trim location {trim_location} not supported.')

  trim_length = total_trim_length / len(middle_clips)
  for index, clip in enumerate(middle_clips, start=1):
    kept_duration = clip.duration - trim_length
    if trim_location == 'start':
      # A negative start time is passed so the kept part is the clip's tail.
      trimmed_clip = clip.subclipped(-kept_duration, None)
    else:
      trimmed_clip = clip.subclipped(0, kept_duration)
    video_clips[index] = trimmed_clip

  # Report the resulting length with the same formula used above (overlaps
  # are subtracted, not added).
  total_length = sum(clip.duration for clip in video_clips) - overlap

  print(f'Total length: {total_length} Output length: {output_length}')
  return video_clips

def calculate_audio_duration(
    i: int, audio_inputs: list[models.AudioInput], video_duration: float
) -> float:
  """Returns how long the audio clip at position `i` should play.

  An explicit (truthy) `duration` on the audio input wins. Otherwise the clip
  lasts from its own start time until the next clip starts, or until the end
  of the video when it is the last clip.

  Args:
    i: Index of the audio clip.
    audio_inputs: List of AudioInput objects.
    video_duration: Duration of the video in seconds.
  Returns:
    Duration of the audio clip in seconds.
  """
  current = audio_inputs[i]
  if current.duration:
    return current.duration

  is_last_clip = i >= len(audio_inputs) - 1
  if is_last_clip:
    clip_end = video_duration
  else:
    clip_end = audio_inputs[i + 1].start_time
  return clip_end - current.start_time

def load_audio_clips(
    audio_inputs: list[models.AudioInput], video_duration: float
) -> list[mp.AudioFileClip]:
  """Builds a positioned AudioFileClip for every audio input.

  Args:
    audio_inputs: List of AudioInput objects describing the audio files.
    video_duration: Duration of the video in seconds.
  Returns:
    List of AudioFileClip objects, in the same order as `audio_inputs`.
  """

  def build_clip(position, entry):
    # Fail early if the file is missing, before moviepy tries to open it.
    check_file_exists(entry.path)
    # Without an explicit duration, it is derived from the next start time
    # or from the video duration.
    clip_length = calculate_audio_duration(
        position, audio_inputs, video_duration
    )
    clip = mp.AudioFileClip(entry.path)
    clip = clip.with_start(entry.start_time)
    return clip.with_duration(clip_length)

  return [
      build_clip(position, entry)
      for position, entry in enumerate(audio_inputs)
  ]

def add_audio_clips_to_video(
    video_path: str,
    audio_inputs: list[models.AudioInput],
    gcs_uri: str
) -> None:
  """Adds one or more audio clips to a video and uploads the result to GCS.

  Audio inputs without an explicit duration play until the next audio input
  starts, or until the end of the video for the last one (see
  `calculate_audio_duration`). The combined video is written to a local
  temporary file and then uploaded to `gcs_uri`.

  Args:
    video_path: Path to the video file.
    audio_inputs: List of AudioInput objects, each representing an audio clip to
      add.
    gcs_uri: GCS URI the resulting video is uploaded to.
  Raises:
    FileNotFoundError: If the video or one of the audio files does not exist.
    ValueError: If no audio inputs are provided.
  """
  check_file_exists(video_path)
  target_path = '/tmp/concat_audio_target.mp4'
  if not audio_inputs:
    raise ValueError('No audio inputs provided.')

  video = mp.VideoFileClip(video_path)
  audio_clips = []
  final_clip = None
  try:
    audio_clips = load_audio_clips(audio_inputs, video.duration)
    final_audio = mp.CompositeAudioClip(audio_clips)
    final_clip = video.with_audio(final_audio).with_duration(video.duration)
    final_clip.write_videofile(
        target_path,
        codec='libx264',
        audio_codec='aac',
        logger=None
    )

    gcs.upload_file_to_gcs(target_path, gcs_uri)
  finally:
    # Release the clips even when encoding or the upload fails.
    video.close()
    for audio in audio_clips:
      audio.close()
    if final_clip is not None:
      final_clip.close()

def set_target_resolution(video: str) -> tuple:
  """Sets the target resolution for a video based on its orientation.

  The result is built from the module-level RESIZED_IMAGE_WIDTH and
  RESIZED_IMAGE_HEIGHT values: a portrait video gets (shorter, longer), a
  landscape video gets (longer, shorter), and a square video gets the shorter
  value for both sides.

  Args:
    video: The path to the video file.

  Returns:
    A tuple containing the target width and height (in pixels).

  """
  print('Retrieve the dimensions...')
  clip = mp.VideoFileClip(video)
  try:
    clip_width, clip_height = clip.size
  finally:
    # The clip is only opened to read its size; close it so it is not leaked.
    clip.close()

  short_side = min(RESIZED_IMAGE_WIDTH, RESIZED_IMAGE_HEIGHT)
  long_side = max(RESIZED_IMAGE_WIDTH, RESIZED_IMAGE_HEIGHT)

  if clip_width < clip_height:
    return (short_side, long_side)
  if clip_width > clip_height:
    return (long_side, short_side)
  return (short_side, short_side)

def cross_fade(
    video_clips: list[mp.VideoFileClip],
    padding: float,
    target_path: str,
    side: str | None = None
  ):
  """Applies a cross-fade transition between video clips.

  Each clip fades out while the following clip fades in, with the two
  overlapping for `padding` seconds. The composed clip is returned; this
  function does not write it to disk.

  Args:
    video_clips: A list of moviepy VideoFileClip objects to be concatenated.
    padding: The duration of the cross-fade transition in seconds.
    target_path: The path where the concatenated video will be saved. Not used
      by this function.
    side: Unused. A cross-fade has no direction; the parameter is kept (now
      optional) so existing calls that pass it keep working.

  Returns:
    The composed clip containing all input clips joined by cross-fades.
  """
  print('Concatenating video files...')
  composed_clip = video_clips[0]
  for video in video_clips[1:]:
    fade_out = mp.video.fx.CrossFadeOut(padding).copy()
    fade_in_effect = mp.video.fx.CrossFadeIn(padding).copy()

    composed_effect = fade_out.apply(composed_clip)

    # Start the next clip `padding` seconds before the current composition
    # ends so the fade-out and fade-in overlap.
    video_effect = fade_in_effect.apply(
        video.with_start(composed_clip.duration - padding))
    composed_clip = mp.CompositeVideoClip([composed_effect, video_effect])

  return composed_clip

def fade_in(
    video_clips: list[mp.VideoFileClip],
    padding: float,
    target_path: str
  ):
  """Joins video clips so that every clip after the first one fades in.

  Each following clip starts `padding` seconds before the previous one ends
  and has a CrossFadeIn effect of that length applied. The composed clip is
  returned; this function does not write it to disk.

  Args:
    video_clips: A list of moviepy VideoFileClip objects to be concatenated.
    padding: The duration of the fade-in transition in seconds.
    target_path: The path where the concatenated video will be saved. Not used
      by this function.

  Returns:
    A CompositeVideoClip containing all clips with the fade-ins applied.
  """
  print('Concatenating video files...')
  opening_clip = video_clips[0]
  layers = [opening_clip]
  next_start = opening_clip.duration - padding
  for clip in video_clips[1:]:
    fade = mp.video.fx.CrossFadeIn(padding).copy()
    positioned_clip = clip.with_start(next_start)
    layers.append(fade.apply(positioned_clip))
    # The following clip begins `padding` seconds before this one ends.
    next_start += clip.duration - padding

  return mp.CompositeVideoClip(layers)

def swipe(
    video_clips: list[mp.VideoFileClip],
    padding: float,
    side: str,
    target_path: str
  ):
  """Joins video clips with a swipe transition between them.

  The composition built so far slides out towards the side opposite to
  `side` while the next clip slides in from `side`; both effects last
  `padding` seconds. The composed clip is returned; this function does not
  write it to disk.

  Args:
    video_clips: A list of moviepy VideoFileClip objects to be concatenated.
    padding: The duration of the transition (padding) in seconds.
    side: The direction of the swipe ('left', 'right', 'top', 'bottom').
    target_path: The path where the concatenated video will be saved. Not used
      by this function.

  Returns:
    The composed clip containing all input clips joined by swipes.
  """
  print('Concatenating video files...')
  timeline = video_clips[0]
  exit_side = get_opposite_side(side)
  for incoming_clip in video_clips[1:]:
    slide_out_effect = mp.video.fx.SlideOut(padding, exit_side).copy()
    slide_in_effect = mp.video.fx.SlideIn(padding, side).copy()

    outgoing_layer = slide_out_effect.apply(timeline)

    # The incoming clip starts `padding` seconds before the timeline ends.
    incoming_start = timeline.duration - padding
    incoming_layer = slide_in_effect.apply(
        incoming_clip.with_start(incoming_start))
    timeline = mp.CompositeVideoClip([outgoing_layer, incoming_layer])

  return timeline

def slide_in(
    video_clips: list[mp.VideoFileClip],
    padding: float,
    side: str,
    target_path: str
  ):
  """Joins video clips so that every clip after the first one slides in.

  Each following clip starts `padding` seconds before the previous one ends
  and has a SlideIn effect from `side` applied. The composed clip is
  returned; this function does not write it to disk.

  Args:
    video_clips: A list of moviepy VideoFileClip objects to be concatenated.
    padding: The duration of the slide-in transition in seconds.
    side: The direction from which each clip slides in
      ("left", "right", "top", or "bottom").
    target_path: The path where the concatenated video will be saved. Not used
      by this function.

  Returns:
    A CompositeVideoClip containing all clips with the slide-ins applied.
  """
  print('Concatenating video files...')
  opening_clip = video_clips[0]
  layers = [opening_clip]
  next_start = opening_clip.duration - padding
  for clip in video_clips[1:]:
    slide_effect = mp.video.fx.SlideIn(padding, side).copy()
    positioned_clip = clip.with_start(next_start)
    layers.append(slide_effect.apply(positioned_clip))
    # The following clip begins `padding` seconds before this one ends.
    next_start += clip.duration - padding

  return mp.CompositeVideoClip(layers)

def get_opposite_side(side: str) -> str:
  """Maps a direction to the direction facing it.

  Args:
    side: The input side ("left", "right", "top", or "bottom").

  Returns:
    "right" for "left", "left" for "right", "bottom" for "top" and "top" for
    "bottom".

  Raises:
    ValueError: If the input side is not one of the valid options.
  """
  opposite_pairs = (
      ('left', 'right'),
      ('right', 'left'),
      ('top', 'bottom'),
      ('bottom', 'top'),
  )
  for candidate, opposite in opposite_pairs:
    if side == candidate:
      return opposite
  raise ValueError(f'Side {side} not supported.')

def check_file_exists(file_path: str) -> None:
  """Verifies that the given path exists on the local file system.

  Args:
    file_path: The path to the file.
  Raises:
    FileNotFoundError: If the file does not exist.
  """
  if os.path.exists(file_path):
    return
  raise FileNotFoundError(f'Media file not found: {file_path}')

def merge_arrays(intro_outro_videos, main_content_videos):
    """Wraps the main content videos with an intro and an outro.

    The first element of `intro_outro_videos` is placed before the main
    content and the last element after it. Elements in between are ignored.

    Args:
        intro_outro_videos: A list of intro/outro video paths. When it holds a
                            single video, that video is used as both the intro
                            and the outro. When it is empty, no intro or outro
                            is added.
        main_content_videos: A list of main content video paths.

    Returns:
        A new list of the form [intro, *main content, outro], or the
        `main_content_videos` list itself if `intro_outro_videos` is empty.
    """
    if len(intro_outro_videos) == 0:
        # Nothing to wrap around the main content.
        return main_content_videos

    intro = intro_outro_videos[0]
    outro = intro_outro_videos[-1]
    return [intro] + main_content_videos + [outro]

def stitch_videos_with_transitions(
    intro_outro_videos_uri: str,
    veo_clips_uri: str,
    audio_uri: str,
    stiching_output_uri: str,
    output_with_audio: str,
    output_without_audio: str,
    transition: str,
    transition_duration: float,
    transition_side: str,
    desired_length: int,
    trim_from: str,
) -> None:
    """Fetches the video parts and audio from GCS and hands them to stitching.

    The intro/outro videos, the Veo clips and the audio files are listed and
    downloaded from GCS, and then passed to
    `select_videos_for_concatenation` together with the transition settings
    and output locations.

    Args:
        intro_outro_videos_uri: The GCS URI of the folder with intros and outros.
        veo_clips_uri: The GCS URI of the folder containing Veo overlay videos.
        audio_uri: The GCS URI of the folder containing audio files.
        stiching_output_uri: The GCS URI for the stitched video to be uploaded.
        output_with_audio: The filename for the stitched video with audio.
        output_without_audio: The filename for the stitched video without audio.
        transition: The name of the transition to use (e.g., "CROSS_FADE"...).
        transition_duration: The duration of the transition in seconds.
        transition_side: The side of the transition (e.g., "left", "right").
        desired_length: The desired length of the output video in seconds.
        trim_from: Where to trim the video ("start" or "end").

    Returns:
        None.
    """
    # 1. List the remote video and audio files.
    remote_intros_outros = gcs.retrieve_all_files_from_gcs_folder(
        intro_outro_videos_uri
    )
    remote_veo_clips = gcs.retrieve_all_files_from_gcs_folder(veo_clips_uri)
    remote_videos = merge_arrays(remote_intros_outros, remote_veo_clips)
    remote_audio = gcs.retrieve_all_files_from_gcs_folder(audio_uri)

    transition_settings = models.VideoTransition(
        name=transition,
        padding=transition_duration,
        side=transition_side
    )

    # 2. Download everything locally; each audio file becomes an AudioInput.
    local_videos = gcs.download_files(remote_videos)
    downloaded_audio = gcs.download_files(remote_audio)
    audio_inputs = [
        models.AudioInput(path=audio_path) for audio_path in downloaded_audio
    ]

    # 3. Concatenate videos, apply transitions, and overlay audio.
    select_videos_for_concatenation(
        local_videos,
        transition_settings,
        desired_length,
        trim_from,
        audio_inputs,
        stiching_output_uri,
        output_without_audio,
        output_with_audio
    )




In [None]:
#@title Main functions
def generate_and_select_videos(
    images_uri: str,
    output_uri_path: str,
    resized_image_width: int,
    resized_image_height: int,
    original_background_color: models.RGBColor,
    background_color: models.RGBColor
) -> list[dict]:
    """Generates videos from images and presents them for user selection.

    This function performs the following steps:
    1. Processes and resizes the input images.
    2. Recolors the background of the resized images and uploads them.
    3. Generates videos from the processed images concurrently.
    4. Presents the generated videos to the user for selection.

    Args:
        images_uri: The GCS URI of the folder containing the input images.
        output_uri_path: The GCS URI of the folder where output will be stored.
        resized_image_width: The desired width of the resized images.
        resized_image_height: The desired height of the resized images.
        original_background_color: The original background color of the images.
        background_color: The desired background color of the images.

    Returns:
        A list that is empty when this function returns.
    """
    processed_products = utils.process_and_resize_images(
        images_uri,
        resized_image_width,
        resized_image_height,
        original_background_color,
        output_uri_path
    )
    utils.recolor_background_and_upload(
        processed_products,
        output_uri_path,
        original_background_color,
        background_color
    )
    generated_videos = generate_videos_concurrently(processed_products)

    # NOTE(review): the returned list is always empty at this point; presumably
    # the selection widget fills the caller's `selected_videos` later - confirm
    # against get_user_choice_with_videos.
    get_user_choice_with_videos(generated_videos)
    return []

def process_and_stitch_videos(
    input_videos: list[dict],
    gcs_images: list[models.ImageInput],
    text: models.TextInput,
    image_video_path: str,
    final_video_path: str,
    intros_outros_uri: str,
    veo_clips_uri: str,
    audio_uri: str,
    stitching_output_uri: str,
    output_with_audio: str,
    output_without_audio: str,
    transition: models.VideoTransition,
    output_length: int,
    trim_from: str
) -> None:
    """Adds overlays and text to the given videos, then stitches them together.

    This function performs the following steps:
    1. Processes the input videos by adding overlays and text.
    2. Stitches the videos together with transitions.

    Args:
        input_videos: A list of dictionaries with the videos to process.
        gcs_images: Images to be used for overlays.
        text: Text to be added to the videos.
        image_video_path: The path to save processed videos with image overlays.
        final_video_path: The path to save the final overlayed video.
        intros_outros_uri: The GCS URI for intro/outro videos.
        veo_clips_uri: The GCS URI for the folder containing the Veo clips.
        audio_uri: The GCS URI for audio files.
        stitching_output_uri: The GCS URI for the output of stitched videos.
        output_with_audio: The filename for the output video with audio.
        output_without_audio: The filename for the output video without audio.
        transition: The transition whose name, padding and side are used when
            stitching.
        output_length: The desired length of the output video.
        trim_from: Where to trim the video to achieve the desired length.

    Returns:
        None
    """
    # Step 1: overlay images and text on the input videos.
    process_videos_with_overlays_and_text(
        input_videos,
        gcs_images,
        text,
        image_video_path,
        final_video_path
    )

    # Step 2: the stitching function takes the transition as separate fields.
    transition_name = transition.name
    transition_duration = transition.padding
    transition_side = transition.side
    stitch_videos_with_transitions(
        intros_outros_uri,
        veo_clips_uri,
        audio_uri,
        stitching_output_uri,
        output_with_audio,
        output_without_audio,
        transition_name,
        transition_duration,
        transition_side,
        output_length,
        trim_from
    )


# Execute Main Functions

## Part 1: Image resize, background recolor, veos generation and user selection

The Part 1 cell executes in approximately **1 minute**. However, it then waits for user input and is not finished until the user makes a selection and clicks the 'Submit' button.

**Important**: If you run the next cell before you make the selection of the provided Veo generated videos and add the promo text, you will see errors. Please select the videos and promo texts before moving to the next one.

## Part 2: Overlaying the Veo videos with image and text overlays and stitching the final video: intro, overlaid Veo clips, outro and audio

The Part 2 cell executes in approximately **8 minutes**. However, it waits for user input to select which videos to stitch together (intro, overlaid Veo clips and outro) and is not finished until the user makes the selection and clicks the 'Submit' button.

**Important**: If you run the next cell before you make the selection of the provided video parts, you will see errors. Please select the video parts before moving to the next one.
In total, it takes between 15 and 20 minutes for this cell to finish.

In [None]:
#@title Part 1

# Run the image processing and video generation, then show the selection UI.
selected_videos = generate_and_select_videos(
    images_uri=IMAGES_URI,
    output_uri_path=OUTPUT_URI_PATH,
    resized_image_width=RESIZED_IMAGE_WIDTH,
    resized_image_height=RESIZED_IMAGE_HEIGHT,
    original_background_color=ORIGINAL_BACKGROUND_COLOR,
    background_color=BACKGROUND_COLOR,
)

In [None]:
#@title Part 2
# Please wait until the previous cell is complete before running this one.
process_and_stitch_videos(
    input_videos=selected_videos,
    gcs_images=GCS_IMAGES_TEST,
    text=TEXT_TEST,
    image_video_path=IMAGE_OVERLAYS_PATH,
    final_video_path=FINAL_OVERLAYS_PATH,
    intros_outros_uri=INTROS_OUTROS_URI,
    veo_clips_uri=VEO_CLIPS_URI,
    audio_uri=AUDIO_URI,
    stitching_output_uri=STITCHING_OUTPUT_URI,
    output_with_audio=OUTPUT_WITH_AUDIO,
    output_without_audio=OUTPUT_WITHOUT_AUDIO,
    transition=TRANSITION_TEST,
    output_length=OUTPUT_LENGHT,
    trim_from=TRIM_FROM,
)

In [None]:
#@title (Optional) Upscale the local video to higher resolution
def upscale_video(
    input_video_path,
    output_video_path,
    target_width,
    target_height):
  """Upscales a video to a higher resolution using ffmpeg.

  The command is run through the IPython `!` shell escape, so this function
  only works inside a notebook/IPython session and requires `ffmpeg` to be
  available on the PATH.

  Args:
      input_video_path: The path to the input video file.
      output_video_path: The path to the output video file.
      target_width: The desired width of the upscaled video.
      target_height: The desired height of the upscaled video.
  """
  # `-vf scale=W:H` resizes the video frames; `-c:a copy` keeps the audio
  # stream as is, without re-encoding it.
  # NOTE(review): the paths are interpolated into the shell command unquoted,
  # so paths containing spaces will presumably break - confirm.
  !ffmpeg -i {input_video_path} -vf scale={target_width}:{target_height} -c:a copy {output_video_path}

# Example usage: download the stitched video (with audio) and upscale it.
output_video_path = f'{TMP_STRING}/upscaled_video.mp4'
# Scale both dimensions by the same factor so the aspect ratio is preserved.
# The height was previously derived from RESIZED_IMAGE_WIDTH by mistake.
target_width = UPSCALE_FACTOR * RESIZED_IMAGE_WIDTH
target_height = UPSCALE_FACTOR * RESIZED_IMAGE_HEIGHT

video_audio_filename = gcs.download_file_locally(
    f'{STITCHING_OUTPUT_URI}{OUTPUT_WITH_AUDIO}')
upscale_video(
    video_audio_filename,
    output_video_path,
    target_width,
    target_height
)

In [None]:
#@title (Optional) Clean up local files
def cleanup_tmp_folder(folder_path):
    """
    Removes all files and subdirectories within the specified folder.

    The folder itself is kept. Subdirectories are removed recursively;
    regular files and symbolic links are unlinked (a link's target is not
    touched).

    Args:
        folder_path (str): The path to the folder to clean up.

    Raises:
        FileNotFoundError: If `folder_path` does not exist.
    """
    # Local import so this cell does not depend on the notebook's import cell.
    import shutil

    for entry_name in os.listdir(folder_path):
        entry_path = os.path.join(folder_path, entry_name)
        if os.path.isdir(entry_path) and not os.path.islink(entry_path):
            # os.remove() raises on directories, so remove the tree instead.
            shutil.rmtree(entry_path)
        elif os.path.lexists(entry_path):
            os.remove(entry_path)


# Empty the temporary working folder (TMP_STRING).
cleanup_tmp_folder(folder_path=TMP_STRING)