
# Using Gemini on Vertex AI with Apache Beam's RunInference

This notebook demonstrates how to build a scalable data processing pipeline that uses a Gemini large language model for inference. We will use **Apache Beam** to create the pipeline and its `RunInference` transform to efficiently call the **Gemini API on Vertex AI**.

### Key Components:
1.  **`GeminiModelHandler`**: A built-in Beam model handler that calls the Gemini API, either on Vertex AI (given a project and location) or through the Gemini Developer API (given an API key).
2.  **`RunInference`**: A generic Beam transform that manages batching, parallelism, and execution of model inferences.
3.  **`DirectRunner`**: We will use Beam's local runner for this example, but by changing the pipeline options, the same pipeline code can run on a distributed runner such as `DataflowRunner`, which spreads the work across many workers.
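To make the batching idea concrete: `RunInference` groups incoming elements into batches and hands each batch to the model handler in one call. The sketch below is a simplification that uses a fixed maximum batch size; Beam's real batching (its `BatchElements` transform) adapts batch sizes at run time, and `fixed_size_batches` is a hypothetical helper, not part of Beam.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def fixed_size_batches(elements: Iterable[T], max_batch_size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `max_batch_size` elements (hypothetical helper)."""
    if max_batch_size < 1:
        raise ValueError("max_batch_size must be at least 1")
    iterator = iter(elements)
    while True:
        # Pull up to max_batch_size elements; an empty list means the input is exhausted.
        batch = list(islice(iterator, max_batch_size))
        if not batch:
            return
        yield batch


example_prompts = ["What is 1+2?", "How is the weather in NYC in July?", "Write a poem."]
for batch in fixed_size_batches(example_prompts, max_batch_size=2):
    print(len(batch), batch)
```

With `max_batch_size=2`, the three prompts become two calls: one carrying two prompts and one carrying a single prompt.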

## 1. Setup and Installation

First, we install the required Python libraries. `apache-beam[gcp]` includes the components for interacting with Google Cloud services and pulls in `google-genai`, the Python SDK that Beam's Gemini model handler uses to call the Gemini API.

In [1]:
!pip install -U "apache-beam[gcp]==2.66"

Collecting google-genai<2.0.0,>=1.37.0 (from google-cloud-aiplatform<2.0,>=1.26.0->apache-beam[gcp]==2.66)
  Downloading google_genai-1.42.0-py3-none-any.whl.metadata (45 kB)
Downloading google_genai-1.42.0-py3-none-any.whl (236 kB)
Installing collected packages: google-genai
  Attempting uninstall: google-genai
    Found existing installation: google-genai 1.21.1
    Uninstalling google-genai-1.21.1:
      Successfully uninstalled google-genai-1.21.1
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-adk 1.15.1 requires google-genai!=1.37.0,!=1.38.0,!=1.39.0,<=1.40.0,>=1.21.1, but you have google-genai 1.42.0

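The post-processing code in Section 4 relies on the result layout produced with `google-genai` 1.21.1 or earlier, so we pin that version. pip is likely to report a dependency conflict here, because `google-cloud-aiplatform` asks for `google-genai` 1.37.0 or later (see the output above); the pipeline in this notebook was run with 1.21.1 despite that warning.

In [3]: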
!pip install google-genai==1.21.1 -U
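Because the `PostProcessor` in Section 4 depends on the exact `google-genai` release, it can help to confirm which versions actually ended up installed after the two install cells. A minimal sketch using the standard library's `importlib.metadata`; the `PINNED` dictionary simply mirrors the versions used above, the helper names are hypothetical, and the simplified comparison ignores pre-release suffixes (the `packaging` library imported in Section 2 offers full PEP 440 comparisons).

```python
from importlib.metadata import PackageNotFoundError, version

# Mirrors the versions pinned by the install cells in this notebook.
PINNED = {"apache-beam": "2.66", "google-genai": "1.21.1"}


def release_tuple(v: str) -> tuple:
    """Leading numeric components of a version string, e.g. '2.66.0rc1' -> (2, 66, 0)."""
    parts = []
    for piece in v.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
        if len(digits) != len(piece):  # Stop at a suffix such as 'rc1'.
            break
    return tuple(parts)


def same_release(a: str, b: str) -> bool:
    """Compare releases with zero padding, so '2.66' equals '2.66.0'."""
    ra, rb = release_tuple(a), release_tuple(b)
    width = max(len(ra), len(rb))

    def pad(r: tuple) -> tuple:
        return r + (0,) * (width - len(r))

    return pad(ra) == pad(rb)


def check_pins(pins: dict) -> dict:
    """Map each package to (installed version or None, whether it matches the pin)."""
    report = {}
    for package, pin in pins.items():
        try:
            installed = version(package)
        except PackageNotFoundError:
            installed = None
        report[package] = (installed, installed is not None and same_release(installed, pin))
    return report


for package, (installed, ok) in check_pins(PINNED).items():
    print(f"{package}: installed={installed}, matches pin={ok}")
```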



### Authentication

This notebook assumes you are running in an environment where you have already authenticated with Google Cloud. The simplest way to do this for local development is to use the `gcloud` CLI:

```bash
gcloud auth application-default login
```

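This command stores Application Default Credentials (ADC) that Google Cloud client libraries, such as the Vertex AI SDK, pick up automatically. In Colab, authenticate your user account with `google.colab.auth` instead: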

In [5]:
from google.colab import auth

auth.authenticate_user()
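The two ways of calling Gemini rely on different credentials: Vertex AI uses the Application Default Credentials described above, while the pipeline in Section 5 passes a Gemini API key read from a Colab secret named `GOOGLE_API_KEY`. A minimal sketch for checking both before running the pipeline, using only the standard library. Both helper names are hypothetical. `find_adc_file` only looks at `GOOGLE_APPLICATION_CREDENTIALS` and the file that `gcloud auth application-default login` writes (honoring a `CLOUDSDK_CONFIG` override), so it can miss other ADC sources such as the metadata server on Google Cloud VMs; `get_gemini_api_key` assumes that outside Colab the key sits in an environment variable of the same name.

```python
import os
from pathlib import Path
from typing import Optional


def find_adc_file() -> Optional[Path]:
    """Locate the file Application Default Credentials would read, if any (hypothetical helper)."""
    # An explicitly configured credentials file takes precedence.
    explicit = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        path = Path(explicit)
        return path if path.is_file() else None
    # Otherwise, look in the gcloud config directory, which CLOUDSDK_CONFIG can override.
    if os.environ.get("CLOUDSDK_CONFIG"):
        config_dir = Path(os.environ["CLOUDSDK_CONFIG"])
    elif os.name == "nt":
        config_dir = Path(os.environ.get("APPDATA", "")) / "gcloud"
    else:
        config_dir = Path.home() / ".config" / "gcloud"
    path = config_dir / "application_default_credentials.json"
    return path if path.is_file() else None


def get_gemini_api_key(name: str = "GOOGLE_API_KEY") -> Optional[str]:
    """Read an API key from Colab secrets, or from an environment variable outside Colab."""
    try:
        from google.colab import userdata  # Only importable inside Colab.
    except ImportError:
        return os.environ.get(name)
    return userdata.get(name)


print("ADC file:", find_adc_file())
print("Gemini API key configured:", bool(get_gemini_api_key()))
```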

## 2. Import Libraries

Next, we import the necessary classes and functions from Apache Beam and other libraries.

In [7]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Import the core components for ML inference in Beam.
from apache_beam.ml.inference.base import RunInference, PredictionResult

# Import the specific model handler for Gemini on Vertex AI.
from apache_beam.ml.inference.gemini_inference import GeminiModelHandler, generate_from_string

# Type hint for the DoFn's output.
from collections.abc import Iterable

# Parses and compares Python package versions.
from packaging.version import Version

## 3. Configuration

Set your Google Cloud project details (used when calling Gemini through Vertex AI) and the model you wish to use.

In [9]:
# --- UPDATE THESE VALUES --- #
PROJECT_ID = "your-gcp-project-id"  # ⬅️ Replace with your Google Cloud project ID
LOCATION = "us-central1"          # ⬅️ Replace with your desired GCP region

# You can choose from available Gemini models in Vertex AI.
# For a full list, see: https://cloud.google.com/vertex-ai/docs/generative-ai/learn/models
MODEL_NAME = "gemini-2.5-flash"

# --- Pipeline Configuration ---
# Number of threads to use for the local DirectRunner.
NUM_WORKERS = 1

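# Record the installed google-genai version; the PostProcessor in Section 4
# relies on the result layout produced with google-genai 1.21.1.
from google import genai

genai_version = Version(genai.__version__)
print(f"google-genai version: {genai_version}")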

## 4. Define Pipeline Components

### Post-Processor DoFn

For each input, the `RunInference` transform emits a `PredictionResult` object, which contains both the original input (`example`) and the raw result returned by the model handler (`inference`). We create a simple Beam `DoFn` (a function that Beam applies to every element in parallel) to parse this object and extract the clean text output.

In [10]:
class PostProcessor(beam.DoFn):
  """Parses the PredictionResult to extract a human-readable string."""
  def process(self, element: PredictionResult) -> Iterable[str]:
    """
    Extracts the generated text from the Gemini API response.

    With google-genai 1.21.1 (pinned in Section 1), `element.inference`
    arrives as a two-element tuple whose second item is a list of response
    candidates. Each candidate holds its text at
    `candidate.content.parts[0].text`.
    """
    # The original input prompt is stored in `element.example`.
    input_prompt = element.example

    # Take the first response candidate from the inference result.
    candidate = element.inference[1][0]

    # This index path only matches the layout produced with google-genai
    # 1.21.1 or earlier; newer releases return a different structure.
    output_text = candidate.content.parts[0].text

    # Yield a formatted string for printing
    yield f"Input:\n{input_prompt}\n\nOutput:\n{output_text.strip()}\n"
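Indexing into `element.inference[1][0]` ties the `PostProcessor` to one `google-genai` release. One possible alternative, sketched under assumptions rather than taken from Beam's documentation: a custom `request_fn` that sends one `generate_content` request per prompt, so that each `PredictionResult.inference` is a complete response object whose `text` property holds the generated text. It assumes `GeminiModelHandler` calls `request_fn(model_name, batch, client, inference_args)` (the role `generate_from_string` plays in this notebook) and pairs each prompt in the batch with the item at the same position in the returned list. `generate_one_response_per_prompt` and `format_result` are hypothetical names.

```python
from typing import Any, Optional, Sequence


# Hypothetical request function. Assumption: GeminiModelHandler calls
# request_fn(model_name, batch, client, inference_args) and pairs each prompt
# in `batch` with the element at the same position in the returned list.
def generate_one_response_per_prompt(
    model_name: str,
    batch: Sequence[str],
    model: Any,  # A google.genai.Client when run by the model handler.
    inference_args: Optional[dict] = None,
) -> list:
    """Send one generate_content request per prompt so each result stays separate."""
    return [
        model.models.generate_content(
            model=model_name, contents=prompt, config=inference_args or None)
        for prompt in batch
    ]


def format_result(prompt: str, response: Any) -> str:
    """Format a prompt and its response; `response.text` may be None if no text came back."""
    text = (response.text or "").strip()
    return f"Input:\n{prompt}\n\nOutput:\n{text}\n"
```

Under those assumptions, the pipeline would pass `request_fn=generate_one_response_per_prompt` to `GeminiModelHandler` and replace the `PostProcessor` step with `beam.Map(lambda result: format_result(result.example, result.inference))`. The trade-off is one API call per prompt, which raises the request count against your quota.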

### Input Data

For this example, we'll use a simple Python list of prompts. In a real-world application, you would replace `beam.Create` with an I/O transform to read from a source like a text file, a database, or a message queue.
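For a modest prompt list kept in a local file, a hypothetical helper like the one below could replace the hard-coded list while still feeding `beam.Create`. It assumes one prompt per line in a UTF-8 text file. For larger inputs, Beam's `beam.io.ReadFromText` reads files line by line directly into a `PCollection`, so the prompts never need to fit in the notebook's memory.

```python
from pathlib import Path
from typing import List


def load_prompts(path: str) -> List[str]:
    """Read one prompt per line, dropping blank lines and surrounding whitespace."""
    lines = Path(path).read_text(encoding="utf-8").splitlines()
    return [line.strip() for line in lines if line.strip()]
```

The result could then be used as `beam.Create(load_prompts("prompts.txt"))`, where `prompts.txt` is a placeholder file name.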

In [11]:
prompts = [
    "What is 1+2?",
    "How is the weather in NYC in July?",
    "Write a short, 3-line poem about a robot learning to paint."
]

## 5. Define and Run the Beam Pipeline

Now we define the main function that constructs and runs the Apache Beam pipeline.

In [12]:
def run_pipeline(prompts, model_name, num_workers):
    """Constructs and runs the Beam pipeline for Gemini inference."""
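    # Colab secrets; `userdata` is only available inside Colab.
    from google.colab import userdata

    # 1. Define the Model Handler
    # This object knows how to communicate with the Gemini API.
    # `generate_from_string` is Beam's built-in request function for plain
    # string prompts: it sends each batch of prompts to the model.
    # Passing `api_key` routes requests to the Gemini Developer API. To use
    # Vertex AI instead, omit `api_key` and pass project=PROJECT_ID and
    # location=LOCATION (authenticated with the credentials from Section 1).
    model_handler = GeminiModelHandler(
        model_name=model_name,
        request_fn=generate_from_string,
        api_key=userdata.get('GOOGLE_API_KEY'),
    )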

    # 2. Set Pipeline Options
    # For local execution, we use the DirectRunner.
    # `direct_num_workers` controls the number of parallel threads.
    pipeline_options = PipelineOptions(
        direct_num_workers=num_workers
    )

    # 3. Construct the Pipeline
    with beam.Pipeline(options=pipeline_options) as pipeline:
        # Create a PCollection from our list of prompts.
        read_prompts = pipeline | "GetPrompts" >> beam.Create(prompts)

        # The core of our pipeline: apply the RunInference transform.
        # Beam will handle batching and parallel API calls.
        predictions = read_prompts | "RunInference" >> RunInference(model_handler)

        # Parse the results to get clean text.
        processed = predictions | "PostProcess" >> beam.ParDo(PostProcessor())

        # Print the final, formatted output to the console.
        # This is a simple "sink" for demonstration purposes.
        _ = processed | "PrintOutput" >> beam.Map(print)

    print("\n--- Pipeline finished ---")
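Moving from the `DirectRunner` to Dataflow changes the pipeline options rather than the transforms. A minimal sketch that builds the flags; `dataflow_args` is a hypothetical helper, the bucket path is a placeholder, and it assumes a `requirements.txt` that lists the same package versions as Section 1 so the workers match the notebook. `run_pipeline` as written builds its own `PipelineOptions`, so it would need a parameter to accept these flags.

```python
from typing import List


def dataflow_args(project: str, region: str, temp_location: str,
                  requirements_file: str = "requirements.txt") -> List[str]:
    """Build command-line style pipeline options for DataflowRunner (hypothetical helper)."""
    return [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
        # Cloud Storage path where Dataflow stages temporary files.
        f"--temp_location={temp_location}",
        # Installs the listed packages (e.g. google-genai==1.21.1) on every worker.
        f"--requirements_file={requirements_file}",
    ]


print(dataflow_args("your-gcp-project-id", "us-central1", "gs://your-bucket/tmp"))
```

These strings can be passed directly to `PipelineOptions`, for example `PipelineOptions(dataflow_args(PROJECT_ID, LOCATION, "gs://your-bucket/tmp"))`, where the Cloud Storage path is a placeholder for a bucket you own.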

## 6. Execute the Pipeline

Finally, let's call our function to run the pipeline. The output will be printed directly below this cell.

In [13]:
run_pipeline(prompts, MODEL_NAME, NUM_WORKERS)





Input:
What is 1+2?

Output:
1 + 2 = 3

Input:
How is the weather in NYC in July?

Output:
NYC in July is the quintessential East Coast summer: **hot, humid, and often sunny, with a chance of afternoon thunderstorms.**

Here's a breakdown:

*   **Temperature:**
    *   **Average Daytime:** Expect average temperatures in the **high 70s to low 80s Fahrenheit (25-29°C)**.
    *   **Highs:** Many days will reach into the **mid-to-high 80s (30-32°C)**.
    *   **Heatwaves:** It's common for NYC to experience several **heatwaves** in July, pushing temperatures well into the **90s (32-37°C)**, sometimes even touching 100°F (38°C).
    *   **Nights:** Nights offer little relief, often staying in the **high 60s or low 70s (20-23°C)**, making it feel warm and sticky even after dark.

*   **Humidity:** This is the defining characteristic of July. The air often feels **thick, heavy, and sticky**. High humidity makes the heat feel even more intense than the mercury alone suggests.

*   **Sunshine:*

### Expected Output

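The output should look similar to the following. Gemini's wording and response length vary from run to run (compare with the output above), and Beam does not guarantee the order in which the results are printed: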

```
Input:
What is 1+2?

Output:
3

Input:
How is the weather in NYC in July?

Output:
The weather in New York City in July is typically hot and humid. Average high temperatures are around 84°F (29°C), and average low temperatures are around 69°F (21°C). It is also one of the rainiest months of the year, so you can expect some thunderstorms.

Input:
Write a short, 3-line poem about a robot learning to paint.

Output:
Steel fingers grip a brush so light,
A canvas blooms with colors bright,
A circuit hums, a new delight.

--- Pipeline finished ---
```