In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Getting Started - Template

| | |
|-|-|
|Author(s) | [Elia Secchi](https://github.com/eliasecchig) |

## Overview

This tutorial guides you through creating the first version of a Generative AI chain, the core of a Gen AI application.

It covers:

1. Creating chains using different methods:
   - LangChain LCEL (LangChain Expression Language)
   - LangGraph
   - Custom Python code
2. Evaluating these chains
3. Next steps for deploying the chain in an application


### Costs

This tutorial uses billable components of Google Cloud:

- Vertex AI

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing) and use the [Pricing Calculator](https://cloud.google.com/products/calculator/) to generate a cost estimate based on your projected usage.


## Getting Started

### Execute Jupyter notebooks using Poetry (Recommended)

Poetry is a dependency management and packaging tool for Python. It helps you declare, manage, and install the dependencies of Python projects.
Because this notebook is part of a project that already uses Poetry, running it through Poetry keeps dependencies consistent across environments.

1. Install dependencies:
   ```bash
   poetry install --with streamlit,jupyter
   ```

2. Run Jupyter:
   ```bash
   poetry run jupyter
   ```
   
3. Open this notebook in the Jupyter interface.

### Install Vertex AI SDK for Rapid Evaluation (Alternative)

In [None]:
%pip install --quiet --upgrade nest_asyncio
%pip install --upgrade --user --quiet langchain-core langchain-google-vertexai langchain langgraph
%pip install --upgrade --user --quiet "google-cloud-aiplatform[rapid_evaluation]"

### Restart runtime

To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by running the cell below, which restarts the current kernel.

The restart might take a minute or longer. After it's restarted, continue to the next step.

In [None]:
import IPython

app = IPython.Application.instance()
app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

If you're running this notebook on Google Colab, uncomment and run the cell below to authenticate your environment.

In [None]:
# import sys

# if "google.colab" in sys.modules:
#     from google.colab import auth

#     auth.authenticate_user()

### Set Google Cloud project information and initialize Vertex AI SDK

To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).

Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment).

In [None]:
PROJECT_ID = "your-project-id"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}


import vertexai

vertexai.init(project=PROJECT_ID, location=LOCATION)

### Import libraries

In [None]:
import sys

sys.path.append("../")

In [None]:
import json  # needed by the custom metric's json.loads call below
from json import JSONDecodeError
from typing import Any, Dict, Iterator, Literal

import pandas as pd
import yaml

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableConfig
from langchain_core.tools import tool
from langchain_google_community.vertex_rank import VertexAIRank
from langchain_google_vertexai import ChatVertexAI, VertexAI, VertexAIEmbeddings
from langgraph.graph import END, MessagesState, StateGraph
from langgraph.prebuilt import ToolNode
from vertexai.evaluation import CustomMetric, EvalTask

from app.eval.utils import batch_generate_messages, generate_multiturn_history
from app.patterns.custom_rag_qa.templates import query_rewrite_template, rag_template
from app.patterns.custom_rag_qa.vector_store import get_vector_store
from app.utils.output_types import OnChatModelStreamEvent, OnToolEndEvent, custom_chain

## Chain Definition

Chains are sequences of calls to LLMs, tools, or data preprocessing steps. They form the core of your Gen AI application.

### Input Interface

The chain must provide an `astream_events` method that accepts a dictionary with a "messages" key.
The "messages" value should be a list of alternating LangChain [HumanMessage](https://api.python.langchain.com/en/latest/messages/langchain_core.messages.human.HumanMessage.html) and [AIMessage](https://api.python.langchain.com/en/latest/messages/langchain_core.messages.ai.AIMessage.html) objects.

For example:

```python
{
    "messages": [
        HumanMessage("first"),
        AIMessage("a response"),
        HumanMessage("a follow up")
    ]
}
```

Alternatively, you can use the shortened form:

```python
{
    "messages": [
        ("user", "first"),
        ("ai", "a response"),
        ("user", "a follow up")
    ]
}
```
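
As a rough illustration of this input shape, the sketch below normalizes the shortened form and checks that turns alternate. The helper `check_alternating` and the `ROLE_ALIASES` table are hypothetical names introduced here for illustration; they are not part of LangChain or of this project.

```python
from typing import Dict, List, Tuple

# Hypothetical helper for illustration; not part of LangChain or this project.
ROLE_ALIASES = {"user": "human", "human": "human", "ai": "ai", "assistant": "ai"}


def check_alternating(messages: List[Tuple[str, str]]) -> List[Dict[str, str]]:
    """Normalize (role, content) tuples and verify that human/ai turns alternate."""
    normalized = []
    for role, content in messages:
        if role not in ROLE_ALIASES:
            raise ValueError(f"Unsupported role: {role!r}")
        normalized.append({"type": ROLE_ALIASES[role], "content": content})

    # Compare each message with its successor to detect two turns by the same speaker.
    for previous, current in zip(normalized, normalized[1:]):
        if previous["type"] == current["type"]:
            raise ValueError("Messages must alternate between human and ai turns")
    return normalized


payload = {
    "messages": [
        ("user", "first"),
        ("ai", "a response"),
        ("user", "a follow up"),
    ]
}
normalized = check_alternating(payload["messages"])
print(normalized[-1])
```

A check like this could run before the payload is handed to the chain, so that malformed histories fail early with a readable error.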

### Output Interface

All chains use the [LangChain Astream Events (v2) API](https://python.langchain.com/v0.1/docs/expression_language/streaming/#using-stream-events). It supports various use cases (simple chains, RAG, agents) and emits asynchronous events that can be used to stream the chain's output.

LangChain chains (LCEL, LangGraph) automatically implement the `astream_events` API. 

We provide examples of emitting `astream_events`-compatible events with custom Python code, allowing implementation with other SDKs (e.g., Vertex AI, LlamaIndex).
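
To make the consumer side concrete, here is a minimal sketch of filtering an event stream. It assumes only that each event is a dictionary with `event` and `data` keys. `fake_event_stream` is a hypothetical stand-in for `chain.astream_events(...)`, and its chunks are plain strings rather than the message-chunk objects a real chat model would emit.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List

# Event names a frontend might render (illustrative subset).
SUPPORTED = {"on_tool_end", "on_chat_model_stream"}


async def fake_event_stream() -> AsyncIterator[Dict[str, Any]]:
    """Hypothetical stand-in for `chain.astream_events(...)`."""
    yield {"event": "on_chain_start", "data": {}}
    yield {"event": "on_chat_model_stream", "data": {"chunk": "Hello"}}
    yield {"event": "on_chat_model_stream", "data": {"chunk": " world"}}
    yield {"event": "on_chain_end", "data": {}}


async def collect_supported() -> List[Dict[str, Any]]:
    """Keep only the events whose name is in SUPPORTED."""
    return [event async for event in fake_event_stream() if event["event"] in SUPPORTED]


# In a script, drive the coroutine with asyncio.run.
events = asyncio.run(collect_supported())
text = "".join(event["data"]["chunk"] for event in events)
print(text)
```

Inside a notebook, where an event loop is already running, you would typically write `events = await collect_supported()` instead of calling `asyncio.run`.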

### Customizing I/O Interfaces

To modify the Input/Output interface, update `app/server.py` and related unit and integration tests.


## Events supported

The following list defines the events that are captured and supported by the Streamlit frontend.

In [None]:
SUPPORTED_EVENTS = [
    "on_tool_start",
    "on_tool_end",
    "on_retriever_start",
    "on_retriever_end",
    "on_chat_model_stream",
]

### Define the LLM
We set up the Large Language Model (LLM) for our conversational bot.


In [None]:
llm = ChatVertexAI(model_name="gemini-1.5-flash-001", temperature=0)

### Leveraging LangChain LCEL for Efficient Chain Composition

LangChain Expression Language (LCEL) provides a declarative approach to composing chains seamlessly. Key benefits include:

1. Rapid prototyping to production deployment without code changes
2. Scalability from simple "prompt + LLM" chains to complex, multi-step workflows
3. Enhanced readability and maintainability of chain logic

For comprehensive guidance on LCEL implementation, refer to the [official documentation](https://python.langchain.com/docs/expression_language/get_started).
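
As a toy illustration of the pipe syntax, the sketch below shows how the `|` operator can compose two steps into one. The `Step` class is a hypothetical stand-in written for this example; it is not how LangChain implements its runnables, which add streaming, batching, and async support on top.

```python
from typing import Any, Callable


class Step:
    """Toy composable step; not LangChain's Runnable implementation."""

    def __init__(self, func: Callable[[Any], Any]) -> None:
        self.func = func

    def __or__(self, other: "Step") -> "Step":
        # `a | b` builds a new step that feeds a's output into b.
        return Step(lambda value: other.invoke(self.invoke(value)))

    def invoke(self, value: Any) -> Any:
        return self.func(value)


# Hypothetical steps mimicking "prompt template | model".
format_prompt = Step(lambda question: f"You are a cooking bot. User: {question}")
fake_model = Step(lambda prompt: prompt.upper())

toy_chain = format_prompt | fake_model
result = toy_chain.invoke("Lasagne?")
print(result)
```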


In [None]:
template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are a conversational bot that provides cooking suggestions to users""",
        ),
        MessagesPlaceholder(variable_name="messages"),
    ]
)

chain = template | llm

Let's test the chain with a dummy question:

In [None]:
input_message = {"messages": [("user", "Can you give me a Lasagne recipe?")]}

async for event in chain.astream_events(input=input_message, version="v2"):
    if event["event"] in SUPPORTED_EVENTS:
        print(event["data"])

This methodology is used for the chain defined in the [`app/chain.py`](../app/chain.py) file.


### Use LangGraph

LangGraph is a framework for building stateful, multi-actor applications with Large Language Models (LLMs). 
It extends the LangChain library, allowing you to coordinate multiple chains (or actors) across multiple steps of computation in a cyclic manner.

In [None]:
# 1. Define tools
@tool
def search(query: str):
    """Simulates a web search. Use it to get information on weather"""
    if "sf" in query.lower() or "san francisco" in query.lower():
        return "It's 60 degrees and foggy."
    return "It's 90 degrees and sunny."


tools = [search]

# 2. Set up the language model
llm = llm.bind_tools(tools)


# 3. Define workflow components
def should_continue(state: MessagesState) -> Literal["tools", END]:
    """Determines whether to use tools or end the conversation."""
    last_message = state["messages"][-1]
    return "tools" if last_message.tool_calls else END


async def call_model(state: MessagesState, config: RunnableConfig):
    """Calls the language model and returns the response."""
    # Await the async variant so the coroutine does not block the event loop
    response = await llm.ainvoke(state["messages"], config)
    return {"messages": response}


# 4. Create the workflow graph
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", ToolNode(tools))
workflow.set_entry_point("agent")

# 5. Define graph edges
workflow.add_conditional_edges("agent", should_continue)
workflow.add_edge("tools", "agent")

# 6. Compile the workflow
chain = workflow.compile()

Let's test the new chain with a dummy question:

In [None]:
input_message = {"messages": [("user", "What is the weather like in NY?")]}

async for event in chain.astream_events(input=input_message, version="v2"):
    if event["event"] in SUPPORTED_EVENTS:
        print(event["data"])

This methodology is used for the chain defined in the [`app/patterns/langgraph_dummy_agent/chain.py`](../app/patterns/langgraph_dummy_agent/chain.py) file.
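
The control flow that the graph encodes (agent, conditional edge, tools, back to agent) can be sketched as a plain loop. This is only an illustration of the routing logic: `fake_model`, `fake_search`, the dictionary-based messages, and the local `END` sentinel are assumptions made for this example and do not use LangGraph or a real LLM.

```python
from typing import Dict, List

END = "__end__"  # local sentinel for this sketch


def fake_search(query: str) -> str:
    """Returns the same canned answers as the `search` tool."""
    if "sf" in query.lower() or "san francisco" in query.lower():
        return "It's 60 degrees and foggy."
    return "It's 90 degrees and sunny."


def fake_model(messages: List[Dict]) -> Dict:
    """Hypothetical model: requests the tool once, then answers from its result."""
    last = messages[-1]
    if last["role"] == "tool":
        return {"role": "ai", "content": f"Forecast: {last['content']}", "tool_calls": []}
    return {"role": "ai", "content": "", "tool_calls": [{"query": last["content"]}]}


def should_continue(messages: List[Dict]) -> str:
    """Route to the tools step if the last model message requested a tool call."""
    return "tools" if messages[-1]["tool_calls"] else END


def run(messages: List[Dict]) -> List[Dict]:
    while True:
        messages.append(fake_model(messages))  # "agent" node
        if should_continue(messages) == END:  # conditional edge
            return messages
        for call in messages[-1]["tool_calls"]:  # "tools" node
            messages.append({"role": "tool", "content": fake_search(call["query"])})
        # edge "tools" -> "agent": the loop calls the model again


history = run([{"role": "user", "content": "What is the weather like in NY?"}])
print(history[-1]["content"])
```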


### Use custom Python code

You can also use pure Python code to orchestrate the different steps of your chain and emit `astream_events`-compatible events.

This offers full flexibility in how the steps of a chain are orchestrated and lets you include other SDKs and frameworks, such as the Vertex AI SDK or LlamaIndex.

We demonstrate this third methodology by implementing a RAG chain. The function `get_vector_store` provides a brute-force vector store (scikit-learn) initialized with chunks of the [Practitioners Guide to MLOps](https://services.google.com/fh/files/misc/practitioners_guide_to_mlops_whitepaper.pdf).


In [None]:
embedding = VertexAIEmbeddings(model_name="text-embedding-004")


vector_store = get_vector_store(embedding=embedding)
retriever = vector_store.as_retriever(search_kwargs={"k": 20})
compressor = VertexAIRank(
    project_id=PROJECT_ID,
    location_id="global",
    ranking_config="default_ranking_config",
    title_field="id",
    top_n=5,
)

query_gen = query_rewrite_template | llm
response_chain = rag_template | llm


@custom_chain
def chain(
    input: Dict[str, Any], **kwargs
) -> Iterator[OnToolEndEvent | OnChatModelStreamEvent]:
    """
    Implements a RAG QA chain. Decorated with `custom_chain` to offer LangChain compatible astream_events
    and invoke interface and OpenTelemetry tracing.
    """
    # Generate optimized query
    query = query_gen.invoke(input).content

    # Retrieve and rank documents (`invoke` supersedes the deprecated `get_relevant_documents`)
    retrieved_docs = retriever.invoke(query)
    ranked_docs = compressor.compress_documents(documents=retrieved_docs, query=query)

    # Yield tool results metadata
    yield OnToolEndEvent(data={"input": {"query": query}, "output": ranked_docs})

    # Stream LLM response
    for chunk in response_chain.stream(
        input={"messages": input["messages"], "relevant_documents": ranked_docs}
    ):
        yield OnChatModelStreamEvent(data={"chunk": chunk})

Have a look at the definition of the `@custom_chain` decorator to see how it makes the decorated function compatible with the `astream_events` interface and adds OpenTelemetry tracing.

This methodology is used for the chain defined in the `app/patterns/custom_rag_qa/chain.py` file.

Let's test the custom chain we just created:


In [None]:
input_message = {"messages": [("user", "What is MLOps?")]}

async for event in chain.astream_events(input=input_message, version="v2"):
    if event["event"] in SUPPORTED_EVENTS:
        print(event["data"])
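
For intuition about what a decorator like `@custom_chain` has to do, here is a heavily simplified sketch that wraps a synchronous generator and exposes an `astream_events` method. `simple_custom_chain`, `EventChain`, and `echo_chain` are hypothetical names for this example. The project's real decorator may differ substantially, and this sketch omits the `invoke` interface and OpenTelemetry tracing.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, Iterator, List


class EventChain:
    """Hypothetical wrapper exposing an `astream_events`-style method."""

    def __init__(self, func: Callable[..., Iterator[Dict[str, Any]]]) -> None:
        self.func = func

    async def astream_events(
        self, input: Dict[str, Any], **kwargs: Any
    ) -> AsyncIterator[Dict[str, Any]]:
        # Re-emit each event produced by the wrapped synchronous generator.
        for event in self.func(input):
            yield event
            await asyncio.sleep(0)  # let other tasks run between events


def simple_custom_chain(func: Callable[..., Iterator[Dict[str, Any]]]) -> EventChain:
    return EventChain(func)


@simple_custom_chain
def echo_chain(input: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Toy chain: reports a fake tool result, then streams the question back word by word."""
    question = input["messages"][-1][1]
    yield {"event": "on_tool_end", "data": {"input": {"query": question}, "output": []}}
    for word in question.split():
        yield {"event": "on_chat_model_stream", "data": {"chunk": word}}


async def main() -> List[str]:
    payload = {"messages": [("user", "What is MLOps?")]}
    return [e["event"] async for e in echo_chain.astream_events(input=payload, version="v2")]


names = asyncio.run(main())
print(names)
```

Note that the wrapped generator still runs synchronously in this sketch, so a slow step would block the event loop; a production implementation would likely need to address that.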

## Evaluation

Evaluation is the activity of assessing the quality of the model's outputs, ideally programmatically, to gauge its understanding and success in fulfilling the prompt's instructions.

In the context of Generative AI, evaluation extends beyond the evaluation of the model's outputs to include the evaluation of the chain's outputs and in some cases the evaluation of the intermediate steps (for example, the evaluation of the retriever's outputs).

Here is a diagram that illustrates the evaluation process for a chain:
<img src="../images/chain_dev_cycle.png" alt="dev cycle evaluation" width="600">

To evaluate the chain's outputs, we'll utilize [Vertex AI Rapid Evaluation](https://cloud.google.com/vertex-ai/generative-ai/docs/models/rapid-evaluation) to assess our generative AI model's performance. This service within Vertex AI streamlines the evaluation process, integrates with [Vertex AI Experiments](https://cloud.google.com/vertex-ai/docs/experiments/intro-vertex-ai-experiments) for tracking, and offers a range of [pre-built metrics](https://cloud.google.com/vertex-ai/generative-ai/docs/models/determine-eval#task-and-metrics) and the capability to define custom ones.


For a comprehensive list of samples on Vertex AI Evaluation, visit the [official documentation](https://cloud.google.com/vertex-ai/generative-ai/docs/models/evaluation-examples).

Let's start by defining a simple chain again:

In [None]:
template = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are a conversational bot that provides cooking suggestions to users""",
        ),
        MessagesPlaceholder(variable_name="messages"),
    ]
)

chain = template | llm

We then import the ground truth data we will use for evaluation. The data is stored in [`app/eval/data/chats.yaml`](../app/eval/data/chats.yaml).


In [None]:
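# Path is relative to this notebook's folder, matching the "../" used elsewhere in this notebook
with open("../app/eval/data/chats.yaml") as f:
    y = yaml.safe_load(f)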
df = pd.DataFrame(y)
df

We leverage the helper functions `generate_multiturn_history` and `batch_generate_messages` to prepare the data for evaluation and to generate the responses from the chain.

You can see below the documentation for the two functions.

In [None]:
help(generate_multiturn_history)

In [None]:
help(batch_generate_messages)

In [None]:
df = generate_multiturn_history(df)
df

In [None]:
scored_data = batch_generate_messages(df, chain)

We extract the user message and the reference (ground truth) message from the dataframe so that we can use them for evaluation.

In [None]:
scored_data["user"] = scored_data["human_message"].apply(lambda x: x["content"])
scored_data["reference"] = scored_data["ai_message"].apply(lambda x: x["content"])
scored_data

#### Define a CustomMetric using a Gemini model

Define a customized Gemini model-based metric function, with explanations for the score. The registered custom metrics are computed on the client side, without using online evaluation service APIs.

In [None]:
evaluator_llm = VertexAI(model_name="gemini-1.5-flash-001", temperature=0)


def custom_faithfulness(instance):
    prompt = f"""You are examining written text content. Here is the text:
************
Written content: {instance["response"]}
************
Original source data: {instance["reference"]}

Examine the text and determine whether the text is faithful or not.
Faithfulness refers to how accurately a generated summary reflects the essential information and key concepts present in the original source document.
A faithful summary stays true to the facts and meaning of the source text, without introducing distortions, hallucinations, or information that wasn't originally there.

Your response must be an explanation of your thinking along with a single integer on a scale of 0-5,
0 being the least faithful and 5 being the most faithful.

Produce results in JSON

Expected format:

```json
{{
    "explanation": "< your explanation>",
    "custom_faithfulness": <your score>
}}
```
"""

    result = evaluator_llm.invoke(prompt)
    try:
        result = json.loads(result.replace("```", "").replace("json", ""))
    except JSONDecodeError:
        result = {"explanation": None, "custom_faithfulness": None}
    return result


# Register Custom Metric
custom_faithfulness_metric = CustomMetric(
    name="custom_faithfulness",
    metric_function=custom_faithfulness,
)
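
One possible refinement of the parsing step: removing every occurrence of the substring "json" also strips that word from the explanation text. The sketch below extracts only the fenced payload instead. `parse_metric_reply` and `FENCE_PATTERN` are hypothetical names, and the sample replies are made up for illustration rather than taken from a real model.

```python
import json
import re
from typing import Any, Dict

FENCE = "`" * 3  # a Markdown code fence, built this way to keep the example readable
FENCE_PATTERN = re.compile(
    re.escape(FENCE) + r"(?:json)?\s*(.*?)\s*" + re.escape(FENCE), re.DOTALL
)


def parse_metric_reply(reply: str, score_key: str = "custom_faithfulness") -> Dict[str, Any]:
    """Extract the JSON object from a model reply, tolerating an optional Markdown fence."""
    match = FENCE_PATTERN.search(reply)
    candidate = match.group(1) if match else reply
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError:
        return {"explanation": None, score_key: None}
    if not isinstance(parsed, dict):
        # Valid JSON that is not an object (for example a bare number) is treated as a failure.
        return {"explanation": None, score_key: None}
    return parsed


# Made-up replies for illustration.
fenced = FENCE + 'json\n{"explanation": "The json is faithful", "custom_faithfulness": 4}\n' + FENCE
good = parse_metric_reply(fenced)
bad = parse_metric_reply("I could not produce a score.")
print(good["custom_faithfulness"], bad["custom_faithfulness"])
```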

In [None]:
experiment_name = "rapid-eval-langchain-eval"  # @param {type:"string"}

We are now ready to run the evaluation. We will use different metrics, combining the custom metric we defined above with some pre-built metrics.

Evaluation results are automatically logged to the experiment named in `experiment_name`.

You can click `View Experiment` to see the experiment in the Google Cloud Console.

In [None]:
# Combine pre-built metrics with the custom metric defined above
metrics = ["fluency", "safety", custom_faithfulness_metric]

eval_task = EvalTask(
    dataset=scored_data,
    metrics=metrics,
    experiment=experiment_name,
    metric_column_mapping={"user": "prompt"},
)
eval_result = eval_task.evaluate()

Once an eval result is produced, we are able to display summary metrics:


In [None]:
eval_result.summary_metrics

We can also display a pandas dataframe with the per-row results for our eval dataset, including the score for each metric.

In [None]:
eval_result.metrics_table

## Next Steps

Congratulations on completing the getting started tutorial! You've learned different methodologies to build a chain and how to evaluate it. Here's how to move forward:

### 1. Prepare for Production

Once you're satisfied with your chain's evaluation results:

1. Write your chain into the [`app/chain.py` file](../app/chain.py).
2. Remove the `patterns` folder and its associated tests (these are for demonstration only).

### 2. Local Testing

Test your chain using the Streamlit playground:

```bash
make playground
```

This launches a feature-rich playground, including chat curation, user feedback collection, multimodal input, and more!


### 3. Production Deployment

Once you are satisfied with the results, you can set up your CI/CD pipelines to deploy your chain to production.

Please refer to the [deployment guide](../deployment/README.md) for more information on how to do that.