
<style>
body {
    font-family: 'Arial', sans-serif;  /* Change to your desired font */
}
</style>


```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: pcorreia@google.com
```

# Metadata Enrichment with Gemini ✨


---



## Overview



**Purpose of this notebook**

The purpose of this notebook is to show how Gemini works with multimodal inputs to generate insights for customers who have audio, image, or video assets. More information on Gemini's multimodal capabilities is available [here](https://cloud.google.com/use-cases/multimodal-ai?hl=en).

Specifically, this notebook builds a pipeline that takes a set of assets in a Google Cloud Storage bucket and creates accurate, informative metadata about those assets.

**Generic Prompts**

The prompts are generic and have not been tailored to a specific content type. With further prompt engineering, you can expect more detailed metadata. For example, if most of your content is sports-related, a prompt that captures specific moments of that sport (red cards, penalties, etc.) will produce richer metadata.


## Before you start


**Requirements**

Make sure you have the following resources in your Google Cloud environment:

*   A Google Cloud project with the required APIs enabled (Vertex AI, Cloud Storage, Speech-to-Text, and Firestore);
*   Two Cloud Storage buckets: one for the input files and another for the resulting assets (such as transcriptions, clips of the original videos, and thumbnails);
*   Firestore enabled, with a collection created;
*   Your media files placed in the input bucket.

Once you have this in place, you'll be able to run the notebook.


**Ingestion Pipeline**


These are the steps the notebook walks you through:

1.   Load the items from the input bucket;
2.   Run the prompts for the audio files;
3.   Run the prompts for the image metadata;
4.   Run the pipeline for videos (described in detail in that section).

**Result**

At the end of this pipeline, your GCS output bucket should have the following structure, where "file A" is the video file name without its extension:

```bash
splits
├── file A
│   ├── file A_split_0.mp4              # video split / key moment
│   ├── file A_split_0.wav              # audio file for the split
│   ├── file A_split_0_transcript.json  # JSON object with the transcription
│   ├── ...
├── file B
│   ├── ...
```
The metadata schema for each asset type is described in that asset type's section.
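Each video's output objects are named with a fixed pattern, which the clipping and transcription cells build from the video file name. The minimal sketch below reproduces that pattern for one key moment as a quick reference. `split_object_names` and the bucket `gs://my-output-bucket` are hypothetical. The transcript name assumes the Speech-to-Text operation ID has already been stripped, as the Transcription section does.

```python
def split_object_names(file_name: str, index: int, output_bucket: str) -> dict:
    """Hypothetical helper: GCS URIs for the clip, audio and transcript of one split.

    Mirrors the naming in the clipping and transcription cells, including the
    file_name.split('.')[0] used there to derive the video name.
    """
    video_name = file_name.split(".")[0]
    prefix = f"{output_bucket}/splits/{video_name}/{video_name}_split_{index}"
    return {
        "split_video_uri": f"{prefix}.mp4",
        "split_audio_uri": f"{prefix}.wav",
        "split_transcription_uri": f"{prefix}_transcript.json",
    }


print(split_object_names("match.mp4", 0, "gs://my-output-bucket"))
```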



# Set up 🛠

In [None]:
!pip install --upgrade google-cloud-aiplatform google-cloud-storage google-cloud-speech firebase-admin librosa tqdm "moviepy<2.0"


In [None]:
# Restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [None]:
import sys

# Additional authentication is required for Google Colab
if "google.colab" in sys.modules:
    # Authenticate user to Google Cloud
    from google.colab import auth

    auth.authenticate_user()

## Variables

In [None]:
PROJECT_ID = "editorial-solaris"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
MODEL = "gemini-1.5-flash-001"  # @param {type:"string"}
# Input bucket: name only, without the gs:// prefix
INPUT_BUCKET = "editorial-solaris-media-input"  # @param {type:"string"}
# Output bucket: full URI, including the gs:// prefix
OUTPUT_BUCKET = "gs://editorial-solaris-media-output"  # @param {type:"string"}
# Firestore collections (DB_COLLECTION stores the video metadata)
DB_COLLECTION = "metadata"  # @param {type:"string"}
IMAGE_COLLECTION = "images"  # @param {type:"string"}
AUDIO_COLLECTION = "audio"  # @param {type:"string"}
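`INPUT_BUCKET` is a bare bucket name, while `OUTPUT_BUCKET` includes the `gs://` scheme. `storage.Client().bucket()` expects the bare name, which is why `read_json_from_gcs` strips the prefix. If you change either value, a small normalizer like the sketch below keeps both forms at hand. `bucket_name_only` and `bucket_uri` are hypothetical helpers, not part of the notebook.

```python
def bucket_name_only(bucket: str) -> str:
    """Hypothetical helper: strip a leading 'gs://' and any trailing '/'."""
    if bucket.startswith("gs://"):
        bucket = bucket[len("gs://"):]
    return bucket.rstrip("/")


def bucket_uri(bucket: str) -> str:
    """Hypothetical helper: return the bucket as a gs:// URI without a trailing '/'."""
    return f"gs://{bucket_name_only(bucket)}"


print(bucket_name_only("gs://my-bucket/"), bucket_uri("my-bucket"))
```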

## Imports


In [None]:
# common imports
import json
import os
import re

import pandas as pd
import tqdm
import vertexai
import vertexai.preview.generative_models as generative_models
from google.cloud import storage
from vertexai.generative_models import GenerativeModel, Part

# speech imports
import librosa
from google.api_core.client_options import ClientOptions
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech

# storage
import firebase_admin
from firebase_admin import firestore

# threading
from concurrent.futures import ThreadPoolExecutor, as_completed



## Common Functions

In [None]:
def generate(prompt : list, model :str = MODEL) -> str:
  vertexai.init(project=PROJECT_ID, location=LOCATION)

  model = GenerativeModel(
    model,
  )

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 1,
      "top_p": 0.95,
  }

  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
  }

  responses = model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )
  return responses.text



In [None]:
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # source_blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

In [None]:
def read_json_from_gcs(bucket_name, file_name):
    """Reads a JSON file from Google Cloud Storage into a Python dictionary.

    Args:
        bucket_name: The name of the GCS bucket.
        file_name: The name of the JSON file within the bucket.

    Returns:
        A Python dictionary representing the JSON data, or None if an error occurred.
    """

    # Remove 'gs://' prefix if present using replace()
    bucket_name = bucket_name.replace("gs://", "", 1)  # Replace only the first occurrence

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    try:
        contents = blob.download_as_bytes()
        data = json.loads(contents)
        return data
    except Exception as e:
        print(f"Error reading JSON from GCS: {e}")
        return None

In [None]:
def list_bucket_files_pd(bucket_name):
    """
    Lists files in a GCS bucket with properties (name, size, MIME type, created and updated times).

    Args:
        bucket_name (str): Name of the GCS bucket.

    Returns:
        pd.DataFrame: DataFrame containing file properties.
    """

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_data = []
    for blob in blobs:
        file_data.append({
            'file_name': blob.name,
            'size': blob.size,
            'type': blob.content_type,
            'created': blob.time_created,
            'updated': blob.updated,  # last updated timestamp
        })

    df = pd.DataFrame(file_data)
    return df

In [None]:
def list_bucket_files(bucket_name):
    """
    Lists files in a GCS bucket with properties (name, size, MIME type, created and updated times).

    Args:
        bucket_name (str): Name of the GCS bucket.

    Returns:
        list: A list of dicts, one per object, containing the file properties.
    """

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_data = []
    for blob in blobs:
        file_data.append({
            'file_name': blob.name,
            'size': blob.size,
            'type': blob.content_type,
            'created': blob.time_created,
            'updated': blob.updated,
        })

    return file_data  # Return the list of JSON objects directly

In [None]:
# Strips the long-running operation ID that Speech-to-Text appends to each transcript file name
def rename_transcription_files(bucket_name):
    """Renames files in a GCS bucket using a regular expression.

    Args:
        bucket_name: The name of the GCS bucket.
    """

    # Remove 'gs://' prefix if present using replace()
    bucket_name = bucket_name.replace("gs://", "", 1)  # Replace only the first occurrence

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    blobs = bucket.list_blobs()  # Get a list of all blobs (files) in the bucket

    for blob in blobs:
        # Check if the file name matches the pattern
        if re.match(r".*_transcript_.*\.json", blob.name):
            new_name = re.sub(r"_transcript_.+\.json", "_transcript.json", blob.name)

            # Only rename if the new name is different
            if new_name != blob.name:
                bucket.rename_blob(blob, new_name)
                print(f"Renamed {blob.name} to {new_name}")



# Metadata Generation 🤖



## Loading assets

Create a list of all the objects that are in the input bucket.

In [None]:
file_list = list_bucket_files(INPUT_BUCKET)
file_list

## Audio 🎧
From the list of all input files, select the items whose MIME type is audio. For each audio file, Gemini generates the following metadata:

| Field | JSON key | Description |
|---|---|---|
| **Show Name** | `show_name` | The name of the podcast show. |
| **Short Summary** | `short_summary` | A concise, one-paragraph overview of the content. |
| **Long Summary** | `long_summary` | A detailed two-to-three-paragraph summary of the content. |
| **Subject Topics** | `subject_topics` | Keywords or topics associated with the content. |

In [None]:
# keep only audio files (content_type may be missing, so default to "")
audio_list = []
for file in file_list:
  if 'audio' in (file['type'] or ''):
    audio_list.append(file)
audio_list
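The audio, image and video cells each scan `file_list` separately. As an alternative sketch, the hypothetical `group_by_media_type` below groups every file by the top-level part of its MIME type in a single pass. It treats a missing content type as an empty string. It matches the MIME prefix ("audio/...") rather than a substring. If you broaden the video filter this way, you would also need to pass each file's own MIME type to `Part.from_uri`, because the video prompts below hardcode `video/mp4`.

```python
from collections import defaultdict


def group_by_media_type(files: list) -> dict:
    """Hypothetical helper: group file dicts by the top-level part of their MIME type."""
    groups = defaultdict(list)
    for f in files:
        mime = f.get("type") or ""  # content_type can be missing
        top_level = mime.split("/", 1)[0]  # "audio/mpeg" -> "audio"
        groups[top_level].append(f)
    return dict(groups)


sample = [
    {"file_name": "episode.mp3", "type": "audio/mpeg"},
    {"file_name": "photo.jpg", "type": "image/jpeg"},
    {"file_name": "match.mp4", "type": "video/mp4"},
    {"file_name": "notes", "type": None},
]
groups = group_by_media_type(sample)
print({k: [f["file_name"] for f in v] for k, v in groups.items()})
```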

### Summary metadata 🧙

In [None]:
def generate_audio(prompt : list, model :str = MODEL) -> str:
  vertexai.init(project=PROJECT_ID, location=LOCATION)

  model = GenerativeModel(
    model,
  )

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 0.5,
      "top_p": 0.95,
  }

  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
  }

  responses = model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )

  return responses.text



In [None]:
# create the audio metadata
def generate_metadata_audio(blob_uri: str, mime_type: str, model:str) -> str:
  media_asset = Part.from_uri(
      mime_type=mime_type,
      uri=blob_uri)

  prompt = """SYSTEM:```You are a skilled podcast expert. You have a deep understanding of media. Your task is to analyze the provided audio and extract key information.```
  INSTRUCTION: Please analyze the following audio and provide the show name, a long summary, a short summary, and subject topics. Please format your response as a JSON object with the given structure. Avoid any additional comment or text.
  OUTPUT:```=
  JSON
  {
    "show_name" : "the name of the podcast show",
    "short_summary": "[One paragraph summary of the content]",
    "long_summary": "[two-three paragraph summary of the content]",
    "subject_topics" :
      [     { "topic": "[topic]"}, { "topic": "[topic]"} ]
  }```
  """

  result_text = generate_audio(prompt=[media_asset, prompt], model = model )


  return result_text

In [None]:
import traceback

def process_audio(audio: dict):
  """Processes a single row from the audio list."""

  blob_uri = f"gs://{INPUT_BUCKET}/{audio['file_name']}"
  try:
    result = generate_metadata_audio(blob_uri, audio['type'], MODEL)
    # strip only the surrounding ```json ... ``` fence, not the word "json" inside the text
    response_text = re.sub(r"^\s*```(?:json)?\s*|\s*```\s*$", "", result.strip())
    audio['metadata'] = json.loads(response_text)
    return result
  except Exception as e:
    print(f"Error processing {audio['file_name']}: {e}")
    traceback.print_exc()




with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_audio, audio) for audio in audio_list]

    # You can remove the tqdm loop if you don't need progress updates
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Audio Summary Metadata Generated')
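The original notebook cleaned model replies with `re.sub(r"json|```", "", result)`, which also deletes the word "json" wherever it appears inside the generated text. The sketch below removes only a leading ```` ```json ```` (or bare ```` ``` ````) fence and a trailing ```` ``` ```` fence. `parse_model_json` is a hypothetical helper. It assumes the reply is one JSON object, optionally wrapped in a single code fence.

```python
import json
import re

# A leading ```json (or ```) fence, or a trailing ``` fence; nothing else.
_FENCE = re.compile(r"^\s*```(?:json)?\s*|\s*```\s*$", re.IGNORECASE)


def parse_model_json(text: str):
    """Hypothetical helper: parse JSON from a reply that may be wrapped in a code fence."""
    return json.loads(_FENCE.sub("", text))


reply = '```json\n{"short_summary": "Exports data as json files."}\n```'
print(parse_model_json(reply))                     # fence removed, text intact
print(json.loads(re.sub(r"json|```", "", reply)))  # the original regex also drops "json"
```

Gemini 1.5 models on Vertex AI can also be asked to return JSON directly by adding `"response_mime_type": "application/json"` to `generation_config`. Check the Vertex AI SDK documentation for the version you have installed.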


In [None]:
# create the uris in the object's metadata
for audio in audio_list:
  audio['name'] = audio['file_name'].split('.')[0]
  audio['gcs_uri'] = f"gs://{INPUT_BUCKET}/{audio['file_name']}"
  formatted_text = json.dumps(audio, indent=4, default=str)
  print(formatted_text)
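The `name` field is later used as the Firestore document ID. `split('.')[0]` cuts at the first dot and keeps any folder prefix. Firestore document IDs cannot contain a forward slash, so an object stored under a folder would be read as a document path instead of an ID. The sketch below shows one option: drop only the last extension and replace slashes. `document_id` and the `"__"` separator are hypothetical choices, not the notebook's.

```python
import os


def document_id(file_name: str) -> str:
    """Hypothetical helper: derive a slash-free document ID from a GCS object name."""
    stem, _ext = os.path.splitext(file_name)  # removes only the final extension
    return stem.replace("/", "__")  # Firestore IDs may not contain "/"


print(document_id("podcasts/ep.1.mp3"))   # compare with the notebook's split below
print("podcasts/ep.1.mp3".split(".")[0])
```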

## Images 📸



For each image, we simply create some additional metadata to support future uses such as search or content generation. There is only one step: run a single prompt to get the description, photo type, location, subject topics, and persons.


The resulting metadata has the following format:

| Field | JSON key | Description |
|---|---|---|
| **Description** | `description` | A one-line description of the image content. |
| **Photo Shot Type** | `photo_type` | The type of shot used (e.g., close-up, landscape, portrait). |
| **Location** | `location` | The place where the photo was taken, or the name of a known sight. |
| **Subject Topics** | `subject_topics` | Keywords or topics associated with the image. |
| **Persons** | `persons` | Names or descriptions of people present in the image. |



In [None]:
# keep only image files (content_type may be missing, so default to "")
image_list = []
for file in file_list:
  if 'image' in (file['type'] or ''):
    image_list.append(file)
image_list

In [None]:
def generate_image(prompt : list, model :str = MODEL) -> str:
  vertexai.init(project=PROJECT_ID, location=LOCATION)

  model = GenerativeModel(
    model,
  )

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 0.5,
      "top_p": 0.95,
  }

  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
  }

  responses = model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )

  return responses.text



In [None]:
# create the image metadata
def generate_metadata_image(blob_uri: str, mime_type: str, model:str) -> str:
  media_asset = Part.from_uri(
      mime_type=mime_type,
      uri=blob_uri)

  prompt = """SYSTEM:```You are a skilled image analysis expert. You have a deep understanding of media. Your task is to analyze the provided image and extract key information.```
  INSTRUCTION: ```Please analyze the following image and provide a description, photo type, location, subject topics, and persons. Please format your response as a JSON object with the given structure. Avoid any additional comment or text.```
  OUTPUT:```=
    JSON
  {
    "description": "[A one line description that would support understanding the contents of the image]",
    "photo_type": "[Type of angles used to take the photography]",
    "location" : "A description of the location where the shot is taken, or if it is a known sight, its name",
    "subject_topics" :
      [     { "topic": "[topic]"}, { "topic": "[topic]"} ],
    "persons" :
      [     { "person": "[person1]"}, { "person": "[person2]"} ]
  }```
  """

  result_text = generate_image(prompt=[media_asset, prompt], model = model )


  return result_text

In [None]:
def process_image(image: dict):
  """Processes a single row from the image list."""

  blob_uri = f"gs://{INPUT_BUCKET}/{image['file_name']}"
  try:
    result = generate_metadata_image(blob_uri, image['type'], MODEL)
    # print(f"Processing {image['file_name']} > {result}")

    # strip only the surrounding ```json ... ``` fence, not the word "json" inside the text
    response_text = re.sub(r"^\s*```(?:json)?\s*|\s*```\s*$", "", result.strip())
    image['metadata'] = json.loads(response_text)
    return result
  except Exception as e:
      print(f"Error processing {image['file_name']} {e}")


with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_image, image) for image in image_list]

    # You can remove the tqdm loop if you don't need progress updates
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Images Summary Metadata Generated')


In [None]:
for image in image_list:
  image['name'] = image['file_name'].split('.')[0]
  image['gcs_uri'] = f"gs://{INPUT_BUCKET}/{image['file_name']}"


In [None]:
for image in image_list:
  formatted_text = json.dumps(image, indent=4, default=str)
  print(formatted_text)

## Videos 🎥

This section creates the metadata for the videos. The relevant steps are:

1.   Create the summary metadata with Gemini.
2.   Ask Gemini to pick timestamps that would make suitable thumbnails (the `extract_thumbnails` helper can turn these into frames).
3.   Detect the key moments in the video and cut a clip for each one.
4.   For each key moment, create a transcript of the audio.

The metadata generated will follow this structure:


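| Field | JSON key | Description |
|---|---|---|
| **Short Summary** | `summary.short_summary` | A concise, one-paragraph overview of the video content. |
| **Long Summary** | `summary.long_summary` | A detailed two-to-three-paragraph summary of the video content. |
| **Subject Topics** | `summary.subject_topics` | Keywords or topics associated with the video. |
| **Thumbnails** | `thumbnails` | Suggested thumbnail timestamps (`time`, as mm:ss), each with the `reason` it was chosen. |
| **Key Moments: Type** | `sections[].type` | The type of highlight for the key moment. |
| **Key Moments: Order** | position in `sections` | The order of the key moment within the video; the index also appears in the split file names. |
| **Key Moments: Reason** | `sections[].reason` | Why this moment is considered key or important. |
| **Key Moments: Start Timestamp** | `sections[].start_time` | Where the key moment starts in the video (mm:ss). |
| **Key Moments: End Timestamp** | `sections[].end_time` | Where the key moment ends in the video (mm:ss). |
| **Key Moments: Split URIs** | `sections[].split_video_uri`, `split_audio_uri`, `split_transcription_uri` | GCS URIs of the clip, its audio, and its transcript JSON. |
| **Key Moments: Transcript** | `sections[].transcription` | The speech-to-text transcript of the clip. |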

In [None]:
# keep only MP4 videos (the video prompts below pass mime_type="video/mp4")
video_list = []
for file in file_list:
  if file['type'] == 'video/mp4':
    video_list.append(file)
video_list

### Summary metadata 🧙
This function generates the summary metadata for video assets. It is currently not specific to the type of content (e.g., sports highlights versus press conference highlights); refining the prompt for each content type should yield more accurate data.


In [None]:
#create a summary of the video
def generate_metadata_video_overview(blob_uri: str) -> str:
  video1 = Part.from_uri(
      mime_type="video/mp4",
      uri=blob_uri)

  text1 = """SYSTEM:```You are a skilled video analysis expert. You have a deep understanding of media. Your task is to analyze the provided video and extract key information.```
  INSTRUCTION: ```Please analyze the following video and provide a long summary, a short summary, and subject topics. Please format your response as a JSON object with the given structure. Avoid any additional comment or text.```
  OUTPUT:```=
  JSON
  {
    "short_summary": "[One paragraph summary of the content]",
    "long_summary": "[two-three paragraph summary of the content]",
    "subject_topics" :
      [     { "topic": "[topic]"}, { "topic": "[topic]"} ]
  }```
  """

  result_text = generate(prompt=[video1, text1], model = 'gemini-1.5-pro-001' )
  # result_text = generate(prompt=[video1, text1], model = MODEL)

  return result_text

In [None]:
def process_row(video):
    """Processes a single row from the file list."""
    blob_uri = f"gs://{INPUT_BUCKET}/{video['file_name']}"
    result = generate_metadata_video_overview(blob_uri=blob_uri)
    response_text = re.sub(r"^\s*```(?:json)?\s*|\s*```\s*$", "", result.strip())  # strip only the surrounding code fence
    summary_metadata = json.loads(response_text)
    video['summary'] = summary_metadata
    return  summary_metadata

# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row, video) for video in video_list]

    # You can remove the tqdm loop if you don't need progress updates
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Video Summary Metadata Generated')
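The summary `process_row` above has no `try`/`except`, and the progress loop never calls `future.result()`. As a result, an exception raised inside a worker is stored on its future and never reported. The sketch below shows one common pattern: map each future back to its input and collect failures explicitly. `run_all` and `flaky` are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(func, items, max_workers=4):
    """Hypothetical helper: run func(item) for every item; return (results, failures)."""
    results, failures = [], []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_item = {executor.submit(func, item): item for item in items}
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results.append(future.result())  # re-raises the worker's exception
            except Exception as exc:
                failures.append((item, exc))
    return results, failures


def flaky(n):  # hypothetical worker that fails for one input
    if n == 3:
        raise ValueError("bad input")
    return n * n


results, failures = run_all(flaky, [1, 2, 3])
print(sorted(results), failures)
```

Results arrive in completion order, so sort them or key them by input if order matters.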

Showing the results of the top entry

In [None]:
formatted_json = json.dumps(video_list[0], indent=4, default=str)
print(formatted_json)


### Thumbnails ❇

In [None]:
from moviepy.editor import VideoFileClip


def extract_thumbnails(video_path, timestamps):
    """
    Extracts thumbnails from a video at specified timestamps using moviepy.

    Args:
        video_path (str): The path to a local video file.
        timestamps (list): A list of timestamps (in seconds) for the snapshots.

    Returns:
        list: A list of snapshot frames as NumPy arrays (height x width x 3, RGB).
    """

    clip = VideoFileClip(video_path)

    snapshots = [clip.get_frame(t) for t in timestamps]

    clip.close()

    return snapshots


In [None]:
def generate_metadata_video_thumbnails(blob_uri: str) -> str:
  video1 = Part.from_uri(
      mime_type="video/mp4",
      uri=blob_uri)
  text1 = """SYSTEM:```You are a skilled video analysis expert. You have a deep understanding of media and can accurately identify key moments in a video. Your task is to analyze the provided video and extract key thumbnails.```
  INSTRUCTION: ```Give me the timestamp for 3 suitable thumbnails for this video that highlight the key moments.
  Do not add any additional text.```
  OUTPUT:```
  JSON
  {
    "thumbnails": [
      {
        "reason": "[Why this would be a suitable thumbnail for the video]",
        "time": "[mm:ss]"
      },
      {
        "reason": "[Why this would be a suitable thumbnail for the video]",
        "time": "[mm:ss]"
      }
    ]
  }
  ```
  """


  result_text = generate(prompt=[video1, text1], model = 'gemini-1.5-pro-001' )

  return result_text


In [None]:
def process_row_thumbnails(video):
    """Processes a single row from the video array."""
    blob_uri = f"gs://{INPUT_BUCKET}/{video['file_name']}"
    result = generate_metadata_video_thumbnails(blob_uri=blob_uri)
    response_text = re.sub(r"^\s*```(?:json)?\s*|\s*```\s*$", "", result.strip())  # strip only the surrounding code fence
    thumbnails_metadata = json.loads(response_text)
    video['thumbnails'] = thumbnails_metadata['thumbnails']

# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row_thumbnails, video) for video in video_list]

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Video Thumbnails Metadata Generated')
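The notebook only stores the thumbnail timestamps; `extract_thumbnails` is defined but never called. Gemini returns each `time` as an "mm:ss" string, while the helper's docstring asks for seconds. The sketch below converts them explicitly. `mmss_to_seconds` and the sample thumbnails are hypothetical.

```python
def mmss_to_seconds(timestamp: str) -> float:
    """Hypothetical helper: convert "ss", "mm:ss" or "hh:mm:ss" (decimals allowed) to seconds."""
    seconds = 0.0
    for part in timestamp.strip().split(":"):
        seconds = seconds * 60 + float(part)
    return seconds


# Made-up sample in the shape of video['thumbnails']
thumbnails = [
    {"reason": "Opening goal", "time": "01:30"},
    {"reason": "Trophy lift", "time": "12:05"},
]
times = [mmss_to_seconds(t["time"]) for t in thumbnails]
print(times)
```

Once a video has been downloaded locally (the Section Clipping cell does this), you could pass these values on as `extract_thumbnails(f"content/{video['file_name']}", times)`.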

In [None]:
formatted_json = json.dumps(video_list, indent=4, default=str)
print(formatted_json)


### Section Metadata 🧙

In [None]:
def generate_metadata_video_sections(blob_uri: str) -> str:
  video1 = Part.from_uri(
      mime_type="video/mp4",
      uri=blob_uri)
  text1 = """SYSTEM:```You are a skilled video analysis expert. You have a deep understanding of media and can accurately identify key moments in a video. Your task is to analyze the provided video and extract all the highlight clips. For each clip, you need to classify the type of highlight and provide the precise start and end timestamps.```
  INSTRUCTION: ```Please analyze the following video and provide a list of all the highlight clips with their type and timestamps. Also explain the reason why the selection of that particular timestamp has been made. Please format your response as a JSON object with the given structure. Make sure the audio is not truncated while suggesting the clips. Avoid any additional comment or text.```
  OUTPUT:```
  JSON
  {
    "sections": [
      {
        "type": "[highlight type]",
        "start_time": "[mm:ss]",
        "end_time": "[mm:ss]",
        "reason" : ""
      },
      {
        "type":"[highlight type]",
        "start_time": "[mm:ss]",
        "end_time": "[mm:ss]",
        "reason" : ""
      }
    ]
  }```
  Please make sure the timestamps are accurate and reflect the precise start and end of each highlight clip."""


  result_text = generate(prompt=[video1, text1], model = 'gemini-1.5-pro-001' )

  return result_text


In [None]:
def process_row_sections(video):
    """Processes a single row from the video array, with error handling."""
    try:
        blob_uri = f"gs://{INPUT_BUCKET}/{video['file_name']}"
        result = generate_metadata_video_sections(blob_uri=blob_uri)
        # strip only the surrounding ```json ... ``` fence
        response_text = re.sub(r"^\s*```(?:json)?\s*|\s*```\s*$", "", result.strip())
        sections_metadata = json.loads(response_text)
        video['sections'] = sections_metadata['sections']
    except Exception as e:
        # Log or print the error for debugging
        print(f"Error processing video sections: {video['file_name']}: {e}")

# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row_sections, video) for video in video_list]

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Video Sections Metadata Generated')

Showing the result of the top video

In [None]:
formatted_json = json.dumps(video_list[0], indent=4, default=str)
print(formatted_json)
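Gemini's section timestamps are not guaranteed to be ordered or to fall within the video's length. A subclip with an inverted or out-of-range interval can fail or behave unexpectedly when it is written. The sketch below is one way to sanity-check sections before clipping. `valid_sections` is hypothetical, and the sample sections are made up. You could get the real duration from `VideoFileClip(path).duration`. Dropping a section shifts the indices used in the split file names, so apply any filter before the clipping cell runs.

```python
def mmss_to_seconds(timestamp: str) -> float:
    """Convert "ss", "mm:ss" or "hh:mm:ss" (decimals allowed) to seconds."""
    seconds = 0.0
    for part in timestamp.strip().split(":"):
        seconds = seconds * 60 + float(part)
    return seconds


def valid_sections(sections: list, duration: float) -> list:
    """Hypothetical helper: keep sections with 0 <= start < end, clamping end to duration.

    Each kept section is a copy with extra "start_s"/"end_s" keys in seconds;
    sections with malformed or inverted timestamps are dropped.
    """
    kept = []
    for section in sections:
        try:
            start = mmss_to_seconds(section["start_time"])
            end = min(mmss_to_seconds(section["end_time"]), duration)
        except (KeyError, ValueError):
            continue  # missing key or a placeholder such as "[mm:ss]"
        if 0 <= start < end:
            kept.append({**section, "start_s": start, "end_s": end})
    return kept


sections = [
    {"type": "goal", "start_time": "00:10", "end_time": "00:25"},
    {"type": "replay", "start_time": "01:00", "end_time": "00:50"},  # inverted
    {"type": "interview", "start_time": "02:00", "end_time": "03:00"},  # runs past the end
    {"type": "bad", "start_time": "[mm:ss]", "end_time": "00:05"},  # placeholder
]
kept = valid_sections(sections, duration=150.0)
print([(s["type"], s["start_s"], s["end_s"]) for s in kept])
```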


### Section Clipping ✂

Prepare the local folders and download each video into the notebook environment.

In [None]:
# Create the local working folders if they don't exist
output_folder = "content/"
os.makedirs(output_folder, exist_ok=True)

splits_folder = "content/splits"
os.makedirs(splits_folder, exist_ok=True)


for video in video_list:
  # copy the file to the notebook so it can be split locally
  destination_file_name = f"content/{video['file_name']}"
  download_blob(INPUT_BUCKET, video['file_name'], destination_file_name)
  # create the per-video splits folder
  split_folder = f"content/splits/{video['file_name'].split('.')[0]}"
  os.makedirs(split_folder, exist_ok=True)


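Cut each key moment out of the original video. Each thread writes one section as an MP4 clip and a WAV audio file to the local `content/splits/` folder. It also records in the JSON object the GCS URIs those files will have once they are uploaded.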


In [None]:
import os
from moviepy.editor import VideoFileClip


def process_segment(video, index):
    """Cut section `index` of a video into an MP4 clip and a WAV audio file."""
    source_file_name = f"content/{video['file_name']}"
    video_name = video['file_name'].split('.')[0]
    clip_filename = f"content/splits/{video_name}/{video_name}_split_{index}.mp4"
    audio_filename = f"content/splits/{video_name}/{video_name}_split_{index}.wav"

    section_info = video['sections'][index]

    # Each thread opens its own reader; close it afterwards to release ffmpeg
    clip = VideoFileClip(source_file_name)
    try:
        # handle the video section (moviepy accepts "mm:ss" timestamp strings)
        clip_subsection = clip.subclip(section_info["start_time"], section_info["end_time"])
        clip_subsection.write_videofile(clip_filename, verbose=False, logger=None)

        # handle the audio split as 16-bit PCM WAV
        audio_clip = clip_subsection.audio
        audio_clip.write_audiofile(audio_filename, codec='pcm_s16le', verbose=False, logger=None)
    finally:
        clip.close()

    # record the URIs the files will have once copied to the output bucket
    gcs_video_uri = f"{OUTPUT_BUCKET}/splits/{video_name}/{video_name}_split_{index}.mp4"
    gcs_audio_uri = f"{OUTPUT_BUCKET}/splits/{video_name}/{video_name}_split_{index}.wav"

    section_info['split_video_uri'] = gcs_video_uri
    section_info['split_audio_uri'] = gcs_audio_uri


def process_video_segments(video):
  sections = video.get('sections', [])  # missing if section detection failed
  # launch one thread per section of this video
  with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_segment, video, index) for index in range(len(sections))]
    for future in tqdm.tqdm(as_completed(futures), total=len(futures), desc=f"Processing {video['file_name']}"):
      try:
        future.result()  # re-raise any error from the worker thread
      except Exception as e:
        print(f"Error cutting a section of {video['file_name']}: {e}")


for video in video_list:
  process_video_segments(video)


print('Video clips generated')

In [None]:
for video in video_list[:1]:
  # print(video['file_name'])
  string = json.dumps(video, indent=4, default=str)
  print(string)

Copy the files from the local folder to the GCS Bucket

In [None]:
# Copy everything under content/splits to the output bucket, preserving the folder layout
!gsutil -m cp -r content/splits {OUTPUT_BUCKET}


### Transcription 📓


First, define the request for a Recognizer that uses the Chirp model to transcribe English audio (here with the `en-AU` language code).

See [the documentation](https://cloud.google.com/python/docs/reference/speech/latest/google.cloud.speech_v2.types.CreateRecognizerRequest) to learn more about how to configure the `CreateRecognizerRequest` request.

In [None]:
client = SpeechClient(
    client_options=ClientOptions(api_endpoint=f"{LOCATION}-speech.googleapis.com")
)

language_code = "en-AU"
recognizer_id = f"chirp-{language_code.lower()}-interview"

recognizer_request = cloud_speech.CreateRecognizerRequest(
    parent=f"projects/{PROJECT_ID}/locations/{LOCATION}",
    recognizer_id=recognizer_id,
    recognizer=cloud_speech.Recognizer(
        language_codes=[language_code],
        model="chirp",
    ),
)

Then, fetch the Speech-to-Text [Recognizer](https://cloud.google.com/speech-to-text/v2/docs/recognizers) if it already exists, or create it by running the create operation.

In [None]:
from google.api_core.exceptions import NotFound

recognizer_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/recognizers/{recognizer_id}"
try:
    recognizer = client.get_recognizer(name=recognizer_name)
    print(f"Recognizer '{recognizer_id}' already exists.")
except NotFound:
    print(f"Recognizer '{recognizer_id}' does not exist. Creating it...")
    create_operation = client.create_recognizer(request=recognizer_request)
    recognizer = create_operation.result()

In [None]:
recognizer

Now create a long-running batch recognition request for each section and wait for the results.

In [None]:

def process_segment_transcription(element, client, recognizer):
    """Start a batch transcription for one section's WAV file and return the operation."""
    long_audio_uri = element['split_audio_uri']
    # write the transcript JSON next to the audio file
    directory_path = os.path.dirname(long_audio_uri)

    long_audio_config = cloud_speech.RecognitionConfig(
      features=cloud_speech.RecognitionFeatures(
          enable_automatic_punctuation=True, enable_word_time_offsets=True
      ),
      auto_decoding_config={},
    )

    long_audio_request = cloud_speech.BatchRecognizeRequest(
        recognizer=recognizer.name,
        recognition_output_config={
            "gcs_output_config": {"uri": directory_path}
        },
        files=[{"config": long_audio_config, "uri": long_audio_uri}],
    )

    long_audio_operation = client.batch_recognize(request=long_audio_request)
    return long_audio_operation


def process_video_segments_transcription(video):
  sections = video.get('sections', [])
  with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_segment_transcription, section, client, recognizer) for section in sections]

    for future in tqdm.tqdm(as_completed(futures), total=len(futures), desc=f"Transcribing segments {video['file_name']}"):
      operation = future.result()  # the long-running Operation object
      operation.result()  # wait for the transcription to complete

for video in video_list:
  process_video_segments_transcription(video)

print("Transcriptions done.")

Remove the long-running operation IDs from the transcript file names.

In [None]:
rename_transcription_files(OUTPUT_BUCKET)
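To check locally what `rename_transcription_files` will do, you can apply the same substitution to a single name. `strip_operation_id` is a hypothetical wrapper, and the sample object name with its operation ID is made up. If a segment is transcribed more than once, several outputs map to the same target name, and renaming one silently overwrites another.

```python
import re


def strip_operation_id(object_name: str) -> str:
    """Hypothetical wrapper around the substitution used by rename_transcription_files."""
    return re.sub(r"_transcript_.+\.json", "_transcript.json", object_name)


print(strip_operation_id("splits/match/match_split_0_transcript_1234abcd.json"))
```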

Add the transcription information to the overall object:

1.   Add the `split_transcription_uri` to each section;
2.   Put the concatenated transcript in the section's `transcription` field.


In [None]:
for video in video_list:
  video_name = video['file_name'].split('.')[0]
  for index, segment in enumerate(video.get('sections', [])):
    segment['split_transcription_uri'] = f"{OUTPUT_BUCKET}/splits/{video_name}/{video_name}_split_{index}_transcript.json"
    # read the transcription into memory (path relative to the bucket)
    file_path = segment['split_transcription_uri'].replace(f"{OUTPUT_BUCKET}/", "")

    transcription_results = read_json_from_gcs(OUTPUT_BUCKET, file_path)
    if transcription_results is None:
      continue  # read_json_from_gcs already printed the error

    # Extract and concatenate transcripts
    transcripts = []
    for result in transcription_results.get('results', []):
      for alternative in result.get('alternatives', []):
        transcripts.append(alternative.get('transcript', ''))
    if transcripts:
      concatenated_transcript = ' '.join(transcripts)
      segment['transcription'] = concatenated_transcript
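The loop above joins every alternative of every result. Speech-to-Text orders alternatives from most to least likely, so if you ever raise `max_alternatives`, you may prefer to keep only the first one per result. The sketch below does that and tolerates a missing file. `concatenate_transcripts` is hypothetical. The sample dict is made up and includes only the fields the loop reads (`results`, `alternatives`, `transcript`); real output files contain more.

```python
from typing import Optional


def concatenate_transcripts(batch_result: Optional[dict]) -> str:
    """Hypothetical helper: join the top alternative of every result into one string."""
    if not batch_result:
        return ""
    parts = []
    for result in batch_result.get("results", []):
        alternatives = result.get("alternatives") or []
        if alternatives and alternatives[0].get("transcript"):
            parts.append(alternatives[0]["transcript"].strip())
    return " ".join(parts)


sample = {
    "results": [
        {"alternatives": [{"transcript": "Welcome back."}]},
        {"languageCode": "en-au"},  # a result with no alternatives
        {"alternatives": [{"transcript": " Let's begin."}]},
    ]
}
print(concatenate_transcripts(sample))
```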

In [None]:
for video in video_list[:1]:
  string = json.dumps(video, indent=4, default=str)
  print(string)

# Storing Metadata 💾

In [None]:
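# Initialize the Firebase Admin SDK once (calling initialize_app() again raises ValueError)
try:
    firebase_admin.get_app()
except ValueError:
    firebase_admin.initialize_app(options={"projectId": PROJECT_ID})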


In [None]:
# Get a reference to the Firestore database
db = firestore.client()

In [None]:
for audio in tqdm.tqdm(audio_list, desc="Storing audio metadata"):
  doc_ref = db.collection(AUDIO_COLLECTION).document(audio['name'])
  doc_ref.set(audio)

In [None]:
for image in tqdm.tqdm(image_list, desc="Storing images metadata"):
  doc_ref = db.collection(IMAGE_COLLECTION).document(image['name'])
  doc_ref.set(image)

In [None]:
# Storing the video information
for video in tqdm.tqdm(video_list, desc="Storing videos metadata"):
    video_name = video['file_name'].split('.')[0]
    doc_ref = db.collection(DB_COLLECTION).document(video_name)

    # Insert the JSON object into the document
    doc_ref.set(video)