In [None]:
import pandas as pd
import json
import re
import logging
import tiktoken
from typing import List, Dict, Any, Optional, TypedDict, Literal
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.schema import Document
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.utilities.tavily_search import TavilySearchAPIWrapper
from langchain_openai import ChatOpenAI
import os
import ast
import uuid
from langgraph.graph import StateGraph, END

# Configure logging: every record at INFO level or above is sent to two handlers,
# a log file and a stream handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("product_rag_agent.log"),  # file path is relative to the working directory
        logging.StreamHandler()  # no stream given, so the handler's default stream is used
    ]
)
logger = logging.getLogger(__name__)  # module-level logger shared by the functions defined below

# Load environment variables (API keys etc.) from the local dotenv file.
from dotenv import load_dotenv
load_dotenv(".env.local")

# NOTE(review): `openai_key` is read here but not referenced in the visible cells;
# presumably the OpenAI clients pick the key up from the environment - confirm.
openai_key = os.getenv('OPENAI_API_KEY')

# Alternative open-source embedding model with a smaller token limit (currently disabled):
# EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDING_MODEL = "text-embedding-3-small"  # model name passed to OpenAIEmbeddings
LLM_MODEL = "gpt-3.5-turbo"  # model name passed to ChatOpenAI

In [76]:
# Define State Type
class ConversationItem(TypedDict):
    """One message of the conversation: who sent it and what was said."""
    role: Literal["user", "assistant"]  # sender of the message
    content: str  # text of the message

class UserPreferences(TypedDict):
    """Skincare preferences collected for a user; every field may be None when unknown."""
    skin_type: Optional[str]  # e.g. oily or dry
    skin_concerns: Optional[List[str]]  # e.g. acne, wrinkles
    budget: Optional[float]  # how much the user wants to spend
    preferences: Optional[List[str]]  # other preferences such as specific brands or ingredients
    product_type: Optional[List[str]]  # product types like Hair Lotion, Day Cream etc.

class AgentState(TypedDict):
    """Overall state of the agent that is passed between the graph node functions."""
    conversation: List[ConversationItem]  # messages exchanged between the user and the assistant
    user_preferences: UserPreferences  # user-specific information gathered so far
    query_type: Optional[str]  # classification of the latest request (product, skincare, casual)
    search_results: Optional[List[Dict[str, Any]]]  # result dicts stored by the search nodes (products or knowledge snippets)
    current_response: Optional[str]  # the assistant's latest response
    session_id: str  # unique id for every session
    # NOTE(review): product_search also writes a "search_error" key on failure,
    # which is not declared here.

In [77]:
# Helper Functions
def num_tokens_from_string(string: str, encoding_name: str = "cl100k_base") -> int:
    """Count the tokens `string` encodes to under the tiktoken encoding `encoding_name`."""
    token_ids = tiktoken.get_encoding(encoding_name).encode(string)
    return len(token_ids)

def parse_list_field(field):
    """Parse a cell value into a Python list.

    Args:
        field: A raw cell value. May already be a list, a string holding a list
            literal such as "['item1', 'item2']", a plain string, or a missing /
            non-text value (NaN, None, numbers, ...).

    Returns:
        list: The list itself when `field` is a list; the parsed items when it is
        a bracketed list string; ``[field]`` for any other string; ``[]`` for
        everything else.
    """
    # Lists are returned as-is. This check has to come before any missing-value
    # test: pd.isna() on a list returns an array, and using that array in an
    # `if` raises "truth value of an array is ambiguous".
    if isinstance(field, list):
        return field

    # NaN, None, pd.NA, numbers and any other non-string value carry no list data.
    if not isinstance(field, str):
        return []

    # Handle string representation of lists like "['item1', 'item2']"
    if field.startswith('[') and field.endswith(']'):
        try:
            return ast.literal_eval(field)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Not a valid Python literal: fall back to pulling out quoted items,
            # double-quoted first, then single-quoted.
            items = re.findall(r'"([^"]*)"', field)
            if not items:
                items = re.findall(r"'([^']*)'", field)
            return items

    return [field]  # Return as a single-item list if not in list format

In [78]:
# 1. Data Preparation
def prepare_product_data(xlsx_path):
    """Load the product export and return a cleaned DataFrame.

    Args:
        xlsx_path: Path to the product file, as a string or an os.PathLike
            object. Files ending in .xlsx/.xlsm/.xls (any letter case) are read
            with ``pd.read_excel``; anything else is read with ``pd.read_csv``.

    Returns:
        pandas.DataFrame: Only the known product columns that exist in the file,
        renamed to short snake_case names. The list-like columns are parsed with
        ``parse_list_field`` and ``price`` is converted to numeric (unparseable
        values become NaN).
    """
    logger.info("Loading product data...")

    # os.fspath accepts both str and pathlib.Path; lower-casing makes the
    # extension check independent of letter case.
    source_path = os.fspath(xlsx_path)
    if source_path.lower().endswith(('.xlsx', '.xlsm', '.xls')):
        df = pd.read_excel(source_path)
    else:
        df = pd.read_csv(source_path)

    # Source column -> short name. This mapping is the single list of relevant
    # columns; its order is the column order of the result.
    column_mapping = {
        'Title': 'title',
        'D': 'description',
        'Vendor': 'brand',
        'Type': 'type',
        'Tags': 'tags',
        'Category: Name': 'category',
        'Variant Price': 'price',
        'Metafield: my_fields.subtitle [single_line_text_field]': 'subtitle',
        'Metafield: custom.key_ingredients_list [list.single_line_text_field]': 'key_ingredients',
        'Metafield: custom.skin_concerns [list.single_line_text_field]': 'skin_concerns',
        'Metafield: custom.ingredients [list.single_line_text_field]': 'ingredients',
        'Metafield: custom.key_benefits_list [list.single_line_text_field]': 'benefits',
        'Metafield: custom.type_of_product [list.single_line_text_field]': 'product_type',
        'Metafield: reviews.rating [rating]': 'rating',
        'Metafield: reviews.rating_count [number_integer]': 'rating_count',
    }

    # Keep only the relevant columns that are actually present, then rename them.
    existing_columns = [col for col in column_mapping if col in df.columns]
    cleaned_df = df[existing_columns].copy()
    cleaned_df = cleaned_df.rename(columns={col: column_mapping[col] for col in existing_columns})

    # Parse list fields from their string form into actual lists
    list_columns = ['subtitle', 'key_ingredients', 'skin_concerns', 'ingredients', 'benefits', 'product_type', 'tags']
    for col in list_columns:
        if col in cleaned_df.columns:
            cleaned_df[col] = cleaned_df[col].apply(parse_list_field)

    # Convert price to numeric; values that cannot be parsed become NaN
    if 'price' in cleaned_df.columns:
        cleaned_df['price'] = pd.to_numeric(cleaned_df['price'], errors='coerce')

    logger.info(f"Processed {len(cleaned_df)} product records")
    return cleaned_df

# 2. Create documents for vector store
def create_documents(products_df, max_tokens=8191):
    """Create one ``Document`` per product row for embedding.

    The page content is a newline-separated list of "Label: value" lines built
    from the columns that are present and non-empty. The metadata holds every
    non-missing column value plus an ``id`` of the form ``product_<index>``.

    Args:
        products_df: DataFrame of products (the column names produced by
            ``prepare_product_data`` are the ones looked up here).
        max_tokens: Maximum number of cl100k_base tokens allowed in a document's
            content. Longer documents lose trailing content lines until they
            fit. Defaults to 8191 (presumably the embedding model's input
            limit - confirm).

    Returns:
        list: The created ``Document`` objects. Rows that end up with no content
        at all are skipped and counted in the final log line.
    """
    # (column, label) pairs in the order they appear in the content.
    scalar_fields = (
        ('title', 'Product'),
        ('description', 'Description'),
        ('brand', 'Brand'),
        ('type', 'Type'),
        ('category', 'Category'),
        ('price', 'Price'),
    )
    list_fields = (
        ('subtitle', 'Subtitles'),
        ('tags', 'Tags'),
        ('skin_concerns', 'Skin Concerns'),
        ('ingredients', 'Ingredients'),
        ('key_ingredients', 'Key Ingredients'),
        ('benefits', 'Benefits'),
        ('product_type', 'Product Type'),
    )
    rating_fields = (
        ('rating', 'Rating'),
        ('rating_count', 'Rating count'),
    )

    documents = []
    skipped_documents = 0
    long_documents = 0

    for idx, row in products_df.iterrows():
        content_parts = []

        # Basic information: included when the column exists and is not missing
        for column, label in scalar_fields:
            if column in row and not pd.isna(row[column]):
                content_parts.append(f"{label}: {row[column]}")

        # List fields: included when non-empty; items are stringified so a list
        # holding non-string values (e.g. numbers) cannot break the join
        for column, label in list_fields:
            if column in row and row[column]:
                value = row[column]
                text = ', '.join(str(item) for item in value) if isinstance(value, list) else value
                content_parts.append(f"{label}: {text}")

        # Rating information
        for column, label in rating_fields:
            if column in row and not pd.isna(row[column]):
                content_parts.append(f"{label}: {row[column]}")

        content = '\n'.join(content_parts)  # Joining all parts into a single content string

        if num_tokens_from_string(content, "cl100k_base") > max_tokens:
            long_documents += 1
            logger.warning(f"Long document ({len(content)} chars) for product: {row.get('title', f'at index {idx}')}")
            # Truncate by dropping whole content lines from the end until it fits
            while content_parts and num_tokens_from_string(content, "cl100k_base") > max_tokens:
                content_parts.pop()
                content = '\n'.join(content_parts)

        # A row with nothing to embed is skipped rather than stored as an empty document
        if not content.strip():
            skipped_documents += 1
            logger.warning(f"Skipping product at index {idx}: no content to embed")
            continue

        # Creating the metadata from every non-missing column value
        metadata = {}
        for col, val in row.items():
            if pd.api.types.is_scalar(val):
                # Single value: pd.isna() is safe to use here
                if pd.isna(val):
                    continue
                # Keep numeric types as is, store everything else as text
                metadata[col] = val if isinstance(val, (int, float)) else str(val)
                continue

            # Non-scalar values (lists, arrays, ...) are stored as one joined string
            if not hasattr(val, '__iter__'):
                continue
            try:
                non_na_values = [str(v) for v in val if not pd.isna(v)]
            except Exception:
                # Skip problematic values instead of failing the whole run
                logger.warning(f"Skipping metadata for column {col} due to processing error")
                continue
            if non_na_values:  # Only add if we have values
                metadata[col] = ', '.join(non_na_values)

        metadata['id'] = f"product_{idx}"  # Add an identifier to help with retrieval

        documents.append(Document(page_content=content, metadata=metadata))

    logger.info(f"Created {len(documents)} documents for embedding. Skipped: {skipped_documents}, Long docs: {long_documents}")
    return documents

In [79]:
# Load and clean the product data from the Excel file; the cleaned DataFrame is kept in `x`.
x = prepare_product_data("product_data.xlsx")

2025-03-17 05:00:35,171 - __main__ - INFO - Loading product data...
2025-03-17 05:00:37,222 - __main__ - INFO - Processed 2999 product records


In [80]:
# Set up vector database
def setup_vector_store(documents):
    """Embed `documents` with the configured OpenAI embedding model and index them in FAISS.

    Returns the resulting vector store. Any failure is logged and re-raised.
    """
    logger.info("Setting up vector store...")
    try:
        embedder = OpenAIEmbeddings(model=EMBEDDING_MODEL)
        store = FAISS.from_documents(documents, embedder)
        logger.info("Vector database setup complete.")
        return store
    except Exception as e:
        logger.error(f"Error setting up vector database: {e}")
        raise

# Set up LangChain Tavily search for skincare knowledge
def setup_skincare_search():
    """Build and return a TavilySearchAPIWrapper used for skincare queries.

    Any failure while creating the wrapper is logged and re-raised.
    """
    logger.info("Setting up LangChain Tavily search for skincare knowledge...")
    try:
        search_client = TavilySearchAPIWrapper()
        logger.info("LangChain Tavily search setup complete.")
        return search_client
    except Exception as e:
        logger.error(f"Error setting up LangChain Tavily search: {e}")
        raise

_TAVILY_INSTANCE = None

def get_tavily_instance():
    """Return the shared Tavily search instance, creating it on first use."""
    global _TAVILY_INSTANCE
    if _TAVILY_INSTANCE is not None:
        return _TAVILY_INSTANCE
    _TAVILY_INSTANCE = setup_skincare_search()
    return _TAVILY_INSTANCE

def get_skincare_information(query, tavily_search=None, timeout=30):
    """Get skincare information from LangChain Tavily search.

    Args:
        query: The user's question; it is prefixed with "skincare " before searching.
        tavily_search: Object exposing ``results(query=..., include_domains=...)``.
            When None, the shared instance from ``get_tavily_instance`` is used.
        timeout: Seconds to wait for the search before giving up (default 30).

    Returns:
        dict: ``{"answer": <str>, "results": <list>}``. On timeout or on any
        error a fallback answer with an empty ``results`` list is returned
        instead of raising.
    """
    import concurrent.futures

    if tavily_search is None:
        tavily_search = get_tavily_instance()

    # Only search trusted sources so that the assistant provides accurate skincare advice
    trusted_domains = [
        "mayoclinic.org",
        "aad.org",  # American Academy of Dermatology
        "healthline.com",
        "ncbi.nlm.nih.gov",  # PubMed/NIH
        "webmd.com",
        "dermnetnz.org",  # DermNet NZ
        "skincancer.org"  # Skin Cancer Foundation
    ]

    try:
        search_query = f"skincare {query}"

        # The executor is deliberately NOT used as a context manager: leaving a
        # `with` block waits for the worker to finish, which would keep the caller
        # blocked long after the timeout fired. shutdown(wait=False) returns at
        # once and lets the abandoned search finish in the background.
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        try:
            future = executor.submit(
                tavily_search.results,
                query=search_query,
                include_domains=trusted_domains
            )
            try:
                full_results = future.result(timeout=timeout)
            except concurrent.futures.TimeoutError:
                logger.warning(f"Tavily search timed out for query: {query}")
                return {"answer": "Search timed out. Please try again with a more specific question.", "results": []}
        finally:
            executor.shutdown(wait=False)

        # Handle different return types
        if isinstance(full_results, dict):
            # Dict format carrying an "answer" and a "results" list
            answer = full_results.get("answer", "")
            return {"answer": answer, "results": full_results.get("results", [])}

        if isinstance(full_results, list):
            # A plain list of results: summarise the first two entries
            combined_answer = "Based on search results: "
            for item in full_results[:2]:
                if isinstance(item, dict) and "content" in item:
                    # A None/empty content contributes an empty snippet instead of crashing
                    snippet = item.get("content") or ""
                    combined_answer += str(snippet)[:100] + "... "
            return {"answer": combined_answer, "results": full_results}

        # Unexpected format
        return {"answer": "Found some information but in an unexpected format.",
                "results": [{"content": str(full_results), "title": "Raw Results"}]}

    except Exception as e:
        logger.error(f"Error performing LangChain Tavily search: {e}")
        return {"answer": f"I couldn't find specific information about {query} at the moment.", "results": []}

# Create a LangChain tool for use in chains or agents
def create_skincare_search_tool():
    """Wrap the skincare search in a LangChain ``Tool`` named "SkincareSearch".

    The tool calls ``get_skincare_information`` and returns only its "answer" text.
    """
    from langchain.tools import Tool

    def _answer_only(q):
        # The tool output is just the answer, not the full result dict
        return get_skincare_information(q)["answer"]

    return Tool(
        name="SkincareSearch",
        description="Useful for when you need to answer questions about skincare, treatments, routines, or specific skin conditions.",
        func=_answer_only,
    )

### LangGraph Node Functions

In [81]:
# Langgraph Node Functions

# Query Understanding Node
def _parse_llm_json(raw_text):
    """Return the JSON object embedded in `raw_text` as a dict, or None if none parses."""
    if not isinstance(raw_text, str):
        return None
    start = raw_text.find("{")
    end = raw_text.rfind("}")
    if start == -1 or end <= start:
        return None
    # Slicing from the first "{" to the last "}" drops code fences or prose around the object.
    candidate = raw_text[start:end + 1]
    # Second attempt removes trailing commas before a closing brace/bracket.
    for attempt in (candidate, re.sub(r",\s*([}\]])", r"\1", candidate)):
        try:
            parsed = json.loads(attempt)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def _is_missing_value(value):
    """True for None and for empty / "null" / "none" strings."""
    if value is None:
        return True
    return isinstance(value, str) and value.strip().lower() in ("", "null", "none")


def _as_text_list(value):
    """Normalise an extracted field to a list of strings, dropping missing entries."""
    if isinstance(value, (list, tuple)):
        return [str(item) for item in value if not _is_missing_value(item)]
    if _is_missing_value(value):
        return []
    return [str(value)]


def _merge_unique(existing, additions):
    """Return `existing` followed by the items of `additions` not already present, in order."""
    merged = list(existing or [])
    for item in additions:
        if item not in merged:
            merged.append(item)
    return merged


def query_understanding(state: AgentState) -> AgentState:
    """Analyze the latest user query to understand intent and extract information.

    Sends the latest message, up to five earlier messages and the known
    preferences to the LLM, asks for a JSON description of the query, and merges
    the extracted values into the user preferences.

    Args:
        state: Current agent state; ``conversation`` must contain at least one message.

    Returns:
        A copy of the state with ``query_type`` set and a new, updated
        ``user_preferences`` dict; the caller's preferences dict is not modified.
        When the LLM output cannot be parsed, ``query_type`` falls back to
        "casual_conversation" and the preferences are left as they were.
    """
    llm = ChatOpenAI(model=LLM_MODEL, temperature=0)

    latest_message = state["conversation"][-1]["content"]  # the latest user message

    # Create context from conversation history and user preferences
    conversation_history = '\n'.join([
        f"{item['role']}: {item['content']}"
        for item in state["conversation"][:-1][-5:]  # 5 most recent messages excluding the latest one
    ])

    preferences = state["user_preferences"]
    preferences_context = ""
    if preferences.get("skin_type"):
        preferences_context += f"Skin type: {preferences['skin_type']}. "
    if preferences.get("skin_concerns"):
        preferences_context += f"Skin concerns: {', '.join(preferences['skin_concerns'])}. "
    if preferences.get("budget"):
        preferences_context += f"Budget: under {preferences['budget']}. "
    if preferences.get("preferences"):
        preferences_context += f"Other preferences: {', '.join(preferences['preferences'])}. "

    # Prompt to identify the query type and extract useful information from the query.
    # The example object has no trailing comma so the model is shown valid JSON.
    prompt = ChatPromptTemplate.from_template("""
    You are an expert skincare assistant specialized in understanding user queries.
    
    Previous conversation:
    {conversation_history}
    
    Known user preferences:
    {preferences_context}
    
    Latest user message: "{latest_message}"
    
    First, determine the type of query:
    1. Product recommendation query: If user is asking for skincare product recommendations or comparing products
    2. General skincare question: If user is asking about skincare advice, routines, or techniques
    3. Casual conversation: If user is engaging in small talk, greetings, or off-topic discussion
    
    Then, extract relevant information:
    - Skin type mentioned (oily, dry, combination, sensitive, etc.)
    - Skin concerns (acne, aging, hyperpigmentation, etc.) 
    - Product types requested (cleanser, moisturizer, serum, etc.)
    - Budget constraints (any price limits mentioned)
    - Other preferences (fragrance-free, vegan, etc.)
    
    Response format:
    {{
        "query_type": "product_recommendation" OR "skincare_question" OR "casual_conversation",
        "skin_type": "type mentioned or null if none",
        "skin_concerns": ["list of skin concerns mentioned or empty list"],
        "product_type": ["list of product types mentioned or empty list"],
        "budget": "numeric value without currency if mentioned, or null",
        "preferences": ["list of other preferences or empty list"]
    }}
    """)

    # Executing the prompt
    chain = prompt | llm | StrOutputParser()
    result = chain.invoke({
        "conversation_history": conversation_history,
        "preferences_context": preferences_context,
        "latest_message": latest_message
    })

    # Converting the string result to a dictionary; an empty dict makes every
    # lookup below fall back to "nothing extracted".
    parsed_result = _parse_llm_json(result)
    if parsed_result is None:
        logger.warning("Could not parse query understanding output as JSON; treating the message as casual conversation")
        parsed_result = {}

    # state.copy() is shallow, so the preferences get their own dict to avoid
    # changing the caller's state in place.
    new_state = state.copy()
    updated_preferences = dict(preferences)
    new_state["user_preferences"] = updated_preferences
    new_state["query_type"] = parsed_result.get("query_type") or "casual_conversation"

    # Update user preferences with any new information
    skin_type = parsed_result.get("skin_type")
    if skin_type and not _is_missing_value(skin_type):
        updated_preferences["skin_type"] = skin_type

    new_concerns = _as_text_list(parsed_result.get("skin_concerns"))
    if new_concerns:
        updated_preferences["skin_concerns"] = _merge_unique(updated_preferences.get("skin_concerns"), new_concerns)

    budget_raw = parsed_result.get("budget")
    if budget_raw and not _is_missing_value(budget_raw):
        try:
            updated_preferences["budget"] = float(budget_raw)
        except (TypeError, ValueError):
            # A non-numeric budget is ignored; any previously stored budget is kept
            logger.warning(f"Ignoring non-numeric budget value: {budget_raw!r}")

    new_prefs = _as_text_list(parsed_result.get("preferences"))
    if new_prefs:
        updated_preferences["preferences"] = _merge_unique(updated_preferences.get("preferences"), new_prefs)

    new_products = _as_text_list(parsed_result.get("product_type"))
    if new_products:
        # Only product types that are not already stored are appended
        updated_preferences["product_type"] = _merge_unique(updated_preferences.get("product_type"), new_products)

    return new_state

# Query Router Node - directs the query to the right function
def query_router(state: AgentState) -> Literal["product_search", "skincare_knowledge", "casual_conversation"]:
    """Pick the processing path for the query type stored in the state.

    "product_recommendation" goes to "product_search", "skincare_question" goes
    to "skincare_knowledge"; anything else (including a missing query type) is
    handled as "casual_conversation".
    """
    detected_type = state.get("query_type", "casual_conversation")

    routes = (
        ("product_recommendation", "product_search"),
        ("skincare_question", "skincare_knowledge"),
    )
    for label, destination in routes:
        if detected_type == label:
            return destination

    # Small talk, off-topic messages and unknown labels
    return "casual_conversation"

# Product Search Node
def _price_or_none(value):
    """Return `value` as a float, or None when it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def product_search(state: AgentState, vector_store) -> AgentState:
    """
    Search for products based on the user query and preferences.

    The latest message is combined with the stored skin type, skin concerns and
    other preferences into one similarity-search query. The hits are then
    filtered by budget and product type, boosted for matching skin concerns and
    brands, and the top three are stored in the state.

    Args:
        state: Current agent state containing conversation and user preferences
        vector_store: Vector database for product search; must provide
            ``similarity_search(query, k=...)`` returning objects with
            ``metadata`` and ``page_content``

    Returns:
        A copy of the state with ``search_results`` set to at most three product
        dicts (metadata plus a ``content`` key). On any error ``search_results``
        is an empty list and ``search_error`` holds the error message.
    """

    try:
        latest_message = state["conversation"][-1]["content"]  # most recent message from the user
        preferences = state["user_preferences"]  # all the user's saved preferences

        # The search text starts with the user's message and is extended with the preferences
        query_parts = [latest_message]

        if preferences.get("skin_type"):
            query_parts.append(f"Skin type: {preferences['skin_type']}")

        if preferences.get("skin_concerns"):
            concerns = ", ".join(preferences["skin_concerns"])
            query_parts.append(f"Skin concerns: {concerns}")

        if preferences.get("preferences"):
            prefs = ", ".join(preferences["preferences"])
            query_parts.append(f"Preferences: {prefs}")

        combined_query = " ".join(query_parts)

        # Fetch more results than needed so that enough remain after filtering
        initial_k = 10
        search_results = vector_store.similarity_search(combined_query, k=initial_k)

        products = []
        for rank, doc in enumerate(search_results):
            # Work on a copy: the scoring fields and "content" must not leak into
            # the metadata of the documents held by the vector store.
            product = dict(doc.metadata)
            product["content"] = doc.page_content
            product["relevance_score"] = initial_k - rank  # Simple scoring based on position
            products.append(product)

        filtered_products = list(products)  # start with all products, then narrow down

        if preferences.get("budget"):  # Filter by budget if specified
            budget = float(preferences["budget"])
            within_budget = []
            for product in filtered_products:
                if not product.get("price"):
                    continue
                price = _price_or_none(product["price"])
                # A price that is not a number excludes the product instead of
                # aborting the whole search
                if price is not None and price <= budget:
                    within_budget.append(product)
            filtered_products = within_budget

        # Requested product types are stored in the user preferences; a
        # state-level "product_type" key is still honoured if present.
        product_type = state.get("product_type") or preferences.get("product_type") or []
        if product_type:
            requested_types = [str(t).lower().strip() for t in product_type]
            matching_type = [
                p for p in filtered_products
                if p.get("product_type") and
                any(req_type in str(p["product_type"]).lower() for req_type in requested_types)
            ]
            # Applied only when something matches, so a type label that is worded
            # differently in the catalogue cannot wipe out every candidate
            if matching_type:
                filtered_products = matching_type

        if preferences.get("skin_concerns"):
            skin_concerns = [concern.lower().strip() for concern in preferences["skin_concerns"]]
            # Boost products that match skin concerns rather than strict filtering
            for product in filtered_products:
                match_score = 0  # number of the user's concerns this product addresses
                if product.get("skin_concerns"):
                    product_concerns = str(product["skin_concerns"]).lower()
                    match_score = sum(1 for concern in skin_concerns if concern in product_concerns)
                product["concern_match"] = match_score

        if preferences.get("preferences"):
            # A preference counts as a brand preference when it mentions "brand"
            # or equals the brand of one of the candidate products
            known_brands = {str(p["brand"]).lower() for p in filtered_products if p.get("brand")}
            brand_preferences = [
                pref.lower() for pref in preferences["preferences"]
                if "brand" in pref.lower() or pref.lower() in known_brands
            ]

            if brand_preferences:
                for product in filtered_products:
                    if not product.get("brand"):
                        continue
                    product_brand = str(product["brand"]).lower()
                    if any(brand in product_brand for brand in brand_preferences):
                        product["relevance_score"] += 5  # Boost brand matches

        # Re-rank based on relevance score and any boosting factors
        filtered_products.sort(key=lambda p: (
            p.get("concern_match", 0) * 10 +  # Prioritize skin concern matches
            p.get("relevance_score", 0)       # Then consider relevance
        ), reverse=True)

        # If the filters removed every product, fall back to the original results
        if not filtered_products and products:
            filtered_products = products[:3]
        else:
            # Limit to top 3 recommendations
            filtered_products = filtered_products[:3]

        # Clean up temporary scoring fields
        for product in filtered_products:
            product.pop("relevance_score", None)
            product.pop("concern_match", None)

        # Update state with search results
        new_state = state.copy()
        new_state["search_results"] = filtered_products

        return new_state

    except Exception as e:
        # Error handling to prevent complete failure
        logger.error(f"Error in product search: {str(e)}")

        # Return a copy of the original state with the error message
        new_state = state.copy()
        new_state["search_results"] = []
        new_state["search_error"] = str(e)
        return new_state

# Skincare Knowledge Node with Tavily Search
def skincare_knowledge_search(state: AgentState) -> AgentState:
    """Look up skincare knowledge for the latest user message via the Tavily search.

    Returns a copy of the state whose ``search_results`` holds the generated
    answer (if any) followed by one entry per search result. If anything fails,
    a single entry of type "error" is stored instead.
    """
    user_message = state["conversation"][-1]["content"]
    detected_type = state.get("query_type", "")

    try:
        # Get or reuse the Tavily instance
        tavily = get_tavily_instance()

        # For skincare questions the search text is extended with the known
        # skin type and skin concerns
        term_parts = [user_message]
        if detected_type == "skincare_question":
            if state["user_preferences"].get("skin_type"):
                term_parts.append(f" {state['user_preferences']['skin_type']} skin")
            if state["user_preferences"].get("skin_concerns"):
                joined_concerns = " ".join(state["user_preferences"]["skin_concerns"])
                term_parts.append(f" {joined_concerns}")
        search_terms = "".join(term_parts)

        outcome = get_skincare_information(search_terms, tavily)

        knowledge_info = []

        # A generated answer, when present, always comes first
        if outcome.get("answer"):
            summary_entry = {
                "content": outcome["answer"],
                "topic": "Generated Answer",
                "type": "summary"
            }
            knowledge_info.append(summary_entry)

        # One entry per individual search result, with the content cut to 500 characters
        for hit in outcome.get("results", []):
            snippet = hit.get("content", "")
            if len(snippet) > 500:
                snippet = snippet[:500] + "..."

            knowledge_info.append({
                "content": snippet,
                "topic": hit.get("title", "Skincare Information"),
                "type": "web_search",
                "source": hit.get("url", "")
            })

    except Exception as e:
        # Fallback if search fails
        knowledge_info = [{
            "content": f"I'm sorry, I couldn't find specific information about '{user_message}' at the moment. Please try asking your question differently.",
            "topic": "Error",
            "type": "error"
        }]
        logger.error(f"Error performing Tavily search: {e}")

    # Store the collected entries on a copy of the state
    updated_state = state.copy()
    updated_state["search_results"] = knowledge_info
    return updated_state

# Casual Conversation Node
def handle_casual_conversation(state: AgentState) -> AgentState:
    """Answer small talk with a short, friendly LLM reply.

    The reply is generated from the latest user message and up to five earlier
    messages, and stored as ``current_response`` on a copy of the state.
    """
    chat_model = ChatOpenAI(model=LLM_MODEL, temperature=0.7)

    messages = state["conversation"]
    user_message = messages[-1]["content"]

    # Up to 5 most recent messages before the latest one, one "role: content" line each
    earlier_turns = messages[:-1][-5:]
    history_text = '\n'.join(
        f"{turn['role']}: {turn['content']}" for turn in earlier_turns
    )

    template = ChatPromptTemplate.from_template("""
    You are a friendly skincare assistant. The user is engaging in casual conversation.
    
    Previous conversation:
    {conversation_history}
    
    User's message: "{latest_message}"
    
    Respond in a warm, conversational way. Be friendly but concise. If appropriate, gently guide the conversation back to skincare topics, but don't force it if it would seem unnatural.
    """)

    # Prompt -> model -> plain string
    pipeline = template | chat_model | StrOutputParser()
    reply = pipeline.invoke({
        "conversation_history": history_text,
        "latest_message": user_message
    })

    updated_state = state.copy()
    updated_state["current_response"] = reply
    return updated_state

# Response Generation Node for Product Recommendations
def generate_product_response(state: AgentState) -> AgentState:
    """Generate a response for product recommendations.

    Reads the product hits from ``state["search_results"]`` and the last five
    conversation messages, then asks the LLM either to recommend the products
    or, when there are none, to explain that nothing matched.

    Args:
        state: Agent state. ``"conversation"`` must be present;
            ``"search_results"`` may be missing, ``None`` or empty, all of
            which are treated as "no products found".

    Returns:
        A shallow copy of ``state`` with ``"current_response"`` set to the
        generated text.
    """
    llm = ChatOpenAI(model=LLM_MODEL, temperature=0.2)

    # ``.get(key, [])`` would still return None when the key is present with a
    # None value (the initial state stores None), so normalise explicitly.
    products = state.get("search_results") or []

    # Get conversation history
    conversation_history = '\n'.join([
        f"{item['role']}: {item['content']}"
        for item in state["conversation"][-5:]  # Get up to 5 most recent messages
    ])

    # Create product information for the prompt
    product_details = _format_product_details(products)

    # Create the prompt
    prompt = ChatPromptTemplate.from_template("""
    You are a knowledgeable skincare assistant. Respond to the user's product query based on the information provided.
    
    Recent conversation:
    {conversation_history}
    
    Products that match the user's query:
    {product_details}
    
    Generate a helpful, conversational response that recommends the most relevant products. Include:
    1. Acknowledge what the user is looking for
    2. Explain why your recommendations match their needs
    3. Include key product details (name, price, benefits)
    4. Briefly mention key ingredients that address their concerns
    
    Keep your response concise but informative.
    """)

    # Handle case with no products found
    if not products:
        prompt = ChatPromptTemplate.from_template("""
        You are a knowledgeable skincare assistant. The user has asked for product recommendations, but no matching products were found.
        
        Recent conversation:
        {conversation_history}
        
        Generate a helpful response that:
        1. Acknowledges what the user is looking for
        2. Explains that you couldn't find products that exactly match their criteria
        3. Suggests they try broadening their search (e.g., higher price range, different product type)
        4. Offers to help with a different query
        
        Keep your response concise but helpful.
        """)

    # Execute the prompt
    chain = prompt | llm | StrOutputParser()
    response = chain.invoke({
        "conversation_history": conversation_history,
        "product_details": product_details
    })

    # Update state with response
    new_state = state.copy()
    new_state["current_response"] = response

    return new_state


def _format_product_details(products: List[Dict[str, Any]]) -> str:
    """Render product hits as the plain-text listing embedded in the prompt.

    Each product becomes a ``Product N:`` header followed by one line per
    field that is present in the product dict, then a blank line. Fields that
    are absent are skipped.
    """
    # (dict key, line format) in the order the lines appear in the listing.
    line_formats = (
        ("title", "Name: {}\n"),
        ("price", "Price: ₹{}\n"),
        ("brand", "Brand: {}\n"),
        ("skin_concerns", "Skin Concerns: {}\n"),
        ("benefits", "Benefits: {}\n"),
        ("ingredients", "Key Ingredients: {}\n"),
        ("rating", "Rating: {}\n"),
    )
    parts = []
    for position, product in enumerate(products, 1):
        parts.append(f"Product {position}:\n")
        parts.extend(
            line_format.format(product[key])
            for key, line_format in line_formats
            if key in product
        )
        parts.append("\n")
    return "".join(parts)

# Response Generation Node for Skincare Knowledge
def generate_knowledge_response(state: AgentState) -> AgentState:
    """Generate a response for skincare knowledge questions using Tavily search results.

    Args:
        state: Agent state. ``"conversation"`` must be present;
            ``"search_results"`` is expected to be a list of dicts with
            optional ``"topic"``, ``"content"`` and ``"source"`` keys. A
            missing, ``None`` or empty value is treated as "no information".

    Returns:
        A shallow copy of ``state`` with ``"current_response"`` set to the
        generated text.
    """
    llm = ChatOpenAI(model=LLM_MODEL, temperature=0.2)

    # ``.get(key, [])`` would still return None when the key is present with a
    # None value (the initial state stores None), so normalise explicitly.
    knowledge_info = state.get("search_results") or []

    # Get conversation history
    conversation_history = '\n'.join([
        f"{item['role']}: {item['content']}"
        for item in state["conversation"][-5:]  # Get up to 5 most recent messages
    ])

    # Create knowledge information for the prompt
    knowledge_details = _format_knowledge_details(knowledge_info)

    # Create the prompt
    prompt = ChatPromptTemplate.from_template("""
    You are a knowledgeable skincare assistant. Respond to the user's skincare question based on the information provided from web search results.
    
    Recent conversation:
    {conversation_history}
    
    Relevant skincare information from trusted sources:
    {knowledge_details}
    
    Generate a helpful, conversational response that addresses the user's question. Include:
    1. Direct answers to their specific questions
    2. Evidence-based information from the search results
    3. Practical advice they can implement
    4. If relevant, mention that your information comes from dermatology and medical sources
    
    Keep your response concise but informative. If the search results don't fully address their question, acknowledge this and provide what you do know.
    
    Do not mention "Tavily" or the search API in your response.
    """)

    # Execute the prompt
    chain = prompt | llm | StrOutputParser()
    response = chain.invoke({
        "conversation_history": conversation_history,
        "knowledge_details": knowledge_details
    })

    # Update state with response
    new_state = state.copy()
    new_state["current_response"] = response

    return new_state


def _format_knowledge_details(knowledge_info: List[Dict[str, Any]]) -> str:
    """Render search snippets as the plain-text listing embedded in the prompt.

    Every entry yields an ``Information N:`` header, a topic line (defaulting
    to ``General``), a content line, an optional source line (only when the
    entry has a truthy ``"source"``) and a trailing blank line.
    """
    parts = []
    for position, info in enumerate(knowledge_info, 1):
        parts.append(f"Information {position}:\n")
        parts.append(f"Topic: {info.get('topic', 'General')}\n")
        parts.append(f"Content: {info.get('content', '')}\n")
        source = info.get("source")
        if source:
            parts.append(f"Source: {source}\n")
        parts.append("\n")
    return "".join(parts)

# Update Conversation History Node
def update_conversation(state: AgentState) -> AgentState:
    """Append the assistant's latest response to the conversation history.

    Args:
        state: Agent state holding ``"conversation"`` and, normally,
            ``"current_response"``.

    Returns:
        A copy of ``state`` whose ``"conversation"`` is a new list ending with
        an assistant message. The input state and its conversation list are
        left unmodified.
    """
    # ``current_response`` starts out as None in the initial state; fall back
    # to the default text for None/empty values, not only when the key is absent.
    current_response = state.get("current_response") or "I'm not sure how to respond to that."

    new_state = state.copy()

    # ``state.copy()`` is shallow, so appending in place would also change the
    # caller's list. Build a fresh list instead.
    new_state["conversation"] = [
        *state["conversation"],
        {"role": "assistant", "content": current_response},
    ]

    return new_state


In [None]:
def save_vector_store(vector_store, file_path):
    """Persist ``vector_store`` under ``file_path`` and print a confirmation.

    Delegates to the store's own ``save_local`` method; returns nothing.
    """
    vector_store.save_local(file_path)
    confirmation = f"Vector store saved to {file_path}"
    print(confirmation)

def load_vector_store(file_path, embedding_model=None):
    """Load a FAISS vector store previously written by ``save_vector_store``.

    Args:
        file_path: Directory the store was saved to.
        embedding_model: Name of the OpenAI embedding model used to embed
            queries. Defaults to the notebook-wide ``EMBEDDING_MODEL``. It
            must be the same model the index was built with, otherwise query
            vectors will not match the index dimensions.
            NOTE(review): presumably ``setup_vector_store`` builds the index
            with ``EMBEDDING_MODEL`` - confirm.

    Returns:
        The loaded vector store, or ``None`` if loading failed for any reason
        (the error is printed, not raised).
    """
    try:
        embeddings = OpenAIEmbeddings(model=embedding_model or EMBEDDING_MODEL)
        # SECURITY: FAISS.load_local unpickles the stored docstore, and recent
        # LangChain releases refuse to do so without this explicit opt-in.
        # That is acceptable here only because the directory is produced
        # locally by save_vector_store; never point this at untrusted files.
        vector_store = FAISS.load_local(
            file_path,
            embeddings,
            allow_dangerous_deserialization=True,
        )
        print(f"Vector store loaded from {file_path}")
        return vector_store
    except Exception as e:
        # Best-effort by design: callers treat None as "rebuild the store".
        print(f"Error loading vector store: {e}")
        return None

# Function to preprocess and save the vector database
def preprocess_and_save_data(product_data_path, vector_store_path):
    """Build a vector store from the product file, persist it, and return it.

    Pipeline: ``prepare_product_data`` -> ``create_documents`` ->
    ``setup_vector_store``; the result is then written to
    ``vector_store_path`` via ``save_vector_store``.
    """
    # Raw file -> documents -> embedded vector store, in one pass.
    store = setup_vector_store(
        create_documents(
            prepare_product_data(product_data_path)
        )
    )

    # Cache on disk so later sessions can skip the embedding step.
    save_vector_store(store, vector_store_path)
    return store

In [None]:
class SkinCareChat:
    """Stateful chat front-end around the skincare assistant graph.

    Holds the product vector store, the compiled LangGraph application and
    the running agent state (conversation, user preferences, session id).
    """

    def __init__(self, vector_store_path=None, product_data_path=None):
        """
        Initialize the skincare chat interface with vector store caching.

        Args:
            vector_store_path: Path to load/save vector store (optional)
            product_data_path: Path to product data file (optional)

        At least one of vector_store_path or product_data_path must be provided.

        Raises:
            ValueError: If there is neither an existing vector store at
                ``vector_store_path`` nor a ``product_data_path`` to build from.
            RuntimeError: If the existing vector store cannot be loaded and no
                ``product_data_path`` was given to rebuild it.
        """
        # Load or create vector store
        if vector_store_path and os.path.exists(vector_store_path):
            self.vector_store = load_vector_store(vector_store_path)
            if self.vector_store is None:
                # load_vector_store signals failure by returning None.
                if not product_data_path:
                    # Fail here with a clear message instead of keeping a None
                    # store that would only blow up later inside the graph.
                    raise RuntimeError(
                        f"Could not load the vector store at '{vector_store_path}' "
                        "and no product_data_path was given to rebuild it"
                    )
                # Fall back to creating a new one
                self.vector_store = preprocess_and_save_data(product_data_path, vector_store_path)
        elif product_data_path:
            # Create and save a new vector store
            vector_store_path = vector_store_path or "skincare_vector_store"
            self.vector_store = preprocess_and_save_data(product_data_path, vector_store_path)
        else:
            raise ValueError("Either vector_store_path or product_data_path must be provided")

        # Setup Tavily search for skincare knowledge (cached singleton)
        self.tavily_search = get_tavily_instance()

        self.app = self._build_workflow()

        # Initialize state
        self.state = self._fresh_state(self._default_preferences())

    def _build_workflow(self):
        """Wire the agent nodes into a StateGraph and return the compiled app."""
        workflow = StateGraph(AgentState)

        # Add nodes to the graph
        workflow.add_node("query_understanding", query_understanding)
        # The lambda reads self.vector_store at call time.
        workflow.add_node("product_search", lambda state: product_search(state, self.vector_store))

        # Use Tavily-based skincare knowledge search
        workflow.add_node("skincare_knowledge", skincare_knowledge_search)

        workflow.add_node("casual_conversation", handle_casual_conversation)
        workflow.add_node("generate_product_response", generate_product_response)
        workflow.add_node("generate_knowledge_response", generate_knowledge_response)
        workflow.add_node("update_conversation", update_conversation)

        # query_router picks one of the three branches after query understanding.
        workflow.add_conditional_edges(
            "query_understanding",
            query_router,
            {
                "product_search": "product_search",
                "skincare_knowledge": "skincare_knowledge",
                "casual_conversation": "casual_conversation"
            }
        )

        workflow.add_edge("product_search", "generate_product_response")
        workflow.add_edge("skincare_knowledge", "generate_knowledge_response")
        workflow.add_edge("generate_product_response", "update_conversation")
        workflow.add_edge("generate_knowledge_response", "update_conversation")
        workflow.add_edge("casual_conversation", "update_conversation")
        workflow.add_edge("update_conversation", END)

        workflow.set_entry_point("query_understanding")

        return workflow.compile()

    @staticmethod
    def _default_preferences():
        """Return a new, empty user-preferences dict."""
        return {
            "skin_type": None,
            "skin_concerns": [],
            "budget": None,
            "preferences": [],
            "product_type": []
        }

    @staticmethod
    def _fresh_state(preferences):
        """Return a blank agent state that uses ``preferences`` and a new session id."""
        return {
            "conversation": [],
            "user_preferences": preferences,
            "query_type": None,
            "search_results": None,
            "current_response": None,
            "session_id": str(uuid.uuid4()),  # unique per (re)started conversation
        }

    def chat(self, user_input):
        """Process a user message and return the response.

        The user message is added to a copy of the current state and the graph
        is invoked on that copy. ``self.state`` is replaced only after the
        graph returns, so a failing run does not leave an unanswered user
        message in the stored history. Exceptions from the graph propagate.
        """
        pending_state = {
            **self.state,
            "conversation": [
                *self.state["conversation"],
                {"role": "user", "content": user_input},
            ],
        }

        # Process the message through the graph
        self.state = self.app.invoke(pending_state)

        # Return the latest assistant response
        return self.state["conversation"][-1]["content"]

    def get_conversation_history(self):
        """Return the full conversation history."""
        return self.state["conversation"]

    def get_user_preferences(self):
        """Return the stored user preferences."""
        return self.state["user_preferences"]

    def reset_conversation(self, preserve_preferences=True):
        """
        Reset the conversation while optionally preserving user preferences.

        Args:
            preserve_preferences: If True, keep user preferences after reset

        Returns:
            The confirmation string ``"Conversation has been reset."``.
        """
        if preserve_preferences:
            preferences = self.state["user_preferences"]
        else:
            preferences = self._default_preferences()

        self.state = self._fresh_state(preferences)

        return "Conversation has been reset."

# main function to use the class
def main():
    """Run an interactive console session with the skincare assistant.

    Builds a ``SkinCareChat`` (loading the cached vector store when present,
    otherwise creating it from the product data) and then loops over user
    input. ``exit``/``quit``/``bye`` end the session, ``reset`` clears the
    conversation, and anything else is sent to the assistant. Commands are
    matched case-insensitively after trimming surrounding whitespace; blank
    lines are ignored. End-of-input or Ctrl-C at the prompt ends the session
    with the normal farewell instead of a traceback.
    """
    product_data_path = "product_data.xlsx"
    vector_store_path = "skincare_vector_store"

    print("Initializing Skincare RAG System...")

    # Try to load existing vector store, or create from product data
    chat_system = SkinCareChat(
        vector_store_path=vector_store_path,
        product_data_path=product_data_path
    )

    print("\nSkincare Assistant Ready! Type 'exit' to quit.")
    print("---------------------------------------------")

    farewell = "\nThank you for using the Skincare Assistant. Goodbye!"

    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or an interrupted prompt: leave cleanly.
            print(farewell)
            break

        if not user_input:
            # Nothing typed; do not spend an LLM call on an empty message.
            continue

        command = user_input.lower()
        if command in ('exit', 'quit', 'bye'):
            print(farewell)
            break
        if command == 'reset':
            chat_system.reset_conversation()
            print("\nAssistant: Conversation has been reset.")
            continue

        response = chat_system.chat(user_input)
        print(f"\nAssistant: {response}")

In [None]:
# Smoke-test setup

# Where the raw product sheet lives and where the vector store is cached.
product_data_path, vector_store_path = "product_data.xlsx", "skincare_vector_store"

# Shared assistant instance used by the test cells below. An existing store at
# vector_store_path is loaded; otherwise one is built from product_data_path.
chat_system = SkinCareChat(
    product_data_path=product_data_path,
    vector_store_path=vector_store_path,
)

# Function to display chat in a notebook-friendly way
def test_chat(query):
    """Send ``query`` to the shared ``chat_system`` and print a readable trace.

    Prints the user query, the assistant's reply and the preferences stored
    after the turn, then returns the reply so the notebook can display it.
    """
    divider = "-" * 80

    print(f"USER: {query}")
    response = chat_system.chat(query)
    print(f"\nASSISTANT: {response}\n")
    print(divider)

    # Preference summary: (label, key, whether the value is a list to join).
    prefs = chat_system.get_user_preferences()
    summary_rows = (
        ("Skin type", 'skin_type', False),
        ("Skin concerns", 'skin_concerns', True),
        ("Hair concerns", 'hair_concerns', True),
        ("Budget", 'budget', False),
        ("Product types", 'product_type', True),
        ("Other preferences", 'preferences', True),
    )

    print("\nCurrent preferences:")
    for label, key, is_list in summary_rows:
        if is_list:
            # Missing or None lists are shown as an empty string.
            shown = ', '.join(prefs.get(key, []) or [])
        else:
            shown = prefs.get(key)
        print(f"{label}: {shown}")
    print(divider)

    return response


2025-03-17 05:00:37,882 - __main__ - INFO - Loading product data...
2025-03-17 05:00:39,887 - __main__ - INFO - Processed 2999 product records
2025-03-17 05:00:41,415 - __main__ - INFO - Created 2999 documents for embedding. Skipped: 0, Long docs: 0
2025-03-17 05:00:41,417 - __main__ - INFO - Setting up vector store...
2025-03-17 05:00:54,162 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-03-17 05:01:07,631 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-03-17 05:01:22,748 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-03-17 05:01:25,922 - __main__ - INFO - Vector database setup complete.
2025-03-17 05:01:25,970 - __main__ - INFO - Setting up LangChain Tavily search for skincare knowledge...
2025-03-17 05:01:25,971 - __main__ - INFO - LangChain Tavily search setup complete.


Vector store saved to skincare_vector_store


In [85]:
# Small-talk probe: a greeting with no product or skincare question in it.
test_chat("Hello, how are you doing?")

USER: Hello, how are you doing?


2025-03-17 05:01:26,974 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-03-17 05:01:27,916 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"



ASSISTANT: Hello there! I'm doing great, thank you for asking. How about you? Is there anything skincare-related you'd like to chat about today? Feel free to ask any questions or share any concerns you may have.

--------------------------------------------------------------------------------

Current preferences:
Skin type: None
Skin concerns: 
Hair concerns: 
Budget: None
Product types: 
Other preferences: 
--------------------------------------------------------------------------------


"Hello there! I'm doing great, thank you for asking. How about you? Is there anything skincare-related you'd like to chat about today? Feel free to ask any questions or share any concerns you may have."

In [86]:
# Product request that names a product type and a price constraint ("cheap").
test_chat("give me a cheap anti-dandruff shampoo")

USER: give me a cheap anti-dandruff shampoo


2025-03-17 05:01:29,001 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-03-17 05:01:29,790 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-03-17 05:01:31,519 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"



ASSISTANT: Hello! I see you're looking for a cheap anti-dandruff shampoo. I recommend checking out the Logidruf Anti Dandruff Shampoo priced at ₹289.0. It is designed for daily care and has a high rating of 4.76. Another option is the C Win Shampoo priced at ₹339.0, which targets Seborrheic Dermatitis with a rating of 4.67. Both products effectively combat dandruff and soothe the scalp. Key ingredients like ketoconazole in Logidruf and zinc pyrithione in C Win help address dandruff concerns. These options provide quality care at an affordable price point.

--------------------------------------------------------------------------------

Current preferences:
Skin type: None
Skin concerns: dandruff
Hair concerns: 
Budget: None
Product types: anti-dandruff shampoo
Other preferences: 
--------------------------------------------------------------------------------


"Hello! I see you're looking for a cheap anti-dandruff shampoo. I recommend checking out the Logidruf Anti Dandruff Shampoo priced at ₹289.0. It is designed for daily care and has a high rating of 4.76. Another option is the C Win Shampoo priced at ₹339.0, which targets Seborrheic Dermatitis with a rating of 4.67. Both products effectively combat dandruff and soothe the scalp. Key ingredients like ketoconazole in Logidruf and zinc pyrithione in C Win help address dandruff concerns. These options provide quality care at an affordable price point."

In [87]:
# General skincare-knowledge question: asks about ingredients, not a specific product.
test_chat("What are some good ingredients for anti-ageing?")

USER: What are some good ingredients for anti-ageing?


2025-03-17 05:01:48,072 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-03-17 05:01:52,446 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"



ASSISTANT: Hello! When it comes to good ingredients for anti-aging, hyaluronic acid is a popular choice. It is known for its moisturizing, anti-aging, and antioxidant properties. Commercially available cosmetics containing hyaluronic acid often combine it with other active ingredients like probiotics, amino acids, peptides, and vitamins for additional benefits.

Additionally, protecting your skin from the sun is crucial for preventing early aging. Using a moisturizer with SPF daily can help shield your skin from harmful UV rays. This is a key tip from Mayo Clinic for maintaining healthy skin and preventing premature aging.

Remember to incorporate these ingredients and sun protection into your skincare routine for effective anti-aging benefits. The information provided is based on dermatology and medical sources to ensure accuracy and reliability. If you have any more questions or need further advice, feel free to ask!

-----------------------------------------------------------------

'Hello! When it comes to good ingredients for anti-aging, hyaluronic acid is a popular choice. It is known for its moisturizing, anti-aging, and antioxidant properties. Commercially available cosmetics containing hyaluronic acid often combine it with other active ingredients like probiotics, amino acids, peptides, and vitamins for additional benefits.\n\nAdditionally, protecting your skin from the sun is crucial for preventing early aging. Using a moisturizer with SPF daily can help shield your skin from harmful UV rays. This is a key tip from Mayo Clinic for maintaining healthy skin and preventing premature aging.\n\nRemember to incorporate these ingredients and sun protection into your skincare routine for effective anti-aging benefits. The information provided is based on dermatology and medical sources to ensure accuracy and reliability. If you have any more questions or need further advice, feel free to ask!'

In [88]:
# Follow-up whose "that" can only be resolved from the earlier turns of the conversation.
test_chat("Do you have any product for that?")

USER: Do you have any product for that?


2025-03-17 05:01:56,782 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-03-17 05:01:57,361 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
2025-03-17 05:01:59,364 - httpx - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"



ASSISTANT: Hello! I see you're looking for anti-aging products. Based on your needs, I recommend the following options:

1. Dermaceutic Activ Retinol 0.5 - Price: ₹3909.0
   Benefits: Reduces fine lines and wrinkles, restores skin radiance, smoothens skin texture.
   Key Ingredients: AHA-BHA Range, Ceramides, Retinol

2. Renewcell Retinol 2% + Argireline - Price: ₹1339.0
   Benefits: Combats aging, brightens, hydrates, soothes, improves skin texture and tone.
   Key Ingredients: Hyaluronic Acid Range, Niacinamide, Retinol, Vitamin E

These products are specifically formulated to address anti-aging concerns like fine lines, wrinkles, and skin texture. They contain key ingredients like retinol and hyaluronic acid known for their anti-aging properties. Feel free to explore these options for effective anti-aging benefits.

--------------------------------------------------------------------------------

Current preferences:
Skin type: None
Skin concerns: dandruff, anti-ageing
Hair concern

"Hello! I see you're looking for anti-aging products. Based on your needs, I recommend the following options:\n\n1. Dermaceutic Activ Retinol 0.5 - Price: ₹3909.0\n   Benefits: Reduces fine lines and wrinkles, restores skin radiance, smoothens skin texture.\n   Key Ingredients: AHA-BHA Range, Ceramides, Retinol\n\n2. Renewcell Retinol 2% + Argireline - Price: ₹1339.0\n   Benefits: Combats aging, brightens, hydrates, soothes, improves skin texture and tone.\n   Key Ingredients: Hyaluronic Acid Range, Niacinamide, Retinol, Vitamin E\n\nThese products are specifically formulated to address anti-aging concerns like fine lines, wrinkles, and skin texture. They contain key ingredients like retinol and hyaluronic acid known for their anti-aging properties. Feel free to explore these options for effective anti-aging benefits."