
<style>
body {
    font-family: 'Arial', sans-serif;  /* Change to your desired font */
}
</style>


```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: pcorreia@google.com
```

# Metadata Enrichment with Gemini ✨


---



## Overview



**Purpose of this notebook**

The purpose of this notebook is to show how Gemini works with multimodal inputs to extract information. Learn more about Gemini's multimodal capabilities [here](https://cloud.google.com/use-cases/multimodal-ai?hl=en).

Specifically, this notebook builds a pipeline that takes a set of media assets (audio, images and videos) stored in a Google Cloud Storage bucket and generates metadata describing each of them.

**Generic Prompts**

The prompts are generic and have not been tailored to a specific content type. With further prompt engineering you can expect more detailed metadata. For example, if most of your content is a specific type of report, a prompt that asks for the specific items of that report will produce richer metadata.


## Before you start


**Requirements**

Make sure you have the following resources in your GCP environment:


*   A Google Cloud project with the required APIs enabled (Vertex AI, Cloud Storage, Speech-to-Text and Firestore);
*   Google Cloud Storage buckets for the input files and for the generated output;
*   Firestore enabled, with a collection created;
*   Your input files placed in the input bucket.

Once you have this in place, you can run the notebook.


**Ingestion Pipeline**


The notebook walks you through these steps:

1.   Load the items from the input bucket;
2.   Run the prompts for each asset;
3.   Store the metadata.
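The three steps boil down to a load, prompt, store loop. The sketch below is a hypothetical outline rather than code from this notebook: `run_pipeline` and its three callables are illustrative names. In the notebook these roles are played by `list_bucket_files`, the `generate_metadata_*` functions, and a store step such as a Firestore write.

```python
from typing import Callable, Dict, List


def run_pipeline(
    load_items: Callable[[], List[Dict]],
    generate_metadata: Callable[[Dict], Dict],
    store_metadata: Callable[[Dict], None],
) -> List[Dict]:
    """Runs load -> prompt -> store over every item and returns the items that failed."""
    failed = []
    # Step 1: load the items (e.g. the file listing of the input bucket)
    for item in load_items():
        try:
            # Step 2: run the prompt(s) for this item and attach the result
            item["metadata"] = generate_metadata(item)
            # Step 3: persist the enriched item
            store_metadata(item)
        except Exception as exc:  # keep going; report failures at the end
            item["error"] = str(exc)
            failed.append(item)
    return failed
```

Returning the failed items, instead of only printing errors, makes it easy to retry just those assets later.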


**Result**

At the end of this pipeline, your GCS output bucket should have the following structure:

```bash
├── file A
│   ├── splits
│   │   ├── video split / key moment
│   │   ├── wav - audio file for the split
│   │   ├── json object with the transcription
├── file B
│   ├── splits
│   │   ├── video split / key moment
│   │   ├── wav - audio file for the split
│   │   ├── json object with the transcription
```
The metadata schema for each asset type is described in the corresponding section below.



# Set up 🛠

In [None]:
!pip install --upgrade google-cloud-aiplatform google-cloud-speech firebase-admin librosa tqdm "moviepy<2"


In [None]:
# Restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [None]:
import sys

# Additional authentication is required for Google Colab
if "google.colab" in sys.modules:
    # Authenticate user to Google Cloud
    from google.colab import auth

    auth.authenticate_user()

## Variables

In [None]:
PROJECT_ID = "editorial-solaris"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
MODEL = "gemini-1.5-pro-001" # @param {type:"string"}
PIPELINE_INPUT_BUCKET = "editorial-solaris-demo-pipeline-input" #@param {type:"string"}
INPUT_BUCKET="editorial-solaris-demo-input" # @param {type:"string"}
OUTPUT_BUCKET="gs://editorial-solaris-demo-output" # @param {type:"string"}
EMBEDDINGS_BUCKET = "editorial-solaris-demo-embeddings" # @param {type:"string"}

#db collection for firestore
VIDEO_COLLECTION="video-demo"   # @param {type:"string"}
IMAGE_COLLECTION="images-demo" # @param {type:"string"}
AUDIO_COLLECTION="audio-demo"  # @param {type:"string"}
ARTICLE_COLLECTION="article-demo"  # @param {type:"string"}


## Imports


In [None]:
#common import
import base64
import vertexai
from vertexai.generative_models import GenerativeModel, Part, FinishReason
import vertexai.preview.generative_models as generative_models
from vertexai.language_models import TextEmbeddingModel
from google.cloud import storage
import re
import json
import tqdm
import os
import io
import pandas as pd
import time


#speech imports
from google.api_core.client_options import ClientOptions
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech
import librosa

#storage
import firebase_admin
from firebase_admin import firestore

#threading
from concurrent.futures import ThreadPoolExecutor, as_completed

# Video
from moviepy.editor import VideoFileClip
from moviepy.editor import ImageClip

## Common Functions

In [None]:
def generate(prompt : list, model :str = MODEL, location :str = LOCATION) -> str:
  vertexai.init(project=PROJECT_ID, location=location)

  model = GenerativeModel(
    model,
  )

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 1,
      "top_p": 0.95,
  }

  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
  }

  responses = model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )
  return responses.text



In [None]:
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # source_blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

In [None]:
def read_json_from_gcs(bucket_name, file_name):
    """Reads a JSON file from Google Cloud Storage into a Python dictionary.

    Args:
        bucket_name: The name of the GCS bucket.
        file_name: The name of the JSON file within the bucket.

    Returns:
        A Python dictionary representing the JSON data, or None if an error occurred.
    """

    # Remove 'gs://' prefix if present using replace()
    bucket_name = bucket_name.replace("gs://", "", 1)  # Replace only the first occurrence

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    try:
        contents = blob.download_as_string()
        data = json.loads(contents)
        return data
    except Exception as e:
        print(f"Error reading JSON from GCS: {e}")
        return None

In [None]:
def read_string_from_gcs(bucket_name, file_name):
    """Reads a text file from Google Cloud Storage into a Python string.

    Args:
        bucket_name: The name of the GCS bucket.
        file_name: The name of the text file within the bucket.

    Returns:
        A string that has the data from the bucket, or None if an error occurred.
    """

    # Remove 'gs://' prefix if present using replace()
    bucket_name = bucket_name.replace("gs://", "", 1)  # Replace only the first occurrence

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    try:
        # download_as_text decodes the bytes, so the function really returns a str
        contents = blob.download_as_text()
        return contents

    except Exception as e:
        print(f"Error reading text from GCS: {e}")
        return None
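Both readers strip the scheme with `replace("gs://", "", 1)`, which removes the first `gs://` wherever it occurs and still expects the bucket and object name as separate arguments. When you only have a full URI, such as the `gcs_uri` values built later in the notebook, a small helper can split it first. `parse_gcs_uri` is a hypothetical helper sketched here, not part of the `google-cloud-storage` API.

```python
from typing import Tuple


def parse_gcs_uri(uri_or_bucket: str) -> Tuple[str, str]:
    """Splits 'gs://bucket/path/to/object' into ('bucket', 'path/to/object').

    A bare bucket name returns (bucket, '').
    """
    # Only strip the scheme when it is an actual prefix
    if uri_or_bucket.startswith("gs://"):
        uri_or_bucket = uri_or_bucket[len("gs://"):]
    # Everything before the first slash is the bucket, the rest is the object name
    bucket, _, blob_name = uri_or_bucket.partition("/")
    return bucket, blob_name
```

For example, `read_json_from_gcs(*parse_gcs_uri(uri))` would read a file given only its full URI.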

In [None]:
def list_bucket_files_pd(bucket_name):
    """
    Lists files in a GCS bucket with properties (name, size, content type, created and updated times).

    Args:
        bucket_name (str): Name of the GCS bucket.

    Returns:
        pd.DataFrame: DataFrame containing file properties.
    """

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_data = []
    for blob in blobs:
        file_data.append({
            'file_name': blob.name,
            'size': blob.size,
            'type': blob.content_type,
            'created': blob.time_created,
            'updated': blob.updated,
        })

    df = pd.DataFrame(file_data)
    return df

In [None]:
def list_bucket_files(bucket_name):
    """
    Lists files in a GCS bucket with properties (name, size, updated time).

    Args:
        bucket_name (str): Name of the GCS bucket.

    Returns:
        list: List of JSON objects containing file properties.
    """

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_data = []
    for blob in blobs:
        file_data.append({
            'file_name': blob.name,
            'size': blob.size,
            'type': blob.content_type,
            'created': blob.time_created,
            'updated': blob.updated,
        })

    return file_data  # Return the list of JSON objects directly

In [None]:
# Strips the long-running operation IDs from the transcription file names
def rename_transcription_files(bucket_name):
    """Renames files in a GCS bucket using a regular expression.

    Args:
        bucket_name: The name of the GCS bucket.
    """

    # Remove 'gs://' prefix if present using replace()
    bucket_name = bucket_name.replace("gs://", "", 1)  # Replace only the first occurrence

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    blobs = bucket.list_blobs()  # Get a list of all blobs (files) in the bucket

    for blob in blobs:
        # Check if the file name matches the pattern
        if re.match(r".*_transcript_.*\.json", blob.name):
            new_name = re.sub(r"_transcript_.+\.json", "_transcript.json", blob.name)

            # Only rename if the new name is different
            if new_name != blob.name:
                bucket.rename_blob(blob, new_name)
                print(f"Renamed {blob.name} to {new_name}")
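To check the rename rule without touching a bucket, you can apply the same two regular expressions to sample names. The object names in the test are made up for illustration; the exact suffix Speech-to-Text appends is an assumption here, and only its `_transcript_<id>.json` shape matters to the pattern. `strip_operation_id` is a hypothetical name.

```python
import re


def strip_operation_id(blob_name: str) -> str:
    """Applies the same substitution as rename_transcription_files to one name."""
    if re.match(r".*_transcript_.*\.json", blob_name):
        return re.sub(r"_transcript_.+\.json", "_transcript.json", blob_name)
    # Names that do not match (including already-renamed files) are unchanged
    return blob_name
```

Because already-renamed files no longer contain `_transcript_`, running the rename cell twice is harmless.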



# Metadata Generation 🤖



## Loading assets

Create a list of all the objects that are in the input bucket.

In [None]:
file_list = list_bucket_files(PIPELINE_INPUT_BUCKET)
file_list
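The audio, image and video sections each filter `file_list` on the `type` field. As a sketch of doing that in one pass, the hypothetical `group_by_media_type` groups the listing by the part of the MIME type before the slash. It treats a missing `content_type` as `application/octet-stream`, and it groups every video type together, whereas the video section keeps only `video/mp4`.

```python
from collections import defaultdict
from typing import Dict, List


def group_by_media_type(files: List[Dict]) -> Dict[str, List[Dict]]:
    """Groups file dicts by top-level MIME type ('audio', 'image', 'video', ...)."""
    groups = defaultdict(list)
    for file in files:
        # content_type can be None for objects uploaded without one
        mime = file.get("type") or "application/octet-stream"
        groups[mime.split("/", 1)[0]].append(file)
    return dict(groups)
```

Usage would look like `groups = group_by_media_type(file_list)` followed by `groups.get("audio", [])`.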

## Transcription setup

First, configure a Recognizer request that uses the Chirp model to transcribe English (`en-AU`) audio.

See [the documentation](https://cloud.google.com/python/docs/reference/speech/latest/google.cloud.speech_v2.types.CreateRecognizerRequest) to learn more about how to configure the `CreateRecognizerRequest` request.

In [None]:
client = SpeechClient(
    client_options=ClientOptions(api_endpoint=f"{LOCATION}-speech.googleapis.com")
)

language_code = "en-AU"
recognizer_id = f"chirp-{language_code.lower()}-interview"

recognizer_request = cloud_speech.CreateRecognizerRequest(
    parent=f"projects/{PROJECT_ID}/locations/{LOCATION}",
    recognizer_id=recognizer_id,
    recognizer=cloud_speech.Recognizer(
        language_codes=[language_code],
        model="chirp",
    ),
)

Then, create a Speech-to-Text [Recognizer](https://cloud.google.com/speech-to-text/v2/docs/recognizers) that uses Chirp by running a create operation. If a recognizer with the same ID already exists, it is reused.

In [None]:
from google.api_core.exceptions import NotFound

recognizer_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/recognizers/{recognizer_id}"
try:
    # Reuse the recognizer if it was created in a previous run
    recognizer = client.get_recognizer(name=recognizer_name)
    print(f"Recognizer '{recognizer_id}' already exists.")
except NotFound:
    # Only create it when it is genuinely missing; other errors still surface
    print(f"Recognizer '{recognizer_id}' does not exist. Creating it...")
    create_operation = client.create_recognizer(request=recognizer_request)
    recognizer = create_operation.result()

In [None]:
recognizer

## Audio 🎧
From the list of all input files, select the audio files. The generated audio metadata has the following fields:

| Field | Description |
|---|---|
| **Show Name** | The name of the podcast show. |
| **Long Summary** | A detailed and comprehensive summary of the content. |
| **Short Summary** | A concise and brief overview of the content. |
| **Labels** | Keywords or tags associated with the content. |
| **Transcript** | The transcript of the audio content. |

In [None]:
# Keep only audio files (content_type can be None, so default to an empty string)
audio_list = []
for file in file_list:
  if 'audio' in (file['type'] or ''):
    audio_list.append(file)
audio_list

### Summary metadata 🧙

In [None]:
def generate_audio(prompt : list, model :str = MODEL) -> str:
  vertexai.init(project=PROJECT_ID, location=LOCATION)

  model = GenerativeModel(
    model,
  )

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 0.5,
      "top_p": 0.95,
  }

  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
  }

  responses = model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )

  return responses.text



In [None]:
# Create the audio metadata
def generate_metadata_audio(blob_uri: str, mime_type: str, model:str) -> str:
  media_asset = Part.from_uri(
      mime_type=mime_type,
      uri=blob_uri)

  prompt = """SYSTEM:```You are a skilled podcast expert. You have a deep understanding of media. Your task is to analyze the provided audio and extract key information.```
  INSTRUCTION: Please analyze the following audio and provide the show name, a long summary, a short summary and subject topics. Please format your response as a JSON object with the given structure. Avoid any additional comment or text.
  OUTPUT:```
  JSON
  {
    "show_name" : "the name of the podcast show",
    "short_summary": "[One paragraph summary of the content]",
    "long_summary": "[two-three paragraph summary of the content]",
    "subject_topics" :
      [     { "topic": "[topic]"}, { "topic": "[topic]"} ]
  }```
  """

  result_text = generate_audio(prompt=[media_asset, prompt], model = model )


  return result_text

In [None]:
def process_audio(audio: dict):
  """Processes a single entry from the audio list."""

  blob_uri = f"gs://{INPUT_BUCKET}/{audio['file_name']}"
  try:
    result = generate_metadata_audio(blob_uri, audio['type'], MODEL)
    # Strip only a surrounding ```json ... ``` fence, not "json" inside the content
    response_text = re.sub(r"^```(?:json)?\s*|\s*```$", "", result.strip())
    audio['metadata'] = json.loads(response_text)
    return result
  except Exception as e:
    print(f"Error processing {audio['file_name']}: {e}")





with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_audio, audio) for audio in audio_list]

    # You can remove the tqdm loop if you don't need progress updates
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Audio Summary Metadata Generated')
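Model responses sometimes arrive wrapped in a Markdown code fence, or with a sentence of extra prose around the JSON object. A slightly more defensive parser, assuming there is at most one fence and one top-level object, strips the fence and falls back to the outermost braces. `parse_model_json` is a hypothetical helper name.

```python
import json
import re


def parse_model_json(text: str) -> dict:
    """Parses a JSON object from a model response that may be wrapped in a ```json fence."""
    cleaned = text.strip()
    # Remove a leading ``` or ```json fence and a trailing ``` fence, if present
    cleaned = re.sub(r"^```(?:json)?\s*", "", cleaned)
    cleaned = re.sub(r"\s*```$", "", cleaned)
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        # Fall back to the outermost {...} span if the model added extra prose
        start, end = cleaned.find("{"), cleaned.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(cleaned[start:end + 1])
```

Unlike removing every occurrence of the word `json`, this leaves summaries that mention JSON intact.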


In [None]:
# create the uris in the object's metadata
for audio in audio_list:
  audio['name'] = audio['file_name'].split('.')[0]
  audio['gcs_uri'] = f"gs://{INPUT_BUCKET}/{audio['file_name']}"
  formatted_text = json.dumps(audio, indent=4, default=str)
  print(formatted_text)
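Deriving `name` with `split('.')[0]` keeps only the text before the first dot, so a file such as `ep.01.mp3` would become `ep`. If your file names can contain dots, a minimal alternative uses `os.path.splitext`, which drops only the final extension; `asset_name` is a hypothetical helper name.

```python
import os


def asset_name(file_name: str) -> str:
    """Drops only the final extension: 'ep.01.mp3' -> 'ep.01'."""
    return os.path.splitext(file_name)[0]
```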

### Transcription

In [None]:
# TODO: Change to the audio configuration instead of the video segment
def process_segment_transcription(element, client, recognizer):
    long_audio_uri = element['split_audio_uri']
    directory_path = os.path.dirname(long_audio_uri)

    long_audio_config = cloud_speech.RecognitionConfig(
      features=cloud_speech.RecognitionFeatures(
          enable_automatic_punctuation=True, enable_word_time_offsets=True
      ),
      auto_decoding_config={},
    )


    long_audio_request = cloud_speech.BatchRecognizeRequest(
        recognizer=recognizer.name,
        recognition_output_config={
            "gcs_output_config": {"uri": directory_path}
        },
        files=[{"config": long_audio_config, "uri": long_audio_uri}],
    )

    long_audio_operation = client.batch_recognize(request=long_audio_request)
    return long_audio_operation

futures = []  # Create an empty list to store the results

def process_video_segments_transcription(video):
  with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_segment_transcription, section, client, recognizer) for index, section in enumerate(video['sections'])]


    # # waiting for threads to complete
    # for _ in tqdm.tqdm(as_completed(futures), total=len(futures), desc=f"Processing {video['file_name']}"):
    #     pass  # No need to process individual results here

    for future in tqdm.tqdm(as_completed(futures), total=len(futures), desc=f"Transcribing segments {video['file_name']}"):
      operation = future.result()  # Get the Operation object
      # Wait for the operation to complete
      response = operation.result()

for audio in audio_list:
  process_video_segments_transcription(audio)

print(f"Transcriptions done.")

Remove the long-running operation IDs from the transcription file names.

In [None]:
rename_transcription_files(OUTPUT_BUCKET)
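After renaming, each transcription is a JSON file that you can load with `read_json_from_gcs`. The sketch below flattens one into plain text. It assumes the batch output follows the `BatchRecognizeResults` layout (a `results` list whose entries carry `alternatives` with a `transcript` field), so check a real output file before relying on it. `transcript_from_batch_result` is a hypothetical name.

```python
from typing import Dict


def transcript_from_batch_result(result_json: Dict) -> str:
    """Joins the top alternative of every result into one transcript string.

    Assumes the layout {"results": [{"alternatives": [{"transcript": "..."}]}, ...]}.
    """
    pieces = []
    for result in result_json.get("results", []):
        alternatives = result.get("alternatives", [])
        # Results without recognized speech may have no alternatives
        if alternatives and alternatives[0].get("transcript"):
            pieces.append(alternatives[0]["transcript"].strip())
    return " ".join(pieces)
```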

## Images 📸



For each image, we simply create some additional metadata to support future uses such as search or content generation. There is only one step: run a single prompt that returns a description, photo type, location, subject topics and people.


The resulting metadata has the following format:

| Field | Description |
|---|---|
| **Description** | A textual description of the image content. |
| **Photo Shot Type** | The type of shot used (e.g., close-up, landscape, portrait). |
| **Location** | The place where the photo was taken. |
| **Labels** | Keywords or tags associated with the image. |
| **People** | Names or descriptions of people present in the image. |



In [None]:
# Keep only image files (content_type can be None, so default to an empty string)
image_list = []
for file in file_list:
  if 'image' in (file['type'] or ''):
    image_list.append(file)
image_list

In [None]:
def generate_image(prompt : list, model :str = MODEL) -> str:
  vertexai.init(project=PROJECT_ID, location=LOCATION)

  model = GenerativeModel(
    model,
  )

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 0.5,
      "top_p": 0.95,
  }

  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
  }

  responses = model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )

  return responses.text



In [None]:
# Create the image metadata
def generate_metadata_image(blob_uri: str, mime_type: str, model:str) -> str:
  media_asset = Part.from_uri(
      mime_type=mime_type,
      uri=blob_uri)

  prompt = """SYSTEM:```You are a skilled image analysis expert. You have a deep understanding of media. Your task is to analyze the provided image and extract key information.```
  INSTRUCTION: ```Please analyze the following image and provide a description, photo type, location, subject topics and persons. Please format your response as a JSON object with the given structure. Avoid any additional comment or text.```
  OUTPUT:```
    JSON
  {
    "description": "[A one line description that would support understanding the contents of the image]",
    "photo_type": "[The type of shot or camera angle used to take the photograph]",
    "location" : "A description of the location where the shot is taken, or if it is a known sight, its name",
    "subject_topics" :
      [     { "topic": "[topic]"}, { "topic": "[topic]"} ],
    "persons" :
  [     { "person": "[person1]"}, { "person": "[person2]"} ]
  }```
  """

  result_text = generate_image(prompt=[media_asset, prompt], model = model )


  return result_text

In [None]:
def process_image(image: dict):
  """Processes a single entry from the image list."""

  blob_uri = f"gs://{INPUT_BUCKET}/{image['file_name']}"
  try:
    result = generate_metadata_image(blob_uri, image['type'], MODEL)
    # Strip only a surrounding ```json ... ``` fence, not "json" inside the content
    response_text = re.sub(r"^```(?:json)?\s*|\s*```$", "", result.strip())
    image['metadata'] = json.loads(response_text)
    return result
  except Exception as e:
    print(f"Error processing {image['file_name']}: {e}")


with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_image, image) for image in image_list]

    # You can remove the tqdm loop if you don't need progress updates
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Images Summary Metadata Generated')
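`process_image` catches and prints its own errors, so the executor loop never sees a failure. If you let the processing function raise instead, a pattern like the hypothetical `run_concurrently` below records which items failed so you can retry them. It uses only `concurrent.futures`, with `max_workers=4` as an arbitrary default.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Callable, Dict, List, Tuple


def run_concurrently(
    func: Callable[[Dict], object], items: List[Dict], max_workers: int = 4
) -> List[Tuple[Dict, BaseException]]:
    """Runs func on every item in a thread pool; returns (item, exception) pairs for failures."""
    failures = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to the item it was submitted for
        future_to_item = {executor.submit(func, item): item for item in items}
        for future in as_completed(future_to_item):
            exc = future.exception()  # None when the call succeeded
            if exc is not None:
                failures.append((future_to_item[future], exc))
    return failures
```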


In [None]:
for image in image_list:
  image['name'] = image['file_name'].split('.')[0]
  image['gcs_uri'] = f"gs://{INPUT_BUCKET}/{image['file_name']}"


In [None]:
for image in image_list:
  formatted_text = json.dumps(image, indent=4, default=str)
  print(formatted_text)
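Because `process_image` only prints a message when a response cannot be parsed, some entries in `image_list` may have no `metadata`, or may be missing fields. A quick check, with field names taken from the JSON structure requested in the image prompt, lists what is absent before you store anything. `missing_image_fields` is a hypothetical helper; it treats an empty list (for example, no people in the photo) as present.

```python
from typing import Dict, List

# Field names taken from the JSON structure requested in the image prompt
EXPECTED_IMAGE_FIELDS = ("description", "photo_type", "location", "subject_topics", "persons")


def missing_image_fields(image: Dict) -> List[str]:
    """Returns the expected metadata fields that are absent for one image."""
    metadata = image.get("metadata") or {}
    return [field for field in EXPECTED_IMAGE_FIELDS if field not in metadata]
```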

## Videos 🎥

This section creates the metadata for the videos. The relevant steps are:

1.   Create the summary metadata with Gemini
2.   Ask Gemini to pick the timestamps of suitable thumbnails, and extract images at those timestamps.
3.   Detect the key moments in the video and cut the video into those segments.
4.   For each of the key moments, create a transcript of the audio.

The metadata generated will follow this structure:


| Field | Description |
|---|---|
| **Short Summary** | A concise and brief overview of the video content. |
| **Long Summary** | A detailed and comprehensive summary of the video content. |
| **Labels** | Keywords or tags associated with the video. |
| **Thumbnails** | A list of thumbnails for the video. |
| **Key Moments: Shot Type** | The type of shot used for the key moment (e.g., close-up, landscape). |
| **Key Moments: Order** | The order or sequence of the key moment within the video. |
| **Key Moments: Reason** | The reason why this moment is considered key or important. |
| **Key Moments: Start Timestamp** | The timestamp in the video where the key moment starts. |
| **Key Moments: End Timestamp** | The timestamp in the video where the key moment ends. |
| **Key Moments: Transcript** | The speech-to-text transcript. |

In [None]:
# Keep only MP4 videos
video_list = []
for file in file_list:
  if file['type'] == 'video/mp4':
    video_list.append(file)
video_list

In [None]:
#copy files locally
# Create the output folder if it doesn't exist
output_folder = f"content/"
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

for video in video_list:
  #copy file to notebook
  destination_file_name = f"content/{video['file_name']}"
  download_blob(PIPELINE_INPUT_BUCKET, video['file_name'], destination_file_name)


### Summary metadata 🧙
This is the function that generates metadata for video assets. It is not currently specific to the type of content (e.g., sports highlights versus press conference highlights), but refining the prompt for each content type should yield more accurate metadata.
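One way to act on this is to keep the generic instruction as a base and append guidance per content type. The content types and extra instructions below are hypothetical examples; you would still need to request the matching fields in the prompt's JSON structure and decide how each video's content type is known.

```python
from typing import Dict

# Hypothetical content types and extra guidance; adapt to your own catalogue
CONTENT_TYPE_INSTRUCTIONS: Dict[str, str] = {
    "sports_highlights": "Also list the teams, the final score and the key plays.",
    "press_conference": "Also list the speakers and the main announcements.",
}

BASE_INSTRUCTION = (
    "Please analyze the following video and provide a long summary, "
    "a short summary and subject topics."
)


def build_video_instruction(content_type: str) -> str:
    """Appends content-type-specific guidance to the generic instruction, if any."""
    extra = CONTENT_TYPE_INSTRUCTIONS.get(content_type)
    return f"{BASE_INSTRUCTION} {extra}" if extra else BASE_INSTRUCTION
```

Unknown content types fall back to the generic instruction, so the pipeline still works for assets you have not categorized.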


In [None]:
#create a summary of the video
def generate_metadata_video_overview(blob_uri: str) -> str:
  video1 = Part.from_uri(
      mime_type="video/mp4",
      uri=blob_uri)

  text1 = """SYSTEM:```You are a skilled video analysis expert. You have a deep understanding of media. Your task is to analyze the provided video and extract key information.```
  INSTRUCTION: ```Please analyze the following video and provide a long summary, a short summary and subject topics. Please format your response as a JSON object with the given structure. Avoid any additional comment or text.```
  OUTPUT:```
  JSON
  {
    "short_summary": "[One paragraph summary of the content]",
    "long_summary": "[two-three paragraph summary of the content]",
    "subject_topics" :
      [     { "topic": "[topic]"}, { "topic": "[topic]"} ]
  }```
  """

  result_text = generate(prompt=[video1, text1], model = 'gemini-1.5-pro-001' )

  return result_text

In [None]:
def process_row_summary(video):
    """Processes a single row from the video array."""
    try:
        blob_uri = f"gs://{PIPELINE_INPUT_BUCKET}/{video['file_name']}"
        result = generate_metadata_video_overview(blob_uri=blob_uri)
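        # Strip only a surrounding ```json ... ``` fence, not "json" inside the content
        response_text = re.sub(r"^```(?:json)?\s*|\s*```$", "", result.strip())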
        summary = json.loads(response_text)
        video['summary'] = summary
    except Exception as e:
        print(f"Error processing video {video['file_name']}: {e}")

# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row_summary, video) for video in video_list]

    # futures = []
    # for video in tqdm.tqdm(video_list, desc='Launching prompts'):
    #   future = executor.submit(process_row_summary, video)
    #   time.sleep(60)

    #   futures.append(future)

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Video Summary Metadata Generated')

Show the result for the first entry:

In [None]:
formatted_json = json.dumps(video_list[0], indent=4, default=str)
print(formatted_json)


### Thumbnails ❇

In [None]:
def extract_thumbnails(video_path, timestamps):
    """
    Extracts thumbnails from a video at specified timestamps using moviepy.

    Args:
        video_path (str): The path to the video file.
        timestamps (list): A list of timestamps (in seconds) for the snapshots.

    Returns:
        list: A list of frames as NumPy arrays (height x width x 3), one per timestamp.
    """

    clip = VideoFileClip(video_path)

    snapshots = [clip.get_frame(t) for t in timestamps]

    clip.close()

    return snapshots


In [None]:
def generate_metadata_video_thumbnails(blob_uri: str) -> str:
  video1 = Part.from_uri(
      mime_type="video/mp4",
      uri=blob_uri)
  text1 = """SYSTEM:```You are a skilled video analysis expert. You have a deep understanding of media and can accurately identify key moments in a video. Your task is to analyze the provided video and extract key thumbnails.```
  INSTRUCTION: ```Give me the timestamp for 3 suitable thumbnails for this video that highlight the key moments.
  Do not add any additional text.```
  OUTPUT:```
  JSON
  {
    "thumbnails": [
      {
        "reason": "[Why this would be a suitable thumbnail for the video]",
        "time": "[mm:ss]"
      },
      {
        "reason": "[Why this would be a suitable thumbnail for the video]",
        "time": "[mm:ss]"
      }
    ]
  }
  ```
  """


  result_text = generate(prompt=[video1, text1], model = 'gemini-1.5-pro-001' )

  return result_text


In [None]:
def process_row_thumbnails(video):
    """Processes a single row from the video array."""
    try:
        blob_uri = f"gs://{PIPELINE_INPUT_BUCKET}/{video['file_name']}"
        result = generate_metadata_video_thumbnails(blob_uri=blob_uri)
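        # Strip only a surrounding ```json ... ``` fence, not "json" inside the content
        response_text = re.sub(r"^```(?:json)?\s*|\s*```$", "", result.strip())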
        print(response_text)
        thumbnails_metadata = json.loads(response_text)
        video['thumbnails'] = thumbnails_metadata['thumbnails']
    except Exception as e:
        print(f"Error processing video {video['file_name']}: {e}")

# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row_thumbnails, video) for video in video_list]

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Video Thumbnails Metadata Generated')

100%|██████████| 1/1 [02:35<00:00, 155.20s/it]


{
  "thumbnails": [
    {
      "reason": "Shows the opening keynote title card for Google Cloud Next",
      "time": "0:04"
    },
    {
      "reason": "Shows Thomas Kurian, CEO of Google Cloud, entering the stage",
      "time": "1:48"
    },
    {
      "reason": "Shows Sundar Pichai, CEO of Google, on the screen addressing the audience",
      "time": "3:37"
    }
  ]
}

Video Thumbnails Metadata Generated





In [None]:
formatted_json = json.dumps(video_list[0], indent=4, default=str)
print(formatted_json)

In [None]:
def store_thumbnails(video_path, timestamps, output_prefix):
    """
    Extracts thumbnails from a video at specified timestamps and saves them locally as PNG files.

    Args:
        video_path (str): The path to the video file.
        timestamps (list): A list of timestamps (in seconds) for the snapshots.
        output_prefix (str): A prefix for the output file paths (e.g., "thumbnails/my_video");
            files are written as "<output_prefix>_thumbnail_<i>.png".
    """

    try:
        snapshots = extract_thumbnails(video_path, timestamps)

        for i, snapshot in enumerate(snapshots):
            try:
                img_clip = ImageClip(snapshot)
                thumbnail_name = f"{output_prefix}_thumbnail_{i}.png"

                #save to the local file
                img_clip.save_frame(thumbnail_name)

                img_clip.close()
            except Exception as e:
                print(f"Error saving thumbnail {i}: {e}")

    except Exception as e:
        print(f"Error extracting thumbnails from video {video_path}: {e}")

In [None]:
def convert_time_to_seconds(time_str):
  """Converts a time string in the format 'M:SS' or 'H:MM:SS' to seconds.

  Args:
    time_str: A string representing the time, e.g. '1:48' or '1:02:03'.

  Returns:
    An integer representing the total number of seconds.
  """

  total_seconds = 0
  # Each colon-separated field is worth 60 times the field to its right
  for part in time_str.strip().split(':'):
    total_seconds = total_seconds * 60 + int(part)
  return total_seconds


#copy files locally
# Create the output folder if it doesn't exist
output_folder = f"thumbnails"
if not os.path.exists(output_folder):
    os.makedirs(output_folder)

for video in video_list:
  #create the field name, which is the file_name without the file extension
  video['name'] = video['file_name'].split('.')[0]


for video in video_list:
  file_name = f"content/{video['file_name']}"
  if 'thumbnails' in video:
    seconds_array = []
    for thumbnail in video['thumbnails']:
      seconds_array.append(convert_time_to_seconds(thumbnail['time']))
    # Extract all thumbnails once, after every timestamp has been collected
    output_prefix = f"thumbnails/{video['name']}"
    store_thumbnails(file_name, seconds_array, output_prefix)
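The thumbnail times come from the model, so it is worth checking them against the real clip length before extracting frames; with moviepy, `VideoFileClip(path).duration` gives that length in seconds. A minimal filter, with the hypothetical name `usable_timestamps`, drops out-of-range or duplicate values.

```python
from typing import List


def usable_timestamps(seconds: List[float], duration: float) -> List[float]:
    """Keeps timestamps inside [0, duration), sorted and de-duplicated."""
    return sorted({t for t in seconds if 0 <= t < duration})
```

For the keynote example above, the suggested times 0:04, 1:48 and 3:37 convert to 4, 108 and 217 seconds and would all pass this check for any clip longer than 217 seconds.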

In [None]:
thumbnails_folder = f"{OUTPUT_BUCKET}"

!gsutil -m cp -r thumbnails/ {thumbnails_folder}


Copying file://thumbnails/Google Cloud Next 24 - Keynote_thumbnail_2.png [Content-Type=image/png]...
Copying file://thumbnails/Google Cloud Next 24 - Keynote_thumbnail_0.png [Content-Type=image/png]...
Copying file://thumbnails/Google Cloud Next 24 - Keynote_thumbnail_1.png [Content-Type=image/png]...
/ [3/3 files][  1.8 MiB/  1.8 MiB] 100% Done
Operation completed over 3 objects/1.8 MiB.


### Section Metadata 🧙

In [None]:
def generate_metadata_video_sections(blob_uri: str) -> str:
  video1 = Part.from_uri(
      mime_type="video/mp4",
      uri=blob_uri)
  text1 = """SYSTEM:```You are a skilled video analysis expert. You have a deep understanding of media and can accurately identify key moments in a video. Your task is to analyze the provided video and extract all the highlight clips. For each clip, you need to classify the type of highlight and provide the precise start and end timestamps.```
  INSTRUCTION: ```Please analyze the following video and provide a list of all the highlight clips with their type and timestamps. Also explain the reason why the selection of that particular timestamp has been made. Please format your response as a JSON object with the given structure. Make sure the audio is not truncated while suggesting the clips. Avoid any additional comment or text.```
  OUTPUT:```
  JSON
  {
    "sections": [
      {
        "type": "[highlight type]",
        "start_time": "[mm:ss]",
        "end_time": "[mm:ss]",
        "reason" : ""
      },
      {
        "type":"[highlight type]",
        "start_time": "[mm:ss]",
        "end_time": "[mm:ss]",
        "reason" : ""
      }
    ]
  }```
  Please make sure the timestamps are accurate and reflect the precise start and end of each highlight clip."""


  result_text = generate(prompt=[video1, text1], model = MODEL )

  return result_text


In [None]:
def process_row(video):
    """Processes a single row from the video array, with error handling."""
    try:
        blob_uri = f"gs://{PIPELINE_INPUT_BUCKET}/{video['file_name']}"
        result = generate_metadata_video_sections(blob_uri=blob_uri)
        response_text = re.sub(r"json|```", "", result)
        print(response_text)
        sections_metadata = json.loads(response_text)
        video['sections'] = sections_metadata['sections']
    except Exception as e:
        # Log or print the error for debugging
        print(f"Error processing video sections: {video['file_name']}: {e}")

# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row, video) for video in video_list]

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Video Sections Metadata Generated')
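`re.sub(r"json|```", "", result)` removes every occurrence of the lowercase string `json`, not only the code fence, so a `reason` or summary that mentions "json" would be silently altered before parsing. The sketch below only strips one Markdown fence surrounding the whole response; `parse_model_json` is a hypothetical helper, not part of the notebook. Alternatively, Gemini models on Vertex AI can be asked for raw JSON by setting `response_mime_type="application/json"` in the generation config, which may remove the need for stripping; whether the `generate` helper used here exposes that option is not shown in this section.

```python
import json
import re

# Matches an optional ```json ... ``` (or plain ``` ... ```) fence around the whole response
_FENCE_RE = re.compile(r"^\s*```(?:json)?\s*(.*?)\s*```\s*$", re.DOTALL | re.IGNORECASE)


def parse_model_json(text: str):
    """Hypothetical helper: strips one surrounding Markdown code fence, then parses JSON."""
    match = _FENCE_RE.match(text)
    payload = match.group(1) if match else text
    return json.loads(payload)


print(parse_model_json('```json\n{"reason": "mentions json"}\n```'))
# {'reason': 'mentions json'}
```

In `process_row`, `sections_metadata = parse_model_json(result)` would replace the `re.sub` and `json.loads` pair; the article cells below use the same pattern.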

100%|██████████| 1/1 [01:23<00:00, 83.34s/it]


{
  "sections": [
    {
      "type": "product announcement",
      "start_time": "00:16",
      "end_time": "01:35",
      "reason": "The video showcases various examples of how Google AI is being used today."
    },
    {
      "type": "speaker introduction",
      "start_time": "01:37",
      "end_time": "02:06",
      "reason": "The CEO of Google Cloud, Thomas Kurian, is introduced to the stage."
    },
    {
      "type": "speaker introduction",
      "start_time": "03:26",
      "end_time": "03:35",
      "reason": "The CEO of Google and Alphabet, Sundar Pichai, is introduced on stage."
    },
    {
      "type": "product announcement",
      "start_time": "04:49",
      "end_time": "06:01",
      "reason": "Sundar Pichai talks about Google's AI infrastructure and the introduction of Gemini 1.5 Pro."
    },
    {
      "type": "partnership announcement",
      "start_time": "06:52",
      "end_time": "07:27",
      "reason": "Sundar Pichai announces partnerships with iconic comp




Show the metadata of the first video.

In [None]:
formatted_json = json.dumps(video_list[0], indent=4, default=str)
print(formatted_json)


{
    "file_name": "Google Cloud Next 24 - Keynote.mp4",
    "size": 614140803,
    "type": "video/mp4",
    "created": "2024-11-14 12:56:10.137000+00:00",
    "updated": "2024-11-14 12:56:10.137000+00:00",
    "summary": {
        "short_summary": "Thomas Kurian, CEO of Google Cloud, opens the Google Cloud Next keynote, emphasizing the company's advancements in AI infrastructure, generative AI tools, and partnerships. The event showcases how AI is transforming businesses and improving daily lives. Sundar Pichai, CEO of Google & Alphabet, highlights the incredible momentum in the cloud business, with a revenue run rate of $36 billion, five times the rate from five years ago. He emphasizes Google's deep investments in AI, particularly Gemini, a large language model with exceptional capabilities. David Solomon, Chairman and CEO of Goldman Sachs, discusses how Goldman Sachs utilizes AI to synthesize complex information efficiently, boosting developer productivity and enhancing client expe

### Section Clipping ✂

Prepare the local folders for the video splits.

In [None]:
# Create the splits folder and one subfolder per video, if they don't exist
splits_folder = "content/splits"
os.makedirs(splits_folder, exist_ok=True)

for video in video_list:
  split_folder = f"{splits_folder}/{video['file_name'].split('.')[0]}"
  os.makedirs(split_folder, exist_ok=True)


Cut the original video into sections. Each thread cuts one section, writes its video and audio files to the local notebook storage, and updates the JSON object with the GCS URIs those files will have once uploaded.


In [None]:
import os
from moviepy.editor import VideoFileClip


def process_segment(video, index):
    """Cuts one section out of the source video and writes its .mp4 and .wav files locally."""
    source_file_name = f"content/{video['file_name']}"
    video_name = video['file_name'].split('.')[0]
    clip_filename = f"content/splits/{video_name}/{video_name}_split_{index}.mp4"
    audio_filename = f"content/splits/{video_name}/{video_name}_split_{index}.wav"

    section_info = video['sections'][index]

    clip = VideoFileClip(source_file_name)
    try:
        # Cut the video section; MoviePy accepts 'MM:SS' strings as times
        clip_subsection = clip.subclip(section_info["start_time"], section_info["end_time"])
        clip_subsection.write_videofile(clip_filename, verbose=False, logger=None)

        # Write the section's audio track as 16-bit PCM WAV for transcription
        clip_subsection.audio.write_audiofile(audio_filename, codec='pcm_s16le', verbose=False, logger=None)
    finally:
        # Release the readers opened by VideoFileClip
        clip.close()

    # Record the GCS URIs the files will have after they are uploaded
    section_info['split_video_uri'] = f"{OUTPUT_BUCKET}/splits/{video_name}/{video_name}_split_{index}.mp4"
    section_info['split_audio_uri'] = f"{OUTPUT_BUCKET}/splits/{video_name}/{video_name}_split_{index}.wav"


def process_video_segments(video):
  """Launches one thread per section to cut a single video."""
  if 'sections' not in video:
    return
  with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_segment, video, index) for index in range(len(video['sections']))]
    # Wait for the threads to complete
    for future in tqdm.tqdm(as_completed(futures), total=len(futures), desc=f"Processing {video['file_name']}"):
      future.result()  # re-raise any exception from a worker thread


for video in video_list:
  process_video_segments(video)


print('Video sections split')

Processing Google Cloud Next 24 - Keynote.mp4: 100%|██████████| 15/15 [20:14<00:00, 80.96s/it]

Video sections split
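Gemini's section timestamps are model output, so nothing guarantees that `end_time` comes after `start_time` or that both fall inside the video, and a malformed or out-of-range timestamp can make a cut fail. A minimal sketch of a pre-check that could run before `process_video_segments`, assuming the video duration in seconds is known (for example from `VideoFileClip(...).duration`). `clean_sections` is hypothetical: it drops sections it cannot parse or whose range is empty, and clamps `end_time` to the video length.

```python
def clean_sections(sections, duration_s):
    """Hypothetical helper: keeps sections with parseable, in-range times (as seconds)."""

    def to_seconds(ts):
        # Accepts 'SS', 'M:SS' or 'H:MM:SS'
        total = 0.0
        for part in str(ts).split(":"):
            total = total * 60 + float(part)
        return total

    valid = []
    for section in sections:
        try:
            start = to_seconds(section["start_time"])
            end = to_seconds(section["end_time"])
        except (KeyError, ValueError):
            continue  # missing or malformed timestamp
        end = min(end, duration_s)  # clamp to the end of the video
        if 0 <= start < end:
            # Copy the section so the original dict is left untouched
            valid.append({**section, "start_time": start, "end_time": end})
    return valid


sections = [
    {"type": "speaker introduction", "start_time": "01:37", "end_time": "02:06"},
    {"type": "bad range", "start_time": "05:00", "end_time": "04:00"},
]
print(clean_sections(sections, duration_s=600))
# [{'type': 'speaker introduction', 'start_time': 97.0, 'end_time': 126.0}]
```

The sketch returns times as seconds rather than `MM:SS` strings, which `subclip` also accepts; if the original strings should be kept in Firestore, the cleaned values could be stored under separate keys instead.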





In [None]:
for video in video_list:
  # print(video['file_name'])
  string = json.dumps(video, indent=4, default=str)
  print(string)

Copy the files from the local folder to the GCS Bucket

In [None]:
# Copy all files in content/splits to the output GCS bucket
!gsutil -m cp -r content/splits {OUTPUT_BUCKET}


Copying file://content/splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_8.mp4 [Content-Type=video/mp4]...
Copying file://content/splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_7.mp4 [Content-Type=video/mp4]...
Copying file://content/splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_10.mp4 [Content-Type=video/mp4]...
Copying file://content/splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_0.mp4 [Content-Type=video/mp4]...
Copying file://content/splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_7.wav [Content-Type=audio/x-wav]...

### Scene description

In [None]:
def generate_metadata_video_sections_descriptions(blob_uri: str) -> str:
  video1 = Part.from_uri(
      mime_type="video/mp4",
      uri=blob_uri)
  text1 = """SYSTEM:```You are a skilled video analysis expert. You have a deep understanding of media and can accurately describe what happens in each scene of a video.```
  INSTRUCTION: ```Generate a scene description for somebody who is visually impaired.
    Format it as something that will be dictated.
    In each scene capture people's movement and actions.
    [description] should be 1-2 sentences long.

  Output should be:
  ```
  OUTPUT:```
  JSON
  {
    "descriptions" :
      [
        { "start_timestamp": "[timecode to start the description]", "description": "[short description of the scene]"},
        { "start_timestamp": "[timecode to start the description]", "description": "[short description of the scene]"}
      ]
  }```
  """


  result_text = generate(prompt=[video1, text1], model = MODEL )

  return result_text

In [None]:
for video in video_list:
  if 'sections' in video:
    for index, segment in enumerate(video['sections']):
      video_name = video['file_name'].split('.')[0]
      segment['split_transcription_uri'] = f"{OUTPUT_BUCKET}/splits/{video_name}/{video_name}_split_{index}_transcript.json"


### Transcription 📓


Create the long-running batch recognition requests and wait for the results.

In [None]:

def process_segment_transcription(element, client, recognizer):
    long_audio_uri = element['split_audio_uri']
    directory_path = os.path.dirname(long_audio_uri)

    long_audio_config = cloud_speech.RecognitionConfig(
      features=cloud_speech.RecognitionFeatures(
          enable_automatic_punctuation=True, enable_word_time_offsets=True
      ),
      auto_decoding_config={},
    )


    long_audio_request = cloud_speech.BatchRecognizeRequest(
        recognizer=recognizer.name,
        recognition_output_config={
            "gcs_output_config": {"uri": directory_path}
        },
        files=[{"config": long_audio_config, "uri": long_audio_uri}],
    )

    long_audio_operation = client.batch_recognize(request=long_audio_request)
    return long_audio_operation

def process_video_segments_transcription(video):
  """Submits one batch recognition request per section and waits for all of them to finish."""
  if 'sections' not in video:
    return
  with ThreadPoolExecutor() as executor:
    futures = [executor.submit(process_segment_transcription, section, client, recognizer) for section in video['sections']]

    for future in tqdm.tqdm(as_completed(futures), total=len(futures), desc=f"Transcribing segments {video['file_name']}"):
      operation = future.result()  # Get the long-running Operation object
      # Block until the batch recognition has written its results to GCS
      operation.result()

for video in video_list:
  process_video_segments_transcription(video)

print("Transcriptions done.")

Transcribing segments Google Cloud Next 24 - Keynote.mp4: 100%|██████████| 15/15 [00:13<00:00,  1.15it/s]

Transcriptions done.





Remove the long-running operation IDs from the transcription file names.

In [None]:
rename_transcription_files(OUTPUT_BUCKET)

Renamed splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_0_transcript_6738490d-0000-25f3-bcbc-30fd38137ee4.json to splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_0_transcript.json
Renamed splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_10_transcript_67380441-0000-2808-a9f9-f403043cbb2c.json to splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_10_transcript.json
Renamed splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_11_transcript_67380443-0000-2808-a9f9-f403043cbb2c.json to splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_11_transcript.json
Renamed splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_12_transcript_67381c30-0000-2b15-accc-30fd3816ef2c.json to splits/Google Cloud Next 24 - Keynote/Google Cloud Next 24 - Keynote_split_12_transcript.json
Renamed splits/Google Cloud Next 24 - Keynote/Google Cloud
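`rename_transcription_files` is defined elsewhere in the notebook and is not shown here. Judging from its output, it strips the `_<operation id>` suffix that batch recognition appends to each transcript file name. A minimal sketch of that name mapping as a pure function, assuming the suffix is always a UUID-shaped id as in the output above; `canonical_transcript_name` is hypothetical. The actual move in GCS could then be done with `Bucket.rename_blob` from the `google-cloud-storage` client.

```python
import re
from typing import Optional

# '<name>_transcript_<8-4-4-4-12 hex id>.json' -> '<name>_transcript.json'
_OPERATION_SUFFIX_RE = re.compile(
    r"^(?P<stem>.*_transcript)_[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}\.json$"
)


def canonical_transcript_name(blob_name: str) -> Optional[str]:
    """Hypothetical helper: returns the name without the operation id, or None if there is none."""
    match = _OPERATION_SUFFIX_RE.match(blob_name)
    return f"{match.group('stem')}.json" if match else None


print(canonical_transcript_name(
    "splits/Keynote/Keynote_split_0_transcript_6738490d-0000-25f3-bcbc-30fd38137ee4.json"
))
# splits/Keynote/Keynote_split_0_transcript.json
```

Returning `None` for names that are already canonical keeps the renaming idempotent if the cell is re-run.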

Add the transcription information to the overall object:

1.   Add the `split_transcription_uri`.
2.   Put the top-ranked transcript of each recognition result in the JSON object.



In [None]:
for video in video_list:
  if 'sections' in video:
    for index, segment in enumerate(video['sections']):
      video_name = video['file_name'].split('.')[0]
      segment['split_transcription_uri'] = f"{OUTPUT_BUCKET}/splits/{video_name}/{video_name}_split_{index}_transcript.json"
      # Read the transcription file into memory (path relative to the bucket)
      file_path = segment['split_transcription_uri'].replace(f"{OUTPUT_BUCKET}/", "")
      transcription_results = read_json_from_gcs(OUTPUT_BUCKET, file_path)

      # Keep only the first (most probable) alternative of each result and concatenate them
      transcripts = []
      for result in transcription_results.get('results', []):
        alternatives = result.get('alternatives', [])
        if alternatives and 'transcript' in alternatives[0]:
          transcripts.append(alternatives[0]['transcript'])
      if transcripts:
        segment['transcription'] = ' '.join(transcripts)

In [None]:
for video in video_list[:1]:
  string = json.dumps(video, indent=4, default=str)
  print(string)

{
    "file_name": "Google Cloud Next 24 - Keynote.mp4",
    "size": 614140803,
    "type": "video/mp4",
    "created": "2024-11-14 12:56:10.137000+00:00",
    "updated": "2024-11-14 12:56:10.137000+00:00",
    "summary": {
        "short_summary": "Thomas Kurian, CEO of Google Cloud, opens the Google Cloud Next keynote, emphasizing the company's advancements in AI infrastructure, generative AI tools, and partnerships. The event showcases how AI is transforming businesses and improving daily lives. Sundar Pichai, CEO of Google & Alphabet, highlights the incredible momentum in the cloud business, with a revenue run rate of $36 billion, five times the rate from five years ago. He emphasizes Google's deep investments in AI, particularly Gemini, a large language model with exceptional capabilities. David Solomon, Chairman and CEO of Goldman Sachs, discusses how Goldman Sachs utilizes AI to synthesize complex information efficiently, boosting developer productivity and enhancing client expe

## News Articles

In [None]:
# load all news articles
articles_list = []
for file in file_list:
  if file['type'] == 'text/html':
    articles_list.append(file)
articles_list

### Summary metadata 🧙

In [None]:
def generate_article_metadata(prompt: list, model: str = MODEL) -> str:
  """Sends a prompt (text and article parts) to Gemini and returns the response text."""
  vertexai.init(project=PROJECT_ID, location=LOCATION)

  # Keep the model name and the model object in separate variables
  generative_model = GenerativeModel(model)

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 0.6,
      "top_p": 0.95,
  }

  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
  }

  response = generative_model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )

  return response.text



In [None]:
#create a summary of the articles
def generate_metadata_articles_overview(blob_uri: str) -> str:
  article = Part.from_uri(
      mime_type="text/html",
      uri=blob_uri)

  text1 = """SYSTEM:```You are a skilled news analysis expert. You have a deep understanding of media. Your task is to analyze the provided article and extract key information.```
  INSTRUCTION: ```
   You are a content creation assistant.
   You provide information for journalists to get the information they need from articles.

   Summarize the following [news_content] using the output format below.

   [news_content]:
  ```
  OUTPUT:```
  JSON
  {
    "short_summary": "[One paragraph summary of the content]",
    "long_summary": "[two-three paragraph summary of the content]",
    "subject_topics" :
      [     { "topic": "[topic]"}, { "topic": "[topic]"} ]
  }```
  """

  result_text = generate_article_metadata(prompt=[text1, article], model = MODEL )

  return result_text

In [None]:
# Generate a headline for the articles
def generate_metadata_articles_headline(blob_uri: str) -> str:
  article = Part.from_uri(
      mime_type="text/html",
      uri=blob_uri)

  text1 = """SYSTEM:```You are a skilled news analysis expert. You have a deep understanding of media. Your task is to analyze the provided article and extract key information.```
  INSTRUCTION: ```
  You are a content creation assistant.
  You provide information for journalists to get the information they need from articles.
  Generate a relevant headline for [news_content].
  [news_content]:
  ```
  OUTPUT:```
  JSON
  {
    "headline": "[a title for the news article]"
  }```
  """

  result_text = generate_article_metadata(prompt=[text1, article], model = MODEL )

  return result_text

Go through all articles and create their summary and headline metadata.

In [None]:
def process_row_article_summary(article):
    """Processes a single row from the articles array."""
    try:
        blob_uri = f"gs://{INPUT_BUCKET}/{article['file_name']}"
        result = generate_metadata_articles_overview(blob_uri=blob_uri)
        response_text = re.sub(r"json|```", "", result)
        summary = json.loads(response_text)
        article['summary'] = summary

        # headlines
        result = generate_metadata_articles_headline(blob_uri=blob_uri)
        response_text = re.sub(r"json|```", "", result)
        headline = json.loads(response_text)
        article['headline'] = headline
    except Exception as e:
        print(f"Error processing article {article['file_name']}: {e}")


# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row_article_summary, article) for article in articles_list]

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('News Articles Summary Metadata Generated')

### Sentiment Analysis

In [None]:
# Run sentiment analysis on the articles
def generate_metadata_articles_sentiment(blob_uri: str) -> str:
  article = Part.from_uri(
      mime_type="text/html",
      uri=blob_uri)

  text1 = """SYSTEM:```You are a skilled news analysis expert. You have a deep understanding of media. Your task is to analyze the provided article and extract key information.```
  INSTRUCTION: ```
  You are a content creation assistant.
  You provide information for journalists.
  Do sentiment analysis on the different topics in the article [news_content].

  [news_content]:
  ```
  OUTPUT:```
  JSON
  {
    "sentiment_analysis" :
      [
        { "topic": "[topic]",
          "score": "[Rating of how negatively or positively the topic is perceived. score should be between -1 and 1]",
          "magnitude": "[Rating of the magnitude of the sentiment towards this topic. magnitude should be between 0 and 1]"
        },
        { "topic": "[topic]",
          "score": "[Rating of how negatively or positively the topic is perceived. score should be between -1 and 1]",
          "magnitude": "[Rating of the magnitude of the sentiment towards this topic. magnitude should be between 0 and 1]"
        }
      ]
  }
  ```
  """

  result_text = generate_article_metadata(prompt=[text1, article], model = MODEL )

  return result_text

In [None]:
def process_row_article_sentiment(article):
    """Processes a single row from the articles array and stores its sentiment analysis."""
    try:
        blob_uri = f"gs://{INPUT_BUCKET}/{article['file_name']}"
        result = generate_metadata_articles_sentiment(blob_uri=blob_uri)
        response_text = re.sub(r"json|```", "", result)
        sentiment = json.loads(response_text)
        article['sentiment_analysis'] = sentiment['sentiment_analysis']

    except Exception as e:
        print(f"Error processing article {article['file_name']}: {e}")


# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row_article_sentiment, article) for article in articles_list]

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('News Articles Sentiment Metadata Generated')

In [None]:
for article in articles_list[:1]:
  print(article['sentiment_analysis'])
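The sentiment prompt asks for `score` in [-1, 1] and `magnitude` in [0, 1], but the template shows them as quoted strings and nothing enforces the ranges. A minimal sketch of a post-processing step that converts both values to floats, clamps them, and drops unusable entries; `normalise_sentiment` is hypothetical, and clamping (rather than rejecting out-of-range values) is an assumed policy.

```python
def normalise_sentiment(entries):
    """Hypothetical helper: converts score/magnitude to floats clamped to their ranges."""
    cleaned = []
    for entry in entries:
        try:
            score = float(entry["score"])
            magnitude = float(entry["magnitude"])
        except (KeyError, TypeError, ValueError):
            continue  # skip entries without numeric values
        cleaned.append({
            "topic": entry.get("topic", ""),
            "score": max(-1.0, min(1.0, score)),          # clamp to [-1, 1]
            "magnitude": max(0.0, min(1.0, magnitude)),  # clamp to [0, 1]
        })
    return cleaned


print(normalise_sentiment([
    {"topic": "AI infrastructure", "score": "0.8", "magnitude": "1.2"},
    {"topic": "pricing", "score": "n/a", "magnitude": "0.3"},
]))
# [{'topic': 'AI infrastructure', 'score': 0.8, 'magnitude': 1.0}]
```

It could be applied per article with `article['sentiment_analysis'] = normalise_sentiment(article.get('sentiment_analysis', []))`.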

### Image Generation
In this section we mimic how generative AI could be applied to create the header images of the articles we are processing.

First, we ask Gemini to create suitable image-generation prompts for each news article.

In [None]:
# Ask Gemini for image-generation prompts for the articles
def generate_metadata_articles_image_prompt(blob_uri: str) -> str:
  article = Part.from_uri(
      mime_type="text/html",
      uri=blob_uri)

  text1 = """SYSTEM:```You are a skilled news analysis expert. You have a deep understanding of media.```
  INSTRUCTION: ```
  You are a content creation assistant.
  You understand the content that is in news_content and provide a suitable prompt for image generation.
  Generate up to two prompts.

  [news_content]:
  ```
  OUTPUT:```
  JSON
  {
    "image_generation_prompts" :
      [
        {
          "prompt_number" : "[Number of the suggested prompt, starting from 0]",
          "prompt": "[instructions for the image generation model]",
          "reasoning": "[Description of the decision behind this image generation prompt]",
          "style": "[Best artistic style for this prompt]"
        },
        {
          "prompt_number" : "[Number of the suggested prompt, starting from 0]",
          "prompt": "[instructions for the image generation model]",
          "reasoning": "[Description of the decision behind this image generation prompt]",
          "style": "[Best artistic style for this prompt]"
        }
      ]
  }
  ```
  """

  result_text = generate_article_metadata(prompt=[text1, article], model = MODEL )

  return result_text

In [None]:
def process_row_article_image_prompt(article):
    """Processes a single row from the articles array."""
    try:
        blob_uri = f"gs://{INPUT_BUCKET}/{article['file_name']}"
        result = generate_metadata_articles_image_prompt(blob_uri=blob_uri)
        response_text = re.sub(r"json|```", "", result)
        summary = json.loads(response_text)
        article['image_generation_prompts'] = summary['image_generation_prompts']

    except Exception as e:
        print(f"Error processing article {article['file_name']}: {e}")


# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_row_article_image_prompt, article) for article in articles_list]

    # waiting for threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('News Articles Image Generation Prompts Generated')

Show one article and its image-generation prompts.

In [None]:
articles_list[0]['image_generation_prompts']

Generate one image per prompt

In [None]:
# Image generation imports
from vertexai.preview.vision_models import ImageGenerationModel


def generate_image(prompt : str, style : str):
  """Generates one image with Imagen from a prompt and an artistic style."""
  # Load the Imagen model
  generation_model = ImageGenerationModel.from_pretrained("imagen-3.0-generate-001")

  final_prompt = f"""{prompt}. The style should be: {style}"""

  response = generation_model.generate_images(
      prompt=final_prompt,
      number_of_images=1,
  )

  return response

In [None]:
for article in articles_list:
  article['name'] = article['file_name'].split('.')[0]


In [None]:
def process_row_prompt_image(prompt, article):
    """Generates and saves the image for one image-generation prompt of an article."""
    try:
        result = generate_image(prompt=prompt['prompt'], style=prompt['style'])
        if result.images:
            file_name = f"articles/images/{article['name']}_{prompt['prompt_number']}.jpeg"
            prompt['image_path'] = file_name
            result.images[0].save(location=file_name)
    except Exception as e:
        print(f"Error generating image for prompt {prompt['prompt']}: {e}")

# Create the local images folder if it doesn't exist
images_folder = "articles/images/"
os.makedirs(images_folder, exist_ok=True)

# Number of worker threads (adjust based on your system and workload)
num_workers = 4  # Or use os.cpu_count() for a reasonable default

with ThreadPoolExecutor(max_workers=num_workers) as executor:
    # Submit one task per prompt, across all articles
    futures = [
        executor.submit(process_row_prompt_image, prompt, article)
        for article in articles_list
        for prompt in article.get('image_generation_prompts', [])
    ]

    # Wait for the threads to complete
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('News Articles Images Generated')

In [None]:
for article in articles_list:
  for prompt in article['image_generation_prompts']:
    file_name = f"articles/images/{article['name']}_{prompt['prompt_number']}.jpeg"
    prompt['image_path'] = file_name

In [None]:
# Copy the generated article images to the output GCS bucket
!gsutil -m cp -r articles/images {OUTPUT_BUCKET}/articles/images

In [None]:
# For each of the prompts generate the image and add it in the correct json object
# for article in articles_list:
#   for prompt in article['image_generation_prompts']:
#     result = generate_image(prompt=prompt['prompt'], style=prompt['style'])
#     prompt['image'] = result.images[0]



In [None]:

# article = articles_list[0]
# file_name = f"articles/images/{article['name']}_0.jpeg"
# articles_list[0]['image_generation_prompts'][0]['image'].save(location=file_name)

### Source system metadata

This part shouldn't be required in a production environment but we are mimicking data that would come from the source systems.

In [None]:
# create metadata that could be inferred by the source systems
# this is completely custom to your naming conventions. In this case we assumed that file names are multiple words separated by hyphens
for article in tqdm.tqdm(articles_list, desc="Creating additional articles metadata"):
  name = article['file_name'].split('.')[:-1]
  name = name[0].split('-')
  name = ' '.join(name).title()
  article['name'] = name
  article['content'] = read_string_from_gcs(bucket_name= INPUT_BUCKET, file_name= article['file_name'])

# Storing Metadata 💾

In [None]:
# Initialize the Firebase Admin SDK
# (with no arguments it uses Application Default Credentials)
firebase_admin.initialize_app()


In [None]:
# Get a reference to the Firestore database
db = firestore.client()

In [None]:
for audio in tqdm.tqdm(audio_list, desc="Storing audio metadata"):
  doc_ref = db.collection(AUDIO_COLLECTION).document(audio['name'])
  doc_ref.set(audio)

In [None]:
for image in tqdm.tqdm(image_list, desc="Storing images metadata"):
  doc_ref = db.collection(IMAGE_COLLECTION).document(image['name'])
  doc_ref.set(image)

In [None]:
# Storing the video information
for video in tqdm.tqdm(video_list, desc="Storing videos metada"):
  if 'sections' in video:
    video_name = video['file_name'].split('.')[0]
    doc_ref = db.collection(VIDEO_COLLECTION).document(video_name)

    # Insert the JSON object into the document
    doc_ref.set(video)

Storing videos metada: 100%|██████████| 1/1 [00:00<00:00,  7.48it/s]


In [None]:
for article in tqdm.tqdm(articles_list, desc="Storing articles metadata"):
  doc_ref = db.collection(ARTICLE_COLLECTION).document(article['file_name'])
  doc_ref.set(article)
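The cells above use file names and video names directly as Firestore document IDs. Firestore document IDs cannot contain a forward slash, cannot be `.` or `..`, cannot match `__.*__`, and must be at most 1,500 bytes, so a file stored under a GCS "folder" prefix would not map to a single document in the collection. A minimal sketch of a sanitiser, assuming that replacing `/` with `_` is acceptable for your naming convention; `safe_doc_id` is hypothetical.

```python
def safe_doc_id(raw: str, replacement: str = "_") -> str:
    """Hypothetical helper: makes a string usable as a Firestore document ID."""
    doc_id = raw.replace("/", replacement)  # '/' would be read as a path separator
    if doc_id in {"", ".", ".."} or (doc_id.startswith("__") and doc_id.endswith("__")):
        # Reserved or empty IDs get a prefix
        doc_id = f"id{replacement}{doc_id}"
    encoded = doc_id.encode("utf-8")
    if len(encoded) > 1500:
        # Truncate to the byte limit without splitting a multi-byte character
        doc_id = encoded[:1500].decode("utf-8", errors="ignore")
    return doc_id


print(safe_doc_id("press/google-cloud-next.html"))  # press_google-cloud-next.html
print(safe_doc_id("__meta__"))                      # id___meta__
```

It could wrap the IDs above, e.g. `db.collection(ARTICLE_COLLECTION).document(safe_doc_id(article['file_name']))`. Separately, a Firestore document is limited to 1 MiB, which the raw `content` of a long article could approach.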

In [None]:
!gsutil -m mv -r gs://{PIPELINE_INPUT_BUCKET}/* gs://{INPUT_BUCKET}/

# Embeddings


## Embeddings Creation

In [None]:
vertexai.init(project=PROJECT_ID, location=LOCATION)

In [None]:
model = TextEmbeddingModel.from_pretrained("text-embedding-004")


In [None]:
def get_embeddings_wrapper(texts):
    """Embeds texts in batches of BATCH_SIZE, pausing between calls to stay under the quota."""
    BATCH_SIZE = 5
    embs = []
    for i in tqdm.tqdm(range(0, len(texts), BATCH_SIZE)):
        time.sleep(1)  # to avoid the quota error
        result = model.get_embeddings(texts[i : i + BATCH_SIZE])
        embs.extend(e.values for e in result)
    return embs
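`get_embeddings_wrapper` sleeps a fixed second before every batch, whether or not the quota is actually hit. A minimal sketch of an alternative that only waits when a call fails, using exponential backoff; `embed_in_batches` is hypothetical, and the embedding call is passed in as a function so the batching logic can be tested without Vertex AI.

```python
import time
from typing import Callable, List, Sequence, Tuple, Type

Embedder = Callable[[Sequence[str]], List[List[float]]]


def embed_in_batches(
    texts: Sequence[str],
    embed_fn: Embedder,
    batch_size: int = 5,
    max_retries: int = 5,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> List[List[float]]:
    """Hypothetical helper: embeds texts batch by batch, retrying failed batches with backoff."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        for attempt in range(max_retries):
            try:
                vectors.extend(embed_fn(batch))
                break
            except retry_on:
                if attempt == max_retries - 1:
                    raise  # give up after the last attempt
                time.sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ... with the default delay
    return vectors
```

With the Vertex AI model above, it could be called as `embed_in_batches(texts, lambda batch: [e.values for e in model.get_embeddings(list(batch))], retry_on=(ResourceExhausted,))`, assuming quota errors surface as `google.api_core.exceptions.ResourceExhausted` (HTTP 429). Narrowing `retry_on` avoids retrying errors that will never succeed, such as invalid input.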

In [None]:
def write_embeddings_to_jsonl(embeddings, filename):
  """Writes an array of embeddings with IDs to a JSONL file.

  Args:
    embeddings: An array of dictionaries, where each dictionary has 'id' and 'embedding' keys.
    filename: The name of the JSONL file to write to.
  """

  with open(filename, 'w') as f:
    for embedding in embeddings:
      json.dump(embedding, f)  # Write each embedding as a JSON object
      f.write('\n')  # Add a newline character after each object
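`write_embeddings_to_jsonl` expects dictionaries with `id` and `embedding` keys, which matches the JSON input format Vertex AI Vector Search uses for index data (one `{"id": ..., "embedding": [...]}` object per line). A minimal sketch of building those records from the `id` and `embedding` columns created below, plus a reader to check a written file; both helpers are hypothetical.

```python
import json
from typing import Dict, List, Sequence


def to_vector_search_records(ids: Sequence[str], embeddings: Sequence[Sequence[float]]) -> List[Dict]:
    """Hypothetical helper: pairs ids with embeddings as {'id', 'embedding'} records."""
    if len(ids) != len(embeddings):
        raise ValueError("ids and embeddings must have the same length")
    return [{"id": str(i), "embedding": [float(x) for x in e]} for i, e in zip(ids, embeddings)]


def read_jsonl(filename: str) -> List[Dict]:
    """Hypothetical helper: reads one JSON object per non-empty line."""
    with open(filename) as f:
        return [json.loads(line) for line in f if line.strip()]
```

For example, `write_embeddings_to_jsonl(to_vector_search_records(df_articles['id'], df_articles['embedding']), "articles_embeddings.json")` would write the article records, and `read_jsonl` can confirm the file round-trips before it is uploaded.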


### Article embeddings

Load the Firestore entries into a pandas dataframe.

In [None]:
# Get all documents from the collection
docs = db.collection(ARTICLE_COLLECTION).stream()

# Convert Firestore documents to a list of dictionaries
data = []
for doc in docs:
    data.append(doc.to_dict())

df_articles = pd.DataFrame(data)


Create a column holding the text to embed, plus an `id` column that identifies each record in the index.

In [None]:
# Function to create embedding content
def create_embedding_content_article(row):
  json_obj = {
      'content': row['content'].decode(encoding='utf-8'),  # content is stored as bytes
      'summary': row['summary']['short_summary'],
      'subject_topics': row['summary']['subject_topics'] }
  return json.dumps(json_obj)

# Create the column that will be embedded
df_articles['embedding_content'] = df_articles.apply(create_embedding_content_article, axis=1)
# Create an id column
df_articles['id'] = 'article:'+df_articles['file_name']

Generate the embeddings and store them in a new `embedding` column.

In [None]:
df_articles = df_articles.assign(embedding=get_embeddings_wrapper(list(df_articles.embedding_content)))

100%|██████████| 3/3 [00:04<00:00,  1.53s/it]


### Image embeddings

Load the Firestore entries into a pandas DataFrame.

In [None]:
# Get all documents from the collection
docs = db.collection(IMAGE_COLLECTION).stream()

# Convert Firestore documents to a list of dictionaries
data = []
for doc in docs:
    data.append(doc.to_dict())

df_images = pd.DataFrame(data)

In [None]:
# Drop images without a 'metadata' field, since the embedding content is built from it
df_images = df_images.dropna(subset=['metadata'])

In [None]:
# Function to create embedding content
def create_embedding_content_image(row):
  metadata = row['metadata']
  json_obj = {
      'name': row['name'],
      'description': metadata['description'],
      'location': metadata.get('location'),  # optional field, None if absent
      'photo_type': metadata['photo_type'],
      'subject_topics': metadata['subject_topics'] }
  return json.dumps(json_obj)


# Apply the function to create the new column
df_images['embedding_content'] = df_images.apply(create_embedding_content_image, axis=1)
# Create an id column
df_images['id'] = 'image:'+df_images['name']
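When these JSON payloads are built by hand, two problems can go unnoticed. A repeated key in a dict literal silently keeps only its last value, and indexing a missing optional field raises `KeyError`. The sketch below avoids both by building the payload from a list of field names, using `dict.get` so that absent fields become `null`. It assumes `metadata` is a plain dict, as returned by `doc.to_dict()`. The function name and the sample record are illustrative.

```python
import json
from typing import Dict, List, Optional

def build_embedding_content(name: str, metadata: Dict, fields: List[str]) -> str:
    """Serialize `name` plus the selected metadata fields as a JSON string.

    Missing fields are emitted as null rather than raising KeyError.
    """
    content: Dict[str, Optional[object]] = {"name": name}
    for field in fields:
        content[field] = metadata.get(field)
    return json.dumps(content)

meta = {"description": "A beach at sunset", "photo_type": "landscape",
        "subject_topics": ["travel"]}
print(build_embedding_content("beach.jpg", meta,
                              ["description", "location", "photo_type", "subject_topics"]))
# → {"name": "beach.jpg", "description": "A beach at sunset", "location": null, "photo_type": "landscape", "subject_topics": ["travel"]}
```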

Generate the embeddings and store them in a new `embedding` column.

In [None]:
df_images = df_images.assign(embedding=get_embeddings_wrapper(list(df_images.embedding_content)))

100%|██████████| 2/2 [00:02<00:00,  1.13s/it]


### Audio embeddings

Load the Firestore entries into a pandas DataFrame.

In [None]:
# Get all documents from the collection
docs = db.collection(AUDIO_COLLECTION).stream()

# Convert Firestore documents to a list of dictionaries
data = []
for doc in docs:
    data.append(doc.to_dict())

df_audio = pd.DataFrame(data)

In [None]:
#TODO - Load transcription as well
# Function to create embedding content
def create_embedding_content_audio(row):
  json_obj = {
      'name' : row['name'],
      'show_name' :  row['metadata']['show_name'],
      'summary': row['metadata']['short_summary'],
      'subject_topics' :row['metadata']['subject_topics'] }
  return json.dumps(json_obj)

# Apply the function to create the new column
df_audio['embedding_content'] = df_audio.apply(create_embedding_content_audio, axis=1)
# Create an id column
df_audio['id'] = 'audio:'+df_audio['name']

In [None]:
df_audio = df_audio.assign(embedding=get_embeddings_wrapper(list(df_audio.embedding_content)))

100%|██████████| 1/1 [00:01<00:00,  1.18s/it]


### Video embeddings

Load the Firestore entries into a pandas DataFrame.

In [None]:
# Get all documents from the collection
docs = db.collection(VIDEO_COLLECTION).stream()

# Convert Firestore documents to a list of dictionaries
data = []
for doc in docs:
    data.append(doc.to_dict())

df_videos = pd.DataFrame(data)

In [None]:
# Function to create embedding content for video object
def create_embedding_content_video(row):
  json_obj = {
      'name' : row['name'],
      'summary': row['summary']['short_summary'],
      'long_summary': row['summary']['long_summary'],
      'subject_topics' :row['summary']['subject_topics'] }
  return json.dumps(json_obj)

# Apply the function to create the new column
df_videos['embedding_content'] = df_videos.apply(create_embedding_content_video, axis=1)
# Create an id column
df_videos['id'] = 'video:'+df_videos['name']

In [None]:
df_videos = df_videos.assign(embedding=get_embeddings_wrapper(list(df_videos.embedding_content)))

100%|██████████| 2/2 [00:02<00:00,  1.21s/it]


Videos allow an additional layer: the key moments. By embedding this content, you can pinpoint which part of a video is relevant to the user's search.
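Each section stores `start_time` and `end_time` as strings such as `'0:00'` and `'0:29'`. To jump to the matching moment in a player, these strings need to be converted to seconds. Below is a minimal parser. It assumes the values are `SS`, `M:SS` or `H:MM:SS`, although the sample output only shows `M:SS`. The function name is hypothetical.

```python
def timestamp_to_seconds(timestamp: str) -> int:
    """Convert 'SS', 'M:SS' or 'H:MM:SS' into a whole number of seconds."""
    parts = [int(p) for p in timestamp.strip().split(":")]
    if not 1 <= len(parts) <= 3:
        raise ValueError(f"unsupported timestamp: {timestamp!r}")
    seconds = 0
    for part in parts:
        seconds = seconds * 60 + part  # shift the running total up one unit, then add
    return seconds

print(timestamp_to_seconds("0:29"))     # → 29
print(timestamp_to_seconds("1:02:03"))  # → 3723
```

A segment's playable span would then be `(timestamp_to_seconds(s['start_time']), timestamp_to_seconds(s['end_time']))`.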

In [None]:
# Get all documents from the collection
docs = db.collection(VIDEO_COLLECTION).stream()

# Flatten the video documents into a new dataframe at the key segment (section) level
segments = []
for doc in docs:
  video = doc.to_dict()
  for section_num, section in enumerate(video.get('sections', [])):
    # id format: video:<section number>:<video name>
    section['id'] = f"video:{section_num}:{video['name']}"
    segments.append(section)

df_segments = pd.DataFrame(segments)

In [None]:
# Function to create embedding content for a video segment
def create_embedding_content_segment(row):
  json_obj = {
      'section': row['type'],      # e.g. 'introduction'
      'summary': row['reason'],    # description of what happens in the segment
      'transcription': row['transcription']
      }
  return json.dumps(json_obj)

# Apply the function to create the new column
df_segments['embedding_content'] = df_segments.apply(create_embedding_content_segment, axis=1)

In [None]:
df_segments = df_segments.assign(embedding=get_embeddings_wrapper(list(df_segments.embedding_content)))

100%|██████████| 12/12 [00:13<00:00,  1.11s/it]


### Merge embeddings

In [None]:
merged_df = pd.concat([df_articles, df_images, df_audio, df_videos, df_segments], ignore_index=True)
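The merged frame mixes five kinds of ids: `article:<file_name>`, `image:<name>`, `audio:<name>`, `video:<name>` and, for key moments, `video:<section_num>:<name>`. When Vector Search returns neighbors, a small parser can route each id back to its source. The sketch below assumes that no video name itself starts with digits followed by a colon. If one did, a whole-video id and a segment id would be ambiguous.

```python
from typing import NamedTuple, Optional

class ParsedId(NamedTuple):
    kind: str               # 'article', 'image', 'audio' or 'video'
    name: str               # file or object name
    section: Optional[int]  # section number for video key moments, else None

def parse_id(record_id: str) -> ParsedId:
    kind, _, rest = record_id.partition(":")
    if kind == "video":
        maybe_section, sep, name = rest.partition(":")
        if sep and maybe_section.isdigit():
            return ParsedId(kind, name, int(maybe_section))
    return ParsedId(kind, rest, None)

print(parse_id("video:3:Make up series YT.mp4"))
# → ParsedId(kind='video', name='Make up series YT.mp4', section=3)
print(parse_id("image:beach.jpg"))
# → ParsedId(kind='image', name='beach.jpg', section=None)
```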

In [None]:
# Save id and embedding as a JSON Lines file (one JSON object per line)
jsonl_string = merged_df[["id", "embedding"]].to_json(orient="records", lines=True)
with open("embeddings.json", "w") as f:
    f.write(jsonl_string)

In [None]:
!gsutil cp embeddings.json gs://{EMBEDDINGS_BUCKET}

Copying file://embeddings.json [Content-Type=application/json]...
/ [1 files][908.7 KiB/908.7 KiB]                                                
Operation completed over 1 objects/908.7 KiB.                                    


## Vector Search

Set up the Vector Search index and index endpoint, then deploy the index to the endpoint.

In [None]:

# init the aiplatform package
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=LOCATION)

VS_ITERATION_SUFFIX = '008'
VS_INDEX_NAME = f"embeddings-{VS_ITERATION_SUFFIX}"
VS_INDEX_ENDPOINT_NAME = f"embeddings-{VS_ITERATION_SUFFIX}-index-endpoint"
VS_DEPLOYED_INDEX_ID = f"embeddings_deployed_{VS_ITERATION_SUFFIX}"
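The deployed index ID uses underscores, while the display names use hyphens. The Vertex AI `DeployedIndex` reference states that this ID must start with a letter, contain only letters, numbers and underscores, and be at most 128 characters long. Treat the rule encoded below as an assumption to re-check against the current documentation. The helper is hypothetical and only guards against a typo before the long-running deploy call.

```python
import re

# Assumed rule: a leading letter, then letters/digits/underscores, 128 chars max.
_DEPLOYED_INDEX_ID = re.compile(r"[A-Za-z][A-Za-z0-9_]{0,127}")

def is_valid_deployed_index_id(candidate: str) -> bool:
    return _DEPLOYED_INDEX_ID.fullmatch(candidate) is not None

print(is_valid_deployed_index_id("embeddings_deployed_008"))  # → True
print(is_valid_deployed_index_id("embeddings-deployed-008"))  # → False
```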

In [None]:
# create index
my_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=VS_INDEX_NAME,
    contents_delta_uri=f"gs://{EMBEDDINGS_BUCKET}",
    dimensions=768,
    approximate_neighbors_count=20,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
)

INFO:google.cloud.aiplatform.matching_engine.matching_engine_index:Creating MatchingEngineIndex
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index:Create MatchingEngineIndex backing LRO: projects/837701957704/locations/us-central1/indexes/7389874824887140352/operations/786722181949161472
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index:MatchingEngineIndex created. Resource name: projects/837701957704/locations/us-central1/indexes/7389874824887140352
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index:To use this MatchingEngineIndex in another session:
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index:index = aiplatform.MatchingEngineIndex('projects/837701957704/locations/us-central1/indexes/7389874824887140352')
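The index is created with `distance_measure_type="DOT_PRODUCT_DISTANCE"`. For unit-length vectors, the dot product equals cosine similarity, so the ranking is the same either way. If the vectors are not normalized, longer vectors score higher regardless of direction. You can check whether your embeddings are already unit length with `norm` below. This is a standard-library sketch, and the helper names are illustrative.

```python
import math
from typing import List, Sequence

def dot(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

def norm(v: Sequence[float]) -> float:
    return math.sqrt(dot(v, v))

def normalize(v: Sequence[float]) -> List[float]:
    n = norm(v)
    if n == 0:
        raise ValueError("cannot normalize a zero vector")
    return [x / n for x in v]

def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    return dot(a, b) / (norm(a) * norm(b))

a, b = [3.0, 4.0], [4.0, 3.0]
print(dot(a, b), cosine(a, b))                    # → 24.0 0.96
print(round(dot(normalize(a), normalize(b)), 6)) # → 0.96
```

After normalization, the dot product reproduces the cosine value, which is why the two measures agree on unit-length embeddings.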


In [None]:
# create IndexEndpoint
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=VS_INDEX_ENDPOINT_NAME,
    public_endpoint_enabled=True,
)

INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:Creating MatchingEngineIndexEndpoint
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:Create MatchingEngineIndexEndpoint backing LRO: projects/837701957704/locations/us-central1/indexEndpoints/5901100896506347520/operations/4856850345185247232
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:MatchingEngineIndexEndpoint created. Resource name: projects/837701957704/locations/us-central1/indexEndpoints/5901100896506347520
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:To use this MatchingEngineIndexEndpoint in another session:
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:index_endpoint = aiplatform.MatchingEngineIndexEndpoint('projects/837701957704/locations/us-central1/indexEndpoints/5901100896506347520')


In [None]:
# deploy the Index to the Index Endpoint
my_index_endpoint.deploy_index(index=my_index, deployed_index_id=VS_DEPLOYED_INDEX_ID)


INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:Deploying index MatchingEngineIndexEndpoint index_endpoint: projects/837701957704/locations/us-central1/indexEndpoints/5901100896506347520
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:Deploy index MatchingEngineIndexEndpoint index_endpoint backing LRO: projects/837701957704/locations/us-central1/indexEndpoints/5901100896506347520/operations/1407093030619447296
INFO:google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint:MatchingEngineIndexEndpoint index_endpoint Deployed index. Resource name: projects/837701957704/locations/us-central1/indexEndpoints/5901100896506347520


<google.cloud.aiplatform.matching_engine.matching_engine_index_endpoint.MatchingEngineIndexEndpoint object at 0x7ef662f92740> 
resource name: projects/837701957704/locations/us-central1/indexEndpoints/5901100896506347520

## Test Index

# Appendix


## ad-hoc


In [None]:
# Get all documents from the collection
docs = db.collection(AUDIO_COLLECTION).stream()

# Convert Firestore documents to a list of dictionaries,
# dropping the 'processed/' folder from each stored GCS URI
audio_list = []
for doc in docs:
  entry = doc.to_dict()
  entry['gcs_uri'] = entry['gcs_uri'].replace('gs://editorial-solaris-demo-input/processed/','gs://editorial-solaris-demo-input/')
  audio_list.append(entry)

# Write the updated entries back to Firestore
for audio in tqdm.tqdm(audio_list, desc="Storing audio metadata"):
  doc_ref = db.collection(AUDIO_COLLECTION).document(audio['name'])
  doc_ref.set(audio)
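`str.replace` rewrites the prefix wherever it appears in the URI, not only at the start. The sketch below is slightly stricter: it rewrites only a leading prefix and leaves other URIs untouched, so running it twice is harmless. The bucket paths are the ones used in the cell above. The helper name and the `podcast.mp3` object are hypothetical.

```python
def replace_prefix(uri: str, old_prefix: str, new_prefix: str) -> str:
    """Swap `old_prefix` for `new_prefix` only when `uri` starts with it."""
    if uri.startswith(old_prefix):
        return new_prefix + uri[len(old_prefix):]
    return uri

OLD = "gs://editorial-solaris-demo-input/processed/"
NEW = "gs://editorial-solaris-demo-input/"
print(replace_prefix(OLD + "podcast.mp3", OLD, NEW))
# → gs://editorial-solaris-demo-input/podcast.mp3
print(replace_prefix(NEW + "podcast.mp3", OLD, NEW))
# → gs://editorial-solaris-demo-input/podcast.mp3
```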

## Others

In [None]:
from vertexai.language_models import TextEmbeddingModel

# Load the model once rather than on every call
embedding_model = TextEmbeddingModel.from_pretrained("text-embedding-004")

def generate_embeddings(text):
    """Returns the embedding vector (a list of floats) for a single text."""
    embeddings = embedding_model.get_embeddings([text])
    return embeddings[0].values

In [None]:
def prepare_embeddings(doc, selected_properties):
  """
  Prepares embeddings for a document with specified properties.

  Args:
    doc: The document object (dictionary or similar).
    selected_properties: A list of property names to include in the embedding.

  Returns:
    The embedding vector (a list of floats).
  """

  selected_data = {prop: doc[prop] for prop in selected_properties if prop in doc}
  embedding_content = json.dumps(selected_data)
  return generate_embeddings(embedding_content)


In [None]:
# Get all documents from the collection
docs = db.collection(ARTICLE_COLLECTION).stream()

# Create an empty list to store the articles' embeddings
articles_embeddings = []
selected_properties = ['headline']

# Iterate over the documents and build one {'id', 'embedding'} record per article
for doc in tqdm.tqdm(docs, desc="Generating embeddings"):
  article = doc.to_dict()
  article_id = 'article:' + doc.id
  article['content'] = article['content'].decode(encoding='utf-8')

  # Store a dict (not a pre-formatted string) so json.dump writes a valid JSON object
  articles_embeddings.append({'id': article_id, 'embedding': prepare_embeddings(article, selected_properties)})

Write the embeddings to a local JSONL file.

In [None]:
def write_embeddings_to_jsonl(embeddings, filename):
  """Writes an array of embeddings with IDs to a JSONL file.

  Args:
    embeddings: An array of dictionaries, where each dictionary has 'id' and 'embedding' keys.
    filename: The name of the JSONL file to write to.
  """

  with open(filename, 'w') as f:
    for embedding in embeddings:
      json.dump(embedding, f)  # Write each embedding as a JSON object
      f.write('\n')  # Add a newline character after each object


In [None]:
# Create the embeddings folder if it doesn't exist
embeddings_folder = "embeddings/"
os.makedirs(embeddings_folder, exist_ok=True)

write_embeddings_to_jsonl(articles_embeddings, os.path.join(embeddings_folder, 'articles_embeddings.jsonl'))

Copy the JSONL file from the Colab environment to the embeddings bucket.

In [None]:
!gsutil cp embeddings/articles_embeddings.jsonl gs://{EMBEDDINGS_BUCKET}

### Creating Matching Index

In [None]:
VS_ITERATION_SUFFIX = '003B'
VS_INDEX_NAME = f"embeddings-{VS_ITERATION_SUFFIX}"
VS_INDEX_ENDPOINT_NAME = f"embeddings-{VS_ITERATION_SUFFIX}-index-endpoint"
VS_DEPLOYED_INDEX_ID = f"embeddings_deployed_{VS_ITERATION_SUFFIX}"

In [None]:
# init the aiplatform package
from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=LOCATION)

# create index
my_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=VS_INDEX_NAME,
    contents_delta_uri=f"gs://{EMBEDDINGS_BUCKET}/articles_embeddings.jsonl",
    dimensions=768,
    approximate_neighbors_count=20,
    distance_measure_type="DOT_PRODUCT_DISTANCE",


)

In [None]:
# create IndexEndpoint
my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name=VS_INDEX_ENDPOINT_NAME, public_endpoint_enabled=True
)



In [None]:
# deploy the Index to the Index Endpoint
my_index_endpoint.deploy_index(index=my_index, deployed_index_id=VS_DEPLOYED_INDEX_ID)



Running a sample query

In [None]:
query_emb = get_embeddings_wrapper( ['diversity and inclusion'])
# print(query_emb)


In [None]:
# run query
response = my_index_endpoint.find_neighbors(
    deployed_index_id=VS_DEPLOYED_INDEX_ID, queries=query_emb, num_neighbors=10
)

print(response)

# show the results
for idx, neighbor in enumerate(response[0]):
    print(f"{neighbor.distance:.2f} {neighbor.id}")
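Tree-AH is an approximate method, so on a small collection like this one it is worth occasionally comparing its results against exact search. Below is a standard-library sketch of brute-force top-k by dot product. It assumes the embeddings are available locally as a dict mapping id to vector, for example built from the `id` and `embedding` columns. Comparing its ids with those returned by `find_neighbors` gives a rough recall figure. The function names are illustrative.

```python
import heapq
from typing import Dict, List, Sequence, Tuple

def exact_top_k(query: Sequence[float], vectors: Dict[str, Sequence[float]],
                k: int) -> List[Tuple[str, float]]:
    """Return the k ids with the highest dot product against `query`."""
    scored = ((sum(q * v for q, v in zip(query, vec)), vid)
              for vid, vec in vectors.items())
    return [(vid, score) for score, vid in heapq.nlargest(k, scored)]

def recall_at_k(approx_ids: Sequence[str], exact_ids: Sequence[str]) -> float:
    """Fraction of the exact neighbors that the approximate search also found."""
    if not exact_ids:
        return 1.0
    return len(set(approx_ids) & set(exact_ids)) / len(exact_ids)

vectors = {"a": [1.0, 0.0], "b": [0.5, 0.5], "c": [0.0, 1.0]}
print(exact_top_k([1.0, 0.5], vectors, k=2))  # → [('a', 1.0), ('b', 0.75)]
print(recall_at_k(["a", "c"], ["a", "b"]))    # → 0.5
```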


In [None]:
my_index.to_dict()

In [None]:
VS_DEPLOYED_INDEX_ID

In [None]:

test_embeddings = get_embeddings_wrapper(["Growth in cloud"])

In [None]:
# Test query
response = my_index_endpoint.find_neighbors(
    deployed_index_id=VS_DEPLOYED_INDEX_ID,
    queries=test_embeddings,
    num_neighbors=20,
)

print(response)

# show the results
for neighbor in response[0]:
    neighbor_id = str(neighbor.id)
    similar = df[df["id"] == neighbor_id]
    print(f"{neighbor.distance:.4f} {similar.file_name.values[0]}")
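Filtering the frame once per neighbor rescans it each time. An alternative is to build an id-to-record dictionary once and look up each neighbor directly, which also handles ids that are missing from the frame. The sketch below uses plain dicts. It assumes each record carries an `id` and either a `file_name` (articles) or a `name` (images, audio, video). With pandas, `merged_df.to_dict("records")` produces such a list. The sample records are made up.

```python
from typing import Dict, Iterable, Optional

def build_lookup(records: Iterable[Dict]) -> Dict[str, Dict]:
    """Index records by their 'id' field; later duplicates overwrite earlier ones."""
    return {record["id"]: record for record in records}

def display_name(record: Optional[Dict]) -> str:
    """Prefer 'file_name', fall back to 'name', else a placeholder."""
    if record is None:
        return "<unknown id>"
    return record.get("file_name") or record.get("name") or "<unnamed>"

records = [{"id": "article:a.txt", "file_name": "a.txt"},
           {"id": "image:b.jpg", "name": "b.jpg"}]
lookup = build_lookup(records)
for neighbor_id in ["image:b.jpg", "article:a.txt", "video:missing.mp4"]:
    print(display_name(lookup.get(neighbor_id)))
# Prints:
# b.jpg
# a.txt
# <unknown id>
```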