In [64]:
# Enable autoreload so that edits to imported modules are picked up
# without restarting the kernel.
%load_ext autoreload
%autoreload 2

# SetupDBRag: leave False for this project, since the database and RAG index are
# already prepared. Set to True to (re)create the database and RAG index in the
# setup cells that are guarded by this flag.
SetupDBRag = False
# DebugLangGraph: when True, every agent prints the state on entry and before
# returning. Keep False unless debugging.
DebugLangGraph = False

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [65]:
# Initialize environment
import sys
import os
import importlib


# Add the parent directory to sys.path to resolve local imports
# Check if running in Google Colab
try:
    import google.colab
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

if IN_COLAB:
    # If in Colab, use the Google Drive path
    # Modify this path to point to the directory containing your 'controller' directory in Google Drive
    from google.colab import drive
    drive.mount('/content/drive')
    parent_dir = '/content/drive/MyDrive/Colab Notebooks/Emeritus_Generative_AI_Fundamentals_to_Advanced_Techniques_March_2025/Week13/family_travel_planner' # <--- UPDATE THIS PATH
    print(f"Running in Google Colab. Using parent_dir: {parent_dir}")
    if parent_dir not in sys.path:
        sys.path.insert(0, parent_dir)
    !pip install -r '{parent_dir}/requirements.txt'
else:
    # If not in Colab, use the parent directory of the current working directory
    parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
    print(f"Not running in Google Colab. Using parent_dir: {parent_dir}")
    #!python -m venv python310
    #!myenv\Scripts\python -m pip install ipykernel
    #!myenv\Scripts\python -m ipykernel install --user --name=python310 --display-name "Python (python 3.10.18 with requirements)"
    !python310\Scripts\python -m pip install -r C:\Users\thunaung\personal\education\AI\Emeritus_Generative_AI_Fundamentals_to_Advanced_Techniques_March_2025\Week13\ai_prototype_project\requirements.txt

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Running in Google Colab. Using parent_dir: /content/drive/MyDrive/Colab Notebooks/Emeritus_Generative_AI_Fundamentals_to_Advanced_Techniques_March_2025/Week13/family_travel_planner


In [66]:
# One-time database setup; skipped unless SetupDBRag is True.
# NOTE(review): the script paths are relative to the current working directory --
# confirm the kernel is started from the directory that contains scripts/.
if SetupDBRag:
  # Create the family_members database (columns: id, role, age, preferences)
  !python scripts/create_database.py

  # Seed the database with the family members (husband, wife, son, daughter) and their preferences
  !python scripts/seed_data.py

In [67]:
# Ingest the travel brochures (one local PDF and one web page) into the RAG index
if SetupDBRag:
  import sys
  import os
  from utils.rag_utils import ingest_documents

  def create_rag():
      """Ingest the travel guide PDF and the travel blog page into the RAG index.

      The PDF is looked up under `<parent_dir>/travelguide/`, so the same cell
      works wherever the project root is located. The index is written to
      "family_travel_rag.index" (relative to the current working directory).
      """
      sources = [
          {"type": "pdf", "path": os.path.join(parent_dir, "travelguide", "Americas_compressed.pdf")},
          {"type": "html", "url": "https://www.klook.com/en-SG/blog/cheapest-holidays-from-singapore/"}
      ]
      ingest_documents(sources, index_path="family_travel_rag.index")
      print("✅ RAG ingestion complete.")

  create_rag()

In [68]:
# Copy the freshly built RAG index from the current working directory into the
# project directory (on Colab this persists it from the runtime to Google Drive).
if SetupDBRag:
  import os
  import shutil

  # Define the source and destination paths
  source_path = "family_travel_rag.index"
  destination_dir = parent_dir # Copy to the project directory
  destination_path = os.path.join(destination_dir, source_path)

  if not os.path.exists(source_path):
      print(f"Error: {source_path} not found in the current directory.")
  elif os.path.abspath(source_path) == os.path.abspath(destination_path):
      # Working directory is already the project directory: nothing to do
      print(f"{source_path} is already located in {destination_dir}; nothing to copy.")
  else:
      # Pure-Python copy: portable, and a failure is reported instead of being
      # followed by a misleading success message.
      try:
          shutil.copy(source_path, destination_path)
      except OSError as e:
          print(f"Error: could not copy {source_path} to {destination_path}: {e}")
      else:
          print(f"Successfully copied {source_path} to {destination_path}")

In [69]:
# Sanity-check the stored RAG vectors: print the first 3 and the last 3 vectors
# (first 10 components of each).
if SetupDBRag:
  import faiss
  import numpy as np
  import os
  import sys

  # load_rag_index lives in the project's rag_utils module
  try:
      from utils.rag_utils import load_rag_index
  except ImportError as e:
      print(f"Error importing load_rag_index: {e}")
      print(f"Current sys.path: {sys.path}")
      sys.exit(1)

  index_path = os.path.join(parent_dir, "family_travel_rag.index") # Index stored in the project directory

  if not os.path.exists(index_path):
      print(f"Index file not found at: {index_path}")
  else:
      try:
          # Load the index together with the documents it was built from
          index, docs = load_rag_index(index_path)
          print("✅ RAG index and documents loaded successfully.")

          num_vectors = index.ntotal
          print(f"Total number of vectors in the index: {num_vectors}")

          if num_vectors <= 0:
              print("The index is empty.")
          else:
              # Each window: (heading word, word used in the failure message, start, count)
              windows = [("First", "first", 0, min(3, num_vectors))]
              if num_vectors > 3:
                  windows.append(("Last", "last", num_vectors - 3, 3))

              for heading, label, start, count in windows:
                  try:
                      vectors = index.reconstruct_n(start, count)
                      print(f"\n{heading} {count} vectors (by internal index):")
                      for position, vec in zip(range(start, start + count), vectors):
                          print(f"Index {position}: {vec[:10]}...") # First 10 elements for brevity
                  except Exception as e:
                      print(f"Could not retrieve {label} vectors by index: {e}")
                      print("Note: Not all FAISS index types support reconstruction.")

              # With 3 or fewer vectors the "last 3" would just repeat the first ones
              if num_vectors <= 3:
                  print("\nLess than 4 vectors in the index, cannot show last 3 unique vectors.")

      except Exception as e:
          print(f"Error loading or processing index: {e}")

In [70]:
# Print the original text behind the first 3 and the last 3 vectors.
# Relies on `docs` having been loaded by the index-inspection cell.
if SetupDBRag:
  if not ('docs' in locals() and len(docs) > 0):
      print("Documents not loaded or the list is empty. Please run the cell to load the index and documents first.")
  else:
      total_docs = len(docs)

      print("Original text content for the first 3 vectors (by internal index):")
      for position in range(min(3, total_docs)):
          print(f"\n--- Document for Index {position} ---")
          print(docs[position].page_content)

      if total_docs <= 3:
          # The "last 3" would only repeat what was just printed
          print("\nLess than 4 documents available, cannot show last 3 unique documents.")
      else:
          print("\nOriginal text content for the last 3 vectors (by internal index):")
          for position in range(total_docs - 3, total_docs):
              print(f"\n--- Document for Index {position} ---")
              print(docs[position].page_content)

Simple Agents LangGraph

In [71]:
import json
import os
import re
from typing import Optional, List, Dict, Any, Literal

import numpy as np
import replicate
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer, util

from langgraph.graph import StateGraph, END
from langchain_core.runnables import RunnableLambda
from langchain_openai import OpenAIEmbeddings, ChatOpenAI # Import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate # Import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser # Import StrOutputParser
from openai import OpenAI # Import the openai library

# Assuming these imports are necessary and the files exist in the mounted drive
from utils.utils import load_family
from utils.rag_utils import load_rag_index
from utils.config import DB_PATH, RAG_INDEX_PATH, WEATHER_API_KEY, OPENAI_API_KEY # Ensure WEATHER_API_KEY is available
from agents.weather_agent import get_weather

# Assuming these imports are necessary and the files exist in the mounted drive
# from agents.rag_agents import rag_retrieval_agent, rag_recommendation_agent # Not using these directly anymore
# from agents.weather_agent import weather_check_agent, get_weather # Need get_weather for weather_check_agent
# from agents.photo_memory_agent import photo_memory_agent
# from agents.photo_generation_agent import photo_generation_agent

from pydantic import ValidationError
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY

SAVE_PATH = "session_state.json"

# Define the HybridState model again (copy from original cell)
class HybridState(BaseModel):
    """Shared state object that is passed between all nodes of the travel planner graph.

    Only `input` is required; every other field starts empty and is filled in by
    the agents defined in this notebook as the graph runs.
    """
    input: str # Current user query; read by the router, the location extractor and the summarizer
    # Route chosen by llm_router_agent; must be one of the four node names below
    next: Optional[
        Literal["fetch_family","extract_location","photo_memory","summarize"
        ]
    ] = None
    output: Optional[str] = None # Final text written by summarize_agent
    history: List[str] = [] # User inputs across turns; merge_state appends to it

    family: Optional[List[Dict[str, Any]]] = None # Family records loaded by fetch_family_agent
    selected_activity: Optional[str] = None # Most-liked activity picked by fetch_family_agent
    rag_context: Optional[List[str]] = None # Document texts retrieved by rag_retrieval_agent
    recommendations: Optional[str] = None # LLM answer produced by rag_recommendation_agent
    location: Optional[str] = None # Place name set by extract_location_agent
    weather_ok: Optional[bool] = None # Set by weather_check_agent from keywords in the weather text
    weather_info: Optional[str] = None # Weather text returned by get_weather
    photo_idea: Optional[str] = None # Photo prompt idea set by photo_memory_agent
    photo_url: Optional[str] = None # Generated image URL set by photo_generation_agent
    memory_log: Optional[List[str]] = [] # Outputs accumulated by summarize_agent
    decision: Optional[str] = None # Not written by any agent visible in this cell


# Define the agent functions again (copy from original cell)

def fetch_family_agent():
    """Create the graph node that loads the family and picks its favourite activity.

    The returned callable reads the family records from the database at
    `<parent_dir>/<DB_PATH>`, counts for every activity how many members have
    a truthy entry for it in their "preferences" mapping, and stores:

    * `family`            - the loaded records
    * `selected_activity` - the activity with the highest count (the first one
      encountered wins a tie), or "beach" when nobody likes anything.
    """
    def _run(s: HybridState) -> HybridState:
        if DebugLangGraph:
            print("--- Inside fetch_family_agent ---")
            print(f"State on entry: {s}")

        members = load_family("/".join((parent_dir, DB_PATH)))

        # Tally the liked activities; dict insertion order keeps ties stable
        tally = {}
        for member in members:
            liked_activities = [name for name, liked in member["preferences"].items() if liked]
            for name in liked_activities:
                tally[name] = tally.get(name, 0) + 1

        if tally:
            favourite = max(tally, key=tally.get)
        else:
            favourite = "beach"

        s.family = members
        s.selected_activity = favourite

        if DebugLangGraph:
            print(f"State before returning from fetch_family_agent: {s}")
            print("--- Exiting fetch_family_agent ---")
        return s
    return _run

def rag_retrieval_agent(index_path="family_travel_rag.index", k=5):
    """Create the graph node that retrieves RAG context for the selected activity.

    The index and its documents are loaded once, when this factory is called
    (i.e. while the graph is being built), from `<parent_dir>/<index_path>`.

    Args:
        index_path: Index file name/path relative to `parent_dir`.
        k: Maximum number of nearest documents to retrieve.

    Returns:
        A callable that embeds `state.selected_activity`, searches the index and
        stores the matching document texts in `state.rag_context`.
    """
    index_path = parent_dir+"/"+index_path
    index, docs = load_rag_index(index_path)

    embedder = OpenAIEmbeddings()

    def _run(state):
        if DebugLangGraph:
          print(f"--- Inside rag_retrieval_agent ---")
          print(f"State on entry: {state}")
        query = state.selected_activity
        if not query:
            # Nothing to embed: leave an empty context rather than failing in the embedder
            state.rag_context = []
        else:
            q_vec = np.array(embedder.embed_query(query)).astype("float32")
            # search() returns (distances, labels), each of shape (1, k)
            _distances, labels = index.search(q_vec.reshape(1, -1), k)
            # FAISS pads the labels with -1 when fewer than k neighbours exist;
            # docs[-1] would silently return the LAST document, so skip those.
            state.rag_context = [
                docs[i].page_content for i in labels[0] if 0 <= i < len(docs)
            ]
        if DebugLangGraph:
          print(f"State before returning from rag_retrieval_agent: {state}")
          print(f"--- Exiting rag_retrieval_agent ---")
        return state

    return _run

def rag_recommendation_agent():
    """Create the graph node that turns the retrieved context into a recommendation.

    The returned callable builds a prompt from `state.rag_context` and
    `state.selected_activity`, sends it to the "gpt-4o-mini" chat model and
    stores the answer text in `state.recommendations`.
    """
    def _run(state):
        if DebugLangGraph:
          print(f"--- Inside rag_recommendation_agent ---")
          print(f"State on entry: {state}")
        ctx = "\n\n".join(state.rag_context or [])
        prompt = (
            f"Based on these documents:\n{ctx}\n\n"
            f"Recommend 1 family-friendly destinations or activities "
            f"related to '{state.selected_activity}'."
        )

        # Only the `OpenAI` client class is imported in this notebook (the bare
        # `openai` module is not), so go through a client instance -- the same
        # way photo_generation_agent does.
        client = OpenAI(api_key=OPENAI_API_KEY)
        resp = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role":"user","content":prompt}],
            temperature=0.7
        )

        state.recommendations = resp.choices[0].message.content
        if DebugLangGraph:
          print(f"State before returning from rag_recommendation_agent: {state}")
          print(f"--- Exiting rag_recommendation_agent ---")
        return state

    return _run


def extract_location_agent():
    """Create the graph node that extracts a place name for the weather lookup.

    The returned callable asks the "gpt-4o-mini" chat model for exactly one
    place name, trying `state.recommendations` first and `state.input` second.
    Missing or blank texts are skipped instead of being sent to the model (the
    router can jump straight to this node, in which case no recommendation
    exists yet). If nothing can be extracted, a location already stored in the
    state is kept, so follow-up questions still refer to the earlier destination.
    """
    llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    prompt = ChatPromptTemplate.from_messages([
        ("system", "Extract exactly one place name from the user’s query. If none, return an empty string."),
        ("user",   "{input}")
    ])
    chain = prompt | llm | StrOutputParser()
    def _run(s: HybridState) -> HybridState:
        if DebugLangGraph:
          print(f"--- Inside extract_location_agent ---")
          print(f"State on entry: {s}")
        place = ""
        # Candidate texts in priority order; stop at the first one that yields a place
        for text in (s.recommendations, s.input):
            if text and text.strip():
                place = chain.invoke({"input": text}).strip()
                if place:
                    break
        # Fall back to the previously known location rather than erasing it
        s.location = place or s.location or ""
        if DebugLangGraph:
          print(f"State before returning from extract_location_agent: {s}")
          print(f"--- Exiting extract_location_agent ---")
        return s
    return _run


def weather_check_agent():
    """Create the graph node that looks up the weather for `state.location`.

    The returned callable stores the text returned by `get_weather` in
    `state.weather_info` and sets `state.weather_ok` to True when that text
    mentions "clear", "sunny" or "cloudy" (case-insensitive).
    """
    def _run(state):
        if DebugLangGraph:
            print("--- Inside weather_check_agent ---")
            print(f"State on entry: {state}")

        # The location is expected to have been filled in by the location extraction node
        report = get_weather(state.location)
        state.weather_info = report

        lowered = report.lower()
        favourable = ("clear", "sunny", "cloudy")
        hits = [word for word in favourable if word in lowered]
        state.weather_ok = len(hits) > 0

        if DebugLangGraph:
            print(f"State before returning from weather_check_agent: {state}")
            print("--- Exiting weather_check_agent ---")
        return state
    return _run

def photo_memory_agent():
    """Create the graph node that proposes a photo idea.

    Placeholder logic: the idea is simply a sentence built from
    `state.selected_activity`, stored in `state.photo_idea`.
    """
    def _run(state):
        if DebugLangGraph:
            print("--- Inside photo_memory_agent ---")
            print(f"State on entry: {state}")

        state.photo_idea = "A family enjoying {}".format(state.selected_activity)

        if DebugLangGraph:
            print(f"State before returning from photo_memory_agent: {state}")
            print("--- Exiting photo_memory_agent ---")
        return state
    return _run


def photo_generation_agent(photo_idea = "Good day"):
    """Create the graph node that generates a picture with DALL-E 3.

    Args:
        photo_idea: Fallback subject used when the state carries no photo idea.

    Returns:
        A callable that builds the image prompt "<idea> at <location> on
        <weather>" from the state, requests one 1024x1024 image, stores its URL
        in `state.photo_url` and displays it. Location and weather are only
        appended when present, so the node also works on a state where the
        weather branch has not run yet.
    """
    def _run(state):
        if DebugLangGraph:
          print(f"--- Inside photo_generation_agent ---")
          print(f"State on entry: {state}")
        # Build the prompt from whatever is available; unconditional string
        # concatenation would raise TypeError on a None location or weather_info.
        prompt = state.photo_idea or photo_idea
        if state.location:
            prompt += " at " + state.location
        if state.weather_info:
            prompt += " on " + state.weather_info
        client = OpenAI(api_key=OPENAI_API_KEY)
        response = client.images.generate(
            model="dall-e-3",
            prompt=prompt,
            size="1024x1024",
            quality="standard",
            n=1
        )
        image_url = response.data[0].url
        # Keep None as None instead of storing the literal text "None"
        state.photo_url = image_url if image_url is None else f"{image_url}"
        # Only try to display something that looks like a real URL
        if state.photo_url and state.photo_url.startswith("http"):
          # NOTE(review): display_image_from_url is not defined in this cell --
          # confirm it is defined in a cell that runs before the graph is invoked.
          display_image_from_url(state.photo_url)
        if DebugLangGraph:
          print(f"State before returning from photo_generation_agent: {state}")
          print(f"--- Exiting photo_generation_agent ---")
        return state
    return _run

def summarize_agent():
    """Create the final graph node that composes, prints and logs the answer.

    The answer depends on the first keyword found in the lower-cased user input:
    "plan" yields the family data, the common interest and the RAG
    recommendation; "weather" yields the weather text; "photo" yields the photo
    idea; anything else yields a generic fallback message. The answer is stored
    in `output`, printed, and appended to `memory_log`.
    """
    def _run(s: HybridState) -> HybridState:
        if DebugLangGraph:
            print("--- Inside summarize_agent ---")
            print(f"State on entry: {s}")

        entries = s.memory_log or []
        query = s.input.lower()

        # Keyword -> lazy builder, checked in this order
        builders = (
            ("plan", lambda: "\n".join([
                f"Fetch Family data from SQL: {str(s.family)}",
                f"Common interest: {s.selected_activity}",
                f"Things to do based on RAG: {s.recommendations or ''}",
            ])),
            ("weather", lambda: f"Tool Output → {s.weather_info}"),
            ("photo", lambda: f"Tool Output → {s.photo_idea}"),
        )
        for keyword, build in builders:
            if keyword in query:
                s.output = build()
                break
        else:
            # No keyword matched
            s.output = "No specific output or recommendation"

        print(s.output)
        entries.append(s.output)
        s.memory_log = entries

        if DebugLangGraph:
            print(f"State before returning from summarize_agent: {s}")
            print("--- Exiting summarize_agent ---")

        return s
    return _run

# --- LLM Router Agent ---
def llm_router_agent():
    """Create the entry node that lets an LLM pick the branch of the graph.

    The returned callable sends `state.input` to the "gpt-4o-mini" chat model
    and stores the chosen route in `state.next`. The model's reply is normalized
    to one of the four valid route names; anything unrecognizable falls back to
    "summarize", so `state.next` never holds a value outside the set its type
    declares.
    """
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        temperature=0,
        openai_api_key=OPENAI_API_KEY,
    )

    prompt = ChatPromptTemplate.from_messages([
        ("system", """
          You are a router. Given the user’s input, reply with exactly one of:
            - fetch_family   → to start trip planning
            - extract_location  → to check weather
            - photo_memory   → to generate photo ideas
            - summarize      → for a fallback or summary
          Do not output anything else.
          """),
        ("user", "{input}")
    ])

    router_chain = prompt | llm | StrOutputParser()

    valid_routes = ("fetch_family", "extract_location", "photo_memory", "summarize")

    def _run(state: HybridState) -> HybridState:
        if DebugLangGraph:
          print(f"--- Inside llm_router_agent ---")
          print(f"State on entry: {state}")
        reply = router_chain.invoke({"input": state.input}).strip().lower()
        # Tolerate decoration around the route name (quotes, backticks, bullet, period)
        cleaned = reply.strip("`'\". \t\n-")
        if cleaned in valid_routes:
            decision = cleaned
        else:
            # Accept a reply that merely contains a route name; otherwise fall back
            decision = next((route for route in valid_routes if route in reply), "summarize")
        # write the decision back into the state
        state.next = decision
        if DebugLangGraph:
          print(f"State before returning from llm_router_agent: {state}")
          print(f"--- Exiting llm_router_agent ---")
        return state
    return _run

# Build the travel planner graph with the LLM router
def build_travel_planner_graph_with_router():
    """Assemble and compile the travel planner graph.

    Flow:
        router -> fetch_family | extract_location | photo_memory | summarize
        fetch_family -> rag_retrieve -> rag_recommend -> extract_location
        extract_location -> weather_check
        weather_check -> summarize (input mentions "weather") | photo_memory
        photo_memory -> photo_generate (input mentions "photo") | summarize

    The keyword checks are case-insensitive, matching summarize_agent. Note that
    building the graph calls rag_retrieval_agent(), which loads the RAG index.

    Returns:
        The compiled graph.
    """
    graph = StateGraph(HybridState)

    # 1) Router
    graph.add_node("router", RunnableLambda(llm_router_agent()))

    # 2) Task nodes
    graph.add_node("fetch_family",    RunnableLambda(fetch_family_agent()))
    graph.add_node("rag_retrieve",    RunnableLambda(rag_retrieval_agent()))
    graph.add_node("rag_recommend",   RunnableLambda(rag_recommendation_agent()))
    graph.add_node("extract_location",RunnableLambda(extract_location_agent()))
    graph.add_node("weather_check",   RunnableLambda(weather_check_agent()))
    graph.add_node("photo_memory",    RunnableLambda(photo_memory_agent()))
    graph.add_node("photo_generate",  RunnableLambda(photo_generation_agent()))
    graph.add_node("summarize",       RunnableLambda(summarize_agent()))

    graph.set_entry_point("router")

    router_targets = (
        "fetch_family",      # travel plan
        "extract_location",  # weather
        "photo_memory",      # photo
        "summarize",         # all others
    )

    def _route_from_router(s):
        """Follow the router's decision; unknown values fall back to summarize."""
        return s.next if s.next in router_targets else "summarize"

    def _route_after_weather(s):
        """Weather questions are answered right away; other flows continue to the photo idea."""
        return "summarize" if "weather" in s.input.lower() else "photo_memory"

    def _route_after_photo_memory(s):
        """Only inputs that mention a photo trigger image generation."""
        return "photo_generate" if "photo" in s.input.lower() else "summarize"

    # 3) Routing: branch key -> node ID (explicit maps keep the possible targets visible)
    graph.add_conditional_edges(
        "router",
        _route_from_router,
        {name: name for name in router_targets},
    )

    graph.add_edge("fetch_family",   "rag_retrieve")
    graph.add_edge("rag_retrieve",   "rag_recommend")
    graph.add_edge("rag_recommend",  "extract_location")
    graph.add_edge("extract_location","weather_check")
    graph.add_conditional_edges(
        "weather_check",
        _route_after_weather,
        {"summarize": "summarize", "photo_memory": "photo_memory"},
    )

    graph.add_conditional_edges(
        "photo_memory",
        _route_after_photo_memory,
        {"photo_generate": "photo_generate", "summarize": "summarize"},
    )
    # NOTE(review): photo_generate has no outgoing edge, so a run that reaches it
    # ends without passing through summarize -- confirm this is intended.
    #graph.add_edge("photo_generate", "summarize")
    graph.set_finish_point("summarize")
    return graph.compile()

def load_state() -> HybridState:
    """
    Load the saved state from SAVE_PATH.

    Returns a fresh state (empty input, no history) when the file does not
    exist, cannot be read, is not valid JSON, does not contain a JSON object,
    or fails model validation.
    """
    def _fresh_state() -> HybridState:
        # `input` is a required field, so it must be passed explicitly;
        # a bare HybridState() would raise ValidationError.
        return HybridState(input="")

    if not os.path.exists(SAVE_PATH):
        return _fresh_state()

    try:
        with open(SAVE_PATH, "r") as f:
            data = json.load(f)
    except (ValueError, OSError):
        # file corrupt (JSONDecodeError / decoding errors are ValueErrors) or unreadable
        return _fresh_state()

    if not isinstance(data, dict):
        # valid JSON, but not an object (e.g. a list or a bare string)
        return _fresh_state()

    # Ensure we at least have the required keys
    data.setdefault("input", "")
    data.setdefault("history", [])
    data.setdefault("memory_log", [])

    try:
        return HybridState(**data)
    except ValidationError:
        # data doesn't match the model schema
        return _fresh_state()

def merge_state(state: HybridState, new_input: str) -> HybridState:
    """
    Start a new turn on an existing state.

    `new_input` becomes the current input and is also recorded at the end of
    the history. The state is modified in place and returned for convenience.
    """
    state.input = new_input
    state.history.extend([new_input])
    return state

def save_state(state: HybridState) -> None:
    """
    Persist the entire state to SAVE_PATH as indented JSON.

    The state is serialized completely before the file is opened, so a
    serialization error cannot leave a truncated, unreadable state file behind.
    """
    # Convert to a plain dict; prefer the pydantic v2 API when it is available
    payload = state.model_dump() if hasattr(state, "model_dump") else state.dict()
    serialized = json.dumps(payload, indent=2)
    with open(SAVE_PATH, "w") as f:
        f.write(serialized)


# Smoke-test the routed travel planner graph with three consecutive user turns.
# The first turn starts from a fresh state; the later turns reload the state
# saved by the previous turn and merge the new question into it.
print("Building travel planner graph with LLM router...")
try:
    travel_planner_graph_routed = build_travel_planner_graph_with_router()
    print("Travel planner graph with LLM router built successfully.")

    # (banner text, expected route label, label used in the error message,
    #  user query, start from a fresh state?)
    scenarios = [
        ("Plan a family trip to the beach  ", "TRAVEL_PLANNER", "TRAVEL_PLANNER invocation",
         "Plan a family trip to the beach", True),
        ("Where can I take good photo there ", "PHOTO_MEMORY", "PHOTO_MEMORY invocation",
         "Where can I take good photo there?", False),
        ("How is the weather there? ", "WEATHER_INFO", "Weather invocation",
         "How is the weather there?", False),
    ]

    for banner, expected, failure_label, query, fresh_start in scenarios:
        print(f"\n--- Invoking routed graph with input: {banner}---")
        print(f"\n--- Expecting {expected} ---")
        try:
            if fresh_start:
                start_state = HybridState(input=query)
            else:
                start_state = merge_state(load_state(), query)
            raw = travel_planner_graph_routed.invoke(start_state)
            state = HybridState(**raw)       # rebuild the model from the returned values
            save_state(state)
        except Exception as e:
            print(f"\nAn error occurred during {failure_label}: {e}")

except NameError as e:
    print(f"\nError: {e}.")
except Exception as e:
    print(f"\nAn error occurred during graph building or invocation: {e}")

Output hidden; open in https://colab.research.google.com to view.