In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# **LLMOps with Langfuse on Google Cloud**

---

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/GoogleCloudPlatform/python-docs-samples/blob/main/cloud-sql/postgres/pgvector/notebooks/pgvector_gen_ai_demo.ipynb)

## What is Langfuse?

Langfuse is an open-source LLM engineering platform.
It provides traces, evals, prompt management, and metrics to debug and improve your LLM application.

References:
- https://cloud.google.com/blog/products/databases/vectorstore-in-the-cloud-sql-for-postgresql-langchain-package


## Overview of the steps

1. Prepare the Cloud project
   - Enable the required services
   - Create a Cloud SQL instance
   - Create a service account for Cloud Run and grant it the required permissions
   - Add secrets to Secret Manager
2. Deploy Langfuse on Cloud Run
3. Run the Langfuse examples

## Before you begin

>⚠️ **Running this codelab will incur Google Cloud charges. You may also be billed for Vertex AI API usages.**

Prerequisites:
- You need an active Google Cloud account to complete this tutorial.
- This sample notebook must be connected to a **Google Cloud project**; nothing else is needed.
- You can use an existing project. Alternatively, you can create a new Cloud project [with free trial cloud credits.](https://cloud.google.com/free/docs/gcp-free-tier)
- You can use an existing Cloud SQL PostgreSQL instance for this tutorial. If an existing instance is not found, this tutorial will automatically create one for you.
- Note that this notebook connects to the Cloud SQL instance via public IP using the [Cloud SQL Python connector](https://cloud.google.com/blog/topics/developers-practitioners/how-connect-cloud-sql-using-python-easy-way). Therefore, your Cloud SQL instance should have a public IP assigned to it.
- At the end of the tutorial, you can optionally clean-up these resources to avoid further charges.


### Using this interactive notebook

Click the **run** icon on the top left corner ▶️  of each cell within this notebook.

> 💡 Alternatively, you can run the currently selected cell with `Ctrl + Enter` (or `⌘ + Enter` on a Mac).

> ⚠️ **To avoid any errors**, wait for each cell to finish in their order before clicking the next “run” icon.

## Setup

### Install required packages

>⚠️ You may receive a warning to "Restart Runtime" after the packages are installed. Don't worry, the subsequent cells will help you restart the runtime.

In [None]:
# Install dependencies.
!pip install --quiet --upgrade asyncio==3.4.3 asyncpg==0.27.0 cloud-sql-python-connector["asyncpg"]==1.2.3 numpy pandas pgvector langchain>=0.1 langchain-google-genai langchain_google_vertexai==1.0.1 --upgrade google-cloud-aiplatform==1.53.0 google-cloud-secret-manager google-cloud-resource-manager langfuse

In [None]:
# Shut the kernel down so Colab/Jupyter restarts it; the cells that follow
# then import the package versions installed above.
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

### Setup Google Cloud environment

>⚠️ Please fill in your **Google Cloud project ID** and a new **password** for your Cloud SQL PostgreSQL database.

In [None]:
# @markdown Replace the required placeholder text below. You can modify any other default values, if you like.

# Please fill in these values.
project_id = "<YOUR_PROJECT_ID>"  # @param {type:"string"}
database_password = "<YOUR_NEW_DB_PASSWORD>"  # @param {type:"string"}
region = "us-central1"  # @param {type:"string"}
instance_name = "langfuse"  # @param {type:"string"}
database_name = "langfuse"  # @param {type:"string"}


def _is_filled(value: str) -> bool:
    """Return True if ``value`` is non-blank and not an unreplaced ``<YOUR_...>`` placeholder."""
    text = value.strip()
    return bool(text) and not (text.startswith("<YOUR_") and text.endswith(">"))


# Quick input validations. The defaults above are non-empty placeholder
# strings, so a plain truthiness check would accept them unchanged.
assert _is_filled(project_id), "⚠️ Please provide a Google Cloud project ID"
assert _is_filled(region), "⚠️ Please provide a Google Cloud region"
assert _is_filled(instance_name), "⚠️ Please provide the name of your instance"
assert _is_filled(database_password), "⚠️ Please provide a database password"
assert _is_filled(database_name), "⚠️ Please provide a database name"

In [None]:
#@markdown ###Authenticate your Google Cloud Account and enable APIs.
# Authenticate gcloud.
from google.colab import auth
auth.authenticate_user()

import os
os.environ['project_id'] = project_id

# Configure gcloud.
!gcloud config set project {project_id}

from google.cloud import resourcemanager_v3

client = resourcemanager_v3.ProjectsClient()

project = client.get_project(name=f'projects/{project_id}')
project_number = project.name.split('/')[-1]

print(f"Project number for {project_id}: {project_number}")

# Grant Cloud SQL Client role to authenticated user
current_user = !gcloud auth list --filter=status:ACTIVE --format="value(account)"

!gcloud projects add-iam-policy-binding --no-user-output-enabled {project_id} \
  --member=user:{current_user[0]} \
  --role="roles/cloudsql.client"

# Enable Cloud SQL Admin API
!gcloud services enable sqladmin.googleapis.com
!gcloud services enable aiplatform.googleapis.com
!gcloud services enable run.googleapis.com
!gcloud services enable cloudresourcemanager.googleapis.com

In [None]:
# Create the service account for Cloud Run
!gcloud iam service-accounts create cloud-run-sa \
    --display-name "Cloud Run Service Account" \
    --project {project_id}

# Add IAM permission
!gcloud projects add-iam-policy-binding --no-user-output-enabled {project_id} \
    --member serviceAccount:cloud-run-sa@{project_id}.iam.gserviceaccount.com \
    --role roles/storage.objectViewer
!gcloud projects add-iam-policy-binding --no-user-output-enabled {project_id} \
    --member serviceAccount:cloud-run-sa@{project_id}.iam.gserviceaccount.com \
    --role roles/secretmanager.secretAccessor

!gcloud projects add-iam-policy-binding --no-user-output-enabled {project_id} \
    --member serviceAccount:cloud-run-sa@{project_id}.iam.gserviceaccount.com \
    --role roles/cloudsql.client

### Setup Cloud SQL instance and PostgreSQL database

In [None]:
#@markdown Create and setup a Cloud SQL PostgreSQL instance, if not done already.
database_version = !gcloud sql instances describe {instance_name} --format="value(databaseVersion)"
if database_version[0].startswith("POSTGRES"):
  print("Found an existing Postgres Cloud SQL Instance!")
else:
  print("Creating new Cloud SQL instance...")
  !gcloud sql instances create {instance_name} --database-version=POSTGRES_15 \
    --region={region} --cpu=1 --memory=4GB --root-password={database_password}

# Create the database, if it does not exist.
out = !gcloud sql databases list --instance={instance_name} --filter="NAME:{database_name}" --format="value(NAME)"
if ''.join(out) == database_name:
  print("Database %s already exists, skipping creation." % database_name)
else:
  !gcloud sql databases create {database_name} --instance={instance_name}

In [None]:
# @markdown Verify that you are able to connect to the database. Executing this block should print the current PostgreSQL server version.

import asyncio
import asyncpg
from google.cloud.sql.connector import Connector


async def main():
    # get current running event loop to be used with Connector
    loop = asyncio.get_running_loop()
    # initialize Connector object as async context manager
    async with Connector(loop=loop) as connector:
        # create connection to Cloud SQL database
        conn: asyncpg.Connection = await connector.connect_async(
            f"{project_id}:{region}:{instance_name}",  # Cloud SQL instance connection name
            "asyncpg",
            user="postgres",
            password=f"{database_password}",
            db=f"{database_name}"
            # ... additional database driver args
        )

        # query Cloud SQL database
        results = await conn.fetch("SELECT version()")
        print(results[0]["version"])

        # close asyncpg connection
        await conn.close()


# Test connection with `asyncio`
await main()  # type: ignore

## Deploy Langfuse

### Prepare Google OAuth2

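Steps:
- Open the Google Cloud Console
- Go to **APIs & Services** -> **Credentials**
- Create credentials of type **OAuth client ID** (application type: **Web application**). You will add the authorized JavaScript origins and redirect URIs after Langfuse is deployed.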

In [None]:
# @markdown Replace the required placeholder text below. You can modify any other default values, if you like.

# Please fill in these values.
PROJECT_ID = "<YOUR_PROJECT_ID>"  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}
SERVICE_NAME = "langfuse"  # @param {type:"string"}
AUTH_GOOGLE_CLIENT_ID = "<YOUR_CLIENT_ID>" # @param {type:"string"}
AUTH_GOOGLE_CLIENT_SECRET = "<YOUR_CLIENT_SECRET>" # @param {type:"string"}
AUTH_GOOGLE_ALLOW_ACCOUNT_LINKING = "true" # @param {type:"string"}
AUTH_GOOGLE_ALLOWED_DOMAINS = "<YOUR_ALLOWED_DOMAIN>" # @param {type:"string"}


def _is_placeholder(value: str) -> bool:
    """Return True if ``value`` is blank or still an unreplaced ``<YOUR_...>`` placeholder."""
    text = value.strip()
    return not text or (text.startswith("<YOUR_") and text.endswith(">"))


# Quick input validations. Placeholders are non-empty strings, so they are
# rejected explicitly rather than relying on truthiness.
assert not _is_placeholder(PROJECT_ID), "⚠️ Please provide a Google Cloud project ID"
# The Cloud SQL instance, service account and gcloud configuration were all set
# up for `project_id`; later cells assume both names refer to the same project.
assert PROJECT_ID == project_id, "⚠️ PROJECT_ID must match the project_id configured above"
assert not _is_placeholder(LOCATION), "⚠️ Please provide a Google Cloud region"
assert not _is_placeholder(SERVICE_NAME), "⚠️ Please provide the name of your Cloud Run instance"
assert not _is_placeholder(AUTH_GOOGLE_CLIENT_ID), "⚠️ Please provide a Google OAuth Client ID"
assert not _is_placeholder(AUTH_GOOGLE_CLIENT_SECRET), "⚠️ Please provide a Google OAuth Client Secret"
assert AUTH_GOOGLE_ALLOW_ACCOUNT_LINKING in ("true", "false"), "⚠️ Please provide a boolean for allow account linking"
assert not _is_placeholder(AUTH_GOOGLE_ALLOWED_DOMAINS), "⚠️ Please provide an Allowed Domain for Langfuse Login"

In [None]:
import os
import secrets
from urllib.parse import quote

# Values exported to the environment; the deploy cell references them as
# ${VAR} and the Secret Manager cell reads the secret ones from os.environ.
os.environ['DB_PASSWORD'] = database_password
os.environ['PROJECT_ID'] = PROJECT_ID
os.environ['LOCATION'] = LOCATION
os.environ['SERVICE_NAME'] = SERVICE_NAME
os.environ['AUTH_GOOGLE_CLIENT_ID'] = AUTH_GOOGLE_CLIENT_ID
os.environ['AUTH_GOOGLE_CLIENT_SECRET'] = AUTH_GOOGLE_CLIENT_SECRET
os.environ['AUTH_GOOGLE_ALLOW_ACCOUNT_LINKING'] = AUTH_GOOGLE_ALLOW_ACCOUNT_LINKING
os.environ['AUTH_GOOGLE_ALLOWED_DOMAINS'] = AUTH_GOOGLE_ALLOWED_DOMAINS
# Freshly generated random values for Langfuse's SALT and NextAuth's secret.
os.environ['SALT'] = secrets.token_urlsafe(32)
os.environ['NEXTAUTH_URL'] = "<YOUR_LANGFUSE_CLOUDRUN_URL>"  # set to the Cloud Run service URL after deployment
os.environ['NEXTAUTH_SECRET'] = secrets.token_urlsafe(32)

# Connection <google-cloud-project-id>:<region-id>:<sql-instance-id>.
# The instance was created in `project_id` / `region` (the Cloud SQL region),
# which is not necessarily the Cloud Run LOCATION.
_sql_connection_name = f"{project_id}:{region}:{instance_name}"

# Percent-encode the password so characters such as '@', ':', '/', '#' or '?'
# cannot break the URL structure. The database is reached over the Cloud SQL
# Unix socket, so SSL is explicitly disabled.
_db_password_encoded = quote(database_password, safe="")
_db_base_url = (
    f"postgresql://postgres:{_db_password_encoded}@localhost/{database_name}"
    f"?host=/cloudsql/{_sql_connection_name}"
)
os.environ['DATABASE_URL'] = f"{_db_base_url}&sslmode=disable&pgbouncer=true"
os.environ['DIRECT_URL'] = f"{_db_base_url}&sslmode=disable"
os.environ['CLOUD_SQL_CONNECTION_NAME'] = _sql_connection_name

In [None]:
# Create secrets in Secret Manager (or add a new version if they already exist).
import os

from google.cloud import secretmanager
from google.api_core.exceptions import AlreadyExists

# Create a client for interacting with Secret Manager API
client = secretmanager.SecretManagerServiceClient()

# Environment variables (set in the previous cell) whose values are stored as secrets.
secret_list = [
    'DB_PASSWORD',
    'DATABASE_URL',
    'DIRECT_URL',
    'AUTH_GOOGLE_CLIENT_SECRET',
    'NEXTAUTH_SECRET',
    'SALT'
    ]

# Build every resource path from the same project so that creation and
# versioning always target the same secret.
parent = f"projects/{PROJECT_ID}"
for secret_id in secret_list:
    secret_name = f"{parent}/secrets/{secret_id}"

    # Create the secret if it doesn't exist.
    try:
        client.create_secret(
            request={
                "parent": parent,
                "secret_id": secret_id,
                "secret": {"replication": {"automatic": {}}},
            }
        )
    except AlreadyExists:
        print(f"Secret {secret_id} already exists.")
        print("Update secret to new version...")

    # Store the current value as the newest version; the deploy cell mounts `:latest`.
    version = client.add_secret_version(
        request={
            "parent": secret_name,
            "payload": {"data": os.environ[secret_id].encode('utf-8')},
        }
    )
    print(f"Secret version {version.name} created.")

### Deploy Langfuse to Cloud Run

In [None]:
# Deploy the public Langfuse v2 container image as a Cloud Run service.
# - Plain settings are passed with --update-env-vars; sensitive values (SALT,
#   DATABASE_URL, DIRECT_URL, AUTH_GOOGLE_CLIENT_SECRET, NEXTAUTH_SECRET) are
#   mounted from the Secret Manager entries created above (":latest" version).
# - --set-cloudsql-instances attaches the instance whose /cloudsql/<connection>
#   socket path is referenced by DATABASE_URL / DIRECT_URL.
# - The service runs as the cloud-run-sa account created earlier.
# NOTE(review): the ${VAR} references are meant to be expanded by the shell
# from os.environ. IPython leaves them untouched only while some of these names
# (e.g. CLOUD_SQL_CONNECTION_NAME) are not Python variables; if they all exist
# as Python variables IPython may substitute them first — confirm before
# relying on this, or switch to {VAR} interpolation.
!gcloud run deploy ${SERVICE_NAME} --image="docker.io/langfuse/langfuse:2" \
  --region=${LOCATION} \
  --platform=managed \
  --cpu=2 \
  --memory=8Gi \
  --concurrency=80 \
  --timeout=1200 \
  --min-instances=0 \
  --max-instances=10 \
  --port=3000 \
  --update-env-vars "AUTH_GOOGLE_CLIENT_ID=${AUTH_GOOGLE_CLIENT_ID},NEXTAUTH_URL=${NEXTAUTH_URL},AUTH_GOOGLE_ALLOW_ACCOUNT_LINKING=${AUTH_GOOGLE_ALLOW_ACCOUNT_LINKING},AUTH_GOOGLE_ALLOWED_DOMAINS=${AUTH_GOOGLE_ALLOWED_DOMAINS}" \
  --update-secrets "SALT=SALT:latest,DATABASE_URL=DATABASE_URL:latest,DIRECT_URL=DIRECT_URL:latest,AUTH_GOOGLE_CLIENT_SECRET=AUTH_GOOGLE_CLIENT_SECRET:latest,NEXTAUTH_SECRET=NEXTAUTH_SECRET:latest" \
  --session-affinity \
  --allow-unauthenticated \
  --execution-environment=gen2 \
  --service-account=cloud-run-sa@${PROJECT_ID}.iam.gserviceaccount.com \
  --set-cloudsql-instances=${CLOUD_SQL_CONNECTION_NAME}

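After deployment, set `NEXTAUTH_URL` to the Cloud Run service URL. Then, in your Google OAuth client, add that URL to **Authorized JavaScript origins** and add `<service URL>/api/auth/callback/google` to **Authorized redirect URIs**.

Example service URL: https://langfuse-ghgzraehsq-uc.a.run.app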

In [None]:
NEXTAUTH_URL="https://<YOUR_CLOUDRUN_URL>"

## Quickstart: LangChain integration with Langfuse

https://langfuse.com/docs/get-started

### Create a new project in Langfuse

1. Sign in to your Langfuse instance (the self-hosted one deployed above, or a Langfuse Cloud account)
2. Create a new project
3. Create new API credentials in the project settings

### Example 1 - Langchain basic chain

In [None]:
# Initialize Langfuse handler.
# The API keys are read from the environment rather than hard-coded in the
# notebook; the placeholders are only fallbacks to show what is expected.
import os

from langfuse.callback import CallbackHandler

langfuse_handler = CallbackHandler(
    secret_key=os.environ.get("LANGFUSE_SECRET_KEY", "<YOUR_LANGFUSE_SECRET_KEY>"),
    public_key=os.environ.get("LANGFUSE_PUBLIC_KEY", "<YOUR_LANGFUSE_PUBLIC_KEY>"),
    host=os.environ.get("LANGFUSE_HOST", "https://<YOUR_LANGFUSE_CLOUDRUN_URL>"),
)

# Your Langchain code
from google.cloud import aiplatform
aiplatform.init(project=project_id, location=region)

# Use the Vertex AI integration package installed above (the same one used in
# Example 2) and select a Gemini model explicitly rather than relying on the
# class default.
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_vertexai import ChatVertexAI

_prompt = ChatPromptTemplate.from_template(
    "Tell me a joke about Chuck Norris and {text}")
_model = ChatVertexAI(model="gemini-1.5-pro")

chain = _prompt | _model

# Add Langfuse handler as callback (classic and LCEL)
## chain.invoke({"text": "<user_input>"}, config={"callbacks": [langfuse_handler]})

# Also works for run and predict methods.
## chain.run(input="<user_input>", callbacks=[langfuse_handler]) # Legacy
## conversation.predict(input="<user_input>", callbacks=[langfuse_handler])

In [None]:
# Run the joke chain once; the callback sends the trace to Langfuse.
joke_response = chain.invoke({"text": "Cannelloni"}, config={"callbacks": [langfuse_handler]})
joke_response

### Example 2 - Langchain Web Summarization
https://github.com/google/generative-ai-docs/blob/main/examples/gemini/python/langchain/Gemini_LangChain_Summarization_WebLoad.ipynb

In [None]:
# Import from the current package locations; `langchain.PromptTemplate`,
# `langchain.schema.*` and `langchain.document_loaders` are deprecated
# re-exports of langchain_core / langchain_community.
from google.cloud import aiplatform
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate, format_document
from langchain_google_vertexai import VertexAI

# Initialize Vertex AI
aiplatform.init(project=project_id, location=region)
llm = VertexAI(
    model="gemini-1.5-pro",
    temperature=0.7,
    top_p=0.85,
)

# To extract data from WebBaseLoader
loader = WebBaseLoader("https://blog.google/technology/ai/google-gemini-ai/#sundar-note")
docs = loader.load()
doc_prompt = PromptTemplate.from_template("{page_content}")

# To query Gemini
llm_prompt_template = """Write a concise summary of the following:
"{text}"
CONCISE SUMMARY:"""
llm_prompt = PromptTemplate.from_template(llm_prompt_template)


def _join_documents(documents):
    """Render each document through ``doc_prompt`` and join them with blank lines."""
    return "\n\n".join(format_document(doc, doc_prompt) for doc in documents)


stuff_chain = (
    # Extract data from the documents and add to the key `text`.
    {"text": _join_documents}
    | llm_prompt         # Prompt for Gemini
    | llm                # Gemini function
    | StrOutputParser()  # output parser
)

In [None]:
# Summarize the loaded page, tracing the run in Langfuse through the callback handler.
summary_text = stuff_chain.invoke(docs, config={"callbacks": [langfuse_handler]})
summary_text