<a href="https://colab.research.google.com/github/lakshitaa4/Recursive_Question_Decomposer_over_Store_Sales_Data/blob/main/Recursive_Question_Decomposer_over_Store_Sales_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Recursive Question Decomposer over Store Sales Data

In [None]:
# Cell 1: Install required packages
!pip install -q -U langgraph langchain langchain-google-genai langchain-experimental pandas python-dotenv

## Authentication: Connecting to Gemini
This step authorizes the Colab notebook to call Gemini using your Google AI Studio API key.

- Go to Google AI Studio: https://aistudio.google.com/
- Click "Get API key" and create a new project/key. Copy the key.
- In Colab, click the key icon (🔑) on the left sidebar (Secrets Manager).
- Click "+ Add new secret", name it `GOOGLE_API_KEY`, and paste your Gemini API key as the value. Turn on the **Notebook access** toggle for that secret so this notebook can read it.

In [None]:
# === Part 1: Your Setup (with necessary additions) ===

import os
import re
import json
import logging
import pandas as pd
from typing import TypedDict, List, Dict, Optional

from langgraph.graph import StateGraph, END
from langchain_core.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_experimental.agents import create_pandas_dataframe_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from google.colab import drive, userdata

# --- Configuration & Setup ---
# force=True replaces any handlers already attached to the root logger. Notebook
# runtimes such as Colab usually attach one before this cell runs, which turns a
# plain basicConfig() into a silent no-op and hides every logger.info() call made
# by the graph nodes further down.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    force=True,
)
logger = logging.getLogger(__name__)

# Copy the Gemini key from the Colab Secrets Manager into the process environment
# (langchain-google-genai reads GOOGLE_API_KEY from there). Failure is reported
# but not fatal, so the remaining cells still run and report their own errors.
try:
    os.environ['GOOGLE_API_KEY'] = userdata.get('GOOGLE_API_KEY')
    print("Google API Key loaded successfully.")
except Exception as e:
    print(f"Failed to load Google API Key. Error: {e}")

# Single source of truth for the model name; change it here to switch models.
GEMINI_MODEL = "gemini-2.5-flash"

llm = None
try:
    # temperature=0.0 keeps classification/decomposition answers as repeatable as possible.
    llm = ChatGoogleGenerativeAI(model=GEMINI_MODEL, temperature=0.0)
    print(f"LLM '{GEMINI_MODEL}' initialized.")
except Exception as e:
    print(f"CRITICAL: LLM 'llm' is NOT initialized. Error: {e}")

# --- Data Loading and Preparation ---

def sanitize_column_name(col_name) -> str:
    """Return *col_name* as a lowercase snake_case identifier.

    Runs of characters outside ``[a-z0-9]`` collapse into one underscore and
    leading/trailing underscores are dropped, e.g. ``'Qty Sold' -> 'qty_sold'``.
    Non-string headers (pandas can produce ints, floats or timestamps from Excel
    header cells) are converted with ``str()`` first instead of raising
    AttributeError on ``.lower()``.
    """
    name = str(col_name).lower()
    name = re.sub(r'[^a-z0-9]+', '_', name)
    return name.strip('_')


df = None
pandas_agent = None
try:
    print("\nMounting Google Drive...")
    drive.mount('/content/drive', force_remount=True)
    excel_file_path = r"/content/drive/MyDrive/langgraph assignment/Demo Sales Data.xlsx"    #change this according to where your file is saved on the drive
    df = pd.read_excel(excel_file_path, sheet_name=0)
    print(f"Data loaded successfully from {excel_file_path}")

    # --- Sanitizing column names for agent compatibility ---
    original_columns = df.columns.tolist()
    df.columns = [sanitize_column_name(col) for col in original_columns]
    sanitized_columns = df.columns.tolist()

    print("\n--- Column Name Sanitation ---")
    for orig, new in zip(original_columns, sanitized_columns):
        print(f"'{orig}' -> '{new}'")

    # Two headers can sanitize to the same name (e.g. 'Qty Sold' and 'qty_sold');
    # df['name'] would then return a DataFrame instead of a Series, so flag it.
    collisions = sorted({c for c in sanitized_columns if sanitized_columns.count(c) > 1})
    if collisions:
        print(f"WARNING: columns collide after sanitization: {collisions}")

    # Derived per-row revenue (unit retail price times quantity sold).
    if 'retail' in df.columns and 'qty_sold' in df.columns:
        df['revenue'] = df['retail'] * df['qty_sold']
        print("'revenue' column (from retail * qty_sold) calculated and added.")

    # Initialize the Pandas Agent AFTER the dataframe is loaded and prepared, so
    # it sees the sanitized column names and the derived 'revenue' column.
    if llm is not None:
        pandas_agent = create_pandas_dataframe_agent(
            llm,
            df,
            verbose=False,
            agent_executor_kwargs={"handle_parsing_errors": True},
            # Required by langchain-experimental: the agent executes LLM-generated
            # Python against df. Only use it with trusted data / in a sandbox.
            allow_dangerous_code=True
        )
        print("\nPandas DataFrame Agent initialized successfully.")

except Exception as e:
    print(f"\nError loading or processing data: {e}")

# In-memory cache for repeated sub-questions: exact question text -> answer.
memoization_cache = {}

Google API Key loaded successfully.
LLM 'gemini-2.5-flash' initialized.

Mounting Google Drive...
Mounted at /content/drive
Data loaded successfully from /content/drive/MyDrive/langgraph assignment/Demo Sales Data.xlsx

--- Column Name Sanitation ---
'Store Name' -> 'store_name'
'Description' -> 'description'
'Department' -> 'department'
'Qty Sold' -> 'qty_sold'
'Cost' -> 'cost'
'Retail' -> 'retail'
'Total Retail' -> 'total_retail'
'Margin' -> 'margin'
'Profit' -> 'profit'
'revenue' column (from retail * qty_sold) calculated and added.

Pandas DataFrame Agent initialized successfully.


In [None]:
# Preview the prepared DataFrame: first five rows with sanitized column names
# (plus the derived 'revenue' column when it could be computed).
df.head(n=5)

In [None]:
# Smoke test: the pandas agent should answer a single-department aggregate question.
agent_probe = {"input": "What is the total revenue for the 'SMOKE SHOP' department?"}
pandas_agent.invoke(agent_probe)

{'input': "What is the total revenue for the 'SMOKE SHOP' department?",
 'output': '60577.99999999999'}

In [None]:
# Smoke test: confirm the Gemini chat model responds to a plain prompt.
llm_probe = "What is the capital of Zurich?"
llm.invoke(llm_probe)

AIMessage(content="The capital of the Canton of Zurich is **Zurich** itself.\n\nIt's both the name of the canton (a state within Switzerland) and its largest city, which serves as its capital and administrative center.", additional_kwargs={}, response_metadata={'prompt_feedback': {'block_reason': 0, 'safety_ratings': []}, 'finish_reason': 'STOP', 'model_name': 'gemini-2.5-flash', 'safety_ratings': []}, id='run--65c72256-dbbe-40ce-8d89-3770925454c3-0', usage_metadata={'input_tokens': 8, 'output_tokens': 44, 'total_tokens': 244, 'input_token_details': {'cache_read': 0}, 'output_token_details': {'reasoning': 192}})

In [None]:
# === Graph State and Node Functions ===

class GraphState(TypedDict):
    """Shared state passed between the LangGraph nodes of the question decomposer.

    Most nodes mutate the list/dict fields below in place and return ``{}``;
    ``decision`` and ``final_answer`` are set through returned updates.
    """
    # The user's question exactly as entered; used in the aggregation prompt.
    original_question: str
    # FIFO work queue: nodes pop questions from the front and the decomposition
    # node appends new sub-questions to the back.
    questions_to_process: List[str]
    # Answers collected by resolver_node, keyed by the question text.
    question_answer_pairs: Dict[str, str]
    # Synthesized answer returned by aggregator_node ("" in the initial state).
    final_answer: str
    # Per-step trace records with a "step" key and optional "input"/"output" keys.
    logs: List[Dict]
    # Classification returned by complexity_node (expected 'Atomic' or 'Complex'),
    # read by decide_path; None before the first classification.
    decision: Optional[str]

# Entry node of the graph: records that a run has started.
def start_node(state: GraphState) -> Dict:
    """Log the start of a run and append a "Start" record to ``state["logs"]``.

    The logs list is mutated in place; no state updates are returned.
    """
    logger.info("Starting a new question-answering process.")
    entry = {"step": "Start", "output": "Graph execution started."}
    state["logs"].append(entry)
    return dict()

# Classify the question at the head of the queue as atomic or complex.
def complexity_node(state: GraphState) -> Dict:
    """Classify the next queued question as ``"Atomic"`` or ``"Complex"``.

    Reads (without removing) ``state["questions_to_process"][0]`` and asks the
    LLM to classify it. The free-text reply is normalized to exactly
    ``"Complex"`` or ``"Atomic"`` so the routing in ``decide_path`` sees a
    canonical value: replies such as ``"Complex."`` or ``"**Complex**"`` count as
    Complex. Anything unrecognized, or a reply that also mentions "atomic",
    defaults to ``"Atomic"`` so the question is answered directly rather than
    decomposed again.

    Returns:
        ``{"decision": "Complex" | "Atomic"}``. A log record is also appended to
        ``state["logs"]`` in place.
    """
    question = state["questions_to_process"][0]
    logger.info(f"Deciding complexity for: '{question}'")
    prompt = f"You are a query classifier. Determine if a question is 'Atomic' or 'Complex'. Question: \"{question}\". Respond with only 'Atomic' or 'Complex'."
    raw_decision = llm.invoke(prompt).content.strip()
    lowered = raw_decision.lower()
    decision = "Complex" if "complex" in lowered and "atomic" not in lowered else "Atomic"
    logger.info(f"Decision: '{decision}' (raw LLM reply: '{raw_decision}')")
    state["logs"].append({"step": "Complexity Decision", "input": question, "output": decision})
    return {"decision": decision}

# Split a complex question into simpler sub-questions and queue the new ones.
def _parse_question_list(response: str) -> Optional[List[str]]:
    """Extract sub-questions from an LLM reply that should be a JSON list of strings.

    Strips Markdown code fences, then tries to parse the whole reply and, failing
    that, the span from the first ``[`` to the last ``]`` (models sometimes wrap
    the list in prose). Returns the stripped, non-empty string items of the first
    JSON list found, or ``None`` when there is no such list or it holds no usable
    strings (e.g. ``[]``, a JSON object, or a bare JSON string).
    """
    cleaned = re.sub(r"```(?:json)?", "", response, flags=re.IGNORECASE).strip()
    candidates = [cleaned]
    bracketed = re.search(r"\[.*\]", cleaned, re.DOTALL)
    if bracketed:
        candidates.append(bracketed.group(0))
    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, list):
            questions = [item.strip() for item in parsed if isinstance(item, str) and item.strip()]
            return questions or None
    return None


def decomposition_node(state: GraphState) -> Dict:
    """Replace the complex question at the head of the queue with sub-questions.

    Pops ``state["questions_to_process"][0]``, asks the LLM for a JSON list of
    simpler sub-questions, and appends to the back of the queue those not already
    queued, already answered, or duplicated within the reply. If the reply is not
    a usable list, the original question is used as its own single sub-question.

    The graph routes ``decomposition_node -> resolver_node`` unconditionally, and
    the resolver pops the queue head, so the queue must not be left empty here.
    When nothing new would be queued and the queue is empty, the original
    question is re-queued so the resolver answers it directly.

    State is mutated in place (``questions_to_process`` and ``logs``); returns ``{}``.
    """
    question = state["questions_to_process"].pop(0)
    logger.info(f"Decomposing complex question: '{question}'")
    prompt = f"""You are an expert query decomposer. Break down a complex question into a series of simpler, atomic sub-questions. Output ONLY a valid JSON list of strings.
                  ---
                  Example 1:
                  Original Question: "Which store is most profitable, and what is its top selling item by quantity?"
                  Output:
                  ["What is the total profit for each store?", "Which store has the highest total profit?", "For the most profitable store, what is its item description with the highest quantity sold?"]
                  ---
                  Example 2:
                  Original Question: "Compare the total revenue of the 'SMOKE SHOP' and 'CO : BAKERY' departments."
                  Output:
                  ["What is the total revenue for the 'SMOKE SHOP' department?", "What is the total revenue for the 'CO : BAKERY' department?"]
                  ---
                  Original Question: {question}
                  Output:
                  """
    response = llm.invoke(prompt).content.strip()
    sub_questions = _parse_question_list(response)
    if sub_questions is None:
        logger.warning(f"Failed to parse JSON for decomposition. Fallback to original. Response: {response}")
        sub_questions = [question]

    new_questions_to_add = []
    for q in sub_questions:
        if q in new_questions_to_add or q in state["questions_to_process"] or q in state["question_answer_pairs"]:
            logger.info(f"Skipping redundant question: '{q}'")
        else:
            new_questions_to_add.append(q)

    # Guard the unconditional edge to resolver_node: an empty queue there would
    # make its pop(0) raise IndexError.
    if not new_questions_to_add and not state["questions_to_process"]:
        logger.warning(f"No new sub-questions for '{question}'; resolving it directly.")
        new_questions_to_add.append(question)

    state["logs"].append({"step": "Decomposition", "input": question, "output": new_questions_to_add})
    state["questions_to_process"].extend(new_questions_to_add)
    return {}

# Answer the question at the head of the queue with the pandas agent (memoized).
def resolver_node(state: GraphState) -> Dict:
    """Pop the next queued question, answer it, and record the answer.

    The answer comes from ``memoization_cache`` when the exact question text was
    already resolved, otherwise from ``pandas_agent``. It is stored in
    ``state["question_answer_pairs"]`` and a log record is appended; state is
    mutated in place and ``{}`` is returned.

    If the agent raises, the error text is recorded as the answer (and is not
    cached), so the aggregator can report the failure instead of the whole run
    aborting. An empty queue is a no-op, which lets ``should_continue`` route to
    aggregation.
    """
    if not state["questions_to_process"]:
        logger.warning("Resolver reached with an empty question queue; nothing to resolve.")
        return {}
    question = state["questions_to_process"].pop(0)
    logger.info(f"Resolving atomic question: '{question}'")
    if question in memoization_cache:
        logger.info("Found answer in cache.")
        answer = memoization_cache[question]
    else:
        try:
            response = pandas_agent.invoke({"input": question})
        except Exception as e:  # boundary: one failed sub-question must not kill the run
            logger.exception(f"Pandas agent failed while resolving: '{question}'")
            answer = f"Could not resolve this question due to an error: {e}"
        else:
            # The agent executor returns a dict with an 'output' key; fall back to
            # a message-style '.content' or the raw repr for anything else.
            if isinstance(response, dict):
                output = response.get("output")
            else:
                output = getattr(response, "content", None)
            answer = str(output) if output else str(response)
            memoization_cache[question] = answer
    logger.info(f"Answer: {answer}")
    state["question_answer_pairs"][question] = answer
    state["logs"].append({"step": "Data Resolver", "input": question, "output": answer})
    return {}

# Synthesize the final answer from every collected sub-question/answer pair.
def aggregator_node(state: GraphState) -> Dict:
    """Combine the collected sub-answers into one user-facing answer.

    Serializes ``state["question_answer_pairs"]`` as indented JSON, embeds it
    with the original question in a synthesis prompt, and returns the LLM's reply
    as the ``final_answer`` state update. A log record is appended to
    ``state["logs"]`` in place.
    """
    logger.info("Aggregating answers for the final response.")
    qa_json = json.dumps(state["question_answer_pairs"], indent=2)
    synthesis_prompt = (
        "Synthesize a final, user-friendly answer from the following data. "
        f"Original Question: {state['original_question']}\n\n"
        f"Sub-Questions and Answers:\n{qa_json}\n\n"
        "Final Answer:"
    )
    answer = llm.invoke(synthesis_prompt).content.strip()
    logger.info(f"Final Synthesized Answer: {answer}")
    state["logs"].append({"step": "Aggregation", "input": qa_json, "output": answer})
    # Returned (rather than mutated) so the graph writes it into 'final_answer'.
    return {"final_answer": answer}

# Route after classification: decompose complex questions, resolve atomic ones.
def decide_path(state: GraphState) -> str:
    """Return the conditional-edge key taken after ``complexity_node``.

    Returns ``"decompose"`` when ``state["decision"]`` reads as Complex and
    ``"resolve"`` otherwise. The match is case-insensitive and ignores surrounding
    punctuation or Markdown (``"Complex."``, ``"**Complex**"``) because the
    decision may be raw LLM text. A missing/None decision, or one that also
    mentions "atomic", resolves directly, which always makes progress.
    """
    decision = (state.get("decision") or "").lower()
    if "complex" in decision and "atomic" not in decision:
        return "decompose"
    return "resolve"

# Loop back while questions remain queued; otherwise move on to aggregation.
def should_continue(state: GraphState) -> str:
    """Return the conditional-edge key taken after ``resolver_node``.

    ``"continue_processing"`` while ``state["questions_to_process"]`` is
    non-empty, ``"aggregate"`` once the queue has been drained.
    """
    if not state["questions_to_process"]:
        return "aggregate"
    return "continue_processing"

In [None]:
# === Graph Construction & Execution ===

# Wire the nodes into a loop: classify -> (decompose ->) resolve -> repeat while
# questions remain queued, then aggregate.
workflow = StateGraph(GraphState)
for node_name, node_fn in (
    ("start_node", start_node),
    ("complexity_node", complexity_node),
    ("decomposition_node", decomposition_node),
    ("resolver_node", resolver_node),
    ("aggregator_node", aggregator_node),
):
    workflow.add_node(node_name, node_fn)

workflow.set_entry_point("start_node")
workflow.add_edge("start_node", "complexity_node")
workflow.add_conditional_edges(
    "complexity_node",
    decide_path,
    {"decompose": "decomposition_node", "resolve": "resolver_node"},
)
# After a decomposition the resolver answers the current queue head directly,
# without re-classifying it.
workflow.add_edge("decomposition_node", "resolver_node")
workflow.add_edge("aggregator_node", END)
workflow.add_conditional_edges(
    "resolver_node",
    should_continue,
    {"continue_processing": "complexity_node", "aggregate": "aggregator_node"},
)
app = workflow.compile()

def run_graph(question: str, recursion_limit: int = 100) -> str:
    """Run the decomposer graph on *question* and return the result as JSON.

    Args:
        question: The user's analytical question.
        recursion_limit: Maximum number of graph super-steps for this run. Each
            queued question costs about two steps (complexity_node plus
            resolver_node), so LangGraph's default limit of 25 raises
            GraphRecursionError once a question decomposes into around a dozen
            sub-questions.

    Returns:
        A pretty-printed JSON string with the original question, the final
        answer, the sub-question/answer pairs and the per-step logs.
    """
    initial_state = {
        "original_question": question,
        "questions_to_process": [question],
        "question_answer_pairs": {},
        "final_answer": "",
        "logs": [],
        "decision": None,
    }
    # Start every run with an empty cache so answers always reflect the current data.
    memoization_cache.clear()
    final_state = app.invoke(initial_state, config={"recursion_limit": recursion_limit})
    output_json = {
        "original_question": final_state["original_question"],
        "final_answer": final_state["final_answer"],
        "sub_questions_and_answers": final_state["question_answer_pairs"],
        "logs": final_state["logs"]
    }
    return json.dumps(output_json, indent=2)

# === Part 4: User Interaction ===
# Prompt for a question (falling back to a default) and run it through the graph,
# but only when both the LLM and the pandas agent initialized successfully.
if pandas_agent and llm:
    print("\n" + "="*50)
    print("Recursive Question Decomposer is Ready!")
    print("="*50)
    default_question = "Which store has the highest profit, and what is the total revenue for the 'CO : HOT FOOD' department in that store?"
    # strip() so a whitespace-only entry falls back to the default instead of
    # being sent through the graph as a blank question.
    user_question = input("Please enter your analytical question (or press Enter for default):\n> ").strip()
    if not user_question:
        print(f"\nNo question entered. Running with default:\n'{default_question}'")
        user_question = default_question
    final_json_output = run_graph(user_question)
    print("\n\n--- FINAL OUTPUT ---")
    print(final_json_output)
else:
    print("\nCould not run the application because the LLM or the Pandas Agent failed to initialize.")



Recursive Question Decomposer is Ready!
Please enter your analytical question (or press Enter for default):
> Which store had the highest average basket size in March 2023?


--- FINAL OUTPUT ---
{
  "original_question": "Which store had the highest average basket size in March 2023?",
  "final_answer": "Due to the available data not containing a date column or individual transaction details, it was not possible to determine the average basket size specifically for March 2023 or in the traditional sense of items per customer transaction.\n\nHowever, interpreting \"average basket size\" as the average quantity of items sold per product entry for each store, the store with the highest average was **Golden LLC**.",
  "sub_questions_and_answers": {
    "What is the average basket size for each store in March 2023?": "I cannot calculate the \"average basket size for each store in March 2023\" because:\n1. There is no date column in the dataframe to filter for \"March 2023\".\n2. The data i