In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Async, Concurrency, and Batching: Feature & Vector Engineering



### Install Dependencies


In [None]:
! pip3 install --upgrade --user --quiet google-cloud-aiplatform
! pip3 install --upgrade --user --quiet PyPDF2

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/5.2 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/5.2 MB[0m [31m60.6 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m5.2/5.2 MB[0m [31m91.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.2/5.2 MB[0m [31m54.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[?25h

### Restart runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which restarts the current kernel.

In [None]:
import sys

if "google.colab" in sys.modules:
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Please wait until it is finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

If you are running this notebook on Google Colab, run the cell below to authenticate your environment.


In [1]:
import sys

if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information and initialize Vertex AI SDK

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [2]:
# Define project information
PROJECT_ID = ""  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
BUCKET_NAME = "mlops-for-genai" # @param {type:"string"}
# Initialize Vertex AI
import vertexai

vertexai.init(project=PROJECT_ID, location=LOCATION)

# Initialize cloud storage
from google.cloud import storage

storage_client = storage.Client(project=PROJECT_ID)
bucket = storage_client.bucket(BUCKET_NAME)

### Import libraries


In [3]:
import IPython.display
from IPython.core.interactiveshell import InteractiveShell

InteractiveShell.ast_node_interactivity = "all"
import pandas as pd
from vertexai.generative_models import (
    GenerationConfig,
    GenerativeModel,
    HarmBlockThreshold,
    HarmCategory,
    Part,
)
import seaborn as sns
import matplotlib.pyplot as plt
from rich.markdown import Markdown as rich_Markdown
from rich import print as rich_print
import nltk
from nltk import ngrams
from nltk.tokenize import word_tokenize
nltk.download('punkt')
from wordcloud import STOPWORDS, WordCloud
from collections import Counter
import PyPDF2
import io
from io import BytesIO
from google.cloud import storage
import pandas as pd
from PIL import Image
import concurrent.futures
import random, string
import asyncio
import time
from tenacity import retry, wait_random_exponential, stop_after_attempt
import vertexai
from vertexai.generative_models import GenerativeModel, Part
import psutil
import asyncio.locks
import os
import json
import aiohttp
import uuid

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


### Load the models

To learn more about all [Gemini API models on Vertex AI](https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models#gemini-models).


In [4]:
MODEL_ID_PRO = "gemini-1.5-pro-001"  # @param {type:"string"}
MODEL_ID_FLASH = "gemini-1.5-flash-001" # @param {type:"string"}

model_pro = GenerativeModel(MODEL_ID_PRO)
model_flash = GenerativeModel(MODEL_ID_FLASH)

### Add data path

In [5]:
prototype_data = "multimodal-finanace-qa/data/unstructured/prototype/"  # @param {type:"string"}
production_data = "multimodal-finanace-qa/data/unstructured/production/"  # @param {type:"string"}
image_output_path = "multimodal-finanace-qa/data/unstructured/temp/img"  # @param {type:"string"}
embedding_input_path = "multimodal-finanace-qa/data/embeddings"  # @param {type:"string"}

## Feature Engineeing




### Building features from PDFs

extracting page wise text and images from the documents

### Building Features from PDF Files - Text

In [6]:
def randomword(length):
   letters = string.ascii_lowercase
   return ''.join(random.choice(letters) for i in range(length))


def upload_gcs_image_file(bucket_object, image_gcs_path, img_bytes):
    """Uploads a file to the bucket and returns the full GCS URI."""
    blob = bucket_object.blob(image_gcs_path)
    blob.upload_from_string(img_bytes)

    # Construct the full GCS URI
    gcs_uri = f"gs://{bucket_object.name}/{image_gcs_path}"

    # print(f"Image file uploaded to: {gcs_uri}")
    return gcs_uri


def extract_image_metadata(xObject,filename,page_num, image_output_path,bucket_object, upload_image_to_gcs=False):
    image_metadata = []
    for image_number, obj in enumerate(xObject):
        if xObject[obj]['/Subtype'] == '/Image':
            image_available = True
            # Explicitly handle desired formats
            if not xObject[obj].get('/Filter') == '/FlateDecode':
              try:
                  img = Image.open(BytesIO(xObject[obj]._data))
                  # if img.format.upper() in ['JPEG', 'PNG', 'TIFF']:
                  img_bytes = io.BytesIO()
                  img.save(img_bytes, format=img.format)
                  img_bytes = img_bytes.getvalue()

                  image_name = f"{filename.replace(' ', '_').lower().split('.')[0]}_page{page_num+1}_{randomword(6)}.{img.format.lower()}"
                  # image_gcs_path = f"gs://{image_output_path}/{image_name}"
                  image_gcs_path = image_output_path+f"/{image_name}"
                  image_size = img.size
                  # print(f"Saving image to: {image_gcs_path}")
                  # if upload_image_to_gcs:
                  if upload_image_to_gcs:
                    image_gcs_path_final = upload_gcs_image_file(bucket_object, image_gcs_path, img_bytes)
                  else:
                    image_gcs_path_final = None
                  image_metadata.append({
                      'image_available': image_available,
                      'image_counter': int(image_number+1),
                      'image_gcs_path': image_gcs_path_final,
                      'image_size': image_size
                  })
                  # else:
                  #     print(f"Unsupported image format: {img.format}")
                  #     continue
              except Exception as e:
                print("Error: ", e)
                print("Error for image in file: ", filename, "and page number: ",page_num + 1,
                      " with gcs path: ", f"gs://{blob.bucket.name}/{blob.name}")
                print(f"Error: Unable to identify image format.")
                print(f"Image Subtype: {xObject[obj]['/Subtype']}")
                print(f"Image Filter: {xObject[obj].get('/Filter', 'None')}")
                print("Skipping this. Debug this. ")
                # print("Error: ", e)
                continue
    return image_metadata

def extract_pdf_data(blob, image_output_path, upload_image_to_gcs=False, bucket_object=None):
  """Extracts text and images from a PDF blob and returns metadata."""
  pdf_content = BytesIO(blob.download_as_bytes())
  try:
    pdf_reader = PyPDF2.PdfReader(pdf_content)
    pdf_data = []
    # pdf_type = "/".join(blob.name.split("/")[1:-1])
    pdf_type = blob.name.split("/")[-2]
    filename = blob.name.split("/")[-1]
    # print("filename: ", filename)

    for page_num in range(len(pdf_reader.pages)):
      page = pdf_reader.pages[page_num]
      text = page.extract_text()
      image_available = False
      image_gcs_path = None
      image_size = None

      page_metadata = {
          'text_type' : pdf_type,
          'file_name': filename,
          'gcs_path': "gs://"+blob.bucket.name+"/"+blob.name,
          'page_number': page_num+1,
          'text': text if text else None,
          'image_available': False,
          'image_counter': None,
          'image_gcs_path': None,
          'image_size': None
      }

      if '/XObject' in page['/Resources']:
        xObject = page['/Resources']['/XObject'].get_object()
        image_metadata = extract_image_metadata(xObject,filename,page_num, image_output_path,bucket_object, upload_image_to_gcs)
        for metadata in image_metadata:
          page_metadata = {
              'text_type' : pdf_type,
              'file_name': filename,
              'gcs_path': "gs://"+blob.bucket.name+"/"+blob.name,
              'page_number': page_num+1,
              'text': text if text else None,
              'image_available': metadata['image_available'],
              'image_counter': int(metadata['image_counter']),
              'image_gcs_path': metadata['image_gcs_path'],
              'image_size': metadata['image_size']
          }

          pdf_data.append(page_metadata)
      else:
        pdf_data.append(page_metadata)

    return pdf_data

  except Exception as e:
    print(f"Warning: Could not read PDF file '{blob.name}' (might be encrypted or corrupted). Error: {e}")
    return []

def get_text_from_pdf(bucket_object, prefix='', image_output_path=None,upload_image_to_gcs=False,extract_tables=False):
  """Extracts text and images from PDFs in a GCS bucket, processes in parallel."""

  all_pdf_data = []
  with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = []
    for blob in bucket_object.list_blobs(prefix=prefix):
      if blob.name.lower().endswith('.pdf'):
        # print(blob.name)
        futures.append(executor.submit(extract_pdf_data, blob, image_output_path,
                                       upload_image_to_gcs, bucket_object))

    for future in concurrent.futures.as_completed(futures):
      all_pdf_data.extend(future.result())

  return pd.DataFrame(all_pdf_data)

In [7]:
%%time
pdf_metadata_flash = get_text_from_pdf(bucket, prefix=production_data,
                       image_output_path=image_output_path,
                       upload_image_to_gcs=True,
                       )

Error:  cannot identify image file <_io.BytesIO object at 0x7e02bcfd2930>
CPU times: user 2min 51s, sys: 2.58 s, total: 2min 53s
Wall time: 2min 57s


In [8]:
pdf_metadata_flash.shape

(1549, 9)

In [9]:
pdf_metadata_flash.head(2)

Unnamed: 0,text_type,file_name,gcs_path,page_number,text,image_available,image_counter,image_gcs_path,image_size
0,blogpost,Google announces the Coalition for Secure AI.pdf,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"Additionally, CoSAI will collaborate with orga...",False,,,
1,blogpost,"Google Gemini updates_ Flash 1.5, Gemma 2 and ...",gs://mlops-for-genai/multimodal-finanace-qa/da...,4,1.5 Pro can now follow increasingly complex an...,False,,,


In [10]:
pdf_metadata_flash[pdf_metadata_flash['image_available']==True].shape

(128, 9)

In [11]:
pdf_metadata_flash[pdf_metadata_flash['image_available']==True].head(2)

Unnamed: 0,text_type,file_name,gcs_path,page_number,text,image_available,image_counter,image_gcs_path,image_size
17,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"Productivity Goodput, to measure this e�ciency...",True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 924)"
18,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,4,how you can measure and maximize runtime for\n...,True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 941)"


### Building Features from Image Files

In [12]:
image_metadata_flash = pdf_metadata_flash[pdf_metadata_flash['image_available']==True]

In [13]:
image_metadata_flash.shape

(128, 9)

In [17]:
image_metadata_flash.head(2)

Unnamed: 0,text_type,file_name,gcs_path,page_number,text,image_available,image_counter,image_gcs_path,image_size
17,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"Productivity Goodput, to measure this e�ciency...",True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 924)"
18,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,4,how you can measure and maximize runtime for\n...,True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 941)"


In [16]:
@retry(wait=wait_random_exponential(multiplier=1, max=120),stop=stop_after_attempt(2))
async def async_generate_df(session, page_text, prompt, gcs_uri):
    try:
        safety_settings = {
        HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        }
        model = GenerativeModel(
            "gemini-1.5-flash-001",
            safety_settings = safety_settings
        )
        final_prompt = prompt + "**page_text:** \n" + page_text
        response = await model.generate_content_async(
            [final_prompt, Part.from_uri(gcs_uri, mime_type='image/png')],
            stream=False,
        )
        return response.text
    except Exception as e:
        print("Something failed, retrying")
        print(e)
        with retry.stop_after_attempt(2) as retry_state:
            if retry_state.attempt > 2:
                return None
        raise  # Re-raise the exception for tenacity to handle


async def process_row(row, session, semaphore):
    async with semaphore:
        return await async_generate_df(session, row['text'], image_description_prompt, row['image_gcs_path'])

async def process_dataframe(df, batch_size=100, max_concurrency=10):
    semaphore = asyncio.Semaphore(max_concurrency)
    async with aiohttp.ClientSession() as session:
        # Measure initial memory usage
        initial_memory_usage = psutil.Process().memory_info().rss / 1024 / 1024

        # Start a timer
        start_time = time.time()

        print(f"Batch size: {batch_size}")
        print(f"Initial memory usage: {initial_memory_usage:.2f} MB")

        for i in range(0, len(df), batch_size):
            chunk = df[i:i+batch_size]
            tasks = [process_row(row, session, semaphore) for index, row in chunk.iterrows()]
            results = await asyncio.gather(*tasks)
            for index, result in enumerate(results):
                df.loc[chunk.index[index], 'image_description'] = result

        # Measure final memory usage
        final_memory_usage = psutil.Process().memory_info().rss / 1024 / 1024

        # Calculate elapsed time
        elapsed_time = time.time() - start_time

        print(f"Elapsed time: {elapsed_time:.2f} seconds")
        print(f"Final memory usage: {final_memory_usage:.2f} MB")

        return df

In [15]:
image_description_prompt = """You are a technical image analysis expert. You will be provided with various types of images extracted from documents like research papers, technical blogs, and more.
Your task is to generate concise, accurate descriptions of the images without adding any information you are not confident about.
Focus on capturing the key details, trends, or relationships depicted in the image. Use provided **page_text:** to ground the generation.

Important Guidelines:
* Prioritize accuracy:  If you are uncertain about any detail, state "Unknown" or "Not visible" instead of guessing.
* Avoid hallucinations: Do not add information that is not directly supported by the image.
* Be specific: Use precise language to describe shapes, colors, textures, and any interactions depicted.
* Consider context in Image: If the image is a screenshot or contains text, incorporate that information into your description.
* Consider context of Page: Consider the **page_text:** to understand the context of the image.
"""

In [18]:
image_metadata_flash_final = await process_dataframe(image_metadata_flash, batch_size=10, max_concurrency=4)

Batch size: 10
Initial memory usage: 632.40 MB


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.loc[chunk.index[index], 'image_description'] = result


Elapsed time: 68.04 seconds
Final memory usage: 643.77 MB


In [19]:
image_metadata_flash_final.head(2)

Unnamed: 0,text_type,file_name,gcs_path,page_number,text,image_available,image_counter,image_gcs_path,image_size,image_description
17,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"Productivity Goodput, to measure this e�ciency...",True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 924)",The image depicts a diagram illustrating the t...
18,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,4,how you can measure and maximize runtime for\n...,True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 941)",The diagram depicts the elements of ML Product...


In [20]:
image_metadata_flash_final.shape

(128, 10)

In [22]:
index = 7
print("********Page Text: ********")
rich_Markdown(image_metadata_flash_final.iloc[index]['text'])

********Page Text: ********


In [23]:
print("********Image Description: ********")
rich_Markdown(image_metadata_flash_final.iloc[index]['image_description'])

********Image Description: ********


### Building Features from Audio Files

In [24]:
@retry(wait=wait_random_exponential(multiplier=1, max=120),stop=stop_after_attempt(2))
async def async_generate(prompt, gcs_uri, mime_type):
    try:
        safety_settings = {
        HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
        }
        model = GenerativeModel(
            "gemini-1.5-flash-001",
            safety_settings = safety_settings
        )
        # print("Hitting")

        response = await model.generate_content_async(
            [prompt, Part.from_uri(gcs_uri, mime_type=mime_type)],
            stream=False,
        )
        # print(len(response.text))
        return response.text
    except Exception as e:
        print("Something failed, retrying")
        print(e)
        with retry.stop_after_attempt(2) as retry_state:
            if retry_state.attempt > 2:
                return None
        raise  # Re-raise the exception for tenacity to handle

async def batch_and_profile(gcs_uris, prompt, mime_type, batch_size=2, max_concurrent=4):
    start_time = time.time()
    memory_usage = psutil.Process().memory_info().rss / 1024**2

    semaphore = asyncio.locks.Semaphore(max_concurrent)
    async def process_batch(batch):
        async with semaphore:
            return await asyncio.gather(*[async_generate(prompt, f, mime_type) for f in batch])

    batches = [gcs_uris[i:i+batch_size] for i in range(0, len(gcs_uris), batch_size)]
    get_responses = [asyncio.create_task(process_batch(batch)) for batch in batches]
    final_response_list = [item for sublist in await asyncio.gather(*get_responses) for item in sublist]

    end_time = time.time()
    elapsed_time = end_time - start_time
    final_memory_usage = psutil.Process().memory_info().rss / 1024**2

    print(f"Batch size: {batch_size}")
    print(f"Elapsed time: {elapsed_time:.2f} seconds")
    print(f"Initial memory usage: {memory_usage:.2f} MB")
    print(f"Final memory usage: {final_memory_usage:.2f} MB")

    return final_response_list

def get_gcs_uri_list(bucket,data,file_extension):
  gcs_uri_list = []
  for blob in bucket.list_blobs():
      if blob.name.startswith(data):
        if blob.name.lower().endswith(file_extension):
          gcs_path = "gs://"+"/".join(blob.id.split("/")[:-1])
          gcs_uri_list.append(gcs_path)
  return gcs_uri_list

In [25]:
gcs_uri_list_audio = get_gcs_uri_list(bucket,production_data,'.mp3')

In [26]:
gcs_uri_list_audio

['gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/earning_call/Alphabet 2023 Q2 Earnings Call (128 kbps).mp3',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/earning_call/Alphabet 2023 Q3 Earnings Call (128 kbps).mp3',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/earning_call/Alphabet 2024 Q2 Earnings Call (128 kbps).mp3',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/earning_call/Alphabet_2023_Q1_Earnings_Call.mp3',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/earning_call/Alphabet_2023_Q4_Earnings_Call.mp3',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/earning_call/Alphabet_2024_Q1_Earnings_Call.mp3',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/podcast/episode1.mp3',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/podcast/episode2.mp3',
 'gs://mlops-for-gena

In [32]:
len(gcs_uri_list_audio)

12

In [27]:
batch_size = 3
max_concurrent = 4
audio_description_extraction_prompt = """Transcribe and analyze the audio, identifying key topic shifts or changes in focus. Divide the audio into segments based on these transitions.
For each segment:
* **Summarize:** Briefly describe the main topic or theme of the segment.
* **Contextualize:** Explain how this topic fits into the broader conversation or narrative.
* **Analyze:** Explore the significance of this topic, the perspectives presented, and any potential biases or underlying assumptions.
* **Synthesize:** Connect this topic to other themes or ideas medntioned in the audio, highlighting relationships and overarching patterns.
Conclude with a thematic analysis of the entire audio. Identify the most prominent themes, how they are interconnected, and the overall message or purpose of the audio.
"""

final_response_list_audio = await batch_and_profile(gcs_uri_list_audio,
                                                    audio_description_extraction_prompt,
                                                    "audio/mpeg",
                                                    batch_size, max_concurrent)

Batch size: 3
Elapsed time: 61.78 seconds
Initial memory usage: 643.75 MB
Final memory usage: 643.84 MB


In [28]:
audio_metadata_flash = pd.DataFrame([gcs_uri_list_audio, final_response_list_audio]).T
audio_metadata_flash.columns = ['audio_gcs', 'audio_description']
audio_metadata_flash.head(2)

Unnamed: 0,audio_gcs,audio_description
0,gs://mlops-for-genai/multimodal-finanace-qa/da...,## Alphabet's Second Quarter 2023 Earnings Con...
1,gs://mlops-for-genai/multimodal-finanace-qa/da...,## Alphabet Third Quarter 2023 Earnings Confer...


In [29]:
rich_Markdown(audio_metadata_flash.iloc[5]['audio_description'])

### Building Features from Video Files

In [30]:
gcs_uri_list_video = get_gcs_uri_list(bucket,production_data,'.mp4')

In [35]:
gcs_uri_list_video[:4]

['gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/product_launch/gemini/Can AI understand new emojis  Testing Gemini.mp4',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/product_launch/gemini/Can AI understand your outfit  Testing Gemini.mp4',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/product_launch/gemini/Converting images into code with AI  Testing Gemini.mp4',
 'gs://mlops-for-genai/multimodal-finanace-qa/data/unstructured/production/product_launch/gemini/Finding connections with AI  Testing Gemini.mp4']

In [36]:
len(gcs_uri_list_video)

18

In [37]:
batch_size = 5
max_concurrent = 4
video_description_extraction_prompt = """Transcribe and analyze the video, intelligently segmenting it based on shifts in topic, focus, or narrative progression.
For each identified segment:
**Concise Summary**: Distill the core theme or message in 1-2 sentences.
**Thematic Context**: How does this segment contribute to the overarching narrative or argument?
**Critical Analysis**: Delve into the segment's implications, perspectives presented, and potential biases.
**Connections**: Link this segment to other parts of the video, revealing patterns and relationships.

Conclude by synthesizing the video's main themes, their interconnections, and the overarching purpose or message.
"""


final_response_list_video = await batch_and_profile(gcs_uri_list_video,
                                                    video_description_extraction_prompt,
                                                    "video/mp4",
                                                    batch_size, max_concurrent)

Something failed, retrying
500 Internal error encountered.
Batch size: 5
Elapsed time: 47.86 seconds
Initial memory usage: 643.84 MB
Final memory usage: 646.67 MB


In [40]:
video_metadata_flash = pd.DataFrame([gcs_uri_list_video, final_response_list_video]).T
video_metadata_flash.columns = ['video_gcs', 'video_description']
video_metadata_flash.head(2)

Unnamed: 0,video_gcs,video_description
0,gs://mlops-for-genai/multimodal-finanace-qa/da...,## Video Transcription and Analysis: Can Gemin...
1,gs://mlops-for-genai/multimodal-finanace-qa/da...,## Video Analysis: Can Gemini Understand Outfi...


In [39]:
rich_Markdown(video_metadata_flash.iloc[5]['video_description'])

In [None]:
# audio_metadata_flash.to_csv("audio_metadata_flash.csv")

In [None]:
# video_metadata_flash.to_csv("video_metadata_flash.csv")

### Splitting/Chunking the text

In [41]:
import uuid
uuid.uuid4()

UUID('1928f6d0-8aeb-4cc6-acbe-9ce0c88d02a6')

In [42]:
def assign_unique_uuids(dataframes):
    """Assigns unique UUIDs to each row of multiple dataframes.

    Args:
        dataframes (list): A list of pandas DataFrames.

    Returns:
        list: A list of DataFrames with the 'uid' column added.
    """


    result_dataframes = []
    for df in dataframes:
        df['uid'] = df.apply(lambda row: str(uuid.uuid4().hex), axis=1)
        result_dataframes.append(df)

    return result_dataframes

def split_text_into_chunks(df, text_column, chunk_size):
    """Splits text into chunks of specified size, preserving other column values and adding a chunk number column."""

    # Create a list of new dataframes, one for each chunk
    new_dfs = []
    for _, row in df.iterrows():
        text_chunks = [row[text_column][i:i + chunk_size] for i in range(0, len(row[text_column]), chunk_size)]
        for chunk_index, chunk in enumerate(text_chunks):
            new_row = row.copy()  # Copy all other columns
            new_row[text_column] = chunk
            new_row['chunk_number'] = chunk_index + 1  # Add chunk number starting from 1
            new_dfs.append(pd.DataFrame([new_row]))

    return pd.concat(new_dfs, ignore_index=True)  # Combine into single dataframe

In [43]:
%%time
# Step 2 [Why do we still do chunking? Explain ----
# 1) show the token count
# 2) reduce noise while search  ]
# latency and cost consideriation - you can still do that, but would it make sense
# Out of 5M token "information" -> you would still want to makes ure that the 1M that you send are the most relevant 1M
# Chunking the Text to smaller size to make precise match with queries

chunk_size = 500
extracted_text_chunk_df = split_text_into_chunks(pdf_metadata_flash[~pdf_metadata_flash['text'].isnull()], 'text', chunk_size)
image_metadata_chunk_df = split_text_into_chunks(image_metadata_flash_final, 'image_description', chunk_size)
video_metadata_chunk_df = split_text_into_chunks(video_metadata_flash, 'video_description', chunk_size)
audio_metadata_chunk_df = split_text_into_chunks(audio_metadata_flash, 'audio_description', chunk_size)

CPU times: user 20 s, sys: 263 ms, total: 20.3 s
Wall time: 21.6 s


In [44]:
(extracted_text_chunk_df,
image_metadata_chunk_df,
 video_metadata_chunk_df,
 audio_metadata_chunk_df) = assign_unique_uuids([extracted_text_chunk_df,
                                                image_metadata_chunk_df,
                                                video_metadata_chunk_df,
                                                audio_metadata_chunk_df],
                                               )

In [45]:
extracted_text_chunk_df.head()

Unnamed: 0,text_type,file_name,gcs_path,page_number,text,image_available,image_counter,image_gcs_path,image_size,chunk_number,uid
0,blogpost,Google announces the Coalition for Secure AI.pdf,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"Additionally, CoSAI will collaborate with orga...",False,,,,1,91eb9e70cc47499ea0e66c7bac787471
1,blogpost,Google announces the Coalition for Secure AI.pdf,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"ll to help organizations securely implement, t...",False,,,,2,9e1cbaf288584e13b5cdd2ef505da1fb
2,blogpost,Google announces the Coalition for Secure AI.pdf,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"\nSafety & Security AI8/13/24, 8:09 PM Google...",False,,,,3,6b2aa4129b1649e9aeadea99037dde31
3,blogpost,"Google Gemini updates_ Flash 1.5, Gemma 2 and ...",gs://mlops-for-genai/multimodal-finanace-qa/da...,4,1.5 Pro can now follow increasingly complex an...,False,,,,1,96c064d349d441a7bdca3f95b839ebe8
4,blogpost,"Google Gemini updates_ Flash 1.5, Gemma 2 and ...",gs://mlops-for-genai/multimodal-finanace-qa/da...,4,.5 Pro can now reason\nacross image and audio ...,False,,,,2,632d6d7b82be456b8df44fd7dd6dda1e


In [46]:
image_metadata_chunk_df.head()

Unnamed: 0,text_type,file_name,gcs_path,page_number,text,image_available,image_counter,image_gcs_path,image_size,image_description,chunk_number,uid
0,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"Productivity Goodput, to measure this e�ciency...",True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 924)",The image depicts a diagram illustrating the t...,1,b90f4e1ef996437bae36eb24a17f149f
1,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,3,"Productivity Goodput, to measure this e�ciency...",True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 924)",odput:** Measures the fraction of peak availab...,2,d565e9515a794299b659a17683b295a7
2,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,4,how you can measure and maximize runtime for\n...,True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 941)",The diagram depicts the elements of ML Product...,1,3266e9efa28f40af8a13b61bc2f9ddc7
3,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,4,how you can measure and maximize runtime for\n...,True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2200, 941)","are layer contains Compute (GPUs, TPUs), Stora...",2,38e7b9f6e0784345a462ae01a1e79e46
4,blogpost,Goodput metric as measure of ML productivity _...,gs://mlops-for-genai/multimodal-finanace-qa/da...,5,"As indicated in the diagram above, AI\nHyperco...",True,1.0,gs://mlops-for-genai/multimodal-finanace-qa/da...,"(2000, 708)",The image is a timeline diagram illustrating t...,1,2756487575cb4d21a59829e2a776f4db


In [47]:
video_metadata_chunk_df.head()

Unnamed: 0,video_gcs,video_description,chunk_number,uid
0,gs://mlops-for-genai/multimodal-finanace-qa/da...,## Video Transcription and Analysis: Can Gemin...,1,bcb310c65f4b4d5489b1de57e5db714f
1,gs://mlops-for-genai/multimodal-finanace-qa/da...,"analyze complex visual concepts. However, the ...",2,ae10cf80497a4f0083a9d4cc0a09d013
2,gs://mlops-for-genai/multimodal-finanace-qa/da...,o's focus on Gemini's visual comprehension abi...,3,3cd19da1300b47059433cefa7b82fa6e
3,gs://mlops-for-genai/multimodal-finanace-qa/da...,giving names and taglines to three unusual emo...,4,30f3e6ba326645668f156d657db978b1
4,gs://mlops-for-genai/multimodal-finanace-qa/da...,ncludes the video by showcasing Gemini's uniqu...,5,37e21f5ac4804aaf831b7ce63efdd21d


In [48]:
audio_metadata_chunk_df.head()

Unnamed: 0,audio_gcs,audio_description,chunk_number,uid
0,gs://mlops-for-genai/multimodal-finanace-qa/da...,## Alphabet's Second Quarter 2023 Earnings Con...,1,bb7109e8b69f474a83365e0b44264ef1
1,gs://mlops-for-genai/multimodal-finanace-qa/da...,"segment introduces the call, identifies the s...",2,a296b861ede249bba806d6521aee901f
2,gs://mlops-for-genai/multimodal-finanace-qa/da...,is:** This segment foreshadows the overarchin...,3,0149d44c23bd4cf683d92ac097a4dbb3
3,gs://mlops-for-genai/multimodal-finanace-qa/da...,"tone for the call, emphasizing Alphabet's com...",4,b98adfda4a354a5fa352d20ca9ba0755
4,gs://mlops-for-genai/multimodal-finanace-qa/da...,"iency, as AI is portrayed as a catalyst for bo...",5,7a3d87f7829a4a818f74a07e3c9284df


In [49]:
import json

def create_jsonl_file(extracted_text_chunk_df, video_metadata_chunk_df, audio_metadata_chunk_df,
                      bucket_object, jsonl_file_path):
    """
    Creates a JSONL file containing the combined text, video_description, and audio_description from the given dataframes.

    Args:
        extracted_text_chunk_df (pandas.DataFrame): The dataframe containing extracted text chunks.
        video_metadata_chunk_df (pandas.DataFrame): The dataframe containing video metadata.
        audio_metadata_chunk_df (pandas.DataFrame): The dataframe containing audio metadata.
    """

    json_data = []
    df_data = []

    for index, row in extracted_text_chunk_df.iterrows():
        json_data.append({"content": row['text']})

        df_data.append([row['uid'], "pdf_text", row['text']])

    for index, row in image_metadata_chunk_df.iterrows():
        json_data.append({"content": row['image_description']})

        df_data.append([row['uid'], "pdf_images", row['image_description']])

    for index, row in video_metadata_chunk_df.iterrows():
        json_data.append({"content": row['video_description']})

        df_data.append([row['uid'], "video_description", row['video_description']])

    for index, row in audio_metadata_chunk_df.iterrows():
        json_data.append({"content": row['audio_description']})

        df_data.append([row['uid'], "audio_description", row['audio_description']])

    # Convert the JSON data to a string
    jsonl_data = ""
    for item in json_data:
      jsonl_data += json.dumps(item) + "\n"

    # Upload the JSONL data to GCS
    blob = bucket.blob(jsonl_file_path+"/combined_data.jsonl")
    blob.upload_from_string(jsonl_data)
    print(f"File uploaded to GCS: {blob.public_url}")

    gcs_path_jsonl_data = f"gs://{bucket.name}/{blob.name}"

    return pd.DataFrame(json_data), pd.DataFrame(df_data, columns=['uid','type', 'content']), gcs_path_jsonl_data



In [50]:
json_db_emb, index_db, gcs_path_jsonl_data = create_jsonl_file(extracted_text_chunk_df, video_metadata_chunk_df, audio_metadata_chunk_df,
                                          bucket,embedding_input_path)

File uploaded to GCS: https://storage.googleapis.com/mlops-for-genai/multimodal-finanace-qa/data/embeddings/combined_data.jsonl


In [51]:
index_db.head()

Unnamed: 0,uid,type,content
0,91eb9e70cc47499ea0e66c7bac787471,pdf_text,"Additionally, CoSAI will collaborate with orga..."
1,9e1cbaf288584e13b5cdd2ef505da1fb,pdf_text,"ll to help organizations securely implement, t..."
2,6b2aa4129b1649e9aeadea99037dde31,pdf_text,"\nSafety & Security AI8/13/24, 8:09 PM Google..."
3,96c064d349d441a7bdca3f95b839ebe8,pdf_text,1.5 Pro can now follow increasingly complex an...
4,632d6d7b82be456b8df44fd7dd6dda1e,pdf_text,.5 Pro can now reason\nacross image and audio ...


In [52]:
index_db.shape

(10690, 3)

In [53]:
gcs_path_jsonl_data

'gs://mlops-for-genai/multimodal-finanace-qa/data/embeddings/combined_data.jsonl'

In [54]:
# https://cloud.google.com/bigquery/docs/generate-text-tutorial#grant-permissions

In [56]:
%%time

from vertexai.language_models import TextEmbeddingInput, TextEmbeddingModel
from vertexai.preview import language_models

input_uri = (
    gcs_path_jsonl_data
)
# Format: `gs://BUCKET_NAME/DIRECTORY/` or `bq://project_name.llm_dataset`
output_uri = "gs://mlops-for-genai/multimodal-finanace-qa/data/embeddings/combined_data_output/"

textembedding_model = language_models.TextEmbeddingModel.from_pretrained(
    "textembedding-gecko@003"
)

batch_prediction_job = textembedding_model.batch_predict(
    dataset=[input_uri],
    destination_uri_prefix=output_uri,
)
print(batch_prediction_job.display_name)
print(batch_prediction_job.resource_name)
print(batch_prediction_job.state)

INFO:google.cloud.aiplatform.jobs:Creating BatchPredictionJob
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob created. Resource name: projects/1054577272268/locations/us-central1/batchPredictionJobs/8041471906865479680
INFO:google.cloud.aiplatform.jobs:To use this BatchPredictionJob in another session:
INFO:google.cloud.aiplatform.jobs:bpj = aiplatform.BatchPredictionJob('projects/1054577272268/locations/us-central1/batchPredictionJobs/8041471906865479680')
INFO:google.cloud.aiplatform.jobs:View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/8041471906865479680?project=1054577272268
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/1054577272268/locations/us-central1/batchPredictionJobs/8041471906865479680 current state:
JobState.JOB_STATE_QUEUED
INFO:google.cloud.aiplatform.jobs:BatchPredictionJob projects/1054577272268/locations/us-central1/batchPredictionJobs/8041471906865479680 current state:
JobState.JOB_

BatchPredictionJob 2024-09-12 07:50:56.769157
projects/1054577272268/locations/us-central1/batchPredictionJobs/8041471906865479680
JobState.JOB_STATE_SUCCEEDED
CPU times: user 961 ms, sys: 130 ms, total: 1.09 s
Wall time: 1min 35s


In [57]:
def load_jsonl_from_gcs(bucket, file_path):
    """Loads a JSONL file from a GCS bucket and converts it into a DataFrame.

    Args:
        bucket_name (str): The name of the GCS bucket.
        file_path (str): The path to the JSONL file within the bucket.

    Returns:
        pandas.DataFrame: The DataFrame created from the JSONL data.
    """

    # storage_client = storage.Client()
    # bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_path)

    with blob.open('rb') as f:
        data = []
        for line in f:
            instance = json.loads(line)
            content = instance['instance']['content']
            predictions = instance['predictions'][0]['embeddings']['values']
            data.append({'content': content, 'predictions': predictions})

    df = pd.DataFrame(data)
    return df

In [58]:
%%time
file_path = 'multimodal-finanace-qa/data/embeddings/combined_data_output/prediction-model-2024-09-12T07:50:57.319163Z/000000000000.jsonl'
embedding_df = load_jsonl_from_gcs(bucket, file_path)

CPU times: user 1min 58s, sys: 1.12 s, total: 1min 59s
Wall time: 2min 3s


In [59]:
embedding_df.head()

Unnamed: 0,content,predictions
0,DKNNKQPNCTIGN[TGUWNVKPIHTQOCPK...,"[0.020302558317780495, 0.004051088821142912, -..."
1,\n6JGUGRCTVPGTUOC[PQVEQPVKPWGVQFQDWUKP...,"[0.023772958666086197, -0.025965938344597816, ..."
2,\nDCPFYKFVJVQYQTMGHHGEVKXGN[%WTTGPVN[V...,"[-0.0009582438506186008, -0.012345915660262108..."
3,\nUKIPKƒECPVKPHNWGPEGQXGTCNNOCVVGTUTGSWK...,"[0.012308558449149132, -0.02015942893922329, -..."
4,1EVQDGT\n #OGPFGFCPF...,"[-0.021374214440584183, -0.0028167085256427526..."


In [60]:
print("Size of embedding_df: ", embedding_df.shape)
print("Size of index_db: ", index_db.shape)

Size of embedding_df:  (10690, 2)
Size of index_db:  (10690, 3)


In [61]:
# Joining embedding_df with the index_df
index_db_final = index_db.merge(embedding_df, on='content', how='left')

In [62]:
index_db_final.head()

Unnamed: 0,uid,type,content,predictions
0,91eb9e70cc47499ea0e66c7bac787471,pdf_text,"Additionally, CoSAI will collaborate with orga...","[-0.0019601150415837765, -0.029520384967327118..."
1,9e1cbaf288584e13b5cdd2ef505da1fb,pdf_text,"ll to help organizations securely implement, t...","[0.027416100725531578, -0.02166818082332611, -..."
2,6b2aa4129b1649e9aeadea99037dde31,pdf_text,"\nSafety & Security AI8/13/24, 8:09 PM Google...","[0.01608710177242756, -0.0351220928132534, -0...."
3,96c064d349d441a7bdca3f95b839ebe8,pdf_text,1.5 Pro can now follow increasingly complex an...,"[-0.017015989869832993, -0.02816266193985939, ..."
4,632d6d7b82be456b8df44fd7dd6dda1e,pdf_text,.5 Pro can now reason\nacross image and audio ...,"[0.003014838322997093, -0.018015660345554352, ..."


In [63]:
# to test if mapping is done right.
test_index = 5000
print("*****original emb in embedding_db: *****\n", embedding_df.iloc[test_index]['predictions'][:5])
print("\n*****emb in index_db****\n", index_db_final[index_db_final['content']==embedding_df.iloc[test_index]['content']]['predictions'].values[0][:5])
print("\n*****Original content in embedding_db *****", embedding_df.iloc[test_index]['content'])
print("\n*****content in index_db*****", index_db_final[index_db_final['content']==embedding_df.iloc[test_index]['content']]['content'].values[0])

*****original emb in embedding_db: *****
 [0.03795702010393143, -0.03185933455824852, 0.01030017901211977, 0.035032641142606735, 0.05800672248005867]

*****emb in index_db****
 [0.03795702010393143, -0.03185933455824852, 0.01030017901211977, 0.035032641142606735, 0.05800672248005867]

*****Original content in embedding_db ***** cost of revenues, research and development (R&D) expenses, sales and marketing 
expenses, and general and administrative expenses may increase in amount and/or may increase as a 
percentage of revenues and may be affected by a number of factors;
•estimates of our future compensation expenses;
•our expectation that our other income (expense), net (OI&E), will fluctuate in the future, as it is largely 
driven by market dynamics;
•fluctuations in our effective tax rate;
•seasonal fluctuations in in

*****content in index_db***** cost of revenues, research and development (R&D) expenses, sales and marketing 
expenses, and general and administrative expenses may incr

In [64]:
index_db_final.value_counts('type')

Unnamed: 0_level_0,count
type,Unnamed: 1_level_1
pdf_text,14277
audio_description,262
video_description,188
pdf_images,171


In [None]:
# # Convert DataFrame to CSV string
# csv_string = index_db_final.to_csv()

# # Create a StringIO object to simulate a file-like object
# string_io = io.StringIO(csv_string)

In [None]:
# # Upload the index_db_final data to GCS
# blob = bucket.blob("multimodal-finanace-qa/data/embeddings"+"/index_db_final.csv")
# blob.upload_from_string(index_db_final)
# print(f"File uploaded to GCS: {blob.public_url}")