In [5]:
user_query = "I want condos in Los Angeles, CA with studio options, priced over 1500, and any bathrooms."

In [None]:
from decouple import Config, RepositoryEnv
import os
from pathlib import Path
import re
import urllib.parse
import json
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup, SoupStrainer

from langchain_ollama import ChatOllama, OllamaEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.documents import Document
from langchain_community.document_loaders import WebBaseLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field

from langchain import hub
from langgraph.graph import START, StateGraph  # For potential future extension.
from typing_extensions import List, TypedDict

# ------------------------------------------------------------------------------
# Environment & Global Setup
# ------------------------------------------------------------------------------

root_dir = Path().resolve()
print(root_dir.parent / '.env')

config = Config(RepositoryEnv(root_dir.parent / '.env'))  # Explicitly load .env

os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = config('LANGSMITH_API_KEY')

# Initialize two LLM instances:
# 1. For URL generation.
llm_url = ChatOllama(model="llama3.2")
# 2. For downstream QA.
llm = ChatOllama(model="llama3.2")

embeddings = OllamaEmbeddings(model="llama3.2")
vector_store = InMemoryVectorStore(embeddings)

# ------------------------------------------------------------------------------
# LLM-based URL Generation Chain
# ------------------------------------------------------------------------------

class ApartmentQuery(BaseModel):
    city: str = Field(
        description="City with state abbreviation, e.g., 'Los Angeles, CA'"
    )
    min_price: int = Field(
        description="Minimum price as an integer, e.g., 1500"
    )
    home_type: str = Field(
        default="apartments",
        description="Home type: one of apartments, houses, condos, townhomes. Defaults to apartments."
    )
    bedrooms: str = Field(
        default="2+",
        description="Bedroom filter: one of any, studio, 1+, 2+, 3+, 4+. For a studio, output 'studios'. Defaults to 2+."
    )
    bathrooms: str = Field(
        default="1+",
        description="Bathroom filter: one of any, 1+, 2+, 3+. Defaults to 1+."
    )

# Set up the output parser.
parser = PydanticOutputParser(pydantic_object=ApartmentQuery)

prompt_template = """
You are a data extraction assistant. Extract the following details from the user's query:
- city: The city with its state abbreviation (e.g., "Los Angeles, CA").
- min_price: The minimum price as an integer (e.g., 1500).
- home_type: One of: apartments, houses, condos, townhomes. (Default: apartments)
- bedrooms: One of: any, studio, 1+, 2+, 3+, 4+ (Default: 2+). For a studio, output "studios".
- bathrooms: One of: any, 1+, 2+, 3+ (Default: 1+).

Return your answer as a JSON object with exactly these keys: "city", "min_price", "home_type", "bedrooms", "bathrooms".
Query: "{query}"
"""
prompt = ChatPromptTemplate.from_template(prompt_template)

def generate_apartments_url(params: dict) -> str:
    city_slug = re.sub(r'[,\s]+', '-', params["city"].lower().strip())
    home_type_lower = params["home_type"].lower().strip()
    if home_type_lower not in ["apartments", "houses", "condos", "townhomes"]:
        home_type_lower = "apartments"
    bed_map = {
        "any": "",
        "studio": "studios",
        "1+": "min-1-bedrooms",
        "2+": "min-2-bedrooms",
        "3+": "min-3-bedrooms",
        "4+": "min-4-bedrooms"
    }
    bedroom_seg = bed_map.get(params["bedrooms"].lower().strip(), "min-2-bedrooms")
    bath_map = {
        "any": "",
        "1+": "1-bathrooms",
        "2+": "2-bathrooms",
        "3+": "3-bathrooms"
    }
    bathroom_seg = bath_map.get(params["bathrooms"].lower().strip(), "1-bathrooms")
    price_segment = f"over-{params['min_price']}"
    segments = []
    if bedroom_seg:
        segments.append(bedroom_seg)
    if bathroom_seg:
        segments.append(bathroom_seg)
    segments.append(price_segment)
    middle_segment = "-".join(segments)
    return f"https://www.apartments.com/{home_type_lower}/{city_slug}/{middle_segment}/"

def parse_json_with_stripped_keys(text: str) -> dict:
    try:
        data = json.loads(text)
    except json.JSONDecodeError as e:
        print("Failed to decode JSON from LLM response. Response text:")
        print(text)
        raise e
    return {k.strip(): v for k, v in data.items()}

def build_search_url(query: str) -> str:
    formatted_prompt = prompt.format(
        query=query,
        format_instructions=parser.get_format_instructions()
    )
    llm_response = llm_url.invoke(formatted_prompt)
    response_text = llm_response.content if hasattr(llm_response, "content") else str(llm_response)
    
    # If the response is empty, log and use default parameters
    if not response_text.strip():
        print("LLM response is empty. Using default parameters.")
        default_data = {
            "city": "Los Angeles, CA",
            "min_price": 1500,
            "home_type": "apartments",
            "bedrooms": "2+",
            "bathrooms": "1+"
        }
        parsed_params = ApartmentQuery(**default_data)
        return generate_apartments_url(parsed_params.dict())
    
    # Try to parse the JSON response
    try:
        fixed_data = parse_json_with_stripped_keys(response_text)
    except json.JSONDecodeError as e:
        print("Failed to decode JSON from LLM response. Response text:")
        print(response_text)
        # Use fallback default parameters if JSON parsing fails.
        default_data = {
            "city": "Los Angeles, CA",
            "min_price": 1500,
            "home_type": "apartments",
            "bedrooms": "2+",
            "bathrooms": "1+"
        }
        parsed_params = ApartmentQuery(**default_data)
        return generate_apartments_url(parsed_params.dict())
    
    parsed_params = ApartmentQuery(**fixed_data)
    return generate_apartments_url(parsed_params.dict())


# ------------------------------------------------------------------------------
# Custom Web Loader
# ------------------------------------------------------------------------------

class CustomWebLoader(WebBaseLoader):
    def load(self):
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        session = requests.Session()
        retry_strategy = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("https://", adapter)
        session.mount("http://", adapter)
        try:
            response = session.get(self.web_path, headers=headers, timeout=10)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            print("Error fetching the page:", e)
            return []
        filter_section = SoupStrainer(
            "div",
            id="placardContainer",
            class_="placardContainer",
            attrs={"data-analytics-profiletype": "Unknown"}
        )
        content = BeautifulSoup(response.text, "html.parser", parse_only=filter_section)\
            .get_text(separator="\n", strip=True)
        return [Document(page_content=content, metadata={"source": self.web_path})]

# ------------------------------------------------------------------------------
# Downstream Processing: Indexing & QA Pipeline
# ------------------------------------------------------------------------------


target_url = build_search_url(user_query)
print("Generated URL:", target_url)

loader = CustomWebLoader(target_url)
docs = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=20)
all_splits = text_splitter.split_documents(docs)
vector_store.add_documents(documents=all_splits)

# Updated generate function with strict instructions to avoid hallucinations.
def generate(state: dict) -> dict:
    # Combine all retrieved document contents.
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = [
        {
            "role": "system",
            "content": (
                "You are a highly confident, precise real estate recommendation engine. "
                "Based solely on the provided crawled context, you must select one specific condo listing that best meets the search criteria. "
                "Even if some details are missing, combine the available information to provide a clear, detailed recommendation. "
                "Do NOT state that you cannot recommend or express uncertainty. "
                "Always output a recommendation that includes the exact address, unit type, and amenities as they appear in the context. "
                "If a particular detail is not mentioned, simply omit it—do not add or hallucinate any information."
            )
        },
        {
            "role": "user",
            "content": (
                "Below is the crawled context from apartments.com:\n\n"
                f"{docs_content}\n\n"
                "Based solely on this data, please provide a detailed recommendation for one specific condo listing that meets the search criteria. "
                "Include all available details (e.g. address, unit type, and amenities) exactly as found in the context."
            )
        }
    ]
    response = llm.invoke(messages)
    state["answer"] = response.content
    return state


def retrieve(state: dict) -> dict:
    state["context"] = vector_store.similarity_search(state["question"])
    return state

class State(TypedDict):
    question: str
    context: List[Document]
    answer: str

def run_pipeline(state: State) -> State:
    state = retrieve(state)
    state = generate(state)
    return state

initial_state: State = {"question": user_query, "context": [], "answer": ""}
result_state = run_pipeline(initial_state)
print("QA Answer:", result_state["answer"])


/Users/taejunsong/workspace/rag_tutorial/.env


/var/folders/5z/52k791n10qj37p5d0t3151w80000gn/T/ipykernel_90635/908103340.py:165: PydanticDeprecatedSince20: The `dict` method is deprecated; use `model_dump` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.10/migration/
  return generate_apartments_url(parsed_params.dict())


Generated URL: https://www.apartments.com/condos/los-angeles-ca/min-2-bedrooms-over-1500/
