In [1]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Batch calling Gemini

<table align="left">
  <td style="text-align: center">
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/generative_ai/batch_calling_gemini.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Google Colaboratory logo"><br> Open in Colab
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/colab/import/https:%2F%2Fraw.githubusercontent.com%2FGoogleCloudPlatform%2Fvertex-ai-samples%2Fmain%2Fnotebooks%2Fcommunity%2Fgenerative_ai%2Fbatch_calling_gemini.ipynb">
      <img width="32px" src="https://cloud.google.com/ml-engine/images/colab-enterprise-logo-32px.png" alt="Google Cloud Colab Enterprise logo"><br> Open in Colab Enterprise
    </a>
  </td>    
  <td style="text-align: center">
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/GoogleCloudPlatform/vertex-ai-samples/main/notebooks/community/generative_ai/batch_calling_gemini.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo"><br> Open in Workbench
    </a>
  </td>
  <td style="text-align: center">
    <a href="https://github.com/GoogleCloudPlatform/vertex-ai-samples/blob/main/notebooks/community/generative_ai/batch_calling_gemini.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo"><br> View on GitHub
    </a>
  </td>
</table>

## Overview

This notebook shows how to run batch workloads against Gemini: first with synchronous and asynchronous calls to the Gemini online API, then with the Gemini Batch API.

This tutorial uses the following Google Cloud services:

- Gemini on Vertex AI
- Batch Gemini API on Vertex AI
- BigQuery
- Google Cloud Storage

The steps performed include:

- Installation and imports
- Calling Gemini online (synchronous calls)
- Building a sample dataset in JSONL format
- Calling Gemini asynchronously
- Using the Gemini Batch API via GCS
- Using the Gemini Batch API via BigQuery

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Google Cloud Storage
* BigQuery

Learn about [Google Cloud pricing](https://cloud.google.com/pricing/list?hl=en) and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.

**Important:** This notebook sends a large number of API calls to Gemini for inference. To limit costs, reduce the size of the test datasets.

## Get started

### Install Vertex AI SDK for Python and other required packages


In [41]:
! pip3 install --upgrade --quiet google-cloud-aiplatform google-cloud-bigquery google-cloud-storage

### Restart runtime (Colab only)

To use the newly installed packages, you must restart the runtime on Google Colab.

In [None]:
import sys

if "google.colab" in sys.modules:

    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

Authenticate your environment on Google Colab.


In [None]:
import sys

if "google.colab" in sys.modules:

    from google.colab import auth

    auth.authenticate_user()

### Set Google Cloud project information and initialize Vertex AI SDK for Python

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com). Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [1]:
from datetime import datetime
import json
import random
import pandas as pd
import time
import asyncio
import nest_asyncio
nest_asyncio.apply()

from google.cloud import storage
from google.cloud import bigquery
import vertexai

from vertexai.generative_models import GenerativeModel, Part, GenerationConfig
from vertexai.batch_prediction import BatchPredictionJob
from tenacity import retry, wait_random_exponential

In [2]:
# VARIABLES SET BY USER
PROJECT_ID = "multi-tenancy-dataproc" # @param {type:"string"}
DEFAULT_MODEL_NAME = "gemini-1.5-flash-002"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
batch_file_name = 'gemini_batch.jsonl' # @param {type:"string"}


# Initialize Vertex AI
vertexai.init(project=PROJECT_ID, location=LOCATION)

## Synchronous API Calls to Gemini

The default and most common way to interact with Gemini is by making synchronous API calls to the online endpoint. This approach allows you to send one input request at a time, which is not ideal for batch workloads.

Gemini on Vertex AI offers several optional parameters that allow you to fine-tune the model's output and tailor it to your specific needs. These parameters include:

* System instructions
* Safety filters
* Generation config
* Metadata labels

For examples of how to use these parameters, see the [Intro to Gemini notebook](https://github.com/GoogleCloudPlatform/generative-ai/blob/main/gemini/getting-started/intro_gemini_python.ipynb).

In [9]:
# Basic API call to Gemini

# Initialize the multimodal model with system instructions.
# This tells Gemini to act as a zoologist whose mission is to find and rescue animals.
multimodal_model = GenerativeModel(
    model_name=DEFAULT_MODEL_NAME,  # Use the default Gemini model
    system_instruction=[
        "You are a zoologist",
        "Your mission is to find and rescue animals"
    ]
)

# Define the prompt for Gemini.
prompt = "Tell me what animals are in the image and which ones in the video"

# Create parts for the video and image.
# These parts point to the respective media files in Google Cloud Storage.
video_part = Part.from_uri(
    "gs://cloud-samples-data/video/animals.mp4", 
    mime_type="video/mp4"
)
image_part = Part.from_uri(
    "gs://download.tensorflow.org/example_images/320px-Felis_catus-cat_on_snow.jpg",
    mime_type="image/jpeg",
)

# API Call
start_time = time.time()  # Record the start time for measuring inference time

# Send the prompt and media parts to Gemini for processing.
response = multimodal_model.generate_content(
    contents=[
        prompt,
        video_part,
        image_part,
    ],
    # Configure generation parameters like temperature, top_p, top_k, etc.
    generation_config=GenerationConfig(
        temperature=0.9,
        top_p=1.0,
        top_k=32,
        candidate_count=1,
        max_output_tokens=8192,
    ),
)

# Print the response.
print(f"Gemini inference time with a single prompt: {time.time() - start_time:.1f} seconds.\n")
print(response.text)  # Output the text generated by Gemini

Gemini inference time with a single prompt: 7.4 seconds.

The image contains a tabby cat.

The video shows the following animals:

- Giraffes
- A Sumatran tiger
- Asian elephants
- Giant otters
- A sloth
- Dik-diks


## Building a test dataset

The Gemini Batch API requires datasets in JSONL format with a specific structure. A [sample JSONL file is available on GCS](https://storage.googleapis.com/cloud-samples-data/generative-ai/batch/batch_requests_for_multimodal_input_2.jsonl), but this notebook builds its own sample JSONL dataset for all the tests.

In [8]:
# Each single line of the JSONL is structured like this:
baseline = '''
{
    "request": {
        "contents": [
            {
                "role": "user",
                "parts": [
                    {
                        "text": "Here goes the prompt."
                    },
                    {
                        "file_data": {
                            "file_uri": "Here goes the GCS URI with the image or video",
                            "mime_type": "image/jpeg"
                        }
                    }
                ]
            }
        ],
        "generationConfig": {
            "temperature": 0.4
        },
        "system_instruction": {
            "parts": [
                {
                    "text": "You are an expert in math"
                }
            ]
        }
    },
    "metadata": "prompt identifier"
}
'''
jsonl_baseline_dict = json.loads(baseline)
jsonl_baseline_dict

{'request': {'contents': [{'role': 'user',
    'parts': [{'text': 'Here goes the prompt.'},
     {'file_data': {'file_uri': 'Here goes the GCS URI with the image or video',
       'mime_type': 'image/jpeg'}}]}],
  'generationConfig': {'temperature': 0.4},
  'system_instruction': {'parts': [{'text': 'You are an expert in math'}]}},
 'metadata': 'prompt identifier'}

### To speed up the tests on this notebook, we remove the image and use only text

In [9]:
# Removing image part from baseline
del jsonl_baseline_dict['request']['contents'][0]['parts'][1]
jsonl_baseline_dict

{'request': {'contents': [{'role': 'user',
    'parts': [{'text': 'Here goes the prompt.'}]}],
  'generationConfig': {'temperature': 0.4},
  'system_instruction': {'parts': [{'text': 'You are an expert in math'}]}},
 'metadata': 'prompt identifier'}
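
Because every line of the JSONL file must follow this structure, it can help to check a file before submitting it. The following is a minimal sketch of such a check, not the validation the Batch API itself performs. The function name `find_invalid_jsonl_lines` is hypothetical, and the only rules it enforces are the ones visible in the baseline above: each line is valid JSON and carries a `request` object with non-empty `contents`.

```python
import json


def find_invalid_jsonl_lines(lines):
    """Return (line_number, reason) pairs for lines that fail a basic structure check."""
    problems = []
    for line_number, line in enumerate(lines, start=1):
        if not line.strip():
            continue  # Ignore blank lines, such as a trailing newline.
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((line_number, f"invalid JSON: {exc.msg}"))
            continue
        request = record.get("request") if isinstance(record, dict) else None
        if not isinstance(request, dict):
            problems.append((line_number, "missing 'request' object"))
        elif not request.get("contents"):
            problems.append((line_number, "'request' has no 'contents'"))
    return problems


# Illustrative lines only: one well-formed request and two broken ones.
good = json.dumps({"request": {"contents": [{"role": "user", "parts": [{"text": "2 + 2"}]}]}})
bad = json.dumps({"metadata": "row_id_1"})
print(find_invalid_jsonl_lines([good, bad, "{not json"]))
```

To check a file on disk, pass the open file object, for example `find_invalid_jsonl_lines(open(file_name))`.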

### Helper functions to create a dataset using the baseline dictionary and to call Gemini synchronously with only text

In [10]:
def create_sample_dataset(size=10, batch_file_name=batch_file_name):
    """
    Creates a sample dataset of math problems and saves it to a JSONL file.

    Args:
      size: The number of problems to generate (default: 10).
      batch_file_name: The base name for the output file.

    Returns:
      A tuple (sample_dataset_df, file_name): a pandas DataFrame containing
      the generated dataset and the name of the JSONL file that was written.
    """
    file_name = f"{size}_prompt_{batch_file_name}"

    # The context manager closes the file even if an error occurs while writing
    with open(file_name, 'w') as gemini_batch_jsonl:
        for i in range(size):
            # Generate a math problem in the form "a + b x c"
            prompt = f"Print the result of the following equation and explain the result: {i} + {random.randint(0, 10)} x {random.randint(0, 1000)}"

            # Deep-copy the baseline (via a JSON round trip) so that the nested
            # dictionaries in 'jsonl_baseline_dict' are not modified
            temp_dict = json.loads(json.dumps(jsonl_baseline_dict))

            # Update the 'text' field with the generated prompt
            temp_dict['request']['contents'][0]['parts'][0]['text'] = prompt

            # Update the 'temperature' with a random value between 0 and 2
            temp_dict['request']['generationConfig']['temperature'] = round(random.uniform(0, 2), 1)

            # Add metadata to the dictionary
            temp_dict['metadata'] = f'row_id_{i}'

            # Write the dictionary to the JSONL file
            gemini_batch_jsonl.write(json.dumps(temp_dict) + '\n')

    print(f"JSONL file created: {file_name}")

    # Read the JSONL file into a pandas DataFrame
    sample_dataset_df = pd.read_json(file_name, lines=True)
    return sample_dataset_df, file_name


@retry(wait=wait_random_exponential(multiplier=1, max=20))
def pandas_call_gemini(row):
    """
    Calls the Gemini model with one request from the sample dataset.

    This function is designed to be used with pandas' apply method on the
    'request' column of the DataFrame. It handles potential transient errors by
    retrying the Gemini API call with randomized exponential backoff. No stop
    condition is set, so it keeps retrying until a call succeeds.

    Args:
        row: The 'request' dictionary of a DataFrame row, with the following keys:
            - 'system_instruction': A dictionary with a 'parts' key containing a list of dictionaries,
                                    where the first dictionary has a 'text' key with the system instruction.
            - 'contents': A list of dictionaries, each with a 'parts' key containing a list of dictionaries;
                          the first part of the first item has a 'text' key with the content for the model.
            - 'generationConfig': A dictionary with the generation configuration for the model.

    Returns:
        str: The text response from the Gemini model.
    """
    multimodal_model = GenerativeModel(DEFAULT_MODEL_NAME, system_instruction=[row['system_instruction']['parts'][0]['text']])
    response = multimodal_model.generate_content(contents=[row['contents'][0]['parts'][0]['text']], generation_config=row['generationConfig'])
    return response.text
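
The `@retry(wait=wait_random_exponential(multiplier=1, max=20))` decorator above waits a random time before each retry, drawn from a window that grows with the attempt number and is capped at 20 seconds. The sketch below illustrates that general idea with the standard library only. It is not tenacity's implementation, and the helper names `backoff_ceiling` and `random_backoff` are hypothetical; the exact window tenacity uses may differ.

```python
import random


def backoff_ceiling(attempt, multiplier=1.0, max_wait=20.0):
    """Upper bound of the wait window before retry number `attempt` (1-based)."""
    return min(max_wait, multiplier * 2 ** (attempt - 1))


def random_backoff(attempt, multiplier=1.0, max_wait=20.0, rng=random):
    """Draw a wait uniformly between 0 and the current ceiling."""
    return rng.uniform(0, backoff_ceiling(attempt, multiplier, max_wait))


# The ceiling doubles on each attempt until it reaches max_wait.
for attempt in range(1, 8):
    print(attempt, backoff_ceiling(attempt), round(random_backoff(attempt), 2))
```

Randomizing the wait spreads retries out over time, which matters when many concurrent requests fail at the same moment, as can happen when a quota limit is reached.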

## Synchronous test

* Create a sample dataset
* Call Gemini row by row synchronously

In [11]:
# Creating a sample dataset
sample_dataset_df, file_name = create_sample_dataset(size=20)
sample_dataset_df

JSONL file created: 20_prompt_gemini_batch.jsonl


Unnamed: 0,request,metadata
0,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_0
1,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1
2,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_2
3,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_3
4,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4
5,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_5
6,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_6
7,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_7
8,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_8
9,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_9


In [15]:
# Call the synchronous function for each row in the pandas DF
start_time = time.time()
sample_dataset_df['gemini_response'] = sample_dataset_df.request.apply(pandas_call_gemini)
print(f"Gemini inference time with {len(sample_dataset_df)} prompts: {time.time() - start_time:.1f} seconds")
sample_dataset_df.head()

Gemini inference time with 20 prompts: 16.6 seconds


Unnamed: 0,request,metadata,gemini_response
0,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_0,Following the order of operations (PEMDAS/BODM...
1,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1,Following the order of operations (PEMDAS/BODM...
2,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_2,Following the order of operations (PEMDAS/BODM...
3,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_3,Following the order of operations (PEMDAS/BODM...
4,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4,Following the order of operations (PEMDAS/BODM...


## Asynchronous test

* Define an async function and a custom batch function
* Call Gemini asynchronously

In [7]:
@retry(wait=wait_random_exponential(multiplier=1, max=20))
async def async_pandas_call_gemini(row):
    """
    Asynchronously calls the Gemini model with one request from the sample dataset.

    This function is an asynchronous version of pandas_call_gemini. It returns a
    coroutine, so it is meant to be awaited (for example, through asyncio.gather)
    rather than passed to pandas' apply method. It handles potential transient
    errors by retrying the Gemini API call with randomized exponential backoff.

    Args:
        row: The 'request' dictionary of a DataFrame row, with the following keys:
            - 'system_instruction': A dictionary with a 'parts' key containing a list of dictionaries,
                                    where the first dictionary has a 'text' key with the system instruction.
            - 'contents': A list of dictionaries, each with a 'parts' key containing a list of dictionaries;
                          the first part of the first item has a 'text' key with the content for the model.
            - 'generationConfig': A dictionary with the generation configuration for the model.

    Returns:
        str: The text response from the Gemini model.
    """
    multimodal_model = GenerativeModel(DEFAULT_MODEL_NAME, system_instruction=[row['system_instruction']['parts'][0]['text']])
    response = await multimodal_model.generate_content_async(contents=[row['contents'][0]['parts'][0]['text']], generation_config=row['generationConfig'])
    return response.text


async def custom_batch_function_gemini(size):
    """
    Generates a sample dataset of math problems, sends them to Gemini for evaluation, 
    and measures the inference time.

    Args:
        size: The number of problems to generate for the dataset.

    Returns:
        A pandas DataFrame containing the problems, Gemini responses, and metadata.
    """
    # Create the sample dataset
    sample_dataset_df, file_name = create_sample_dataset(size=size)  

    start_time = time.time()

    # Create a list of coroutines to call Gemini for each problem
    get_gemini_responses = [async_pandas_call_gemini(row['request']) for _, row in sample_dataset_df.iterrows()]

    # Execute the coroutines concurrently using asyncio.gather
    async_responses = await asyncio.gather(*get_gemini_responses)  

    # Add the Gemini responses to the DataFrame
    sample_dataset_df['async_gemini_response'] = async_responses  

    # Calculate and print the inference time
    print(f"Gemini inference time with {size} prompts: {time.time() - start_time:.1f} seconds")  

    return sample_dataset_df

### Timing Asynchronous tests 

#### Async call to Gemini with 20 prompts

In [8]:
# Async call to Gemini with 20 prompts
# Uncomment the lines below to retest
# sample_dataset_df = await custom_batch_function_gemini(20)  
# sample_dataset_df.tail()

JSONL file created: 20_prompt_gemini_batch.jsonl
Gemini inference time with 20 prompts: 1.2 seconds


Unnamed: 0,request,metadata,async_gemini_response
15,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_15,Following the order of operations (PEMDAS/BODM...
16,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_16,Following the order of operations (PEMDAS/BODM...
17,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_17,Following the order of operations (PEMDAS/BODM...
18,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_18,Following the order of operations (PEMDAS/BODM...
19,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_19,Following the order of operations (PEMDAS/BODM...


#### Async call to Gemini with 100 prompts

In [9]:
# Async call to Gemini with 100 prompts
# Uncomment the lines below to retest
# sample_dataset_df = await custom_batch_function_gemini(100)  
# sample_dataset_df.tail() 

JSONL file created: 100_prompt_gemini_batch.jsonl
Gemini inference time with 100 prompts: 2.2 seconds


Unnamed: 0,request,metadata,async_gemini_response
95,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_95,Following the order of operations (PEMDAS/BODM...
96,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_96,The result of the equation 96 + 0 x 520 is **9...
97,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_97,The result of the equation 97 + 0 x 602 is 97....
98,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_98,Following the order of operations (PEMDAS/BODM...
99,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_99,Following the order of operations (PEMDAS/BODM...


#### Async call to Gemini with 1000 prompts

In [11]:
# Async call to Gemini with 1000 prompts
# Uncomment the lines below to retest
# sample_dataset_df = await custom_batch_function_gemini(1000)
# sample_dataset_df.tail()

JSONL file created: 1000_prompt_gemini_batch.jsonl
Gemini inference time with 1000 prompts: 9.0 seconds


Unnamed: 0,request,metadata,async_gemini_response
995,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_995,The result of the equation 995 + 0 x 803 is **...
996,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_996,Following the order of operations (PEMDAS/BODM...
997,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_997,Following the order of operations (PEMDAS/BODM...
998,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_998,The result of the equation 998 + 0 x 79 is **9...
999,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_999,Following the order of operations (PEMDAS/BODM...


#### Async call to Gemini with 2000 prompts

In [20]:
# Async call to Gemini with 2000 prompts
# Uncomment the lines below to retest
# sample_dataset_df = await custom_batch_function_gemini(2000)
# sample_dataset_df.tail()

JSONL file created: 2000_prompt_gemini_batch.jsonl
Gemini inference time with 2000 prompts: 43.5 seconds


Unnamed: 0,request,metadata,async_gemini_response
1995,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1995,The equation 1995 + 7 x 596 follows the order ...
1996,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1996,The result of the equation 1996 + 0 x 80 is **...
1997,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1997,Following the order of operations (PEMDAS/BODM...
1998,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1998,The equation 1998 + 8 x 866 follows the order ...
1999,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1999,The equation 1999 + 9 x 824 follows the order ...


#### Async call to Gemini with 5000 prompts

In [110]:
# Async call to Gemini with 5000 prompts
# Uncomment the lines below to retest
# sample_dataset_df = await custom_batch_function_gemini(5000)
# sample_dataset_df

JSONL file created: 5000_prompt_gemini_batch.jsonl
Gemini inference time with 5000 prompts: 197.7 seconds


Unnamed: 0,request,metadata,async_gemini_response
0,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_0,Following the order of operations (PEMDAS/BODM...
1,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_1,Following the order of operations (PEMDAS/BODM...
2,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_2,Following the order of operations (PEMDAS/BODM...
3,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_3,Following the order of operations (PEMDAS/BODM...
4,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_4,Following the order of operations (PEMDAS/BODM...
...,...,...,...
4995,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_4995,The equation 4995 + 8 x 396 follows the order ...
4996,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_4996,Following the order of operations (PEMDAS/BODM...
4997,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_4997,The equation 4997 + 9 x 164 follows the order ...
4998,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_4998,The equation 4998 + 7 x 154 follows the order ...


#### Additional helper function to chunk the dataset

In [12]:
async def custom_chunked_batch_function_gemini(size: int, step: int = 1000):
    """
    Processes a dataset in chunks, making asynchronous calls to Gemini for each chunk.

    This function generates a sample dataset, divides it into chunks, and processes each chunk
    asynchronously using Gemini. The results are then collected and added to the original dataset.

    Args:
        size (int): The size of the dataset to generate.
        step (int, optional): The size of each chunk. Defaults to 1000.

    Returns:
        pandas.DataFrame: The original dataset with an additional column containing the Gemini responses.
    """
    # Create the sample dataset
    sample_dataset_df, file_name = create_sample_dataset(size=size)
    all_async_responses = []
    start_time = time.time()

    for i in range(0, size, step):
        # Take the next chunk of at most `step` rows
        chunk_df = sample_dataset_df.iloc[i : i + step]

        get_gemini_responses = [
            async_pandas_call_gemini(row["request"]) for _, row in chunk_df.iterrows()
        ]
        async_responses = await asyncio.gather(*get_gemini_responses)
        all_async_responses.extend(async_responses)

    sample_dataset_df["async_gemini_response"] = all_async_responses
    print(f"Gemini inference time with {size} prompts: {time.time() - start_time:.1f} seconds")
    return sample_dataset_df
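
The chunked function above waits for every request in a chunk to finish before it starts the next chunk. An alternative way to bound the number of in-flight requests is an `asyncio.Semaphore`, which lets a new request start as soon as any earlier one completes. The following is a minimal sketch of that alternative, not something this notebook benchmarks. `gather_with_limit` and `fake_gemini_call` are hypothetical names, and the fake call only sleeps instead of contacting Gemini.

```python
import asyncio


async def gather_with_limit(coroutine_factories, max_concurrency=100):
    """Run coroutines concurrently, keeping at most `max_concurrency` in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(factory):
        # The coroutine is only created and awaited once a slot is free.
        async with semaphore:
            return await factory()

    # asyncio.gather returns results in the order the awaitables were passed.
    return await asyncio.gather(*(run_one(f) for f in coroutine_factories))


async def fake_gemini_call(i):
    """Hypothetical stand-in for async_pandas_call_gemini; no API call is made."""
    await asyncio.sleep(0.01)
    return f"response_{i}"


async def demo():
    factories = [lambda i=i: fake_gemini_call(i) for i in range(20)]
    return await gather_with_limit(factories, max_concurrency=5)


results = asyncio.run(demo())
print(results[:3])
```

Inside a notebook cell, `await demo()` can replace `asyncio.run(demo())`. A suitable `max_concurrency` depends on the quota of the project, so the value here is only a placeholder.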

#### Async call to Gemini with 5000 prompts in chunks

In [22]:
# Async call to Gemini with 5000 prompts in chunks
# Uncomment the lines below to retest
# sample_dataset_df = await custom_chunked_batch_function_gemini(size=5000)
# sample_dataset_df.tail()

JSONL file created: 5000_prompt_gemini_batch.jsonl
Gemini inference time with 5000 prompts: 45.6 seconds


Unnamed: 0,request,metadata,async_gemini_response
4995,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4995,The equation 4995 + 3 x 768 follows the order ...
4996,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4996,The equation 4996 + 6 x 370 follows the order ...
4997,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4997,Following the order of operations (PEMDAS/BODM...
4998,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4998,Following the order of operations (PEMDAS/BODM...
4999,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4999,The equation 4999 + 4 x 138 follows the order ...


#### Async call to Gemini with 10000 prompts in chunks

In [23]:
# Async call to Gemini with 10000 prompts in chunks
# Uncomment the lines below to retest
# sample_dataset_df = await custom_chunked_batch_function_gemini(size=10000)
# sample_dataset_df

JSONL file created: 10000_prompt_gemini_batch.jsonl
Gemini inference time with 10000 prompts: 93.4 seconds


Unnamed: 0,request,metadata,async_gemini_response
0,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_0,Following the order of operations (PEMDAS/BODM...
1,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_1,Following the order of operations (PEMDAS/BODM...
2,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_2,Following the order of operations (PEMDAS/BODM...
3,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_3,Following the order of operations (PEMDAS/BODM...
4,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_4,Following the order of operations (PEMDAS/BODM...
...,...,...,...
9995,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_9995,The equation is 9995 + 3 x 144.\n\nFollowing t...
9996,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_9996,Following the order of operations (PEMDAS/BODM...
9997,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_9997,The result of the equation 9997 + 0 x 646 is *...
9998,"{'contents': [{'role': 'user', 'parts': [{'tex...",row_id_9998,The equation is 9998 + 2 x 431.\n\nFollowing t...
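
To compare the online runs above on one scale, the recorded timings can be converted to prompts per second. The sketch below only re-uses the numbers printed in this notebook. They come from single runs in the author's environment, so treat them as indicative rather than as a benchmark; the helper name `prompts_per_second` is hypothetical.

```python
# (label, prompts, seconds) copied from the timings printed in this notebook.
recorded_runs = [
    ("synchronous", 20, 16.6),
    ("async, unchunked", 20, 1.2),
    ("async, unchunked", 100, 2.2),
    ("async, unchunked", 1000, 9.0),
    ("async, unchunked", 2000, 43.5),
    ("async, unchunked", 5000, 197.7),
    ("async, chunks of 1000", 5000, 45.6),
    ("async, chunks of 1000", 10000, 93.4),
]


def prompts_per_second(prompts, seconds):
    """Average throughput of one run."""
    return prompts / seconds


for label, prompts, seconds in recorded_runs:
    rate = prompts_per_second(prompts, seconds)
    print(f"{label:<24}{prompts:>6} prompts {seconds:>7.1f} s {rate:>7.1f} prompts/s")
```

On these figures, processing 5000 prompts in chunks of 1000 raised throughput from roughly 25 to roughly 110 prompts per second compared with sending all 5000 requests at once.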


## Vertex AI Gemini Batch API

### Using Batch API with GCS as input source and output location

In [13]:
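# Bucket variables defined by user
# Use full gs:// URIs that end with "/", for example "gs://your-bucket/batch/input/";
# the input file name is appended directly to INPUT_BUCKET_URI.
OUTPUT_BUCKET_URI = ""  # @param {type:"string"}
INPUT_BUCKET_URI = ""  # @param {type:"string"}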

#### Helper function to submit batch job

In [14]:
# Create dataset and upload to GCS bucket
def gemini_batch_api_test(size: int) -> BatchPredictionJob:
    """
    Tests the Gemini Batch Prediction API.

    This function generates a sample dataset, uploads it to Google Cloud Storage, and submits a batch prediction job to Gemini.

    Args:
        size (int): The size of the dataset to generate.

    Returns:
        vertexai.batch_prediction.BatchPredictionJob: The submitted batch prediction job.
    """
    sample_dataset_df, file_name = create_sample_dataset(size=size)

    # Upload the dataset to Google Cloud Storage
    !gcloud storage cp {file_name} {INPUT_BUCKET_URI}
    INPUT_FILE_URI = f"{INPUT_BUCKET_URI}{file_name}"

    # Submit the batch prediction job
    job = BatchPredictionJob.submit(
        source_model=DEFAULT_MODEL_NAME,
        input_dataset=INPUT_FILE_URI,
        output_uri_prefix=OUTPUT_BUCKET_URI,
    )

    return job
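
The helper above builds the input URI by concatenating `INPUT_BUCKET_URI` and the file name, which only yields a valid path when the bucket URI ends with a slash. A small joining helper removes that dependency. This is a sketch under stated assumptions: `join_gcs_uri` is a hypothetical name and `example-bucket` is a placeholder, not a bucket used in this notebook.

```python
def join_gcs_uri(prefix, file_name):
    """Join a gs:// prefix and a file name with exactly one slash between them."""
    scheme = "gs://"
    if not prefix.startswith(scheme) or len(prefix) <= len(scheme):
        raise ValueError(f"Expected a gs://bucket[/path] URI, got: {prefix!r}")
    return f"{prefix.rstrip('/')}/{file_name.lstrip('/')}"


# Both spellings of the prefix produce the same object URI.
print(join_gcs_uri("gs://example-bucket/batch/input", "5000_prompt_gemini_batch.jsonl"))
print(join_gcs_uri("gs://example-bucket/batch/input/", "5000_prompt_gemini_batch.jsonl"))
```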


#### Submitting a Gemini batch job with 5000 prompts

In [15]:
# Running a job with 5000 prompts
job_5k = gemini_batch_api_test(5000)

JSONL file created: 5000_prompt_gemini_batch.jsonl
Copying file://5000_prompt_gemini_batch.jsonl to gs://pemelend_genai_demos/batch/input/5000_prompt_gemini_batch.jsonl
  Completed files 1/1 | 1.4MiB/1.4MiB                                          
BatchPredictionJob created. Resource name: projects/1054251275628/locations/us-central1/batchPredictionJobs/4746631520045760512
To use this BatchPredictionJob in another session:
job = batch_prediction.BatchPredictionJob('projects/1054251275628/locations/us-central1/batchPredictionJobs/4746631520045760512')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/4746631520045760512?project=1054251275628


In [17]:
print(f"Job resource name: {job_5k.resource_name}")
print(f"Model resource name: {job_5k.model_name}")
print(f"Job state: {job_5k.state.name}")

Job resource name: projects/1054251275628/locations/us-central1/batchPredictionJobs/4746631520045760512
Model resource name: publishers/google/models/gemini-1.5-flash-002
Job state: JOB_STATE_PENDING


#### Submitting a Gemini batch job with 10000 prompts

In [18]:
# Submitting a Gemini batch job with 10000 prompts
job_10k = gemini_batch_api_test(10000)

JSONL file created: 10000_prompt_gemini_batch.jsonl
Copying file://10000_prompt_gemini_batch.jsonl to gs://pemelend_genai_demos/batch/input/10000_prompt_gemini_batch.jsonl
  Completed files 1/1 | 2.8MiB/2.8MiB                                          
BatchPredictionJob created. Resource name: projects/1054251275628/locations/us-central1/batchPredictionJobs/357873683173212160
To use this BatchPredictionJob in another session:
job = batch_prediction.BatchPredictionJob('projects/1054251275628/locations/us-central1/batchPredictionJobs/357873683173212160')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/357873683173212160?project=1054251275628


### Wait for the batch prediction job to complete

Depending on the number of input items that you submitted, a batch generation task can take some time to complete. You can use the following code to check the job status and wait for the job to complete.

In [19]:
# Refresh the job until complete
while not job_5k.has_ended:
    time.sleep(5)
    job_5k.refresh()

# Check if the job succeeds
if job_5k.has_succeeded:
    print("Job succeeded!")
    print(f"Time to complete job: {(job_5k.update_time - job_5k.create_time).total_seconds():.1f} seconds")
else:
    print(f"Job failed: {job_5k.error}")

Job succeeded!
Time to complete job: 205.1 seconds
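
The loop above polls until the job ends, however long that takes. When a notebook should not block indefinitely, the same loop can be given a timeout. The following is a minimal sketch that relies only on the `has_ended` attribute and `refresh()` method used above. `wait_for_job` and `FakeJob` are hypothetical names, and `FakeJob` stands in for a real `BatchPredictionJob` so that the example runs without submitting anything.

```python
import time


def wait_for_job(job, poll_seconds=5.0, timeout_seconds=3600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll `job` until it ends or the timeout elapses; return True if it ended."""
    deadline = clock() + timeout_seconds
    while not job.has_ended:
        if clock() >= deadline:
            return False
        sleep(poll_seconds)
        job.refresh()
    return True


class FakeJob:
    """Hypothetical stand-in that ends after a fixed number of refresh() calls."""

    def __init__(self, refreshes_needed):
        self.refreshes_needed = refreshes_needed
        self.has_ended = refreshes_needed <= 0

    def refresh(self):
        self.refreshes_needed -= 1
        self.has_ended = self.refreshes_needed <= 0


print(wait_for_job(FakeJob(3), poll_seconds=0.01, timeout_seconds=1.0))
```

A `False` return value means the timeout elapsed while the job was still running; the job itself is not cancelled and can be polled again later.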


### Using Batch API with BigQuery 

In [3]:
# Dataset variable defined by user
DATASET = "batch_gemini_source_dataset"  # @param {type:"string"}
DATASET_ID = f"{PROJECT_ID}.{DATASET}"

#### Creating the BigQuery dataset and the table from the JSONL file

The batch prediction job and your BigQuery table must be in the same region (the `LOCATION` set earlier).

In [4]:
# Creating a dataset to host the test tables

# Construct a BigQuery client object.
client = bigquery.Client()

# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(DATASET_ID)

# Set the geographic location where the dataset should reside.
dataset.location = LOCATION

# Send the dataset to the API for creation, with an explicit timeout.
# Raises google.api_core.exceptions.Conflict if the Dataset already
# exists within the project.
dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
print(f"Created dataset {PROJECT_ID}.{dataset.dataset_id}")

Created dataset multi-tenancy-dataproc.batch_gemini_source_dataset


In [14]:
# Create the sample dataset and upload to GCS
size = 100000
sample_dataset_df, file_name = create_sample_dataset(size=size)
!gcloud storage cp {file_name} {INPUT_BUCKET_URI}
INPUT_FILE_URI = f"{INPUT_BUCKET_URI}{file_name}"

JSONL file created: 100000_prompt_gemini_batch.jsonl
Copying file://100000_prompt_gemini_batch.jsonl to gs://pemelend_genai_demos/batch/input/100000_prompt_gemini_batch.jsonl
  Completed files 1/1 | 28.3MiB/28.3MiB                                        

Average throughput: 154.7MiB/s


In [15]:
# Create a table in BigQuery to store the data for batch processing.

# Set table_id to the ID of the table to create.
table_id = f"{DATASET_ID}.100K_prompt_table"

# Define the schema for the BigQuery table.
# The table will have two columns:
#   - `request`: JSON column to store the request payload for Gemini.
#   - `metadata`: STRING column to store any metadata associated with the request.
table_schema = [
    {
        "name": "request",
        "type": "JSON",
        "mode": "NULLABLE",
        "maxLength": "0",
        "precision": "0",
        "scale": "0"
    },
    {
        "name": "metadata",
        "type": "STRING",
        "mode": "NULLABLE",
        "maxLength": "0",
        "precision": "0",
        "scale": "0"
    }
]

# Initialize a BigQuery client.
client = bigquery.Client()

# Configure the load job.
job_config = bigquery.LoadJobConfig(
    schema=table_schema,  # Use the defined schema
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,  # Specify the input file format
)

# Create a load job to load data from the input file URI to the BigQuery table.
load_job = client.load_table_from_uri(
    INPUT_FILE_URI,  # URI of the input file in Cloud Storage
    table_id,  # ID of the table to create
    location=LOCATION,  # Must match the batch job location
    job_config=job_config,  # Use the defined job configuration
)

# Make an API request and wait for the job to complete.
load_job.result()

# Get the table details.
destination_table = client.get_table(table_id)
print(f"Created table: {table_id} and loaded {destination_table.num_rows} rows.")

Created table: multi-tenancy-dataproc.batch_gemini_source_dataset.100K_prompt_table and loaded 100000 rows.


In [16]:
# Querying the table
bq_client = bigquery.Client(project=PROJECT_ID)

sql = f"""
        SELECT *
        FROM `{table_id}`
        ORDER BY metadata ASC
        LIMIT 100
        """

query_result = bq_client.query(sql)

df = query_result.result().to_dataframe()
df.head()



Unnamed: 0,request,metadata
0,"{""contents"":[{""parts"":[{""text"":""Print the resu...",row_id_0
1,"{""contents"":[{""parts"":[{""text"":""Print the resu...",row_id_1
2,"{""contents"":[{""parts"":[{""text"":""Print the resu...",row_id_10
3,"{""contents"":[{""parts"":[{""text"":""Print the resu...",row_id_100
4,"{""contents"":[{""parts"":[{""text"":""Print the resu...",row_id_1000
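The output above lists `row_id_10` before any `row_id_2` because `metadata` is a STRING column, so it sorts lexicographically rather than numerically. The same rule applies to string comparisons in a `WHERE` clause. The sketch below reproduces the effect in plain Python and shows a numeric sort. `row_number` is a hypothetical helper, and the ID list is illustrative.

```python
import re


def row_number(metadata):
    """Return the integer suffix of an ID such as "row_id_42"."""
    match = re.fullmatch(r"row_id_(\d+)", metadata)
    if match is None:
        raise ValueError(f"Unexpected metadata value: {metadata!r}")
    return int(match.group(1))


ids = ["row_id_2", "row_id_100", "row_id_0", "row_id_10", "row_id_1"]

# String order, as in the query result above.
print(sorted(ids))
# ['row_id_0', 'row_id_1', 'row_id_10', 'row_id_100', 'row_id_2']

# Numeric order, using the integer suffix as the sort key.
print(sorted(ids, key=row_number))
# ['row_id_0', 'row_id_1', 'row_id_2', 'row_id_10', 'row_id_100']
```

If numeric order is needed in BigQuery itself, one option is to sort by the extracted suffix, for example `ORDER BY CAST(REGEXP_EXTRACT(metadata, r'\d+$') AS INT64)`.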


#### Helper function to create smaller tables used for testing different batch sizes

In [35]:
def create_subset_table(source_table_id, destination_table_id, subset_size):
    """Creates a subset of a BigQuery table with a specified size.

    This function takes a source table and creates a new table with a subset
    of its rows. The query uses LIMIT without ORDER BY, so which rows are
    selected is not guaranteed.

    Args:
        source_table_id: The ID of the source BigQuery table.
        destination_table_id: The ID of the destination BigQuery table.
        subset_size: The number of rows to include in the subset table.

    Returns:
        The ID of the created subset table. Prefix it with "bq://" to use it
        as the input of a batch prediction job.
    """
    client = bigquery.Client()

    # Configure the query job to write the results to the destination table.
    job_config = bigquery.QueryJobConfig(destination=destination_table_id)

    # Construct the SQL query to select a subset of rows from the source table.
    sql = f"""
        SELECT * FROM `{source_table_id}` LIMIT {subset_size}
    """

    # Start the query, passing in the configuration.
    query_job = client.query(sql, job_config=job_config)  # Make an API request.
    query_job.result()  # Wait for the job to complete.

    print(f"Subset table {destination_table_id} created")
    return destination_table_id

In [36]:
# Creating smaller tables
subset_10k_table_id = create_subset_table(table_id, f"{DATASET_ID}.10K_prompt", 10000)
subset_5k_table_id = create_subset_table(table_id, f"{DATASET_ID}.5K_prompt", 5000)


Subset table multi-tenancy-dataproc.batch_gemini_source_dataset.10K_prompt created
Subset table multi-tenancy-dataproc.batch_gemini_source_dataset.5K_prompt created


#### Submitting a Gemini batch job with 5000 prompts

In [37]:
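# Batch jobs reference BigQuery tables as "bq://project.dataset.table".
subset_5k_uri = f"bq://{subset_5k_table_id}"
output_table = f"{subset_5k_table_id}_gemini_output"
output_uri = f"bq://{output_table}"

job = BatchPredictionJob.submit(
    source_model=DEFAULT_MODEL_NAME,
    input_dataset=subset_5k_uri,
    output_uri_prefix=output_uri,
)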

BatchPredictionJob created. Resource name: projects/1054251275628/locations/us-central1/batchPredictionJobs/1715005283383640064
To use this BatchPredictionJob in another session:
job = batch_prediction.BatchPredictionJob('projects/1054251275628/locations/us-central1/batchPredictionJobs/1715005283383640064')
View Batch Prediction Job:
https://console.cloud.google.com/ai/platform/locations/us-central1/batch-predictions/1715005283383640064?project=1054251275628


In [38]:
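# Make sure the job has finished before reading the output table.
while not job.has_ended:
    time.sleep(5)
    job.refresh()

bq_client = bigquery.Client(project=PROJECT_ID)

sql = f"""
        SELECT *
        FROM `{output_table}`
        WHERE metadata > "row_id_10009"  -- string comparison, not numeric
        LIMIT 100
        """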

query_result = bq_client.query(sql)

df = query_result.result().to_dataframe()
df.head()



Unnamed: 0,metadata,status,processed_time,request,response
0,row_id_595,,2024-12-05 00:08:15.991000+00:00,"{""contents"":[{""parts"":[{""text"":""Print the resu...","{""candidates"":[{""avgLogprobs"":-0.0210844195551..."
1,row_id_262,,2024-12-05 00:08:17.465000+00:00,"{""contents"":[{""parts"":[{""text"":""Print the resu...","{""candidates"":[{""avgLogprobs"":-0.0310279388685..."
2,row_id_8306,,2024-12-05 00:08:11.492000+00:00,"{""contents"":[{""parts"":[{""text"":""Print the resu...","{""candidates"":[{""avgLogprobs"":-0.0576387318697..."
3,row_id_675,,2024-12-05 00:07:58.707000+00:00,"{""contents"":[{""parts"":[{""text"":""Print the resu...","{""candidates"":[{""avgLogprobs"":-0.0204480872434..."
4,row_id_960,,2024-12-05 00:07:57.971000+00:00,"{""contents"":[{""parts"":[{""text"":""Print the resu...","{""candidates"":[{""avgLogprobs"":-0.0171888197169..."


In [40]:
# Print Gemini's response
print(json.loads(df.iloc[23].response)['candidates'][0]['content']['parts'][0]['text'])

Following the order of operations (PEMDAS/BODMAS), multiplication comes before addition.  Therefore:

1. **Multiplication:** 4 x 21 = 84

2. **Addition:** 1136 + 84 = 1220

Therefore, the result of the equation 1136 + 4 x 21 is $\boxed{1220}$.
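Indexing straight into `['candidates'][0]['content']['parts'][0]['text']` raises an exception for any row whose response is empty or has no text part. As a minimal sketch, a tolerant extractor can return `None` in those cases. `extract_text` is a hypothetical helper, and it assumes only the response structure used in the cell above.

```python
import json


def extract_text(response):
    """Return the text of the first candidate, or None if there is none.

    Accepts the raw JSON string stored in the `response` column, or an
    already-parsed dict. Text parts of the first candidate are concatenated.
    """
    if isinstance(response, str):
        try:
            response = json.loads(response)
        except json.JSONDecodeError:
            return None
    if not isinstance(response, dict):
        return None  # Covers None and NaN values from empty cells.

    candidates = response.get("candidates") or []
    if not candidates:
        return None

    parts = (candidates[0].get("content") or {}).get("parts") or []
    texts = [part["text"] for part in parts if "text" in part]
    return "".join(texts) if texts else None
```

Applied to the dataframe above, this could look like `df["text"] = df["response"].apply(extract_text)`.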



## Summary

### Custom batch sending asynchronous calls to online Gemini

* Pros:
    - Full flexibility and control of the data pipeline
    - No size limitation
    - Can be faster than the Batch API
    - More predictable performance, especially with Provisioned Throughput
<br><br>

* Cons:
    - Higher cost
    - Scripting can get complicated
    - Quota limits must be handled by your script



### Using the Gemini Batch API

* Pros:
    - Simplified API; jobs are easy to run
    - No custom scripting required
    - Lower cost
    - Quota handling is abstracted away
    - SQL interface with BigQuery (easier to manage the dataset with SQL commands)

* Cons:
    - Harder to predict execution time
    - Size limitations: a maximum of 1M prompts per batch is recommended
    - Might be slower than online calls

## Cleaning up

- Remove the BigQuery dataset
- Delete files from GCS or delete the bucket. 

**Caution: If [soft delete](https://cloud.google.com/storage/docs/soft-delete) is enabled on your Cloud Storage bucket, deleted content is retained in a soft-deleted state, potentially resulting in significant storage costs. Disable soft delete to prevent unexpected charges.**


In [82]:
# Uncomment the line below to delete the BigQuery dataset
# !bq rm -r -f -d {DATASET_ID.replace('.',':')}
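The dataset can also be removed from Python instead of the `bq` CLI. The sketch below assumes `client` is a `bigquery.Client` and uses the `delete_contents` and `not_found_ok` parameters of `delete_dataset`. `delete_dataset_and_tables` is a hypothetical helper name.

```python
def delete_dataset_and_tables(client, dataset_id):
    """Delete a BigQuery dataset together with all the tables it contains.

    `client` is expected to be a google.cloud.bigquery.Client and
    `dataset_id` a string of the form "project.dataset".
    """
    client.delete_dataset(
        dataset_id,
        delete_contents=True,  # Also delete the tables inside the dataset.
        not_found_ok=True,  # Do not raise if the dataset is already gone.
    )
    print(f"Deleted dataset {dataset_id}")
```

This is destructive, so call it only when you are done with the notebook, for example `delete_dataset_and_tables(bigquery.Client(project=PROJECT_ID), DATASET_ID)`.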