#### Note
You can Skip this NoteBook and come back again after completing all steps of Travel Agent Project.
Gemini Streaming API is a Challenge Part that all students will attempt after completing this project with OpenAI SDK.

Follow along we we create the Streaming FastAPI microservice in Step 01.

In [30]:
from vertexai.preview.generative_models import (
    Content,
    FunctionDeclaration,
    GenerativeModel,
    Part,
    Tool,
)
from vertexai.generative_models._generative_models import GenerationResponse, ContentsType

from typing import Any
from typing import Union, Iterable

import time
import json

## Step 1 Create Function And Pydantic Modals To Control The Map

#### A. Pydantic Modals

In [31]:
# models layer
from pydantic import BaseModel, model_validator
from typing_extensions import Annotated

# Validator functions
def validate_latitude(v: float) -> float:
    assert -90 <= v <= 90, 'Invalid latitude'
    return v

def validate_longitude(v: float) -> float:
    assert -180 <= v <= 180, 'Invalid longitude'
    return v

# Annotated types
Latitude = Annotated[float, validate_latitude]
Longitude = Annotated[float, validate_longitude]

# Pydantic models
class MapState(BaseModel):
    latitude: Latitude
    longitude: Longitude
    zoom: float

class MarkersState(BaseModel):
    latitudes: list[Latitude]
    longitudes: list[Longitude]
    labels: list[str]

    @model_validator(mode='after')
    def validate_marker_length(self):
        if len(self.latitudes) != len(self.longitudes) or len(self.latitudes) != len(self.labels):
            raise ValueError(
                "Latitudes, longitudes, and labels must have the same number of elements")
        return self


#### B. Function To Get Updated Map Coordindates and an inital Map State

In [32]:
# data layer
from typing import Optional, Any
from pydantic import ValidationError

# Initial map state with class instances
ai_powered_map: dict[str, Any] = {
    "map_state": MapState(latitude=39.949610, longitude=75.150282, zoom=16).model_dump(),
    "markers_state": MarkersState(latitudes=[], longitudes=[], labels=[]).model_dump()
}
# Function to update map and markers
def update_map_and_markers(
    map_state: Optional[MapState] = None,
    markers_state: Optional[MarkersState] = None
) -> dict[str, Any]:

    response_format = {"status": "", "values": ai_powered_map}
    try:
        if map_state is not None:
            ai_powered_map["map_state"] = map_state.model_dump()

        if markers_state is not None:
            ai_powered_map["markers_state"] = markers_state.model_dump()

        response_format["status"] = "Map location and markers updated Now continue answering my last question"

    except ValidationError as e:
        response_format["status"] = f"Error update map: {e}, continue answering my last question"

    return response_format

## Step 2. Building Gemini Pro Streaming Travel AI Service

In [33]:
## Seed Prompt
BASE_PROMPT: str = """You are an AI Travel Assistant who make global travellers traval planning fun and interactive:

Before replying perform the following steps:

1. If user share any travel location name, update the map to go to that place and Add markers on the place.
2. if user shared any travel suggestions update them map.

If user sends any general message share with them you are a helpful AI Travel Assistant and you can help them with travel planning.
"""

#### A. Create OpenAI Schema for the Gemini Function Calling

In [34]:
map_ai_controller_schema = FunctionDeclaration(
    name="update_map_and_markers",
    description="Update map to center on a particular location and add list of markers to the map",
    parameters={
        "type": "object",
        "properties": {
            "longitude": {
                "type": "number",
                "description": "Longitude of the location to center the map on"
            },
            "latitude": {
                "type": "number",
                "description": "Latitude of the location to center the map on"
            },
            "zoom": {
                "type": "integer",
                "description": "Zoom level of the map"
            },
            "longitudes": {
                "type": "array",
                "items": {
                    "type": "number"
                },
                "description": "List of longitudes for each marker"
            },
            "latitudes": {
                "type": "array",
                "items": {
                    "type": "number"
                },
                "description": "List of latitudes for each marker"
            },
            "labels": {
                "type": "array",
                "items": {
                    "type": "string"
                },
                "description": "List of labels for each marker"
            }
        },
        "required": ["longitude", "latitude", "zoom", "longitudes", "latitudes", "labels"]
    }
)

In [35]:
map_ai_tool = Tool(
    function_declarations=[map_ai_controller_schema],
)

In [36]:
available_functions = {
    "update_map_and_markers": update_map_and_markers,
}

In [37]:
# Load Gemini Pro History
user_message: Content = Content(role="user", parts=[Part.from_text(BASE_PROMPT)])
system_response: Content = Content(role="model", parts=[Part.from_text("Got It")])

seed_chat_history = [user_message, system_response]

In [38]:
class GeminiTravelBotModal():
    def __init__(self, initial_history: list[Content]=seed_chat_history):
        self.client: GenerativeModel = GenerativeModel("gemini-pro", generation_config={"temperature": 0.4}, tools=[map_ai_tool])
        self.assistant = self.client.start_chat(history = initial_history)
        self.map_control_values = ai_powered_map

    def get_history(self):
        return self.assistant.history
    
    def get_map_control_values(self):
        return self.map_control_values

    def run_assistant(self, prompt: str):

        if self.assistant is None:
            raise ValueError(
                """Assistant is not set. Cannot run assistant without an assistant.""")

        run_res: Union["GenerationResponse", Iterable["GenerationResponse"]] = self.assistant.send_message(prompt, stream=True)
        
        for message in run_res:
            for part in message.candidates[0].content.parts:
                try:
                    text_content = part.text
                    if text_content is not None:
                        print("Got Text")
                        yield text_content
                        continue  # Skip to next message part
                except ValueError:
                    # Handle cases where 'text' property is present but no actual text content is available
                    print("ValueError Text")
                    pass

                if hasattr(part, 'function_call') and part.function_call is not None:
                    print("Got Function Response")
                    try:
                        function_call = part.function_call  

                        # Check if the function name exists in the available_functions dictionary
                        if function_call.name in available_functions:
                            function_call_name = available_functions[function_call.name]
                            print("Function Call Name:", function_call.name)
                        else:
                            print(f"Function name '{function_call}' not recognized.")
                            yield f"DM Dev. Gemini is Sleeping!"

                        # Access 'args'
                        if hasattr(function_call, 'args'):
                            args = function_call.args

                            # Initialize variables with default values
                            zoom = 10
                            longitude = 0
                            latitude = 0
                            longitudes, latitudes, labels = [], [], []

                            # Iterate over each item in 'args'
                            for key, value in args.items():
                                # print("Key:", key, "Value:", value)

                                if key == "zoom":
                                    try:
                                        zoom = value
                                    except Exception as e:
                                        print(f"Error with 'zoom': {e}")
                                        pass

                                if key == "longitude":
                                    try:
                                        longitude = value
                                    except Exception as e:
                                        print(f"Error with 'longitude': {e}")
                                        pass

                                if key == "latitude":
                                    try:
                                        latitude = value
                                    except Exception as e:
                                        print(f"Error with 'latitude': {e}")
                                        pass

                                if key == "longitudes":
                                    try:
                                        longitudes = [v for v in value]
                                    except Exception as e:
                                        print(f"Error with 'longitudes': {e}")
                                        pass

                                if key == "latitudes":
                                    try:
                                        latitudes = [v for v in value]
                                    except Exception as e:
                                        print(f"Error with 'latitudes': {e}")
                                        pass

                                if key == "labels":
                                    try:
                                        labels = [v for v in value]
                                    except Exception as e:
                                        print(f"Error with 'labels': {e}")
                                        pass


                            # Create instances of MapState and MarkersState
                            construct_map_state = MapState(latitude=latitude, longitude=longitude, zoom=zoom)
                            construct_markers_state = MarkersState(latitudes=latitudes, longitudes=longitudes, labels=labels)

                            # Update the map and markers
                            update_map_res = function_call_name(map_state=construct_map_state, markers_state=construct_markers_state)

                            self.map_control_values = update_map_res['values']

                            time.sleep(0.05)

                            # /* This is the official Way to Pass Function Response 
                            # It works in Normal Testing But Here it often fails with grpc 404 error
                            # As Workaround here I have constructed a Manual Function Res and Passed it */

                            # func_call_gemini_response = self.assistant.send_message(
                            #    Part.from_function_response(
                            #     name=function_call.name,
                            #     response={
                            #         "content": json.dumps(update_map_res),
                            #     }
                            # ),
                            #     stream=True
                            # )

                            list_content: ContentsType = [prompt, json.dumps(update_map_res['status']), f"Now continue from last message and helop plan travel in text formt - Funcation Calling completed. "]
                            
                            func_call_gemini_response = self.assistant.send_message(
                                list_content,
                                stream=True
                            )

                            for message in func_call_gemini_response:
                                part = message.candidates[0].content.parts[0]

                                try:
                                    text_content = part.text if hasattr(part, 'text') else None
                                    if text_content:
                                        print("Got Func Calling Text:", text_content)
                                        yield text_content
                                except ValueError:
                                    print(
                                        "No text content available in part:", part)
                                    yield "Gemini is Sleeping!"
                                    pass
                    except ValueError as e:
                        print(f"Error processing function call: {e}")
                        yield "Funcation Calling Failed! Gemini is Sleeping!"
                        pass
                else:
                    yield "Got Nothing"
                    
        # Signal the end of the stream
        yield "__END__"

In [39]:
ai_travel_maanger: GeminiTravelBotModal = GeminiTravelBotModal()


def call_gemini_travel_assistant(prompt: str):
    # Wait for the assistant to complete
    messages = ai_travel_maanger.run_assistant(prompt)

    # Collect all responses from the generator and yield them
    for message in messages:
        yield message
        if message == "__END__":
            break  # Exit the loop when the end of the stream is indicated

#### Testing The Gemini Travel Bot Class Class

In [40]:
for message in call_gemini_travel_assistant("Let's visit Tokyo Japan and place 1 day travel trip there?"):
    # No need to check for __END__ here, as the loop in call_gemini_travel_assistant already handles it
    print(message)

ValueError Text
Got Function Response
Function Call Name: update_map_and_markers
Got Func Calling Text: Sure, I can help you plan a one-day trip to Tokyo, Japan
Sure, I can help you plan a one-day trip to Tokyo, Japan
Got Func Calling Text: . Here's a possible itinerary:

* **Morning:**
    *
. Here's a possible itinerary:

* **Morning:**
    *
Got Func Calling Text:  Start your day by visiting the iconic Tokyo Tower. Take the elevator to the top for stunning views of the city.
    * After that, head to the
 Start your day by visiting the iconic Tokyo Tower. Take the elevator to the top for stunning views of the city.
    * After that, head to the
Got Func Calling Text:  nearby Sensō-ji temple, Tokyo's oldest Buddhist temple. Explore the temple grounds and admire the beautiful architecture.
    * For lunch, try some of Tokyo's famous sushi at a local restaurant.
* **Afternoon:**
    * In the afternoon, visit the Harajuku district, known for its vibrant street fashion and pop culture.
 

In [41]:
ai_travel_maanger.get_map_control_values()

{'map_state': {'latitude': 35.6804, 'longitude': 139.766084, 'zoom': 12.0},
 'markers_state': {'latitudes': [35.6804],
  'longitudes': [139.766084],
  'labels': ['Tokyo']}}

In [42]:
for message in call_gemini_travel_assistant("What was my last message"):
    print(message)
    if message == "__END__":
        break

Got Text
Your last message was:

> Let's visit Tokyo Japan and place 
Got Text
1 day travel trip there?
__END__


## Create AI Powered Map Interactions using Plotly

In [43]:
import plotly.graph_objects as go
from typing import Union

# Function to create and display the map
def create_map(map_state: dict[str, Union[float, str]], markers_state: dict[str, list[Union[float, str]]]) -> None:
    
    figure = go.Figure(go.Scattermapbox(mode="markers"))
    
    figure.add_trace(
        go.Scattermapbox(
            mode="markers",
            marker=dict(color='red', size=14),
            lat=markers_state["latitudes"],
            lon=markers_state["longitudes"],
            text=markers_state["labels"]
        )
    )
    
    figure.update_layout(
        mapbox=dict(
            style="open-street-map",
            # accesstoken=MAPBOX_ACCESS_TOKEN, # If you don't have MAPBOX token, replace with style="open-street-map".
            center=go.layout.mapbox.Center(
                lat=map_state["latitude"],
                lon=map_state["longitude"]
            ),
            zoom=map_state["zoom"]
        ),
        margin=dict(l=0, r=0, t=0, b=0)
    )
    figure.show()

In [44]:
create_map(map_state=ai_powered_map['map_state'], markers_state=ai_powered_map['markers_state'])


Congrats on building the Streaming Travel AI Assistant Proof of concept proof of concept. Currently the concept is well suited for single user.

Now we have the complete code and we can move on designing the FastAPI Microservice. We will use the above plotly map to create AI powered map interactions with streamlit.

Does the Map looks outdated? Go to MapBox, signup and get your access token.

Next in the create_map function you can pass this token comment the map-style argument,

    figure.update_layout(
        mapbox=dict(
            accesstoken=MAPBOX_ACCESS_TOKEN, # If you don't have MAPBOX token, replace with style="open-street-map".
            center=go.layout.mapbox.Center(
                lat=map_state["latitude"],
                lon=map_state["longitude"]
            ),
            zoom=map_state["zoom"]
        ),
        margin=dict(l=0, r=0, t=0, b=0)
    )