## Imports

In [89]:
import os
import asyncio
from pydantic import BaseModel, Field
from typing import List
from langgraph_sdk import get_client
from dotenv import load_dotenv
from supabase import create_client, Client

# Load environment variables
load_dotenv()


True

## Authentication

In [90]:
# Supabase configuration
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
USER_EMAIL = os.environ.get("USER_EMAIL")
USER_PASSWORD = os.environ.get("USER_PASSWORD")

# LangGraph configuration
LANGGRAPH_API_URL = "http://localhost:2024"  # Change if needed


In [91]:
def authenticate_supabase() -> str:
    """Authenticate with Supabase and return access token"""
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
    response = supabase.auth.sign_in_with_password({
        "email": USER_EMAIL,
        "password": USER_PASSWORD,
    })
    if response.session is None:
        raise RuntimeError(f"Supabase authentication failed: {response}")
    return response.session.access_token

def get_authenticated_client():
    """Get authenticated LangGraph client"""
    access_token = authenticate_supabase()
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    return get_client(url=LANGGRAPH_API_URL, headers=headers)

In [92]:
client = get_authenticated_client()
print(client)

<langgraph_sdk.client.LangGraphClient object at 0x7a82600ad820>


## Create/Update Schema

In [93]:
class AllergenEntry(BaseModel):
    allergen: str = Field(..., description="Name of the allergen, e.g., 'peanut'")
    reason: str = Field(..., description="Reason this allergen is high risk")


class AllergyAnalysisResponse(BaseModel):
    high_risk_allergen: List[AllergenEntry] = Field(description="List of allergen-risk mappings")
    recommendation: str = Field(description="Overall recommendation for handling this recipe")

schema_name = "AllergyAnalysisResponse"

In [94]:
# Get user info to build namespace
access_token = authenticate_supabase()
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
user_response = supabase.auth.get_user(access_token)
user_id = user_response.user.id


In [95]:
await client.store.put_item([user_id, "schemas"], schema_name, AllergyAnalysisResponse.model_json_schema())
print(f"Stored schema: {schema_name}")

Stored schema: AllergyAnalysisResponse


In [96]:
schema_name, AllergyAnalysisResponse.model_json_schema()

('AllergyAnalysisResponse',
 {'$defs': {'AllergenEntry': {'properties': {'allergen': {'description': "Name of the allergen, e.g., 'peanut'",
      'title': 'Allergen',
      'type': 'string'},
     'reason': {'description': 'Reason this allergen is high risk',
      'title': 'Reason',
      'type': 'string'}},
    'required': ['allergen', 'reason'],
    'title': 'AllergenEntry',
    'type': 'object'}},
  'properties': {'high_risk_allergen': {'description': 'List of allergen-risk mappings',
    'items': {'$ref': '#/$defs/AllergenEntry'},
    'title': 'High Risk Allergen',
    'type': 'array'},
   'recommendation': {'description': 'Overall recommendation for handling this recipe',
    'title': 'Recommendation',
    'type': 'string'}},
  'required': ['high_risk_allergen', 'recommendation'],
  'title': 'AllergyAnalysisResponse',
  'type': 'object'})

## Read schemas

In [97]:
results = await client.store.search_items([user_id, "schemas"])
schemas = [item["key"] for item in results["items"]]
print("Available schemas:")
for schema in schemas:
    print(f"  - {schema}")


Available schemas:
  - OutputSchemaA
  - OutputSchemaB
  - RecipeNutritionAnalysis
  - IngredientClassification
  - FoodSafetyReport
  - MenuPlanning
  - RecipeInstructions
  - SupplierQuote
  - QualityInspection
  - AllergyAnalysisResponse


## Invoking agent with schema

In [98]:
ASSISTANT_ID = "agent"

# Test recipe for allergy analysis
FRESHPREP_TEST_RECIPE = """
Pulled Chicken Fajitas
with Chipotle-Pineapple Sauce & Cheddar
Poultry - Local, Free Run, Antibiotic Free
Milk
Gluten
Seafood
Serves
2
Difficulty
Moderate
Time
30 Min
Ingredients
Onion
Chipotle Pepper in Adobo Sauce
Chicken Breast
Worcestershire Sauce
Aged White Cheddar Cheese
Flour Tortillas 6"
Cilantro
Assorted Mini Bell Peppers
Coconut Sugar
Green Cabbage
Pineapple-Lime Juice
Spice Blend
Lime Crema
* Pineapple-Lime Juice: Pineapple Juice , Fresh Lime Juice
* Spice Blend: Smoked Paprika , Cumin
* Lime Crema: Sour Cream , Mayonnaise , Fresh Lime Juice
"""

In [99]:
assistant=await client.assistants.create(
    graph_id = ASSISTANT_ID,
    config={
        "configurable": {
            "OutputSchemaName": schema_name
        }
    },
    name="Structured Output Assistant"
)

print("Assistant created successfully")
print(f"Assistant ID: {assistant['assistant_id']}")
print(f"Assistant Name: {assistant['name']}")
print(f"Schema: {assistant['config']['configurable']['OutputSchemaName']}")
print(f"Version: {assistant['version']}")



Assistant created successfully
Assistant ID: c81e51c6-18e1-4acd-9076-61275307e4e8
Assistant Name: Structured Output Assistant
Schema: AllergyAnalysisResponse
Version: 1


In [100]:
# Create a new thread
thread = await client.threads.create()

print(f"Thread created:{thread['thread_id']}")

Thread created:6c701431-12ae-4a8c-b5fb-84d05c8ddcf6


In [101]:
input_data = {"messages": [{"role": "user", "content": f"Analyze this recipe for food allergens:\n\n{FRESHPREP_TEST_RECIPE}\n\nIdentify high-risk allergens with reasons, and provide recommendations for people with allergies."}]}

In [102]:
import json

# Stream the response with cleaner formatting
async for event in client.runs.stream(
    thread["thread_id"],
    assistant["assistant_id"],
    input=input_data,
    stream_mode="updates",
):
    if event.event == "metadata":
        print(f"📋 Run started (ID: {event.data.get('run_id', 'Unknown')[:8]}...)")
        
    elif event.event == "updates":
        
        # # Handle agent updates (AI messages and tool calls)
        # if "agent" in event.data:
        #     for msg in event.data["agent"]["messages"]:
        #         if msg["type"] == "ai":
        #             # Check if AI is making tool calls
        #             if msg.get("tool_calls"):
        #                 for tool_call in msg["tool_calls"]:
        #                     tool_name = tool_call.get("name", "Unknown")
        #                     print(f"🔧 Calling tool: {tool_name}")
        #                     # Show tool arguments in a clean way
        #                     if "args" in tool_call and tool_call["args"]:
        #                         args_str = str(tool_call["args"])
        #                         if len(args_str) > 100:
        #                             args_str = args_str[:100] + "..."
        #                         print(f"   └─ Args: {args_str}")
                    
        #             # Show AI response content (if not just tool calls)
        #             elif msg.get("content") and msg["content"].strip():
        #                 print(f"💬 Assistant Response:")
        #                 # Show full response for final answers
        #                 content = msg["content"]
        #                 if len(content) > 500:
        #                     print(f"{content}\n")
        #                 else:
        #                     print(f"{content}\n")
        
        # # Handle tool responses
        # elif "tools" in event.data:
        #     for msg in event.data["tools"]["messages"]:
        #         if msg["type"] == "tool":
        #             tool_name = msg.get("name", "Unknown tool")
        #             print(f"✅ Tool '{tool_name}' completed")
        #             # Show brief summary of tool response
        #             content = msg.get("content", "")
        #             if content:
        #                 # For research tool, try to count results
        #                 try:
        #                     import json
        #                     results = json.loads(content)
        #                     if isinstance(results, list):
        #                         print(f"   └─ Found {len(results)} results")
        #                     else:
        #                         print(f"   └─ Result: {str(content)[:150]}...")
        #                 except:
        #                     print(f"   └─ Result: {str(content)[:150]}...")
        
        # Handle the structured response event
        if "generate_structured_response" in event.data:
            structured_data = event.data["generate_structured_response"]["structured_response"]
            print("\n✨ Extracted Structured Response (JSON):")
            # Use json.dumps to print the JSON object cleanly
            print(json.dumps(structured_data, indent=4)) 

        

📋 Run started (ID: 1f09e519...)

✨ Extracted Structured Response (JSON):
{
    "high_risk_allergen": [
        {
            "allergen": "Milk",
            "reason": "The recipe includes aged white cheddar cheese and lime crema, which contains sour cream. Both ingredients are dairy products."
        },
        {
            "allergen": "Gluten",
            "reason": "Flour tortillas are made from wheat, which contains gluten and is problematic for those with gluten intolerance or celiac disease."
        },
        {
            "allergen": "Seafood",
            "reason": "Worcestershire sauce can sometimes contain anchovies, a type of seafood, which is a common allergen."
        }
    ],
    "recommendation": "For individuals with a milk allergy, substitute cheddar cheese with a dairy-free cheese alternative and replace lime crema with a dairy-free yogurt or mayonnaise. Use gluten-free tortillas for those with gluten intolerance. Verify Worcestershire sauce is seafood-free or use

## Delete Schema

In [103]:
schema_name_to_delete = "AllergyAnalysisResponse"

In [104]:
# await client.store.delete_item([user_id, "schemas"], schema_name_to_delete)
print(f"Deleted schema: {schema_name}")

Deleted schema: AllergyAnalysisResponse
