
<style>
body {
    font-family: 'Arial', sans-serif;  /* Change to your desired font */
}
</style>


```
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: pcorreia@google.com
```

# Entity Extraction with Gemini ✨


---



## Overview



**Purpose of this notebook**

This notebook showcases how Gemini works with multimodal inputs to generate insights for customers who have audio, image, or video assets. For more on Gemini's multimodality, see [here](https://cloud.google.com/use-cases/multimodal-ai?hl=en).

Specifically, the notebook builds a pipeline that takes a set of assets in a Google Cloud Storage bucket and generates accurate, informative metadata for them.

**Generic Prompts**

The prompts are generic and have not been tailored to a specific content type. With further prompt engineering, you can expect more detailed metadata to be generated. For example, if most of your content is sports related, a prompt that captures specific moments of that sport (red cards, penalties, etc.) will yield richer metadata.
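
As a rough illustration of that kind of tailoring, the sketch below appends domain-specific hints to a generic base prompt. The function name `tailor_prompt`, the base prompt text, and the example hints are hypothetical and not part of this notebook.

```python
def tailor_prompt(base_prompt: str, domain: str, key_moments: list[str]) -> str:
    """Append domain-specific extraction hints to a generic prompt (illustrative only)."""
    if not key_moments:
        # Nothing to add, so the generic prompt is returned unchanged.
        return base_prompt
    hints = "\n".join(f"- {moment}" for moment in key_moments)
    return (
        f"{base_prompt}\n\n"
        f"The content is about {domain}. Pay particular attention to:\n{hints}"
    )


generic = "Describe this asset and return the metadata as JSON."
sports_prompt = tailor_prompt(generic, "football", ["red cards", "penalties"])
print(sports_prompt)
```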


## Before you start


**Requirements**

Make sure you have the following resources in your GCP environment:


*   A Google Cloud project with the APIs used in this notebook enabled (Vertex AI, Cloud Storage, Firestore);
*   One Google Cloud Storage bucket that stores your reports;
*   Firestore enabled, with a collection created;
*   Your report files placed in the input bucket.

Once you have these in place, you'll be able to run the notebook.


**Ingestion Pipeline**


These are the steps that the notebook takes you through:

1.   Load the items from the input bucket;
2.   Run the prompts for the metadata generation of your reports;
3.   Store the generated metadata in Firestore.
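
The overall flow can be sketched as a small driver in which each stage is passed in as a callable. The names `run_pipeline`, `list_files`, `extract`, and `store` are placeholders chosen for illustration; the notebook's actual cells use Cloud Storage, Gemini, and Firestore and appear in the sections that follow.

```python
from typing import Callable, Iterable


def run_pipeline(
    list_files: Callable[[], Iterable[dict]],
    extract: Callable[[dict], dict],
    store: Callable[[dict], None],
) -> int:
    """Run load -> extract -> store for every file and return how many were stored."""
    stored = 0
    for item in list_files():
        item["metadata"] = extract(item)
        store(item)
        stored += 1
    return stored


# Stand-in stages so the sketch runs without any cloud access.
saved = []
count = run_pipeline(
    list_files=lambda: [{"file_name": "report-1.pdf"}],
    extract=lambda item: {"Summary": f"placeholder for {item['file_name']}"},
    store=saved.append,
)
```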


**Result**

At the end of this pipeline, each report will have metadata with the following JSON structure:

```json
{
    "Summary": "Summary of the content of the report, measurements and conclusions",
    "PatientInfo":{
        "Age": "e.g., 45",
        "Sex": "e.g., Male/Female",
        "RelevantMedicalHistory": "e.g., Hypertension, Diabetes"
    },
    "Reporter":{
        "Name": "Name of the entity performing the test, e.g., 'Cardiovascular Imaging Center'",
        "Address": "Full address of the entity, e.g., '123 Main Street, Anytown, CA 91234'"
    },
    "TestInfo":{
        "Type": "Specify the type of the test or report, e.g., 'Echocardiogram'",
        "Indications": "Provide the indications for the test/report",
        "ReportDate": "date when the report was created"
    },
    "Findings":[
        {
            "Heading": "Findings",
            "Description": "Description of the finding"
        },
        {
            "Heading": "Findings",
            "Description": "Description of the finding"
        }
    ],
    "Measurements":[
        {
            "Parameter": "Description of the parameter that is being measured",
            "Value": "Value of the measurement",
            "Unit": "unit of the measurement"
        },
        {
            "Parameter": "Description of the parameter that is being measured",
            "Value": "Value of the measurement",
            "Unit": "unit of the measurement"
        }
    ]
}
```
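
Because the model output is free text, a parsed result may not always contain every section shown above. A minimal sketch of a completeness check follows; the `EXPECTED_KEYS` constant and the `missing_keys` helper are hypothetical names introduced here, not part of the notebook.

```python
# Top-level keys taken from the JSON structure above.
EXPECTED_KEYS = {
    "Summary", "PatientInfo", "Reporter", "TestInfo", "Findings", "Measurements",
}


def missing_keys(metadata: dict) -> set:
    """Return the expected top-level keys that are absent from a parsed result."""
    return EXPECTED_KEYS - set(metadata)


sample = {"Summary": "Normal study", "Findings": [], "Measurements": []}
print(sorted(missing_keys(sample)))  # ['PatientInfo', 'Reporter', 'TestInfo']
```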




# Set up 🛠

In [None]:
!pip install --upgrade google-cloud-aiplatform google-cloud-storage firebase-admin pandas tqdm


In [None]:
# Restart kernel after installs so that your environment can access the new packages
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

In [None]:
import sys

# Additional authentication is required for Google Colab
if "google.colab" in sys.modules:
    # Authenticate user to Google Cloud
    from google.colab import auth

    auth.authenticate_user()

## Variables

In [None]:
PROJECT_ID = "driven-crawler-436206"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
MODEL = "gemini-1.5-flash-001"  # @param {type:"string"}
INPUT_BUCKET = "driven-crawler-436206-input"  # @param {type:"string"}
# Firestore collection that stores the generated metadata
DB_COLLECTION = "default"  # @param {type:"string"}

## Imports


In [None]:
#common import
import base64
import vertexai
from vertexai.generative_models import GenerativeModel, Part, FinishReason
import vertexai.preview.generative_models as generative_models
from google.cloud import storage
import re
import json
import tqdm
import os
import io
import pandas as pd

#storage
import firebase_admin
from firebase_admin import firestore

#threading
from concurrent.futures import ThreadPoolExecutor, as_completed

## Common Functions

In [None]:
def generate(prompt: list, model: str = MODEL) -> str:
  """Sends a multimodal prompt to Gemini on Vertex AI and returns the response text.

  Args:
      prompt: List of prompt parts (text strings and/or `Part` objects).
      model: Name of the Gemini model to use.

  Returns:
      The text of the model response.
  """
  vertexai.init(project=PROJECT_ID, location=LOCATION)

  # Use a separate name so the `model` argument (a string) is not shadowed.
  gemini_model = GenerativeModel(model)

  generation_config = {
      "max_output_tokens": 8192,
      "temperature": 1,
      "top_p": 0.95,
  }

  # Block content rated medium or higher in every harm category.
  threshold = generative_models.HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE
  safety_settings = {
      generative_models.HarmCategory.HARM_CATEGORY_HATE_SPEECH: threshold,
      generative_models.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: threshold,
      generative_models.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: threshold,
      generative_models.HarmCategory.HARM_CATEGORY_HARASSMENT: threshold,
  }

  response = gemini_model.generate_content(
      prompt,
      generation_config=generation_config,
      safety_settings=safety_settings,
      stream=False,
  )
  return response.text



In [None]:
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"

    # The ID of your GCS object
    # source_blob_name = "storage-object-name"

    # The path to which the file should be downloaded
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

In [None]:
def read_json_from_gcs(bucket_name, file_name):
    """Reads a JSON file from Google Cloud Storage into a Python dictionary.

    Args:
        bucket_name: The name of the GCS bucket.
        file_name: The name of the JSON file within the bucket.

    Returns:
        A Python dictionary representing the JSON data, or None if an error occurred.
    """

    # Remove 'gs://' prefix if present using replace()
    bucket_name = bucket_name.replace("gs://", "", 1)  # Replace only the first occurrence

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    try:
        # download_as_bytes() replaces the deprecated download_as_string() alias
        contents = blob.download_as_bytes()
        data = json.loads(contents)
        return data
    except Exception as e:
        print(f"Error reading JSON from GCS: {e}")
        return None
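
`read_json_from_gcs` strips a leading `gs://` from the bucket name but still expects the bucket and the file name as separate arguments. When only a full URI is available, a small helper along these lines could split it first. The name `split_gcs_uri` is hypothetical and the helper is not part of the notebook.

```python
def split_gcs_uri(uri: str) -> tuple[str, str]:
    """Split 'gs://bucket/path/to/file' into ('bucket', 'path/to/file')."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a gs:// URI: {uri}")
    # partition() splits on the first "/" only, so nested paths stay intact.
    bucket, _, blob_name = uri[len(prefix):].partition("/")
    return bucket, blob_name


print(split_gcs_uri("gs://my-bucket/reports/echo.json"))
```

The two parts can then be passed to `read_json_from_gcs(bucket, blob_name)`.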

In [None]:
def list_bucket_files_pd(bucket_name):
    """
    Lists files in a GCS bucket with their properties
    (name, size, content type, created time, updated time).

    Args:
        bucket_name (str): Name of the GCS bucket.

    Returns:
        pd.DataFrame: DataFrame containing file properties.
    """

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_data = []
    for blob in blobs:
        file_data.append({
            'file_name': blob.name,
            'size': blob.size,
            'type': blob.content_type,
            'created': blob.time_created,
            'updated': blob.updated,  # Last updated timestamp
        })

    df = pd.DataFrame(file_data)
    return df

In [None]:
def list_bucket_files(bucket_name):
    """
    Lists files in a GCS bucket with their properties
    (name, size, content type, created time, updated time).

    Args:
        bucket_name (str): Name of the GCS bucket.

    Returns:
        list: List of dictionaries containing file properties.
    """

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_data = []
    for blob in blobs:
        file_data.append({
            'file_name': blob.name,
            'size': blob.size,
            'type': blob.content_type,
            'created': blob.time_created,
            'updated': blob.updated,
        })

    return file_data  # Return the list of dictionaries directly
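
`list_bucket_files` returns every object in the bucket, so anything that is not a report would also be sent to the model later on. A minimal sketch of filtering the list by content type follows. The `keep_reports` helper and the set of allowed MIME types are assumptions made for illustration; adjust them to the formats your reports actually use.

```python
# Assumed report formats; not taken from the notebook.
REPORT_MIME_TYPES = {"application/pdf", "image/png", "image/jpeg"}


def keep_reports(files: list[dict], allowed: set = REPORT_MIME_TYPES) -> list[dict]:
    """Keep entries with an allowed 'type' that are not folder placeholders."""
    return [
        f for f in files
        if f.get("type") in allowed and not f["file_name"].endswith("/")
    ]


sample_files = [
    {"file_name": "echo-001.pdf", "type": "application/pdf"},
    {"file_name": "notes.txt", "type": "text/plain"},
    {"file_name": "archive/", "type": None},
]
print([f["file_name"] for f in keep_reports(sample_files)])  # ['echo-001.pdf']
```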

# Metadata Generation 🤖



## Loading assets

Create a list of all the objects that are in the input bucket.

In [None]:
file_list = list_bucket_files(INPUT_BUCKET)
file_list

## Medical Reports ⚕



In [None]:
def generate_metadata(prompt: list, model: str = MODEL) -> str:
  """Generates metadata text for a prompt.

  Thin wrapper around `generate` from the Common Functions section, which
  applies the same generation config and safety settings.
  """
  return generate(prompt, model)



In [None]:
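# Build the prompt for a single report and ask Gemini for its metadata
def generate_metadata_report(blob_uri: str, mime_type: str, model: str) -> str:
  # Reference the report in Cloud Storage by URI instead of downloading it.
  report_asset = Part.from_uri(
      mime_type=mime_type,
      uri=blob_uri)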

  prompt = """Extract the following information from the provided echocardiogram report and structure it in a JSON object according to the format below. If a piece of information is not available in the report, leave the corresponding field blank or use \'N/A\'. Ensure all values are extracted accurately and include appropriate units where applicable.
JSON format:
```
{
    "Summary": "Summary of the content of the report, measurements and conclusions",
    "PatientInfo":{
        "Age": "e.g., 45",
        "Sex": "e.g., Male/Female",
        "RelevantMedicalHistory": "e.g., Hypertension, Diabetes"
    },
    "Reporter":{
        "Name": "Name of the entity performing the test, e.g., 'Cardiovascular Imaging Center'",
        "Address": "Full address of the entity, e.g., '123 Main Street, Anytown, CA 91234'"
    },
    "TestInfo":{
        "Type": "Specify the type of the test or report, e.g., 'Echocardiogram'",
        "Indications": "Provide the indications for the test/report",
        "ReportDate": "date when the report was created"
    },
    "Findings":[
        {
            "Heading": "Findings",
            "Description": "Description of the finding"
        },
        {
            "Heading": "Findings",
            "Description": "Description of the finding"
        }
    ],
    "Measurements":[
        {
            "Parameter": "Description of the parameter that is being measured",
            "Value": "Value of the measurement",
            "Unit": "unit of the measurement"
        },
        {
            "Parameter": "Description of the parameter that is being measured",
            "Value": "Value of the measurement",
            "Unit": "unit of the measurement"
        }
    ]
}
```"""

  result_text = generate_metadata(prompt=[report_asset, prompt], model = model )


  return result_text
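
The function returns raw model text, and because the prompt shows the format inside a code fence, the reply may come back wrapped in one as well. One possible way to turn that text into a dictionary is sketched below. The helper name `parse_model_json` is hypothetical, and the sketch assumes the reply is either bare JSON or a single fenced JSON block.

```python
import json
import re

# Matches a reply that is entirely one fenced block, with an optional language tag.
FENCE_RE = re.compile(r"^```[a-zA-Z]*\s*\n(.*?)\n?```\s*$", re.DOTALL)


def parse_model_json(text: str) -> dict:
    """Parse JSON from a model reply, removing one surrounding code fence if present."""
    stripped = text.strip()
    match = FENCE_RE.match(stripped)
    if match:
        stripped = match.group(1)
    return json.loads(stripped)


reply = '```json\n{"Summary": "Report mentions a json export"}\n```'
print(parse_model_json(reply))
```

Only the fence is removed, so a value that happens to contain the word "json" is left untouched.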

In [None]:
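def process_report(report: dict):
  """Generates metadata for a single entry of the file list and attaches it in place.

  Returns the raw model response, or None if generation or parsing failed.
  """

  blob_uri = f"gs://{INPUT_BUCKET}/{report['file_name']}"
  try:
    result = generate_metadata_report(blob_uri, report['type'], MODEL)

    # Strip only the surrounding Markdown code fence; removing every "json"
    # in the text would also alter values that contain that word.
    response_text = re.sub(r"^```(?:json)?\s*|\s*```$", "", result.strip())
    report['metadata'] = json.loads(response_text)
    return result
  except Exception as e:
    print(f"Error processing {report['file_name']}: {e}")
    return None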


with ThreadPoolExecutor() as executor:
    # Submit tasks to the executor
    futures = [executor.submit(process_report, report) for report in file_list]

    # You can remove the tqdm loop if you don't need progress updates
    for _ in tqdm.tqdm(as_completed(futures), total=len(futures)):
        pass  # No need to process individual results here

print('Report Metadata Generated')
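
The loop above discards each future's result and relies on `process_report` printing its own errors. If you want to know afterwards which files succeeded, one option is to map every future back to its input, as in the sketch below. The `run_all` helper, the `fake_worker` stand-in, and the `max_workers` value are illustrative assumptions; capping the worker count can also help keep the number of concurrent model calls predictable.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_all(items, worker, max_workers=4):
    """Run worker(item) concurrently and return (results, errors) keyed by item."""
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Remember which input each future belongs to.
        future_to_item = {executor.submit(worker, item): item for item in items}
        for future in as_completed(future_to_item):
            item = future_to_item[future]
            try:
                results[item] = future.result()
            except Exception as exc:
                errors[item] = exc
    return results, errors


def fake_worker(name):
    # Stand-in for a model call so the sketch runs offline.
    if name.endswith(".txt"):
        raise ValueError("unsupported type")
    return name.upper()


results, errors = run_all(["a.pdf", "b.txt"], fake_worker)
```

Items are used as dictionary keys here, so they need to be hashable (for example, file names rather than the report dictionaries themselves).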


In [None]:
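# Derive the item name (used later as the Firestore document ID) from the file name
for report in file_list:
  # splitext drops only the final extension, so "report.v2.pdf" becomes "report.v2"
  report['name'] = os.path.splitext(report['file_name'])[0]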


# Storing Metadata 💾

In [None]:
# starting the firebase db
# Initialize Firebase Admin SDK
firebase_admin.initialize_app()


In [None]:
# Get a reference to the Firestore database
db = firestore.client()

In [None]:
for report in tqdm.tqdm(file_list, desc="Storing report metadata"):
  doc_ref = db.collection(DB_COLLECTION).document(report['name'])
  doc_ref.set(report)
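
`process_report` only adds the `metadata` key when generation and parsing succeed, so some entries in `file_list` may reach this loop without it. A minimal sketch for separating them before writing follows; the `split_by_metadata` helper is a hypothetical addition, not part of the notebook.

```python
def split_by_metadata(reports: list[dict]) -> tuple[list[dict], list[str]]:
    """Return (reports that have metadata, file names of reports that do not)."""
    ready = [r for r in reports if "metadata" in r]
    failed = [r["file_name"] for r in reports if "metadata" not in r]
    return ready, failed


sample_reports = [
    {"file_name": "echo-001.pdf", "metadata": {"Summary": "Normal study"}},
    {"file_name": "echo-002.pdf"},
]
ready, failed = split_by_metadata(sample_reports)
print(f"{len(ready)} ready to store, failed: {failed}")
```

The `ready` list could then be stored in place of `file_list`, and the `failed` names retried or inspected.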